1.1 任务 worker
-
启动命令:
celery -A celery_task worker -l info
-
celery_task.py
"""Celery application and demo tasks.

Start a worker with:
    celery -A celery_task worker -l info -c <concurrency>
"""
import time

import celery

# Redis provides 16 databases, numbered 0-15.
backend = 'redis://127.0.0.1/1'  # result store
broker = 'redis://127.0.0.1/2'  # message queue; RabbitMQ is the recommended broker

cel = celery.Celery('test', broker=broker, backend=backend)


def _simulate_send(banner, info):
    """Print *banner*, block for three seconds, then report *info* as sent.

    Shared body of the demo tasks below; always returns the string 'ok'.
    """
    print(banner)
    time.sleep(3)  # stand-in for the real delivery work
    print(f'send {info} ok!')
    return 'ok'


@cel.task
def send_mail(info):
    """Demo task that pretends to send an e-mail described by *info*."""
    return _simulate_send('sending email...', info)


@cel.task
def send_msg(info):
    """Demo task that pretends to send a message described by *info*."""
    return _simulate_send('sending msg...', info)
1.2 生产者 broker
-
producer_task.py
"""Producer script: enqueue one mail task and one message task."""
from celery_task import send_mail, send_msg


def _submit(task, payload):
    """Enqueue *task* with *payload*, print the new task id, return the handle."""
    handle = task.delay(payload)
    # The id is what a separate script needs to look the result up later.
    print(handle.id)
    return handle


res_mail = _submit(send_mail, 'email')
res_msg = _submit(send_msg, 'msg')
1.3 执行结果 store
-
result.py
"""Look up a previously submitted task by id and report its current state."""
from celery.result import AsyncResult

from celery_task import cel

# Task id to inspect (presumably copied from the id printed by the producer
# script -- replace with the id of the task you want to check).
res_mail = '081a4e7c-b2cd-4b8e-a27b-f098c26ee74e'
async_result_mail = AsyncResult(id=res_mail, app=cel)

# Read the state exactly once. Calling successful()/failed()/status separately
# for every branch queries the result backend again each time, so a task that
# changes state between two checks (e.g. STARTED -> SUCCESS) could fall through
# every branch and print nothing. Dispatching on one snapshot avoids that.
state = async_result_mail.status

if state == 'SUCCESS':
    # The task has finished, so get() returns its stored return value.
    result = async_result_mail.get()
    print(result)
elif state == 'FAILURE':
    print('failed!')
elif state == 'PENDING':
    print('pending...')
elif state == 'RETRY':
    print('retry...')
elif state == 'STARTED':
    print('started!')
# Any other state prints nothing, as before.