# Copyright 2013 OpenStack LLC.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
import json
import os
import subprocess
import sys
import tempfile
import mock
import six.moves.builtins as __builtin__
from ironicclient.common import utils
from ironicclient import exc
from ironicclient.tests.unit import utils as test_utils
[docs]class UtilsTest(test_utils.BaseTestCase):
[docs]    def test_key_value_pairs_to_dict(self):
        kv_list = ['str=foo', 'int=1', 'bool=true',
                   'list=[1, 2, 3]', 'dict={"foo": "bar"}']
        d = utils.key_value_pairs_to_dict(kv_list)
        self.assertEqual(
            {'str': 'foo', 'int': 1, 'bool': True,
             'list': [1, 2, 3], 'dict': {'foo': 'bar'}},
            d) 
[docs]    def test_key_value_pairs_to_dict_nothing(self):
        self.assertEqual({}, utils.key_value_pairs_to_dict(None))
        self.assertEqual({}, utils.key_value_pairs_to_dict([])) 
[docs]    def test_args_array_to_dict(self):
        my_args = {
            'matching_metadata': ['str=foo', 'int=1', 'bool=true',
                                  'list=[1, 2, 3]', 'dict={"foo": "bar"}'],
            'other': 'value'
        }
        cleaned_dict = utils.args_array_to_dict(my_args,
                                                "matching_metadata")
        self.assertEqual({
            'matching_metadata': {'str': 'foo', 'int': 1, 'bool': True,
                                  'list': [1, 2, 3], 'dict': {'foo': 'bar'}},
            'other': 'value'
        }, cleaned_dict) 
[docs]    def test_args_array_to_patch(self):
        my_args = {
            'attributes': ['str=foo', 'int=1', 'bool=true',
                           'list=[1, 2, 3]', 'dict={"foo": "bar"}'],
            'op': 'add',
        }
        patch = utils.args_array_to_patch(my_args['op'],
                                          my_args['attributes'])
        self.assertEqual([{'op': 'add', 'value': 'foo', 'path': '/str'},
                          {'op': 'add', 'value': 1, 'path': '/int'},
                          {'op': 'add', 'value': True, 'path': '/bool'},
                          {'op': 'add', 'value': [1, 2, 3], 'path': '/list'},
                          {'op': 'add', 'value': {"foo": "bar"},
                           'path': '/dict'}], patch) 
[docs]    def test_args_array_to_patch_remove(self):
        my_args = {
            'attributes': ['/foo', 'extra/bar'],
            'op': 'remove',
        }
        patch = utils.args_array_to_patch(my_args['op'],
                                          my_args['attributes'])
        self.assertEqual([{'op': 'remove', 'path': '/foo'},
                          {'op': 'remove', 'path': '/extra/bar'}], patch) 
[docs]    def test_split_and_deserialize(self):
        ret = utils.split_and_deserialize('str=foo')
        self.assertEqual(('str', 'foo'), ret)
        ret = utils.split_and_deserialize('int=1')
        self.assertEqual(('int', 1), ret)
        ret = utils.split_and_deserialize('bool=false')
        self.assertEqual(('bool', False), ret)
        ret = utils.split_and_deserialize('list=[1, "foo", 2]')
        self.assertEqual(('list', [1, "foo", 2]), ret)
        ret = utils.split_and_deserialize('dict={"foo": 1}')
        self.assertEqual(('dict', {"foo": 1}), ret)
        ret = utils.split_and_deserialize('str_int="1"')
        self.assertEqual(('str_int', "1"), ret) 
[docs]    def test_split_and_deserialize_fail(self):
        self.assertRaises(exc.CommandError,
                          utils.split_and_deserialize, 'foo:bar') 
[docs]    def test_bool_arg_value(self):
        self.assertTrue(utils.bool_argument_value('arg', 'y', strict=True))
        self.assertTrue(utils.bool_argument_value('arg', 'TrUe', strict=True))
        self.assertTrue(utils.bool_argument_value('arg', '1', strict=True))
        self.assertTrue(utils.bool_argument_value('arg', 1, strict=True))
        self.assertFalse(utils.bool_argument_value('arg', '0', strict=True))
        self.assertFalse(utils.bool_argument_value('arg', 'No', strict=True))
        self.assertRaises(exc.CommandError,
                          utils.bool_argument_value, 'arg', 'ee', strict=True)
        self.assertFalse(utils.bool_argument_value('arg', 'ee', strict=False))
        self.assertTrue(utils.bool_argument_value('arg', 'ee', strict=False,
                                                  default=True))
        # No check that default is a Boolean...
        self.assertEqual('foo', utils.bool_argument_value('arg', 'ee',
                         strict=False, default='foo')) 
[docs]    def test_check_for_invalid_fields(self):
        self.assertIsNone(utils.check_for_invalid_fields(
                          ['a', 'b'], ['a', 'b', 'c']))
        # 'd' is not a valid field
        self.assertRaises(exc.CommandError, utils.check_for_invalid_fields,
                          ['a', 'd'], ['a', 'b', 'c'])  
[docs]class CommonParamsForListTest(test_utils.BaseTestCase):
[docs]    def setUp(self):
        super(CommonParamsForListTest, self).setUp()
        self.args = mock.Mock(marker=None, limit=None, sort_key=None,
                              sort_dir=None, detail=False, fields=None,
                              spec=True)
        self.expected_params = {'detail': False} 
[docs]    def test_nothing_set(self):
        self.assertEqual(self.expected_params,
                         utils.common_params_for_list(self.args, [], [])) 
[docs]    def test_marker_and_limit(self):
        self.args.marker = 'foo'
        self.args.limit = 42
        self.expected_params.update({'marker': 'foo', 'limit': 42})
        self.assertEqual(self.expected_params,
                         utils.common_params_for_list(self.args, [], [])) 
[docs]    def test_invalid_limit(self):
        self.args.limit = -42
        self.assertRaises(exc.CommandError,
                          utils.common_params_for_list,
                          self.args, [], []) 
[docs]    def test_sort_key_and_sort_dir(self):
        self.args.sort_key = 'field'
        self.args.sort_dir = 'desc'
        self.expected_params.update({'sort_key': 'field', 'sort_dir': 'desc'})
        self.assertEqual(self.expected_params,
                         utils.common_params_for_list(self.args,
                                                      ['field'],
                                                      [])) 
[docs]    def test_sort_key_allows_label(self):
        self.args.sort_key = 'Label'
        self.expected_params.update({'sort_key': 'field'})
        self.assertEqual(self.expected_params,
                         utils.common_params_for_list(self.args,
                                                      ['field', 'field2'],
                                                      ['Label', 'Label2'])) 
[docs]    def test_sort_key_invalid(self):
        self.args.sort_key = 'something'
        self.assertRaises(exc.CommandError,
                          utils.common_params_for_list,
                          self.args,
                          ['field', 'field2'],
                          []) 
[docs]    def test_sort_dir_invalid(self):
        self.args.sort_dir = 'something'
        self.assertRaises(exc.CommandError,
                          utils.common_params_for_list,
                          self.args,
                          [],
                          []) 
[docs]    def test_detail(self):
        self.args.detail = True
        self.expected_params['detail'] = True
        self.assertEqual(self.expected_params,
                         utils.common_params_for_list(self.args, [], [])) 
[docs]    def test_fields(self):
        self.args.fields = [['a', 'b', 'c']]
        self.expected_params.update({'fields': ['a', 'b', 'c']})
        self.assertEqual(self.expected_params,
                         utils.common_params_for_list(self.args, [], []))  
[docs]class CommonFiltersTest(test_utils.BaseTestCase):
[docs]    def test_limit(self):
        result = utils.common_filters(limit=42)
        self.assertEqual(['limit=42'], result) 
[docs]    def test_limit_0(self):
        result = utils.common_filters(limit=0)
        self.assertEqual([], result) 
[docs]    def test_other(self):
        for key in ('marker', 'sort_key', 'sort_dir'):
            result = utils.common_filters(**{key: 'test'})
            self.assertEqual(['%s=test' % key], result) 
[docs]    def test_fields(self):
        result = utils.common_filters(fields=['a', 'b', 'c'])
        self.assertEqual(['fields=a,b,c'], result)  
@mock.patch.object(subprocess, 'Popen')
[docs]class MakeConfigDriveTest(test_utils.BaseTestCase):
[docs]    def setUp(self):
        super(MakeConfigDriveTest, self).setUp()
        # expected genisoimage cmd
        self.genisoimage_cmd = ['genisoimage', '-o', mock.ANY,
                                '-ldots', '-allow-lowercase',
                                '-allow-multidot', '-l',
                                '-publisher', 'ironicclient-configdrive 0.1',
                                '-quiet', '-J', '-r', '-V',
                                'config-2', mock.ANY] 
[docs]    def test_make_configdrive(self, mock_popen):
        fake_process = mock.Mock(returncode=0)
        fake_process.communicate.return_value = ('', '')
        mock_popen.return_value = fake_process
        with utils.tempdir() as dirname:
            utils.make_configdrive(dirname)
        mock_popen.assert_called_once_with(self.genisoimage_cmd,
                                           stderr=subprocess.PIPE,
                                           stdout=subprocess.PIPE)
        fake_process.communicate.assert_called_once_with() 
    @mock.patch.object(os, 'access')
[docs]    def test_make_configdrive_non_readable_dir(self, mock_access, mock_popen):
        mock_access.return_value = False
        self.assertRaises(exc.CommandError, utils.make_configdrive, 'fake-dir')
        mock_access.assert_called_once_with('fake-dir', os.R_OK)
        self.assertFalse(mock_popen.called) 
    @mock.patch.object(os, 'access')
[docs]    def test_make_configdrive_oserror(self, mock_access, mock_popen):
        mock_access.return_value = True
        mock_popen.side_effect = OSError('boom')
        self.assertRaises(exc.CommandError, utils.make_configdrive, 'fake-dir')
        mock_access.assert_called_once_with('fake-dir', os.R_OK)
        mock_popen.assert_called_once_with(self.genisoimage_cmd,
                                           stderr=subprocess.PIPE,
                                           stdout=subprocess.PIPE) 
    @mock.patch.object(os, 'access')
[docs]    def test_make_configdrive_non_zero_returncode(self, mock_access,
                                                  mock_popen):
        fake_process = mock.Mock(returncode=123)
        fake_process.communicate.return_value = ('', '')
        mock_popen.return_value = fake_process
        self.assertRaises(exc.CommandError, utils.make_configdrive, 'fake-dir')
        mock_access.assert_called_once_with('fake-dir', os.R_OK)
        mock_popen.assert_called_once_with(self.genisoimage_cmd,
                                           stderr=subprocess.PIPE,
                                           stdout=subprocess.PIPE)
        fake_process.communicate.assert_called_once_with()  
[docs]class GetFromStdinTest(test_utils.BaseTestCase):
    @mock.patch.object(sys, 'stdin', autospec=True)
[docs]    def test_get_from_stdin(self, mock_stdin):
        contents = '[{"step": "upgrade", "interface": "deploy"}]'
        mock_stdin.read.return_value = contents
        desc = 'something'
        info = utils.get_from_stdin(desc)
        self.assertEqual(info, contents)
        mock_stdin.read.assert_called_once_with() 
    @mock.patch.object(sys, 'stdin', autospec=True)
[docs]    def test_get_from_stdin_fail(self, mock_stdin):
        mock_stdin.read.side_effect = IOError
        desc = 'something'
        self.assertRaises(exc.InvalidAttribute, utils.get_from_stdin, desc)
        mock_stdin.read.assert_called_once_with()  
[docs]class HandleJsonFileTest(test_utils.BaseTestCase):
[docs]    def test_handle_json_or_file_arg(self):
        cleansteps = '[{"step": "upgrade", "interface": "deploy"}]'
        steps = utils.handle_json_or_file_arg(cleansteps)
        self.assertEqual(json.loads(cleansteps), steps) 
[docs]    def test_handle_json_or_file_arg_bad_json(self):
        cleansteps = 'foo'
        self.assertRaisesRegex(exc.InvalidAttribute,
                               'For JSON',
                               utils.handle_json_or_file_arg, cleansteps) 
[docs]    def test_handle_json_or_file_arg_file(self):
        contents = '[{"step": "upgrade", "interface": "deploy"}]'
        with tempfile.NamedTemporaryFile(mode='w') as f:
            f.write(contents)
            f.flush()
            steps = utils.handle_json_or_file_arg(f.name)
        self.assertEqual(json.loads(contents), steps) 
    @mock.patch.object(__builtin__, 'open', autospec=True)
[docs]    def test_handle_json_or_file_arg_file_fail(self, mock_open):
        mock_file_object = mock.MagicMock()
        mock_file_handle = mock.MagicMock()
        mock_file_handle.__enter__.return_value = mock_file_object
        mock_open.return_value = mock_file_handle
        mock_file_object.read.side_effect = IOError
        with tempfile.NamedTemporaryFile(mode='w') as f:
            self.assertRaisesRegex(exc.InvalidAttribute,
                                   "from file",
                                   utils.handle_json_or_file_arg, f.name)
            mock_open.assert_called_once_with(f.name, 'r')
            mock_file_object.read.assert_called_once_with()