Migrating from RQ to Celery

This walkthrough sits under RQ vs Celery for Python and the broader Backend Frameworks & Worker Scaling guide, and assumes you have already decided Celery's routing, workflow, and multi-broker capabilities justify leaving RQ behind.

Teams outgrow RQ when they need fan-out workflows, per-task rate limits, or a broker other than Redis. The migration risk is concrete: a big-bang switch leaves in-flight RQ jobs stranded in Redis lists that Celery workers will never read, so payment retries never fire and scheduled jobs silently stop. The safe path is an incremental cutover in which both systems run side by side: new work goes to Celery while RQ drains its existing backlog to zero, and only then do you decommission RQ. This guide maps every RQ concept to its Celery equivalent and gives you a reversible rollout.

Prerequisites

  • A running RQ deployment with rq and redis pinned in requirements.txt, and a known list of queue names (default, high, etc.).
  • An inventory of every queue.enqueue(...) call site and every @job-decorated or plain function used as a task.
  • Celery 5.3+ installed alongside RQ: pip install "celery[redis]>=5.3".
  • A Redis instance you can point Celery at — ideally a separate database index from RQ (e.g. RQ on db=0, Celery on db=2) so the two systems never collide on keys.
  • Idempotent task bodies. During cutover a job may run once under RQ and, if redelivered, again under Celery. Review exactly-once vs at-least-once delivery before you start.
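Idempotency usually comes down to claiming a unique key before performing the side effect. The sketch below shows that claim-then-run pattern with a hypothetical in-process store (InMemoryClaimStore and run_once are illustrative names, not part of RQ or Celery). In a real cutover the store must be shared by both worker fleets, for example a Redis SET with NX and an expiry, so that a worker dying mid-task does not hold the key forever.

```python
import threading


class InMemoryClaimStore:
    """Hypothetical stand-in for a shared claim store (e.g. Redis SET NX EX).

    Only demonstrates the claim-before-work logic within one process.
    """

    def __init__(self):
        self._claimed = set()
        self._lock = threading.Lock()

    def claim(self, key):
        # Record the key atomically; False means another delivery got there first.
        with self._lock:
            if key in self._claimed:
                return False
            self._claimed.add(key)
            return True

    def release(self, key):
        # Forget the key so a later retry can claim it again.
        with self._lock:
            self._claimed.discard(key)


def run_once(store, key, fn, *args):
    """Run fn(*args) unless `key` is already claimed; return (ran, result)."""
    if not store.claim(key):
        return False, None  # duplicate delivery: skip the side effect
    try:
        return True, fn(*args)
    except Exception:
        store.release(key)  # failed attempt: allow a retry to run it
        raise
```

The key should identify the business operation (for example charge:<order_id>), not the RQ or Celery job id, because the same logical job gets a different id in each system.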

Step 1: Map RQ concepts to Celery

Build a translation table before touching code so every call site has a known target. The mapping is mostly mechanical.

RQ | Celery | Notes
--- | --- | ---
Queue('high', connection=r) | task_routes + named queue | Routing is config-driven, not an object you pass around
@job('default') / plain function | @shared_task (or @celery_app.task) | Decorator wraps the callable as a registered task
queue.enqueue(fn, arg) | fn.delay(arg) / fn.apply_async(args=[arg], queue='high') | delay is the shorthand; apply_async exposes routing and ETA
Retry(max=3, interval=[10,30,60]) | autoretry_for + retry_backoff | Celery computes backoff; you don't pass a literal list
enqueue_in(timedelta, fn) | apply_async(countdown=...) / eta=... | Delayed execution is a kwarg
rq-scheduler cron | Celery Beat | Periodic schedule lives in beat_schedule
rq worker high default | celery -A app worker -Q high,default | -Q selects queues
job.result / RQ result TTL | result_backend + AsyncResult | Results need an explicit backend

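Write this down per call site. The dangerous rows are retries and scheduling. RQ's literal interval list has no one-line Celery equivalent: autoretry_for derives delays from a backoff policy, and reproducing exact intervals means calling self.retry(countdown=...) yourself. Scheduling is risky because delayed jobs already parked in RQ are invisible to Celery.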
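If a call site genuinely needs RQ's exact delays, a bound Celery task can choose its own countdown on each retry instead of relying on retry_backoff. A minimal sketch, assuming the intervals are copied from the original Retry(...) call and that reusing the last interval is acceptable when the list runs out; rq_style_countdown is a hypothetical helper, and the Celery usage appears as a comment because it needs a configured app.

```python
RQ_INTERVALS = (10, 30, 60)  # copied from Retry(max=3, interval=[10, 30, 60])


def rq_style_countdown(retries, intervals=RQ_INTERVALS):
    """Delay before retry number `retries` (0-based, like self.request.retries).

    If the list is shorter than the number of retries, reuse its last entry.
    """
    if not intervals:
        return 0
    return intervals[min(retries, len(intervals) - 1)]


# Hypothetical use inside a bound Celery task (not executed here):
#
# @celery_app.task(bind=True, max_retries=len(RQ_INTERVALS))
# def charge_card(self, order_id):
#     try:
#         gateway.charge(order_id)
#     except (ConnectionError, TimeoutError) as exc:
#         raise self.retry(exc=exc, countdown=rq_style_countdown(self.request.retries))
```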

Step 2: Stand up a Celery app next to RQ

Add a Celery app without removing any RQ code. Point it at a distinct Redis database so RQ keys and Celery keys cannot overwrite each other.

# app/celery_app.py
from celery import Celery

celery_app = Celery("myproject", include=["tasks"])  # import tasks.py so the worker registers its tasks
celery_app.conf.update(
    broker_url="redis://redis-primary:6379/2",      # separate db from RQ (db=0)
    result_backend="redis://redis-primary:6379/3",  # results isolated again
    task_serializer="json",                          # avoid pickle RCE risk
    accept_content=["json"],
    result_serializer="json",
    task_acks_late=True,                             # ack after success, not on receive
    task_reject_on_worker_lost=True,                 # requeue if a worker dies mid-task
    broker_connection_retry_on_startup=True,
    # Route by name so producers stay simple:
    task_routes={
        "tasks.send_email": {"queue": "high"},
        "tasks.*": {"queue": "default"},
    },
)

This config keeps the two systems on the same Redis host but in separate logical databases, eliminating the namespace-collision failure mode where RQ and Celery fight over the same list keys.
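Because the isolation rests entirely on the database index in each URL, it can be worth failing fast at startup if someone later points Celery back at RQ's database. A sketch, assuming plain redis://host:port/db URLs (a database passed as a ?db= query parameter is not handled); redis_target and assert_isolated are hypothetical helpers.

```python
from urllib.parse import urlsplit


def redis_target(url):
    """Return (host, port, db) for a redis:// URL; port and db default to 6379 and 0."""
    parts = urlsplit(url)
    db_text = parts.path.lstrip("/")
    return (parts.hostname, parts.port or 6379, int(db_text) if db_text else 0)


def assert_isolated(rq_url, *celery_urls):
    """Raise ValueError if any Celery URL targets the same Redis database as RQ."""
    rq_target = redis_target(rq_url)
    for url in celery_urls:
        if redis_target(url) == rq_target:
            raise ValueError(f"{url} shares a Redis database with RQ ({rq_url})")
```

Calling assert_isolated(RQ_URL, celery_app.conf.broker_url, celery_app.conf.result_backend) next to the app definition would turn a misconfiguration into an immediate error instead of a silent key collision; RQ_URL stands for whatever setting your RQ code already reads.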

Step 3: Translate task definitions

Convert RQ task functions to Celery tasks. Keep the function body identical; only the decorator and signature change.

# Before: RQ task
# tasks.py
from rq.decorators import job
from redis import Redis

redis_conn = Redis(host="redis-primary", port=6379, db=0)

@job("high", connection=redis_conn, timeout=300)
def send_email(user_id, template):
    deliver(user_id, template)   # unchanged business logic

# After: Celery task
# tasks.py
from app.celery_app import celery_app

@celery_app.task(                  # registers the task on this app
    name="tasks.send_email",       # explicit name matches task_routes
    bind=True,
    soft_time_limit=300,           # RQ timeout -> soft_time_limit
    time_limit=330,                # hard kill 30s later
)
def send_email(self, user_id, template):
    deliver(user_id, template)     # business logic untouched

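Note that soft_time_limit raises SoftTimeLimitExceeded inside the task so you can clean up, while time_limit is the hard boundary at which the worker process is killed and replaced. RQ's single timeout maps most naturally to the soft limit, with the hard limit set a short grace period later.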
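With many call sites, converting each RQ timeout by hand is error-prone. A sketch of a mechanical mapping, assuming RQ's integer-or-suffixed-string form (300, "300", "5m", "1h"), treating -1 as "no timeout", and using an arbitrary 30-second grace period for the hard limit; both function names are hypothetical.

```python
UNITS = {"h": 3600, "m": 60, "s": 1}  # assumed RQ suffixes


def rq_timeout_to_seconds(timeout):
    """Convert an RQ-style timeout to seconds; -1 (no timeout) becomes None."""
    text = str(timeout).strip().lower()
    if text and text[-1] in UNITS:
        seconds = int(text[:-1]) * UNITS[text[-1]]
    else:
        seconds = int(text)
    return None if seconds == -1 else seconds


def celery_time_limits(rq_timeout, grace=30):
    """Map one RQ timeout to Celery's soft_time_limit / time_limit options."""
    soft = rq_timeout_to_seconds(rq_timeout)
    if soft is None:
        return {}  # no limit in RQ, so set none in Celery either
    return {"soft_time_limit": soft, "time_limit": soft + grace}
```

The returned dict can be unpacked into the decorator, e.g. @celery_app.task(**celery_time_limits("5m")), which keeps the RQ value as the single source of truth during the migration.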

Step 4: Translate retries and scheduling

RQ's Retry(max=3, interval=[10, 30, 60]) enumerates delays explicitly. Celery instead declares a backoff policy and computes the delays for you, so the closest translation approximates RQ's schedule rather than reproducing it.

# Celery retry: similar intent to Retry(max=3, interval=[10,30,60])
@celery_app.task(
    bind=True,
    autoretry_for=(ConnectionError, TimeoutError),  # which exceptions retry
    retry_backoff=10,        # factor: delays of 10, 20, 40s (10 * 2**retries)
    retry_backoff_max=60,    # per-delay cap; never reached with 3 retries at factor 10
    retry_jitter=True,       # full jitter: each delay randomized within [0, computed delay]
    max_retries=3,
)
def charge_card(self, order_id):
    gateway.charge(order_id)
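To see what delays this policy actually produces, it helps to reproduce the rule Celery applies for retry_backoff: min(retry_backoff_max, factor * 2**retries), replaced by a random value between 0 and that number when retry_jitter is on. The function below is a stand-alone re-implementation for inspection, not an import from Celery.

```python
import random


def celery_backoff(factor, retries, maximum, full_jitter=False, rng=random):
    """Exponential backoff as Celery's retry_backoff documents it.

    countdown = min(maximum, factor * 2**retries); with full jitter the
    delay is drawn uniformly from the integers 0..countdown.
    """
    countdown = min(maximum, factor * (2 ** retries))
    if full_jitter:
        countdown = rng.randrange(countdown + 1)
    return max(0, countdown)


# retries is 0 for the first retry, 1 for the second, and so on.
print([celery_backoff(10, n, 60) for n in range(3)])  # [10, 20, 40]
print([celery_backoff(15, n, 60) for n in range(3)])  # [15, 30, 60]
```

So the configuration above yields 10, 20 and 40 seconds before jitter, and the 60-second cap never applies. A factor of 15 would give 15, 30 and 60, matching RQ's last two intervals at the cost of a slower first retry.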

For delayed and periodic jobs, translate the dispatch site rather than the task:

# RQ delayed:  queue.enqueue_in(timedelta(minutes=5), reminder, uid)
reminder.apply_async(args=[uid], countdown=300)   # Celery: 5 minutes later

# RQ scheduler cron -> Celery Beat
from celery.schedules import crontab
celery_app.conf.beat_schedule = {
    "nightly-report": {
        "task": "tasks.nightly_report",
        "schedule": crontab(hour=2, minute=0),    # 02:00 daily in conf.timezone (UTC by default)
    },
}

If you rely heavily on cron-style schedules, plan that side of the move with Celery Beat periodic task scheduling before cutover.
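rq-scheduler cron jobs are declared with a five-field cron string, while Celery's crontab takes the same fields as keyword arguments. A sketch of a mechanical split, assuming plain five-field expressions (no @daily-style macros and no seconds field); cron_to_crontab_kwargs is a hypothetical helper. Check each translated entry by hand, including the timezone it is evaluated in.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month_of_year", "day_of_week")


def cron_to_crontab_kwargs(expr):
    """Split a 5-field cron string into keyword arguments for celery's crontab().

    Macros such as '@daily' and 6-field variants raise ValueError.
    """
    fields = expr.split()
    if len(fields) != 5:
        raise ValueError(f"expected 5 cron fields, got {len(fields)}: {expr!r}")
    return dict(zip(CRON_FIELDS, fields))
```

The result feeds directly into a Beat entry, e.g. "schedule": crontab(**cron_to_crontab_kwargs("0 2 * * *")).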

Step 5: Update producers and run both workers

Now flip enqueue call sites. Keep RQ workers running so anything already queued still drains.

# Before: RQ producer
from redis import Redis
from rq import Queue
q = Queue("high", connection=Redis(host="redis-primary", port=6379, db=0))
q.enqueue(send_email, user_id, "welcome", job_timeout=300)

# After: Celery producer
from tasks import send_email
send_email.apply_async(args=[user_id, "welcome"], queue="high")
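One optional variation, not part of the plan described here, is to route call sites through a small dispatch shim so that switching between RQ and Celery becomes a configuration change rather than a code revert. The sketch assumes a hypothetical TASK_BACKEND environment variable and backend adapters keyed by name; the adapters in the comments rely on RQ accepting a dotted function path in Queue.enqueue and on Celery's send_task.

```python
import os
from typing import Any, Callable, Dict, Mapping

# Hypothetical adapter shape: (task_name, args, queue) -> job id.
Backend = Callable[[str, tuple, str], Any]


def make_enqueue(backends: Dict[str, Backend], env: Mapping[str, str] = os.environ,
                 default: str = "rq") -> Callable[..., Any]:
    """Build an enqueue() that picks a backend from TASK_BACKEND on every call."""

    def enqueue(task_name: str, *args: Any, queue: str = "default") -> Any:
        choice = env.get("TASK_BACKEND", default)  # hypothetical setting name
        if choice not in backends:
            raise ValueError(f"unknown TASK_BACKEND {choice!r}")
        return backends[choice](task_name, args, queue)

    return enqueue


# Possible real adapters (assumptions, not executed here):
#   rq_backend = lambda name, args, queue: Queue(queue, connection=redis_conn).enqueue(name, *args).id
#   celery_backend = lambda name, args, queue: celery_app.send_task(name, args=list(args), queue=queue).id
```

Reading the setting on every call means a config reload moves new traffic without redeploying call sites; the trade-off is one more layer to delete once RQ is gone.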

Start a Celery worker beside the existing rq worker process so both consume during the transition window:

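# Existing RQ worker keeps draining old jobs already in db=0
# (--with-scheduler is only needed for jobs queued with RQ's built-in enqueue_in/enqueue_at)
rq worker --url redis://redis-primary:6379/0 --with-scheduler high default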

# New Celery worker handles all newly enqueued work in db=2
celery -A app.celery_app worker -Q high,default --concurrency=8 --loglevel=info

# Beat scheduler (run exactly one replica) for periodic tasks
celery -A app.celery_app beat --loglevel=info

Deploy the producer change. From this moment all new work goes to Celery; RQ only finishes its backlog.

Verification

Confirm RQ's queues drain to zero and Celery's are flowing before you remove anything.

# RQ backlog must trend to 0; these lists should empty out
redis-cli -h redis-primary -n 0 LLEN rq:queue:high
redis-cli -h redis-primary -n 0 LLEN rq:queue:default

# Celery is receiving and finishing work
celery -A app.celery_app inspect active
celery -A app.celery_app inspect stats | grep -A2 total

Then, from a Python shell in the project environment, assert that a known task round-trips through Celery before cutover sign-off:

from tasks import send_email
r = send_email.apply_async(args=[1, "smoke-test"], queue="high")
assert r.get(timeout=30) is None        # get() re-raises task errors and times out after 30s
print("celery path verified:", r.state) # -> SUCCESS

Treat every RQ queue list holding at LLEN 0, together with the delayed, scheduled, and started-job sorted sets holding at ZCARD 0 (LLEN does not work on sorted sets), as the gate for decommissioning RQ workers.
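The gate can be made explicit by sampling each RQ key on a schedule and only signing off after several consecutive all-zero readings. A sketch under stated assumptions: the key names follow RQ's usual rq:queue:<name>, rq:scheduled:<name> and rq:wip:<name> layout plus rq-scheduler's rq:scheduler:scheduled_jobs, which you should confirm against your RQ version. Both helpers are hypothetical, and collecting the counts (LLEN for lists, ZCARD for sorted sets) is left to your monitoring.

```python
def rq_keys_to_watch(queues):
    """Return (redis_type, key) pairs to sample; key names are assumptions."""
    keys = []
    for q in queues:
        keys.append(("list", f"rq:queue:{q}"))      # waiting jobs (LLEN)
        keys.append(("zset", f"rq:scheduled:{q}"))  # enqueue_in / enqueue_at (ZCARD)
        keys.append(("zset", f"rq:wip:{q}"))        # started jobs (ZCARD)
    keys.append(("zset", "rq:scheduler:scheduled_jobs"))  # rq-scheduler, if used
    return keys


def drain_gate_passed(samples, required_consecutive=3):
    """True once the last N samples each report zero for every watched key.

    `samples` is a list of {key: count} dicts, oldest first; an empty sample
    counts as a failed reading rather than a pass.
    """
    if len(samples) < required_consecutive:
        return False
    recent = samples[-required_consecutive:]
    return all(sample and all(c == 0 for c in sample.values()) for sample in recent)
```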

Gotchas & edge cases

  • Stranded delayed jobs. Delayed work never sits in the main queue list, so it does not show up in LLEN and Celery cannot read it. Jobs scheduled with RQ's built-in enqueue_in/enqueue_at wait in the rq:scheduled:<queue> sorted set and only fire while some worker runs with --with-scheduler; rq-scheduler entries wait in rq:scheduler:scheduled_jobs. Enumerate both, then either re-issue future-dated work as Celery countdown/eta tasks or let it fire under a still-running RQ worker (or rq-scheduler process) before shutdown.
  • Pickle vs JSON payloads. RQ pickles arguments by default, so it happily enqueues objects JSON cannot represent, such as ORM model instances. With task_serializer='json', Celery fails to encode such arguments at enqueue time, and accept_content=['json'] makes workers refuse messages in any other format. Convert these arguments to plain types (IDs, ISO strings) at the call site, not inside the task.
  • Result API mismatch. Code that reads job.result synchronously must move to AsyncResult(id).get() and requires a configured result_backend. If you only used RQ to fire-and-forget, skip the backend entirely to avoid the extra Redis write per task.
  • Two Beat schedulers. Running more than one celery beat replica double-fires every periodic task. Pin Beat to a single replica (or use a leader-elected lock) — this is the most common post-migration duplicate-execution bug.
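For the pickle vs JSON point, a pre-flight check at each converted call site can surface offending arguments before they reach the broker. A sketch using the standard library json module; find_non_json_args is a hypothetical helper, and because Celery's own JSON encoder handles a few more types than stdlib json, it can flag values Celery would in fact accept.

```python
import json


def find_non_json_args(args=(), kwargs=None):
    """Describe each argument that stdlib json cannot encode (empty list if none)."""
    problems = []
    items = [(f"args[{i}]", v) for i, v in enumerate(args)]
    items += [(f"kwargs[{k!r}]", v) for k, v in (kwargs or {}).items()]
    for label, value in items:
        try:
            json.dumps(value)
        except (TypeError, ValueError) as exc:  # unsupported type or circular reference
            problems.append(f"{label}: {exc}")
    return problems
```

Running it in tests over the arguments each producer builds is usually enough; it does not need to stay in the hot path after the migration.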

Rollback

Because the producer deploy is the only atomic change, rollback is a single deploy: revert the producer commit so enqueue calls go back to queue.enqueue(...) on RQ db=0. The RQ workers never stopped, so they pick the work back up immediately. Leave the Celery worker running until its in-flight and already-queued tasks finish, then scale it to zero. If Celery Beat has taken over periodic jobs, stop it as part of the rollback so periodic tasks are not fired by both Beat and rq-scheduler. Keep the separate Redis database indices in place during the bake period so a rollback never has to untangle mixed keys.
