Celery is a distributed task queue for Python. Workers pull tasks from a broker such as Redis and execute them. It also provides retries, scheduling, and workflows.
import os

import requests  # third-party; must be installed wherever fetch_url runs
from celery import Celery, chain, group

app = Celery('tasks', broker=os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'))

# Emit task events so that monitoring tools such as celery-exporter can observe them
app.conf.update(
    worker_send_task_events=True,
    task_send_sent_event=True,
)

@app.task
def add(x, y):
    return x + y

# Retry
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def fetch_url(self, url):
    try:
        return requests.get(url).text
    except Exception as e:
        # retry() raises celery.exceptions.Retry; raising it makes the control flow explicit
        raise self.retry(exc=e)

# Workflow: sequential execution
chain(add.s(1, 2), add.s(3))()  # (1+2) + 3 = 6

# Workflow: parallel execution
group(add.s(1, 2), add.s(3, 4))()  # [3, 7]

# Scheduling
app.conf.beat_schedule = {
    'add-every-minute': {
        'task': 'tasks.add',
        'schedule': 60.0,  # seconds
        'args': (1, 2),
    },
}
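To make the argument passing in the two workflow comments concrete, here is a minimal pure-Python model. It is only a sketch: `run_chain` and `run_group` are hypothetical helpers, not Celery APIs, and a signature is modelled as a plain `(function, args)` tuple. It assumes, as Celery's chain does, that each result is passed as the first argument of the next call.

```python
def add(x, y):
    return x + y


def run_chain(first, *rest):
    """Run signatures in order, feeding each result into the next call."""
    func, args = first
    result = func(*args)
    for func, args in rest:
        # The previous result is prepended to the next signature's own args.
        result = func(result, *args)
    return result


def run_group(*signatures):
    """Run signatures independently and collect the results in order."""
    return [func(*args) for func, args in signatures]


print(run_chain((add, (1, 2)), (add, (3,))))    # 6
print(run_group((add, (1, 2)), (add, (3, 4))))  # [3, 7]
```

In real Celery the group members run concurrently on Workers, and reading the values back requires a result backend, which the configuration here does not set up.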
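By default a Worker acknowledges (ACKs) a task message before it runs the task, and the message is removed from the queue at that point, so a task that is in progress when the Worker crashes is lost. With task_acks_late=True the ACK is sent after the task finishes, so the task is redelivered after a crash. The trade-off is that the same task may be executed more than once, so tasks should be idempotent.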
app.conf.update(
    task_acks_late=True,  # default: False
)
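The difference between the two ACK timings can be sketched with a toy broker model. This is not how Celery is implemented: `deliver`, `run`, and `WorkerCrashed` are hypothetical names, the queue is an in-memory deque, and the exception only stands in for the worker process dying mid-task.

```python
from collections import deque


class WorkerCrashed(Exception):
    """Stands in for the worker process dying while a task is running."""


def deliver(queue, task, acks_late):
    """Hand the message at the head of the queue to `task` once."""
    message = queue[0]
    if not acks_late:
        queue.popleft()   # early ack: removed before the task body runs
    task(message)         # may raise WorkerCrashed
    if acks_late:
        queue.popleft()   # late ack: removed only after the task returns


def run(acks_late):
    """Return (attempts, completions) for one job whose first run crashes."""
    queue = deque(["job-1"])
    attempts = []
    completed = []

    def task(message):
        attempts.append(message)
        if len(attempts) == 1:
            raise WorkerCrashed
        completed.append(message)

    while queue:
        try:
            deliver(queue, task, acks_late)
        except WorkerCrashed:
            pass  # the worker restarts and keeps consuming
    return len(attempts), len(completed)


print(run(acks_late=False))  # (1, 0): the job is lost
print(run(acks_late=True))   # (2, 1): redelivered, so the body starts twice
```

The second result is why late ACKs call for idempotent tasks: any side effect performed before the crash happens again on redelivery.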
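Tasks are prefetched for efficiency. When some tasks run for a long time, however, lowering worker_prefetch_multiplier keeps short tasks from waiting behind them in a single Worker's buffer, so the work is spread more evenly across Workers.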
app.conf.update(
    worker_prefetch_multiplier=1,  # default: 4
)
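A small simulation illustrates why a lower multiplier helps when task durations are uneven. It is a deliberately simplified model, not Celery's scheduler: `makespan` is a hypothetical function, every worker has a single process, and an idle worker tops up its local buffer to `prefetch` tasks before running the next one.

```python
import heapq
from collections import deque


def makespan(durations, workers, prefetch):
    """Time until all tasks finish in a toy prefetch model."""
    shared = deque(durations)                     # tasks still in the broker
    buffers = [deque() for _ in range(workers)]   # tasks reserved per worker

    def top_up(w):
        while shared and len(buffers[w]) < prefetch:
            buffers[w].append(shared.popleft())

    for w in range(workers):
        top_up(w)

    # Heap of (time at which the worker becomes idle, worker index).
    idle_at = [(0, w) for w in range(workers)]
    heapq.heapify(idle_at)
    finished = 0
    while idle_at:
        now, w = heapq.heappop(idle_at)
        finished = max(finished, now)
        top_up(w)
        if buffers[w]:
            heapq.heappush(idle_at, (now + buffers[w].popleft(), w))
    return finished


tasks = [10, 1, 1, 1, 1, 1, 1, 1]  # one long task followed by short ones
print(makespan(tasks, workers=2, prefetch=4))  # 13
print(makespan(tasks, workers=2, prefetch=1))  # 10
```

With a buffer of 4, three short tasks sit behind the 10-second task while the other worker goes idle. With a buffer of 1, the idle worker picks them up instead.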
Start Redis, a Celery Worker, and celery-exporter with Docker Compose.
# docker-compose.yml
services:
  redis:
    image: redis:8.4
    ports:
      - "6379:6379"
  worker:
    build: .
    command: celery -A tasks worker --loglevel=info
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
    depends_on:
      - redis
  celery-exporter:
    image: danihodovic/celery-exporter:0.12.2
    command: --broker-url=redis://redis:6379/0
    ports:
      - "9808:9808"
    depends_on:
      - redis
# Dockerfile
FROM python:3.13-slim
WORKDIR /app
# requests is needed by the fetch_url task in tasks.py
RUN pip install --no-cache-dir celery==5.6.2 redis requests
COPY tasks.py .
Bring the stack up and enqueue a task.
$ docker compose up -d --build
$ docker compose exec worker python -c "from tasks import add; add.delay(4, 6)"
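celery-exporter aggregates Celery events into task execution statistics and exposes them in Prometheus format. The metrics include counts of tasks sent, received, started, succeeded, and failed, as well as task runtime and queue length.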
$ curl localhost:9808/metrics
# HELP celery_task_sent_total Sent when a task message is published.
# TYPE celery_task_sent_total counter
celery_task_sent_total{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 1.0
# HELP celery_task_sent_created Sent when a task message is published.
# TYPE celery_task_sent_created gauge
celery_task_sent_created{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 1.770589914288271e+09
# HELP celery_task_received_total Sent when the worker receives a task.
# TYPE celery_task_received_total counter
celery_task_received_total{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 1.0
# HELP celery_task_received_created Sent when the worker receives a task.
# TYPE celery_task_received_created gauge
celery_task_received_created{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 1.7705899142892926e+09
# HELP celery_task_started_total Sent just before the worker executes the task.
# TYPE celery_task_started_total counter
celery_task_started_total{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 1.0
# HELP celery_task_started_created Sent just before the worker executes the task.
# TYPE celery_task_started_created gauge
celery_task_started_created{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 1.7705899142893069e+09
# HELP celery_task_succeeded_total Sent if the task executed successfully.
# TYPE celery_task_succeeded_total counter
celery_task_succeeded_total{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 1.0
# HELP celery_task_succeeded_created Sent if the task executed successfully.
# TYPE celery_task_succeeded_created gauge
celery_task_succeeded_created{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 1.7705899142893167e+09
# HELP celery_task_failed_total Sent if the execution of the task failed.
# TYPE celery_task_failed_total counter
celery_task_failed_total{exception="",hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 0.0
# HELP celery_task_failed_created Sent if the execution of the task failed.
# TYPE celery_task_failed_created gauge
celery_task_failed_created{exception="",hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 1.7705899142893283e+09
# HELP celery_task_rejected_total The task was rejected by the worker, possibly to be re-queued or moved to a dead letter queue.
# TYPE celery_task_rejected_total counter
celery_task_rejected_total{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 0.0
# HELP celery_task_rejected_created The task was rejected by the worker, possibly to be re-queued or moved to a dead letter queue.
# TYPE celery_task_rejected_created gauge
celery_task_rejected_created{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 1.7705899142893376e+09
# HELP celery_task_revoked_total Sent if the task has been revoked.
# TYPE celery_task_revoked_total counter
celery_task_revoked_total{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 0.0
# HELP celery_task_revoked_created Sent if the task has been revoked.
# TYPE celery_task_revoked_created gauge
celery_task_revoked_created{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 1.770589914289346e+09
# HELP celery_task_retried_total Sent if the task failed, but will be retried in the future.
# TYPE celery_task_retried_total counter
celery_task_retried_total{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 0.0
# HELP celery_task_retried_created Sent if the task failed, but will be retried in the future.
# TYPE celery_task_retried_created gauge
celery_task_retried_created{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 1.770589914289354e+09
# HELP celery_worker_up Indicates if a worker has recently sent a heartbeat.
# TYPE celery_worker_up gauge
celery_worker_up{hostname="31709bfbe1e0"} 1.0
# HELP celery_worker_tasks_active The number of tasks the worker is currently processing
# TYPE celery_worker_tasks_active gauge
celery_worker_tasks_active{hostname="31709bfbe1e0"} 0.0
# HELP celery_task_runtime Histogram of task runtime measurements.
# TYPE celery_task_runtime histogram
celery_task_runtime_bucket{hostname="31709bfbe1e0",le="0.005",name="tasks.add",queue_name="celery"} 1.0
celery_task_runtime_bucket{hostname="31709bfbe1e0",le="0.01",name="tasks.add",queue_name="celery"} 1.0
celery_task_runtime_bucket{hostname="31709bfbe1e0",le="0.025",name="tasks.add",queue_name="celery"} 1.0
celery_task_runtime_bucket{hostname="31709bfbe1e0",le="0.05",name="tasks.add",queue_name="celery"} 1.0
celery_task_runtime_bucket{hostname="31709bfbe1e0",le="0.075",name="tasks.add",queue_name="celery"} 1.0
celery_task_runtime_bucket{hostname="31709bfbe1e0",le="0.1",name="tasks.add",queue_name="celery"} 1.0
celery_task_runtime_bucket{hostname="31709bfbe1e0",le="0.25",name="tasks.add",queue_name="celery"} 1.0
celery_task_runtime_bucket{hostname="31709bfbe1e0",le="0.5",name="tasks.add",queue_name="celery"} 1.0
celery_task_runtime_bucket{hostname="31709bfbe1e0",le="0.75",name="tasks.add",queue_name="celery"} 1.0
celery_task_runtime_bucket{hostname="31709bfbe1e0",le="1.0",name="tasks.add",queue_name="celery"} 1.0
celery_task_runtime_bucket{hostname="31709bfbe1e0",le="2.5",name="tasks.add",queue_name="celery"} 1.0
celery_task_runtime_bucket{hostname="31709bfbe1e0",le="5.0",name="tasks.add",queue_name="celery"} 1.0
celery_task_runtime_bucket{hostname="31709bfbe1e0",le="7.5",name="tasks.add",queue_name="celery"} 1.0
celery_task_runtime_bucket{hostname="31709bfbe1e0",le="10.0",name="tasks.add",queue_name="celery"} 1.0
celery_task_runtime_bucket{hostname="31709bfbe1e0",le="+Inf",name="tasks.add",queue_name="celery"} 1.0
celery_task_runtime_count{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 1.0
celery_task_runtime_sum{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 0.00024117900102282874
# HELP celery_task_runtime_created Histogram of task runtime measurements.
# TYPE celery_task_runtime_created gauge
celery_task_runtime_created{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 1.7705899142916417e+09
# HELP celery_queue_length The number of message in broker queue.
# TYPE celery_queue_length gauge
celery_queue_length{queue_name="celery"} 0.0
# HELP celery_active_consumer_count The number of active consumer in broker queue.
# TYPE celery_active_consumer_count gauge
# HELP celery_active_worker_count The number of active workers in broker queue.
# TYPE celery_active_worker_count gauge
celery_active_worker_count{queue_name="celery"} 1.0
# HELP celery_active_process_count The number of active processes in broker queue.
# TYPE celery_active_process_count gauge
celery_active_process_count{queue_name="celery"} 8.0
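As a rough illustration of how this output can be consumed without a Prometheus server, the sketch below parses a few of the lines shown above and derives the mean task runtime from the histogram's `_sum` and `_count` series. `parse_metrics` and `total` are hypothetical helpers written for this example, and the parser handles only the simple `name{labels} value` lines seen here, not the full exposition format.

```python
import re

SAMPLE = '''\
# HELP celery_task_runtime Histogram of task runtime measurements.
# TYPE celery_task_runtime histogram
celery_task_runtime_count{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 1.0
celery_task_runtime_sum{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 0.00024117900102282874
celery_task_succeeded_total{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 1.0
celery_task_failed_total{exception="",hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 0.0
celery_queue_length{queue_name="celery"} 0.0
'''

LINE = re.compile(r'^(?P<name>[a-zA-Z_:][a-zA-Z0-9_:]*)(?:\{(?P<labels>.*)\})?\s+(?P<value>\S+)$')
LABEL = re.compile(r'(\w+)="([^"]*)"')


def parse_metrics(text):
    """Return a list of (metric name, labels dict, value) tuples."""
    samples = []
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith('#'):
            continue  # skip blank lines and HELP/TYPE comments
        match = LINE.match(line)
        if match is None:
            continue  # ignore anything this simple parser cannot read
        labels = dict(LABEL.findall(match.group('labels') or ''))
        samples.append((match.group('name'), labels, float(match.group('value'))))
    return samples


def total(samples, metric, **wanted):
    """Sum the values of `metric` whose labels match all of `wanted`."""
    return sum(
        value
        for name, labels, value in samples
        if name == metric and all(labels.get(k) == v for k, v in wanted.items())
    )


samples = parse_metrics(SAMPLE)
succeeded = total(samples, 'celery_task_succeeded_total', name='tasks.add')
mean_runtime = (
    total(samples, 'celery_task_runtime_sum', name='tasks.add')
    / total(samples, 'celery_task_runtime_count', name='tasks.add')
)
print(succeeded)     # 1.0
print(mean_runtime)  # mean runtime in seconds
```

Against a running stack, the same functions could be fed the body of `http://localhost:9808/metrics`, for example via `urllib.request.urlopen`, instead of `SAMPLE`.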