Source code for keystone.tests.unit.test_v2

# Copyright 2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import time
import uuid

from keystoneclient.common import cms
from oslo_serialization import jsonutils
import six
from six.moves import http_client
from testtools import matchers

from keystone.common import extension as keystone_extension
import keystone.conf
from keystone.tests import unit
from keystone.tests.unit import default_fixtures
from keystone.tests.unit import ksfixtures
from keystone.tests.unit import rest
from keystone.tests.unit.schema import v2

CONF = keystone.conf.CONF


class CoreApiTests(object):
    """Content-type agnostic tests for the core Identity v2.0 API.

    This is a mixin: it relies on the concrete test case to provide the
    request helpers (``public_request``, ``admin_request``,
    ``get_scoped_token``), the fixture attributes (``user_foo``,
    ``tenant_bar``, ...) and the ``assertValid*Response`` / ``_get_*`` helpers
    that know how to read a response body.
    """

    def assertValidError(self, error):
        """Assert an error body has a code, a title and a message."""
        self.assertIsNotNone(error.get('code'))
        self.assertIsNotNone(error.get('title'))
        self.assertIsNotNone(error.get('message'))

    def assertValidVersion(self, version):
        """Assert a version entity has an id, a status and an update date."""
        self.assertIsNotNone(version)
        self.assertIsNotNone(version.get('id'))
        self.assertIsNotNone(version.get('status'))
        self.assertIsNotNone(version.get('updated'))

    def assertValidExtension(self, extension):
        """Assert an extension entity carries its identifying attributes."""
        self.assertIsNotNone(extension)
        self.assertIsNotNone(extension.get('name'))
        self.assertIsNotNone(extension.get('namespace'))
        self.assertIsNotNone(extension.get('alias'))
        self.assertIsNotNone(extension.get('updated'))

    def assertValidTenant(self, tenant):
        """Assert a tenant has id/name and exposes no v3-only attributes."""
        self.assertIsNotNone(tenant.get('id'))
        self.assertIsNotNone(tenant.get('name'))
        self.assertNotIn('domain_id', tenant)
        self.assertNotIn('parent_id', tenant)

    def assertValidUser(self, user):
        """Assert a user entity has an id and a name."""
        self.assertIsNotNone(user.get('id'))
        self.assertIsNotNone(user.get('name'))

    def assertValidRole(self, tenant):
        """Assert a role entity has an id and a name.

        :param tenant: the role entity to check. The parameter name is kept
                       as-is so that existing keyword callers keep working.
        """
        self.assertIsNotNone(tenant.get('id'))
        self.assertIsNotNone(tenant.get('name'))

    def test_public_not_found(self):
        """An unknown path on the public API returns a valid 404 error."""
        r = self.public_request(
            path='/%s' % uuid.uuid4().hex,
            expected_status=http_client.NOT_FOUND)
        self.assertValidErrorResponse(r)

    def test_admin_not_found(self):
        """An unknown path on the admin API returns a valid 404 error."""
        r = self.admin_request(
            path='/%s' % uuid.uuid4().hex,
            expected_status=http_client.NOT_FOUND)
        self.assertValidErrorResponse(r)

    def test_public_multiple_choice(self):
        """The public API root answers 300 with the list of versions."""
        r = self.public_request(path='/', expected_status=300)
        self.assertValidMultipleChoiceResponse(r)

    def test_admin_multiple_choice(self):
        """The admin API root answers 300 with the list of versions."""
        r = self.admin_request(path='/', expected_status=300)
        self.assertValidMultipleChoiceResponse(r)

    def test_public_version(self):
        """``/v2.0/`` on the public API describes the version."""
        r = self.public_request(path='/v2.0/')
        self.assertValidVersionResponse(r)

    def test_admin_version(self):
        """``/v2.0/`` on the admin API describes the version."""
        r = self.admin_request(path='/v2.0/')
        self.assertValidVersionResponse(r)

    def test_public_extensions(self):
        """The public extension list matches ``PUBLIC_EXTENSIONS``."""
        r = self.public_request(path='/v2.0/extensions')
        self.assertValidExtensionListResponse(
            r, keystone_extension.PUBLIC_EXTENSIONS)

    def test_admin_extensions(self):
        """The admin extension list matches ``ADMIN_EXTENSIONS``."""
        r = self.admin_request(path='/v2.0/extensions')
        self.assertValidExtensionListResponse(
            r, keystone_extension.ADMIN_EXTENSIONS)

    def test_admin_extensions_returns_not_found(self):
        """Requesting an unknown extension on the admin API returns 404."""
        self.admin_request(path='/v2.0/extensions/invalid-extension',
                           expected_status=http_client.NOT_FOUND)

    def test_public_osksadm_extension_returns_not_found(self):
        """OS-KSADM is not reachable through the public API."""
        self.public_request(path='/v2.0/extensions/OS-KSADM',
                            expected_status=http_client.NOT_FOUND)

    def test_admin_osksadm_extension(self):
        """OS-KSADM is described by the admin API."""
        r = self.admin_request(path='/v2.0/extensions/OS-KSADM')
        self.assertValidExtensionResponse(
            r, keystone_extension.ADMIN_EXTENSIONS)

    def test_authenticate(self):
        """Password authentication scoped to a tenant returns a catalog."""
        r = self.public_request(
            method='POST',
            path='/v2.0/tokens',
            body={
                'auth': {
                    'passwordCredentials': {
                        'username': self.user_foo['name'],
                        'password': self.user_foo['password'],
                    },
                    'tenantId': self.tenant_bar['id'],
                },
            },
            expected_status=http_client.OK)
        self.assertValidAuthenticationResponse(r, require_service_catalog=True)

    def test_authenticate_unscoped(self):
        """Password authentication without a tenant succeeds."""
        r = self.public_request(
            method='POST',
            path='/v2.0/tokens',
            body={
                'auth': {
                    'passwordCredentials': {
                        'username': self.user_foo['name'],
                        'password': self.user_foo['password'],
                    },
                },
            },
            expected_status=http_client.OK)
        self.assertValidAuthenticationResponse(r)

    def test_get_tenants_for_token(self):
        """A scoped token can list its tenants on the public API."""
        r = self.public_request(path='/v2.0/tenants',
                                token=self.get_scoped_token())
        self.assertValidTenantListResponse(r)

    def test_validate_token(self):
        """A scoped token can be validated through the admin API."""
        token = self.get_scoped_token()
        r = self.admin_request(
            path='/v2.0/tokens/%(token_id)s' % {
                'token_id': token,
            },
            token=token)
        self.assertValidAuthenticationResponse(r)

    def test_invalid_token_returns_not_found(self):
        """Validating an unknown token ID returns 404."""
        token = self.get_scoped_token()
        self.admin_request(
            path='/v2.0/tokens/%(token_id)s' % {
                'token_id': 'invalid',
            },
            token=token,
            expected_status=http_client.NOT_FOUND)

    def test_validate_token_service_role(self):
        """A token scoped to the service tenant validates successfully."""
        self.md_foobar = self.assignment_api.add_role_to_user_and_project(
            self.user_foo['id'],
            self.tenant_service['id'],
            self.role_service['id'])

        token = self.get_scoped_token(
            tenant_id=default_fixtures.SERVICE_TENANT_ID)
        r = self.admin_request(
            path='/v2.0/tokens/%s' % token,
            token=token)
        self.assertValidAuthenticationResponse(r)

    def test_remove_role_revokes_token(self):
        """Removing the role a token was issued with makes it unauthorized."""
        self.md_foobar = self.assignment_api.add_role_to_user_and_project(
            self.user_foo['id'],
            self.tenant_service['id'],
            self.role_service['id'])

        token = self.get_scoped_token(
            tenant_id=default_fixtures.SERVICE_TENANT_ID)
        r = self.admin_request(
            path='/v2.0/tokens/%s' % token,
            token=token)
        self.assertValidAuthenticationResponse(r)

        self.assignment_api.remove_role_from_user_and_project(
            self.user_foo['id'],
            self.tenant_service['id'],
            self.role_service['id'])

        # Only the status code matters here, so the response is not kept.
        self.admin_request(
            path='/v2.0/tokens/%s' % token,
            token=token,
            expected_status=http_client.UNAUTHORIZED)

    def test_validate_token_belongs_to(self):
        """Validation with a matching ``belongsTo`` returns the catalog."""
        token = self.get_scoped_token()
        path = ('/v2.0/tokens/%s?belongsTo=%s' % (token,
                                                  self.tenant_bar['id']))
        r = self.admin_request(path=path, token=token)
        self.assertValidAuthenticationResponse(r, require_service_catalog=True)

    def test_validate_token_no_belongs_to_still_returns_catalog(self):
        """Validation without ``belongsTo`` still returns the catalog."""
        token = self.get_scoped_token()
        path = ('/v2.0/tokens/%s' % token)
        r = self.admin_request(path=path, token=token)
        self.assertValidAuthenticationResponse(r, require_service_catalog=True)

    def test_validate_token_head(self):
        """Same request as ``test_validate_token``, except using HEAD.

        There's no response to validate here, but this is included for the
        sake of completely covering the core API.
        """
        token = self.get_scoped_token()
        self.admin_request(
            method='HEAD',
            path='/v2.0/tokens/%(token_id)s' % {
                'token_id': token,
            },
            token=token,
            expected_status=http_client.OK)

    def test_endpoints(self):
        """The endpoints of a token can be listed."""
        token = self.get_scoped_token()
        r = self.admin_request(
            path='/v2.0/tokens/%(token_id)s/endpoints' % {
                'token_id': token,
            },
            token=token)
        self.assertValidEndpointListResponse(r)

    def test_get_tenant(self):
        """A tenant can be fetched by ID."""
        token = self.get_scoped_token()
        r = self.admin_request(
            path='/v2.0/tenants/%(tenant_id)s' % {
                'tenant_id': self.tenant_bar['id'],
            },
            token=token)
        self.assertValidTenantResponse(r)

    def test_get_tenant_by_name(self):
        """A tenant can be fetched with the ``name`` query parameter."""
        token = self.get_scoped_token()
        r = self.admin_request(
            path='/v2.0/tenants?name=%(tenant_name)s' % {
                'tenant_name': self.tenant_bar['name'],
            },
            token=token)
        self.assertValidTenantResponse(r)

    def test_get_user_roles_with_tenant(self):
        """The roles of a user on a tenant can be listed."""
        token = self.get_scoped_token()
        r = self.admin_request(
            path='/v2.0/tenants/%(tenant_id)s/users/%(user_id)s/roles' % {
                'tenant_id': self.tenant_bar['id'],
                'user_id': self.user_foo['id'],
            },
            token=token)
        self.assertValidRoleListResponse(r)

    def test_get_user_roles_without_tenant(self):
        """Listing the roles of a user without a tenant returns 501."""
        token = self.get_scoped_token()
        self.admin_request(
            path='/v2.0/users/%(user_id)s/roles' % {
                'user_id': self.user_foo['id'],
            },
            token=token,
            expected_status=http_client.NOT_IMPLEMENTED)

    def test_get_user(self):
        """A user can be fetched by ID."""
        token = self.get_scoped_token()
        r = self.admin_request(
            path='/v2.0/users/%(user_id)s' % {
                'user_id': self.user_foo['id'],
            },
            token=token)
        self.assertValidUserResponse(r)

    def test_get_user_by_name(self):
        """A user can be fetched with the ``name`` query parameter."""
        token = self.get_scoped_token()
        r = self.admin_request(
            path='/v2.0/users?name=%(user_name)s' % {
                'user_name': self.user_foo['name'],
            },
            token=token)
        self.assertValidUserResponse(r)

    def test_create_update_user_invalid_enabled_type(self):
        """Non-boolean ``enabled`` values are rejected on create and update."""
        # Enforce usage of boolean for 'enabled' field
        token = self.get_scoped_token()

        # Test CREATE request
        # In JSON, 0|1 are not booleans
        for bad_enabled in ("False", 0):
            r = self.admin_request(
                method='POST',
                path='/v2.0/users',
                body={
                    'user': {
                        'name': uuid.uuid4().hex,
                        'password': uuid.uuid4().hex,
                        'enabled': bad_enabled,
                    },
                },
                token=token,
                expected_status=http_client.BAD_REQUEST)
            self.assertValidErrorResponse(r)

        # Test UPDATE request
        path = '/v2.0/users/%(user_id)s' % {
            'user_id': self.user_foo['id'],
        }

        # In JSON, 0|1 are not booleans
        for bad_enabled in ("False", 1):
            r = self.admin_request(
                method='PUT',
                path=path,
                body={
                    'user': {
                        'enabled': bad_enabled,
                    },
                },
                token=token,
                expected_status=http_client.BAD_REQUEST)
            self.assertValidErrorResponse(r)

    def test_create_update_user_valid_enabled_type(self):
        """A boolean ``enabled`` value is accepted when creating a user."""
        # Enforce usage of boolean for 'enabled' field
        token = self.get_scoped_token()

        # Test CREATE request
        self.admin_request(method='POST',
                           path='/v2.0/users',
                           body={
                               'user': {
                                   'name': uuid.uuid4().hex,
                                   'password': uuid.uuid4().hex,
                                   'enabled': False,
                               },
                           },
                           token=token,
                           expected_status=http_client.OK)

    def test_error_response(self):
        """Trigger assertValidErrorResponse by convention."""
        self.public_request(path='/v2.0/tenants',
                            expected_status=http_client.UNAUTHORIZED)

    def test_invalid_parameter_error_response(self):
        """A body with an unknown top-level key yields a valid 400 error."""
        token = self.get_scoped_token()
        bad_body = {
            'OS-KSADM:service%s' % uuid.uuid4().hex: {
                'name': uuid.uuid4().hex,
                'type': uuid.uuid4().hex,
            },
        }
        for path in ('/v2.0/OS-KSADM/services', '/v2.0/users'):
            res = self.admin_request(method='POST',
                                     path=path,
                                     body=bad_body,
                                     token=token,
                                     expected_status=http_client.BAD_REQUEST)
            self.assertValidErrorResponse(res)

    def _get_user_id(self, r):
        """Helper method to return user ID from a response.

        This needs to be overridden by child classes
        based on their content type.

        """
        raise NotImplementedError()

    def _get_role_id(self, r):
        """Helper method to return a role ID from a response.

        This needs to be overridden by child classes
        based on their content type.

        """
        raise NotImplementedError()

    def _get_role_name(self, r):
        """Helper method to return role NAME from a response.

        This needs to be overridden by child classes
        based on their content type.

        """
        raise NotImplementedError()

    def _get_project_id(self, r):
        """Helper method to return project ID from a response.

        This needs to be overridden by child classes
        based on their content type.

        """
        raise NotImplementedError()

    def assertNoRoles(self, r):
        """Helper method to assert No Roles.

        This needs to be overridden by child classes
        based on their content type.

        """
        raise NotImplementedError()

    def _list_user_roles(self, token, project_id, user_id):
        """GET the roles ``user_id`` holds on ``project_id``, expecting 200."""
        return self.admin_request(
            path='/v2.0/tenants/%(project_id)s/users/%(user_id)s/roles' % {
                'project_id': project_id,
                'user_id': user_id
            },
            token=token,
            expected_status=http_client.OK)

    def test_update_user_tenant(self):
        """Changing a user's tenant moves the member role with it."""
        token = self.get_scoped_token()

        # Create a new user
        r = self.admin_request(
            method='POST',
            path='/v2.0/users',
            body={
                'user': {
                    'name': uuid.uuid4().hex,
                    'password': uuid.uuid4().hex,
                    'tenantId': self.tenant_bar['id'],
                    'enabled': True,
                },
            },
            token=token,
            expected_status=http_client.OK)

        user_id = self._get_user_id(r.result)

        # Check if member_role is in tenant_bar
        r = self._list_user_roles(token, self.tenant_bar['id'], user_id)
        self.assertEqual(CONF.member_role_name, self._get_role_name(r.result))

        # Create a new tenant
        r = self.admin_request(
            method='POST',
            path='/v2.0/tenants',
            body={
                'tenant': {
                    'name': 'test_update_user',
                    'description': 'A description ...',
                    'enabled': True,
                },
            },
            token=token,
            expected_status=http_client.OK)

        project_id = self._get_project_id(r.result)

        # Update user's tenant
        self.admin_request(
            method='PUT',
            path='/v2.0/users/%(user_id)s' % {
                'user_id': user_id,
            },
            body={
                'user': {
                    'tenantId': project_id,
                },
            },
            token=token,
            expected_status=http_client.OK)

        # 'member_role' should be in new_tenant. Compare against the
        # configured name, like the first check, so that both checks agree
        # when the option is overridden.
        r = self._list_user_roles(token, project_id, user_id)
        self.assertEqual(CONF.member_role_name, self._get_role_name(r.result))

        # 'member_role' should not be in tenant_bar any more
        r = self._list_user_roles(token, self.tenant_bar['id'], user_id)
        self.assertNoRoles(r.result)

    def test_update_user_with_invalid_tenant(self):
        """Moving a user to an unknown tenant returns 404."""
        token = self.get_scoped_token()

        # Create a new user
        r = self.admin_request(
            method='POST',
            path='/v2.0/users',
            body={
                'user': {
                    'name': 'test_invalid_tenant',
                    'password': uuid.uuid4().hex,
                    'tenantId': self.tenant_bar['id'],
                    'enabled': True,
                },
            },
            token=token,
            expected_status=http_client.OK)
        user_id = self._get_user_id(r.result)

        # Update user with an invalid tenant
        self.admin_request(
            method='PUT',
            path='/v2.0/users/%(user_id)s' % {
                'user_id': user_id,
            },
            body={
                'user': {
                    'tenantId': 'abcde12345heha',
                },
            },
            token=token,
            expected_status=http_client.NOT_FOUND)

    def test_update_user_with_invalid_tenant_no_prev_tenant(self):
        """Same as above for a user created without any tenant."""
        token = self.get_scoped_token()

        # Create a new user
        r = self.admin_request(
            method='POST',
            path='/v2.0/users',
            body={
                'user': {
                    'name': 'test_invalid_tenant',
                    'password': uuid.uuid4().hex,
                    'enabled': True,
                },
            },
            token=token,
            expected_status=http_client.OK)
        user_id = self._get_user_id(r.result)

        # Update user with an invalid tenant
        self.admin_request(
            method='PUT',
            path='/v2.0/users/%(user_id)s' % {
                'user_id': user_id,
            },
            body={
                'user': {
                    'tenantId': 'abcde12345heha',
                },
            },
            token=token,
            expected_status=http_client.NOT_FOUND)

    def test_update_user_with_old_tenant(self):
        """Re-submitting the current tenant keeps the member role."""
        token = self.get_scoped_token()

        # Create a new user
        r = self.admin_request(
            method='POST',
            path='/v2.0/users',
            body={
                'user': {
                    'name': uuid.uuid4().hex,
                    'password': uuid.uuid4().hex,
                    'tenantId': self.tenant_bar['id'],
                    'enabled': True,
                },
            },
            token=token,
            expected_status=http_client.OK)

        user_id = self._get_user_id(r.result)

        # Check if member_role is in tenant_bar
        r = self._list_user_roles(token, self.tenant_bar['id'], user_id)
        self.assertEqual(CONF.member_role_name, self._get_role_name(r.result))

        # Update user's tenant with old tenant id
        self.admin_request(
            method='PUT',
            path='/v2.0/users/%(user_id)s' % {
                'user_id': user_id,
            },
            body={
                'user': {
                    'tenantId': self.tenant_bar['id'],
                },
            },
            token=token,
            expected_status=http_client.OK)

        # 'member_role' should still be in tenant_bar. Use the configured
        # name rather than a literal, consistently with the check above.
        r = self._list_user_roles(token, self.tenant_bar['id'], user_id)
        self.assertEqual(CONF.member_role_name, self._get_role_name(r.result))

    def test_authenticating_a_user_with_no_password(self):
        """A user created without a password cannot authenticate."""
        token = self.get_scoped_token()

        username = uuid.uuid4().hex

        # create the user
        self.admin_request(
            method='POST',
            path='/v2.0/users',
            body={
                'user': {
                    'name': username,
                    'enabled': True,
                },
            },
            token=token)

        # fail to authenticate
        r = self.public_request(
            method='POST',
            path='/v2.0/tokens',
            body={
                'auth': {
                    'passwordCredentials': {
                        'username': username,
                        'password': 'password',
                    },
                },
            },
            expected_status=http_client.UNAUTHORIZED)
        self.assertValidErrorResponse(r)

    def test_www_authenticate_header(self):
        """A 401 response carries the default ``WWW-Authenticate`` header."""
        r = self.public_request(
            path='/v2.0/tenants',
            expected_status=http_client.UNAUTHORIZED)
        self.assertEqual('Keystone uri="http://localhost"',
                         r.headers.get('WWW-Authenticate'))

    def test_www_authenticate_header_host(self):
        """``WWW-Authenticate`` reflects the configured public endpoint."""
        test_url = 'http://%s:4187' % uuid.uuid4().hex
        self.config_fixture.config(public_endpoint=test_url)
        r = self.public_request(
            path='/v2.0/tenants',
            expected_status=http_client.UNAUTHORIZED)
        self.assertEqual('Keystone uri="%s"' % test_url,
                         r.headers.get('WWW-Authenticate'))
class LegacyV2UsernameTests(object):
    """Test to show the broken username behavior in V2.

    The V2 API is documented to use `username` instead of `name`. The
    API used to force the use of `name` and left `username` to fall into the
    `extra` field.

    These tests ensure this behavior works so fixes to `username`/`name`
    will be backward compatible.

    This is a mixin: ``get_scoped_token``, ``admin_request``,
    ``assertValidUserResponse``, ``get_user_from_response`` and
    ``get_user_attribute_from_response`` come from the concrete test case.
    """

    def create_user(self, **user_attrs):
        """Create a user and return the response object.

        :param user_attrs: attributes added to the request body (optional)
        """
        token = self.get_scoped_token()
        body = {
            'user': {
                'name': uuid.uuid4().hex,
                'enabled': True,
            },
        }
        body['user'].update(user_attrs)

        return self.admin_request(
            method='POST',
            path='/v2.0/users',
            token=token,
            body=body,
            expected_status=http_client.OK)

    def _put_user(self, token, user_id, **user_attrs):
        """PUT ``user_attrs`` as the new user body and expect a 200.

        :param token: token used to authenticate the request
        :param user_id: ID of the user to update
        :param user_attrs: attributes sent as the ``user`` object
        :returns: the response object of the update request
        """
        return self.admin_request(
            method='PUT',
            path='/v2.0/users/%s' % user_id,
            token=token,
            body={'user': user_attrs},
            expected_status=http_client.OK)

    def test_create_with_extra_username(self):
        """The response for creating a user will contain the extra fields."""
        fake_username = uuid.uuid4().hex
        r = self.create_user(username=fake_username)

        self.assertValidUserResponse(r)

        user = self.get_user_from_response(r)
        self.assertEqual(fake_username, user.get('username'))

    def test_get_returns_username_from_extra(self):
        """The response for getting a user will contain the extra fields."""
        token = self.get_scoped_token()

        fake_username = uuid.uuid4().hex
        r = self.create_user(username=fake_username)

        id_ = self.get_user_attribute_from_response(r, 'id')
        r = self.admin_request(path='/v2.0/users/%s' % id_, token=token)

        self.assertValidUserResponse(r)

        user = self.get_user_from_response(r)
        self.assertEqual(fake_username, user.get('username'))

    def test_update_returns_new_username_when_adding_username(self):
        """The response for updating a user will contain the extra fields.

        This is specifically testing for updating a username when a value
        was not previously set.
        """
        token = self.get_scoped_token()

        r = self.create_user()

        id_ = self.get_user_attribute_from_response(r, 'id')
        name = self.get_user_attribute_from_response(r, 'name')
        enabled = self.get_user_attribute_from_response(r, 'enabled')
        r = self._put_user(token, id_, name=name, username='new_username',
                           enabled=enabled)

        self.assertValidUserResponse(r)

        user = self.get_user_from_response(r)
        self.assertEqual('new_username', user.get('username'))

    def test_update_returns_new_username_when_updating_username(self):
        """The response for updating a user will contain the extra fields.

        This tests updating a username that was previously set.
        """
        token = self.get_scoped_token()

        r = self.create_user(username='original_username')

        id_ = self.get_user_attribute_from_response(r, 'id')
        name = self.get_user_attribute_from_response(r, 'name')
        enabled = self.get_user_attribute_from_response(r, 'enabled')
        r = self._put_user(token, id_, name=name, username='new_username',
                           enabled=enabled)

        self.assertValidUserResponse(r)

        user = self.get_user_from_response(r)
        self.assertEqual('new_username', user.get('username'))

    def test_username_is_always_returned_create(self):
        """Username is set as the value of name if no username is provided.

        This matches the v2.0 spec where we really should be using username
        and not name.
        """
        r = self.create_user()

        self.assertValidUserResponse(r)

        user = self.get_user_from_response(r)
        self.assertEqual(user.get('name'), user.get('username'))

    def test_username_is_always_returned_get(self):
        """Username is set as the value of name if no username is provided.

        This matches the v2.0 spec where we really should be using username
        and not name.
        """
        token = self.get_scoped_token()

        r = self.create_user()

        id_ = self.get_user_attribute_from_response(r, 'id')
        r = self.admin_request(path='/v2.0/users/%s' % id_, token=token)

        self.assertValidUserResponse(r)

        user = self.get_user_from_response(r)
        self.assertEqual(user.get('name'), user.get('username'))

    def test_username_is_always_returned_get_by_name(self):
        """Username is set as the value of name if no username is provided.

        This matches the v2.0 spec where we really should be using username
        and not name.
        """
        token = self.get_scoped_token()

        r = self.create_user()

        name = self.get_user_attribute_from_response(r, 'name')
        r = self.admin_request(path='/v2.0/users?name=%s' % name, token=token)

        self.assertValidUserResponse(r)

        user = self.get_user_from_response(r)
        self.assertEqual(user.get('name'), user.get('username'))

    def test_username_is_always_returned_update_no_username_provided(self):
        """Username is set as the value of name if no username is provided.

        This matches the v2.0 spec where we really should be using username
        and not name.
        """
        token = self.get_scoped_token()

        r = self.create_user()

        id_ = self.get_user_attribute_from_response(r, 'id')
        name = self.get_user_attribute_from_response(r, 'name')
        enabled = self.get_user_attribute_from_response(r, 'enabled')
        r = self._put_user(token, id_, name=name, enabled=enabled)

        self.assertValidUserResponse(r)

        user = self.get_user_from_response(r)
        self.assertEqual(user.get('name'), user.get('username'))

    def test_updated_username_is_returned(self):
        """Username is set as the value of name if no username is provided.

        This matches the v2.0 spec where we really should be using username
        and not name.
        """
        # NOTE(review): this body sends exactly the same requests and makes
        # the same assertion as
        # test_username_is_always_returned_update_no_username_provided; it
        # never changes the name or username. Confirm what it was meant to
        # cover before changing it.
        token = self.get_scoped_token()

        r = self.create_user()

        id_ = self.get_user_attribute_from_response(r, 'id')
        name = self.get_user_attribute_from_response(r, 'name')
        enabled = self.get_user_attribute_from_response(r, 'enabled')
        r = self._put_user(token, id_, name=name, enabled=enabled)

        self.assertValidUserResponse(r)

        user = self.get_user_from_response(r)
        self.assertEqual(user.get('name'), user.get('username'))

    def test_username_can_be_used_instead_of_name_create(self):
        """A user can be created with only ``username`` in the body."""
        token = self.get_scoped_token()

        r = self.admin_request(
            method='POST',
            path='/v2.0/users',
            token=token,
            body={
                'user': {
                    'username': uuid.uuid4().hex,
                    'enabled': True,
                },
            },
            expected_status=http_client.OK)

        self.assertValidUserResponse(r)

        user = self.get_user_from_response(r)
        self.assertEqual(user.get('name'), user.get('username'))

    def test_username_can_be_used_instead_of_name_update(self):
        """Updating only ``username`` changes both ``name`` and ``username``."""
        token = self.get_scoped_token()

        r = self.create_user()

        id_ = self.get_user_attribute_from_response(r, 'id')
        new_username = uuid.uuid4().hex
        enabled = self.get_user_attribute_from_response(r, 'enabled')
        r = self._put_user(token, id_, username=new_username, enabled=enabled)

        self.assertValidUserResponse(r)

        user = self.get_user_from_response(r)
        self.assertEqual(new_username, user.get('name'))
        self.assertEqual(user.get('name'), user.get('username'))
class RestfulTestCase(rest.RestfulTestCase):
    """REST test case in which ``user_foo`` is an admin of ``tenant_bar``."""

    def setUp(self):
        """Run the parent set-up, then grant ``role_admin`` to ``user_foo``."""
        super(RestfulTestCase, self).setUp()

        # TODO(termie): add an admin user to the fixtures and use that user
        # override the fixtures, for now
        user_id = self.user_foo['id']
        project_id = self.tenant_bar['id']
        admin_role_id = self.role_admin['id']
        self.assignment_api.add_role_to_user_and_project(
            user_id, project_id, admin_role_id)
[docs]class V2TestCase(object):
def config_overrides(self):
    """Select the templated catalog driver with the default template file."""
    super(V2TestCase, self).config_overrides()
    template_path = unit.dirs.tests('default_catalog.templates')
    catalog_options = {
        'group': 'catalog',
        'driver': 'templated',
        'template_file': template_path,
    }
    self.config_fixture.config(**catalog_options)
def _get_user_id(self, r):
    """Return the ID of the user found in the parsed body ``r``."""
    user = r['user']
    return user['id']

def _get_role_name(self, r):
    """Return the name of the first role listed in the parsed body ``r``."""
    first_role = r['roles'][0]
    return first_role['name']

def _get_role_id(self, r):
    """Return the ID of the first role listed in the parsed body ``r``."""
    first_role = r['roles'][0]
    return first_role['id']

def _get_project_id(self, r):
    """Return the ID of the tenant found in the parsed body ``r``."""
    tenant = r['tenant']
    return tenant['id']

def _get_token_id(self, r):
    """Return the token ID from the response object ``r``.

    Unlike the other helpers, this one takes the response itself and reads
    its ``result`` attribute.
    """
    token = r.result['access']['token']
    return token['id']
def assertNoRoles(self, r):
    """Assert that the parsed body ``r`` lists no roles at all."""
    listed_roles = r['roles']
    self.assertEqual([], listed_roles)
def assertValidErrorResponse(self, r):
    """Assert ``r`` holds a valid error whose code equals the HTTP status."""
    body = r.result
    self.assertIsNotNone(body.get('error'))
    error = body['error']
    self.assertValidError(error)
    self.assertEqual(error['code'], r.status_code)
[docs] def assertValidExtension(self, extension, expected): super(V2TestCase, self).assertValidExtension(extension) descriptions = [ext['description'] for ext in expected.values()] description = extension.get('description') self.assertIsNotNone(description) self.assertIn(description, descriptions) self.assertIsNotNone(extension.get('links')) self.assertNotEmpty(extension.get('links')) for link in extension.get('links'): self.assertValidExtensionLink(link)
def assertValidExtensionListResponse(self, r, expected):
    """Assert ``r`` lists at least one extension and each one is valid."""
    body = r.result
    self.assertIsNotNone(body.get('extensions'))

    container = body['extensions']
    self.assertIsNotNone(container.get('values'))
    self.assertNotEmpty(container.get('values'))

    for entry in container['values']:
        self.assertValidExtension(entry, expected)
def assertValidExtensionResponse(self, r, expected):
    """Assert the single extension carried by ``r`` is valid."""
    extension = r.result.get('extension')
    self.assertValidExtension(extension, expected)
[docs] def assertValidUser(self, user): super(V2TestCase, self).assertValidUser(user) self.assertNotIn('default_project_id', user) if 'tenantId' in user: # NOTE(morganfainberg): tenantId should never be "None", it gets # filtered out of the object if it is there. This is suspenders # and a belt check to avoid unintended regressions. self.assertIsNotNone(user.get('tenantId'))
[docs] def assertValidAuthenticationResponse(self, r, require_service_catalog=False): self.assertIsNotNone(r.result.get('access')) self.assertIsNotNone(r.result['access'].get('token')) self.assertIsNotNone(r.result['access'].get('user')) # validate token self.assertIsNotNone(r.result['access']['token'].get('id')) self.assertIsNotNone(r.result['access']['token'].get('expires')) tenant = r.result['access']['token'].get('tenant') if tenant is not None: # validate tenant self.assertIsNotNone(tenant.get('id')) self.assertIsNotNone(tenant.get('name')) # validate user self.assertIsNotNone(r.result['access']['user'].get('id')) self.assertIsNotNone(r.result['access']['user'].get('name')) if require_service_catalog: # roles are only provided with a service catalog roles = r.result['access']['user'].get('roles') self.assertNotEmpty(roles) for role in roles: self.assertIsNotNone(role.get('name')) serviceCatalog = r.result['access'].get('serviceCatalog') # validate service catalog if require_service_catalog: self.assertIsNotNone(serviceCatalog) if serviceCatalog is not None: self.assertIsInstance(serviceCatalog, list) if require_service_catalog: self.assertNotEmpty(serviceCatalog) for service in r.result['access']['serviceCatalog']: # validate service self.assertIsNotNone(service.get('name')) self.assertIsNotNone(service.get('type')) # services contain at least one endpoint self.assertIsNotNone(service.get('endpoints')) self.assertNotEmpty(service['endpoints']) for endpoint in service['endpoints']: # validate service endpoint self.assertIsNotNone(endpoint.get('publicURL'))
def assertValidTenantListResponse(self, r):
    """Assert ``r`` lists at least one tenant and every tenant is valid.

    Each tenant must pass ``assertValidTenant`` and expose a real boolean
    ``enabled`` attribute.

    :param r: response whose ``result`` holds the parsed ``tenants`` list
    """
    self.assertIsNotNone(r.result.get('tenants'))
    self.assertNotEmpty(r.result['tenants'])
    for tenant in r.result['tenants']:
        self.assertValidTenant(tenant)
        enabled = tenant.get('enabled')
        self.assertIsNotNone(enabled)
        # A membership test against [True, False] compares by equality, so
        # it would also accept 0, 1 or 1.0. Check the type instead so that
        # only genuine booleans pass.
        self.assertIsInstance(enabled, bool)
def assertValidUserResponse(self, r):
    """Assert ``r`` carries a ``user`` entity and that the entity is valid."""
    body = r.result
    self.assertIsNotNone(body.get('user'))
    self.assertValidUser(body['user'])
def assertValidTenantResponse(self, r):
    """Assert ``r`` carries a ``tenant`` entity and that the entity is valid."""
    body = r.result
    self.assertIsNotNone(body.get('tenant'))
    self.assertValidTenant(body['tenant'])
def assertValidRoleListResponse(self, r):
    """Assert ``r`` lists at least one role and that every role is valid."""
    body = r.result
    self.assertIsNotNone(body.get('roles'))
    listed_roles = body['roles']
    self.assertNotEmpty(listed_roles)
    for entry in listed_roles:
        self.assertValidRole(entry)
[docs] def assertValidVersion(self, version): super(V2TestCase, self).assertValidVersion(version) self.assertIsNotNone(version.get('links')) self.assertNotEmpty(version.get('links')) for link in version.get('links'): self.assertIsNotNone(link.get('rel')) self.assertIsNotNone(link.get('href')) self.assertIsNotNone(version.get('media-types')) self.assertNotEmpty(version.get('media-types')) for media in version.get('media-types'): self.assertIsNotNone(media.get('base')) self.assertIsNotNone(media.get('type'))
def assertValidMultipleChoiceResponse(self, r):
    """Assert ``r`` lists at least one version and each one is valid."""
    body = r.result
    self.assertIsNotNone(body.get('versions'))

    container = body['versions']
    self.assertIsNotNone(container.get('values'))
    self.assertNotEmpty(container['values'])

    for entry in container['values']:
        self.assertValidVersion(entry)
def assertValidVersionResponse(self, r):
    """Assert the single version carried by ``r`` is valid."""
    version = r.result.get('version')
    self.assertValidVersion(version)
def assertValidEndpointListResponse(self, r):
    """Assert ``r`` lists at least one endpoint with all required fields."""
    body = r.result
    self.assertIsNotNone(body.get('endpoints'))
    self.assertNotEmpty(body['endpoints'])

    required_fields = (
        'id', 'name', 'type', 'publicURL', 'internalURL', 'adminURL')
    for endpoint in body['endpoints']:
        for field in required_fields:
            self.assertIsNotNone(endpoint.get(field))
def get_user_from_response(self, r):
    """Return the ``user`` entity of ``r``, or ``None`` when it has none."""
    body = r.result
    return body.get('user')
def get_user_attribute_from_response(self, r, attribute_name):
    """Return attribute ``attribute_name`` of the ``user`` entity of ``r``.

    A missing ``user`` entity or attribute raises ``KeyError``.
    """
    user = r.result['user']
    return user[attribute_name]
[docs] def test_service_crud_requires_auth(self): """Service CRUD should return unauthorized without an X-Auth-Token.""" # values here don't matter because it will be unauthorized before # they're checked (bug 1006822). service_path = '/v2.0/OS-KSADM/services/%s' % uuid.uuid4().hex service_body = { 'OS-KSADM:service': { 'name': uuid.uuid4().hex, 'type': uuid.uuid4().hex, }, } r = self.admin_request(method='GET', path='/v2.0/OS-KSADM/services', expected_status=http_client.UNAUTHORIZED) self.assertValidErrorResponse(r) r = self.admin_request(method='POST', path='/v2.0/OS-KSADM/services', body=service_body, expected_status=http_client.UNAUTHORIZED) self.assertValidErrorResponse(r) r = self.admin_request(method='GET', path=service_path, expected_status=http_client.UNAUTHORIZED) self.assertValidErrorResponse(r) r = self.admin_request(method='DELETE', path=service_path, expected_status=http_client.UNAUTHORIZED) self.assertValidErrorResponse(r)
[docs] def test_user_role_list_requires_auth(self): """User role list return unauthorized without an X-Auth-Token.""" # values here don't matter because it will be unauthorized before # they're checked (bug 1006815). path = '/v2.0/tenants/%(tenant_id)s/users/%(user_id)s/roles' % { 'tenant_id': uuid.uuid4().hex, 'user_id': uuid.uuid4().hex, } r = self.admin_request(path=path, expected_status=http_client.UNAUTHORIZED) self.assertValidErrorResponse(r)
[docs] def test_fetch_revocation_list_nonadmin_fails(self): self.admin_request( method='GET', path='/v2.0/tokens/revoked', expected_status=http_client.UNAUTHORIZED)
[docs] def test_fetch_revocation_list_admin_200(self): token = self.get_scoped_token() r = self.admin_request( method='GET', path='/v2.0/tokens/revoked', token=token, expected_status=http_client.OK) self.assertValidRevocationListResponse(r)
def assertValidRevocationListResponse(self, response):
    """Assert the revocation list response has a ``signed`` payload."""
    signed_payload = response.result['signed']
    self.assertIsNotNone(signed_payload)
def _fetch_parse_revocation_list(self):
    """Revoke a freshly issued token and return the parsed revocation list.

    :returns: a ``(data, token)`` tuple, where ``data`` is the JSON-decoded
        output of ``cms.cms_verify`` on the ``signed`` revocation list and
        ``token`` is the token that was deleted.
    """
    requester_token = self.get_scoped_token()

    # TODO(morganfainberg): Because this is making a restful call to the
    # app a change to UTCNOW via mock.patch will not affect the returned
    # token. The only surefire way to ensure there is not a transient bug
    # based upon when the second token is issued is with a sleep. This
    # issue all stems from the limited resolution (no microseconds) on the
    # expiry time of tokens and the way revocation events utilizes token
    # expiry to revoke individual tokens. This is a stop-gap until all
    # associated issues with resolution on expiration and revocation events
    # are resolved.
    time.sleep(1)

    revoked_token = self.get_scoped_token()
    self.admin_request(
        method='DELETE',
        path='/v2.0/tokens/%s' % revoked_token,
        token=requester_token)

    response = self.admin_request(
        method='GET',
        path='/v2.0/tokens/revoked',
        token=requester_token,
        expected_status=http_client.OK)

    # Verify the signed payload before decoding it.
    verified_json = cms.cms_verify(response.result['signed'],
                                   CONF.signing.certfile,
                                   CONF.signing.ca_certs)
    return (jsonutils.loads(verified_json), revoked_token)
def test_fetch_revocation_list_md5(self):
    """Hash for tokens in revocation list and server config should match.

    If the server is configured for md5, then the revocation list has
    tokens hashed with MD5.
    """
    # No configuration override here: the default hash algorithm is md5.
    data, token = self._fetch_parse_revocation_list()
    expected_hash = cms.cms_hash_token(token, mode='md5')
    first_revoked_id = data['revoked'][0]['id']
    self.assertThat(expected_hash, matchers.Equals(first_revoked_id))
def test_fetch_revocation_list_sha256(self):
    """Hash for tokens in revocation list and server config should match.

    If the server is configured for sha256, then the revocation list has
    tokens hashed with SHA256.
    """
    algorithm = 'sha256'
    # Switch the server configuration before any token is issued.
    self.config_fixture.config(group='token', hash_algorithm=algorithm)

    data, token = self._fetch_parse_revocation_list()
    expected_hash = cms.cms_hash_token(token, mode=algorithm)
    first_revoked_id = data['revoked'][0]['id']
    self.assertThat(expected_hash, matchers.Equals(first_revoked_id))
def test_create_update_user_invalid_enabled_type(self):
    """A non-boolean ``enabled`` is a bad request on create and update."""
    # Enforce usage of boolean for 'enabled' field
    token = self.get_scoped_token()

    # Test CREATE request
    create_body = {
        'user': {
            'name': uuid.uuid4().hex,
            'password': uuid.uuid4().hex,
            # In JSON, "true|false" are not boolean
            'enabled': "true",
        },
    }
    r = self.admin_request(
        method='POST',
        path='/v2.0/users',
        body=create_body,
        token=token,
        expected_status=http_client.BAD_REQUEST)
    self.assertValidErrorResponse(r)

    # Test UPDATE request
    update_path = '/v2.0/users/%(user_id)s' % {
        'user_id': self.user_foo['id'],
    }
    update_body = {
        'user': {
            # In JSON, "true|false" are not boolean
            'enabled': "true",
        },
    }
    r = self.admin_request(
        method='PUT',
        path=update_path,
        body=update_body,
        token=token,
        expected_status=http_client.BAD_REQUEST)
    self.assertValidErrorResponse(r)
def test_authenticating_a_user_with_an_OSKSADM_password(self):
    """A user created with ``OS-KSADM:password`` can authenticate.

    Also checks that the ``OS-KSADM:password`` key is absent when the user
    is fetched afterwards.
    """
    token = self.get_scoped_token()
    username = uuid.uuid4().hex
    password = uuid.uuid4().hex

    # create the user
    new_user = {
        'user': {
            'name': username,
            'OS-KSADM:password': password,
            'enabled': True,
        },
    }
    r = self.admin_request(
        method='POST',
        path='/v2.0/users',
        body=new_user,
        token=token)

    # successfully authenticate
    credentials = {
        'auth': {
            'passwordCredentials': {
                'username': username,
                'password': password,
            },
        },
    }
    self.public_request(
        method='POST',
        path='/v2.0/tokens',
        body=credentials,
        expected_status=http_client.OK)

    # ensure password doesn't leak
    user_id = r.result['user']['id']
    r = self.admin_request(
        method='GET',
        path='/v2.0/users/%s' % user_id,
        token=token,
        expected_status=http_client.OK)
    self.assertNotIn('OS-KSADM:password', r.result['user'])
def test_updating_a_user_with_an_OSKSADM_password(self):
    """A password set through the OS-KSADM password path can be used."""
    token = self.get_scoped_token()
    user_id = self.user_foo['id']
    password = uuid.uuid4().hex

    # update the user
    new_password = {
        'user': {
            'password': password,
        },
    }
    self.admin_request(
        method='PUT',
        path='/v2.0/users/%s/OS-KSADM/password' % user_id,
        body=new_password,
        token=token,
        expected_status=http_client.OK)

    # successfully authenticate
    credentials = {
        'auth': {
            'passwordCredentials': {
                'username': self.user_foo['name'],
                'password': password,
            },
        },
    }
    self.public_request(
        method='POST',
        path='/v2.0/tokens',
        body=credentials,
        expected_status=http_client.OK)
def test_enable_or_disable_user(self):
    """Toggle a user through the OS-KSADM ``enabled`` path."""
    token = self.get_scoped_token()
    user_id = self.user_badguy['id']
    # Precondition: the user_badguy reference is expected to be disabled.
    self.assertFalse(self.user_badguy['enabled'])

    enabled_path = '/v2.0/users/%s/OS-KSADM/enabled' % user_id

    def put_user(user_fields, status):
        # PUT the given fields, wrapped in a 'user' object, to the path.
        return self.admin_request(
            method='PUT',
            path=enabled_path,
            token=token,
            body={'user': user_fields},
            expected_status=status)

    # Enable the user.
    resp = put_user({'enabled': True}, http_client.OK)
    self.assertTrue(resp.json['user']['enabled'])

    # Disable the user.
    resp = put_user({'enabled': False}, http_client.OK)
    self.assertFalse(resp.json['user']['enabled'])

    # Attributes other than `enabled` should still work due to bug 1607751
    extra_fields = {
        'description': uuid.uuid4().hex,
        'name': uuid.uuid4().hex,
        'enabled': True
    }
    put_user(extra_fields, http_client.OK)

    # `enabled` is boolean, type other than boolean is not allowed.
    put_user({'enabled': uuid.uuid4().hex}, http_client.BAD_REQUEST)
class V2TestCaseUUID(V2TestCase, RestfulTestCase, CoreApiTests,
                     LegacyV2UsernameTests):
    """Inherited v2 tests with the token provider configured as ``uuid``."""

    def config_overrides(self):
        """Apply the parent overrides, then select the ``uuid`` provider."""
        super(V2TestCaseUUID, self).config_overrides()
        token_options = {'group': 'token', 'provider': 'uuid'}
        self.config_fixture.config(**token_options)
class V2TestCaseFernet(V2TestCase, RestfulTestCase, CoreApiTests,
                       LegacyV2UsernameTests):
    """Inherited v2 tests with the token provider configured as ``fernet``.

    The two revocation-list hash tests are overridden to skip.
    """

    def config_overrides(self):
        """Select the ``fernet`` provider and set up its key repository."""
        super(V2TestCaseFernet, self).config_overrides()
        self.config_fixture.config(group='token', provider='fernet')
        key_repository = ksfixtures.KeyRepository(
            self.config_fixture,
            'fernet_tokens',
            CONF.fernet_tokens.max_active_keys)
        self.useFixture(key_repository)

    def test_fetch_revocation_list_md5(self):
        """Skipped for this provider."""
        reason = 'Revocation lists do not support Fernet'
        self.skipTest(reason)

    def test_fetch_revocation_list_sha256(self):
        """Skipped for this provider."""
        reason = 'Revocation lists do not support Fernet'
        self.skipTest(reason)
class RevokeApiTestCase(V2TestCase, RestfulTestCase, CoreApiTests,
                        LegacyV2UsernameTests):
    """Inherited v2 tests with ``pki`` tokens and ``revoke_by_id`` off.

    The three revocation-list tests are overridden to skip.
    """

    def config_overrides(self):
        """Select the ``pki`` provider and disable ``revoke_by_id``."""
        super(RevokeApiTestCase, self).config_overrides()
        token_options = {
            'group': 'token',
            'provider': 'pki',
            'revoke_by_id': False,
        }
        self.config_fixture.config(**token_options)

    def test_fetch_revocation_list_admin_200(self):
        """Skipped in this configuration."""
        reason = 'Revoke API disables revocation_list.'
        self.skip_test_overrides(reason)

    def test_fetch_revocation_list_md5(self):
        """Skipped in this configuration."""
        reason = 'Revoke API disables revocation_list.'
        self.skip_test_overrides(reason)

    def test_fetch_revocation_list_sha256(self):
        """Skipped in this configuration."""
        reason = 'Revoke API disables revocation_list.'
        self.skip_test_overrides(reason)
class TestFernetTokenProviderV2(RestfulTestCase):
    """v2.0 token tests run with the ``fernet`` token provider."""

    def setUp(self):
        """Create a region, a service and a public endpoint."""
        super(TestFernetTokenProviderV2, self).setUp()

        # Add catalog data
        region = unit.new_region_ref()
        self.region = region
        self.region_id = region['id']
        self.catalog_api.create_region(region)

        service = unit.new_service_ref()
        self.service = service
        self.service_id = service['id']
        self.catalog_api.create_service(self.service_id, service)

        endpoint = unit.new_endpoint_ref(service_id=self.service_id,
                                         interface='public',
                                         region_id=self.region_id)
        self.endpoint = endpoint
        self.endpoint_id = endpoint['id']
        self.catalog_api.create_endpoint(self.endpoint_id, endpoint)

    def assertValidUnscopedTokenResponse(self, r):
        """Validate ``r.json['access']`` with the v2 unscoped validator."""
        access = r.json['access']
        v2.unscoped_validator.validate(access)

    def assertValidScopedTokenResponse(self, r):
        """Validate ``r.json['access']`` with the v2 scoped validator."""
        access = r.json['access']
        v2.scoped_validator.validate(access)

    # Used by RestfulTestCase
    def _get_token_id(self, r):
        """Return the token id found in the ``access`` part of a response."""
        token = r.result['access']['token']
        return token['id']

    def new_project_ref(self):
        """Build an enabled project reference in the ``default`` domain.

        The id, name and description are random hex strings.
        """
        return dict(
            id=uuid.uuid4().hex,
            name=uuid.uuid4().hex,
            description=uuid.uuid4().hex,
            domain_id='default',
            enabled=True)

    def config_overrides(self):
        """Select the ``fernet`` provider and set up its key repository."""
        super(TestFernetTokenProviderV2, self).config_overrides()
        self.config_fixture.config(group='token', provider='fernet')
        key_repository = ksfixtures.KeyRepository(
            self.config_fixture,
            'fernet_tokens',
            CONF.fernet_tokens.max_active_keys)
        self.useFixture(key_repository)

    def _create_project_with_role(self, role_id):
        """Create a new project and grant ``user_foo`` a role on it.

        :param role_id: id of the role to assign on the new project.
        :returns: the reference dict of the created project.
        """
        project_ref = self.new_project_ref()
        self.resource_api.create_project(project_ref['id'], project_ref)
        self.assignment_api.add_role_to_user_and_project(
            self.user_foo['id'], project_ref['id'], role_id)
        return project_ref

    def test_authenticate_unscoped_token(self):
        """An unscoped token is shorter than 255 characters."""
        # Fernet tokens must stay shorter than 255 characters per
        # usability requirements
        token_length = len(self.get_unscoped_token())
        self.assertLess(token_length, 255)

    def test_validate_unscoped_token(self):
        """An unscoped token validates through the admin API."""
        # Grab an admin token to validate with
        project_ref = self._create_project_with_role(self.role_admin['id'])
        admin_token = self.get_scoped_token(tenant_id=project_ref['id'])

        unscoped_token = self.get_unscoped_token()
        resp = self.admin_request(
            method='GET',
            path='/v2.0/tokens/%s' % unscoped_token,
            token=admin_token,
            expected_status=http_client.OK)
        self.assertValidUnscopedTokenResponse(resp)

    def test_authenticate_scoped_token(self):
        """A project-scoped token is shorter than 255 characters."""
        project_ref = self._create_project_with_role(self.role_service['id'])
        # Fernet tokens must stay shorter than 255 characters per
        # usability requirements
        token_length = len(self.get_scoped_token(tenant_id=project_ref['id']))
        self.assertLess(token_length, 255)

    def test_validate_scoped_token(self):
        """A scoped token validates against the project it belongs to."""
        admin_project = self._create_project_with_role(self.role_admin['id'])
        member_project = self._create_project_with_role(
            self.role_member['id'])

        admin_token = self.get_scoped_token(tenant_id=admin_project['id'])
        member_token = self.get_scoped_token(tenant_id=member_project['id'])

        # Validate token belongs to project
        path = ('/v2.0/tokens/%s?belongsTo=%s' % (member_token,
                                                  member_project['id']))
        resp = self.admin_request(
            method='GET',
            path=path,
            token=admin_token,
            expected_status=http_client.OK)
        self.assertValidScopedTokenResponse(resp)

    def test_token_authentication_and_validation(self):
        """Test token authentication for Fernet token provider.

        Verify that token authentication returns a valid response code and
        that the resulting token validates as belonging to the project.
        """
        project_ref = self.new_project_ref()
        self.resource_api.create_project(project_ref['id'], project_ref)
        # The unscoped token is issued before the role is granted.
        unscoped_token = self.get_unscoped_token()
        self.assignment_api.add_role_to_user_and_project(
            self.user_foo['id'], project_ref['id'], self.role_admin['id'])

        token_id = unscoped_token
        if six.PY2:
            token_id = token_id.encode('ascii')

        auth_body = {
            'auth': {
                'tenantName': project_ref['name'],
                'token': {
                    'id': token_id,
                }
            }
        }
        r = self.public_request(
            method='POST',
            path='/v2.0/tokens',
            body=auth_body,
            expected_status=http_client.OK)

        # Validate token belongs to project
        scoped_token_id = self._get_token_id(r)
        path = ('/v2.0/tokens/%s?belongsTo=%s' % (scoped_token_id,
                                                  project_ref['id']))
        resp = self.admin_request(
            method='GET',
            path=path,
            token=self.get_admin_token(),
            expected_status=http_client.OK)
        self.assertValidScopedTokenResponse(resp)

    def test_rescoped_tokens_maintain_original_expiration(self):
        """A rescoped token has a new id but the original ``expires``."""
        project_ref = self._create_project_with_role(self.role_admin['id'])

        password_auth = {
            'auth': {
                'tenantName': project_ref['name'],
                'passwordCredentials': {
                    'username': self.user_foo['name'],
                    'password': self.user_foo['password']
                }
            }
        }
        resp = self.public_request(
            method='POST',
            path='/v2.0/tokens',
            body=password_auth,
            # NOTE(lbragstad): This test may need to be refactored if Keystone
            # decides to disallow rescoping using a scoped token.
            expected_status=http_client.OK)
        original = resp.result['access']['token']
        original_token = original['id']
        original_expiration = original['expires']

        token_auth = {
            'auth': {
                'tenantName': project_ref['name'],
                'token': {
                    'id': original_token,
                }
            }
        }
        resp = self.public_request(
            method='POST',
            path='/v2.0/tokens',
            body=token_auth,
            expected_status=http_client.OK)
        rescoped = resp.result['access']['token']
        rescoped_token = rescoped['id']
        rescoped_expiration = rescoped['expires']

        self.assertNotEqual(original_token, rescoped_token)
        self.assertEqual(original_expiration, rescoped_expiration)
        self.assertValidScopedTokenResponse(resp)

Project Source