# Source code for ironic_lib.tests.test_utils

# Copyright 2011 Justin Santa Barbara
# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import copy
import errno
import os
import os.path
import tempfile

import mock
from oslo_concurrency import processutils
from oslo_config import cfg
from oslotest import base as test_base

from ironic_lib import exception
from ironic_lib import utils

CONF = cfg.CONF


class BareMetalUtilsTestCase(test_base.BaseTestCase):
    """Test case class with no test methods in the visible source.

    NOTE(review): the class statement had no body here, which is a syntax
    error. This docstring makes the definition complete; confirm whether
    test methods were removed on purpose or lost during extraction.
    """
class ExecuteTestCase(test_base.BaseTestCase):
    """Tests for ``utils.execute``.

    Covers retry behaviour, argument validation, locale handling, the
    ``root_helper`` configuration option and stdout logging.
    """

    def test_retry_on_failure(self):
        """A failing script is run ``attempts`` times, with stdin each time."""
        fd, tmpfilename = tempfile.mkstemp()
        fd2, tmpfilename2 = tempfile.mkstemp()
        # Only the path of the second file is used (the script keeps its run
        # counter there), so release the descriptor instead of leaking it.
        os.close(fd2)
        try:
            # The file must be closed before it is executed.
            with os.fdopen(fd, 'w+') as fp:
                fp.write('''#!/bin/sh
# If stdin fails to get passed during one of the runs, make a note.
if ! grep -q foo
then
    echo 'failure' > "$1"
fi
# If stdin has failed to get passed during this or a previous run, exit early.
if grep failure "$1"
then
    exit 1
fi
runs="$(cat $1)"
if [ -z "$runs" ]
then
    runs=0
fi
runs=$(($runs + 1))
echo $runs > "$1"
exit 1
''')
            os.chmod(tmpfilename, 0o755)
            try:
                self.assertRaises(processutils.ProcessExecutionError,
                                  utils.execute,
                                  tmpfilename, tmpfilename2, attempts=10,
                                  process_input=b'foo',
                                  delay_on_retry=False)
            except OSError as e:
                if e.errno == errno.EACCES:
                    self.skipTest("Permissions error detected. "
                                  "Are you running with a noexec /tmp?")
                else:
                    raise
            with open(tmpfilename2, 'r') as fp:
                runs = fp.read()
            self.assertNotEqual(runs.strip(), 'failure', 'stdin did not '
                                'always get passed '
                                'correctly')
            runs = int(runs.strip())
            self.assertEqual(10, runs,
                             'Ran %d times instead of 10.' % (runs,))
        finally:
            os.unlink(tmpfilename)
            os.unlink(tmpfilename2)

    def test_unknown_kwargs_raises_error(self):
        """An unrecognised keyword argument raises UnknownArgumentError."""
        self.assertRaises(processutils.UnknownArgumentError,
                          utils.execute,
                          '/usr/bin/env', 'true',
                          this_is_not_a_valid_kwarg=True)

    def test_check_exit_code_boolean(self):
        """``check_exit_code`` given as a boolean toggles the failure."""
        utils.execute('/usr/bin/env', 'false', check_exit_code=False)
        self.assertRaises(processutils.ProcessExecutionError,
                          utils.execute,
                          '/usr/bin/env', 'false', check_exit_code=True)

    def test_no_retry_on_success(self):
        """A script that succeeds on the first run is not run again."""
        fd, tmpfilename = tempfile.mkstemp()
        fd2, tmpfilename2 = tempfile.mkstemp()
        # Only the path of the second file is used (the script leaves its
        # "already ran" marker there), so release the descriptor right away.
        os.close(fd2)
        try:
            # The file must be closed before it is executed.
            with os.fdopen(fd, 'w+') as fp:
                fp.write('''#!/bin/sh
# If we've already run, bail out.
grep -q foo "$1" && exit 1
# Mark that we've run before.
echo foo > "$1"
# Check that stdin gets passed correctly.
grep foo
''')
            os.chmod(tmpfilename, 0o755)
            try:
                utils.execute(tmpfilename,
                              tmpfilename2,
                              process_input=b'foo',
                              attempts=2)
            except OSError as e:
                if e.errno == errno.EACCES:
                    self.skipTest("Permissions error detected. "
                                  "Are you running with a noexec /tmp?")
                else:
                    raise
        finally:
            os.unlink(tmpfilename)
            os.unlink(tmpfilename2)

    @mock.patch.object(processutils, 'execute', autospec=True)
    @mock.patch.object(os.environ, 'copy', return_value={}, autospec=True)
    def test_execute_use_standard_locale_no_env_variables(self, env_mock,
                                                          execute_mock):
        """With an empty environment only ``LC_ALL=C`` is passed on."""
        utils.execute('foo', use_standard_locale=True)
        execute_mock.assert_called_once_with('foo',
                                             env_variables={'LC_ALL': 'C'})

    @mock.patch.object(processutils, 'execute', autospec=True)
    def test_execute_use_standard_locale_with_env_variables(self,
                                                            execute_mock):
        """``LC_ALL=C`` is added to the caller-supplied variables."""
        utils.execute('foo', use_standard_locale=True,
                      env_variables={'foo': 'bar'})
        execute_mock.assert_called_once_with('foo',
                                             env_variables={'LC_ALL': 'C',
                                                            'foo': 'bar'})

    @mock.patch.object(processutils, 'execute', autospec=True)
    def test_execute_not_use_standard_locale(self, execute_mock):
        """Without ``use_standard_locale`` the variables pass unchanged."""
        utils.execute('foo', use_standard_locale=False,
                      env_variables={'foo': 'bar'})
        execute_mock.assert_called_once_with('foo',
                                             env_variables={'foo': 'bar'})

    def test_execute_without_root_helper(self):
        """No root helper configured and ``run_as_root=False``."""
        CONF.set_override('root_helper', None, group='ironic_lib')
        # Undo the override so it cannot leak into tests that rely on the
        # configured root helper.
        self.addCleanup(CONF.clear_override, 'root_helper',
                        group='ironic_lib')
        with mock.patch.object(
                processutils, 'execute', autospec=True) as execute_mock:
            utils.execute('foo', run_as_root=False)
            execute_mock.assert_called_once_with('foo', run_as_root=False)

    def test_execute_without_root_helper_run_as_root(self):
        """No root helper configured: ``run_as_root=True`` is downgraded."""
        CONF.set_override('root_helper', None, group='ironic_lib')
        # Undo the override so it cannot leak into tests that rely on the
        # configured root helper.
        self.addCleanup(CONF.clear_override, 'root_helper',
                        group='ironic_lib')
        with mock.patch.object(
                processutils, 'execute', autospec=True) as execute_mock:
            utils.execute('foo', run_as_root=True)
            execute_mock.assert_called_once_with('foo', run_as_root=False)

    def test_execute_with_root_helper(self):
        """With ``run_as_root=False`` no root helper is passed on."""
        with mock.patch.object(
                processutils, 'execute', autospec=True) as execute_mock:
            utils.execute('foo', run_as_root=False)
            execute_mock.assert_called_once_with('foo', run_as_root=False)

    def test_execute_with_root_helper_run_as_root(self):
        """With ``run_as_root=True`` the configured helper is passed on."""
        with mock.patch.object(
                processutils, 'execute', autospec=True) as execute_mock:
            utils.execute('foo', run_as_root=True)
            execute_mock.assert_called_once_with(
                'foo', run_as_root=True,
                root_helper=CONF.ironic_lib.root_helper)

    @mock.patch.object(utils, 'LOG', autospec=True)
    def _test_execute_with_log_stdout(self, log_mock, log_stdout=None):
        """Run ``utils.execute`` and check its debug logging.

        :param log_mock: mock replacing ``utils.LOG`` (injected by patch).
        :param log_stdout: value passed as ``log_stdout``; when ``None``
            the keyword is omitted from the call altogether.
        """
        with mock.patch.object(processutils, 'execute') as execute_mock:
            execute_mock.return_value = ('stdout', 'stderr')
            if log_stdout is not None:
                utils.execute('foo', log_stdout=log_stdout)
            else:
                utils.execute('foo')
            execute_mock.assert_called_once_with('foo')
            # The second debug call is the one inspected for 'stdout'.
            _, args, _ = log_mock.debug.mock_calls[1]
            if log_stdout is False:
                self.assertEqual(2, log_mock.debug.call_count)
                self.assertNotIn('stdout', args[0])
            else:
                self.assertEqual(3, log_mock.debug.call_count)
                self.assertIn('stdout', args[0])

    def test_execute_with_log_stdout_default(self):
        """Omitting ``log_stdout`` logs stdout."""
        self._test_execute_with_log_stdout()

    def test_execute_with_log_stdout_true(self):
        """``log_stdout=True`` logs stdout."""
        self._test_execute_with_log_stdout(log_stdout=True)

    def test_execute_with_log_stdout_false(self):
        """``log_stdout=False`` does not log stdout."""
        self._test_execute_with_log_stdout(log_stdout=False)
class MkfsTestCase(test_base.BaseTestCase):
    """Tests for ``utils.mkfs`` with ``utils.execute`` mocked out."""

    @mock.patch.object(utils, 'execute', autospec=True)
    def test_mkfs(self, execute_mock):
        """Each filesystem type maps to the expected command line."""
        for fs_type, device in (('ext4', '/my/block/dev'),
                                ('msdos', '/my/msdos/block/dev'),
                                ('swap', '/my/swap/block/dev')):
            utils.mkfs(fs_type, device)

        # Keyword arguments shared by every expected call.
        common = {'run_as_root': True, 'use_standard_locale': True}
        expected_calls = [
            mock.call('mkfs', '-t', 'ext4', '-F', '/my/block/dev',
                      **common),
            mock.call('mkfs', '-t', 'msdos', '/my/msdos/block/dev',
                      **common),
            mock.call('mkswap', '/my/swap/block/dev', **common),
        ]
        self.assertEqual(expected_calls, execute_mock.call_args_list)

    @mock.patch.object(utils, 'execute', autospec=True)
    def test_mkfs_with_label(self, execute_mock):
        """A label is passed with the flag matching the filesystem type."""
        for fs_type, device, label in (
                ('ext4', '/my/block/dev', 'ext4-vol'),
                ('msdos', '/my/msdos/block/dev', 'msdos-vol'),
                ('swap', '/my/swap/block/dev', 'swap-vol')):
            utils.mkfs(fs_type, device, label)

        # Keyword arguments shared by every expected call.
        common = {'run_as_root': True, 'use_standard_locale': True}
        expected_calls = [
            mock.call('mkfs', '-t', 'ext4', '-F', '-L', 'ext4-vol',
                      '/my/block/dev', **common),
            mock.call('mkfs', '-t', 'msdos', '-n', 'msdos-vol',
                      '/my/msdos/block/dev', **common),
            mock.call('mkswap', '-L', 'swap-vol',
                      '/my/swap/block/dev', **common),
        ]
        self.assertEqual(expected_calls, execute_mock.call_args_list)

    @mock.patch.object(utils, 'execute', autospec=True,
                       side_effect=processutils.ProcessExecutionError(
                           stderr=os.strerror(errno.ENOENT)))
    def test_mkfs_with_unsupported_fs(self, execute_mock):
        """An ENOENT message on stderr becomes FileSystemNotSupported."""
        self.assertRaises(exception.FileSystemNotSupported,
                          utils.mkfs, 'foo', '/my/block/dev')

    @mock.patch.object(utils, 'execute', autospec=True,
                       side_effect=processutils.ProcessExecutionError(
                           stderr='fake'))
    def test_mkfs_with_unexpected_error(self, execute_mock):
        """Any other execution error is propagated unchanged."""
        self.assertRaises(processutils.ProcessExecutionError,
                          utils.mkfs, 'ext4', '/my/block/dev', 'ext4-vol')
class IsHttpUrlTestCase(test_base.BaseTestCase):
    """Tests for ``utils.is_http_url``."""

    def test_is_http_url(self):
        """HTTP(S) URLs in either case are accepted; other strings are not."""
        accepted = ('http://127.0.0.1',
                    'https://127.0.0.1',
                    'HTTP://127.1.2.3',
                    'HTTPS://127.3.2.1')
        for url in accepted:
            self.assertTrue(utils.is_http_url(url))

        rejected = ('Zm9vYmFy', '11111111')
        for url in rejected:
            self.assertFalse(utils.is_http_url(url))
class ParseRootDeviceTestCase(test_base.BaseTestCase):
    """Tests for root device hint parsing and its private helpers.

    Exercises ``utils.parse_root_device_hints``,
    ``utils._extract_hint_operator_and_values``,
    ``utils._append_operator_to_hints`` and
    ``utils._normalize_hint_expression``.
    """

    def test_parse_root_device_hints_without_operators(self):
        """Plain values gain a default operator and are normalized."""
        hints = {
            'wwn': '123456', 'model': 'FOO model', 'size': 12345,
            'serial': 'foo-serial', 'vendor': 'foo VENDOR with space',
            'name': '/dev/sda', 'wwn_with_extension': '123456111',
            'wwn_vendor_extension': '111', 'rotational': True}
        expected = {
            'wwn': 's== 123456', 'model': 's== foo%20model',
            'size': '== 12345', 'serial': 's== foo-serial',
            'vendor': 's== foo%20vendor%20with%20space',
            'name': 's== /dev/sda', 'wwn_with_extension': 's== 123456111',
            'wwn_vendor_extension': 's== 111', 'rotational': True}
        parsed = utils.parse_root_device_hints(hints)
        self.assertEqual(expected, parsed)

    def test_parse_root_device_hints_with_operators(self):
        """Operators already present in the hints are kept."""
        hints = {
            'wwn': 's== 123456', 'model': 's== foo MODEL',
            'size': '>= 12345', 'serial': 's!= foo-serial',
            'vendor': 's== foo VENDOR with space',
            'name': '<or> /dev/sda <or> /dev/sdb',
            'wwn_with_extension': 's!= 123456111',
            'wwn_vendor_extension': 's== 111', 'rotational': True}

        # Only the string values are expected to change (normalization).
        expected = copy.deepcopy(hints)
        expected['model'] = 's== foo%20model'
        expected['vendor'] = 's== foo%20vendor%20with%20space'

        parsed = utils.parse_root_device_hints(hints)
        self.assertEqual(expected, parsed)

    def test_parse_root_device_hints_no_hints(self):
        """An empty hints dict parses to ``None``."""
        parsed = utils.parse_root_device_hints({})
        self.assertIsNone(parsed)

    def test_parse_root_device_hints_convert_size(self):
        """Integer and string sizes both become ``'== <size>'``."""
        for raw_size in (12345, '12345'):
            parsed = utils.parse_root_device_hints({'size': raw_size})
            self.assertEqual({'size': '== 12345'}, parsed)

    def test_parse_root_device_hints_invalid_size(self):
        """Non-integer, negative and zero sizes raise ValueError."""
        for bad_size in ('not-int', -123, 0):
            self.assertRaises(ValueError, utils.parse_root_device_hints,
                              {'size': bad_size})

    def test_parse_root_device_hints_int_or(self):
        """An ``<or>`` expression of integers is returned unchanged."""
        expression = '<or> 123 <or> 456 <or> 789'
        parsed = utils.parse_root_device_hints({'size': expression})
        self.assertEqual({'size': expression}, parsed)

    def test_parse_root_device_hints_int_or_invalid(self):
        """A non-integer member of an ``<or>`` size raises ValueError."""
        expression = '<or> 123 <or> non-int <or> 789'
        self.assertRaises(ValueError, utils.parse_root_device_hints,
                          {'size': expression})

    def test_parse_root_device_hints_string_or_space(self):
        """Spaces inside ``<or>`` string values become ``%20``."""
        expression = '<or> foo <or> foo bar <or> bar'
        normalized = '<or> foo <or> foo%20bar <or> bar'
        parsed = utils.parse_root_device_hints({'model': expression})
        self.assertEqual({'model': normalized}, parsed)

    def _parse_root_device_hints_convert_rotational(self, values,
                                                    expected_value):
        """Assert each of ``values`` parses to ``expected_value``.

        :param values: iterable of raw ``rotational`` hint values.
        :param expected_value: the value expected after parsing.
        """
        for raw in values:
            parsed = utils.parse_root_device_hints({'rotational': raw})
            self.assertEqual({'rotational': expected_value}, parsed)

    def test_parse_root_device_hints_convert_rotational(self):
        """Truthy and falsy spellings of ``rotational`` become booleans."""
        truthy = (True, 'true', 'on', 'y', 'yes')
        falsy = (False, 'false', 'off', 'n', 'no')
        self._parse_root_device_hints_convert_rotational(truthy, True)
        self._parse_root_device_hints_convert_rotational(falsy, False)

    def test_parse_root_device_hints_invalid_rotational(self):
        """A non-boolean ``rotational`` string raises ValueError."""
        self.assertRaises(ValueError, utils.parse_root_device_hints,
                          {'rotational': 'not-bool'})

    def test_parse_root_device_hints_invalid_wwn(self):
        """An integer ``wwn`` raises ValueError."""
        self.assertRaises(ValueError, utils.parse_root_device_hints,
                          {'wwn': 123})

    def test_parse_root_device_hints_invalid_wwn_with_extension(self):
        """An integer ``wwn_with_extension`` raises ValueError."""
        self.assertRaises(ValueError, utils.parse_root_device_hints,
                          {'wwn_with_extension': 123})

    def test_parse_root_device_hints_invalid_wwn_vendor_extension(self):
        """An integer ``wwn_vendor_extension`` raises ValueError."""
        self.assertRaises(ValueError, utils.parse_root_device_hints,
                          {'wwn_vendor_extension': 123})

    def test_parse_root_device_hints_invalid_model(self):
        """An integer ``model`` raises ValueError."""
        self.assertRaises(ValueError, utils.parse_root_device_hints,
                          {'model': 123})

    def test_parse_root_device_hints_invalid_serial(self):
        """An integer ``serial`` raises ValueError."""
        self.assertRaises(ValueError, utils.parse_root_device_hints,
                          {'serial': 123})

    def test_parse_root_device_hints_invalid_vendor(self):
        """An integer ``vendor`` raises ValueError."""
        self.assertRaises(ValueError, utils.parse_root_device_hints,
                          {'vendor': 123})

    def test_parse_root_device_hints_invalid_name(self):
        """An integer ``name`` raises ValueError."""
        self.assertRaises(ValueError, utils.parse_root_device_hints,
                          {'name': 123})

    def test_parse_root_device_hints_non_existent_hint(self):
        """An unknown hint key raises ValueError."""
        self.assertRaises(ValueError, utils.parse_root_device_hints,
                          {'non-existent': 'foo'})

    def test_extract_hint_operator_and_values_single_value(self):
        """An operator followed by one value is split into op and values."""
        extracted = utils._extract_hint_operator_and_values('>= 123', 'size')
        self.assertEqual({'op': '>=', 'values': ['123']}, extracted)

    def test_extract_hint_operator_and_values_multiple_values(self):
        """Every ``<or>`` alternative is returned as a separate value."""
        expression = '<or> 123 <or> 456 <or> 789'
        extracted = utils._extract_hint_operator_and_values(expression,
                                                            'size')
        self.assertEqual({'op': '<or>', 'values': ['123', '456', '789']},
                         extracted)

    def test_extract_hint_operator_and_values_multiple_values_space(self):
        """Alternatives containing spaces are kept intact."""
        expression = '<or> foo <or> foo bar <or> bar'
        extracted = utils._extract_hint_operator_and_values(expression,
                                                            'model')
        self.assertEqual({'op': '<or>', 'values': ['foo', 'foo bar', 'bar']},
                         extracted)

    def test_extract_hint_operator_and_values_no_operator(self):
        """A bare value yields an empty operator."""
        extracted = utils._extract_hint_operator_and_values('123', 'size')
        self.assertEqual({'op': '', 'values': ['123']}, extracted)

    def test_extract_hint_operator_and_values_empty_value(self):
        """An empty expression raises ValueError."""
        self.assertRaises(ValueError,
                          utils._extract_hint_operator_and_values,
                          '', 'size')

    def test_extract_hint_operator_and_values_integer(self):
        """An integer expression is returned as a string value."""
        extracted = utils._extract_hint_operator_and_values(123, 'size')
        self.assertEqual({'op': '', 'values': ['123']}, extracted)

    def test__append_operator_to_hints(self):
        """Default operators are prepended; ``rotational`` is untouched."""
        hints = {'serial': 'foo', 'size': 12345,
                 'model': 'foo model', 'rotational': True}
        with_operators = utils._append_operator_to_hints(hints)
        self.assertEqual(
            {'serial': 's== foo', 'size': '== 12345',
             'model': 's== foo model', 'rotational': True},
            with_operators)

    def test_normalize_hint_expression_or(self):
        """Spaces in ``<or>`` alternatives are encoded as ``%20``."""
        normalized = utils._normalize_hint_expression(
            '<or> foo <or> foo bar <or> bar', 'model')
        self.assertEqual('<or> foo <or> foo%20bar <or> bar', normalized)

    def test_normalize_hint_expression_in(self):
        """Spaces in ``<in>`` alternatives are encoded as ``%20``."""
        normalized = utils._normalize_hint_expression(
            '<in> foo <in> foo bar <in> bar', 'model')
        self.assertEqual('<in> foo <in> foo%20bar <in> bar', normalized)

    def test_normalize_hint_expression_op_space(self):
        """Spaces in a value that follows an operator are encoded."""
        normalized = utils._normalize_hint_expression(
            's== test string with space', 'model')
        self.assertEqual('s== test%20string%20with%20space', normalized)

    def test_normalize_hint_expression_op_no_space(self):
        """A value that follows an operator is lower-cased."""
        normalized = utils._normalize_hint_expression('s!= SpongeBob',
                                                      'model')
        self.assertEqual('s!= spongebob', normalized)

    def test_normalize_hint_expression_no_op_space(self):
        """Spaces in a value without an operator are encoded."""
        normalized = utils._normalize_hint_expression('no operators',
                                                      'model')
        self.assertEqual('no%20operators', normalized)

    def test_normalize_hint_expression_no_op_no_space(self):
        """A value without an operator is lower-cased."""
        normalized = utils._normalize_hint_expression('NoSpace', 'model')
        self.assertEqual('nospace', normalized)

    def test_normalize_hint_expression_empty_value(self):
        """An empty expression raises ValueError."""
        self.assertRaises(ValueError,
                          utils._normalize_hint_expression,
                          '', 'size')
class MatchRootDeviceTestCase(test_base.BaseTestCase):
    """Tests for ``utils.match_root_device_hints``."""

    def setUp(self):
        """Create three fake block devices of different sizes."""
        super(MatchRootDeviceTestCase, self).setUp()
        medium = {'name': '/dev/sda', 'size': 64424509440,
                  'model': 'ok model', 'serial': 'fakeserial'}
        large = {'name': '/dev/sdb', 'size': 128849018880,
                 'model': 'big model', 'serial': 'veryfakeserial',
                 'rotational': 'yes'}
        small = {'name': '/dev/sdc', 'size': 10737418240,
                 'model': 'small model', 'serial': 'veryveryfakeserial',
                 'rotational': False}
        self.devices = [medium, large, small]

    def test_match_root_device_hints_one_hint(self):
        """A single size hint selects /dev/sdb."""
        hints = {'size': '>= 70'}
        match = utils.match_root_device_hints(self.devices, hints)
        self.assertEqual('/dev/sdb', match['name'])

    def test_match_root_device_hints_rotational(self):
        """``rotational: False`` selects /dev/sdc."""
        hints = {'rotational': False}
        match = utils.match_root_device_hints(self.devices, hints)
        self.assertEqual('/dev/sdc', match['name'])

    def test_match_root_device_hints_rotational_convert_devices_bool(self):
        """A device reporting ``rotational: 'yes'`` matches ``True``."""
        hints = {'size': '>=100', 'rotational': True}
        match = utils.match_root_device_hints(self.devices, hints)
        self.assertEqual('/dev/sdb', match['name'])

    def test_match_root_device_hints_multiple_hints(self):
        """Size, model and serial hints together select /dev/sdb."""
        hints = {'size': '>= 50', 'model': 's==big model',
                 'serial': 's==veryfakeserial'}
        match = utils.match_root_device_hints(self.devices, hints)
        self.assertEqual('/dev/sdb', match['name'])

    def test_match_root_device_hints_multiple_hints2(self):
        """An ``<or>`` model hint combined with others selects /dev/sdc."""
        hints = {
            'size': '<= 20',
            'model': '<or> model 5 <or> foomodel <or> small model <or>',
            'serial': 's== veryveryfakeserial'}
        match = utils.match_root_device_hints(self.devices, hints)
        self.assertEqual('/dev/sdc', match['name'])

    def test_match_root_device_hints_multiple_hints3(self):
        """An ``<in>`` model hint plus ``rotational`` selects /dev/sdc."""
        hints = {'rotational': False, 'model': '<in> small'}
        match = utils.match_root_device_hints(self.devices, hints)
        self.assertEqual('/dev/sdc', match['name'])

    def test_match_root_device_hints_no_operators(self):
        """Hints without explicit operators select /dev/sdb."""
        hints = {'size': '120', 'model': 'big model',
                 'serial': 'veryfakeserial'}
        match = utils.match_root_device_hints(self.devices, hints)
        self.assertEqual('/dev/sdb', match['name'])

    def test_match_root_device_hints_no_device_found(self):
        """``None`` is returned when no device satisfies the hints."""
        hints = {'size': '>=50', 'model': 's==foo'}
        match = utils.match_root_device_hints(self.devices, hints)
        self.assertIsNone(match)

    @mock.patch.object(utils.LOG, 'warning', autospec=True)
    def test_match_root_device_hints_empty_device_attribute(self, mock_warn):
        """A blank device attribute gives no match and logs a warning."""
        blank_model_devices = [{'name': '/dev/sda', 'model': ' '}]
        match = utils.match_root_device_hints(blank_model_devices,
                                              {'model': 'foo'})
        self.assertIsNone(match)
        self.assertTrue(mock_warn.called)