ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

go使用rabbitmq

2022-06-09 18:03:16  阅读:146  来源: 互联网

标签:log err time rabbitmq test mq 使用 go Fatal


rabbitmq是一款消息中间件,采用erlang语言编写。基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。消息的消费者被动拉取(rabbitMQ 推送消息给消费者)。

基本概念包括:vhost,producer,exchange,queue(设定routingkey来绑定exchange和queue),consumer。

  vhost是rabbitmq虚拟的“服务器”,比如“/”,“/user”。是单独的,互不干扰。   topic模式下,定义routingkey规则:  发送到topic类型交换机的消息的routing_key不能随便设置–它必须是多个单词组成,用点分割。单词可以是任意的,但它们通常指定连接到该消息的某些功能。例如:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”。路由关键字可包含任意多的单词,但最高限制是255字节。

     绑定的关键字必须有相同的格式。topic交换机和direct交换的逻辑是相似的–拥有特定的路由关键字的消息将被发送到所有匹配关键字的队列。然而,绑定关键字有两个特殊的情况:
(1)* (星号) 可以代替一个完整的单词.
(2)# (井号) 可以代替零个或多个单词.

  以下是go使用rabbitmq基本操作:  
package rabbitmq

import (
    "errors"
    "github.com/streadway/amqp"
    "log"
    "strings"
)

//定义全局mqmap
var RabbitMqMap = make(map[string]*rabbitMQ)

// RabbitMQ 用于管理和维护rabbitmq的对象
type rabbitMQ struct {
    //wg           sync.WaitGroup
    channel *amqp.Channel
    mqConn  *amqp.Connection
}

//连接mq,config中可配置channel连接的容量和心跳时长
//默认为:
/**
maxChannelMax = (2 << 15) - 1
defaultHeartbeat         = 10 * time.Second
defaultConnectionTimeout = 30 * time.Second
defaultProduct           = "https://github.com/streadway/amqp"
defaultVersion           = "β"
// Safer default that makes channel leaks a lot easier to spot
// before they create operational headaches. See https://github.com/rabbitmq/rabbitmq-server/issues/1593.
defaultChannelMax = (2 << 10) - 1
*/
func (mq *rabbitMQ) connToMq(url string, config *amqp.Config) (rabbitMq *rabbitMQ, err error) {
    mq.mqConn, err = amqp.DialConfig(url, *config)
    if err != nil {
        return
    }
    mq.channel, err = mq.mqConn.Channel()
    mq.mqConn.Channel()
    if err != nil {
        return
    }
    return mq, nil
}

//直接初始化队列
func (mq *rabbitMQ) PrepareQueue(queueName string) (queue amqp.Queue, err error) {
    if queueName == "" {
        return queue, errors.New("queueName为空")
    }
    queue, err = mq.channel.QueueDeclare(
        queueName, //name
        true,      //durable,是否持久化,默认持久需要根据情况选择
        false,     //delete when unused
        false,     //exclusive
        false,     //no-wait
        nil,       //arguments
    )
    return
}

// prepareExchange 准备rabbitmq的Exchange
func (mq *rabbitMQ) PrepareExchange(exchangeName, exchangeType string) error {
    if exchangeName == "" {
        return errors.New("exchangeName为空")
    }
    err := mq.channel.ExchangeDeclare(
        exchangeName, // exchange
        exchangeType, // type
        true,         // durable 是否持久化,默认持久需要根据情况选择
        false,        // autoDelete
        false,        // internal
        false,        // noWait
        nil,          // args
    )

    if nil != err {
        return err
    }

    return nil
}

//通过exchange发送消息
func (mq *rabbitMQ) ExchangeSend(exchangeName, routingKey string, publishing amqp.Publishing) error {

    return mq.channel.Publish(
        exchangeName, //exchangeName
        routingKey,   //routing key
        true,         //mandatory
        false,        //immediate
        publishing,
    )
}

//通过队列发送消息
func (mq *rabbitMQ) QueueSend(queueName string, publishing amqp.Publishing) error {

    return mq.channel.Publish(
        "",        //exchangeName
        queueName, //queue name
        false,     //mandatory
        false,     //immediate
        publishing,
    )

}

//消费队列,内部方法会阻塞,使用时需要单独启用一个线程处理,常驻后台执行
func (mq *rabbitMQ) QueueConsume(queueName, consumer string) (delivery <-chan amqp.Delivery, err error) {
    err = mq.channel.Qos(1, 0, true)
    if err != nil {
        log.Fatal("Queue Consume: ", err.Error())
        return nil, err
    }
    //后期可调整参数
    delivery, err = mq.channel.Consume(
        queueName, // queue
        consumer, // consumer
        false, // auto-ack
        false, // exclusive
        false, // no-local
        false, // no-wait
        nil,   // args
    )
    if err != nil {
        log.Fatal("Queue Consume: ", err.Error())
        return nil, err
    }
    return delivery, nil
}

//队列绑定exchange
func (mq *rabbitMQ) QueueBindExchange(queueName, routingKey, exchangeName string) error {
    return mq.channel.QueueBind(queueName, routingKey, exchangeName, false, nil)
}

//关闭连接
func Close() {
    for k := range RabbitMqMap {
        RabbitMqMap[k].channel.Close()
        RabbitMqMap[k].mqConn.Close()
    }
}

func InitRabbitMq() {
    var (
        mq     rabbitMQ
        config amqp.Config
        err    error
    )
   //此处可定义多个配置,可调整
        dsn := "amqp://guest:guest@localhost:5672/"
        config.Vhost = "/"
        RabbitMqMap["mq"], err = mq.connToMq(dsn, &config)if err != nil {
            log.Fatal("[rabbit-mq] connect to rabbit-mq error:" + err.Error())
        } else {
            log.Println("[rabbit-mq] connect success")
        }

}

//获取连接
func GetRabbitConn(name ...string) *rabbitMQ {
    rabbitName := ""
    if len(name) > 0 {
        rabbitName = strings.ToLower(name[0])
    }
    if rabbitName == "" {
        return RabbitMqMap["mq"]
    }
    return RabbitMqMap[rabbitName]
}

 

producer,生产消息:

package main

import (
    "fmt"
    "github.com/streadway/amqp"
    "log"
    "strconv"
    "test01/mq/rabbitmq"
    "time"
)


func product() {
    rabbitmq.InitRabbitMq()
    mq := rabbitmq.RabbitMqMap["mq"]
// 可以定义基本的exchange类型,topic(模糊匹配),direct err := mq.PrepareExchange("topic_exchange", "topic") if err != nil { log.Fatal("准备交换机出错", err) } queue, err := mq.PrepareQueue("test-qq") if err != nil { fmt.Println("queue初始化失败", err.Error()) log.Fatal(queue.Name) } queue2, err := mq.PrepareQueue("test-qq2") if err != nil { fmt.Println("queue2初始化失败", err.Error()) log.Fatal(queue2.Name) } queue3, err := mq.PrepareQueue("test-qq3") if err != nil { fmt.Println("queue3初始化失败", err.Error()) log.Fatal(queue3.Name) } if err := mq.QueueBindExchange("test-qq", "wodekey.log.info", "topic_exchange"); err != nil { log.Fatal("队列绑定交换机出错", err) } if err := mq.QueueBindExchange("test-qq2", "wodekey.log.debug", "topic_exchange"); err != nil { log.Fatal("队列绑定交换机出错2", err) } if err := mq.QueueBindExchange("test-qq3", "wodekey.log.error", "topic_exchange"); err != nil { log.Fatal("队列绑定交换机出错2", err) } for i := 0; i < 1000; i++ { //mq.QueueSend("test-qq", amqp.Publishing{ // AppId: "", // ContentType: "application/json", // MessageId: "你好", // Body: []byte("这是我的消息:" + strconv.Itoa(i)), //}) //fmt.Println("发送成功: test-qq ", i) //time.Sleep(2 * time.Second) //mq.QueueSend("test-qq2", amqp.Publishing{ // AppId: "", // ContentType: "application/json", // MessageId: "你好啊", // Body: []byte("这是我的消息2:" + strconv.Itoa(i)), //}) mq.ExchangeSend("topic_exchange", "wodekey.random", amqp.Publishing{ ContentType: "application/json", Body: []byte("这是我的消息哦" + strconv.Itoa(i)), }) fmt.Println("发送成功: exchange ", i) time.Sleep(1 * time.Second) } } func main() { product() time.Sleep(1000000 * time.Second) }

 

consumer,启动三个线程来消费:

package main

import (
    "fmt"
    "log"
    "test01/mq/rabbitmq"
    "time"
)

func consumer() {
    rabbitmq.InitRabbitMq()
    mq := rabbitmq.RabbitMqMap["mq2"]
    if err := mq.QueueBindExchange("test-qq", "*.log.info", "topic_exchange"); err != nil {
        log.Fatal("队列绑定交换机出错", err)
    }
    if err := mq.QueueBindExchange("test-qq2", "*.log.debug", "topic_exchange"); err != nil {
        log.Fatal("队列绑定交换机出错2", err)
    }
    if err := mq.QueueBindExchange("test-qq3", "*.log.error", "topic_exchange"); err != nil {
        log.Fatal("队列绑定交换机出错2", err)
    }
    go func() {
        resu, err := mq.QueueConsume("test-qq", "consumer1")
        if err != nil {
            fmt.Println("消费错误", err.Error())
            log.Fatal("test-qq消费错误")
        }
        for d := range resu {
            fmt.Println(d.ConsumerTag + " test-qq消费成功:", string(d.Body))

            d.Ack(true) //需手动应答
            time.Sleep(2 * time.Second)
        }
    }()

    go func() {
        resu, err := mq.QueueConsume("test-qq2", "consumer002")
        if err != nil {
            fmt.Println("消费错误", err.Error())
            log.Fatal("test-qq2消费错误")
        }
        for d := range resu {
            fmt.Println(d.ConsumerTag + " test-qq2消费成功:", string(d.Body))
            d.Ack(true) //需手动应答
            time.Sleep(2 * time.Second)
        }
    }()

    go func() {
        resu, err := mq.QueueConsume("test-qq3", "consumer003")
        if err != nil {
            fmt.Println("消费错误", err.Error())
            log.Fatal("test-qq3消费错误")
        }
        for d := range resu {
            fmt.Println(d.ConsumerTag + " test-qq3消费成功:", string(d.Body))
            d.Ack(true) //需手动应答
            time.Sleep(2 * time.Second)
        }
    }()
}

func main() {
    consumer()

    time.Sleep(10000000* time.Second)
}

 

 


标签:log,err,time,rabbitmq,test,mq,使用,go,Fatal
来源: https://www.cnblogs.com/ahgo/p/16360282.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有