Run and monitor celery tasks
Project description
Summary
Run, monitor and log celery tasks.
Installation and setup
Declare tasks using celery task or cubicweb-celery cwtasks.
On worker side, install cw-celerytask-helpers.
celeryconfig.py example:
BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = BROKER_URL
CUBICWEB_CELERYTASK_REDIS_URL = BROKER_URL
CELERY_IMPORTS = ('cw_celerytask_helpers.helpers', 'module.containing.tasks')
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json', 'msgpack', 'yaml']
In this configuration example, the cw_celerytask_helpers in CELERY_IMPORTS is required to have logging data (in the task) sent back to the Cubicweb instance via Redis. The CUBICWEB_CELERYTASK_REDIS_URL is the Redis endpoint used for this logging handling mechanism.
Start a worker:
# running cubicweb tasks (celeryconfig.py will be imported from your instance config directory) celery -A cubicweb_celery -i <CW_INSTANCE_NAME> worker -l info # running pure celery tasks celery worker -l info
Task state synchronization requires to run the celery-monitor command:
cubicweb-ctl celery-monitor <instance-name>
Ensure to have the celeryconfig.py loaded for both cubicweb instance and celery worker, enforce by settings with CELERY_CONFIG_MODULE environment variable (it must be an importable python module).
Running tasks
Create a task:
from celery import current_app as app
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@app.task(name='hi_there')
def my_task(arg, kw=0):
logger.info('HI %s %s!', arg, kw)
return 42
Run a task:
from cubes.celerytask.entities import start_async_task
cwtask = start_async_task(cnx, 'hi_there', 'THERE', kw=42)
cnx.commit()
start_async_task() accept task names, task objects or task signatures: http://docs.celeryproject.org/en/latest/userguide/canvas.html#signatures
For instance, to start the above task in a dedicated queue named myqueue:
import celery
start_async_task(cnx, celery.signature('hi_there', args=('THERE',),
kwargs={'kw': 42}, queue='myqueue'))
Testing task based application
In CubicWeb test mode, tasks don’t run automatically, use cubes.celerytask.entities.get_tasks() to introspect them and cubes.celerytask.entities.run_all_tasks() to run them.
Also, CELERY_ALWAYS_EAGER and CELERY_EAGER_PROPAGATES_EXCEPTIONS are set to True by default.
Demo
A simple demo is supplied with the source code.
We assume the present cubicweb-celerytask cube is properly installed in a working Cubicweb environment, and there is a redis server available on redis://localhost:6379/0 (you can change this in demo/celeryconfig.py is needed).
Then:
user@host:~/celerytask$ cubicweb-ctl create celerytask demo
For the sake of simplicity, choose sqlite as database driver, and say ‘yes’ to the question “aAllow anonymous access ?”
Start the web application:
user@host:~/celerytask$ cubicweb-ctl start -D -linfo demo
Open your web browser on http://127.0.0.1:8080/
In another terminal, start a celery worker:
user@host:~$ cd celerytask/demo
user@host:~/celerytask/demo$ celery worker -l info -E
In a third terminal, launch some tasks:
user@host:~$ cd celerytask/demo
user@host:~/celerytask/demo$ cubicweb-ctl shell demo launchtasks.py
You should be able to see 3 tasks on http://127.0.0.1:8080/CeleryTask
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
File details
Details for the file cubicweb-celerytask-0.4.0.tar.gz.
File metadata
- Download URL: cubicweb-celerytask-0.4.0.tar.gz
- Upload date:
- Size: 21.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b860e133897f05eb01d072e470c36dcb34d0fa08c232faeaf596d2cd206bc3fe
|
|
| MD5 |
1ed95ccd00ff62a55dcaf93bbdd476b5
|
|
| BLAKE2b-256 |
887d05ed56a4110ba1d5f798387e521f2fd609fb892e261d6767bc0732a1fcae
|