#
# Copyright © 2015 Yahoo! Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import functools
import consul
from oslo_utils import encodeutils
import requests
import tooz
from tooz import _retry
from tooz import coordination
from tooz import locking
from tooz import utils
@contextlib.contextmanager
def _failure_translator():
"""Translates common consul exceptions into tooz exceptions."""
try:
yield
except (consul.Timeout, requests.exceptions.RequestException) as e:
utils.raise_with_cause(coordination.ToozConnectionError,
encodeutils.exception_to_unicode(e),
cause=e)
except (consul.ConsulException, ValueError) as e:
# ValueError = Typically json decoding failed for some reason.
utils.raise_with_cause(tooz.ToozError,
encodeutils.exception_to_unicode(e),
cause=e)
def _translate_failures(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
with _failure_translator():
return func(*args, **kwargs)
return wrapper
class ConsulLock(locking.Lock):
def __init__(self, name, node, address, session_id, client, token=None):
super().__init__(name)
self._name = name
self._node = node
self._address = address
self._acl_token = token
self._session_id = session_id
self._client = client
self.acquired = False
def acquire(self, blocking=True, shared=False, timeout=None):
if shared:
raise tooz.NotImplemented
if timeout:
raise tooz.NotImplemented
@_retry.retry(stop_max_delay=blocking)
@_translate_failures
def _acquire():
# Check if we are the owner and if we are simulate
# blocking (because consul will not block a second
# acquisition attempt by the same owner).
_index, value = self._client.kv.get(key=self._name,
token=self._acl_token)
if value and value.get('Session') == self._session_id:
if blocking is False:
return False
else:
raise _retry.TryAgain
else:
# The value can be anything.
gotten = self._client.kv.put(key=self._name,
value="I got it!",
acquire=self._session_id,
token=self._acl_token)
if gotten:
self.acquired = True
return True
if blocking is False:
return False
else:
raise _retry.TryAgain
return _acquire()
def release(self):
if not self.acquired:
return False
# Get the lock to verify the session ID's are same
_index, contents = self._client.kv.get(key=self._name,
token=self._acl_token)
if not contents:
return False
owner = contents.get('Session')
if owner == self._session_id:
removed = self._client.kv.put(key=self._name,
value=self._session_id,
release=self._session_id,
token=self._acl_token)
if removed:
self.acquired = False
return True
return False
[docs]
class ConsulDriver(coordination.CoordinationDriverCachedRunWatchers,
coordination.CoordinationDriverWithExecutor):
"""This driver uses `python-consul`_ client against `consul`_ servers.
The ConsulDriver implements the coordination driver API(s) so that Consul
can be used as an option for Distributed Locking and Group Membership. The
data is stored in Consul's key-value store.
The Consul driver connection URI should look like::
consul://HOST[:PORT][?OPTION1=VALUE1[&OPTION2=VALUE2[&...]]]
If not specified, PORT defaults to 8500.
Available options are:
================== =======
Name Default
================== =======
ttl 15
namespace tooz
acl_token None
================== =======
For details on the available options, refer to
http://python-consul2.readthedocs.org/en/latest/.
The following Key/Value paths are utilized in Consul to implement the
coordination APIs:
+-------------------------------------------+--------------+--------------+
| Key | Value | Description |
+===========================================+==============+==============+
| <namespace>/groups/<group_id> | None | Group of |
| | | members. |
+-------------------------------------------+--------------+--------------+
| <namespace>/groups/<group_id>/<member_id> | Member | Member in a |
| | capabilities | group. |
| | encoded as | |
| | msgpack | |
+-------------------------------------------+--------------+--------------+
| <namespace>/group_locks/<group_id> | Consul | Lock for |
| | session ID | group |
| | | membership |
+-------------------------------------------+--------------+--------------+
| <namespace>/locks/<name> | Consul | Each key is |
| | session ID | a distributed|
| | | lock. |
+-------------------------------------------+--------------+--------------+
NOTE: A group in tooz is NOT the same as a Consul service, so tooz groups
are implemented using Consul Key/Values and not with native services.
Groups in tooz do not have host:port details or health checks that report
to consul to verify that the service is still alive and listening on that
host:port. If you need to use native Consul services, configure the Consul
agent directly (not via tooz).
.. _python-consul: http://python-consul2.readthedocs.org/
.. _consul: https://consul.io/
"""
#: Default namespace when none is provided
TOOZ_NAMESPACE = "tooz"
#: Default TTL
DEFAULT_TTL = 15
#: Default consul port if not provided.
DEFAULT_PORT = 8500
#: Consul ACL Token if not provided
ACL_TOKEN = None
CHARACTERISTICS = (
# client liveness is based on more than just timeouts
coordination.Characteristics.NON_TIMEOUT_BASED,
# multiple service instances (group members) could register
# with different ports per thread/proc
# but the agent is always on locahost: so not DISTRIBUTED_ACROSS_HOSTS
coordination.Characteristics.DISTRIBUTED_ACROSS_THREADS,
coordination.Characteristics.DISTRIBUTED_ACROSS_PROCESSES,
# https://www.consul.io/docs/internals/consensus#consistency-modes
# The consul consistency mode determines the history characteristics.
# Writes *always* go through a single leader process into raft log.
# Reads *may* use all servers depending on the request's mode:
# - 'consistent' = LINEARIZABLE
# - 'default' = SEQUENTIAL
# - 'stale' = CAUSAL
coordination.Characteristics.SEQUENTIAL, # 'default' consistency mode
# https://www.consul.io/docs/internals/consensus
# raft log of servers is snapshotted + compacted
coordination.Characteristics.SERIALIZABLE,
)
"""
Tuple of :py:class:`~tooz.coordination.Characteristics` introspectable
enum member(s) that can be used to interogate how this driver works.
"""
[docs]
def __init__(self, member_id, parsed_url, options):
super().__init__(member_id, parsed_url, options)
options = utils.collapse(options)
self._host = parsed_url.hostname
self._port = parsed_url.port or self.DEFAULT_PORT
self._session_id = None
self._session_name = encodeutils.safe_decode(member_id)
self._ttl = int(options.get('ttl', self.DEFAULT_TTL))
namespace = options.get('namespace', self.TOOZ_NAMESPACE)
self._namespace = encodeutils.safe_decode(namespace)
self._acl_token = options.get('acl_token', self.ACL_TOKEN)
# the empty group name adds a trailing / needed for lookups
self._groups_prefix = self._paths_join(self._namespace, "groups", "")
self._client = None
def _start(self):
"""Create a client, register a node and create a session."""
# Create a consul client
if self._client is None:
self._client = consul.Consul(host=self._host, port=self._port,
token=self._acl_token)
local_agent = self._client.agent.self()
self._node = local_agent['Member']['Name']
self._address = local_agent['Member']['Addr']
# implicitly uses the agent's datacenter (set in consul agent config)
# Register a Node
self._client.catalog.register(node=self._node,
address=self._address,
token=self._acl_token)
# Create a session
self._session_id = self._client.session.create(
name=self._session_name,
node=self._node,
ttl=self._ttl,
token=self._acl_token)
def _stop(self):
if self._client is not None:
if self._session_id is not None:
self._client.session.destroy(self._session_id,
token=self._acl_token)
self._session_id = None
self._client = None
[docs]
def heartbeat(self):
# THIS IS A REQUIREMENT FOR CONSUL TO WORK PROPERLY.
# Consul maintains a "session" token that is used to as the basis
# for all operations in the service. The session must be refreshed
# periodically or else it assumes that the agent maintaining the
# session has died or is unreachable. When a session expires all locks
# are released and any services that were registered with that session
# are marked as no longer active.
self._client.session.renew(self._session_id, token=self._acl_token)
# renew the session every half-TTL or 1 second, whatever is larger
sleep_sec = max(self._ttl / 2, 1)
return sleep_sec
def _get_lock(self, real_name):
return ConsulLock(real_name, self._node, self._address,
session_id=self._session_id,
client=self._client, token=self._acl_token)
[docs]
def get_lock(self, name):
real_name = self._path_lock(name)
return self._get_lock(real_name)
def _get_group_lock(self, group_id):
real_name = self._path_group_lock(group_id)
return self._get_lock(real_name)
@staticmethod
def _paths_join(*args):
pieces = []
for arg in args:
pieces.append(encodeutils.safe_decode(arg))
return "/".join(pieces)
def _path_lock(self, name):
return self._paths_join(self._namespace, "locks", name)
def _path_group_lock(self, name):
return self._paths_join(self._namespace, "group_locks", name)
def _path_group(self, group_id):
# add an extra '/' at the end so that searches with this path
# will go down and find their children
return self._paths_join(self._namespace, "groups", group_id) + "/"
def _path_member(self, group_id, member_id):
return self._paths_join(
self._namespace, "groups", group_id, member_id
)
def _group_path_to_id(self, base_path, group_path):
"""Translates a path into a group name.
The group name is the last part of the path. So, we simply split on
the path separator '/' and return the last element.
Example:
group_id = self._path_to_group_id("tooz/groups/helloworld")
print(group_id) # "helloworld"
"""
if group_path.startswith(base_path):
group_id = group_path[len(base_path):]
else:
group_id = group_path
# if a group has members (sub-keys) it will contain a trailing /
# we need to strip this to get just the name
# if a group has no members there is no trailing / (for some reason)
group_id = group_id.strip("/")
return utils.to_binary(group_id)
def _get_group_members(self, group_path):
index, data = self._client.kv.get(group_path, recurse=True)
group = None
members = []
for kv in (data or []):
if kv["Key"] == group_path:
group = kv
else:
members.append(kv)
return (group, members)
[docs]
def get_groups(self):
@_translate_failures
def _get_groups():
groups = []
index, data = self._client.kv.get(self._groups_prefix, keys=True,
separator="/")
for key in (data or []):
if key != self._groups_prefix:
group_id = self._group_path_to_id(self._groups_prefix, key)
groups.append(group_id)
return groups
return ConsulFutureResult(self._executor.submit(_get_groups))
[docs]
def create_group(self, group_id):
@_translate_failures
def _create_group():
group_path = self._path_group(group_id)
# create with Check-And-Set index 0 will only succeed if the key
# doesn't exit
result = self._client.kv.put(group_path, "", cas=0)
if not result:
raise coordination.GroupAlreadyExist(group_id)
return result
return ConsulFutureResult(self._executor.submit(_create_group))
def _destroy_group(self, group_id):
"""Should only be used in tests..."""
with self._get_group_lock(group_id) as lock:
group_path = self._path_group(group_id)
self._client.kv.delete(group_path, recurse=True)
self._client.kv.delete(lock._name)
[docs]
def delete_group(self, group_id):
@_translate_failures
def _delete_group():
# create a lock for the group so that other operations on this
# group do not conflict while the group is being deleted
with self._get_group_lock(group_id) as lock:
group_path = self._path_group(group_id)
group, members = self._get_group_members(group_path)
if not group:
raise coordination.GroupNotCreated(group_id)
if members:
raise coordination.GroupNotEmpty(group_id)
# delete the group recursively
result = self._client.kv.delete(group_path, recurse=True)
# delete the lock for the group
self._client.kv.delete(lock._name)
return result
return ConsulFutureResult(self._executor.submit(_delete_group))
[docs]
def join_group(self, group_id, capabilities=b""):
@_translate_failures
def _join_group():
# lock the group so that it doesn't get deleted while we join
with self._get_group_lock(group_id):
group_path = self._path_group(group_id)
member_path = self._path_member(group_id, self._member_id)
group, members = self._get_group_members(group_path)
if not group:
raise coordination.GroupNotCreated(group_id)
for m in members:
if m["Key"] == member_path:
raise coordination.MemberAlreadyExist(group_id,
self._member_id)
# create with Check-And-Set index 0 will only succeed if the
# key doesn't exit
self._client.kv.put(member_path, utils.dumps(capabilities),
cas=0)
self._joined_groups.add(group_id)
return ConsulFutureResult(self._executor.submit(_join_group))
[docs]
def leave_group(self, group_id):
@_translate_failures
def _leave_group():
# NOTE: We do NOT have to lock the group here because deletes in
# Consul are atomic and succeed even if the key doesn't exist
# This means that there is no race condition between checking
# if the member exists and then deleting it.
group_path = self._path_group(group_id)
member_path = self._path_member(group_id, self._member_id)
group, members = self._get_group_members(group_path)
member = None
for m in members:
if m["Key"] == member_path:
member = m
break
else:
raise coordination.MemberNotJoined(group_id, self._member_id)
# delete the member key with Check-And-Set semantics based on index
# we read above
self._client.kv.delete(member_path, cas=member["ModifyIndex"])
self._joined_groups.discard(group_id)
return ConsulFutureResult(self._executor.submit(_leave_group))
[docs]
def get_members(self, group_id):
@_translate_failures
def _get_members():
group_path = self._path_group(group_id)
group, members = self._get_group_members(group_path)
if not group:
raise coordination.GroupNotCreated(group_id)
result = set()
for m in members:
member_id = self._group_path_to_id(group_path, m["Key"])
result.add(member_id)
return result
return ConsulFutureResult(self._executor.submit(_get_members))
[docs]
def get_member_capabilities(self, group_id, member_id):
@_translate_failures
def _get_member_capabilities():
member_path = self._path_member(group_id, member_id)
index, data = self._client.kv.get(member_path)
if not data:
raise coordination.MemberNotJoined(group_id, member_id)
return utils.loads(data["Value"])
return ConsulFutureResult(
self._executor.submit(_get_member_capabilities))
[docs]
def update_capabilities(self, group_id, capabilities):
@_translate_failures
def _update_capabilities():
member_path = self._path_member(group_id, self._member_id)
index, data = self._client.kv.get(member_path)
if not data:
raise coordination.MemberNotJoined(group_id, self._member_id)
# no need to Check-And-Set here, latest write wins
self._client.kv.put(member_path, utils.dumps(capabilities))
return ConsulFutureResult(self._executor.submit(_update_capabilities))
[docs]
@staticmethod
def watch_elected_as_leader(group_id, callback):
raise tooz.NotImplemented
[docs]
@staticmethod
def unwatch_elected_as_leader(group_id, callback):
raise tooz.NotImplemented
ConsulFutureResult = functools.partial(coordination.CoordinatorResult,
failure_translator=_failure_translator)