# Source code for ceilometer.tests.unit.network.services.test_fwaas

#
# Copyright 2014 Cisco Systems,Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import mock
from oslotest import base
from oslotest import mockpatch

from ceilometer.agent import manager
from ceilometer.agent import plugin_base
from ceilometer.network.services import discovery
from ceilometer.network.services import fwaas


class _BaseTestFWPollster(base.BaseTestCase):
    """Shared fixture for the FWaaS pollster and discovery test cases.

    ``setUp`` builds an ``AgentManager`` (with
    ``ceilometer.pipeline.setup_pipeline`` mocked for the duration of
    ``setUp``) and replaces ``plugin_base._get_keystone`` with a mock whose
    service catalog returns a ``network`` endpoint entry.
    """

    @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
    def setUp(self):
        super(_BaseTestFWPollster, self).setUp()
        # Registered before any patch is started so that every patch
        # started below is undone when the test finishes.
        self.addCleanup(mock.patch.stopall)
        self.manager = manager.AgentManager()
        # Patch the module attribute instead of assigning to it: a plain
        # assignment is never reverted and leaks the mock into tests that
        # run afterwards. create=True keeps this working whether or not
        # plugin_base currently defines the name.
        keystone = mock.patch.object(
            plugin_base, '_get_keystone', new=mock.Mock(),
            create=True).start()
        catalog = (keystone.session.auth.get_access.
                   return_value.service_catalog)
        catalog.get_endpoints = mock.MagicMock(
            return_value={'network': mock.ANY})


class TestFirewallPollster(_BaseTestFWPollster):
    """Tests for ``fwaas.FirewallPollster`` and ``FirewallDiscovery``.

    NOTE: several test names mention "vpn" although they exercise the
    firewall pollster; the names are kept unchanged so existing test
    selectors keep working.
    """

    def setUp(self):
        super(TestFirewallPollster, self).setUp()
        self.pollster = fwaas.FirewallPollster()
        fake_fw = self.fake_get_fw_service()
        # Make the neutron client return the canned firewalls.
        self.useFixture(mockpatch.Patch('ceilometer.neutron_client.Client.'
                                        'firewall_get_all',
                                        return_value=fake_fw))

    @staticmethod
    def fake_get_fw_service():
        """Return four fake firewalls that differ only in ``status``.

        A fresh list of fresh dicts is built on every call.
        """
        common = {
            'name': 'myfw',
            'description': '',
            'admin_state_up': True,
            'id': 'fdde3d818-fdcb-fg4b-de7f-6750dc8a9d7a',
            'firewall_policy_id': 'bbe3d818-bdcb-4e4b-b47f-5650dc8a9d7a',
            'tenant_id': 'a4eb9f4938bb418bbc4f8eb31802fefa',
        }
        statuses = ('ACTIVE', 'INACTIVE', 'PENDING_CREATE', 'error')
        return [dict(common, status=status) for status in statuses]

    def test_fw_get_samples(self):
        """One sample per firewall, carrying the pollster's FIELDS."""
        samples = list(self.pollster.get_samples(
            self.manager, {},
            resources=self.fake_get_fw_service()))
        self.assertEqual(4, len(samples))
        # Loop-invariant: the first fake firewall is the reference record.
        expected = self.fake_get_fw_service()[0]
        for field in self.pollster.FIELDS:
            self.assertEqual(expected[field],
                             samples[0].resource_metadata[field])

    def test_vpn_volume(self):
        """Check the sample volume produced for the first three statuses."""
        samples = list(self.pollster.get_samples(
            self.manager, {},
            resources=self.fake_get_fw_service()))
        self.assertEqual(1, samples[0].volume)
        self.assertEqual(0, samples[1].volume)
        self.assertEqual(2, samples[2].volume)

    def test_get_vpn_meter_names(self):
        """Every sample is named ``network.services.firewall``."""
        samples = list(self.pollster.get_samples(
            self.manager, {},
            resources=self.fake_get_fw_service()))
        self.assertEqual({'network.services.firewall'},
                         {s.name for s in samples})

    def test_vpn_discovery(self):
        """Discovery returns every fake firewall except the 'error' one."""
        discovered_fws = discovery.FirewallDiscovery().discover(self.manager)
        self.assertEqual(3, len(discovered_fws))
        for fw in self.fake_get_fw_service():
            if fw['status'] == 'error':
                self.assertNotIn(fw, discovered_fws)
            else:
                self.assertIn(fw, discovered_fws)


class TestIPSecConnectionsPollster(_BaseTestFWPollster):
    """Tests for ``fwaas.FirewallPolicyPollster`` and its discovery.

    NOTE: despite the class name, this case exercises the firewall policy
    pollster; the name is kept unchanged so existing test selectors keep
    working.
    """

    def setUp(self):
        super(TestIPSecConnectionsPollster, self).setUp()
        self.pollster = fwaas.FirewallPolicyPollster()
        fake_fw_policy = self.fake_get_fw_policy()
        # Make the neutron client return the canned firewall policy.
        self.useFixture(mockpatch.Patch('ceilometer.neutron_client.Client.'
                                        'fw_policy_get_all',
                                        return_value=fake_fw_policy))

    @staticmethod
    def fake_get_fw_policy():
        """Return a one-element list holding a fake firewall policy.

        A fresh list is built on every call.
        """
        allow_http = {
            'enabled': True,
            'action': 'allow',
            'ip_version': 4,
            'protocol': 'tcp',
            'destination_port': '80',
            'source_ip_address': '10.24.4.2',
        }
        deny_ssh = {
            'enabled': True,
            'action': 'deny',
            'ip_version': 4,
            'protocol': 'tcp',
            'destination_port': '22',
        }
        return [{
            'name': 'my_fw_policy',
            'description': 'fw_policy',
            'admin_state_up': True,
            'tenant_id': 'abe3d818-fdcb-fg4b-de7f-6650dc8a9d7a',
            'firewall_rules': [allow_http, deny_ssh],
            'shared': True,
            'audited': True,
            'id': 'fdfbcec-fdcb-fg4b-de7f-6650dc8a9d7a',
        }]

    def test_policy_get_samples(self):
        """One sample for the policy, carrying the pollster's FIELDS."""
        samples = list(self.pollster.get_samples(
            self.manager, {},
            resources=self.fake_get_fw_policy()))
        self.assertEqual(1, len(samples))
        # Loop-invariant: the only fake policy is the reference record.
        expected = self.fake_get_fw_policy()[0]
        for field in self.pollster.FIELDS:
            self.assertEqual(expected[field],
                             samples[0].resource_metadata[field])

    def test_get_policy_meter_names(self):
        """Every sample is named ``network.services.firewall.policy``."""
        samples = list(self.pollster.get_samples(
            self.manager, {},
            resources=self.fake_get_fw_policy()))
        self.assertEqual({'network.services.firewall.policy'},
                         {s.name for s in samples})

    def test_fw_policy_discovery(self):
        """Discovery returns exactly the fake policy list."""
        discovered_policy = discovery.FirewallPolicyDiscovery().discover(
            self.manager)
        self.assertEqual(1, len(discovered_policy))
        self.assertEqual(self.fake_get_fw_policy(), discovered_policy)

# Project Source