RabbitMQ 提供了 四种核心的交换机类型,每种类型决定了消息如何从交换机路由到队列。以下是详细的介绍:


1. Direct Exchange(直连交换机)

  • 路由规则:消息的 routing key 必须与队列的 binding key 完全匹配。
  • 使用场景:适用于点对点消息传递,例如将消息发送到特定的队列。
  • 示例
    • 交换机绑定队列 Q1binding keyerror
    • 发送消息时,routing keyerror 的消息会被路由到 Q1

2. Fanout Exchange(扇出交换机)

  • 路由规则:忽略 routing key,将消息广播到所有绑定的队列。
  • 使用场景:适用于广播消息,例如通知所有订阅者。
  • 示例
    • 交换机绑定队列 Q1Q2Q3
    • 发送的消息会被路由到 Q1Q2Q3

3. Topic Exchange(主题交换机)

  • 路由规则:根据 routing keybinding key 的模式匹配路由消息。
    • binding key 可以使用通配符:
      • * 匹配一个单词。
      • # 匹配零个或多个单词。
  • 使用场景:适用于基于主题的消息路由,例如根据消息类型分发。
  • 示例
    • 交换机绑定队列 Q1binding keylogs.error
    • 发送消息时,routing keylogs.error 的消息会被路由到 Q1

4. Headers Exchange(头交换机)

  • 路由规则:根据消息的 headers 属性匹配路由消息,忽略 routing key
    • 匹配规则可以是 all(所有键值对必须匹配)或 any(任意一个键值对匹配)。
  • 使用场景:适用于基于消息属性的复杂路由。
  • 示例
    • 交换机绑定队列 Q1headers{"type": "error", "priority": "high"}
    • 发送消息时,headers 包含 {"type": "error", "priority": "high"} 的消息会被路由到 Q1

5. Default Exchange(默认交换机)

  • 路由规则:RabbitMQ 默认的交换机,routing key 必须与队列名称完全匹配。
  • 使用场景:通常用于直接发送消息到指定队列。
  • 示例
    • 发送消息时,routing keyQ1 的消息会被路由到队列 Q1

ExchangeDeclare 函数原型

在Go语言的amqp库中,ExchangeDeclare函数用于声明一个交换机。其函数原型如下:

func (ch *Channel) ExchangeDeclare(
    name string,
    kind string,
    durable bool,
    autoDelete bool,
    internal bool,
    noWait bool,
    args Table
) error

下面对每个参数进行解释:

  1. name string

    • 交换机的名称。在你的例子中,名称为"logs_direct"。这个名称用于在后续的操作中引用该交换机,例如将队列绑定到该交换机时会用到。
  2. kind string

    • 交换机的类型。常见的类型有"direct""fanout""topic""headers"。在你的例子中,类型为"direct",这种类型的交换机根据消息的路由键(routing key)将消息发送到与之匹配的队列。
  3. durable bool

    • 是否持久化。如果设置为true,那么在RabbitMQ服务器重启后,该交换机仍然存在。如果设置为false,服务器重启后,这个交换机将被删除。在你的例子中,设置为true,表示交换机是持久化的。
  4. autoDelete bool

    • 是否自动删除。如果设置为true,当所有与之绑定的队列都解除绑定时,该交换机将自动删除。如果设置为false,则不会自动删除。在你的例子中,设置为false,表示交换机不会自动删除。
  5. internal bool

    • 是否为内部交换机。如果设置为true,那么这个交换机不能被客户端直接发送消息到,只能通过其他交换机进行转发。如果设置为false,客户端可以直接向该交换机发送消息。在你的例子中,设置为false,表示该交换机不是内部交换机。
  6. noWait bool

    • 是否不等待服务器的响应。如果设置为true,客户端在声明交换机后不会等待服务器的确认响应,继续执行后续代码。这种情况下,如果声明操作失败,客户端可能无法及时得知。如果设置为false,客户端会等待服务器的确认响应,确保声明操作成功完成。在你的例子中,设置为false,表示等待服务器响应。
  7. args Table

    • 其他参数。这是一个键值对的集合,可以用来设置一些额外的交换机属性,比如x-delayed-message类型交换机的延迟时间等。在你的例子中,设置为nil,表示不使用额外的参数。Table实际上是map[string]interface{}的别名。

函数返回值error
如果声明交换机的操作成功,返回nil;如果操作失败,返回一个error对象,包含失败的详细信息。


代码示例

声明交换机

// 声明直连交换机
err = ch.ExchangeDeclare(
    "logs_direct", // 交换机名称
    "direct",      // 交换机类型
    true,          // 持久化
    false,         // 自动删除
    false,         // 内部交换机
    false,         // 无等待
    nil,           // 参数
)
failOnError(err, "Failed to declare an exchange")

绑定队列

// 绑定队列到交换机
err = ch.QueueBind(
    "Q1",          // 队列名称
    "error",       // binding key
    "logs_direct", // 交换机名称
    false,         // 无等待
    nil,           // 参数
)
failOnError(err, "Failed to bind a queue")

发送消息

// 发送消息到交换机
err = ch.Publish(
    "logs_direct", // 交换机名称
    "error",      // routing key
    false,        // 强制
    false,        // 立即
    amqp.Publishing{
        ContentType: "text/plain",
        Body:        []byte("This is an error log"),
    },
)
failOnError(err, "Failed to publish a message")

总结

交换机类型 路由规则 使用场景
Direct routing key 完全匹配 点对点消息传递
Fanout 忽略 routing key,广播到所有队列 广播消息
Topic routing key 模式匹配 基于主题的消息路由
Headers 根据 headers 属性匹配 基于消息属性的复杂路由
Default routing key 必须匹配队列名称 直接发送消息到指定队列