Source code for keystone.tests.unit.test_v3_trust

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import datetime
import uuid

from six.moves import http_client

import keystone.conf
from keystone import exception
from keystone.tests import unit
from keystone.tests.unit import test_v3

CONF = keystone.conf.CONF


[docs]class TestTrustOperations(test_v3.RestfulTestCase): """Test module for create, read, update and delete operations on trusts. This module is specific to tests for trust CRUD operations. All other tests related to trusts that are authentication or authorization specific should live in the keystone/tests/unit/test_v3_auth.py module. """
[docs] def setUp(self): super(TestTrustOperations, self).setUp() # create a trustee to delegate stuff to self.trustee_user = unit.create_user(self.identity_api, domain_id=self.domain_id) self.trustee_user_id = self.trustee_user['id']
[docs] def test_create_trust_bad_request(self): # The server returns a 403 Forbidden rather than a 400 Bad Request, see # bug 1133435 self.post('/OS-TRUST/trusts', body={'trust': {}}, expected_status=http_client.FORBIDDEN)
[docs] def test_trust_crud(self): # create a new trust ref = unit.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=self.trustee_user_id, project_id=self.project_id, role_ids=[self.role_id]) r = self.post('/OS-TRUST/trusts', body={'trust': ref}) trust = self.assertValidTrustResponse(r, ref) # get the trust r = self.get( '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}) self.assertValidTrustResponse(r, ref) # validate roles on the trust r = self.get( '/OS-TRUST/trusts/%(trust_id)s/roles' % { 'trust_id': trust['id']}) roles = self.assertValidRoleListResponse(r, self.role) self.assertIn(self.role['id'], [x['id'] for x in roles]) self.head( '/OS-TRUST/trusts/%(trust_id)s/roles/%(role_id)s' % { 'trust_id': trust['id'], 'role_id': self.role['id']}, expected_status=http_client.OK) r = self.get( '/OS-TRUST/trusts/%(trust_id)s/roles/%(role_id)s' % { 'trust_id': trust['id'], 'role_id': self.role['id']}) self.assertValidRoleResponse(r, self.role) # list all trusts r = self.get('/OS-TRUST/trusts') self.assertValidTrustListResponse(r, trust) # trusts are immutable self.patch( '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}, body={'trust': ref}, expected_status=http_client.NOT_FOUND) # delete the trust self.delete( '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}) # ensure the trust is not found self.get( '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}, expected_status=http_client.NOT_FOUND)
[docs] def test_list_trusts(self): # create three trusts with the same trustor and trustee ref = unit.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=self.trustee_user_id, project_id=self.project_id, impersonation=False, expires=dict(minutes=1), role_ids=[self.role_id]) for i in range(3): ref['expires_at'] = datetime.datetime.utcnow().replace( year=2032).strftime(unit.TIME_FORMAT) r = self.post('/OS-TRUST/trusts', body={'trust': ref}) self.assertValidTrustResponse(r, ref) # list all trusts r = self.get('/OS-TRUST/trusts') trusts = r.result['trusts'] self.assertEqual(3, len(trusts)) self.assertValidTrustListResponse(r) # list all trusts for the trustor r = self.get('/OS-TRUST/trusts?trustor_user_id=%s' % self.user_id) trusts = r.result['trusts'] self.assertEqual(3, len(trusts)) self.assertValidTrustListResponse(r) # list all trusts as the trustor as the trustee. r = self.get('/OS-TRUST/trusts?trustee_user_id=%s' % self.user_id) trusts = r.result['trusts'] self.assertEqual(0, len(trusts)) # list all trusts as the trustee is forbidden r = self.get('/OS-TRUST/trusts?trustee_user_id=%s' % self.trustee_user_id, expected_status=http_client.FORBIDDEN)
[docs] def test_delete_trust(self): # create a trust ref = unit.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=self.trustee_user_id, project_id=self.project_id, impersonation=False, expires=dict(minutes=1), role_ids=[self.role_id]) r = self.post('/OS-TRUST/trusts', body={'trust': ref}) trust = self.assertValidTrustResponse(r, ref) # delete the trust self.delete('/OS-TRUST/trusts/%(trust_id)s' % { 'trust_id': trust['id']}) # ensure the trust isn't found self.get('/OS-TRUST/trusts/%(trust_id)s' % { 'trust_id': trust['id']}, expected_status=http_client.NOT_FOUND)
[docs] def test_create_trust_without_trustee_returns_bad_request(self): ref = unit.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=self.trustee_user_id, project_id=self.project_id, role_ids=[self.role_id]) # trustee_user_id is required to create a trust del ref['trustee_user_id'] self.post('/OS-TRUST/trusts', body={'trust': ref}, expected_status=http_client.BAD_REQUEST)
[docs] def test_create_trust_without_impersonation_returns_bad_request(self): ref = unit.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=self.trustee_user_id, project_id=self.project_id, role_ids=[self.role_id]) # impersonation is required to create a trust del ref['impersonation'] self.post('/OS-TRUST/trusts', body={'trust': ref}, expected_status=http_client.BAD_REQUEST)
[docs] def test_create_trust_with_bad_remaining_uses_returns_bad_request(self): # negative numbers, strings, non-integers, and 0 are not value values for value in [-1, 0, "a bad value", 7.2]: ref = unit.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=self.trustee_user_id, project_id=self.project_id, remaining_uses=value, role_ids=[self.role_id]) self.post('/OS-TRUST/trusts', body={'trust': ref}, expected_status=http_client.BAD_REQUEST)
[docs] def test_create_trust_with_non_existant_trustee_returns_not_found(self): ref = unit.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=uuid.uuid4().hex, project_id=self.project_id, role_ids=[self.role_id]) self.post('/OS-TRUST/trusts', body={'trust': ref}, expected_status=http_client.NOT_FOUND)
[docs] def test_create_trust_with_trustee_as_trustor_returns_forbidden(self): ref = unit.new_trust_ref( trustor_user_id=self.trustee_user_id, trustee_user_id=self.user_id, project_id=self.project_id, role_ids=[self.role_id]) # NOTE(lbragstad): This fails because the user making the request isn't # the trustor defined in the request. self.post('/OS-TRUST/trusts', body={'trust': ref}, expected_status=http_client.FORBIDDEN)
[docs] def test_create_trust_with_non_existant_project_returns_not_found(self): ref = unit.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=self.trustee_user_id, project_id=uuid.uuid4().hex, role_ids=[self.role_id]) self.post('/OS-TRUST/trusts', body={'trust': ref}, expected_status=http_client.NOT_FOUND)
[docs] def test_create_trust_with_non_existant_role_id_returns_not_found(self): ref = unit.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=self.trustee_user_id, project_id=self.project_id, role_ids=[uuid.uuid4().hex]) self.post('/OS-TRUST/trusts', body={'trust': ref}, expected_status=http_client.NOT_FOUND)
[docs] def test_create_trust_with_non_existant_role_name_returns_not_found(self): ref = unit.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=self.trustee_user_id, project_id=self.project_id, role_names=[uuid.uuid4().hex]) self.post('/OS-TRUST/trusts', body={'trust': ref}, expected_status=http_client.NOT_FOUND)
[docs] def test_validate_trust_scoped_token_against_v2(self): # get a project-scoped token auth_data = self.build_authentication_request( user_id=self.default_domain_user['id'], password=self.default_domain_user['password'], project_id=self.default_domain_project_id) token = self.get_requested_token(auth_data) user = unit.new_user_ref(CONF.identity.default_domain_id) trustee = self.identity_api.create_user(user) # create a new trust ref = unit.new_trust_ref( trustor_user_id=self.default_domain_user['id'], trustee_user_id=trustee['id'], project_id=self.default_domain_project_id, impersonation=False, expires=dict(minutes=1), role_ids=[self.role_id]) r = self.post('/OS-TRUST/trusts', body={'trust': ref}, token=token) trust = self.assertValidTrustResponse(r) # get a v3 trust-scoped token as the trustee auth_data = self.build_authentication_request( user_id=trustee['id'], password=user['password'], trust_id=trust['id']) r = self.v3_create_token(auth_data) self.assertValidProjectScopedTokenResponse( r, trustee) token = r.headers.get('X-Subject-Token') # now validate the v3 token with v2 API path = '/v2.0/tokens/%s' % (token) self.admin_request( path=path, token=self.get_admin_token(), method='GET' )
[docs] def test_v3_v2_intermix_trustee_not_in_default_domain_failed(self): # get a project-scoped token auth_data = self.build_authentication_request( user_id=self.default_domain_user['id'], password=self.default_domain_user['password'], project_id=self.default_domain_project_id) token = self.get_requested_token(auth_data) # create a new trust ref = unit.new_trust_ref( trustor_user_id=self.default_domain_user_id, trustee_user_id=self.trustee_user_id, project_id=self.default_domain_project_id, impersonation=False, expires=dict(minutes=1), role_ids=[self.role_id]) r = self.post('/OS-TRUST/trusts', body={'trust': ref}, token=token) trust = self.assertValidTrustResponse(r) # get a trust-scoped token as the trustee auth_data = self.build_authentication_request( user_id=self.trustee_user['id'], password=self.trustee_user['password'], trust_id=trust['id']) r = self.v3_create_token(auth_data) self.assertValidProjectScopedTokenResponse( r, self.trustee_user) token = r.headers.get('X-Subject-Token') # now validate the v3 token with v2 API path = '/v2.0/tokens/%s' % (token) self.admin_request( path=path, token=self.get_admin_token(), method='GET', expected_status=http_client.UNAUTHORIZED)
[docs] def test_v3_v2_intermix_project_not_in_default_domain_failed(self): # create a trustee in default domain to delegate stuff to trustee_user = unit.create_user(self.identity_api, domain_id=test_v3.DEFAULT_DOMAIN_ID) trustee_user_id = trustee_user['id'] # create a new trust ref = unit.new_trust_ref( trustor_user_id=self.default_domain_user_id, trustee_user_id=trustee_user_id, project_id=self.project_id, impersonation=False, expires=dict(minutes=1), role_ids=[self.role_id]) # get a project-scoped token as the default_domain_user auth_data = self.build_authentication_request( user_id=self.default_domain_user['id'], password=self.default_domain_user['password'], project_id=self.default_domain_project_id) token = self.get_requested_token(auth_data) r = self.post('/OS-TRUST/trusts', body={'trust': ref}, token=token) trust = self.assertValidTrustResponse(r) # get a trust-scoped token as the trustee auth_data = self.build_authentication_request( user_id=trustee_user['id'], password=trustee_user['password'], trust_id=trust['id']) r = self.v3_create_token(auth_data) self.assertValidProjectScopedTokenResponse(r, trustee_user) token = r.headers.get('X-Subject-Token') # ensure the token is invalid against v2 path = '/v2.0/tokens/%s' % (token) self.admin_request( path=path, token=self.get_admin_token(), method='GET', expected_status=http_client.UNAUTHORIZED)
[docs] def test_exercise_trust_scoped_token_without_impersonation(self): # create a new trust ref = unit.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=self.trustee_user_id, project_id=self.project_id, impersonation=False, expires=dict(minutes=1), role_ids=[self.role_id]) resp = self.post('/OS-TRUST/trusts', body={'trust': ref}) trust = self.assertValidTrustResponse(resp) # get a trust-scoped token as the trustee auth_data = self.build_authentication_request( user_id=self.trustee_user['id'], password=self.trustee_user['password'], trust_id=trust['id']) resp = self.v3_create_token(auth_data) resp_body = resp.json_body['token'] self.assertValidProjectScopedTokenResponse(resp, self.trustee_user) self.assertEqual(self.trustee_user['id'], resp_body['user']['id']) self.assertEqual(self.trustee_user['name'], resp_body['user']['name']) self.assertEqual(self.domain['id'], resp_body['user']['domain']['id']) self.assertEqual(self.domain['name'], resp_body['user']['domain']['name']) self.assertEqual(self.project['id'], resp_body['project']['id']) self.assertEqual(self.project['name'], resp_body['project']['name'])
[docs] def test_exercise_trust_scoped_token_with_impersonation(self): # create a new trust ref = unit.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=self.trustee_user_id, project_id=self.project_id, impersonation=True, expires=dict(minutes=1), role_ids=[self.role_id]) resp = self.post('/OS-TRUST/trusts', body={'trust': ref}) trust = self.assertValidTrustResponse(resp) # get a trust-scoped token as the trustee auth_data = self.build_authentication_request( user_id=self.trustee_user['id'], password=self.trustee_user['password'], trust_id=trust['id']) resp = self.v3_create_token(auth_data) resp_body = resp.json_body['token'] self.assertValidProjectScopedTokenResponse(resp, self.user) self.assertEqual(self.user['id'], resp_body['user']['id']) self.assertEqual(self.user['name'], resp_body['user']['name']) self.assertEqual(self.domain['id'], resp_body['user']['domain']['id']) self.assertEqual(self.domain['name'], resp_body['user']['domain']['name']) self.assertEqual(self.project['id'], resp_body['project']['id']) self.assertEqual(self.project['name'], resp_body['project']['name'])
[docs] def test_forbidden_trust_impersonation_in_redelegation(self): """Test forbiddance of impersonation in trust redelegation. Check that trustee not allowed to create a trust (with impersonation set to true) from a redelegated trust (with impersonation set to false) """ # create trust ref = unit.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=self.trustee_user_id, project_id=self.project_id, impersonation=False, role_ids=[self.role_id], allow_redelegation=True) resp = self.post('/OS-TRUST/trusts', body={'trust': ref}) trust = self.assertValidTrustResponse(resp) auth_data = self.build_authentication_request( user_id=self.trustee_user_id, password=self.trustee_user['password'], trust_id=trust['id']) resp = self.v3_create_token(auth_data) # create third-party user, which will be trustee in trust created from # redelegated trust third_party_trustee = unit.create_user(self.identity_api, domain_id=self.domain_id) third_party_trustee_id = third_party_trustee['id'] # create trust from redelegated trust ref = unit.new_trust_ref( trustor_user_id=self.trustee_user_id, trustee_user_id=third_party_trustee_id, project_id=self.project_id, impersonation=True, role_ids=[self.role_id]) ref['redelegated_trust_id'] = trust['id'] self.admin_request(path='/v3/OS-TRUST/trusts', body={'trust': ref}, token=resp.headers.get('X-Subject-Token'), method='POST', expected_status=http_client.FORBIDDEN)
[docs] def test_trust_deleted_when_user_deleted(self): # create trust ref = unit.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=self.trustee_user_id, project_id=self.project_id, impersonation=False, role_ids=[self.role_id], allow_redelegation=True) resp = self.post('/OS-TRUST/trusts', body={'trust': ref}) trust = self.assertValidTrustResponse(resp) # list all trusts r = self.get('/OS-TRUST/trusts') self.assertEqual(1, len(r.result['trusts'])) # delete the trustee will delete the trust self.delete( '/users/%(user_id)s' % {'user_id': trust['trustee_user_id']}) self.get( '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}, expected_status=http_client.NOT_FOUND) # create another user as the new trustee trustee_user = unit.create_user(self.identity_api, domain_id=self.domain_id) trustee_user_id = trustee_user['id'] # create the trust again ref['trustee_user_id'] = trustee_user_id resp = self.post('/OS-TRUST/trusts', body={'trust': ref}) trust = self.assertValidTrustResponse(resp) r = self.get('/OS-TRUST/trusts') self.assertEqual(1, len(r.result['trusts'])) # delete the trustor will delete the trust self.delete( '/users/%(user_id)s' % {'user_id': trust['trustor_user_id']}) # call the backend method directly to bypass authentication since the # user has been deleted. self.assertRaises(exception.TrustNotFound, self.trust_api.get_trust, trust['id'])
[docs] def test_trust_deleted_when_project_deleted(self): # create trust ref = unit.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=self.trustee_user_id, project_id=self.project_id, impersonation=False, role_ids=[self.role_id], allow_redelegation=True) resp = self.post('/OS-TRUST/trusts', body={'trust': ref}) trust = self.assertValidTrustResponse(resp) # list all trusts r = self.get('/OS-TRUST/trusts') self.assertEqual(1, len(r.result['trusts'])) # delete the project will delete the trust. self.delete( '/projects/%(project_id)s' % {'project_id': trust['project_id']}) # call the backend method directly to bypass authentication since the # user no longer has the assignment on the project. self.assertRaises(exception.TrustNotFound, self.trust_api.get_trust, trust['id'])