在现代应用开发中,实时通信是许多系统的核心需求之一。Redis 的 PubSub(发布/订阅)机制为这种需求提供了强大的支持,允许系统组件之间进行高效的消息传递。上一篇文章中,我们深入探讨了 Redis 的 Pipeline 操作,而本篇文章将带您深入了解 Redis 的 PubSub 机制,特别是如何在 Golang 中利用 go-redis 库实现这一功能。
PubSub 是一种发布/订阅模式,其中发布者将消息发送到频道,而订阅者则通过订阅这些频道来接收消息。这种模式非常适合于实时通知、事件广播和多服务之间的通信。然而,PubSub 也有其限制——消息不能持久化,如果订阅者在消息发布时未连接,则消息会丢失。因此,它特别适用于对实时性有高要求但不需确保消息持久性的场景。
在这篇文章中,我们将详细介绍如何使用 go-redis 库来实现 Redis 的 PubSub 功能,涵盖从基本的频道订阅到高级的消息接收方法。我们还将通过一个实际的事件广播场景示例来展示这些功能的应用。无论您是在构建实时通知系统、事件广播还是聊天应用,了解和掌握 PubSub 的使用都将大大提升您的系统能力。
在《go-redis 使用指南》系列文章中,我们将详细介绍如何在 Golang 项目中使用 redis/go-redis 库与 Redis 进行交互。以下是该系列文章的全部内容:
Redis PubSub 是一种发布/订阅模式,允许发布者将消息发送到频道,订阅者可以订阅这些频道并接收消息。这种机制适用于构建实时消息系统、通知系统、事件广播等场景。
常见的使用场景:
需要注意的是,PubSub 的一个关键优势在于其实时性高,非常适合广播消息的场景。然而,它的缺点是消息不能持久化,如果消费者未连接或暂时离线,未接收到的消息将会丢失。因此,PubSub 更适用于需要即时消息传递的场景,而不适合那些需要确保消息可靠到达的场合。
Subscribe
- 订阅一个或多个频道,返回一个 PubSub 对象。PSubscribe
- 订阅一个或多个模式匹配的频道。Unsubscribe
- 取消订阅一个或多个频道。PUnsubscribe
- 取消订阅一个或多个模式匹配的频道。Publish
- 向一个频道发布消息。ReceiveMessage
- 接收订阅频道的消息。Close
- 关闭 PubSub 对象,取消所有订阅。PubSubChannels
- 查询活跃的频道。PubSubNumSub
- 查询指定频道有多少个订阅者。ReceiveTimeout
- 在指定时间内接收消息,超时则返回错误。Receive
- 接收消息或返回其他类型的信息,如 Subscription、Message、Pong 等。Channel
- 返回一个 Go channel,用于并发接收消息。ChannelWithSubscriptions
- 返回一个 Go channel,消息类型包括*Subscription
和*Message
,用于检测重新连接。package main
import (
"context"
"fmt"
"log"
"time"
"github.com/redis/go-redis/v9"
)
func main() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379", // Redis 服务器地址
})
// 启动订阅者
go subscriber(ctx, rdb)
// 启动发布者
go publisher(ctx, rdb)
// 等待发布者和订阅者完成
time.Sleep(10 * time.Second)
}
// subscriber 订阅频道并接收消息
func subscriber(ctx context.Context, rdb *redis.Client) {
pubsub := rdb.Subscribe(ctx, "mychannel")
defer pubsub.Close()
ch := pubsub.Channel()
fmt.Println("Subscriber is waiting for messages...")
for msg := range ch {
fmt.Printf("Received message from channel %s: %s\n", msg.Channel, msg.Payload)
}
}
// publisher 发布消息到频道
func publisher(ctx context.Context, rdb *redis.Client) {
messages := []string{"Hello", "World", "Redis", "PubSub", "Example"}
for _, msg := range messages {
fmt.Printf("Publishing message: %s\n", msg)
err := rdb.Publish(ctx, "mychannel", msg).Err()
if err != nil {
log.Fatalf("Failed to publish message: %v", err)
}
time.Sleep(1 * time.Second) // 等待1秒钟再发布下一个消息
}
fmt.Println("All messages published.")
}
输出结果:
Publishing message: Hello
Subscriber is waiting for messages...
Received message from channel mychannel: Hello
Publishing message: World
Received message from channel mychannel: World
Publishing message: Redis
Received message from channel mychannel: Redis
Publishing message: PubSub
Received message from channel mychannel: PubSub
Publishing message: Example
Received message from channel mychannel: Example
All messages published.
在使用 Redis PubSub 时,你可以通过不同的方法来接收消息。这些方法包括 Receive
、ReceiveTimeout
、ReceiveMessage
,以及使用 Channel。
Receive
:用于接收所有类型的消息(*redis.Message
、*redis.Subscription
、*redis.Pong
)。ReceiveMessage
:用于接收 *redis.Message
类型的消息,专注于 PubSub 消息。ReceiveTimeout
:类似于 Receive
,但支持超时控制,适用于需要等待消息但希望避免长时间阻塞的场景。下面是对这些方法及其区别的详细介绍:
Receive
Receive
是一个通用的方法,用于接收任何类型的消息。它可以返回 *redis.Message
(消息类型)、*redis.Subscription
(订阅类型)或 *redis.Pong
(Pong 类型)。*redis.Pong
)时。msg, err := pubsub.Receive(ctx)
if err != nil {
log.Fatalf("Failed to receive message: %v", err)
}
switch v := msg.(type) {
case *redis.Message:
fmt.Printf("Received message: %s\n", v.Payload)
case *redis.Subscription:
fmt.Printf("Received subscription update: %s\n", v.Kind)
case *redis.Pong:
fmt.Println("Received Pong")
}
ReceiveMessage
ReceiveMessage
方法专门用于接收 *redis.Message
类型的消息。如果接收到其他类型的消息(如 *redis.Subscription
或 *redis.Pong
),它将返回错误。msg, err := pubsub.ReceiveMessage(ctx)
if err != nil {
log.Fatalf("Failed to receive message: %v", err)
}
fmt.Printf("Received message from channel %s: %s\n", msg.Channel, msg.Payload)
ReceiveTimeout
ReceiveTimeout
方法类似于 Receive
,但它支持超时控制。如果在指定的时间内未收到任何消息,将返回 context.DeadlineExceeded
错误。它可以接收 *redis.Message
、*redis.Subscription
和 *redis.Pong
类型的消息。msg, err := pubsub.ReceiveTimeout(ctx, 2*time.Second)
if err != nil {
if err == context.DeadlineExceeded {
fmt.Println("Receive timeout")
return
}
log.Fatalf("Failed to receive message with timeout: %v", err)
}
switch v := msg.(type) {
case *redis.Message:
fmt.Printf("Received message: %s\n", v.Payload)
case *redis.Subscription:
fmt.Printf("Received subscription update: %s\n", v.Kind)
case *redis.Pong:
fmt.Println("Received Pong")
}
Channel
方法,你可以获取一个 Go channel 用于并发接收 PubSub 消息。该 channel 会持续接收 *redis.Message
和 *redis.Subscription
类型的消息。ch := pubsub.Channel()
for msg := range ch {
fmt.Printf("Received message from channel %s: %s\n", msg.Channel, msg.Payload)
}
根据你的具体需求,可以选择最合适的方法来处理 PubSub 消息。
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/redis/go-redis/v9"
)
func main() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379", // Redis 服务器地址
})
// 启动普通订阅者
go subscriber(ctx, rdb)
// 启动模式匹配订阅者
go patternSubscriber(ctx, rdb)
// 启动使用 ReceiveTimeout 的订阅者
go timeoutSubscriber(ctx, rdb)
// 启动发布者
go publisher(ctx, rdb)
// 等待所有 goroutine 完成
time.Sleep(10 * time.Second)
}
// subscriber 订阅频道并接收消息
func subscriber(ctx context.Context, rdb *redis.Client) {
pubsub := rdb.Subscribe(ctx, "channel1")
defer pubsub.Close()
// 使用 Channel 接收消息
ch := pubsub.Channel()
fmt.Printf("Subscriber channel1:%v is waiting for messages on channel1...\n", pubsub)
checkPubSubInfo(ctx, rdb)
for msg := range ch {
fmt.Printf("Subscriber channel1:%v received message from channel %s: %s\n", pubsub, msg.Channel, msg.Payload)
}
// 取消订阅并关闭 PubSub
if err := pubsub.Unsubscribe(ctx, "channel1"); err != nil {
log.Fatalf("Failed to Unsubscribe: %v", err)
}
}
// patternSubscriber 使用模式匹配订阅频道并接收消息
func patternSubscriber(ctx context.Context, rdb *redis.Client) {
pubsub := rdb.PSubscribe(ctx, "channel*")
defer pubsub.Close()
// 使用 ChannelWithSubscriptions 接收消息
ch := pubsub.ChannelWithSubscriptions()
fmt.Println("Pattern subscriber is waiting for messages on channel* ...")
checkPubSubInfo(ctx, rdb)
for msg := range ch {
switch v := msg.(type) {
case *redis.Message:
fmt.Printf("Pattern subscriber Received message from channel %s: %s\n", v.Channel, v.Payload)
case *redis.Subscription:
fmt.Printf("Pattern subscriber Received Subscription message: %s %s %d\n", v.Kind, v.Channel, v.Count)
}
}
if err := pubsub.PUnsubscribe(ctx, "channel*"); err != nil {
log.Fatalf("Failed to PUnsubscribe: %v", err)
}
}
// timeoutSubscriber 使用 ReceiveTimeout 接收消息
func timeoutSubscriber(ctx context.Context, rdb *redis.Client) {
pubsub := rdb.Subscribe(ctx, "channel1")
defer pubsub.Close()
fmt.Println("Timeout subscriber is waiting for messages on channel1 with timeout...")
checkPubSubInfo(ctx, rdb)
for {
msg, err := pubsub.ReceiveTimeout(ctx, 2*time.Second)
if err != nil {
if err == context.DeadlineExceeded {
fmt.Println("Receive timeout.")
continue
}
log.Fatalf("Failed to receive message with timeout: %v", err)
}
switch v := msg.(type) {
case *redis.Message:
fmt.Printf("Timeout subscriber Received message from channel %s: %s\n", v.Channel, v.Payload)
case *redis.Subscription:
fmt.Printf("Timeout subscriber Received Subscription message: %s %s %d\n", v.Kind, v.Channel, v.Count)
}
}
}
// publisher 发布消息到频道
func publisher(ctx context.Context, rdb *redis.Client) {
messages := []string{"Hello", "World", "Redis", "PubSub", "Example"}
for _, msg := range messages {
fmt.Printf("Publishing message: %s\n", msg)
if err := rdb.Publish(ctx, "channel1", msg).Err(); err != nil {
log.Fatalf("Failed to publish message: %v", err)
}
time.Sleep(1 * time.Second) // 等待1秒钟再发布下一个消息
}
fmt.Println("All messages published.")
}
// checkPubSubInfo 查询活跃频道和订阅者数量
func checkPubSubInfo(ctx context.Context, rdb *redis.Client) {
// 查询活跃的频道
activeChannels := rdb.PubSubChannels(ctx, "channel*").Val()
fmt.Println("Active channels:", activeChannels)
// 查询频道的订阅者数量
numSub := rdb.PubSubNumSub(ctx, "channel1", "channel2").Val()
fmt.Println("Subscriber count:", numSub)
}
输出结果:
Publishing message: Hello
Subscriber channel1:PubSub(channel1) is waiting for messages on channel1...
Pattern subscriber is waiting for messages on channel* ...
Active channels: [channel1]
Timeout subscriber is waiting for messages on channel1 with timeout...
Subscriber count: map[channel1:1 channel2:0]
Pattern subscriber Received Subscription message: psubscribe channel* 1
Active channels: [channel1]
Subscriber count: map[channel1:2 channel2:0]
Subscriber channel1:PubSub(channel1) received message from channel channel1: Hello
Active channels: [channel1]
Subscriber count: map[channel1:2 channel2:0]
Timeout subscriber Received Subscription message: subscribe channel1 1
Publishing message: World
Pattern subscriber Received message from channel channel1: World
Timeout subscriber Received message from channel channel1: World
Subscriber channel1:PubSub(channel1) received message from channel channel1: World
Publishing message: Redis
Subscriber channel1:PubSub(channel1) received message from channel channel1: Redis
Timeout subscriber Received message from channel channel1: Redis
Pattern subscriber Received message from channel channel1: Redis
Publishing message: PubSub
Timeout subscriber Received message from channel channel1: PubSub
Pattern subscriber Received message from channel channel1: PubSub
Subscriber channel1:PubSub(channel1) received message from channel channel1: PubSub
Publishing message: Example
Pattern subscriber Received message from channel channel1: Example
Timeout subscriber Received message from channel channel1: Example
Subscriber channel1:PubSub(channel1) received message from channel channel1: Example
All messages published.
2024/08/20 23:24:05 Failed to receive message with timeout: read tcp 127.0.0.1:58755->127.0.0.1:6379: i/o timeout
exit status 1
通过本篇文章,我们详细探讨了如何在 Golang 中使用 go-redis 库实现 Redis 的 PubSub 发布/订阅机制,涵盖了从基础的订阅发布操作到高级的消息处理方法。使用这些工具,您可以构建高效、可靠的实时消息系统,满足各种复杂的业务需求。
希望这篇文章能帮助你更好地理解和使用 go-redis,点击 go-redis 使用指南 可查看更多相关教程!