3.1 发布订阅模式
-
producer.py
import pika # 链接rabbitmq connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() # 声明一个名为logs、类型为fanout的交换机 channel.exchange_declare(exchange='logs', exchange_type='fanout') # fanout:发布订阅模式参数 # 向logs交换机插入数据 channel.basic_publish(exchange='logs', # 指定交换机 routing_key='', # 不用指定队列 body=b"this is test3_1") print("send ok!") connection.close()
-
customer.py
import pika # 链接rabbitmq connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() # 声明一个名为logs、类型为fanout的交换机 channel.exchange_declare(exchange='logs', exchange_type='fanout') # 消费者创建队列 result = channel.queue_declare("", exclusive=True) # exclusive=True,设置队列名字为随机 queue_test3_1 = result.method.queue # 拿到队列 print(queue_test3_1) # 将队列绑定到交换机 channel.queue_bind(exchange='logs', queue=queue_test3_1) print('ready ok!') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(queue=queue_test3_1, auto_ack=True, on_message_callback=callback) channel.start_consuming()
3.2 关键字模式
-
在发布订阅基础上,新增关键字过滤发送
-
producer.py
import pika # 链接rabbitmq connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() # 声明一个名为logs2、类型为direct的交换机 channel.exchange_declare(exchange='logs2', exchange_type='direct') # direct:关键字模式参数 # 向logs2交换机插入数据 channel.basic_publish(exchange='logs2', routing_key='info', # 指定队列关键字:info、error、warning body=b"this is test3_2") print("send ok!") connection.close()
-
customer.py
import pika # 链接rabbitmq connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() # 声明一个名为logs、类型为direct的交换机 channel.exchange_declare(exchange='logs2', exchange_type='direct') # 创建队列 result = channel.queue_declare("", exclusive=True) # exclusive=True,设置队列名字为随机 queue_test3_2 = result.method.queue # 拿到队列 print(queue_test3_2) # 将队列绑定到交换机上 channel.queue_bind(exchange='logs2', queue=queue_test3_2, routing_key="info" # 指定队列关键字 info ) channel.queue_bind(exchange='logs2', queue=queue_test3_2, routing_key="error" # 指定队列关键字 error ) channel.queue_bind(exchange='logs2', queue=queue_test3_2, routing_key="warning" # 指定队列关键字 warning ) print('ready ok!') def callback(ch, method, properties, body): print("callback recv: %r" % body) channel.basic_consume(queue=queue_test3_2, auto_ack=True, on_message_callback=callback) channel.start_consuming()
3.3 通配符模式
-
在关键字基础上,新增通配符匹配
-
producer.py
import pika # 链接rabbitmq connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() # 声明一个名为logs2、类型为topic的交换机 channel.exchange_declare(exchange='logs3', exchange_type='topic') # topic:通配符模式参数 # 向logs3交换机插入数据 channel.basic_publish(exchange='logs3', routing_key='test.weather', body=b"this is test3_3") print("send ok!") connection.close()
-
customer.py
import pika # 链接rabbitmq connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() # 声明一个名为logs、类型为topic的交换机 channel.exchange_declare(exchange='logs3', exchange_type='topic') # 创建队列 result = channel.queue_declare("", exclusive=True) # exclusive=True,设置队列名字为随机 queue_test3_3 = result.method.queue # 拿到队列 print(queue_test3_3) # 将指定队列绑定到交换机上 channel.queue_bind(exchange='logs3', queue=queue_test3_3, routing_key="#.weather" # 指定队列通配符关键字 ) print('ready ok!') def callback(ch, method, properties, body): print("callback recv: %r" % body) channel.basic_consume(queue=queue_test3_3, auto_ack=True, on_message_callback=callback) channel.start_consuming()