2.1 应答参数
-
消费者手动应答,只要应答没被执行,队列内容就一直存在
-
producer.py
import pika # 1 链接rabbitmq connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1')) channel = connection.channel() # 2 创建队列 channel.queue_declare(queue='queue_test2_1') # 3 向指定队列插入数据 channel.basic_publish(exchange='', # 简单模式 routing_key='queue_test2_1', # 指定队列 body=b'this is test2_1!') print("send ok!") # 4 断开链接 connection.close()
-
customer.py
import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() # 创建队列 channel.queue_declare(queue='queue_test2_1') # 确定回调函数 def callback(ch, method, properties, body): print("callback recv: %r" % body) # 手动应答 ch.basic_ack(delivery_tag=method.delivery_tag) # 确定监听队列参数 channel.basic_consume(queue='queue_test2_1', auto_ack=False, # 默认应答改为手动应答 on_message_callback=callback) print('ready ok!') # 正式监听 channel.start_consuming()
2.2 持久化参数
-
队列中的数据持久化保存:
durable=True
-
producer.py
import pika # 1 链接rabbitmq connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1')) channel = connection.channel() # 2 创可持久化队列:durable=True channel.queue_declare(queue='queue_test2_2', durable=True) # 3 向指定队列插入数据 channel.basic_publish(exchange='', # 简单模式 routing_key='queue_test2_2', # 指定队列 body=b'this is test2_2', properties=pika.BasicProperties( delivery_mode=2, # 队列数据持久化保存 ) ) print("send ok!") # 4 断开链接 connection.close()
-
customer.py
import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() # 创建队列 channel.queue_declare(queue='queue_test2_2', durable=True) # 确定回调函数 def callback(ch, method, properties, body): print("callback recv: %r" % body) ch.basic_ack(delivery_tag=method.delivery_tag) # 确定监听队列参数 channel.basic_consume(queue='queue_test2_2', auto_ack=False, # 手动应答 on_message_callback=callback) print('ready ok!') # 正式监听 channel.start_consuming()
2.3 分发参数
-
默认轮询分发
-
公平分发:
channel.basic_qos(prefetch_count=1)
-
producer.py
import pika # 1 链接rabbitmq connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1')) channel = connection.channel() # 2 创建队列 channel.queue_declare(queue='queue_test2_3') # 3 向指定队列插入数据 channel.basic_publish(exchange='', # 简单模式 routing_key='queue_test2_3', # 指定队列 body=b'this is test_2_3', ) print("send ok!") # 4 断开链接 connection.close()
-
customer.py
import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() # 创建队列 channel.queue_declare(queue='queue_test2_3') # 确定回调函数 def callback(ch, method, properties, body): import time # time.sleep(5) time.sleep(20) print("callback recv: %r" % body) ch.basic_ack(delivery_tag=method.delivery_tag) # 公平分发 channel.basic_qos(prefetch_count=1) # 确定监听队列参数 channel.basic_consume(queue='queue_test2_3', auto_ack=False, # 手动应答 on_message_callback=callback) print('ready ok!') # 正式监听 channel.start_consuming()