三、交换机模式


返回

3.1 发布订阅模式

  • producer.py

    import pika
    
    # 链接rabbitmq
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
    channel = connection.channel()
    
    # 声明一个名为logs、类型为fanout的交换机
    channel.exchange_declare(exchange='logs',
                             exchange_type='fanout')  # fanout:发布订阅模式参数
    
    # 向logs交换机插入数据
    channel.basic_publish(exchange='logs',  # 指定交换机
                          routing_key='',  # 不用指定队列
                          body=b"this is test3_1")
    print("send ok!")
    connection.close()
    
    
  • customer.py

    import pika
    
    # 链接rabbitmq
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    # 声明一个名为logs、类型为fanout的交换机
    channel.exchange_declare(exchange='logs',
                             exchange_type='fanout')
    
    # 消费者创建队列
    result = channel.queue_declare("", exclusive=True)  # exclusive=True,设置队列名字为随机
    queue_test3_1 = result.method.queue  # 拿到队列
    print(queue_test3_1)
    
    # 将队列绑定到交换机
    channel.queue_bind(exchange='logs',
                       queue=queue_test3_1)
    
    print('ready ok!')
    
    
    def callback(ch, method, properties, body):
        print(" [x] %r" % body)
    
    
    channel.basic_consume(queue=queue_test3_1,
                          auto_ack=True,
                          on_message_callback=callback)
    
    channel.start_consuming()
    
    

3.2 关键字模式

  • 在发布订阅基础上,新增关键字过滤发送

  • producer.py

    import pika
    
    # 链接rabbitmq
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
    channel = connection.channel()
    
    # 声明一个名为logs2、类型为direct的交换机
    channel.exchange_declare(exchange='logs2',
                             exchange_type='direct')  # direct:关键字模式参数
    
    # 向logs2交换机插入数据
    channel.basic_publish(exchange='logs2',
                          routing_key='info',  # 指定队列关键字:info、error、warning
                          body=b"this is test3_2")
    print("send ok!")
    connection.close()
    
    
  • customer.py

    import pika
    
    # 链接rabbitmq
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    # 声明一个名为logs、类型为direct的交换机
    channel.exchange_declare(exchange='logs2',
                             exchange_type='direct')
    
    # 创建队列
    result = channel.queue_declare("", exclusive=True)  # exclusive=True,设置队列名字为随机
    queue_test3_2 = result.method.queue  # 拿到队列
    print(queue_test3_2)
    
    # 将队列绑定到交换机上
    channel.queue_bind(exchange='logs2',
                       queue=queue_test3_2,
                       routing_key="info"  # 指定队列关键字 info
                       )
    channel.queue_bind(exchange='logs2',
                       queue=queue_test3_2,
                       routing_key="error"  # 指定队列关键字 error
                       )
    channel.queue_bind(exchange='logs2',
                       queue=queue_test3_2,
                       routing_key="warning"  # 指定队列关键字 warning
                       )
    print('ready ok!')
    
    
    def callback(ch, method, properties, body):
        print("callback recv: %r" % body)
    
    
    channel.basic_consume(queue=queue_test3_2,
                          auto_ack=True,
                          on_message_callback=callback)
    
    channel.start_consuming()
    
    

3.3 通配符模式

  • 在关键字基础上,新增通配符匹配

  • producer.py

    import pika
    
    # 链接rabbitmq
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
    channel = connection.channel()
    
    # 声明一个名为logs2、类型为topic的交换机
    channel.exchange_declare(exchange='logs3',
                             exchange_type='topic')  # topic:通配符模式参数
    
    # 向logs3交换机插入数据
    channel.basic_publish(exchange='logs3',
                          routing_key='test.weather',
                          body=b"this is test3_3")
    print("send ok!")
    connection.close()
    
    
  • customer.py

    import pika
    
    # 链接rabbitmq
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    # 声明一个名为logs、类型为topic的交换机
    channel.exchange_declare(exchange='logs3',
                             exchange_type='topic')
    
    # 创建队列
    result = channel.queue_declare("", exclusive=True)  # exclusive=True,设置队列名字为随机
    queue_test3_3 = result.method.queue  # 拿到队列
    print(queue_test3_3)
    
    # 将指定队列绑定到交换机上
    channel.queue_bind(exchange='logs3',
                       queue=queue_test3_3,
                       routing_key="#.weather"  # 指定队列通配符关键字
                       )
    
    print('ready ok!')
    
    
    def callback(ch, method, properties, body):
        print("callback recv: %r" % body)
    
    
    channel.basic_consume(queue=queue_test3_3,
                          auto_ack=True,
                          on_message_callback=callback)
    
    channel.start_consuming()
    
    
返回