Source code for ironic.tests.unit.common.test_raid

# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import json

from ironic.common import exception
from ironic.common import raid
from ironic.drivers import base as drivers_base
from ironic.tests import base
from ironic.tests.unit.db import base as db_base
from ironic.tests.unit.objects import utils as obj_utils
from ironic.tests.unit import raid_constants


[docs]class ValidateRaidConfigurationTestCase(base.TestCase):
[docs] def setUp(self): with open(drivers_base.RAID_CONFIG_SCHEMA, 'r') as raid_schema_fobj: self.schema = json.load(raid_schema_fobj) super(ValidateRaidConfigurationTestCase, self).setUp()
[docs] def test_validate_configuration_okay(self): raid_config = json.loads(raid_constants.RAID_CONFIG_OKAY) raid.validate_configuration( raid_config, raid_config_schema=self.schema)
[docs] def test_validate_configuration_no_logical_disk(self): self.assertRaises(exception.InvalidParameterValue, raid.validate_configuration, {}, raid_config_schema=self.schema)
[docs] def test_validate_configuration_zero_logical_disks(self): raid_config = json.loads(raid_constants.RAID_CONFIG_NO_LOGICAL_DISKS) self.assertRaises(exception.InvalidParameterValue, raid.validate_configuration, raid_config, raid_config_schema=self.schema)
[docs] def test_validate_configuration_no_raid_level(self): raid_config = json.loads(raid_constants.RAID_CONFIG_NO_RAID_LEVEL) self.assertRaises(exception.InvalidParameterValue, raid.validate_configuration, raid_config, raid_config_schema=self.schema)
[docs] def test_validate_configuration_invalid_raid_level(self): raid_config = json.loads(raid_constants.RAID_CONFIG_INVALID_RAID_LEVEL) self.assertRaises(exception.InvalidParameterValue, raid.validate_configuration, raid_config, raid_config_schema=self.schema)
[docs] def test_validate_configuration_no_size_gb(self): raid_config = json.loads(raid_constants.RAID_CONFIG_NO_SIZE_GB) self.assertRaises(exception.InvalidParameterValue, raid.validate_configuration, raid_config, raid_config_schema=self.schema)
[docs] def test_validate_configuration_max_size_gb(self): raid_config = json.loads(raid_constants.RAID_CONFIG_MAX_SIZE_GB) raid.validate_configuration(raid_config, raid_config_schema=self.schema)
[docs] def test_validate_configuration_invalid_size_gb(self): raid_config = json.loads(raid_constants.RAID_CONFIG_INVALID_SIZE_GB) self.assertRaises(exception.InvalidParameterValue, raid.validate_configuration, raid_config, raid_config_schema=self.schema)
[docs] def test_validate_configuration_invalid_is_root_volume(self): raid_config_str = raid_constants.RAID_CONFIG_INVALID_IS_ROOT_VOL raid_config = json.loads(raid_config_str) self.assertRaises(exception.InvalidParameterValue, raid.validate_configuration, raid_config, raid_config_schema=self.schema)
[docs] def test_validate_configuration_invalid_multiple_is_root_volume(self): raid_config_str = raid_constants.RAID_CONFIG_MULTIPLE_IS_ROOT_VOL raid_config = json.loads(raid_config_str) self.assertRaises(exception.InvalidParameterValue, raid.validate_configuration, raid_config, raid_config_schema=self.schema)
[docs] def test_validate_configuration_invalid_share_physical_disks(self): raid_config_str = raid_constants.RAID_CONFIG_INVALID_SHARE_PHY_DISKS raid_config = json.loads(raid_config_str) self.assertRaises(exception.InvalidParameterValue, raid.validate_configuration, raid_config, raid_config_schema=self.schema)
[docs] def test_validate_configuration_invalid_disk_type(self): raid_config = json.loads(raid_constants.RAID_CONFIG_INVALID_DISK_TYPE) self.assertRaises(exception.InvalidParameterValue, raid.validate_configuration, raid_config, raid_config_schema=self.schema)
[docs] def test_validate_configuration_invalid_int_type(self): raid_config = json.loads(raid_constants.RAID_CONFIG_INVALID_INT_TYPE) self.assertRaises(exception.InvalidParameterValue, raid.validate_configuration, raid_config, raid_config_schema=self.schema)
[docs] def test_validate_configuration_invalid_number_of_phy_disks(self): raid_config_str = raid_constants.RAID_CONFIG_INVALID_NUM_PHY_DISKS raid_config = json.loads(raid_config_str) self.assertRaises(exception.InvalidParameterValue, raid.validate_configuration, raid_config, raid_config_schema=self.schema)
[docs] def test_validate_configuration_invalid_physical_disks(self): raid_config = json.loads(raid_constants.RAID_CONFIG_INVALID_PHY_DISKS) self.assertRaises(exception.InvalidParameterValue, raid.validate_configuration, raid_config, raid_config_schema=self.schema)
[docs] def test_validate_configuration_additional_property(self): raid_config = json.loads(raid_constants.RAID_CONFIG_ADDITIONAL_PROP) self.assertRaises(exception.InvalidParameterValue, raid.validate_configuration, raid_config, raid_config_schema=self.schema)
[docs] def test_validate_configuration_with_jbod_volume(self): raid_config = json.loads(raid_constants.RAID_CONFIG_JBOD_VOLUME) raid.validate_configuration(raid_config, raid_config_schema=self.schema)
[docs] def test_validate_configuration_custom_schema(self): raid_config = json.loads(raid_constants.CUSTOM_SCHEMA_RAID_CONFIG) schema = json.loads(raid_constants.CUSTOM_RAID_SCHEMA) raid.validate_configuration(raid_config, raid_config_schema=schema)
[docs]class RaidPublicMethodsTestCase(db_base.DbTestCase):
[docs] def test_get_logical_disk_properties(self): with open(drivers_base.RAID_CONFIG_SCHEMA, 'r') as raid_schema_fobj: schema = json.load(raid_schema_fobj) logical_disk_properties = raid.get_logical_disk_properties(schema) self.assertIn('raid_level', logical_disk_properties) self.assertIn('size_gb', logical_disk_properties) self.assertIn('volume_name', logical_disk_properties) self.assertIn('is_root_volume', logical_disk_properties) self.assertIn('share_physical_disks', logical_disk_properties) self.assertIn('disk_type', logical_disk_properties) self.assertIn('interface_type', logical_disk_properties) self.assertIn('number_of_physical_disks', logical_disk_properties) self.assertIn('controller', logical_disk_properties) self.assertIn('physical_disks', logical_disk_properties)
[docs] def test_get_logical_disk_properties_custom_schema(self): raid_schema = json.loads(raid_constants.CUSTOM_RAID_SCHEMA) logical_disk_properties = raid.get_logical_disk_properties( raid_config_schema=raid_schema) self.assertIn('raid_level', logical_disk_properties) self.assertIn('size_gb', logical_disk_properties) self.assertIn('foo', logical_disk_properties)
def _test_update_raid_info(self, current_config, capabilities=None): node = obj_utils.create_test_node(self.context, driver='fake') if capabilities: properties = node.properties properties['capabilities'] = capabilities del properties['local_gb'] node.properties = properties target_raid_config = json.loads(raid_constants.RAID_CONFIG_OKAY) node.target_raid_config = target_raid_config node.save() raid.update_raid_info(node, current_config) properties = node.properties current = node.raid_config target = node.target_raid_config self.assertIsNotNone(current['last_updated']) self.assertIsInstance(current['logical_disks'][0], dict) if current_config['logical_disks'][0].get('is_root_volume'): self.assertEqual({'wwn': '600508B100'}, properties['root_device']) self.assertEqual(100, properties['local_gb']) self.assertIn('raid_level:1', properties['capabilities']) if capabilities: self.assertIn(capabilities, properties['capabilities']) else: self.assertNotIn('local_gb', properties) self.assertNotIn('root_device', properties) if capabilities: self.assertNotIn('raid_level:1', properties['capabilities']) # Verify node.target_raid_config is preserved. self.assertEqual(target_raid_config, target)
[docs] def test_update_raid_info_okay(self): current_config = json.loads(raid_constants.CURRENT_RAID_CONFIG) self._test_update_raid_info(current_config, capabilities='boot_mode:bios')
[docs] def test_update_raid_info_okay_no_root_volumes(self): current_config = json.loads(raid_constants.CURRENT_RAID_CONFIG) del current_config['logical_disks'][0]['is_root_volume'] del current_config['logical_disks'][0]['root_device_hint'] self._test_update_raid_info(current_config, capabilities='boot_mode:bios')
[docs] def test_update_raid_info_okay_current_capabilities_empty(self): current_config = json.loads(raid_constants.CURRENT_RAID_CONFIG) self._test_update_raid_info(current_config, capabilities=None)
[docs] def test_update_raid_info_multiple_root_volumes(self): current_config = json.loads(raid_constants.RAID_CONFIG_MULTIPLE_ROOT) self.assertRaises(exception.InvalidParameterValue, self._test_update_raid_info, current_config)