Source code for ironic.common.inspection_rules.validation

#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import enum

import jsonschema

from ironic.api.schemas.v1 import inspection_rule as schema
from ironic.common import exception
from ironic.common.i18n import _
from ironic.common.inspection_rules import actions
from ironic.common.inspection_rules import operators
from ironic.common.inspection_rules import utils


class InspectionPhase(enum.Enum):
    """Phases in which an inspection rule may be applied.

    The ``value`` of each member is the string used in a rule's ``phase``
    field.
    """

    MAIN = 'main'
# TODO(stephenfin): Everything here can and should be moved to the jsonschema
# schemas, but doing so will change responses.
def validate_rule(rule):
    """Validate an inspection rule.

    The rule is first checked against the ``create_request_body`` JSON
    schema; a schema failure is reported immediately. The remaining checks
    (phase, priority, and the plugin-specific validation of every condition
    and action) are accumulated so that all problems are reported at once.

    :param rule: The inspection rule to validate. It is accessed as a
        mapping (``rule.get(...)`` / ``rule['actions']``).
    :raises: Invalid if the rule is invalid. When several problems are
        found they are joined with ``'; '`` into a single message.
    """
    try:
        jsonschema.validate(rule, schema.create_request_body)
    except jsonschema.ValidationError as e:
        raise exception.Invalid(
            _('Validation failed for inspection rule: %s') % e)

    errors = []

    valid_phases = [p.value for p in InspectionPhase]
    phase = rule.get('phase', InspectionPhase.MAIN.value)
    if phase not in valid_phases:
        errors.append(
            _('Invalid phase: %(phase)s. Valid phases are: %(valid)s') % {
                'phase': phase, 'valid': ', '.join(valid_phases)
            })

    # The priority range is only enforced when the rule is not flagged as
    # built-in.
    priority = rule.get('priority', 0)
    if priority < 0 and not rule.get('built_in'):
        errors.append(
            _("Priority cannot be negative for user-defined rules."))
    if priority > 9999 and not rule.get('built_in'):
        errors.append(
            _("Priority must be between 0 and 9999 for user-defined rules."))

    # Additional plugin-specific validation
    for condition in rule.get('conditions', []):
        # Only the bare operator name is needed to look up the plugin; the
        # second element of the parsed result is not used here.
        op, _inverted = utils.parse_inverted_operator(condition['op'])
        plugin = operators.get_operator(op)
        if not plugin or not callable(plugin):
            errors.append(
                _('Unsupported condition operator: %s') % op)
            # There is nothing to instantiate; calling ``plugin()`` here
            # would raise TypeError instead of reporting the error.
            continue
        try:
            plugin().validate(condition.get('args', {}))
        except ValueError as exc:
            errors.append(_('Invalid parameters for condition operator '
                            '%(op)s: %(error)s') % {'op': op, 'error': exc})

    for action in rule['actions']:
        plugin = actions.get_action(action['op'])
        if not plugin or not callable(plugin):
            errors.append(_('Unsupported action operator: %s') % action['op'])
            # Same reasoning as for conditions: skip the unusable plugin.
            continue
        try:
            plugin().validate(action.get('args', {}))
        except ValueError as exc:
            errors.append(_('Invalid parameters for action operator %(op)s: '
                            '%(error)s') % {'op': action['op'], 'error': exc})

    if not errors:
        return

    if len(errors) == 1:
        raise exception.Invalid(errors[0])
    raise exception.Invalid(_('Multiple validation errors occurred: '
                              '%s') % '; '.join(errors))