# dj_celery_pro **Repository Path**: duans/dj_celery_pro ## Basic Information - **Project Name**: dj_celery_pro - **Description**: celery+rabbitMQ示例项目 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2026-01-19 - **Last Updated**: 2026-03-05 ## Categories & Tags **Categories**: Uncategorized **Tags**: celery, c异步任务队列, 异步任务队列 ## README # celery异步任务队列 Celery 是一个 **基于分布式消息传递的异步任务队列(Task Queue)**,广泛用于 Python 项目中处理耗时操作(如发送邮件、数据处理、定时任务等),避免阻塞 Web 请求。 ## 核心概念 | 概念 | 说明 | | :----------------------------- | :--------------------------------------------------------- | | **Task(任务)** | 一个可被异步调用的 Python 函数,用 `@app.task` 装饰 | | **Broker(消息中间件)** | 任务队列的“邮局”,负责接收和分发任务(如 Redis、RabbitMQ) | | **Worker(工作进程)** | 后台运行的进程,从 Broker 获取任务并执行 | | **Result Backend(结果存储)** | 可选,用于存储任务执行结果(如 Redis、数据库) | | **Beat(定时调度器)** | 用于周期性任务(类似 cron) | ## 安装 ```shell pip install celery==5.2.7 redis ``` `celery==5.2.7` 是最后一个对 Windows 支持较好的版本。 ## 常用命令 | 操作 | 命令 | | :----------- | :----------------------------------------------------- | | 查看任务状态 | `result.state` → `'PENDING'`, `'SUCCESS'`, `'FAILURE'` | | 等待任务完成 | `result.get(timeout=10)` | | 终止任务 | `revoke(task_id, terminate=True)` | | 重试任务 | 在 task 中 raise 异常 + `autoretry_for` | | 监控 | `celery -A proj flower`(需安装 `flower`) | ## Django 中使用 Celery ### 项目目录结构 ``` myproject/ # Django 项目根目录 ├── myproject/ # 主应用包(含 settings.py) │ ├── __init__.py │ ├── settings.py │ ├── urls.py │ ├── wsgi.py │ └── celery.py # ← Celery 配置文件(新增) ├── app1/ # 你的业务应用 │ ├── tasks.py # ← 任务定义文件(新增) ├── manage.py └── requirements.txt ``` ### 实现步骤 1. 在 `settings.py` 同级目录创建 `celery.py`: ```python # 示例 # myproject/celery.py import os from celery import Celery # =============防止celery报错关键代码=====================# import platform if platform.system() == "Windows": # 避免 spawn 模式下的序列化问题 os.environ["FORKED_BY_MULTIPROCESSING"] = "1" # =============防止celery报错关键代码=====================# # 设置环境变量 os.environ.setdefault('DJANGO_SETTINGS_MODULE', '项目名称.settings') # 创建celery实例 app = Celery('项目名称') # 加载配置文件 app.config_from_object('django.conf:settings', namespace='CELERY') app.autodiscover_tasks() ``` ```python # 实例 # myproject/celery.py import os from celery import Celery # =============防止celery报错关键代码=====================# import platform if platform.system() == "Windows": # 避免 spawn 模式下的序列化问题 os.environ["FORKED_BY_MULTIPROCESSING"] = "1" # =============防止celery报错关键代码=====================# # 设置环境变量 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "dj_celery_pro.settings") # 创建celery实例 app = Celery("dj_celery_pro") # 加载配置文件 app.config_from_object("django.conf:settings", namespace="CELERY") app.autodiscover_tasks() ``` 2. 在 `__init__.py` 中初始化: ```python # myproject/__init__.py from .celery import app as celery_app __all__ = ('celery_app',) ``` 3. 在 `settings.py` 中配置: ```python # myproject/settings.py # ... 其他配置 ... # ===== Celery 配置 ===== CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0' # Broker 地址 CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 结果存储(可选) CELERY_ACCEPT_CONTENT = ['json'] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_TIMEZONE = 'Asia/Shanghai' # 与 Django TIME_ZONE 一致 # 可选:任务路由、超时等 # CELERY_TASK_DEFAULT_QUEUE = 'default' # CELERY_TASK_TIME_LIMIT = 30 * 60 # 30 分钟超时 ``` 如果 Redis 有密码:`redis://:password@127.0.0.1:6379/0` 4. 任务写在任意 `tasks.py` 中,用 `@shared_task`: ```python # 示例 from celery import shared_task @shared_task def my_task(): # 重要函数执行过程中没有抛出异常, celery都认为异步任务执行成功 return "Done" ``` ```python # 实例 # 在任意 Django App(如 app1/)下创建 tasks.py: # app1/tasks.py from celery import shared_task from django.core.mail import send_mail import time @shared_task def send_email_task(subject, message, recipient_list): """异步发送邮件""" send_mail( subject=subject, message=message, from_email='noreply@example.com', recipient_list=recipient_list, fail_silently=False, ) return f"Email sent to {recipient_list}" @shared_task(bind=True) def long_running_task(self, n): """带重试和进度的任务示例""" try: for i in range(n): time.sleep(1) # 更新任务状态(可用于前端轮询) self.update_state(state='PROGRESS', meta={'current': i, 'total': n}) return {'status': 'Done', 'result': n} except Exception as exc: # 自动重试 3 次 raise self.retry(exc=exc, countdown=60, max_retries=3) ``` 使用 `@shared_task` 而不是 `@app.task`,更适配 Django。 5. 启动 Celery Worker(在项目根目录) ```python celery -A 你的项目名称 worker --loglevel=info # --pool=solo : 单进程,无并发,但稳定, 这能绕过所有多进程/序列化问题,适合本地调试。 celery -A 你的项目名称 worker --loglevel=info --pool=solo ``` `-A myproject` 对应 `myproject/celery.py` 中的 `app` 实例。 6. (可选)启动 Beat(定时任务) ```python celery -A myproject beat --loglevel=info ``` 7. 启动django项目开发服务器 ```python python manage.py runserver ``` 8. 在视图或代码中调用任务 ```python # views.py 或任意 Python 代码 from app1.tasks import send_email_task, long_running_task def my_view(request): # 异步调用(立即返回) task = send_email_task.delay( "Welcome!", "Thank you for signing up.", ["user@example.com"] ) return HttpResponse(f"Task ID: {task.id}") def check_task_status(request, task_id): from celery.result import AsyncResult from myproject.celery import app result = AsyncResult(task_id, app=app) if result.ready(): return JsonResponse({'status': 'SUCCESS', 'result': result.result}) elif result.state == 'PROGRESS': return JsonResponse({'status': 'PROGRESS', 'meta': result.info}) else: return JsonResponse({'status': 'PENDING'}) ``` 9. 配置定时任务(Periodic Tasks) ```python # settings.py from celery.schedules import crontab CELERY_BEAT_SCHEDULE = { 'send-daily-report': { 'task': 'app1.tasks.send_email_task', 'schedule': crontab(hour=8, minute=0), # 每天 8:00 'args': ('Daily Report', 'Here is your report.', ['admin@example.com']) }, 'cleanup-every-hour': { 'task': 'app1.tasks.cleanup_old_data', 'schedule': 3600.0, # 每小时(秒) }, } ``` 然后启动 Beat: ```python celery -A myproject beat --loglevel=info ``` 也可以使用 django-celery-beat(数据库管理定时任务) 10. 注意事项 1. **不要在任务中处理大文件或长时间阻塞 I/O** → 考虑拆分任务或使用 `gevent`/`eventlet`。 2. **设置超时和重试机制**: ```python @app.task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3}) def unreliable_task(self): ... ``` 3. **使用 `.delay()` 或 `.apply_async()` 调用任务**: ```python 使用 .delay() 或 .apply_async() 调用任务: ``` 4. **生产环境用 Supervisor/systemd 管理 Worker 进程**。 ## `django-celery-results` **`django-celery-results` 这个第三方 Django 应用提供的模型(Model)**,它会把 Celery 任务的执行结果(包括成功、失败、参数、返回值、异常等)**持久化存储到 Django 默认的数据库中**(即你在 `settings.py` 中配置的 `DATABASES`)。 ### 安装 ```shell pip install django-celery-results ``` ### 注册并配置 ```python # settings.py INSTALLED_APPS = [ ... 'django_celery_results', ] CELERY_RESULT_BACKEND = 'django-db' # 关键!表示用 Django ORM 存结果 ``` ### **运行迁移** ```python python manage.py migrate django_celery_results ``` 这会在你的数据库中创建一张表(如 `django_celery_results_taskresult`) ### 查询所有失败任务并重试 ```python from django_celery_results.models import TaskResult from myapp.tasks import my_task # 查询所有失败任务(按任务名过滤) failed_tasks = TaskResult.objects.filter( status='FAILURE', task_name='myapp.tasks.my_task' ) for task in failed_tasks: # 解析原始参数(存储为 JSON 字符串) import json args = json.loads(task.task_args or '[]') kwargs = json.loads(task.task_kwargs or '{}') # 重新执行 my_task.apply_async(args=args, kwargs=kwargs) print(f"Retried task {task.task_id}") ```