The zaqar.storage.pooling module

class Catalog(conf, cache, control)

Bases: object

Represents the mapping between queues and pool drivers.

deregister(*args, **kwargs)

Removes a queue from the pool catalog.

Call this method after the queue has been successfully deleted from its backend pool.

Parameters:
  • queue (six.text_type) – Name of the queue to remove from the pool catalog
  • project (six.text_type) – Project to which the queue belongs, or None for the “global” or “generic” project.
get_claim_controller(queue, project=None)

Look up the claim controller for the given queue and project.

Parameters:
  • queue – Name of the queue for which to find a pool
  • project – Project to which the queue belongs, or None to specify the “global” or “generic” project.
Returns:

The claim controller associated with the data driver for the pool containing (queue, project), or None if no such pool exists.

Return type:

Maybe ClaimController

get_default_pool(use_listing=True)
get_driver(pool_id, pool_conf=None)

Get storage driver, preferably cached, from a pool name.

Parameters:
  • pool_id (six.text_type) – The name of a pool.
Returns:

A storage driver.

Return type:

zaqar.storage.base.DataDriver

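
The "preferably cached" behaviour described for get_driver() can be pictured with a small stand-alone sketch. ToyDriver and ToyDriverCache are hypothetical names invented for this illustration and are not part of Zaqar; how the real method uses pool_conf is not stated in this reference, so the sketch simply stores it.

```python
class ToyDriver:
    """Hypothetical stand-in for a storage driver; not a Zaqar class."""

    def __init__(self, pool_id, conf):
        self.pool_id = pool_id
        self.conf = conf


class ToyDriverCache:
    """Builds one driver per pool name and reuses it on later calls."""

    def __init__(self):
        self._drivers = {}  # pool name -> driver instance

    def get_driver(self, pool_id, pool_conf=None):
        try:
            # Cache hit: hand back the driver created earlier.
            return self._drivers[pool_id]
        except KeyError:
            # Cache miss: create the driver once and remember it.
            # Assumption: pool_conf is only passed through to the driver.
            driver = ToyDriver(pool_id, pool_conf)
            self._drivers[pool_id] = driver
            return driver


cache = ToyDriverCache()
first = cache.get_driver("pool-a")
second = cache.get_driver("pool-a")
other = cache.get_driver("pool-b")
```

In this sketch, repeated calls with the same pool name return the same object, while a different pool name yields a separate driver.
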
get_message_controller(queue, project=None)

Look up the message controller for the given queue and project.

Parameters:
  • queue – Name of the queue for which to find a pool
  • project – Project to which the queue belongs, or None to specify the “global” or “generic” project.
Returns:

The message controller associated with the data driver for the pool containing (queue, project), or None if no such pool exists.

Return type:

Maybe MessageController

get_queue_controller(queue, project=None)

Look up the queue controller for the given queue and project.

Parameters:
  • queue – Name of the queue for which to find a pool
  • project – Project to which the queue belongs, or None to specify the “global” or “generic” project.
Returns:

The queue controller associated with the data driver for the pool containing (queue, project), or None if no such pool exists.

Return type:

Maybe QueueController

get_subscription_controller(queue, project=None)

Look up the subscription controller for the given queue and project.

Parameters:
  • queue – Name of the queue for which to find a pool
  • project – Project to which the queue belongs, or None to specify the “global” or “generic” project.
Returns:

The subscription controller associated with the data driver for the pool containing (queue, project), or None if no such pool exists.

Return type:

Maybe SubscriptionController

lookup(queue, project=None)

Look up a pool driver for the given queue and project.

Parameters:
  • queue – Name of the queue for which to find a pool
  • project – Project to which the queue belongs, or None to specify the “global” or “generic” project.
Returns:

A storage driver instance for the appropriate pool. If the driver does not exist yet, it is created and cached. If the queue is not mapped, returns None.

Return type:

Maybe DataDriver

register(queue, project=None, flavor=None)

Register a new queue in the pool catalog.

Call this method whenever a new queue is created; it creates an entry in the pool catalog for the given queue.

After registering the queue, call lookup() to get a reference to a storage driver for interacting with the queue’s assigned backend pool.

Parameters:
  • queue (six.text_type) – Name of the new queue to assign to a pool
  • project (six.text_type) – Project to which the queue belongs, or None for the “global” or “generic” project.
  • flavor (six.text_type) – Flavor for the queue (optional)
Raises:

NoPoolFound
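
The register(), lookup() and deregister() calls documented for Catalog form a simple lifecycle. The sketch below models that lifecycle with an in-memory toy rather than the real class: ToyCatalog, ToyDriver and the local NoPoolFound are hypothetical stand-ins, the "first available pool" selection rule is an assumption (the actual policy is not described here), and the flavor argument is accepted but ignored.

```python
class NoPoolFound(Exception):
    """Local stand-in for the error register() is documented to raise."""


class ToyDriver:
    """Hypothetical storage driver bound to one pool."""

    def __init__(self, pool):
        self.pool = pool


class ToyCatalog:
    """In-memory model of the queue-to-pool mapping; not Zaqar's Catalog."""

    def __init__(self, pools):
        self._pools = list(pools)  # names of available pools
        self._mapping = {}         # (project, queue) -> pool name
        self._drivers = {}         # pool name -> cached driver

    def register(self, queue, project=None, flavor=None):
        if not self._pools:
            raise NoPoolFound("no pool available for %r" % (queue,))
        # Assumption: always pick the first pool; flavor is ignored.
        self._mapping[(project, queue)] = self._pools[0]

    def lookup(self, queue, project=None):
        pool = self._mapping.get((project, queue))
        if pool is None:
            return None  # queue is not mapped to any pool
        if pool not in self._drivers:
            # Create the driver on first use and cache it.
            self._drivers[pool] = ToyDriver(pool)
        return self._drivers[pool]

    def deregister(self, queue, project=None):
        self._mapping.pop((project, queue), None)


catalog = ToyCatalog(["pool-a"])
catalog.register("orders", project="acme")
driver = catalog.lookup("orders", project="acme")
missing = catalog.lookup("unknown", project="acme")
catalog.deregister("orders", project="acme")
after = catalog.lookup("orders", project="acme")
```

A caller following this pattern registers the queue when it is created, fetches the driver through lookup() for later operations, and deregisters only once the queue has been deleted from its backend pool.
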

class ClaimController(pool_catalog)

Bases: zaqar.storage.base.Claim

Routes operations to a claim controller in the appropriate pool.

Parameters: pool_catalog (queues.pooling.base.Catalog) – A catalog of available pools
create(queue, metadata, project=None, limit=10)
delete(queue, claim_id, project=None)
get(queue, claim_id, project=None)
update(queue, claim_id, metadata, project=None)
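
The idea of routing an operation to the claim controller of the appropriate pool can be sketched as a thin wrapper around a catalog. Everything below is a hypothetical illustration: ToyClaimBackend, ToyCatalog and RoutingClaimController are invented names, and raising LookupError when no pool is found is an assumption, since this reference does not say how the real controller handles that case.

```python
class ToyClaimBackend:
    """Hypothetical per-pool claim controller backed by a dict."""

    def __init__(self):
        self.claims = {}  # (project, queue, claim_id) -> claim data

    def get(self, queue, claim_id, project=None):
        return self.claims.get((project, queue, claim_id))


class ToyCatalog:
    """Maps (project, queue) to the backend controller of its pool."""

    def __init__(self, controllers):
        self._controllers = controllers

    def get_claim_controller(self, queue, project=None):
        # Returns None when the queue is not mapped to any pool.
        return self._controllers.get((project, queue))


class RoutingClaimController:
    """Forwards each call to the controller chosen by the catalog."""

    def __init__(self, pool_catalog):
        self._catalog = pool_catalog

    def get(self, queue, claim_id, project=None):
        target = self._catalog.get_claim_controller(queue, project)
        if target is None:
            # Assumption: the sketch signals a missing pool with LookupError.
            raise LookupError("no pool for queue %r" % (queue,))
        return target.get(queue, claim_id, project=project)


backend = ToyClaimBackend()
backend.claims[("acme", "orders", "c1")] = {"ttl": 60}
router = RoutingClaimController(ToyCatalog({("acme", "orders"): backend}))
claim = router.get("orders", "c1", project="acme")
```

The wrapper holds no claim data of its own; it only resolves the right backend per call and delegates to it.
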
class DataDriver(conf, cache, control, control_driver=None)

Bases: zaqar.storage.base.DataDriverBase

Pooling meta-driver for routing requests to multiple backends.

Parameters:
  • conf – Configuration from which to read pooling options
  • cache – Cache instance that will be passed to the individual storage driver instances that correspond to each pool. It will also be used by the pool controller to reduce latency for some operations.
BASE_CAPABILITIES = (<Capabilities.FIFO: 1>, <Capabilities.CLAIMS: 2>, <Capabilities.DURABILITY: 3>, <Capabilities.AOD: 4>, <Capabilities.HIGH_THROUGHPUT: 5>)
capabilities
claim_controller
close()
gc()
is_alive()
message_controller
queue_controller
subscription_controller
class MessageController(pool_catalog)

Bases: zaqar.storage.base.Message

Routes operations to a message controller in the appropriate pool.

Parameters: pool_catalog (queues.pooling.base.Catalog) – A catalog of available pools
bulk_delete(queue, message_ids, project=None)
bulk_get(queue, message_ids, project=None)
delete(queue, message_id, project=None, claim=None)
first(queue, project=None, sort=1)
get(queue, message_id, project=None)
list(queue, project=None, marker=None, limit=10, echo=False, client_uuid=None, include_claimed=False)
pop(queue, limit, project=None)
post(queue, messages, client_uuid, project=None)
class QueueController(pool_catalog)

Bases: zaqar.storage.base.Queue

Routes operations to a queue controller in the appropriate pool.

Parameters: pool_catalog (queues.pooling.base.Catalog) – A catalog of available pools
get_metadata(name, project=None)
set_metadata(name, metadata, project=None)
class SubscriptionController(pool_catalog)

Bases: zaqar.storage.base.Subscription

Controller to facilitate processing for subscription operations.

confirm(queue, subscription_id, project=None, confirmed=None)
create(queue, subscriber, ttl, options, project=None)
delete(queue, subscription_id, project=None)
exists(queue, subscription_id, project=None)
get(queue, subscription_id, project=None)
get_with_subscriber(queue, subscriber, project=None)
list(queue, project=None, marker=None, limit=10)
update(queue, subscription_id, project=None, **kwargs)