操作NSQ
# 什么是 NSQ
NSQ 就是一个简单的队列,是用 Golang 开发,它类似于 kafka、MQ。
下面附一张对比图(图片来自 golang2017 开发者大会):
从上面图来看,NSQ 完爆其他两种中间件(虽然我觉得有点吹牛逼的感觉)。
# NSQ 的优势
NSQ 的优势主要有以下几点:
- 支持拓扑的高可用性和避免单点故障(SPOFs)。
- 更强的消息递交保证
- 为单次处理绑定着内存的足迹(通过把一些持久话的消息放入磁盘)
- 对生产者和消费者的配置进行极大的简化
- 提供直接的升级路径
- 提升效率
# NSQ 的组成
NSQ 由三个组件组成,它们是:
- nsqd 用于接收消息,排队消息,投递消息,我们的客户端(生产者,消费者)主要和它打交道
- nsqlookupd 管理 nsqd,nsqadmin 拓扑信息。 我们的客户端(消费者)询问此组件来发现 nsqd 等
- nsqadmin web UI 查询各种 NSQ 组件的信息,消息信息
# nsqlookupd
nsqlookupd 是守护进程负责管理拓扑信息。客户端通过查询 nsqlookupd 来发现指定话题(topic)的生产者,并且 nsqd 节点广播话题(topic)和通道(channel)信息
简单的说 nsqlookupd 就是中心管理服务,它使用 tcp(默认端口 4160)管理 nsqd 服务,使用 http(默认端口 4161)管理 nsqadmin 服务。同时为客户端提供查询功能
总的来说,nsqlookupd 具有以下功能或特性
- 唯一性,在一个 Nsq 服务中只有一个 nsqlookupd 服务。当然也可以在集群中部署多个 nsqlookupd,但它们之间是没有关联的
- 去中心化,即使 nsqlookupd 崩溃,也会不影响正在运行的 nsqd 服务
- 充当 nsqd 和 naqadmin 信息交互的中间件
- 提供一个 http 查询服务,给客户端定时更新 nsqd 的地址目录
# nsqadmin
nsqadmin 是一套 WEB UI,用来汇集集群的实时统计,并执行不同的管理任务
总的来说,nsqadmin 具有以下功能或特性
- 提供一个对 topic 和 channel 统一管理的操作界面以及各种实时监控数据的展示,界面设计的很简洁,操作也很简单
- 展示所有 message 的数量,恩....装 X 利器
- 能够在后台创建 topic 和 channel,这个应该不常用到
- nsqadmin 的所有功能都必须依赖于 nsqlookupd,nsqadmin 只是向 nsqlookupd 传递用户操作并展示来自 nsqlookupd 的数据
nsqadmin 默认的访问地址是http://127.0.0.1:4171/ (opens new window)
# nsqd
nsqd 是一个守护进程,负责接收,排队,投递消息给客户端
简单的说,真正干活的就是这个服务,它主要负责 message 的收发,队列的维护。nsqd 会默认监听一个 tcp 端口(4150)和一个 http 端口(4151)以及一个可选的 https 端口
总的来说,nsqd 具有以下功能或特性
- 对订阅了同一个 topic,同一个 channel 的消费者使用负载均衡策略(不是轮询)
- 只要 channel 存在,即使没有该 channel 的消费者,也会将生产者的 message 缓存到队列中(注意消息的过期处理)
- 保证队列中的 message 至少会被消费一次,即使 nsqd 退出,也会将队列中的消息暂存磁盘上(结束进程等意外情况除外)
- 限定内存占用,能够配置 nsqd 中每个 channel 队列在内存中缓存的 message 数量,一旦超出,message 将被缓存到磁盘中
- topic,channel 一旦建立,将会一直存在,要及时在管理台或者用代码清除无效的 topic 和 channel,避免资源的浪费
每个 nsqd 实例旨在一次处理多个数据流。这些数据流称为“topics”
,一个topic
具有 1 个或多个“channels”
。每个channel
都会收到topic
所有消息的副本,实际上下游的服务是通过对应的channel
来消费topic
消息。
topic
和channel
不是预先配置的。topic
在首次使用时创建,方法是将其发布到指定topic
,或者订阅指定topic
上的channel
。channel
是通过订阅指定的channel
在第一次使用时创建的。
topic
和channel
都相互独立地缓冲数据,防止缓慢的消费者导致其他chennel
的积压(同样适用于topic
级别)。
channel
可以并且通常会连接多个客户端。假设所有连接的客户端都处于准备接收消息的状态,则每条消息将被传递到随机客户端。例如:
# NSQ 的使用步骤
NSQ 的使用步骤如下:
- 启动 nsqlookupd 组件
- 启动 nsqd 并向 nsqlookupd 注册
- 启动 nsqadmin 并向 nsqlookupd 注册
- 生产者推送一个 message 到其中一个 nsqd,并将此消息设置到一个 topic 里面
- 消费者向 nsqlookupd 询问指定 topic 的消息,nsqlookupd 把有此 topic 的 nsqd 地址给到消费者
- 消费者建立 channel 和 topic 之间的订阅关系,通过 channel 向 nsqd 获取指定 topic 里面的消息
- nsqd 向所有订阅该 topic 的 channel 推送 message, 然后其中一个消费者可以通过其中一个 channel 获取该 topic 的 message
如图:
# 搭建服务端 NSQ
下载二进制包 (opens new window),按着上面的顺序启动服务。
比如我这里在 Linux 上搭建:
# 下载二进制包
wget https://github.com/nsqio/nsq/releases/download/v1.2.0/nsq-1.2.0.linux-amd64.go1.12.9.tar.gz
# 解压包
nsq-1.2.0.linux-amd64.go1.12.9.tar.gz
# 启动nsqlookud
./nsqlookupd
# 启动nsqd,并接入刚刚启动的nsqlookud
./nsqd --lookupd-tcp-address=127.0.0.1:4160 -tcp-address=0.0.0.0:4152 -http-address=0.0.0.0:4153
# 启动nqsadmin
./nsqadmin --lookupd-http-address=127.0.0.1:4161
2
3
4
5
6
7
8
9
10
11
12
13
14
更多配置项可以使用 二进制文件 --help
浏览器输入:http://IP:4171 访问 WEB UI,如下:
# 客户端使用 NSQ
官方提供了 Go 语言版的客户端:go-nsq (opens new window),更多客户端支持请查看CLIENT LIBRARIES (opens new window)。
# 安装
go get -u github.com/nsqio/go-nsq
# 生成者
package main
import (
"bufio"
"fmt"
"os"
"strings"
"github.com/nsqio/go-nsq"
)
// 定义一个全局的producer
var producer *nsq.Producer
// 初始化生成者
func initProducer(nsqAddr string) (err error) {
producer, err = nsq.NewProducer(nsqAddr, nsq.NewConfig())
if err != nil {
fmt.Println("创建producer失败. err:", err)
return
}
return
}
func main() {
//初始化生成者
nsqAddr := "122.51.79.172:4152"
err := initProducer(nsqAddr)
if err != nil {
fmt.Println("producer初始化失败. err:", err)
return
}
reader := bufio.NewReader(os.Stdin)
for {
data, err := reader.ReadString('\n')
if err != nil {
fmt.Printf("read string from stdin failed, err:%v\n", err)
continue
}
data = strings.TrimSpace(data)
if strings.ToUpper(data) == "Q" { // 输入Q退出
break
}
// 向 'topic_demo' publish 数据
err = producer.Publish("topic_demo", []byte(data))
if err != nil {
fmt.Printf("publish msg to nsq failed, err:%v\n", err)
continue
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
go build
运行二进制文件,输入如下:
.\producer.exe
11
2020/03/26 15:13:08 INF 1 (192.168.1.12:4152) connecting to nsqd
22
33
44
2
3
4
5
6
然后在网页上看到创建的 topic,如下:
点击 topic 可以看到里面的消息信息:
在 Nodes 界面可以看到当前接入lookupd
的nsqd
节点:
在 Counter页面显示了处理的消息数量,因为我们没有接入消费者,所以处理的消息数量为 0:
在 Lookup 页面支持创建 topic 和 channel:
# 消费者
// nsq_consumer/main.go
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
"github.com/nsqio/go-nsq"
)
// NSQ Consumer Demo
// MyHandler 是一个消费者类型
type MyHandler struct {
Title string
}
// HandleMessage 是需要实现的处理消息的方法
func (m *MyHandler) HandleMessage(msg *nsq.Message) (err error) {
fmt.Printf("%s recv from %v, msg:%v\n", m.Title, msg.NSQDAddress, string(msg.Body))
return
}
// 初始化消费者
func initConsumer(topic string, channel string, address string) (err error) {
config := nsq.NewConfig()
config.LookupdPollInterval = 15 * time.Second
c, err := nsq.NewConsumer(topic, channel, config)
if err != nil {
fmt.Printf("create consumer failed, err:%v\n", err)
return
}
consumer := &MyHandler{
Title: "测试",
}
c.AddHandler(consumer)
if err := c.ConnectToNSQD(address); err != nil { // 直接连NSQD
// if err := c.ConnectToNSQLookupd(address); err != nil { // 通过lookupd查询
return err
}
return nil
}
func main() {
err := initConsumer("topic_demo", "first", "122.51.79.172:4152")
if err != nil {
fmt.Printf("init consumer failed, err:%v\n", err)
return
}
c := make(chan os.Signal) // 定义一个信号的通道
signal.Notify(c, syscall.SIGINT) // 转发键盘中断信号到c
<-c // 阻塞
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59