Saturday, November 12, 2011

How the celeryctl command works in Celery

One of the most popular Django apps out there is the Celery task queue framework. It allows you to build your own task queue system for offline processing and provides an elegant framework for interfacing with message brokers such as RabbitMQ (AMQP), Redis, etc.

The celeryctl command that ships with Celery is an amazing tool. It lets you send commands to your Celery workers to find out which ones are actively processing tasks, revoke tasks that have already been dispatched so that workers skip over them, and see which tasks are scheduled. We use it here to monitor long-running tasks, with a script such as this:
from celery.bin import celeryctl
import datetime

# Tasks that have been running longer than this are reported.
MAX_TIMEDELTA = {'hours': 1}


def probe_celeryctl():
    # Ask every worker which tasks it is currently executing.
    results = celeryctl.inspect().run("active")
    check_old_celery_tasks(results)


def check_old_celery_tasks(results):
    bad_tasks = []
    now = datetime.datetime.utcnow()

    # results maps each worker host to its list of active tasks;
    # fall back to an empty dict in case no worker replied.
    for host, tasks in (results or {}).items():
        for task in tasks:
            task_start = task.get('time_start')
            if task_start is None:
                continue  # no start time reported, so nothing to measure
            # time_start is treated here as an epoch timestamp; convert it to
            # UTC so it is comparable with utcnow() (fromtimestamp() would
            # give local time instead).
            task_timestamp = datetime.datetime.utcfromtimestamp(float(task_start))
            time_diff = abs(now - task_timestamp)
            if time_diff > datetime.timedelta(**MAX_TIMEDELTA):
                print("Hmm..%s %s (on %s)" % (time_diff, task, host))
                bad_tasks.append("Task running for %s on %s: name=%s, PID=%s, args=%s, kwargs=%s" % (
                    time_diff, host, task.get('name'), task.get('worker_pid'),
                    task.get('args'), task.get('kwargs')))

    if bad_tasks:
        message = "You'd better check on these tasks...they are slowing things down.\n\n"
        message += "\n".join(bad_tasks)
        print(message)

if __name__ == "__main__":
    probe_celeryctl()
How does celeryctl work? Assuming you're using an AMQP broker with Celery, celeryctl relies on the same concepts used in the AMQP open standard (any basic introduction to AMQP covers them). When it first starts up, Celery creates an AMQP exchange called "celeryd.pidbox" on the AMQP host. You can confirm this by using rabbitmqctl to list the exchanges:
$ sudo rabbitmqctl -p fanmgmt_prod list_exchanges
Listing exchanges ...
celeryd.pidbox fanout
.
.
.
You'll notice that celeryd.pidbox is created as a fanout exchange (an AMQP intro covers exchange types in more detail). This means that running celeryctl on one machine broadcasts a message to this exchange, and the AMQP host delivers a copy of it to every queue bound to celeryd.pidbox. On startup, every Celery worker also creates a queue (queue.hostname.celery.pidbox) bound to this exchange, through which it receives celeryctl commands.
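To make the fanout behaviour concrete, here is a toy in-memory model in plain Python. It is only a sketch of the idea, not Celery's or RabbitMQ's implementation: the `FanoutExchange` class, the `broadcast_inspect` helper, the host names, the message layout, and the exact queue naming are all assumptions made for illustration.

```python
from collections import deque


class FanoutExchange:
    """Toy stand-in for a fanout exchange: every bound queue gets a copy."""

    def __init__(self, name):
        self.name = name
        self.queues = {}  # queue name -> deque of pending messages

    def bind(self, queue_name):
        # Binding creates the queue if needed, as a worker would on startup.
        self.queues.setdefault(queue_name, deque())

    def publish(self, message):
        # A fanout exchange ignores routing keys: each bound queue
        # receives its own copy of the message.
        for pending in self.queues.values():
            pending.append(dict(message))


def broadcast_inspect(hostnames, command):
    """Bind one queue per (hypothetical) worker host, then broadcast once."""
    exchange = FanoutExchange("celeryd.pidbox")
    for host in hostnames:
        exchange.bind("%s.celery.pidbox" % host)
    exchange.publish({"method": command})
    return {name: list(pending) for name, pending in exchange.queues.items()}


delivered = broadcast_inspect(["web1", "web2"], "active")
for queue_name in sorted(delivered):
    print(queue_name, delivered[queue_name])
```

A single publish ends up in every worker's queue, which is why one celeryctl invocation can reach all workers without knowing how many there are.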

Replies from each Celery worker are passed back via a direct exchange, celeryd.reply.pidbox. When you start up celeryctl, it sends its message to the celeryd.pidbox exchange and listens for replies arriving via celeryd.reply.pidbox. By default it waits 1 second for replies, so if you want to give the Celery workers more time to respond, you can raise the timeout with the -t parameter (e.g. celeryctl inspect active -t followed by the number of seconds).
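The effect of the reply timeout can be sketched with the standard library alone. This is a simulation under stated assumptions, not how celeryctl is implemented: `collect_replies`, `fake_worker`, `run_demo`, the host names, and the delays are all hypothetical, and a `queue.Queue` stands in for the reply queue.

```python
import queue
import threading
import time


def collect_replies(reply_queue, timeout=1.0):
    """Gather (host, payload) replies until `timeout` seconds have elapsed."""
    replies = {}
    deadline = time.monotonic() + timeout
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        try:
            host, payload = reply_queue.get(timeout=remaining)
        except queue.Empty:
            break  # deadline reached with nothing more to read
        replies[host] = payload
    return replies


def fake_worker(reply_queue, host, delay):
    # Pretend the worker needs `delay` seconds before it answers.
    time.sleep(delay)
    reply_queue.put((host, []))


def run_demo(timeout):
    reply_queue = queue.Queue()
    workers = [
        threading.Thread(target=fake_worker, args=(reply_queue, "fast-host", 0.01)),
        threading.Thread(target=fake_worker, args=(reply_queue, "slow-host", 0.5)),
    ]
    for worker in workers:
        worker.start()
    replies = collect_replies(reply_queue, timeout)
    for worker in workers:
        worker.join()
    return replies


short_wait = run_demo(0.2)  # shorter than the slow worker's delay
long_wait = run_demo(1.0)   # long enough for both workers
print(sorted(short_wait))
print(sorted(long_wait))
```

With the short timeout the slow worker's reply arrives too late and is simply missing from the result, which is the symptom to look for before raising -t.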

Note: the reply exchange gets deleted after the celeryctl command exits (it is usually declared with auto_delete=True). Since all the Celery workers are still bound to the other exchange, celeryd.pidbox, that one persists until you shut down all Celery workers.
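The lifecycle described in this note can be modelled with a few lines of Python. This is a toy sketch, not broker code: `ToyBroker` and the queue names are hypothetical, and it assumes both exchanges are auto-delete (the note only says so explicitly for the reply exchange) and that an auto-delete exchange disappears once its last queue is unbound.

```python
class ToyBroker:
    """Minimal model of auto-delete exchanges and their bindings."""

    def __init__(self):
        # exchange name -> {"auto_delete": bool, "queues": set of queue names}
        self.exchanges = {}

    def declare_exchange(self, name, auto_delete=False):
        self.exchanges.setdefault(
            name, {"auto_delete": auto_delete, "queues": set()})

    def bind(self, exchange, queue_name):
        self.exchanges[exchange]["queues"].add(queue_name)

    def unbind(self, exchange, queue_name):
        entry = self.exchanges[exchange]
        entry["queues"].discard(queue_name)
        # An auto-delete exchange goes away with its last binding.
        if entry["auto_delete"] and not entry["queues"]:
            del self.exchanges[exchange]


broker = ToyBroker()
broker.declare_exchange("celeryd.pidbox", auto_delete=True)
broker.declare_exchange("celeryd.reply.pidbox", auto_delete=True)

# Two workers stay bound to the broadcast exchange.
broker.bind("celeryd.pidbox", "web1.celery.pidbox")
broker.bind("celeryd.pidbox", "web2.celery.pidbox")

# celeryctl binds a temporary reply queue, then exits.
broker.bind("celeryd.reply.pidbox", "celeryctl.reply")
broker.unbind("celeryd.reply.pidbox", "celeryctl.reply")
after_celeryctl_exit = sorted(broker.exchanges)

# Shutting down every worker removes the last bindings.
broker.unbind("celeryd.pidbox", "web1.celery.pidbox")
broker.unbind("celeryd.pidbox", "web2.celery.pidbox")
after_worker_shutdown = sorted(broker.exchanges)

print(after_celeryctl_exit)
print(after_worker_shutdown)
```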
