RabbitMQ 提供了 四种核心的交换机类型,每种类型决定了消息如何从交换机路由到队列。以下是详细的介绍:
1. Direct Exchange(直连交换机)
- 路由规则:消息的
routing key
必须与队列的binding key
完全匹配。 - 使用场景:适用于点对点消息传递,例如将消息发送到特定的队列。
- 示例:
- 交换机绑定队列
Q1
,binding key
为error
。 - 发送消息时,
routing key
为error
的消息会被路由到Q1
。
- 交换机绑定队列
2. Fanout Exchange(扇出交换机)
- 路由规则:忽略
routing key
,将消息广播到所有绑定的队列。 - 使用场景:适用于广播消息,例如通知所有订阅者。
- 示例:
- 交换机绑定队列
Q1
、Q2
、Q3
。 - 发送的消息会被路由到
Q1
、Q2
、Q3
。
- 交换机绑定队列
3. Topic Exchange(主题交换机)
- 路由规则:根据
routing key
和binding key
的模式匹配路由消息。binding key
可以使用通配符:*
匹配一个单词。#
匹配零个或多个单词。
- 使用场景:适用于基于主题的消息路由,例如根据消息类型分发。
- 示例:
- 交换机绑定队列
Q1
,binding key
为logs.error
。 - 发送消息时,
routing key
为logs.error
的消息会被路由到Q1
。
- 交换机绑定队列
4. Headers Exchange(头交换机)
- 路由规则:根据消息的
headers
属性匹配路由消息,忽略routing key
。- 匹配规则可以是
all
(所有键值对必须匹配)或any
(任意一个键值对匹配)。
- 匹配规则可以是
- 使用场景:适用于基于消息属性的复杂路由。
- 示例:
- 交换机绑定队列
Q1
,headers
为{"type": "error", "priority": "high"}
。 - 发送消息时,
headers
包含{"type": "error", "priority": "high"}
的消息会被路由到Q1
。
- 交换机绑定队列
5. Default Exchange(默认交换机)
- 路由规则:RabbitMQ 默认的交换机,
routing key
必须与队列名称完全匹配。 - 使用场景:通常用于直接发送消息到指定队列。
- 示例:
- 发送消息时,
routing key
为Q1
的消息会被路由到队列Q1
。
- 发送消息时,
ExchangeDeclare 函数原型
在Go语言的amqp
库中,ExchangeDeclare
函数用于声明一个交换机。其函数原型如下:
func (ch *Channel) ExchangeDeclare(
name string,
kind string,
durable bool,
autoDelete bool,
internal bool,
noWait bool,
args Table
) error
下面对每个参数进行解释:
-
name string
:- 交换机的名称。在你的例子中,名称为
"logs_direct"
。这个名称用于在后续的操作中引用该交换机,例如将队列绑定到该交换机时会用到。
- 交换机的名称。在你的例子中,名称为
-
kind string
:- 交换机的类型。常见的类型有
"direct"
、"fanout"
、"topic"
和"headers"
。在你的例子中,类型为"direct"
,这种类型的交换机根据消息的路由键(routing key)将消息发送到与之匹配的队列。
- 交换机的类型。常见的类型有
-
durable bool
:- 是否持久化。如果设置为
true
,那么在RabbitMQ服务器重启后,该交换机仍然存在。如果设置为false
,服务器重启后,这个交换机将被删除。在你的例子中,设置为true
,表示交换机是持久化的。
- 是否持久化。如果设置为
-
autoDelete bool
:- 是否自动删除。如果设置为
true
,当所有与之绑定的队列都解除绑定时,该交换机将自动删除。如果设置为false
,则不会自动删除。在你的例子中,设置为false
,表示交换机不会自动删除。
- 是否自动删除。如果设置为
-
internal bool
:- 是否为内部交换机。如果设置为
true
,那么这个交换机不能被客户端直接发送消息到,只能通过其他交换机进行转发。如果设置为false
,客户端可以直接向该交换机发送消息。在你的例子中,设置为false
,表示该交换机不是内部交换机。
- 是否为内部交换机。如果设置为
-
noWait bool
:- 是否不等待服务器的响应。如果设置为
true
,客户端在声明交换机后不会等待服务器的确认响应,继续执行后续代码。这种情况下,如果声明操作失败,客户端可能无法及时得知。如果设置为false
,客户端会等待服务器的确认响应,确保声明操作成功完成。在你的例子中,设置为false
,表示等待服务器响应。
- 是否不等待服务器的响应。如果设置为
-
args Table
:- 其他参数。这是一个键值对的集合,可以用来设置一些额外的交换机属性,比如
x-delayed-message
类型交换机的延迟时间等。在你的例子中,设置为nil
,表示不使用额外的参数。Table
实际上是map[string]interface{}
的别名。
- 其他参数。这是一个键值对的集合,可以用来设置一些额外的交换机属性,比如
函数返回值error
:
如果声明交换机的操作成功,返回nil
;如果操作失败,返回一个error
对象,包含失败的详细信息。
代码示例
声明交换机
// 声明直连交换机
err = ch.ExchangeDeclare(
"logs_direct", // 交换机名称
"direct", // 交换机类型
true, // 持久化
false, // 自动删除
false, // 内部交换机
false, // 无等待
nil, // 参数
)
failOnError(err, "Failed to declare an exchange")
绑定队列
// 绑定队列到交换机
err = ch.QueueBind(
"Q1", // 队列名称
"error", // binding key
"logs_direct", // 交换机名称
false, // 无等待
nil, // 参数
)
failOnError(err, "Failed to bind a queue")
发送消息
// 发送消息到交换机
err = ch.Publish(
"logs_direct", // 交换机名称
"error", // routing key
false, // 强制
false, // 立即
amqp.Publishing{
ContentType: "text/plain",
Body: []byte("This is an error log"),
},
)
failOnError(err, "Failed to publish a message")
总结
交换机类型 | 路由规则 | 使用场景 |
---|---|---|
Direct | routing key 完全匹配 |
点对点消息传递 |
Fanout | 忽略 routing key ,广播到所有队列 |
广播消息 |
Topic | routing key 模式匹配 |
基于主题的消息路由 |
Headers | 根据 headers 属性匹配 |
基于消息属性的复杂路由 |
Default | routing key 必须匹配队列名称 |
直接发送消息到指定队列 |