# Copyright 2017 GoDaddy
# Copyright 2017 Catalyst IT Ltd
# Copyright 2018 Rackspace US Inc. All rights reserved.
# Copyright 2020 Red Hat, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import errno
import ipaddress
import requests
import socket
import time
from urllib.parse import urlparse
from oslo_log import log as logging
from tempest import config
from tempest.lib import exceptions
from tempest import test
from octavia_tempest_plugin.common import constants as const
from octavia_tempest_plugin.common import requests_adapters
CONF = config.CONF
LOG = logging.getLogger(__name__)
[docs]
class ValidatorsMixin(test.BaseTestCase):
[docs]
@staticmethod
def validate_URL_response(
URL, expected_status_code=200, requests_session=None,
expected_body=None, HTTPS_verify=True, client_cert_path=None,
CA_certs_path=None, source_port=None,
request_interval=CONF.load_balancer.build_interval,
request_timeout=CONF.load_balancer.build_timeout):
"""Check a URL response (HTTP or HTTPS).
:param URL: The URL to query.
:param expected_status_code: The expected HTTP status code.
:param requests_session: A requests session to use for the request.
If None, a new session will be created.
:param expected_body: The expected response text, None will not
compare.
:param HTTPS_verify: Should we verify the HTTPS server.
:param client_cert_path: Filesystem path to a file with the client
private key and certificate.
:param CA_certs_path: Filesystem path to a file containing CA
certificates to use for HTTPS validation.
:param source_port: If set, the request will come from this source port
number. If None, a random port will be used.
:param request_interval: Time, in seconds, to timeout a request.
:param request_timeout: The maximum time, in seconds, to attempt
requests. Failed validation of expected
results does not result in a retry.
:raises InvalidHttpSuccessCode: The expected_status_code did not match.
:raises InvalidHTTPResponseBody: The response body did not match the
expected content.
:raises TimeoutException: The request timed out.
:returns: The response data.
"""
session = requests_session
if requests_session is None:
session = requests.Session()
if source_port:
session.mount('http://',
requests_adapters.SourcePortAdapter(source_port))
session.mount('https://',
requests_adapters.SourcePortAdapter(source_port))
session_kwargs = {}
if not HTTPS_verify:
session_kwargs['verify'] = False
if CA_certs_path:
session_kwargs['verify'] = CA_certs_path
if client_cert_path:
session_kwargs['cert'] = client_cert_path
session_kwargs['timeout'] = request_interval
start = time.time()
while time.time() - start < request_timeout:
try:
response = session.get(URL, **session_kwargs)
response_status_code = response.status_code
response_text = response.text
response.close()
if response_status_code != expected_status_code:
raise exceptions.InvalidHttpSuccessCode(
'{0} is not the expected code {1}'.format(
response_status_code, expected_status_code))
if expected_body and response_text != expected_body:
details = '{} does not match expected {}'.format(
response_text, expected_body)
raise exceptions.InvalidHTTPResponseBody(
resp_body=details)
if requests_session is None:
session.close()
return response_text
except requests.exceptions.Timeout:
# Don't sleep as we have already waited the interval.
LOG.info('Request for %s timed out. Retrying.', URL)
except (exceptions.InvalidHttpSuccessCode,
exceptions.InvalidHTTPResponseBody,
requests.exceptions.SSLError):
if requests_session is None:
session.close()
raise
except Exception as e:
LOG.info('Validate URL got exception: %s. '
'Retrying.', e)
time.sleep(request_interval)
if requests_session is None:
session.close()
raise exceptions.TimeoutException()
[docs]
@classmethod
def make_udp_request(cls, vip_address, port=80, timeout=None,
source_port=None):
if ipaddress.ip_address(vip_address).version == 6:
family = socket.AF_INET6
else:
family = socket.AF_INET
sock = socket.socket(family, socket.SOCK_DGRAM)
# Force the use of an incremental port number for source to avoid
# re-use of a previous source port that will affect the round-robin
# dispatch
while True:
port_number = cls.src_port_number
cls.src_port_number += 1
if cls.src_port_number >= cls.SRC_PORT_NUMBER_MAX:
cls.src_port_number = cls.SRC_PORT_NUMBER_MIN
# catch and skip already used ports on the host
try:
if source_port:
sock.bind(('', source_port))
else:
sock.bind(('', port_number))
except OSError as e:
# if error is 'Address already in use', try next port number
# If source_port is defined and already in use, a test
# developer has made a mistake by using a duplicate source
# port.
if e.errno != errno.EADDRINUSE or source_port:
raise e
else:
# successfully bind the socket
break
server_address = (vip_address, port)
data = b"data\n"
if timeout is not None:
sock.settimeout(timeout)
try:
sock.sendto(data, server_address)
data, addr = sock.recvfrom(4096)
except socket.timeout:
# Normalize the timeout exception so that UDP and other protocol
# tests all return a common timeout exception.
raise exceptions.TimeoutException()
finally:
sock.close()
return data.decode('utf-8')
[docs]
def make_request(
self, vip_address, protocol=const.HTTP, HTTPS_verify=True,
protocol_port=80, requests_session=None, client_cert_path=None,
CA_certs_path=None, request_timeout=2, source_port=None):
"""Make a request to a VIP.
:param vip_address: The VIP address to test.
:param protocol: The protocol to use for the test.
:param HTTPS_verify: How to verify the TLS certificate. True: verify
using the system CA certificates. False: Do not
verify the VIP certificate. <path>: Filesytem path
to a CA certificate bundle file or directory. For
directories, the directory must be processed using
the c_rehash utility from openssl.
:param protocol_port: The port number to use for the test.
:param requests_session: A requests session to use for the request.
If None, a new session will be created.
:param request_timeout: The maximum time, in seconds, to attempt
requests.
:param client_cert_path: Filesystem path to a file with the client
private key and certificate.
:param CA_certs_path: Filesystem path to a file containing CA
certificates to use for HTTPS validation.
:param source_port: If set, the request will come from this source port
number. If None, a random port will be used.
:raises InvalidHttpSuccessCode: The expected_status_code did not match.
:raises InvalidHTTPResponseBody: The response body did not match the
expected content.
:raises TimeoutException: The request timed out.
:raises Exception: If a protocol is requested that is not implemented.
:returns: The response data.
"""
# Note: We are using HTTP as the TCP protocol check to simplify
# the test setup. HTTP is a TCP based protocol.
if protocol == const.HTTP or protocol == const.TCP:
url = "http://{0}{1}{2}".format(
vip_address, ':' if protocol_port else '',
protocol_port or '')
data = self.validate_URL_response(
url, HTTPS_verify=False, requests_session=requests_session,
request_timeout=request_timeout,
source_port=source_port)
elif (protocol == const.HTTPS or
protocol == const.TERMINATED_HTTPS):
url = "https://{0}{1}{2}".format(
vip_address, ':' if protocol_port else '',
protocol_port or '')
data = self.validate_URL_response(
url, HTTPS_verify=HTTPS_verify,
requests_session=requests_session,
client_cert_path=client_cert_path,
CA_certs_path=CA_certs_path, source_port=source_port,
request_timeout=request_timeout)
elif protocol == const.UDP:
data = self.make_udp_request(
vip_address, port=protocol_port, timeout=request_timeout,
source_port=source_port)
else:
message = ("Unknown protocol %s. Unable to check if the "
"load balancer is balanced.", protocol)
LOG.error(message)
raise Exception(message)
return data
[docs]
def check_members_balanced(
self, vip_address, traffic_member_count=2, protocol=const.HTTP,
HTTPS_verify=True, protocol_port=80, persistent=True, repeat=20,
client_cert_path=None, CA_certs_path=None, request_interval=2,
request_timeout=10, source_port=None, delay=None):
"""Checks that members are evenly balanced behind a VIP.
:param vip_address: The VIP address to test.
:param traffic_member_count: The expected number of members.
:param protocol: The protocol to use for the test.
:param HTTPS_verify: How to verify the TLS certificate. True: verify
using the system CA certificates. False: Do not
verify the VIP certificate. <path>: Filesytem path
to a CA certificate bundle file or directory. For
directories, the directory must be processed using
the c_rehash utility from openssl.
:param protocol_port: The port number to use for the test.
:param persistent: True when the test should persist cookies and use
the protocol keepalive mechanism with the target.
This may include maintaining a connection to the
member server across requests.
:param repeat: The number of requests to make against the VIP.
:param request_timeout: The maximum time, in seconds, to attempt
requests.
:param client_cert_path: Filesystem path to a file with the client
private key and certificate.
:param CA_certs_path: Filesystem path to a file containing CA
certificates to use for HTTPS validation.
:param source_port: If set, the request will come from this source port
number. If None, a random port will be used.
:param delay: The time to pause between requests in seconds, can be
fractional.
"""
if (ipaddress.ip_address(vip_address).version == 6 and
protocol != const.UDP):
vip_address = '[{}]'.format(vip_address)
requests_session = None
if persistent:
requests_session = requests.Session()
self._wait_for_lb_functional(
vip_address, traffic_member_count, protocol_port, protocol,
HTTPS_verify, requests_session=requests_session,
source_port=source_port)
if source_port:
LOG.debug('Using source port %s for request(s)', source_port)
response_counts = {}
# Send a number requests to lb vip
for i in range(repeat):
try:
data = self.make_request(
vip_address, protocol=protocol, HTTPS_verify=HTTPS_verify,
protocol_port=protocol_port,
requests_session=requests_session,
client_cert_path=client_cert_path,
CA_certs_path=CA_certs_path, source_port=source_port,
request_timeout=request_timeout)
if data in response_counts:
response_counts[data] += 1
else:
response_counts[data] = 1
if delay is not None:
time.sleep(delay)
except Exception:
LOG.exception('Failed to send request to loadbalancer vip')
if persistent:
requests_session.close()
raise Exception('Failed to connect to lb')
if persistent:
requests_session.close()
LOG.debug('Loadbalancer response totals: %s', response_counts)
# Ensure the correct number of members responded
self.assertEqual(traffic_member_count, len(response_counts))
# Ensure both members got the same number of responses
self.assertEqual(1, len(set(response_counts.values())))
[docs]
def assertConsistentResponse(self, response, url, method='GET', repeat=10,
redirect=False, timeout=2,
expect_connection_error=False, **kwargs):
"""Assert that a request to URL gets the expected response.
:param response: Expected response in format (status_code, content).
:param url: The URL to request.
:param method: The HTTP method to use (GET, POST, PUT, etc)
:param repeat: How many times to test the response.
:param data: Optional data to send in the request.
:param headers: Optional headers to send in the request.
:param cookies: Optional cookies to send in the request.
:param redirect: Is the request a redirect? If true, assume the passed
content should be the next URL in the chain.
:param timeout: Optional seconds to wait for the server to send data.
:param expect_connection_error: Should we expect a connection error
:param expect_timeout: Should we expect a connection timeout
:return: boolean success status
:raises: testtools.matchers.MismatchError
"""
session = requests.Session()
response_code, response_content = response
for i in range(repeat):
if url.startswith(const.HTTP.lower()):
if expect_connection_error:
self.assertRaises(
requests.exceptions.ConnectionError, session.request,
method, url, allow_redirects=not redirect,
timeout=timeout, **kwargs)
continue
req = session.request(method, url,
allow_redirects=not redirect,
timeout=timeout, **kwargs)
if response_code:
self.assertEqual(response_code, req.status_code)
if redirect:
self.assertTrue(req.is_redirect)
self.assertEqual(response_content,
session.get_redirect_target(req))
elif response_content:
self.assertEqual(str(response_content), req.text)
elif url.startswith(const.UDP.lower()):
parsed_url = urlparse(url)
if expect_connection_error:
self.assertRaises(exceptions.TimeoutException,
self.make_udp_request,
parsed_url.hostname,
port=parsed_url.port, timeout=timeout)
continue
data = self.make_udp_request(parsed_url.hostname,
port=parsed_url.port,
timeout=timeout)
self.assertEqual(response_content, data)
def _wait_for_lb_functional(
self, vip_address, traffic_member_count, protocol_port, protocol,
HTTPS_verify, client_cert_path=None, CA_certs_path=None,
request_interval=2, request_timeout=10, requests_session=None,
source_port=None):
start = time.time()
response_counts = {}
# Send requests to the load balancer until at least
# "traffic_member_count" members have replied (ensure network
# connectivity is functional between the load balancer and the members)
while time.time() - start < CONF.load_balancer.build_timeout:
try:
data = self.make_request(
vip_address, protocol=protocol, HTTPS_verify=HTTPS_verify,
protocol_port=protocol_port,
client_cert_path=client_cert_path,
CA_certs_path=CA_certs_path, source_port=source_port,
request_timeout=request_timeout,
requests_session=requests_session)
if data in response_counts:
response_counts[data] += 1
else:
response_counts[data] = 1
if traffic_member_count == len(response_counts):
LOG.debug('Loadbalancer response totals: %s',
response_counts)
time.sleep(1)
return
except Exception:
LOG.warning('Server is not passing initial traffic. Waiting.')
time.sleep(request_interval)
LOG.debug('Loadbalancer wait for load balancer response totals: %s',
response_counts)
message = ('Server %s on port %s did not begin passing traffic within '
'the timeout period. Failing test.' % (vip_address,
protocol_port))
LOG.error(message)
raise Exception(message)
[docs]
def make_udp_requests_with_retries(
self, vip_address, number_of_retries, dst_port,
src_port=None, socket_timeout=20):
"""Send UDP packets using retries mechanism
The delivery of data to the destination cannot be guaranteed in UDP.
In case when UDP package is getting lost and we might want to check
what could be the reason for that (Network issues or Server Side),
well need to send more packets to get into the conclusion.
:param vip_address: LB VIP address
:param number_of_retries: integer number of retries
:param dst_port: UDP server destination port
:param src_port: UDP source port to bind for UDP connection
:param socket_timeout: UDP socket timeout
:return: None if all UPD retries failed, else first successful
response data from UDP server.
"""
retry_number = 0
received_data = None
while retry_number < number_of_retries:
LOG.info('make_udp_requests_with_retries attempt number: %s',
retry_number)
retry_number += 1
try:
received_data = self.make_udp_request(
vip_address, dst_port, timeout=socket_timeout,
source_port=src_port)
break
except Exception as e:
LOG.warning('make_udp_request has failed with: %s', e)
return received_data