1. Simple Structure



1.1 Task worker

  • Start command: celery -A celery_task worker -l info

  • celery_task.py

    import celery
    import time
    
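    """
    Start command: celery -A celery_task worker -l info
        -c  number of concurrent worker processes
    """
    
    # Redis provides 16 logical databases by default, numbered 0-15
    backend = 'redis://127.0.0.1/1'  # result store (Redis database 1)
    broker = 'redis://127.0.0.1/2'  # broker: message queue (Redis database 2); RabbitMQ is recommended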
    
    cel = celery.Celery('test', backend=backend, broker=broker)
    
    
    @cel.task
    def send_mail(info):
        print('sending email...')
        time.sleep(3)
        print(f'send {info} ok!')
        return 'ok'
    
    
    @cel.task
    def send_msg(info):
        print('sending msg...')
        time.sleep(3)
        print(f'send {info} ok!')
        return 'ok'
    
    
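The trailing path segment of each Redis URL above selects the logical database. The following is a minimal sketch of that convention using only the standard library; the helper name redis_db_index and the max_dbs default of 16 (the stock Redis setting) are assumptions for illustration, not part of Celery.

```python
from urllib.parse import urlparse


def redis_db_index(url: str, max_dbs: int = 16) -> int:
    """Return the database index encoded in a redis:// URL (0 if the path is empty).

    Hypothetical helper: max_dbs=16 assumes the default Redis configuration.
    """
    parsed = urlparse(url)
    if parsed.scheme != "redis":
        raise ValueError(f"not a redis URL: {url!r}")
    path = parsed.path.lstrip("/")
    index = int(path) if path else 0
    if not 0 <= index < max_dbs:
        raise ValueError(f"database index {index} is outside 0-{max_dbs - 1}")
    return index


backend_db = redis_db_index("redis://127.0.0.1/1")  # result store
broker_db = redis_db_index("redis://127.0.0.1/2")   # message queue
```

Keeping the backend and the broker in different databases, as celery_task.py does, keeps stored results and queued messages apart.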

1.2 Producer (broker)

  • producer_task.py

    from celery_task import send_mail, send_msg
    
    res_mail = send_mail.delay('email')
    print(res_mail.id)
    res_msg = send_msg.delay('msg')
    print(res_msg.id)
    
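To make the division of roles concrete, here is a toy in-process model of the same flow: a producer enqueues a message and gets an id back immediately, a worker thread consumes the message, and a dictionary plays the result store. This is only a sketch of the idea, not how Celery is implemented; the names delay, status, worker, and run_demo are hypothetical, and queue.Queue stands in for the Redis broker.

```python
import queue
import threading
import uuid

broker = queue.Queue()  # stands in for the broker (message queue)
store = {}              # stands in for the result store


def send_mail(info):
    return f"send {info} ok!"


TASKS = {"send_mail": send_mail}  # registry of task functions by name


def delay(task_name, *args):
    """Enqueue a task message and return its id without waiting for the result."""
    task_id = str(uuid.uuid4())
    broker.put((task_id, task_name, args))
    return task_id


def status(task_id):
    """Return (state, value); ids with no stored result are reported as PENDING."""
    return store.get(task_id, ("PENDING", None))


def worker():
    """Consume messages until a None sentinel arrives."""
    while True:
        message = broker.get()
        if message is None:
            break
        task_id, task_name, args = message
        try:
            store[task_id] = ("SUCCESS", TASKS[task_name](*args))
        except Exception as exc:
            store[task_id] = ("FAILURE", exc)


def run_demo():
    task_id = delay("send_mail", "email")
    before = status(task_id)  # no worker is running yet
    thread = threading.Thread(target=worker)
    thread.start()
    broker.put(None)          # tell the worker to stop after the queued task
    thread.join()
    return before, status(task_id)
```

Calling run_demo() returns the state before and after the worker has processed the message, which mirrors why producer_task.py only prints ids: the results arrive later and are read separately.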
    

1.3 Execution results (store)

  • result.py

    from celery.result import AsyncResult
    from celery_task import cel
    
    res_mail = '081a4e7c-b2cd-4b8e-a27b-f098c26ee74e'  # task id, as printed by producer_task.py
    
    async_result_mail = AsyncResult(id=res_mail, app=cel)
    
    if async_result_mail.successful():
        result = async_result_mail.get()
        print(result)
    elif async_result_mail.failed():
        print('failed!')
    elif async_result_mail.status == 'PENDING':
        print('pending...')
    elif async_result_mail.status == 'RETRY':
        print('retry...')
    elif async_result_mail.status == 'STARTED':  # reported only when task_track_started is enabled
        print('started!')
    
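The if/elif chain above can also be written as a table lookup, which makes it easier to cover states the chain leaves out. A minimal sketch, assuming the state strings are Celery's built-in names; describe_state and STATE_MESSAGES are hypothetical names introduced here.

```python
# Celery's commonly seen built-in state names mapped to short messages.
STATE_MESSAGES = {
    "PENDING": "pending...",
    "RECEIVED": "received by a worker",
    "STARTED": "started!",
    "RETRY": "retry...",
    "SUCCESS": "ok",
    "FAILURE": "failed!",
    "REVOKED": "revoked",
}


def describe_state(state: str) -> str:
    """Return a short message for a task state, with a fallback for unknown ones."""
    return STATE_MESSAGES.get(state, f"unrecognized state: {state}")
```

In result.py this could be used as print(describe_state(async_result_mail.status)), calling async_result_mail.get() only when the state is SUCCESS.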
    