This document is for Celery's development version, which can be significantly different from previous releases. Get old docs here: 2.5.

celery.task.base

The task implementation has been moved to celery.app.task.

This contains the backward compatible Task class used in the old API.

copyright: (c) 2009 - 2012 by Ask Solem.
license: BSD, see LICENSE for more details.

class celery.task.base.BaseTask

Task base class.

When called, tasks apply the run() method. This method must be defined by all tasks (unless the __call__() method is overridden).

AsyncResult(task_id)

Get AsyncResult instance for this kind of task.

Parameters: task_id – Task id to get result for.

class ErrorMail(task, **kwargs)

Defines how and when task error e-mails should be sent.

Parameters: task – The task instance that raised the error.

subject and body are format strings which are passed a context containing the following keys:

  • name

    Name of the task.

  • id

    UUID of the task.

  • exc

    String representation of the exception.

  • args

    Positional arguments.

  • kwargs

    Keyword arguments.

  • traceback

    String representation of the traceback.

  • hostname

    Worker hostname.
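As a rough illustration of how the context keys above fill in the subject and body, here is a minimal sketch. It assumes %-style (mapping key) format strings; the SUBJECT and BODY templates and every value in the context are made up for this example and are not Celery's defaults.

```python
# Hypothetical templates; the defaults defined on ErrorMail may differ.
SUBJECT = "[celery@%(hostname)s] Error: Task %(name)s (%(id)s): %(exc)s"
BODY = (
    "Task %(name)s with id %(id)s raised exception:\n"
    "%(exc)s\n\n"
    "args: %(args)r\n"
    "kwargs: %(kwargs)r\n\n"
    "%(traceback)s"
)

# Sample context using the keys listed above (all values are invented).
context = {
    "name": "tasks.add",
    "id": "d9078da5-9915-40a0-bfa1-392c7bde42ed",
    "exc": "TypeError('unsupported operand')",
    "args": (2, "x"),
    "kwargs": {},
    "traceback": "Traceback (most recent call last): ...",
    "hostname": "worker1.example.com",
}

# Each %(key)s placeholder is looked up in the context mapping.
subject = SUBJECT % context
body = BODY % context
print(subject)
print(body)
```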

should_send(context, exc)

Returns True or False depending on whether a task error e-mail should be sent for this type of error.

exception BaseTask.MaxRetriesExceededError

The task's maximum retry limit has been exceeded.

BaseTask.after_return(status, retval, task_id, args, kwargs, einfo)

Handler called after the task returns.

Parameters:
  • status – Current task state.
  • retval – Task return value/exception.
  • task_id – Unique id of the task.
  • args – Original arguments for the task.
  • kwargs – Original keyword arguments for the task.
  • einfo – ExceptionInfo instance, containing the traceback (if any).

The return value of this handler is ignored.

BaseTask.apply(args=None, kwargs=None, **options)

Execute this task locally, by blocking until the task returns.

Parameters:
  • args – positional arguments passed on to the task.
  • kwargs – keyword arguments passed on to the task.
  • throw – Re-raise task exceptions. Defaults to the CELERY_EAGER_PROPAGATES_EXCEPTIONS setting.

Return type: celery.result.EagerResult

BaseTask.apply_async(args=None, kwargs=None, task_id=None, publisher=None, connection=None, router=None, link=None, link_error=None, **options)

Apply tasks asynchronously by sending a message.

Parameters:
  • args – The positional arguments to pass on to the task (a list or tuple).
  • kwargs – The keyword arguments to pass on to the task (a dict).
  • countdown – Number of seconds into the future that the task should execute. Defaults to immediate execution (do not confuse with the immediate flag, as they are unrelated).
  • eta – A datetime object describing the absolute time and date of when the task should be executed. May not be specified if countdown is also supplied. (Do not confuse this with the immediate flag, as they are unrelated).
  • expires – Either an int, describing the number of seconds, or a datetime object that describes the absolute time and date of when the task should expire. The task will not be executed after the expiration time.
  • connection – Re-use existing broker connection instead of establishing a new one.
  • retry – If enabled, sending of the task message will be retried in the event of connection loss or failure. Default is taken from the CELERY_TASK_PUBLISH_RETRY setting. Note that you need to handle the publisher/connection manually for this to work.
  • retry_policy – Override the retry policy used. See the CELERY_TASK_PUBLISH_RETRY setting.
  • routing_key – The routing key used to route the task to a worker server. Defaults to the routing_key attribute.
  • exchange – The named exchange to send the task to. Defaults to the exchange attribute.
  • exchange_type – The exchange type to initialize the exchange if not already declared. Defaults to the exchange_type attribute.
  • immediate – Request immediate delivery. Will raise an exception if the task cannot be routed to a worker immediately. (Do not confuse this parameter with the countdown and eta settings, as they are unrelated). Defaults to the immediate attribute.
  • mandatory – Mandatory routing. Raises an exception if there are no running workers able to take on this task. Defaults to the mandatory attribute.
  • priority – The task priority, a number between 0 and 9. Defaults to the priority attribute.
  • serializer – A string identifying the default serialization method to use. Can be pickle, json, yaml, msgpack or any custom serialization method that has been registered with kombu.serialization.registry. Defaults to the serializer attribute.
  • compression – A string identifying the compression method to use. Can be one of zlib, bzip2, or any custom compression methods registered with kombu.compression.register(). Defaults to the CELERY_MESSAGE_COMPRESSION setting.
  • link – A single subtask, or a list of subtasks, to apply if the task exits successfully.
  • link_error – A single subtask, or a list of subtasks, to apply if an error occurs while executing the task.

Note

If the CELERY_ALWAYS_EAGER setting is set, the apply_async() call is replaced by a local apply() call instead.
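The relationship between countdown, eta and expires can be sketched with plain datetime arithmetic. The resolve_schedule helper below is hypothetical and is not part of Celery; it only mirrors the parameter descriptions above. Rejecting countdown together with eta is a choice made for this sketch, and may not match what Celery does in that case.

```python
from datetime import datetime, timedelta


def resolve_schedule(countdown=None, eta=None, expires=None, now=None):
    """Hypothetical helper: turn the timing options into absolute datetimes."""
    now = now or datetime.now()
    if countdown is not None and eta is not None:
        # The parameter list says eta may not be combined with countdown.
        raise ValueError("specify either countdown or eta, not both")
    if countdown is not None:
        # countdown is relative: seconds from now.
        eta = now + timedelta(seconds=countdown)
    if isinstance(expires, (int, float)):
        # A numeric expires is also relative to now.
        expires = now + timedelta(seconds=expires)
    return eta, expires


start = datetime(2012, 1, 1, 12, 0, 0)
eta, expires = resolve_schedule(countdown=300, expires=3600, now=start)
print(eta, expires)
```

With these inputs the task would become eligible five minutes after start and expire one hour after start.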

BaseTask.delay(*args, **kwargs)

Star argument version of apply_async().

Does not support the extra options enabled by apply_async().

Parameters:
  • *args – positional arguments passed on to the task.
  • **kwargs – keyword arguments passed on to the task.

Returns: celery.result.AsyncResult
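To make the difference between the two calling conventions concrete, here is a small stand-in class. RecordingTask is hypothetical and sends no messages; it only records what each call would pass along, assuming delay() simply forwards its star arguments to apply_async().

```python
class RecordingTask:
    """Stand-in for a task class; not Celery's implementation."""

    def __init__(self):
        self.sent = []

    def apply_async(self, args=None, kwargs=None, **options):
        # Record the call instead of publishing a message.
        self.sent.append((tuple(args or ()), dict(kwargs or {}), options))
        return len(self.sent)  # placeholder for an AsyncResult

    def delay(self, *args, **kwargs):
        # Star-argument shortcut: there is no room for execution options here.
        return self.apply_async(args, kwargs)


task = RecordingTask()
task.delay(2, 3, verbose=True)
task.apply_async((2, 3), {"verbose": True}, countdown=10)
print(task.sent)
```

Both calls carry the same task arguments; only apply_async() can also carry options such as countdown.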

BaseTask.establish_connection(connect_timeout=None)

Establish a connection to the message broker.

BaseTask.execute(request, pool, loglevel, logfile, **kwargs)

The method the worker calls to execute the task.

Parameters:
  • request – A Request.
  • pool – A task pool.
  • loglevel – Current loglevel.
  • logfile – Name of the currently used logfile.
  • consumer – The Consumer.

BaseTask.get_consumer(connection=None, queues=None, **kwargs)

Get message consumer.

Return type: kombu.messaging.Consumer

Warning

If you don't specify a connection, one will be established for you automatically. In that case you need to close this connection after use:

>>> consumer = self.get_consumer()
>>> # do something with consumer
>>> consumer.close()
>>> consumer.connection.close()

BaseTask.get_logger(**kwargs)

Get task-aware logger object.

BaseTask.get_publisher(connection=None, exchange=None, connect_timeout=None, exchange_type=None, **options)

Get a celery task message publisher.

Return type: TaskProducer

Warning

If you don't specify a connection, one will be established for you automatically. In that case you need to close this connection after use:

>>> publisher = self.get_publisher()
>>> # ... do something with publisher
>>> publisher.connection.close()

BaseTask.on_bound(app)

This method can be defined to do additional actions when the task class is bound to an app.

BaseTask.on_failure(exc, task_id, args, kwargs, einfo)

Error handler.

This is run by the worker when the task fails.

Parameters:
  • exc – The exception raised by the task.
  • task_id – Unique id of the failed task.
  • args – Original arguments for the task that failed.
  • kwargs – Original keyword arguments for the task that failed.
  • einfo – ExceptionInfo instance, containing the traceback.

The return value of this handler is ignored.

BaseTask.on_retry(exc, task_id, args, kwargs, einfo)

Retry handler.

This is run by the worker when the task is to be retried.

Parameters:
  • exc – The exception sent to retry().
  • task_id – Unique id of the retried task.
  • args – Original arguments for the retried task.
  • kwargs – Original keyword arguments for the retried task.
  • einfo – ExceptionInfo instance, containing the traceback.

The return value of this handler is ignored.

BaseTask.on_success(retval, task_id, args, kwargs)

Success handler.

Run by the worker if the task executes successfully.

Parameters:
  • retval – The return value of the task.
  • task_id – Unique id of the executed task.
  • args – Original arguments for the executed task.
  • kwargs – Original keyword arguments for the executed task.

The return value of this handler is ignored.
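The handler signatures documented above can be exercised without a worker by using a small driver. In this sketch, LoggingTask and simulate_worker are hypothetical stand-ins, the traceback string stands in for an ExceptionInfo instance, and the ordering (on_success or on_failure first, then after_return) is an assumption rather than something this page specifies.

```python
import traceback


class LoggingTask:
    """Stand-in task that records which handlers were called."""

    def __init__(self, fun):
        self.run = fun
        self.events = []

    def on_success(self, retval, task_id, args, kwargs):
        self.events.append(("success", retval))

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        self.events.append(("failure", type(exc).__name__))

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        self.events.append(("after_return", status))


def simulate_worker(task, task_id, args=(), kwargs=None):
    """Hypothetical driver: run the task body and call the matching handlers."""
    kwargs = kwargs or {}
    einfo = None
    try:
        retval = task.run(*args, **kwargs)
    except Exception as exc:
        status, retval = "FAILURE", exc
        einfo = traceback.format_exc()  # stand-in for ExceptionInfo
        task.on_failure(exc, task_id, args, kwargs, einfo)
    else:
        status = "SUCCESS"
        task.on_success(retval, task_id, args, kwargs)
    # Handler return values are ignored, as documented above.
    task.after_return(status, retval, task_id, args, kwargs, einfo)
    return status


ok = LoggingTask(lambda x, y: x + y)
bad = LoggingTask(lambda x, y: x / y)
print(simulate_worker(ok, "id-1", (2, 3)), ok.events)
print(simulate_worker(bad, "id-2", (1, 0)), bad.events)
```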

BaseTask.retry(args=None, kwargs=None, exc=None, throw=True, eta=None, countdown=None, max_retries=None, **options)

Retry the task.

Parameters:
  • args – Positional arguments to retry with.
  • kwargs – Keyword arguments to retry with.
  • exc – Optional exception to raise instead of MaxRetriesExceededError when the maximum retry limit has been exceeded.
  • countdown – Time in seconds to delay the retry for.
  • eta – Explicit time and date to run the retry at (must be a datetime instance).
  • max_retries – If set, overrides the default retry limit.
  • **options – Any extra options to pass on to apply_async().
  • throw – If this is False, do not raise the RetryTaskError exception that tells the worker to mark the task as being retried. Note that the task will then be marked as failed if it raises an exception, or successful if it returns.
Raises: celery.exceptions.RetryTaskError – To tell the worker that the task has been re-sent for retry. This always happens unless the throw keyword argument has been explicitly set to False, and is considered normal operation.

Example

>>> from celery.task import task
>>> @task
... def tweet(auth, message):
...     twitter = Twitter(oauth=auth)  # Twitter is a placeholder client class
...     try:
...         twitter.post_status_update(message)
...     except twitter.FailWhale as exc:
...         # Retry in 5 minutes.
...         return tweet.retry(countdown=60 * 5, exc=exc)

Although the task will never return above as retry raises an exception to notify the worker, we use return in front of the retry to convey that the rest of the block will not be executed.
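The control flow described for retry() can be modelled in a few lines. This is a simplified sketch: the two exception classes are local stand-ins for the Celery exceptions of the same name, the retry and attempt functions are hypothetical, and the exact comparison against max_retries is an assumption.

```python
class RetryTaskError(Exception):
    """Stand-in for celery.exceptions.RetryTaskError."""


class MaxRetriesExceededError(Exception):
    """Stand-in for BaseTask.MaxRetriesExceededError."""


def retry(retries, max_retries, exc=None, throw=True):
    """Hypothetical model of the retry decision; sends no message."""
    if max_retries is not None and retries >= max_retries:
        # Limit reached: raise the caller's exception if one was given.
        raise exc if exc is not None else MaxRetriesExceededError()
    # A real task would re-send its message here.
    if throw:
        # Raising is the normal path: it tells the worker a retry is pending.
        raise RetryTaskError("retry scheduled")
    return "resent"


def attempt(**kw):
    """Report which exception, if any, a retry() call ends with."""
    try:
        return retry(**kw)
    except Exception as e:
        return type(e).__name__


print(attempt(retries=0, max_retries=3))
print(attempt(retries=3, max_retries=3))
print(attempt(retries=3, max_retries=3, exc=KeyError("boom")))
print(attempt(retries=0, max_retries=3, throw=False))
```

Only the throw=False call returns normally, which is why code placed after a plain retry() call is not reached.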

BaseTask.run(*args, **kwargs)

The body of the task executed by workers.

BaseTask.s(*args, **kwargs)

.s(*a, **k) -> .subtask(a, k)

BaseTask.subtask(*args, **kwargs)

Returns subtask object for this task, wrapping arguments and execution options for a single task invocation.

BaseTask.update_state(task_id=None, state=None, meta=None)

Update task state.

Parameters:
  • task_id – Id of the task to update.
  • state – New state (str).
  • meta – State metadata (dict).

class celery.task.base.PeriodicTask

A periodic task is a task that adds itself to the CELERYBEAT_SCHEDULE setting.

class celery.task.base.TaskType

Meta class for tasks.

Automatically registers the task in the task registry, except if the abstract attribute is set.

If no name attribute is provided, the name is automatically set to the name of the module the task was defined in, followed by the class name.
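The registration and naming rules can be illustrated with a small metaclass. This is a simplified sketch written for Python 3, not Celery's implementation: the registry dict, the dotted "module.ClassName" format, and the choice to pop abstract so that subclasses do not inherit it are all assumptions made for the example.

```python
registry = {}  # hypothetical stand-in for the task registry


class TaskType(type):
    """Simplified stand-in for the metaclass described above."""

    def __new__(mcls, clsname, bases, attrs):
        # Pop the flag so subclasses of an abstract class are registered.
        abstract = attrs.pop("abstract", False)
        cls = super().__new__(mcls, clsname, bases, attrs)
        if not attrs.get("name"):
            # No explicit name: derive one from the module and class name.
            cls.name = "%s.%s" % (cls.__module__, clsname)
        if not abstract:
            registry[cls.name] = cls
        return cls


class Task(metaclass=TaskType):
    abstract = True  # base class only, never registered


class AddTask(Task):
    def run(self, x, y):
        return x + y


class NamedTask(Task):
    name = "maths.multiply"


print(sorted(registry))
```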
