Fork me on GitHub

消息队列之《02.NSQ实践篇》

这是消息队列NSQ的姊妹篇,不足之处请指出!

安装NSQ

方法一

首先搭建golang开发环境,这里就不详细讲了

注意一下,搭建golang环境时最好将bin目录添加到系统环境(path)里,省去了每次都要去bin目录里执行的麻烦

  • 安装包管理器godep
    1
    go get github.com/tools/godep

执行完后检查godep是否已经安装在bin目录下,一般都会自动安装,如果没有,用go install手动安装下

  • 安装依赖包assert

    1
    go get github.com/bmizerany/assert
  • 安装NSQ

    1
    godep get github.com/bitly/nsq/...

如果安装成功,bin目录里就会出现一大堆nsq_…开头的可执行文件

PS:如果安装失败,采用方法二安装

方法二

二进制包安装, 直接去 https://github.com/nsqio/nsq/releases 下载编译好的执行文件,将里面的可执行文件复制到bin目录下就可以使用了。

1
2
3
wget https://github.com/nsqio/nsq/releases/download/v1.0.0-compat/nsq-1.0.0-compat.linux-amd64.go1.8.tar.gz
tar zxf nsq-1.0.0-compat.linux-amd64.go1.8.tar.gz
mv nsq-1.0.0-compat.linux-amd64.go1.8 /usr/local/nsqd

运行NSQ

运行单机nsqd服务

nsqd是一个独立的服务,启动一个nsqd就可以完成message的收发,启动一个单机的nsqd很简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/usr/local/nsqd/bin/nsqd

#当然,我习惯使用supervisor来管理启动的程序:
[root@tanshuai supervisor]# cat nsqd.conf
[program:nsqd]
command=/usr/local/nsqd/bin/nsqd
directory=/usr/local/nsqd
autostart=true
autorestart=true
startsecs=10
startretries=3
user=root
log_stdout=true
log_stderr=true
redirect_stderr=true
stdout_logfile=/var/log/supervisor/nsqd.log
stdout_logfile_maxbytes=100MB
stdout_logfile_backups=10
numprocs=1
[root@tanshuai supervisor]# supervisorctl reread
nsqd: available
[root@tanshuai supervisor]# supervisorctl add nsqd
nsqd: added process group
[root@tanshuai supervisor]# supervisorctl status
nsqd RUNNING pid 13703, uptime 0:00:11
[root@tanshuai supervisor]# cat /var/log/supervisor/nsqd.log
[nsqd] 2018/08/15 16:29:01.661329 nsqd v1.0.0-compat (built w/go1.8)
[nsqd] 2018/08/15 16:29:01.661362 ID: 831
[nsqd] 2018/08/15 16:29:01.661378 NSQ: persisting topic/channel metadata to nsqd.dat
[nsqd] 2018/08/15 16:29:01.663372 TCP: listening on [::]:4150
[nsqd] 2018/08/15 16:29:01.663411 HTTP: listening on [::]:4151

客户端可以使用http,也可以使用tcp来发送message

运行NSQ服务集群

  • 首先启动nsqlookupd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
[root@tanshuai supervisor]# /usr/local/nsqd/bin/nsqlookupd --help
Usage of nsqlookupd:
-broadcast-address string
address of this lookupd node, (default to the OS hostname) (default "tanshuai")
-config string
path to config file
-http-address string
<addr>:<port> to listen on for HTTP clients (default "0.0.0.0:4161")
-inactive-producer-timeout duration
duration of time a producer will remain in the active list since its last ping (default 5m0s)
-log-prefix string
log message prefix (default "[nsqlookupd] ")
-tcp-address string
<addr>:<port> to listen on for TCP clients (default "0.0.0.0:4160")
-tombstone-lifetime duration
duration of time a producer will remain tombstoned if registration remains (default 45s)
-verbose
enable verbose logging
-version
print version string
[root@tanshuai supervisor]# /usr/local/nsqd/bin/nsqlookupd -broadcast-address=0.0.0.0 -http-address=0.0.0.0:4161 -tcp-address=0.0.0.0:4160
[nsqlookupd] 2018/08/15 16:43:04.879524 nsqlookupd v1.0.0-compat (built w/go1.8)
[nsqlookupd] 2018/08/15 16:43:04.879884 TCP: listening on [::]:4160
[nsqlookupd] 2018/08/15 16:43:04.879925 HTTP: listening on [::]:4161
^C[nsqlookupd] 2018/08/15 16:43:09.132779 HTTP: closing [::]:4161
[nsqlookupd] 2018/08/15 16:43:09.132802 TCP: closing [::]:4160
[root@tanshuai supervisor]#

# 使用supervisord启动
[root@tanshuai supervisor]# cat nsqlookupd.conf
[program:nsqlookupd]
command=/usr/local/nsqd/bin/nsqlookupd -broadcast-address=0.0.0.0 -http-address=0.0.0.0:4161 -tcp-address=0.0.0.0:4160
directory=/usr/local/nsqd
autostart=true
autorestart=true
startsecs=10
startretries=3
user=root
log_stdout=true
log_stderr=true
redirect_stderr=true
stdout_logfile=/var/log/supervisor/nsqlookupd.log
stdout_logfile_maxbytes=100MB
stdout_logfile_backups=10
numprocs=1

[root@tanshuai supervisor]# cat /var/log/supervisor/nsqlookupd.log
[nsqlookupd] 2018/08/15 16:46:46.534278 nsqlookupd v1.0.0-compat (built w/go1.8)
[nsqlookupd] 2018/08/15 16:46:46.534869 TCP: listening on [::]:4160
[nsqlookupd] 2018/08/15 16:46:46.534902 HTTP: listening on [::]:4161
  • 启动nsqd,并接入刚刚启动的nsqlookupd。为了方便测试,启动两个nsqd。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
[root@tanshuai supervisor]# cat nsqd01.conf 
[program:nsqd01]
command=/usr/local/nsqd/bin/nsqd -lookupd-tcp-address=127.0.0.1:4160 -tcp-address=0.0.0.0:4150 -http-address=0.0.0.0:4151
directory=/usr/local/nsqd
autostart=true
autorestart=true
startsecs=10
startretries=3
user=root
log_stdout=true
log_stderr=true
redirect_stderr=true
stdout_logfile=/var/log/supervisor/nsqd01.log
stdout_logfile_maxbytes=100MB
stdout_logfile_backups=10
numprocs=1

[root@tanshuai supervisor]# cat nsqd02.conf
[program:nsqd02]
command=/usr/local/nsqd/bin/nsqd -lookupd-tcp-address=127.0.0.1:4160 -tcp-address=0.0.0.0:4152 -http-address=0.0.0.0:4153
# directory=/usr/local/nsqd #supervisor不允许相同的进程使用相同的directory,所以禁用掉,没影响。
autostart=true
autorestart=true
startsecs=10
startretries=3
user=root
log_stdout=true
log_stderr=true
redirect_stderr=true
stdout_logfile=/var/log/supervisor/nsqd02.log
stdout_logfile_maxbytes=100MB
stdout_logfile_backups=10
numprocs=1

[root@tanshuai supervisor]# cat /var/log/supervisor/nsqd01.log
[nsqd] 2018/08/15 16:52:43.646357 nsqd v1.0.0-compat (built w/go1.8)
[nsqd] 2018/08/15 16:52:43.646391 ID: 831
[nsqd] 2018/08/15 16:52:43.646459 NSQ: persisting topic/channel metadata to nsqd.dat
[nsqd] 2018/08/15 16:52:43.647727 TCP: listening on [::]:4150
[nsqd] 2018/08/15 16:52:43.647765 HTTP: listening on [::]:4151
[nsqd] 2018/08/15 16:52:43.647833 LOOKUP(127.0.0.1:4160): adding peer
[nsqd] 2018/08/15 16:52:43.647836 LOOKUP connecting to 127.0.0.1:4160
[nsqd] 2018/08/15 16:52:43.649061 LOOKUPD(127.0.0.1:4160): peer info {TCPPort:4160 HTTPPort:4161 Version:1.0.0-compat BroadcastAddress:0.0.0.0}
[nsqd] 2018/08/15 16:52:58.647932 LOOKUPD(127.0.0.1:4160): sending heartbeat
[nsqd] 2018/08/15 16:53:13.647894 LOOKUPD(127.0.0.1:4160): sending heartbeat
[nsqd] 2018/08/15 16:53:28.647928 LOOKUPD(127.0.0.1:4160): sending heartbeat
  • 启动nsqadmin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
[root@tanshuai supervisor]# cat nsqadmin.conf 
[program:nsqadmin]
command=/usr/local/nsqd/bin/nsqadmin -lookupd-http-address=127.0.0.1:4161
directory=/usr/local/nsqd
autostart=true
autorestart=true
startsecs=10
startretries=3
user=root
log_stdout=true
log_stderr=true
redirect_stderr=true
stdout_logfile=/var/log/supervisor/nsqadmin.log
stdout_logfile_maxbytes=100MB
stdout_logfile_backups=10
numprocs=1

[root@tanshuai supervisor]# cat /var/log/supervisor/nsqadmin.log
[nsqadmin] 2018/08/15 17:04:23.622502 nsqadmin v1.0.0-compat (built w/go1.8)
[nsqadmin] 2018/08/15 17:04:23.623083 HTTP: listening on [::]:4171
  • 所有启动的进程
1
2
3
4
5
[root@tanshuai supervisor]# supervisorctl status
nsqadmin RUNNING pid 15252, uptime 0:01:05
nsqd01 RUNNING pid 14720, uptime 0:12:45
nsqd02 RUNNING pid 14787, uptime 0:11:29
nsqlookupd RUNNING pid 14456, uptime 0:18:42

访问nsqadmin:http://127.0.0.1:4171/

使用NSQ

1、创建一个topic并发布一条消息
接口:POST /pub

1
curl -d "hello world" http://127.0.0.1:4151/pub?topic=name

2、发送多条消息
接口:POST /mpub

1
curl -d "<message01>\n<message02>\n<message03>" http://127.0.0.1:4151/mpub?topic=name

3、创建一个topic
接口:POST /topic/create

1
curl -X POST http://127.0.0.1:4151/topic/create?topic=name

4、删除一个topic
接口:POST /topic/delete

1
curl -X POST http://127.0.0.1:4151/topic/delete?topic=test

5、为现有的topic创建一个channel
接口:POST /channel/create

1
curl -X POST "http://127.0.0.1:4151/channel/create?topic=name&channel=name"

6、删除topic里的一个channel
接口:POST /channel/delete

1
curl -X POST "http://127.0.0.1:4151/channel/delete?topic=name&channel=name"

7、清空topic
接口:POST /

1
curl -X POST http://127.0.0.1:4151/topic/empty?topic=name

8、清空channel
接口:POST /channel/empty

1
curl -X POST http://127.0.0.1:4151/channel/empty?topic=name&channel=name

9、统计数据
接口:POST /stats

1
curl http://127.0.0.1:4151/stats

10、集群信息
接口:POST /info

1
curl http://127.0.0.1:4151/info

以上为常用接口,还有其他接口看官网即可。

下一篇将介绍python如何使用NSQD队列进行message收发。

-------------本文结束感谢您的阅读-------------