Source code for compute.servers.test_novnc

# Copyright 2016-2017 OpenStack Foundation
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import struct
import urllib.parse as urlparse
import urllib3

from tempest.api.compute import base
from tempest.common import compute
from tempest import config
from tempest.lib import decorators

CONF = config.CONF


[docs] class NoVNCConsoleTestJSON(base.BaseV2ComputeTest): """Test novnc console""" create_default_network = True @classmethod def skip_checks(cls): super(NoVNCConsoleTestJSON, cls).skip_checks() if not CONF.compute_feature_enabled.vnc_console: raise cls.skipException('VNC Console feature is disabled.') def setUp(self): super(NoVNCConsoleTestJSON, self).setUp() self._websocket = None def tearDown(self): super(NoVNCConsoleTestJSON, self).tearDown() if self._websocket is not None: self._websocket.close() # NOTE(zhufl): Because server_check_teardown will raise Exception # which will prevent other cleanup steps from being executed, so # server_check_teardown should be called after super's tearDown. self.server_check_teardown() @classmethod def setup_clients(cls): super(NoVNCConsoleTestJSON, cls).setup_clients() cls.client = cls.servers_client @classmethod def resource_setup(cls): super(NoVNCConsoleTestJSON, cls).resource_setup() cls.server = cls.create_test_server(wait_until="ACTIVE") cls.use_get_remote_console = False if not cls.is_requested_microversion_compatible('2.5'): cls.use_get_remote_console = True def _validate_novnc_html(self, vnc_url): """Verify we can connect to novnc and get back the javascript.""" resp = urllib3.PoolManager().request('GET', vnc_url) # Make sure that the GET request was accepted by the novncproxy self.assertEqual(resp.status, 200, 'Got a Bad HTTP Response on the ' 'initial call: ' + str(resp.status)) # Do some basic validation to make sure it is an expected HTML document resp_data = resp.data.decode() # This is needed in the case of example: <html lang="en"> self.assertRegex(resp_data, '<html.*>', 'Not a valid html document in the response.') self.assertIn('</html>', resp_data, 'Not a valid html document in the response.') # Just try to make sure we got JavaScript back for noVNC, since we # won't actually use it since not inside of a browser self.assertIn('noVNC', resp_data, 'Not a valid noVNC javascript html document.') self.assertIn('<script', resp_data, 'Not a valid noVNC javascript html document.') def _validate_rfb_negotiation(self): """Verify we can connect to novnc and do the websocket connection.""" # Turn the Socket into a WebSocket to do the communication data = self._websocket.receive_frame() self.assertFalse(data is None or not data, 'Token must be invalid because the connection ' 'closed.') # Parse the RFB version from the data to make sure it is valid # and belong to the known supported RFB versions. version = float("%d.%d" % (int(data[4:7], base=10), int(data[8:11], base=10))) # Add the max RFB versions supported supported_versions = [3.3, 3.8] self.assertIn(version, supported_versions, 'Bad RFB Version: ' + str(version)) # Send our RFB version to the server self._websocket.send_frame(data) # Get the sever authentication type and make sure None is supported data = self._websocket.receive_frame() self.assertIsNotNone(data, 'Expected authentication type None.') data_length = len(data) if version == 3.3: # For RFB 3.3: in the security handshake, rather than a two-way # negotiation, the server decides the security type and sends a # single word(4 bytes). self.assertEqual( data_length, 4, 'Expected authentication type None.') self.assertIn(1, [int(data[i]) for i in (0, 3)], 'Expected authentication type None.') else: self.assertGreaterEqual( len(data), 2, 'Expected authentication type None.') self.assertIn( 1, [int(data[i + 1]) for i in range(int(data[0]))], 'Expected authentication type None.') # Send to the server that we only support authentication # type None self._websocket.send_frame(bytes((1,))) # The server should send 4 bytes of 0's if security # handshake succeeded data = self._websocket.receive_frame() self.assertEqual( len(data), 4, 'Server did not think security was successful.') self.assertEqual( [int(i) for i in data], [0, 0, 0, 0], 'Server did not think security was successful.') # Say to leave the desktop as shared as part of client initialization self._websocket.send_frame(bytes((1,))) # Get the server initialization packet back and make sure it is the # right structure where bytes 20-24 is the name length and # 24-N is the name data = self._websocket.receive_frame() data_length = len(data) if data is not None else 0 self.assertFalse(data_length <= 24 or data_length != (struct.unpack(">L", data[20:24])[0] + 24), 'Server initialization was not the right format.') # Since the rest of the data on the screen is arbitrary, we will # close the socket and end our validation of the data at this point # Assert that the latest check was false, meaning that the server # initialization was the right format self.assertFalse(data_length <= 24 or data_length != (struct.unpack(">L", data[20:24])[0] + 24)) def _validate_websocket_upgrade(self): """Verify that the websocket upgrade was successful. Parses response and ensures that required response fields are present and accurate. (https://tools.ietf.org/html/rfc7231#section-6.2.2) """ self.assertTrue( self._websocket.response.startswith(b'HTTP/1.1 101 Switching ' b'Protocols'), 'Incorrect HTTP return status code: {}'.format( str(self._websocket.response) ) ) _required_header = 'upgrade: websocket' _response = str(self._websocket.response).lower() self.assertIn( _required_header, _response, 'Did not get the expected WebSocket HTTP Response.' )
[docs] @decorators.idempotent_id('c640fdff-8ab4-45a4-a5d8-7e6146cbd0dc') def test_novnc(self): """Test accessing novnc console of server""" if self.use_get_remote_console: body = self.client.get_remote_console( self.server['id'], console_type='novnc', protocol='vnc')['remote_console'] else: body = self.client.get_vnc_console(self.server['id'], type='novnc')['console'] self.assertEqual('novnc', body['type']) # Do the initial HTTP Request to novncproxy to get the NoVNC JavaScript self._validate_novnc_html(body['url']) # Do the WebSockify HTTP Request to novncproxy to do the RFB connection self._websocket = compute.create_websocket(body['url']) # Validate that we successfully connected and upgraded to Web Sockets self._validate_websocket_upgrade() # Validate the RFB Negotiation to determine if a valid VNC session self._validate_rfb_negotiation()
[docs] @decorators.idempotent_id('f9c79937-addc-4aaa-9e0e-841eef02aeb7') def test_novnc_bad_token(self): """Test accessing novnc console with bad token Do the WebSockify HTTP Request to novnc proxy with a bad token, the novnc proxy should reject the connection and closed it. """ if self.use_get_remote_console: body = self.client.get_remote_console( self.server['id'], console_type='novnc', protocol='vnc')['remote_console'] else: body = self.client.get_vnc_console(self.server['id'], type='novnc')['console'] self.assertEqual('novnc', body['type']) # Do the WebSockify HTTP Request to novncproxy with a bad token parts = urlparse.urlparse(body['url']) qparams = urlparse.parse_qs(parts.query) if 'path' in qparams: qparams['path'] = urlparse.unquote(qparams['path'][0]).replace( 'token=', 'token=bad') elif 'token' in qparams: qparams['token'] = 'bad' + qparams['token'][0] new_query = urlparse.urlencode(qparams) new_parts = urlparse.ParseResult(parts.scheme, parts.netloc, parts.path, parts.params, new_query, parts.fragment) url = urlparse.urlunparse(new_parts) self._websocket = compute.create_websocket(url) # Make sure the novncproxy rejected the connection and closed it data = self._websocket.receive_frame() self.assertTrue(data is None or not data, "The novnc proxy actually sent us some data, but we " "expected it to close the connection.")