Django-RQ
Django integration with RQ, a Redis-based Python queuing library. Django-RQ is a simple app that allows you to configure your queues in Django's settings.py and easily use them in your project.
Support Django-RQ
If you find django-rq useful, please consider supporting its development via Tidelift.
Requirements
Installation
- Install django-rq (or download from PyPI):
pip install django-rq
- Add django_rq to INSTALLED_APPS in settings.py:
INSTALLED_APPS = (
# other apps
"django_rq",
)
- Configure your queues in Django's settings.py:
import os

RQ_QUEUES = {
    'default': {
        'HOST': 'localhost',
        'PORT': 6379,
        'DB': 0,
        'USERNAME': 'some-user',
        'PASSWORD': 'some-password',
        'DEFAULT_TIMEOUT': 360,
        'REDIS_CLIENT_KWARGS': {  # Optional additional Redis connection arguments
            'ssl_cert_reqs': None,
        },
    },
    'with-sentinel': {
        'SENTINELS': [('localhost', 26736), ('localhost', 26737)],
        'MASTER_NAME': 'redismaster',
        'DB': 0,
        # Redis username/password
        'USERNAME': 'redis-user',
        'PASSWORD': 'secret',
        'SOCKET_TIMEOUT': 0.3,
        'CONNECTION_KWARGS': {  # Optional additional Redis connection arguments
            'ssl': True,
        },
        'SENTINEL_KWARGS': {  # Optional Sentinel connection arguments
            # If Sentinel also has auth, username/password can be passed here
            'username': 'sentinel-user',
            'password': 'secret',
        },
    },
    'high': {
        'URL': os.getenv('REDISTOGO_URL', 'redis://localhost:6379/0'),  # If you're on Heroku
        'DEFAULT_TIMEOUT': 500,
    },
    'low': {
        'HOST': 'localhost',
        'PORT': 6379,
        'DB': 0,
    },
}

RQ_EXCEPTION_HANDLERS = ['path.to.my.handler']  # If you need custom exception handlers
- Include django_rq.urls in your urls.py:
urlpatterns += [
path('django-rq/', include('django_rq.urls'))
]
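The RQ_EXCEPTION_HANDLERS setting above points at a dotted path to a callable. A minimal sketch of such a handler follows, assuming RQ's documented handler signature of (job, exc_type, exc_value, traceback). The function name `log_failed_job` and the logger name `myproject.rq` are hypothetical and chosen only for illustration.

```python
import logging

# Hypothetical logger name; use whatever fits your project.
logger = logging.getLogger("myproject.rq")


def log_failed_job(job, exc_type, exc_value, traceback):
    """Log a failed job, then let any remaining handlers run."""
    logger.error(
        "Job %s failed: %s: %s",
        getattr(job, "id", "<unknown>"),
        exc_type.__name__,
        exc_value,
    )
    # In RQ, returning True (or None) falls through to the next handler
    # in the chain; returning False stops the chain here.
    return True
```

If this function lived in a module such as myproject/handlers.py (a hypothetical location), the setting would read RQ_EXCEPTION_HANDLERS = ['myproject.handlers.log_failed_job'].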
Usage
Putting jobs in the queue
Django-RQ allows you to easily put jobs into any of the queues defined in settings.py. It comes with a few utility functions:
enqueue - push a job to the default queue:
import django_rq
django_rq.enqueue(func, foo, bar=baz)
get_queue - returns a Queue instance:
import django_rq
queue = django_rq.get_queue('high')
queue.enqueue(func, foo, bar=baz)
In addition to the name argument, get_queue also accepts default_timeout, is_async, autocommit, connection and queue_class arguments. For example:
queue = django_rq.get_queue('default', autocommit=True, is_async=True, default_timeout=360)
queue.enqueue(func, foo, bar=baz)
You can provide your own singleton Redis connection object to this function so that it does not create a new connection object for each queue definition. This helps you limit the number of connections to the Redis server. For example:
import django_rq
import redis
redis_cursor = redis.StrictRedis(host='', port='', db='', password='')
high_queue = django_rq.get_queue('high', connection=redis_cursor)
low_queue = django_rq.get_queue('low', connection=redis_cursor)
get_connection - accepts a single queue name argument (defaults to "default") and returns a connection to the queue's Redis server:
import django_rq
redis_conn = django_rq.get_connection('high')
get_worker - accepts optional queue names and returns a new RQ Worker instance for the specified queues (or the default queue):
import django_rq
worker = django_rq.get_worker() # Returns a worker for "default" queue
worker.work()
worker = django_rq.get_worker('low', 'high') # Returns a worker for "low" and "high"
@job decorator
To easily turn a callable into an RQ task, you can also use the @job decorator that comes with django_rq:
from django_rq import job

@job
def long_running_func():
    pass

long_running_func.delay()  # Enqueue function in "default" queue

@job('high')
def long_running_func():
    pass

long_running_func.delay()  # Enqueue function in "high" queue
You can pass in any arguments that RQ's job decorator accepts:
@job('default', timeout=3600)
def long_running_func():
    pass

long_running_func.delay()  # Enqueue function with a timeout of 3600 seconds.
It's possible to specify a default for the result_ttl decorator keyword argument via the DEFAULT_RESULT_TTL setting:
RQ = {
'DEFAULT_RESULT_TTL': 5000,
}
With this setting, the job decorator will set result_ttl to 5000 unless it's specified explicitly.
Running workers
django_rq provides a management command that starts a worker for every queue specified as arguments:
python manage.py rqworker high default low
If you want to run rqworker in burst mode, you can pass in the --burst flag:
python manage.py rqworker high default low --burst
If you need to use custom worker, job or queue classes, it is best to use global settings (see Custom queue classes and Custom job and worker classes). However, it is also possible to override such settings with command line options as follows.
To use a custom worker class, you can pass in the --worker-class flag with the path to your worker:
python manage.py rqworker high default low --worker-class 'path.to.GeventWorker'
To use a custom queue class, you can pass in the --queue-class flag with the path to your queue class:
python manage.py rqworker high default low --queue-class 'path.to.CustomQueue'
To use a custom job class, provide the --job-class flag.
Starting from version 2.10, running RQ's worker-pool is also supported:
python manage.py rqworker-pool default low medium --num-workers 4
Support for Scheduled Jobs
Starting with RQ 1.2.0, you can use the built-in scheduler for your jobs. For example:
from datetime import datetime

from django_rq.queues import get_queue

queue = get_queue('default')
job = queue.enqueue_at(datetime(2020, 10, 10), func)
If you are using the built-in scheduler, you have to start workers with scheduler support:
python manage.py rqworker --with-scheduler
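The scheduling example above passes a fixed, naive datetime. As a small sketch of one way to compute a relative run time instead, the helper below builds a timezone-aware UTC datetime; the name `minutes_from_now` is hypothetical, and the choice of UTC is an assumption made here for illustration rather than something django_rq requires.

```python
from datetime import datetime, timedelta, timezone


def minutes_from_now(minutes, now=None):
    """Return a timezone-aware UTC datetime `minutes` in the future.

    `now` can be supplied to make the result deterministic in tests.
    """
    base = now if now is not None else datetime.now(timezone.utc)
    return base + timedelta(minutes=minutes)


run_at = minutes_from_now(10)

# In a configured project the value would be handed to the queue as in
# the example above (not executed here, since it needs Django and Redis):
#   queue = get_queue('default')
#   job = queue.enqueue_at(run_at, func)
```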
Alternatively, you can use RQ Scheduler. After installing it, you can also use the get_scheduler function to return a Scheduler instance for queues defined in settings.py's RQ_QUEUES. For example:
from datetime import datetime

import django_rq

scheduler = django_rq.get_scheduler('default')
job = scheduler.enqueue_at(datetime(2020, 10, 10), func)
You can also use the management command rqscheduler to start the scheduler:
python manage.py rqscheduler
Support for django-redis and django-redis-cache
If you have django-redis or django-redis-cache installed, you can instruct django_rq to use the same connection information from your Redis cache. This has two advantages: it's DRY and it takes advantage of any optimization that may be going on in your cache setup (like using connection pooling or Hiredis).
To configure it, use a dict with the key USE_REDIS_CACHE pointing to the name of the desired cache in your RQ_QUEUES dict. The chosen cache must exist and use the Redis backend. See your respective Redis cache package docs for configuration instructions. Note that the django-redis-cache ShardedClient is not supported, because it splits the cache over multiple Redis connections.
Here is an example settings fragment for `django-redis`:
CACHES = {
'redis-cache': {
'BACKEND': 'redis_cache.cache.RedisCache',
'LOCATION': 'localhost:6379:1',
'OPTIONS': {
'CLIENT_CLASS': 'django_redis.client.DefaultClient',
'MAX_ENTRIES': 5000,
},
},
}
RQ_QUEUES = {
'high': {
'USE_REDIS_CACHE': 'redis-cache',
},
'low': {
'USE_REDIS_CACHE': 'redis-cache',
},
}
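Since every USE_REDIS_CACHE value has to name an existing entry in CACHES, a small sanity check can catch typos early. The following is only a sketch: `missing_cache_aliases` is a hypothetical helper, not part of django_rq, and the trimmed settings dicts (including the deliberately wrong alias) exist purely to demonstrate it.

```python
def missing_cache_aliases(rq_queues, caches):
    """Return {queue_name: alias} for queues whose cache alias is not in CACHES."""
    return {
        name: config["USE_REDIS_CACHE"]
        for name, config in rq_queues.items()
        if "USE_REDIS_CACHE" in config and config["USE_REDIS_CACHE"] not in caches
    }


# Trimmed-down settings for demonstration only.
CACHES = {"redis-cache": {"BACKEND": "redis_cache.cache.RedisCache"}}
RQ_QUEUES = {
    "high": {"USE_REDIS_CACHE": "redis-cache"},
    "low": {"USE_REDIS_CACHE": "typo-cache"},  # deliberately wrong alias
}

print(missing_cache_aliases(RQ_QUEUES, CACHES))  # prints {'low': 'typo-cache'}
```

The check only verifies that the alias exists; it does not confirm that the cache actually uses a Redis backend.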
Suspending and Resuming Workers
Sometimes you may want to suspend RQ to prevent it from processing new jobs. A classic example is during the initial phase of a deployment script or in advance of putting your site into maintenance mode. This is particularly helpful when you have jobs that are relatively long-running and might otherwise be forcibly killed during the deploy.
The suspend command stops workers on all queues (in a single Redis database) from picking up new jobs. However, currently running jobs will continue until completion.
# Suspend indefinitely
python manage.py rqsuspend
# Suspend for a specific duration (in seconds) then automatically
# resume work again.
python manage.py rqsuspend -d 600
# Resume work again.
python manage.py rqresume
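For the deployment scenario described above, the two commands can be wrapped around a deploy step. This is a rough sketch under stated assumptions: the helper names are hypothetical, manage.py is assumed to be in the current working directory, and the -d duration serves as a safety net so workers resume on their own if the script dies before reaching rqresume.

```python
import subprocess
import sys


def manage_command(*args, manage_py="manage.py"):
    """Build the argv list for a Django management command."""
    return [sys.executable, manage_py, *args]


def deploy_with_suspended_workers(deploy_step, max_suspend_seconds=600):
    """Suspend workers, run `deploy_step`, then resume even if the step fails."""
    subprocess.run(
        manage_command("rqsuspend", "-d", str(max_suspend_seconds)),
        check=True,
    )
    try:
        deploy_step()
    finally:
        # Always resume, so a failed deploy does not leave workers idle.
        subprocess.run(manage_command("rqresume"), check=True)
```

Calling deploy_with_suspended_workers(run_migrations), where run_migrations is whatever callable performs your deploy, would suspend for at most ten minutes by default.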
Queue Statistics
django_rq also provides a dashboard to monitor the status of your queues at /django-rq/ (or whatever URL you set in your urls.py during installation).
You can also add a link to this dashboard in /admin by adding RQ_SHOW_ADMIN_LINK = True in settings.py. Be careful though: this will override the default admin template, so it may interfere with other apps that modify the default admin template.
These statistics are also available in JSON format via /django-rq/stats.json, which is accessible to staff members. If you need to access this view via other HTTP clients (for monitoring purposes), you can define RQ_API_TOKEN and access it via /django-rq/stats.json/<API_TOKEN>.
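A monitoring client could reach the token-protected endpoint roughly as follows. This is a sketch using only the standard library; it assumes the dashboard is mounted at /django-rq/ as in the installation step, the function names are hypothetical, and no assumption is made about the structure of the returned JSON.

```python
import json
from urllib.request import urlopen


def stats_url(base_url, api_token):
    """Build the token-protected stats URL for a site rooted at `base_url`."""
    return f"{base_url.rstrip('/')}/django-rq/stats.json/{api_token}"


def fetch_stats(base_url, api_token, timeout=5):
    """Fetch and decode the JSON statistics (performs a real HTTP request)."""
    with urlopen(stats_url(base_url, api_token), timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

The token passed in should match the RQ_API_TOKEN value from settings.py; keeping it in an environment variable rather than in source control is a sensible precaution.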
Note: Statistics for scheduled jobs show jobs from RQ's built-in scheduler, not from the optional RQ Scheduler package.
These statistics are also accessible from the command line:
python manage.py rqstats
python manage.py rqstats --interval=1 # Refreshes every second
python manage.py rqstats --json # Output as JSON
python manage.py rqstats --yaml # Output as YAML
Configuring Sentry
Sentry should be configured within the Django settings.py as described in the Sentry docs.
You can override the default Django Sentry configuration when running the rqworker command by passing the --sentry-dsn option:
./manage.py rqworker --sentry-dsn=https://*****@sentry.io/222222
This will override any existing Django configuration and reinitialise Sentry, setting the following Sentry options:
{
'debug': options.get('sentry_debug'),
'ca_certs': options.get('sentry_ca_certs'),
'integrations': [RedisIntegration(), RqIntegration(), DjangoIntegration()]
}
Configuring Logging
RQ uses Python's logging module, which means you can easily configure rqworker's logging mechanism in Django's settings.py. For example:
LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "rq_console": {
            "format": "%(asctime)s %(message)s",
            "datefmt": "%H:%M:%S",
        },
    },
    "handlers": {
        "rq_console": {
            "level": "DEBUG",
            "class": "rq.logutils.ColorizingStreamHandler",
            "formatter": "rq_console",
            "exclude": ["%(asctime)s"],
        },
    },
    "loggers": {
        "rq.worker": {
            # Add "sentry" to this list only if you also define a handler with that name.
            "handlers": ["rq_console"],
            "level": "DEBUG",
        },
    },
}
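Django applies the LOGGING setting through logging.config.dictConfig at startup, so a configuration of this shape can be tried out in a plain Python session first. The sketch below makes one substitution, which is an assumption for portability: the standard library's logging.StreamHandler stands in for rq.logutils.ColorizingStreamHandler so that the snippet runs without RQ installed.

```python
import logging
import logging.config

LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "rq_console": {
            "format": "%(asctime)s %(message)s",
            "datefmt": "%H:%M:%S",
        },
    },
    "handlers": {
        "rq_console": {
            "level": "DEBUG",
            # Stand-in for rq.logutils.ColorizingStreamHandler (assumption).
            "class": "logging.StreamHandler",
            "formatter": "rq_console",
        },
    },
    "loggers": {
        "rq.worker": {"handlers": ["rq_console"], "level": "DEBUG"},
    },
}

# Apply the configuration the same way Django does by default.
logging.config.dictConfig(LOGGING)

worker_logger = logging.getLogger("rq.worker")
worker_logger.debug("logging is configured")
```

Every handler named under "loggers" has to be defined under "handlers"; otherwise dictConfig raises an error when the configuration is applied.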
Custom Queue Classes
By default, every queue will use the DjangoRQ class. If you want to use a custom queue class, you can do so by adding a QUEUE_CLASS option on a per-queue basis in RQ_QUEUES:
RQ_QUEUES = {
'default': {
'HOST': 'localhost',
'PORT': 6379,
'DB': 0,
'QUEUE_CLASS': 'module.path.CustomClass',
}
}
Alternatively, you can tell DjangoRQ to use a custom class for all your queues in the RQ settings:
RQ = {
'QUEUE_CLASS': 'module.path.CustomClass',
}
Custom queue classes should inherit from django_rq.queues.DjangoRQ.
If you are using more than one queue class (not recommended), be sure to only run workers on queues with the same queue class. For example, if you have two queues defined in RQ_QUEUES and one has a custom class specified, you would have to run at least two separate workers, one for each queue.
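To see which queues can share a worker, the queue names can be grouped by their effective queue class. The helper below is a hypothetical sketch, not part of django_rq. It assumes that a per-queue QUEUE_CLASS takes precedence over the global RQ setting, and that django_rq.queues.DjangoRQ applies when neither is set.

```python
from collections import defaultdict

# Dotted path of the default class named in the text above.
DEFAULT_QUEUE_CLASS = "django_rq.queues.DjangoRQ"


def group_queues_by_class(rq_queues, rq_settings=None):
    """Map each effective queue class path to the sorted queue names using it."""
    fallback = (rq_settings or {}).get("QUEUE_CLASS", DEFAULT_QUEUE_CLASS)
    groups = defaultdict(list)
    for name, config in rq_queues.items():
        groups[config.get("QUEUE_CLASS", fallback)].append(name)
    return {cls: sorted(names) for cls, names in groups.items()}


RQ_QUEUES = {
    "default": {
        "HOST": "localhost",
        "PORT": 6379,
        "DB": 0,
        "QUEUE_CLASS": "module.path.CustomClass",
    },
    "low": {"HOST": "localhost", "PORT": 6379, "DB": 0},
}

# Print one worker command per group of queues sharing a class.
for queue_class, names in group_queues_by_class(RQ_QUEUES).items():
    print(f"python manage.py rqworker {' '.join(names)}  # {queue_class}")
```

With the two queues above, the helper yields two groups, which matches the requirement of at least two separate workers.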
Custom Job and Worker Classes
Similarly to custom queue classes, global custom job and worker classes can be configured using the JOB_CLASS and WORKER_CLASS settings:
RQ = {
'JOB_CLASS': 'module.path.CustomJobClass',
'WORKER_CLASS': 'module.path.CustomWorkerClass',
}
A custom job class should inherit from rq.job.Job. It will be used for all jobs if configured.
A custom worker class should inherit from rq.worker.Worker. It will be used for running all workers unless overridden by the rqworker management command's --worker-class option.
Testing Tip
For an easier testing process, you can run a worker synchronously this way:
from django.test import TestCase
from django_rq import get_worker


class MyTest(TestCase):
    def test_something_that_creates_jobs(self):
        ...  # Stuff that inits jobs.
        get_worker().work(burst=True)  # Processes all jobs, then stops.
        ...  # Asserts that the job stuff is done.
Synchronous Mode
You can set the option ASYNC to False to make synchronous operation the default for a given queue. This will cause jobs to execute immediately and on the same thread as they are dispatched, which is useful for testing and debugging. For example, you might add the following after your queue configuration in your settings file:
# ... Logic to set DEBUG and TESTING settings to True or False ...
# ... Regular RQ_QUEUES setup code ...
if DEBUG or TESTING:
    for queue_config in RQ_QUEUES.values():
        queue_config['ASYNC'] = False
Note that setting the is_async parameter explicitly when calling get_queue will override this setting.
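The settings fragment above leaves the logic for setting TESTING open. One common approach, shown here only as a sketch, is to check whether the process was started through the test management command. Both helper names are hypothetical, and the sys.argv check is an assumption that holds for `python manage.py test` but not necessarily for other test runners.

```python
import sys


def running_tests(argv):
    """Guess whether Django was started via `manage.py test`."""
    return len(argv) > 1 and argv[1] == "test"


def force_synchronous(rq_queues):
    """Return a copy of RQ_QUEUES with ASYNC set to False for every queue."""
    return {name: {**config, "ASYNC": False} for name, config in rq_queues.items()}


RQ_QUEUES = {
    "default": {"HOST": "localhost", "PORT": 6379, "DB": 0},
    "high": {"URL": "redis://localhost:6379/0", "DEFAULT_TIMEOUT": 500},
}

TESTING = running_tests(sys.argv)

if TESTING:
    RQ_QUEUES = force_synchronous(RQ_QUEUES)
```

Building a new dict instead of mutating in place keeps the original configuration available, which can be convenient when settings are composed from several modules.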
Running Tests
To run django_rq's test suite:
`which django-admin` test django_rq --settings=django_rq.tests.settings --pythonpath=.
Deploying on Ubuntu
Create an rqworker service that runs the high, default, and low queues.
sudo vi /etc/systemd/system/rqworker.service
[Unit]
Description=Django-RQ Worker
After=network.target
[Service]
WorkingDirectory=<<path_to_your_project_folder>>
ExecStart=/home/ubuntu/.virtualenv/<<your_virtualenv>>/bin/python \
<<path_to_your_project_folder>>/manage.py \
rqworker high default low
[Install]
WantedBy=multi-user.target
Enable and start the service
sudo systemctl enable rqworker
sudo systemctl start rqworker
Deploying on Heroku
Add django-rq to your requirements.txt file with:
pip freeze > requirements.txt
Update your Procfile to:
web: gunicorn --pythonpath="$PWD/your_app_name" config.wsgi:application
worker: python your_app_name/manage.py rqworker high default low
Commit and re-deploy. Then add your new worker with:
heroku scale worker=1
Changelog
See CHANGELOG.md.