Source code for keystone.limit.core

# Copyright 2018 Huawei
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy

from keystone.common import cache
from keystone.common import driver_hints
from keystone.common import manager
from keystone.common import provider_api
import keystone.conf
from keystone import exception
from keystone.limit.models import base

CONF = keystone.conf.CONF
PROVIDERS = provider_api.ProviderAPIs

MEMOIZE = cache.get_memoization_decorator(group='unified_limit')


[docs]class Manager(manager.Manager): driver_namespace = 'keystone.unified_limit' _provides_api = 'unified_limit_api' def __init__(self): unified_limit_driver = CONF.unified_limit.driver super(Manager, self).__init__(unified_limit_driver) self.enforcement_model = base.load_driver( CONF.unified_limit.enforcement_model)
[docs] def check_project_depth(self): """Check if project depth satisfies current enforcement model.""" PROVIDERS.resource_api.check_project_depth( self.enforcement_model.MAX_PROJECT_TREE_DEPTH)
def _assert_resource_exist(self, unified_limit, target): try: service_id = unified_limit.get('service_id') if service_id is not None: PROVIDERS.catalog_api.get_service(service_id) region_id = unified_limit.get('region_id') if region_id is not None: PROVIDERS.catalog_api.get_region(region_id) project_id = unified_limit.get('project_id') if project_id is not None: project = PROVIDERS.resource_api.get_project(project_id) if project['is_domain']: # Treat the input limit as domain level limit. unified_limit['domain_id'] = unified_limit.pop( 'project_id') domain_id = unified_limit.get('domain_id') if domain_id is not None: PROVIDERS.resource_api.get_domain(domain_id) except exception.ServiceNotFound: raise exception.ValidationError(attribute='service_id', target=target) except exception.RegionNotFound: raise exception.ValidationError(attribute='region_id', target=target) except exception.ProjectNotFound: raise exception.ValidationError(attribute='project_id', target=target) except exception.DomainNotFound: raise exception.ValidationError(attribute='domain_id', target=target)
[docs] def get_model(self): """Return information of the configured enforcement model.""" return { 'name': self.enforcement_model.NAME, 'description': self.enforcement_model.DESCRIPTION }
[docs] def create_registered_limits(self, registered_limits): for registered_limit in registered_limits: self._assert_resource_exist(registered_limit, 'registered_limit') return self.driver.create_registered_limits(registered_limits)
[docs] def update_registered_limit(self, registered_limit_id, registered_limit): self._assert_resource_exist(registered_limit, 'registered_limit') updated_registered_limit = self.driver.update_registered_limit( registered_limit_id, registered_limit) self.get_registered_limit.invalidate(self, updated_registered_limit['id']) return updated_registered_limit
[docs] @manager.response_truncated def list_registered_limits(self, hints=None): return self.driver.list_registered_limits( hints or driver_hints.Hints())
[docs] @MEMOIZE def get_registered_limit(self, registered_limit_id): return self.driver.get_registered_limit(registered_limit_id)
[docs] def delete_registered_limit(self, registered_limit_id): self.driver.delete_registered_limit(registered_limit_id) self.get_registered_limit.invalidate(self, registered_limit_id)
[docs] def create_limits(self, limits): for limit in limits: self._assert_resource_exist(limit, 'limit') self.enforcement_model.check_limit(copy.deepcopy(limits)) return self.driver.create_limits(limits)
[docs] def update_limit(self, limit_id, limit): self._assert_resource_exist(limit, 'limit') limit_ref = self.get_limit(limit_id) limit_ref.update(limit) self.enforcement_model.check_limit(copy.deepcopy([limit_ref])) updated_limit = self.driver.update_limit(limit_id, limit) self.get_limit.invalidate(self, updated_limit['id']) return updated_limit
[docs] @manager.response_truncated def list_limits(self, hints=None): return self.driver.list_limits(hints or driver_hints.Hints())
[docs] @MEMOIZE def get_limit(self, limit_id): return self.driver.get_limit(limit_id)
[docs] def delete_limit(self, limit_id): self.driver.delete_limit(limit_id) self.get_limit.invalidate(self, limit_id)
[docs] def delete_limits_for_project(self, project_id): limit_ids = self.driver.delete_limits_for_project(project_id) for limit_id in limit_ids: self.get_limit.invalidate(self, limit_id)