ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

高性能消息队列之nsq

2022-05-25 20:02:55  阅读:209  来源: 互联网

标签:队列 nsqlookupd -- 高性能 nsqd address nsq 节点


NSQ 是实时的分布式消息处理平台,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。

NSQ 具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。

NSQ 非常容易配置和部署,且具有最大的灵活性,支持众多消息协议。另外,官方还提供了拆箱即用 Go 和 Python 库。如果读者有兴趣构建自己的客户端的话,还可以参考官方提供的协议规范。

网上有人翻译了国外的一篇文章:​ ​我们是如何使用NSQ处理7500亿消息的​

 

官网文档:​ https://nsq.io/overview/quick_start.html">​ ​https://nsq.io/overview/quick_start.html​​​

中文文档:​ http://wiki.jikexueyuan.com/project/nsq-guide/">​ ​http://wiki.jikexueyuan.com/project/nsq-guide/​

 

nsqd:基本的节点

nsqlookupd:汇总节点信息,提供查询和管理topic等服务

看看官方的原话是怎么说:
nsqlookupd是守护进程负责管理拓扑信息。客户端通过查询 nsqlookupd 来发现指定话题(topic)的生产者,并且 nsqd 节点广播话题(topic)和通道(channel)信息

简单的说nsqlookupd就是中心管理服务,它使用tcp(默认端口4160)管理nsqd服务,使用http(默认端口4161)管理nsqadmin服务。同时为客户端提供查询功能

总的来说,nsqlookupd具有以下功能或特性

唯一性,在一个Nsq服务中只有一个nsqlookupd服务。当然也可以在集群中部署多个nsqlookupd,但它们之间是没有关联的
去中心化,即使nsqlookupd崩溃,也会不影响正在运行的nsqd服务
充当nsqd和naqadmin信息交互的中间件
提供一个http查询服务,给客户端定时更新nsqd的地址目录

 


nsqadmin:管理端展示UI界面,能有一个web页面去查看和操作

官方原话:是一套 WEB UI,用来汇集集群的实时统计,并执行不同的管理任务

总的来说,nsqadmin具有以下功能或特性

提供一个对topic和channel统一管理的操作界面以及各种实时监控数据的展示,界面设计的很简洁,操作也很简单
展示所有message的数量,恩…装X利器
能够在后台创建topic和channel,这个应该不常用到
nsqadmin的所有功能都必须依赖于nsqlookupd,nsqadmin只是向nsqlookupd传递用户操作并展示来自nsqlookupd的数据

 

 

 

特性
默认一开始消息不是持久化的

nsq采用的方式时内存+硬盘的模式,当内存到达一定程度时就会将数据持久化到硬盘


如果将 ​​--mem-queue-size​​ 设置为 ​​0​​,所有的消息将会存储到磁盘。
是即使服务器重启也会将当时在内存中的消息持久化
消息是没有顺序的
这一点很关键,由于nsq使用内存+磁盘的模式,而且还有requeue的操作,所以发送消息的顺序和接收的顺序可能不一样
官方不推荐使用客户端发消息
官方提供相应的客户端发送消息,但是HTTP可能更方便一些
没有复制
nsq节点相对独立,节点与节点之间没有复制或者集群的关系。
没有鉴权相关模块
当前release版本的nsq没有鉴权模块,只有版本v0.2.29+高于这个的才有
几个小点
topic名称有长度限制,命名建议用下划线连接
消息体大小有限制

 

nsq优点&缺点
优点:

部署极其方便,没有任何环境依赖,直接启动就行
轻量没有过多的配置参数,只需要简单的配置就可以直接使用
性能高
消息不存在丢失的情况

缺点:

消息无顺序
节点之间没有消息复制

 

 

 

[安装]

Docker:

添加docker-compose.yml文件

version: '3'
services:
  nsqlookupd:
    image: nsqio/nsq
    command: /nsqlookupd
    restart: always
    ports:
      - 4160:4160
      - 4161:4161
  nsqd:
    image: nsqio/nsq
    command: /nsqd --lookupd-tcp-address=nsqlookupd:4160 --data-path=/data --broadcast-address=192.168.148.45
    restart: always
    depends_on:
      - nsqlookupd
    volumes:
    - ./nsqd/data:/data
    ports:
      - 4150:4150
      - 4151:4151
  nsqadmin:
    image: nsqio/nsq
    command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
    restart: always
    depends_on:
      - nsqlookupd
    ports:
      - 4171:4171

参数说明:

> ​​-lookupd-tcp-address​​ 为上面nsqlookupd的IP和tcp的端口4160
> ​​-lookupd-http-address​​ 是http的端口也就是4161因为admin通过http请求来查询相关信息

  

客户端写入消息:

curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=hello word'

队列消息写入文件:

./nsq_to_file --topic=test --output-dir=/tmp/log --lookupd-http-address=127.0.0.1:4161

 

 

 

使用:

golang:

go get -u github.com/nsqio/go-nsq

生产者:

package main

import (
    "github.com/nsqio/go-nsq"
    "log"
)

func main() {
    //初始化生产者
    config := nsq.NewConfig()
    producer, err := nsq.NewProducer("127.0.0.1:4150", config)
    if err != nil{
        panic(err)
    }
    msgBody := []byte("hello")
    topName := "topic_test"
    err = producer.Publish(topName, msgBody)
    if err != nil {
        log.Fatal(err)
    }
    producer.Stop()

}

 

 

 

消费者:

创建消费者的方式有两种,一种是通过http请求来发现nsqd生产者和配置的topic,另一种是直接使用tcp请求来连接本地实例 。

package main

import (
    "fmt"
    "github.com/nsqio/go-nsq"
    "log"
    "os"
    "os/signal"
    "syscall"
)

type msgHandler struct {}

func processMsg(b []byte) error{
    content := string(b)
    fmt.Printf("Msg: %v \n", content)
    return nil
}

// 必须实现该接口
func (h *msgHandler) HandleMessage(m *nsq.Message) error {
    if len(m.Body) == 0 {
        return nil
    }

    err := processMsg(m.Body)
    m.Finish()
    return err
}


func main() {
    config := nsq.NewConfig()
    consumer, err := nsq.NewConsumer("topic_test", "channel", config)
    if err != nil {
        panic(err)
    }

    consumer.AddHandler(&msgHandler{})
    //err = consumer.ConnectToNSQLookupd("127.0.0.1:4161")
    lookupAddr := []string {
        "127.0.0.1:4161",
    }
    err = consumer.ConnectToNSQLookupds(lookupAddr)
    if err != nil {
        log.Fatal(err)
    }

    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
    <- sigChan

    consumer.Stop()
}

 

 

 

常见问题:

1.error connecting to nsqd - dial tcp: i/o timeout

消费者链接nsqlooup超时错误

在nsq中,如果消费者通过nsqlookup去连接nsqd从而获取消息中的数据,那个他的过程是如下的:

首先启动一个nsqlookup节点,用来维护,管理,发现nsqd节点

然后启动nsqd节点,nsqd节点启动的时候需要制定nsqlookup节点的地址,以及一个自身的广播地址参数

再然后生产者向指定的nsqd节点里面写入数据

之后消费者从nsqlookup中指定的topic里面找到对应的nsqd节点,并根据之前启动nsqd节点的广播地址去连接nsqd节点

最后,消费者连上nsqd节点以后就能够获取里面的数据了

再回到我们的错误

发生错误的原因是我启动nsqd节点的时候使用的指令如下:

nsqd --lookupd-tcp-address=127.0.0.1:4160

这里没有指定参数-broadcast-address参数,这个参数的作用就是将nsqd自身的地址发送给nsqlookup节点

所以我们重启nsqd节点就好了

nsqd --lookupd-tcp-address=127.0.0.1:4160 -broadcast-address='nsqd ip'

 

 

 

 

 

 

 

 

其他:

集群搭建:

version: '3'
services:
  nsqlookupd:
    restart: always
    image: nsqio/nsq
    command: /nsqlookupd --broadcast-address=192.168.2.3
    ports:
      - "4160:4160"
      - "4161:4161"
  nsqd:
    restart: always
    image: nsqio/nsq
    command: /nsqd --lookupd-tcp-address=192.168.2.3:4160 --broadcast-address=192.168.2.3 --tcp-address=:4150 --http-address=:4151 
    depends_on:
      - nsqlookupd
    ports:
      - "4150:4150"
      - "4151:4151"
  nsqd2:
    restart: always
    image: nsqio/nsq
    command: /nsqd --lookupd-tcp-address=192.168.2.3:4160 --broadcast-address=192.168.2.3 --tcp-address=:4250 --http-address=:4251 
    depends_on:
      - nsqlookupd
    ports:
      - "4250:4250"
      - "4251:4251"
  nsqadmin:
    restart: always
    image: nsqio/nsq
    command: /nsqadmin --lookupd-http-address=192.168.2.3:4161
    depends_on:
      - nsqlookupd  
    ports:
      - "4171:4171"

 

标签:队列,nsqlookupd,--,高性能,nsqd,address,nsq,节点
来源: https://www.cnblogs.com/xingxia/p/nsq.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有