二、多目录结构


返回

2.1 任务目录 celery_task

  • 启动命令:celery -A celery_task worker -l info

  • __init__.py

  • celery.py

    """Celery application for the ``celery_task`` package.

    Start a worker with: celery -A celery_task worker -l info
    """
    from celery import Celery
    
    # Redis has 16 databases, numbered 0-15; results and the queue use separate ones.
    backend = 'redis://127.0.0.1/1'  # result store
    broker = 'redis://127.0.0.1/2'  # message queue (RabbitMQ is the recommended broker)
    
    # Modules containing the tasks that belong to this application.
    task_modules = [
        'celery_task.task01',
        'celery_task.task02',
    ]
    
    cel = Celery('celery_m', backend=backend, broker=broker, include=task_modules)
    cel.conf.update(
        timezone='Asia/Shanghai',  # time zone used by the application
        enable_utc=False,  # do not use UTC
    )
    
    
  • task01.py

    import time
    from celery_task.celery import cel
    
    
    @cel.task
    def send_mail(info):
        """Celery task that simulates sending an email.

        Prints a start message, waits three seconds, then prints a
        confirmation that includes ``info``.

        Args:
            info: Value interpolated into the confirmation message.

        Returns:
            The string ``'ok'``.
        """
        print('sending email...')
        time.sleep(3)  # stand-in for the time the real work would take
        confirmation = f'send {info} ok!'
        print(confirmation)
        return 'ok'
    
    
  • task02.py

    import time
    from celery_task.celery import cel
    
    
    @cel.task
    def send_msg(info):
        """Celery task that simulates sending a message.

        Prints a start message, waits three seconds, then prints a
        confirmation that includes ``info``.

        Args:
            info: Value interpolated into the confirmation message.

        Returns:
            The string ``'ok'``.
        """
        print('sending msg...')
        time.sleep(3)  # stand-in for the time the real work would take
        confirmation = f'send {info} ok!'
        print(confirmation)
        return 'ok'
    
    

2.2 生产者 broker

  • producer_task.py

    # Import each task from the module that defines it: the package's
    # __init__.py is listed without any re-exports, so
    # `from celery_task import send_mail` would raise ImportError.
    from celery_task.task01 import send_mail
    from celery_task.task02 import send_msg
    
    # .delay() queues the task and returns immediately with a result handle;
    # print the id so the result can be looked up later.
    res_mail = send_mail.delay('email')
    print(res_mail.id)
    res_msg = send_msg.delay('msg')
    print(res_msg.id)
    
    

2.3 执行结果 store

  • result.py

    from celery.result import AsyncResult
    # `cel` is defined in celery_task/celery.py; the package's __init__.py is
    # listed without re-exports, so `from celery_task import cel` would fail.
    from celery_task.celery import cel
    
    res_mail = '081a4e7c-b2cd-4b8e-a27b-f098c26ee74e'  # task id (as printed by producer_task.py)
    
    async_result_mail = AsyncResult(id=res_mail, app=cel)
    
    # Read the state once so every branch tests the same snapshot. Querying it
    # per branch lets the state change mid-chain and match no branch at all.
    state = async_result_mail.state
    
    if state == 'SUCCESS':
        result = async_result_mail.get()
        print(result)
    elif state == 'FAILURE':
        print('failed!')
    elif state == 'PENDING':
        print('pending...')
    elif state == 'RETRY':
        print('retry...')
    elif state == 'STARTED':
        print('started!')
    
    
返回