# Copyright 2011 Justin Santa Barbara
# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import errno
import glob
import os
import shutil
import tempfile
import testtools
import mock
from oslo_concurrency import processutils
from oslotest import base as test_base
import six
from ironic_python_agent import errors
from ironic_python_agent import utils
# Dotted path of the built-in open() for use as a mock.patch() target; the
# module that holds the builtins is named differently on Python 2 and 3.
OPEN_FUNCTION_NAME = '__builtin__.open' if six.PY2 else 'builtins.open'
class ExecuteTestCase(testtools.TestCase):
    """This class is a copy of the same class in openstack/ironic."""

    def test_retry_on_failure(self):
        """A command that always fails is run once per allowed attempt.

        The helper script keeps a run counter in the file passed as its
        first argument and overwrites it with 'failure' if stdin was not
        passed on any run.
        """
        fd, tmpfilename = tempfile.mkstemp()
        # mkstemp() hands back an open descriptor. Only the path of the
        # counter file is needed, so close it instead of leaking it.
        fd2, tmpfilename2 = tempfile.mkstemp()
        os.close(fd2)
        try:
            with os.fdopen(fd, 'w+') as fp:
                fp.write('''#!/bin/sh
# If stdin fails to get passed during one of the runs, make a note.
if ! grep -q foo
then
echo 'failure' > "$1"
fi
# If stdin has failed to get passed during this or a previous run, exit early.
if grep failure "$1"
then
exit 1
fi
runs="$(cat $1)"
if [ -z "$runs" ]
then
runs=0
fi
runs=$(($runs + 1))
echo $runs > "$1"
exit 1
''')
            os.chmod(tmpfilename, 0o755)
            try:
                self.assertRaises(processutils.ProcessExecutionError,
                                  utils.execute,
                                  tmpfilename, tmpfilename2, attempts=10,
                                  process_input=b'foo',
                                  delay_on_retry=False)
            except OSError as e:
                if e.errno == errno.EACCES:
                    self.skipTest("Permissions error detected. "
                                  "Are you running with a noexec /tmp?")
                else:
                    raise
            with open(tmpfilename2, 'r') as fp:
                runs = fp.read()
            self.assertNotEqual(runs.strip(), 'failure', 'stdin did not '
                                                         'always get passed '
                                                         'correctly')
            runs = int(runs.strip())
            self.assertEqual(10, runs,
                             'Ran %d times instead of 10.' % (runs,))
        finally:
            os.unlink(tmpfilename)
            os.unlink(tmpfilename2)

    def test_unknown_kwargs_raises_error(self):
        """An unrecognised keyword argument raises UnknownArgumentError."""
        self.assertRaises(processutils.UnknownArgumentError,
                          utils.execute,
                          '/usr/bin/env', 'true',
                          this_is_not_a_valid_kwarg=True)

    def test_check_exit_code_boolean(self):
        """check_exit_code toggles whether a failing command raises."""
        utils.execute('/usr/bin/env', 'false', check_exit_code=False)
        self.assertRaises(processutils.ProcessExecutionError,
                          utils.execute,
                          '/usr/bin/env', 'false', check_exit_code=True)

    def test_no_retry_on_success(self):
        """A command that succeeds first time must not be run again.

        The helper script exits non-zero if it finds its own marker in the
        file passed as its first argument, so a second run would make
        execute() fail.
        """
        fd, tmpfilename = tempfile.mkstemp()
        # Only the path of the marker file is needed; close the descriptor
        # returned by mkstemp() instead of leaking it.
        fd2, tmpfilename2 = tempfile.mkstemp()
        os.close(fd2)
        try:
            with os.fdopen(fd, 'w+') as fp:
                fp.write('''#!/bin/sh
# If we've already run, bail out.
grep -q foo "$1" && exit 1
# Mark that we've run before.
echo foo > "$1"
# Check that stdin gets passed correctly.
grep foo
''')
            os.chmod(tmpfilename, 0o755)
            try:
                utils.execute(tmpfilename,
                              tmpfilename2,
                              process_input=b'foo',
                              attempts=2)
            except OSError as e:
                if e.errno == errno.EACCES:
                    self.skipTest("Permissions error detected. "
                                  "Are you running with a noexec /tmp?")
                else:
                    raise
        finally:
            os.unlink(tmpfilename)
            os.unlink(tmpfilename2)
class GetAgentParamsTestCase(test_base.BaseTestCase):
    """Tests for the agent parameter helpers in ``utils``.

    mock.patch decorators are applied bottom-up: the decorator closest to
    the ``def`` supplies the first mock argument after ``self``.
    """

    @mock.patch('oslo_log.log.getLogger')
    @mock.patch(OPEN_FUNCTION_NAME)
    def test__read_params_from_file_fail(self, open_mock, logger_mock):
        """An error raised by open() is expected to yield an empty dict."""
        # The two mock arguments used to be named in the opposite order, so
        # the side effect was set on the getLogger mock and open() never
        # raised.
        # NOTE(review): if this test fails now, _read_params_from_file does
        # not handle open() errors the way this test asserts - confirm
        # against utils.
        open_mock.side_effect = Exception
        params = utils._read_params_from_file('file-path')
        self.assertEqual({}, params)

    @mock.patch(OPEN_FUNCTION_NAME)
    def test__read_params_from_file(self, open_mock):
        """key=value tokens are returned; a bare token is left out."""
        kernel_line = 'api-url=http://localhost:9999 baz foo=bar\n'
        # Let the mocked file object act as its own context manager.
        open_mock.return_value.__enter__ = lambda s: s
        open_mock.return_value.__exit__ = mock.Mock()
        read_mock = open_mock.return_value.read
        read_mock.return_value = kernel_line
        params = utils._read_params_from_file('file-path')
        open_mock.assert_called_once_with('file-path')
        read_mock.assert_called_once_with()
        self.assertEqual('http://localhost:9999', params['api-url'])
        self.assertEqual('bar', params['foo'])
        self.assertNotIn('baz', params)

    @mock.patch.object(utils, '_set_cached_params')
    @mock.patch.object(utils, '_read_params_from_file')
    @mock.patch.object(utils, '_get_cached_params')
    def test_get_agent_params_kernel_cmdline(self, get_cache_mock,
                                             read_params_mock,
                                             set_cache_mock):
        """With an empty cache, params come from /proc/cmdline and are cached.
        """
        get_cache_mock.return_value = {}
        expected_params = {'a': 'b'}
        read_params_mock.return_value = expected_params
        returned_params = utils.get_agent_params()
        read_params_mock.assert_called_once_with('/proc/cmdline')
        self.assertEqual(expected_params, returned_params)
        set_cache_mock.assert_called_once_with(expected_params)

    @mock.patch.object(utils, '_set_cached_params')
    @mock.patch.object(utils, '_get_vmedia_params')
    @mock.patch.object(utils, '_read_params_from_file')
    @mock.patch.object(utils, '_get_cached_params')
    def test_get_agent_params_vmedia(self, get_cache_mock,
                                     read_params_mock,
                                     get_vmedia_params_mock,
                                     set_cache_mock):
        """boot_method=vmedia merges virtual media params into the result."""
        get_cache_mock.return_value = {}
        kernel_params = {'boot_method': 'vmedia'}
        vmedia_params = {'a': 'b'}
        # The expected result is the union of both parameter sources.
        expected_params = dict(kernel_params)
        expected_params.update(vmedia_params)
        read_params_mock.return_value = kernel_params
        get_vmedia_params_mock.return_value = vmedia_params
        returned_params = utils.get_agent_params()
        read_params_mock.assert_called_once_with('/proc/cmdline')
        self.assertEqual(expected_params, returned_params)
        # Make sure information is cached
        set_cache_mock.assert_called_once_with(expected_params)

    @mock.patch.object(utils, '_set_cached_params')
    @mock.patch.object(utils, '_get_cached_params')
    def test_get_agent_params_from_cache(self, get_cache_mock,
                                         set_cache_mock):
        """Cached params are returned as-is and the cache is not rewritten."""
        get_cache_mock.return_value = {'a': 'b'}
        returned_params = utils.get_agent_params()
        expected_params = {'a': 'b'}
        self.assertEqual(expected_params, returned_params)
        self.assertEqual(0, set_cache_mock.call_count)

    @mock.patch(OPEN_FUNCTION_NAME)
    @mock.patch.object(glob, 'glob')
    def test__get_vmedia_device(self, glob_mock, open_mock):
        """The device whose model reads 'Virtual Media' is selected.

        The second model file raises on read; the test expects the lookup
        to carry on to the third device regardless.
        """
        glob_mock.return_value = ['/sys/class/block/sda/device/model',
                                  '/sys/class/block/sdb/device/model',
                                  '/sys/class/block/sdc/device/model']
        fobj_mock = mock.MagicMock()
        mock_file_handle = mock.MagicMock()
        mock_file_handle.__enter__.return_value = fobj_mock
        open_mock.return_value = mock_file_handle
        fobj_mock.read.side_effect = ['scsi disk', Exception, 'Virtual Media']
        vmedia_device_returned = utils._get_vmedia_device()
        self.assertEqual('sdc', vmedia_device_returned)

    @mock.patch.object(shutil, 'rmtree', autospec=True)
    @mock.patch.object(tempfile, 'mkdtemp', autospec=True)
    @mock.patch.object(utils, '_read_params_from_file')
    @mock.patch.object(os.path, 'exists')
    @mock.patch.object(os, 'mkdir')
    @mock.patch.object(utils, 'execute')
    def test__get_vmedia_params_by_label_lower_case(
            self, execute_mock, mkdir_mock, exists_mock, read_params_mock,
            mkdtemp_mock, rmtree_mock):
        """The lower-case disk label is mounted when it exists."""
        mkdtemp_mock.return_value = "/tempdir"
        null_output = ["", ""]
        expected_params = {'a': 'b'}
        read_params_mock.return_value = expected_params
        exists_mock.side_effect = [True, False]
        execute_mock.side_effect = [null_output, null_output]
        returned_params = utils._get_vmedia_params()
        execute_mock.assert_any_call('mount', "/dev/disk/by-label/ir-vfd-dev",
                                     "/tempdir")
        read_params_mock.assert_called_once_with("/tempdir/parameters.txt")
        exists_mock.assert_called_once_with("/dev/disk/by-label/ir-vfd-dev")
        execute_mock.assert_any_call('umount', "/tempdir")
        self.assertEqual(expected_params, returned_params)
        mkdtemp_mock.assert_called_once_with()
        rmtree_mock.assert_called_once_with("/tempdir")

    @mock.patch.object(shutil, 'rmtree', autospec=True)
    @mock.patch.object(tempfile, 'mkdtemp', autospec=True)
    @mock.patch.object(utils, '_read_params_from_file')
    @mock.patch.object(os.path, 'exists')
    @mock.patch.object(os, 'mkdir')
    @mock.patch.object(utils, 'execute')
    def test__get_vmedia_params_by_label_upper_case(
            self, execute_mock, mkdir_mock, exists_mock, read_params_mock,
            mkdtemp_mock, rmtree_mock):
        """The upper-case disk label is used when the lower-case is absent."""
        mkdtemp_mock.return_value = "/tempdir"
        null_output = ["", ""]
        expected_params = {'a': 'b'}
        read_params_mock.return_value = expected_params
        exists_mock.side_effect = [False, True]
        execute_mock.side_effect = [null_output, null_output]
        returned_params = utils._get_vmedia_params()
        execute_mock.assert_any_call('mount', "/dev/disk/by-label/IR-VFD-DEV",
                                     "/tempdir")
        read_params_mock.assert_called_once_with("/tempdir/parameters.txt")
        exists_mock.assert_has_calls(
            [mock.call("/dev/disk/by-label/ir-vfd-dev"),
             mock.call("/dev/disk/by-label/IR-VFD-DEV")])
        execute_mock.assert_any_call('umount', "/tempdir")
        self.assertEqual(expected_params, returned_params)
        mkdtemp_mock.assert_called_once_with()
        rmtree_mock.assert_called_once_with("/tempdir")

    @mock.patch.object(shutil, 'rmtree', autospec=True)
    @mock.patch.object(tempfile, 'mkdtemp', autospec=True)
    @mock.patch.object(utils, '_get_vmedia_device')
    @mock.patch.object(utils, '_read_params_from_file')
    @mock.patch.object(os.path, 'exists')
    @mock.patch.object(os, 'mkdir')
    @mock.patch.object(utils, 'execute')
    def test__get_vmedia_params_by_device(self, execute_mock, mkdir_mock,
                                          exists_mock, read_params_mock,
                                          get_device_mock, mkdtemp_mock,
                                          rmtree_mock):
        """With neither label present, the detected device is mounted."""
        mkdtemp_mock.return_value = "/tempdir"
        null_output = ["", ""]
        expected_params = {'a': 'b'}
        read_params_mock.return_value = expected_params
        exists_mock.side_effect = [False, False]
        execute_mock.side_effect = [null_output, null_output]
        get_device_mock.return_value = "sda"
        returned_params = utils._get_vmedia_params()
        exists_mock.assert_has_calls(
            [mock.call("/dev/disk/by-label/ir-vfd-dev"),
             mock.call("/dev/disk/by-label/IR-VFD-DEV")])
        execute_mock.assert_any_call('mount', "/dev/sda",
                                     "/tempdir")
        read_params_mock.assert_called_once_with("/tempdir/parameters.txt")
        execute_mock.assert_any_call('umount', "/tempdir")
        self.assertEqual(expected_params, returned_params)
        mkdtemp_mock.assert_called_once_with()
        rmtree_mock.assert_called_once_with("/tempdir")

    @mock.patch.object(utils, '_get_vmedia_device')
    @mock.patch.object(os.path, 'exists')
    def test__get_vmedia_params_cannot_find_dev(self, exists_mock,
                                                get_device_mock):
        """No label and no detected device raises VirtualMediaBootError."""
        get_device_mock.return_value = None
        exists_mock.return_value = False
        self.assertRaises(errors.VirtualMediaBootError,
                          utils._get_vmedia_params)

    @mock.patch.object(shutil, 'rmtree', autospec=True)
    @mock.patch.object(tempfile, 'mkdtemp', autospec=True)
    @mock.patch.object(utils, '_get_vmedia_device')
    @mock.patch.object(utils, '_read_params_from_file')
    @mock.patch.object(os.path, 'exists')
    @mock.patch.object(os, 'mkdir')
    @mock.patch.object(utils, 'execute')
    def test__get_vmedia_params_mount_fails(self, execute_mock,
                                            mkdir_mock, exists_mock,
                                            read_params_mock,
                                            get_device_mock, mkdtemp_mock,
                                            rmtree_mock):
        """A failing mount raises VirtualMediaBootError and cleans up."""
        mkdtemp_mock.return_value = "/tempdir"
        expected_params = {'a': 'b'}
        exists_mock.return_value = True
        read_params_mock.return_value = expected_params
        get_device_mock.return_value = "sda"
        execute_mock.side_effect = processutils.ProcessExecutionError()
        self.assertRaises(errors.VirtualMediaBootError,
                          utils._get_vmedia_params)
        execute_mock.assert_any_call('mount', "/dev/disk/by-label/ir-vfd-dev",
                                     "/tempdir")
        mkdtemp_mock.assert_called_once_with()
        rmtree_mock.assert_called_once_with("/tempdir")

    @mock.patch.object(shutil, 'rmtree', autospec=True)
    @mock.patch.object(tempfile, 'mkdtemp', autospec=True)
    @mock.patch.object(utils, '_get_vmedia_device')
    @mock.patch.object(utils, '_read_params_from_file')
    @mock.patch.object(os.path, 'exists')
    @mock.patch.object(os, 'mkdir')
    @mock.patch.object(utils, 'execute')
    def test__get_vmedia_params_umount_fails(self, execute_mock, mkdir_mock,
                                             exists_mock, read_params_mock,
                                             get_device_mock, mkdtemp_mock,
                                             rmtree_mock):
        """A failing umount does not stop the params from being returned."""
        mkdtemp_mock.return_value = "/tempdir"
        null_output = ["", ""]
        expected_params = {'a': 'b'}
        exists_mock.return_value = True
        read_params_mock.return_value = expected_params
        get_device_mock.return_value = "sda"
        # First call (mount) succeeds, second call (umount) fails.
        execute_mock.side_effect = [null_output,
                                    processutils.ProcessExecutionError()]
        returned_params = utils._get_vmedia_params()
        execute_mock.assert_any_call('mount', "/dev/disk/by-label/ir-vfd-dev",
                                     "/tempdir")
        read_params_mock.assert_called_once_with("/tempdir/parameters.txt")
        execute_mock.assert_any_call('umount', "/tempdir")
        self.assertEqual(expected_params, returned_params)
        mkdtemp_mock.assert_called_once_with()
        rmtree_mock.assert_called_once_with("/tempdir")

    @mock.patch.object(shutil, 'rmtree', autospec=True)
    @mock.patch.object(tempfile, 'mkdtemp', autospec=True)
    @mock.patch.object(utils, '_get_vmedia_device')
    @mock.patch.object(utils, '_read_params_from_file')
    @mock.patch.object(os.path, 'exists')
    @mock.patch.object(os, 'mkdir')
    @mock.patch.object(utils, 'execute')
    def test__get_vmedia_params_rmtree_fails(self, execute_mock, mkdir_mock,
                                             exists_mock, read_params_mock,
                                             get_device_mock, mkdtemp_mock,
                                             rmtree_mock):
        """A failing rmtree does not stop the params from being returned."""
        mkdtemp_mock.return_value = "/tempdir"
        rmtree_mock.side_effect = Exception
        null_output = ["", ""]
        expected_params = {'a': 'b'}
        exists_mock.return_value = True
        read_params_mock.return_value = expected_params
        get_device_mock.return_value = "sda"
        execute_mock.return_value = null_output
        returned_params = utils._get_vmedia_params()
        execute_mock.assert_any_call('mount', "/dev/disk/by-label/ir-vfd-dev",
                                     "/tempdir")
        read_params_mock.assert_called_once_with("/tempdir/parameters.txt")
        execute_mock.assert_any_call('umount', "/tempdir")
        self.assertEqual(expected_params, returned_params)
        mkdtemp_mock.assert_called_once_with()
        rmtree_mock.assert_called_once_with("/tempdir")

    @mock.patch.object(utils, 'get_agent_params')
    def test_parse_root_device_hints(self, mock_get_params):
        """Hints are split on commas, URL-unquoted and lower-cased."""
        mock_get_params.return_value = {
            'root_device': 'vendor=SpongeBob,model=Square%20Pants',
            'ipa-api-url': 'http://1.2.3.4:1234'
        }
        expected = {'vendor': 'spongebob', 'model': 'square pants'}
        result = utils.parse_root_device_hints()
        self.assertEqual(expected, result)

    @mock.patch.object(utils, 'get_agent_params')
    def test_parse_root_device_hints_no_hints(self, mock_get_params):
        """Without a root_device parameter the hints are empty."""
        mock_get_params.return_value = {
            'ipa-api-url': 'http://1.2.3.4:1234'
        }
        result = utils.parse_root_device_hints()
        self.assertEqual({}, result)

    @mock.patch.object(utils, 'get_agent_params')
    def test_parse_root_device_size(self, mock_get_params):
        """The size hint is returned as an integer."""
        mock_get_params.return_value = {
            'root_device': 'size=12345',
            'ipa-api-url': 'http://1.2.3.4:1234'
        }
        result = utils.parse_root_device_hints()
        self.assertEqual(12345, result['size'])

    @mock.patch.object(utils, 'get_agent_params')
    def test_parse_root_device_not_supported(self, mock_get_params):
        """An unsupported hint name raises DeviceNotFound."""
        mock_get_params.return_value = {
            'root_device': 'foo=bar,size=12345',
            'ipa-api-url': 'http://1.2.3.4:1234'
        }
        self.assertRaises(errors.DeviceNotFound,
                          utils.parse_root_device_hints)
class TestFailures(testtools.TestCase):
    """Tests for ``utils.AccumulatedFailures``."""

    def test_get_error(self):
        """get_error() is None while empty, then lists every added failure.
        """
        f = utils.AccumulatedFailures()
        self.assertFalse(f)
        self.assertIsNone(f.get_error())

        # Failures may be added as a plain message, as a format string with
        # arguments, or as an exception instance.
        f.add('foo')
        f.add('%s', 'bar')
        f.add(RuntimeError('baz'))
        self.assertTrue(f)

        exp = ('The following errors were encountered:\n* foo\n* bar\n* baz')
        self.assertEqual(exp, f.get_error())

    def test_raise(self):
        """raise_if_needed() raises exc_class only once a failure exists."""
        class FakeException(Exception):
            pass

        f = utils.AccumulatedFailures(exc_class=FakeException)
        self.assertIsNone(f.raise_if_needed())
        f.add('foo')
        # assertRaisesRegexp is a deprecated alias that newer Python 3
        # releases no longer provide; six picks the right name for the
        # running interpreter.
        six.assertRaisesRegex(self, FakeException, 'foo', f.raise_if_needed)