2.1 任务目录 celery_task
-
启动命令:
celery -A celery_task worker -l info
-
__init__.py
-
celery.py
from celery import Celery """ 启动命令:celery -A celery_task worker -l info """ # redis的16个区,0-15 backend = 'redis://127.0.0.1/1' # store 数据库 broker = 'redis://127.0.0.1/2' # broker 消息队列,推荐RabbitMQ cel = Celery('celery_m', backend=backend, broker=broker, # 包含的任务 include=[ 'celery_task.task01', 'celery_task.task02', ]) cel.conf.timezone = 'Asia/Shanghai' # 时区 cel.conf.enable_utc = False # UTC
-
task01.py
import time from celery_task.celery import cel @cel.task def send_mail(info): print('sending email...') time.sleep(3) print(f'send {info} ok!') return 'ok'
-
task02.py
import time from celery_task.celery import cel @cel.task def send_msg(info): print('sending msg...') time.sleep(3) print(f'send {info} ok!') return 'ok'
2.2 生产者 broker
-
producer_task.py
from celery_task import send_mail, send_msg res_mail = send_mail.delay('email') print(res_mail.id) res_msg = send_msg.delay('msg') print(res_msg.id)
2.3 执行结果 store
-
resuly.py
from celery.result import AsyncResult from celery_task import cel res_mail = '081a4e7c-b2cd-4b8e-a27b-f098c26ee74e' # 任务id async_result_mail = AsyncResult(id=res_mail, app=cel) if async_result_mail.successful(): result = async_result_mail.get() print(result) elif async_result_mail.failed(): print('failed!') elif async_result_mail.status == 'PENDING': print('pending...') elif async_result_mail.status == 'RETRY': print('retry...') elif async_result_mail.status == 'STARTED': print('started!')