# Copyright 2014 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ipaddress
import netaddr
import testtools
from tempest.api.network import base_security_groups as sec_base
from tempest.common import custom_matchers
from tempest.common import utils
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
from tempest.lib import exceptions
CONF = config.CONF
[docs]
class PortsTestJSON(sec_base.BaseSecGroupTest):
"""Test the following operations for ports:
port create
port delete
port list
port show
port update
"""
@classmethod
def resource_setup(cls):
super(PortsTestJSON, cls).resource_setup()
cls.network = cls.create_network()
cls.port = cls.create_port(cls.network)
def _delete_port(self, port_id):
self.ports_client.delete_port(port_id)
body = self.ports_client.list_ports()
ports_list = body['ports']
self.assertFalse(port_id in [n['id'] for n in ports_list])
def _create_subnet(self, network, gateway='',
cidr=None, mask_bits=None, **kwargs):
subnet = self.create_subnet(
network, gateway, cidr, mask_bits, **kwargs)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.subnets_client.delete_subnet, subnet['id'])
return subnet
def _create_network(self, network_name=None, **kwargs):
network_name = network_name or data_utils.rand_name(
self.__class__.__name__, prefix=CONF.resource_name_prefix)
network = self.networks_client.create_network(
name=network_name, **kwargs)['network']
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.networks_client.delete_network,
network['id'])
return network
[docs]
@decorators.attr(type='smoke')
@decorators.idempotent_id('c72c1c0c-2193-4aca-aaa4-b1442640f51c')
def test_create_update_delete_port(self):
"""Test creating, updating and deleting port"""
# Verify port creation
body = self.ports_client.create_port(
network_id=self.network['id'],
name=data_utils.rand_name(
self.__class__.__name__, prefix=CONF.resource_name_prefix))
port = body['port']
# Schedule port deletion with verification upon test completion
self.addCleanup(self.ports_client.wait_for_resource_deletion,
port['id'])
self.addCleanup(self._delete_port, port['id'])
self.assertTrue(port['admin_state_up'])
# Verify port update
new_name = "New_Port"
body = self.ports_client.update_port(port['id'],
name=new_name,
admin_state_up=False)
updated_port = body['port']
self.assertEqual(updated_port['name'], new_name)
self.assertFalse(updated_port['admin_state_up'])
[docs]
@decorators.idempotent_id('67f1b811-f8db-43e2-86bd-72c074d4a42c')
def test_create_bulk_port(self):
"""Test creating multiple ports in a single request"""
network1 = self.network
network2 = self._create_network()
network_list = [network1['id'], network2['id']]
port_list = [{'network_id': net_id} for net_id in network_list]
body = self.ports_client.create_bulk_ports(ports=port_list)
created_ports = body['ports']
port1 = created_ports[0]
port2 = created_ports[1]
self.addCleanup(self.ports_client.wait_for_resource_deletion,
port1['id'])
self.addCleanup(self.ports_client.wait_for_resource_deletion,
port2['id'])
self.addCleanup(self._delete_port, port1['id'])
self.addCleanup(self._delete_port, port2['id'])
self.assertEqual(port1['network_id'], network1['id'])
self.assertEqual(port2['network_id'], network2['id'])
self.assertTrue(port1['admin_state_up'])
self.assertTrue(port2['admin_state_up'])
[docs]
@decorators.attr(type='smoke')
@decorators.idempotent_id('0435f278-40ae-48cb-a404-b8a087bc09b1')
def test_create_port_in_allowed_allocation_pools(self):
"""Test creating port in allowed allocation pools"""
network = self._create_network()
net_id = network['id']
address = self.cidr
address.prefixlen = self.mask_bits
if ((address.version == 4 and address.prefixlen >= 30) or
(address.version == 6 and address.prefixlen >= 126)):
msg = ("Subnet %s isn't large enough for the test" % address.cidr)
raise exceptions.InvalidConfiguration(msg)
allocation_pools = {'allocation_pools': [{'start': str(address[2]),
'end': str(address[-2])}]}
self._create_subnet(network, cidr=address,
mask_bits=address.prefixlen,
**allocation_pools)
body = self.ports_client.create_port(
network_id=net_id,
name=data_utils.rand_name(
self.__class__.__name__, prefix=CONF.resource_name_prefix))
self.addCleanup(self.ports_client.wait_for_resource_deletion,
body['port']['id'])
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.ports_client.delete_port, body['port']['id'])
port = body['port']
ip_address = port['fixed_ips'][0]['ip_address']
start_ip_address = allocation_pools['allocation_pools'][0]['start']
end_ip_address = allocation_pools['allocation_pools'][0]['end']
ip_range = netaddr.IPRange(start_ip_address, end_ip_address)
self.assertIn(ip_address, ip_range)
self.ports_client.delete_port(port['id'])
[docs]
@decorators.attr(type='smoke')
@decorators.idempotent_id('c9a685bd-e83f-499c-939f-9f7863ca259f')
def test_show_port(self):
"""Verify the details of port"""
body = self.ports_client.show_port(self.port['id'])
port = body['port']
self.assertIn('id', port)
# NOTE(rfolco): created_at and updated_at may get inconsistent values
# due to possible delay between POST request and resource creation.
# TODO(rfolco): Neutron Bug #1365341 is fixed, can remove the key
# extra_dhcp_opts in the O release (K/L gate jobs still need it).
self.assertThat(self.port,
custom_matchers.MatchesDictExceptForKeys
(port, excluded_keys=['extra_dhcp_opts',
'created_at',
'updated_at']))
[docs]
@decorators.idempotent_id('45fcdaf2-dab0-4c13-ac6c-fcddfb579dbd')
def test_show_port_fields(self):
"""Verify specific fields of a port"""
fields = ['id', 'mac_address']
body = self.ports_client.show_port(self.port['id'],
fields=fields)
port = body['port']
self.assertEqual(sorted(port.keys()), sorted(fields))
for field_name in fields:
self.assertEqual(port[field_name], self.port[field_name])
[docs]
@decorators.attr(type='smoke')
@decorators.idempotent_id('cf95b358-3e92-4a29-a148-52445e1ac50e')
def test_list_ports(self):
"""Verify the port exists in the list of all ports"""
body = self.ports_client.list_ports()
ports = [port['id'] for port in body['ports']
if port['id'] == self.port['id']]
self.assertNotEmpty(ports, "Created port not found in the list")
[docs]
@decorators.idempotent_id('e7fe260b-1e79-4dd3-86d9-bec6a7959fc5')
def test_port_list_filter_by_ip(self):
"""Test listing ports filtered by ip"""
# Create network and subnet
network = self._create_network()
self._create_subnet(network)
# Create two ports
port_1 = self.ports_client.create_port(
network_id=network['id'],
name=data_utils.rand_name(
self.__class__.__name__, prefix=CONF.resource_name_prefix))
self.addCleanup(self.ports_client.wait_for_resource_deletion,
port_1['port']['id'])
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.ports_client.delete_port, port_1['port']['id'])
port_2 = self.ports_client.create_port(
network_id=network['id'],
name=data_utils.rand_name(
self.__class__.__name__, prefix=CONF.resource_name_prefix))
self.addCleanup(self.ports_client.wait_for_resource_deletion,
port_2['port']['id'])
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.ports_client.delete_port, port_2['port']['id'])
# List ports filtered by fixed_ips
port_1_fixed_ip = port_1['port']['fixed_ips'][0]['ip_address']
fixed_ips = 'ip_address=' + port_1_fixed_ip
port_list = self.ports_client.list_ports(fixed_ips=fixed_ips)
# Check that we got the desired port
ports = port_list['ports']
project_ids = set([port['project_id'] for port in ports])
self.assertEqual(len(project_ids), 1,
'Ports from multiple projects are in the list resp')
port_ids = [port['id'] for port in ports]
fixed_ips = [port['fixed_ips'] for port in ports]
port_net_ids = [port['network_id'] for port in ports]
self.assertIn(port_1['port']['id'], port_ids)
self.assertIn(network['id'], port_net_ids)
# Check that every port has a fixed_ip that matches the query
for addr in fixed_ips:
port_ips = [port['ip_address'] for port in addr]
self.assertIn(port_1_fixed_ip, port_ips,
'Port not matching IP filter found')
[docs]
@decorators.idempotent_id('79895408-85d5-460d-94e7-9531c5fd9123')
@testtools.skipUnless(
utils.is_extension_enabled('ip-substring-filtering', 'network'),
'ip-substring-filtering extension not enabled.')
def test_port_list_filter_by_ip_substr(self):
"""Test listing ports filtered by part of ip address string"""
# Create network and subnet
network = self._create_network()
subnet = self._create_subnet(network)
# Get two IP addresses
ip_address_1 = None
ip_address_2 = None
ip_network = ipaddress.ip_network(str(subnet['cidr']))
for ip in ip_network:
if ip == ip_network.network_address:
continue
if ip_address_1 is None:
ip_address_1 = str(ip)
else:
ip_address_2 = ip_address_1
ip_address_1 = str(ip)
# Make sure these two IP addresses have different substring
if ip_address_1[:-1] != ip_address_2[:-1]:
break
# Create two ports
fixed_ips = [{'subnet_id': subnet['id'], 'ip_address': ip_address_1}]
port_1 = self.ports_client.create_port(
network_id=network['id'],
name=data_utils.rand_name(
self.__class__.__name__, prefix=CONF.resource_name_prefix),
fixed_ips=fixed_ips)
self.addCleanup(self.ports_client.wait_for_resource_deletion,
port_1['port']['id'])
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.ports_client.delete_port, port_1['port']['id'])
fixed_ips = [{'subnet_id': subnet['id'], 'ip_address': ip_address_2}]
port_2 = self.ports_client.create_port(
network_id=network['id'],
name=data_utils.rand_name(
self.__class__.__name__, prefix=CONF.resource_name_prefix),
fixed_ips=fixed_ips)
self.addCleanup(self.ports_client.wait_for_resource_deletion,
port_2['port']['id'])
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.ports_client.delete_port, port_2['port']['id'])
# Scenario 1: List port1 (port2 is filtered out)
if ip_address_1[:-1] != ip_address_2[:-1]:
ips_filter = 'ip_address_substr=' + ip_address_1[:-1]
else:
ips_filter = 'ip_address_substr=' + ip_address_1
ports = self.ports_client.list_ports(fixed_ips=ips_filter)['ports']
# Check that we got the desired port
port_ids = [port['id'] for port in ports]
fixed_ips = [port['fixed_ips'] for port in ports]
port_ips = []
for addr in fixed_ips:
port_ips.extend([a['ip_address'] for a in addr])
port_net_ids = [port['network_id'] for port in ports]
self.assertIn(network['id'], port_net_ids)
self.assertIn(port_1['port']['id'], port_ids)
self.assertIn(port_1['port']['fixed_ips'][0]['ip_address'], port_ips)
self.assertNotIn(port_2['port']['id'], port_ids)
self.assertNotIn(
port_2['port']['fixed_ips'][0]['ip_address'], port_ips)
# Scenario 2: List both port1 and port2
substr = ip_address_1
while substr not in ip_address_2:
substr = substr[:-1]
ips_filter = 'ip_address_substr=' + substr
ports = self.ports_client.list_ports(fixed_ips=ips_filter)['ports']
# Check that we got both port
port_ids = [port['id'] for port in ports]
fixed_ips = [port['fixed_ips'] for port in ports]
port_ips = []
for addr in fixed_ips:
port_ips.extend([a['ip_address'] for a in addr])
port_net_ids = [port['network_id'] for port in ports]
self.assertIn(network['id'], port_net_ids)
self.assertIn(port_1['port']['id'], port_ids)
self.assertIn(port_1['port']['fixed_ips'][0]['ip_address'], port_ips)
self.assertIn(port_2['port']['id'], port_ids)
self.assertIn(port_2['port']['fixed_ips'][0]['ip_address'], port_ips)
[docs]
@decorators.idempotent_id('5ad01ed0-0e6e-4c5d-8194-232801b15c72')
def test_port_list_filter_by_router_id(self):
"""Test listing ports filtered by router id"""
# Create a router
network = self._create_network()
self._create_subnet(network)
router = self.create_router()
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.routers_client.delete_router, router['id'])
port = self.ports_client.create_port(
network_id=network['id'],
name=data_utils.rand_name(
self.__class__.__name__, prefix=CONF.resource_name_prefix))
# Add router interface to port created above
self.routers_client.add_router_interface(router['id'],
port_id=port['port']['id'])
self.addCleanup(self.ports_client.wait_for_resource_deletion,
port['port']['id'])
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.routers_client.remove_router_interface,
router['id'], port_id=port['port']['id'])
# List ports filtered by router_id
port_list = self.ports_client.list_ports(device_id=router['id'])
ports = port_list['ports']
self.assertEqual(len(ports), 1)
self.assertEqual(ports[0]['id'], port['port']['id'])
self.assertEqual(ports[0]['device_id'], router['id'])
[docs]
@decorators.idempotent_id('ff7f117f-f034-4e0e-abff-ccef05c454b4')
def test_list_ports_fields(self):
"""Verify specific fields of ports"""
fields = ['id', 'mac_address']
body = self.ports_client.list_ports(fields=fields)
ports = body['ports']
self.assertNotEmpty(ports, "Port list returned is empty")
# Asserting the fields returned are correct
for port in ports:
self.assertEqual(sorted(fields), sorted(port.keys()))
[docs]
@decorators.idempotent_id('63aeadd4-3b49-427f-a3b1-19ca81f06270')
def test_create_update_port_with_second_ip(self):
"""Test updating port from 2 fixed ips to 1 fixed ip and vice versa"""
# Create a network with two subnets
network = self._create_network()
subnet_1 = self._create_subnet(network)
subnet_2 = self._create_subnet(network)
fixed_ip_1 = [{'subnet_id': subnet_1['id']}]
fixed_ip_2 = [{'subnet_id': subnet_2['id']}]
fixed_ips = fixed_ip_1 + fixed_ip_2
# Create a port with multiple IP addresses
port = self.create_port(network,
fixed_ips=fixed_ips)
self.addCleanup(self.ports_client.wait_for_resource_deletion,
port['id'])
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.ports_client.delete_port, port['id'])
self.assertEqual(2, len(port['fixed_ips']))
check_fixed_ips = [subnet_1['id'], subnet_2['id']]
for item in port['fixed_ips']:
self.assertIn(item['subnet_id'], check_fixed_ips)
# Update the port to return to a single IP address
port = self.update_port(port, fixed_ips=fixed_ip_1)
self.assertEqual(1, len(port['fixed_ips']))
# Update the port with a second IP address from second subnet
port = self.update_port(port, fixed_ips=fixed_ips)
self.assertEqual(2, len(port['fixed_ips']))
def _update_port_with_security_groups(self, security_groups_names):
subnet_1 = self._create_subnet(self.network)
fixed_ip_1 = [{'subnet_id': subnet_1['id']}]
security_groups_list = list()
sec_grps_client = self.security_groups_client
for name in security_groups_names:
group_create_body = sec_grps_client.create_security_group(
name=name)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.security_groups_client.delete_security_group,
group_create_body['security_group']['id'])
security_groups_list.append(group_create_body['security_group']
['id'])
# Create a port
sec_grp_name = data_utils.rand_name(
name='secgroup', prefix=CONF.resource_name_prefix)
security_group = sec_grps_client.create_security_group(
name=sec_grp_name)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.security_groups_client.delete_security_group,
security_group['security_group']['id'])
post_body = {
"name": data_utils.rand_name(
self.__class__.__name__, prefix=CONF.resource_name_prefix),
"security_groups": [security_group['security_group']['id']],
"network_id": self.network['id'],
"admin_state_up": True,
"fixed_ips": fixed_ip_1}
body = self.ports_client.create_port(**post_body)
self.addCleanup(self.ports_client.wait_for_resource_deletion,
body['port']['id'])
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.ports_client.delete_port, body['port']['id'])
port = body['port']
# Update the port with security groups
subnet_2 = self.create_subnet(self.network)
fixed_ip_2 = [{'subnet_id': subnet_2['id']}]
update_body = {
"name": data_utils.rand_name(
self.__class__.__name__, prefix=CONF.resource_name_prefix),
"admin_state_up": False,
"fixed_ips": fixed_ip_2,
"security_groups": security_groups_list}
body = self.ports_client.update_port(port['id'], **update_body)
port_show = body['port']
# Verify the security groups and other attributes updated to port
exclude_keys = set(port_show).symmetric_difference(update_body)
exclude_keys.add('fixed_ips')
exclude_keys.add('security_groups')
self.assertThat(port_show, custom_matchers.MatchesDictExceptForKeys(
update_body, exclude_keys))
self.assertEqual(fixed_ip_2[0]['subnet_id'],
port_show['fixed_ips'][0]['subnet_id'])
for security_group in security_groups_list:
self.assertIn(security_group, port_show['security_groups'])
[docs]
@decorators.idempotent_id('58091b66-4ff4-4cc1-a549-05d60c7acd1a')
@testtools.skipUnless(
utils.is_extension_enabled('security-group', 'network'),
'security-group extension not enabled.')
def test_update_port_with_security_group_and_extra_attributes(self):
"""Test updating port's security_group along with extra attributes
First we create a port with one security group, and then we update the
port's security_group, in the same update request we also change
the port's fixed ips.
"""
self._update_port_with_security_groups(
[data_utils.rand_name('secgroup',
prefix=CONF.resource_name_prefix)])
[docs]
@decorators.idempotent_id('edf6766d-3d40-4621-bc6e-2521a44c257d')
@testtools.skipUnless(
utils.is_extension_enabled('security-group', 'network'),
'security-group extension not enabled.')
def test_update_port_with_two_security_groups_and_extra_attributes(self):
"""Test updating port with two security_groups and extra attributes
First we create a port with one security group, and then we update the
port to two security_groups, in the same update request we also change
the port's fixed ips.
"""
self._update_port_with_security_groups(
[data_utils.rand_name('secgroup',
prefix=CONF.resource_name_prefix),
data_utils.rand_name('secgroup',
prefix=CONF.resource_name_prefix)])
[docs]
@decorators.idempotent_id('13e95171-6cbd-489c-9d7c-3f9c58215c18')
def test_create_show_delete_port_user_defined_mac(self):
"""Test creating port with user defined mac address"""
# Create a port for a legal mac
body = self.ports_client.create_port(
network_id=self.network['id'],
name=data_utils.rand_name(
self.__class__.__name__, prefix=CONF.resource_name_prefix))
old_port = body['port']
free_mac_address = old_port['mac_address']
self.ports_client.delete_port(old_port['id'])
# Create a new port with user defined mac
body = self.ports_client.create_port(
network_id=self.network['id'],
mac_address=free_mac_address,
name=data_utils.rand_name(
self.__class__.__name__, prefix=CONF.resource_name_prefix))
self.addCleanup(self.ports_client.wait_for_resource_deletion,
body['port']['id'])
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.ports_client.delete_port, body['port']['id'])
port = body['port']
body = self.ports_client.show_port(port['id'])
show_port = body['port']
self.assertEqual(free_mac_address,
show_port['mac_address'])
[docs]
@decorators.attr(type='smoke')
@decorators.idempotent_id('4179dcb9-1382-4ced-84fe-1b91c54f5735')
@testtools.skipUnless(
utils.is_extension_enabled('security-group', 'network'),
'security-group extension not enabled.')
def test_create_port_with_no_securitygroups(self):
"""Test creating port without security groups"""
network = self._create_network()
self._create_subnet(network)
port = self.create_port(network, security_groups=[])
self.addCleanup(self.ports_client.wait_for_resource_deletion,
port['id'])
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.ports_client.delete_port, port['id'])
self.assertIsNotNone(port['security_groups'])
self.assertEmpty(port['security_groups'])
[docs]
class PortsIpV6TestJSON(PortsTestJSON):
_ip_version = 6