Exposing Celery metrics in Prometheus format with celery-exporter


Celery is a distributed task queue for Python. Workers take tasks from a broker such as Redis and execute them. It also provides retries, scheduling, and workflows.
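To make the broker/worker split concrete, here is a minimal stdlib-only toy model: a `queue.Queue` stands in for the broker and threads stand in for workers. This is only an illustration of the idea, not how Celery is implemented; `run_workers` and `TASKS` are hypothetical names.

```python
import queue
import threading


def run_workers(messages, tasks, n_workers=2):
    """Toy model: publish task messages to a broker and let worker threads consume them."""
    broker = queue.Queue()  # stands in for Redis
    results = []
    lock = threading.Lock()

    def worker():
        while True:
            message = broker.get()  # block until a message is available
            if message is None:     # sentinel: no more work for this worker
                break
            name, args = message
            value = tasks[name](*args)  # look up the task by name and run it
            with lock:
                results.append(value)

    threads = [threading.Thread(target=worker) for _ in range(n_workers)]
    for t in threads:
        t.start()
    for message in messages:  # the client side publishes (task name, args) messages
        broker.put(message)
    for _ in threads:         # one sentinel per worker, queued after all real messages
        broker.put(None)
    for t in threads:
        t.join()
    return sorted(results)    # completion order depends on thread scheduling


# Hypothetical task registry keyed by task name, in the spirit of @app.task
TASKS = {"tasks.add": lambda x, y: x + y}

print(run_workers([("tasks.add", (1, 2)), ("tasks.add", (3, 4))], TASKS))  # [3, 7]
```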

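import os

from celery import Celery, chain, group

app = Celery('tasks', broker=os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'))

# Emit task events so that monitoring tools such as celery-exporter can observe tasks
app.conf.update(
    worker_send_task_events=True,  # worker-side events: received, started, succeeded, failed, ...
    task_send_sent_event=True,     # also emit task-sent when a task is published
)

@app.task
def add(x, y):
    return x + y

# Retry: up to 3 retries, 60 seconds apart
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def fetch_url(self, url):
    import requests  # imported here so the worker can start even if requests is not installed

    try:
        return requests.get(url, timeout=10).text
    except Exception as e:
        # retry() raises by itself; "raise" makes the control flow explicit
        raise self.retry(exc=e)

# Scheduling (executed by celery beat)
app.conf.beat_schedule = {
    'add-every-minute': {
        'task': 'tasks.add',
        'schedule': 60.0,  # every 60 seconds
        'args': (1, 2),
    },
}

# Guarded so that importing this module (e.g. by the worker) does not publish tasks
if __name__ == '__main__':
    # Workflow: sequential execution
    chain(add.s(1, 2), add.s(3))()  # (1+2) + 3 = 6

    # Workflow: parallel execution
    group(add.s(1, 2), add.s(3, 4))()  # [3, 7]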
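In a chain, each task's return value is passed to the next signature as its first argument; a group runs its signatures independently and collects the results in order. The plain-Python sketch below mimics just those two rules without a broker. `signature`, `run_chain`, and `run_group` are hypothetical helpers, and the group runs sequentially here, whereas real workers would run it in parallel.

```python
def add(x, y):
    return x + y


def signature(func, *args):
    """Hypothetical stand-in for a Celery signature such as add.s(3)."""
    return func, args


def run_chain(first, *rest):
    func, args = first
    result = func(*args)  # the first task runs with its own arguments
    for func, args in rest:
        # the previous result is prepended, so add.s(3) becomes add(result, 3)
        result = func(result, *args)
    return result


def run_group(*sigs):
    # every signature runs independently; results keep the order of the signatures
    return [func(*args) for func, args in sigs]


print(run_chain(signature(add, 1, 2), signature(add, 3)))     # 6
print(run_group(signature(add, 1, 2), signature(add, 3, 4)))  # [3, 7]
```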

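By default, a task message is acknowledged (and thus removed from the queue) just before the worker executes it, so a task that is running when the worker crashes is lost. With task_acks_late=True, the message is acknowledged only after the task finishes, so a task interrupted by a crash is redelivered. The catch is that the same task may then run more than once, so tasks should be idempotent.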

app.conf.update(
    task_acks_late=True,  # default: False
)
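The trade-off shows up in a toy simulation. `simulate` is a hypothetical model, not Celery code: one queue, a worker that crashes once on a given task either before or after running the task body, and redelivery modeled as putting the unacknowledged message back at the head of the queue.

```python
from collections import deque


def simulate(task_ids, crash_on, crash_point, acks_late):
    """Toy model of early vs late ACK; returns the task ids whose body actually ran.

    The worker crashes once while handling `crash_on`:
    crash_point="before" -> the message was delivered, but the task body never ran
    crash_point="after"  -> the task body ran, but the worker died before a late ACK
    """
    messages = deque(task_ids)
    executed = []
    crashed = False
    while messages:
        task = messages.popleft()  # message delivered to the worker
        crash_now = task == crash_on and not crashed
        if crash_now:
            crashed = True
        if not (crash_now and crash_point == "before"):
            executed.append(task)  # task body runs (side effects happen here)
            if not crash_now:
                continue  # finished normally; the message is acked and gone
        # The worker crashed while holding this message.
        if acks_late:
            messages.appendleft(task)  # never acked, so the broker redelivers it
        # With the default (early) ACK the message was already acked: no redelivery.
    return executed


print(simulate([1, 2, 3], crash_on=2, crash_point="before", acks_late=False))  # [1, 3]
print(simulate([1, 2, 3], crash_on=2, crash_point="before", acks_late=True))   # [1, 2, 3]
print(simulate([1, 2, 3], crash_on=2, crash_point="after", acks_late=True))    # [1, 2, 2, 3]
```

The first call loses task 2, the second recovers it, and the third runs it twice, which is why late ACK goes together with idempotent tasks.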

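For efficiency, each worker prefetches messages: it reserves up to worker_prefetch_multiplier × concurrency messages in advance. When some tasks are long-running, the messages a busy worker has already reserved wait behind the long task even while other workers sit idle, so lowering worker_prefetch_multiplier spreads tasks more evenly across workers.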

app.conf.update(
    worker_prefetch_multiplier=1,  # default: 4
)
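A small simulation illustrates the effect. `finish_time` is a hypothetical, simplified model rather than Celery's scheduling logic: each worker has concurrency 1 (so the prefetch limit equals the multiplier), holds at most `prefetch` messages including the one it is running, and tops up its reservation from the broker whenever it has room.

```python
from collections import deque


def finish_time(durations, n_workers, prefetch):
    """Simplified model: return when the last task finishes.

    Each worker runs one task at a time and holds at most `prefetch` messages,
    counting the one it is running. Whenever it has room, it reserves the next
    messages from the broker, in worker order.
    """
    broker = deque(durations)
    reserved = [deque() for _ in range(n_workers)]  # prefetched, not yet started
    running = [None] * n_workers                     # finish time of the current task
    now = 0.0
    while broker or any(reserved) or any(r is not None for r in running):
        # 1. Each worker tops up its reservation up to the prefetch limit.
        for w in range(n_workers):
            held = len(reserved[w]) + (running[w] is not None)
            while broker and held < prefetch:
                reserved[w].append(broker.popleft())
                held += 1
        # 2. Idle workers start their next reserved task.
        for w in range(n_workers):
            if running[w] is None and reserved[w]:
                running[w] = now + reserved[w].popleft()
        # 3. Jump to the next completion and free that worker.
        now = min(r for r in running if r is not None)
        for w in range(n_workers):
            if running[w] == now:
                running[w] = None
    return now


tasks = [10, 1, 1, 1, 1, 1]  # one long task followed by short ones
print(finish_time(tasks, n_workers=2, prefetch=4))  # 13.0
print(finish_time(tasks, n_workers=2, prefetch=1))  # 10.0
```

With a prefetch of 4, the first worker reserves three short tasks that sit behind the 10-unit task while the second worker goes idle; with a prefetch of 1, the second worker picks up all the short tasks.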

Start Redis, a Celery worker, and celery-exporter.

# docker-compose.yml
services:
  redis:
    image: redis:8.4
    ports:
      - "6379:6379"

  worker:
    build: .
    command: celery -A tasks worker --loglevel=info
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
    depends_on:
      - redis

  celery-exporter:
    image: danihodovic/celery-exporter:0.12.2
    command: --broker-url=redis://redis:6379/0
    ports:
      - "9808:9808"
    depends_on:
      - redis

# Dockerfile
FROM python:3.13-slim
WORKDIR /app
RUN pip install --no-cache-dir celery==5.6.2 redis requests
COPY tasks.py .

Start the stack and run a task.

$ docker compose up -d --build
$ docker compose exec worker python -c "from tasks import add; add.delay(4, 6)"

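celery-exporter aggregates the events Celery emits and exposes task execution status in Prometheus format. It provides counts of tasks sent, received, started, succeeded, and failed, along with task runtimes and queue lengths. These events are only emitted because worker_send_task_events and task_send_sent_event were enabled in the app configuration.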
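A rough sketch of that aggregation in plain Python. `aggregate` and `EVENT_TO_METRIC` are hypothetical and far simpler than celery-exporter's actual code (no hostname label, no histogram buckets). Events are assumed to arrive as dicts keyed by the event field names; since task-started, task-succeeded, and task-failed carry only the task uuid, the sketch remembers each task's name and queue from earlier events.

```python
from collections import Counter

# Celery event type -> counter name used in the exporter output below
EVENT_TO_METRIC = {
    "task-sent": "celery_task_sent_total",
    "task-received": "celery_task_received_total",
    "task-started": "celery_task_started_total",
    "task-succeeded": "celery_task_succeeded_total",
    "task-failed": "celery_task_failed_total",
}


def aggregate(events):
    """Count task events per (metric, task name, queue) and sum successful runtimes."""
    labels_by_uuid = {}  # uuid -> (task name, queue), learned from earlier events
    counts = Counter()
    runtime_sum = Counter()
    for event in events:
        metric = EVENT_TO_METRIC.get(event["type"])
        if metric is None:
            continue  # e.g. worker-heartbeat feeds worker metrics, not task counters
        uuid = event["uuid"]
        if event["type"] == "task-sent":
            labels_by_uuid[uuid] = (event["name"], event.get("queue", "unknown"))
        elif uuid not in labels_by_uuid and "name" in event:
            # assumption: task-received carries the name but not the queue
            labels_by_uuid[uuid] = (event["name"], "unknown")
        name, queue = labels_by_uuid.get(uuid, ("unknown", "unknown"))
        counts[(metric, name, queue)] += 1
        if event["type"] == "task-succeeded":
            runtime_sum[(name, queue)] += event["runtime"]
    return counts, runtime_sum
```

Feeding it the events of one successful `tasks.add` run yields a count of 1 for each of sent, received, started, and succeeded, which matches the `*_total` values in the output below.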

$ curl localhost:9808/metrics
# HELP celery_task_sent_total Sent when a task message is published.
# TYPE celery_task_sent_total counter
celery_task_sent_total{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 1.0
# HELP celery_task_sent_created Sent when a task message is published.
# TYPE celery_task_sent_created gauge
celery_task_sent_created{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 1.770589914288271e+09
# HELP celery_task_received_total Sent when the worker receives a task.
# TYPE celery_task_received_total counter
celery_task_received_total{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 1.0
# HELP celery_task_received_created Sent when the worker receives a task.
# TYPE celery_task_received_created gauge
celery_task_received_created{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 1.7705899142892926e+09
# HELP celery_task_started_total Sent just before the worker executes the task.
# TYPE celery_task_started_total counter
celery_task_started_total{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 1.0
# HELP celery_task_started_created Sent just before the worker executes the task.
# TYPE celery_task_started_created gauge
celery_task_started_created{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 1.7705899142893069e+09
# HELP celery_task_succeeded_total Sent if the task executed successfully.
# TYPE celery_task_succeeded_total counter
celery_task_succeeded_total{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 1.0
# HELP celery_task_succeeded_created Sent if the task executed successfully.
# TYPE celery_task_succeeded_created gauge
celery_task_succeeded_created{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 1.7705899142893167e+09
# HELP celery_task_failed_total Sent if the execution of the task failed.
# TYPE celery_task_failed_total counter
celery_task_failed_total{exception="",hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 0.0
# HELP celery_task_failed_created Sent if the execution of the task failed.
# TYPE celery_task_failed_created gauge
celery_task_failed_created{exception="",hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 1.7705899142893283e+09
# HELP celery_task_rejected_total The task was rejected by the worker, possibly to be re-queued or moved to a dead letter queue.
# TYPE celery_task_rejected_total counter
celery_task_rejected_total{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 0.0
# HELP celery_task_rejected_created The task was rejected by the worker, possibly to be re-queued or moved to a dead letter queue.
# TYPE celery_task_rejected_created gauge
celery_task_rejected_created{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 1.7705899142893376e+09
# HELP celery_task_revoked_total Sent if the task has been revoked.
# TYPE celery_task_revoked_total counter
celery_task_revoked_total{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 0.0
# HELP celery_task_revoked_created Sent if the task has been revoked.
# TYPE celery_task_revoked_created gauge
celery_task_revoked_created{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 1.770589914289346e+09
# HELP celery_task_retried_total Sent if the task failed, but will be retried in the future.
# TYPE celery_task_retried_total counter
celery_task_retried_total{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 0.0
# HELP celery_task_retried_created Sent if the task failed, but will be retried in the future.
# TYPE celery_task_retried_created gauge
celery_task_retried_created{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 1.770589914289354e+09
# HELP celery_worker_up Indicates if a worker has recently sent a heartbeat.
# TYPE celery_worker_up gauge
celery_worker_up{hostname="31709bfbe1e0"} 1.0
# HELP celery_worker_tasks_active The number of tasks the worker is currently processing
# TYPE celery_worker_tasks_active gauge
celery_worker_tasks_active{hostname="31709bfbe1e0"} 0.0
# HELP celery_task_runtime Histogram of task runtime measurements.
# TYPE celery_task_runtime histogram
celery_task_runtime_bucket{hostname="31709bfbe1e0",le="0.005",name="tasks.add",queue_name="celery"} 1.0
celery_task_runtime_bucket{hostname="31709bfbe1e0",le="0.01",name="tasks.add",queue_name="celery"} 1.0
celery_task_runtime_bucket{hostname="31709bfbe1e0",le="0.025",name="tasks.add",queue_name="celery"} 1.0
celery_task_runtime_bucket{hostname="31709bfbe1e0",le="0.05",name="tasks.add",queue_name="celery"} 1.0
celery_task_runtime_bucket{hostname="31709bfbe1e0",le="0.075",name="tasks.add",queue_name="celery"} 1.0
celery_task_runtime_bucket{hostname="31709bfbe1e0",le="0.1",name="tasks.add",queue_name="celery"} 1.0
celery_task_runtime_bucket{hostname="31709bfbe1e0",le="0.25",name="tasks.add",queue_name="celery"} 1.0
celery_task_runtime_bucket{hostname="31709bfbe1e0",le="0.5",name="tasks.add",queue_name="celery"} 1.0
celery_task_runtime_bucket{hostname="31709bfbe1e0",le="0.75",name="tasks.add",queue_name="celery"} 1.0
celery_task_runtime_bucket{hostname="31709bfbe1e0",le="1.0",name="tasks.add",queue_name="celery"} 1.0
celery_task_runtime_bucket{hostname="31709bfbe1e0",le="2.5",name="tasks.add",queue_name="celery"} 1.0
celery_task_runtime_bucket{hostname="31709bfbe1e0",le="5.0",name="tasks.add",queue_name="celery"} 1.0
celery_task_runtime_bucket{hostname="31709bfbe1e0",le="7.5",name="tasks.add",queue_name="celery"} 1.0
celery_task_runtime_bucket{hostname="31709bfbe1e0",le="10.0",name="tasks.add",queue_name="celery"} 1.0
celery_task_runtime_bucket{hostname="31709bfbe1e0",le="+Inf",name="tasks.add",queue_name="celery"} 1.0
celery_task_runtime_count{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 1.0
celery_task_runtime_sum{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 0.00024117900102282874
# HELP celery_task_runtime_created Histogram of task runtime measurements.
# TYPE celery_task_runtime_created gauge
celery_task_runtime_created{hostname="31709bfbe1e0",name="tasks.add",queue_name="celery"} 1.7705899142916417e+09
# HELP celery_queue_length The number of message in broker queue.
# TYPE celery_queue_length gauge
celery_queue_length{queue_name="celery"} 0.0
# HELP celery_active_consumer_count The number of active consumer in broker queue.
# TYPE celery_active_consumer_count gauge
# HELP celery_active_worker_count The number of active workers in broker queue.
# TYPE celery_active_worker_count gauge
celery_active_worker_count{queue_name="celery"} 1.0
# HELP celery_active_process_count The number of active processes in broker queue.
# TYPE celery_active_process_count gauge
celery_active_process_count{queue_name="celery"} 8.0
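The runtime histogram above can be summarized directly from this text. The sketch below parses only the subset of the exposition format shown here (no timestamps, no escaped quotes in label values), computes the mean runtime as `_sum / _count`, and estimates a quantile from the cumulative buckets by linear interpolation, the same approach PromQL's `histogram_quantile` takes. `parse`, `histogram_quantile`, and `runtime_summary` are hypothetical helpers.

```python
import re

SAMPLE = re.compile(r'^([a-zA-Z_:][a-zA-Z0-9_:]*)(?:\{(.*)\})?\s+(\S+)$')
LABEL = re.compile(r'(\w+)="([^"]*)"')


def parse(text):
    """Return (metric name, labels dict, value) for every sample line."""
    samples = []
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):  # skip HELP/TYPE comments
            continue
        m = SAMPLE.match(line)
        if m:
            name, raw_labels, value = m.groups()
            samples.append((name, dict(LABEL.findall(raw_labels or "")), float(value)))
    return samples


def histogram_quantile(q, buckets):
    """buckets: (upper bound, cumulative count) pairs, including +Inf."""
    if not 0 < q <= 1:
        raise ValueError("q must be in (0, 1]")
    buckets = sorted(buckets)
    total = buckets[-1][1] if buckets else 0
    if total == 0:
        return float("nan")
    rank = q * total
    prev_bound, prev_count = 0.0, 0.0
    for bound, count in buckets:
        if count >= rank:
            if bound == float("inf"):
                return prev_bound  # cannot interpolate into +Inf
            return prev_bound + (bound - prev_bound) * (rank - prev_count) / (count - prev_count)
        prev_bound, prev_count = bound, count
    return prev_bound


def runtime_summary(text, task="tasks.add", q=0.5):
    """Mean runtime and estimated q-quantile of celery_task_runtime for one task."""
    buckets, total, count = [], 0.0, 0.0
    for name, labels, value in parse(text):
        if labels.get("name") != task:
            continue
        if name == "celery_task_runtime_bucket":
            buckets.append((float(labels["le"]), value))  # float("+Inf") -> inf
        elif name == "celery_task_runtime_sum":
            total = value
        elif name == "celery_task_runtime_count":
            count = value
    mean = total / count if count else float("nan")
    return mean, histogram_quantile(q, buckets)
```

For the single `tasks.add` run above, the mean is about 0.00024 s while the median estimate is 0.0025 s, the midpoint of the first bucket: with the default bucket boundaries, quantiles of very fast tasks are coarse. To use it against the running exporter, the text can come from `urllib.request.urlopen("http://localhost:9808/metrics").read().decode()`.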