二、参数


返回

2.1 应答参数

  • 消费者手动应答,只要应答没被执行,队列内容就一直存在

  • producer.py

    import pika
    
    # 1 链接rabbitmq
    connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))
    channel = connection.channel()
    
    # 2 创建队列
    channel.queue_declare(queue='queue_test2_1')
    
    # 3 向指定队列插入数据
    channel.basic_publish(exchange='',  # 简单模式
                          routing_key='queue_test2_1',  # 指定队列
                          body=b'this is test2_1!')
    
    print("send ok!")
    # 4 断开链接
    connection.close()
    
    
  • customer.py

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    
    # 创建队列
    channel.queue_declare(queue='queue_test2_1')
    
    
    # 确定回调函数
    def callback(ch, method, properties, body):
        print("callback recv: %r" % body)
        # 手动应答
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    
    # 确定监听队列参数
    channel.basic_consume(queue='queue_test2_1',
                          auto_ack=False,  # 默认应答改为手动应答
                          on_message_callback=callback)
    
    print('ready ok!')
    # 正式监听
    channel.start_consuming()
    
    

2.2 持久化参数

  • 队列中的数据持久化保存:durable=True

  • producer.py

    import pika
    
    # 1 链接rabbitmq
    connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))
    channel = connection.channel()
    
    # 2 创可持久化队列:durable=True
    channel.queue_declare(queue='queue_test2_2', durable=True)
    
    # 3 向指定队列插入数据
    channel.basic_publish(exchange='',  # 简单模式
                          routing_key='queue_test2_2',  # 指定队列
                          body=b'this is test2_2',
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # 队列数据持久化保存
                          )
                          )
    
    print("send ok!")
    # 4 断开链接
    connection.close()
    
    
  • customer.py

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    
    #  创建队列
    channel.queue_declare(queue='queue_test2_2', durable=True)
    
    
    # 确定回调函数
    def callback(ch, method, properties, body):
        print("callback recv: %r" % body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    
    # 确定监听队列参数
    channel.basic_consume(queue='queue_test2_2',
                          auto_ack=False,  # 手动应答
                          on_message_callback=callback)
    
    print('ready ok!')
    # 正式监听
    channel.start_consuming()
    
    

2.3 分发参数

  • 默认轮询分发

  • 公平分发:channel.basic_qos(prefetch_count=1)

  • producer.py

    import pika
    
    # 1 链接rabbitmq
    connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))
    channel = connection.channel()
    
    # 2 创建队列
    channel.queue_declare(queue='queue_test2_3')
    
    # 3 向指定队列插入数据
    channel.basic_publish(exchange='',  # 简单模式
                          routing_key='queue_test2_3',  # 指定队列
                          body=b'this is test_2_3',
                          )
    
    print("send ok!")
    # 4 断开链接
    connection.close()
    
    
  • customer.py

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    
    #  创建队列
    channel.queue_declare(queue='queue_test2_3')
    
    
    # 确定回调函数
    def callback(ch, method, properties, body):
        import time
        # time.sleep(5)
        time.sleep(20)
        print("callback recv: %r" % body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    
    # 公平分发
    channel.basic_qos(prefetch_count=1)
    
    # 确定监听队列参数
    channel.basic_consume(queue='queue_test2_3',
                          auto_ack=False,  # 手动应答
                          on_message_callback=callback)
    
    print('ready ok!')
    # 正式监听
    channel.start_consuming()
    
    
返回