# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
from ironic.common import exception
from ironic.common import raid
from ironic.drivers import base as drivers_base
from ironic.tests import base
from ironic.tests.unit.db import base as db_base
from ironic.tests.unit.objects import utils as obj_utils
from ironic.tests.unit import raid_constants
class ValidateRaidConfigurationTestCase(base.TestCase):
    """Tests for raid.validate_configuration.

    Most cases validate a fixture from raid_constants against the schema
    file referenced by drivers_base.RAID_CONFIG_SCHEMA; one case supplies
    its own schema instead.
    """

    def setUp(self):
        """Load the RAID config schema into self.schema, then run setUp."""
        with open(drivers_base.RAID_CONFIG_SCHEMA, 'r') as schema_file:
            self.schema = json.load(schema_file)
        super(ValidateRaidConfigurationTestCase, self).setUp()

    def _assert_rejected(self, config):
        """Assert that validating ``config`` raises InvalidParameterValue.

        :param config: RAID configuration dict to validate against
            self.schema.
        """
        self.assertRaises(exception.InvalidParameterValue,
                          raid.validate_configuration,
                          config,
                          raid_config_schema=self.schema)

    def test_validate_configuration_okay(self):
        # RAID_CONFIG_OKAY must validate without raising.
        config = json.loads(raid_constants.RAID_CONFIG_OKAY)
        raid.validate_configuration(config, raid_config_schema=self.schema)

    def test_validate_configuration_no_logical_disk(self):
        # An empty configuration dict must be rejected.
        self._assert_rejected({})

    def test_validate_configuration_zero_logical_disks(self):
        self._assert_rejected(
            json.loads(raid_constants.RAID_CONFIG_NO_LOGICAL_DISKS))

    def test_validate_configuration_no_raid_level(self):
        self._assert_rejected(
            json.loads(raid_constants.RAID_CONFIG_NO_RAID_LEVEL))

    def test_validate_configuration_invalid_raid_level(self):
        self._assert_rejected(
            json.loads(raid_constants.RAID_CONFIG_INVALID_RAID_LEVEL))

    def test_validate_configuration_no_size_gb(self):
        self._assert_rejected(
            json.loads(raid_constants.RAID_CONFIG_NO_SIZE_GB))

    def test_validate_configuration_max_size_gb(self):
        # RAID_CONFIG_MAX_SIZE_GB must validate without raising.
        config = json.loads(raid_constants.RAID_CONFIG_MAX_SIZE_GB)
        raid.validate_configuration(config, raid_config_schema=self.schema)

    def test_validate_configuration_invalid_size_gb(self):
        self._assert_rejected(
            json.loads(raid_constants.RAID_CONFIG_INVALID_SIZE_GB))

    def test_validate_configuration_invalid_is_root_volume(self):
        self._assert_rejected(
            json.loads(raid_constants.RAID_CONFIG_INVALID_IS_ROOT_VOL))

    def test_validate_configuration_invalid_multiple_is_root_volume(self):
        self._assert_rejected(
            json.loads(raid_constants.RAID_CONFIG_MULTIPLE_IS_ROOT_VOL))

    def test_validate_configuration_invalid_share_physical_disks(self):
        self._assert_rejected(
            json.loads(raid_constants.RAID_CONFIG_INVALID_SHARE_PHY_DISKS))

    def test_validate_configuration_invalid_disk_type(self):
        self._assert_rejected(
            json.loads(raid_constants.RAID_CONFIG_INVALID_DISK_TYPE))

    def test_validate_configuration_invalid_int_type(self):
        self._assert_rejected(
            json.loads(raid_constants.RAID_CONFIG_INVALID_INT_TYPE))

    def test_validate_configuration_invalid_number_of_phy_disks(self):
        self._assert_rejected(
            json.loads(raid_constants.RAID_CONFIG_INVALID_NUM_PHY_DISKS))

    def test_validate_configuration_invalid_physical_disks(self):
        self._assert_rejected(
            json.loads(raid_constants.RAID_CONFIG_INVALID_PHY_DISKS))

    def test_validate_configuration_additional_property(self):
        self._assert_rejected(
            json.loads(raid_constants.RAID_CONFIG_ADDITIONAL_PROP))

    def test_validate_configuration_with_jbod_volume(self):
        # RAID_CONFIG_JBOD_VOLUME must validate without raising.
        config = json.loads(raid_constants.RAID_CONFIG_JBOD_VOLUME)
        raid.validate_configuration(config, raid_config_schema=self.schema)

    def test_validate_configuration_custom_schema(self):
        # Validation must use the schema passed in, not self.schema.
        custom_schema = json.loads(raid_constants.CUSTOM_RAID_SCHEMA)
        config = json.loads(raid_constants.CUSTOM_SCHEMA_RAID_CONFIG)
        raid.validate_configuration(config,
                                    raid_config_schema=custom_schema)
class RaidPublicMethodsTestCase(db_base.DbTestCase):
    """Tests for raid.get_logical_disk_properties and raid.update_raid_info."""

    def test_get_logical_disk_properties(self):
        # Every property name listed below must be reported for the
        # schema file referenced by drivers_base.RAID_CONFIG_SCHEMA.
        with open(drivers_base.RAID_CONFIG_SCHEMA, 'r') as schema_file:
            schema = json.load(schema_file)
        reported = raid.get_logical_disk_properties(schema)
        expected_names = (
            'raid_level',
            'size_gb',
            'volume_name',
            'is_root_volume',
            'share_physical_disks',
            'disk_type',
            'interface_type',
            'number_of_physical_disks',
            'controller',
            'physical_disks',
        )
        for name in expected_names:
            self.assertIn(name, reported)

    def test_get_logical_disk_properties_custom_schema(self):
        # Properties must be derived from the schema passed by keyword.
        custom_schema = json.loads(raid_constants.CUSTOM_RAID_SCHEMA)
        reported = raid.get_logical_disk_properties(
            raid_config_schema=custom_schema)
        for name in ('raid_level', 'size_gb', 'foo'):
            self.assertIn(name, reported)

    def _test_update_raid_info(self, current_config,
                               capabilities=None):
        """Run raid.update_raid_info on a fresh node and check the result.

        :param current_config: RAID configuration dict handed to
            raid.update_raid_info; its first logical disk decides which
            set of assertions is applied.
        :param capabilities: optional capabilities string stored on the
            node before the update. When given, 'local_gb' is also removed
            from the node properties beforehand.
        """
        node = obj_utils.create_test_node(self.context, driver='fake')
        if capabilities:
            initial_props = node.properties
            initial_props['capabilities'] = capabilities
            del initial_props['local_gb']
            node.properties = initial_props
        expected_target = json.loads(raid_constants.RAID_CONFIG_OKAY)
        node.target_raid_config = expected_target
        node.save()

        raid.update_raid_info(node, current_config)

        updated_props = node.properties
        stored_config = node.raid_config
        stored_target = node.target_raid_config
        self.assertIsNotNone(stored_config['last_updated'])
        self.assertIsInstance(stored_config['logical_disks'][0], dict)

        first_disk = current_config['logical_disks'][0]
        if not first_disk.get('is_root_volume'):
            # No root volume: no root-device details may be recorded.
            self.assertNotIn('local_gb', updated_props)
            self.assertNotIn('root_device', updated_props)
            if capabilities:
                self.assertNotIn('raid_level:1',
                                 updated_props['capabilities'])
        else:
            self.assertEqual({'wwn': '600508B100'},
                             updated_props['root_device'])
            self.assertEqual(100, updated_props['local_gb'])
            self.assertIn('raid_level:1', updated_props['capabilities'])
            if capabilities:
                self.assertIn(capabilities, updated_props['capabilities'])

        # Verify node.target_raid_config is preserved.
        self.assertEqual(expected_target, stored_target)

    def test_update_raid_info_okay(self):
        config = json.loads(raid_constants.CURRENT_RAID_CONFIG)
        self._test_update_raid_info(config, capabilities='boot_mode:bios')

    def test_update_raid_info_okay_no_root_volumes(self):
        # Strip the root-volume markers from the first logical disk.
        config = json.loads(raid_constants.CURRENT_RAID_CONFIG)
        first_disk = config['logical_disks'][0]
        del first_disk['is_root_volume']
        del first_disk['root_device_hint']
        self._test_update_raid_info(config, capabilities='boot_mode:bios')

    def test_update_raid_info_okay_current_capabilities_empty(self):
        config = json.loads(raid_constants.CURRENT_RAID_CONFIG)
        self._test_update_raid_info(config, capabilities=None)

    def test_update_raid_info_multiple_root_volumes(self):
        # RAID_CONFIG_MULTIPLE_ROOT must raise InvalidParameterValue.
        config = json.loads(raid_constants.RAID_CONFIG_MULTIPLE_ROOT)
        self.assertRaises(exception.InvalidParameterValue,
                          self._test_update_raid_info,
                          config)