Source code for kombu.transport.django.managers

from __future__ import absolute_import

from functools import wraps

from django.db import transaction, connection, models
try:
    from django.db import connections, router
except ImportError:  # pre-Django 1.2
    connections = router = None  # noqa


try:
    transaction.atomic
except AttributeError:
    commit_on_success = transaction.commit_on_success
else:
[docs] def commit_on_success(fun): @wraps(fun) def _commit(*args, **kwargs): with transaction.atomic(): return fun(*args, **kwargs) return _commit
[docs]class QueueManager(models.Manager):
[docs] def publish(self, queue_name, payload): queue, created = self.get_or_create(name=queue_name) queue.messages.create(payload=payload)
[docs] def fetch(self, queue_name): try: queue = self.get(name=queue_name) except self.model.DoesNotExist: return return queue.messages.pop()
[docs] def size(self, queue_name): return self.get(name=queue_name).messages.count()
[docs] def purge(self, queue_name): try: queue = self.get(name=queue_name) except self.model.DoesNotExist: return messages = queue.messages.all() count = messages.count() messages.delete() return count
[docs]def select_for_update(qs): if connection.vendor == 'oracle': return qs try: return qs.select_for_update() except AttributeError: return qs
[docs]class MessageManager(models.Manager): _messages_received = [0] cleanup_every = 10 @commit_on_success
[docs] def pop(self): try: resultset = select_for_update( self.filter(visible=True).order_by('sent_at', 'id') ) result = resultset[0:1].get() result.visible = False result.save() recv = self.__class__._messages_received recv[0] += 1 if not recv[0] % self.cleanup_every: self.cleanup() return result.payload except self.model.DoesNotExist: pass
[docs] def cleanup(self): cursor = self.connection_for_write().cursor() cursor.execute( 'DELETE FROM %s WHERE visible=%%s' % ( self.model._meta.db_table, ), (False, ) )
[docs] def connection_for_write(self): if connections: return connections[router.db_for_write(self.model)] return connection