ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

go 中的grpc的stream 使用

2021-07-22 11:35:53  阅读:208  来源: 互联网

标签:err stream grpc pro go 推送 服务端


前言

之前我们讲了 grpc 怎么简单的使用 ,这次讲讲 grpc 中的 stream,srteam 顾名思义 就是 一种 流,可以源源不断的 推送 数据,很适合 传输一些大数据,或者 服务端 和 客户端 长时间 数据交互,比如 客户端 可以向 服务端 订阅 一个数据,服务端 就 可以利用 stream ,源源不断地 推送数据。

 

stream的种类:

客户端推送 服务端 rpc GetStream (StreamReqData) returns (stream StreamResData){}
服务端推送 客户端 rpc PutStream (stream StreamReqData) returns (StreamResData){}
客户端与 服务端 互相 推送 rpc AllStream (stream StreamReqData) returns (stream StreamResData){}

  

其实这个流 已经 基本退化成 tcp了,grpc 底层为我们 分包了,所以真的很方便。


protobuf的定义:

syntax = "proto3";//声明proto的版本 只能 是3,才支持 grpc

//声明 包名
package pro;

//声明grpc服务
service Greeter {
   /*
   以下 分别是 服务端 推送流, 客户端 推送流 ,双向流。
   */
  rpc GetStream (StreamReqData) returns (stream StreamResData){}
  rpc PutStream (stream StreamReqData) returns (StreamResData){}
  rpc AllStream (stream StreamReqData) returns (stream StreamResData){}
}


//stream请求结构
message StreamReqData {
   string data = 1;
}
//stream返回结构
message StreamResData {
   string data = 1;
}

我们在 protobuf 里面 定义 要提供的服务,如果 你想把哪个数据 源源不断的 推送 就在前面加个stream 就好了,定义好记得编译。

服务端的实现:

package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "grpc/pro"
    "log"
    "net"
    "sync"
    "time"
)

const PORT  = ":50051"

type server struct {
}

//服务端 单向流
func (s *server)GetStream(req *pro.StreamReqData, res pro.Greeter_GetStreamServer) error{
    i:= 0
    for{
        i++
        res.Send(&pro.StreamResData{Data:fmt.Sprintf("%v",time.Now().Unix())})
        time.Sleep(1*time.Second)
        if i >10 {
            break
        }
    }
    return nil
}

//客户端 单向流
func (this *server) PutStream(cliStr pro.Greeter_PutStreamServer) error {

    for {
        if tem, err := cliStr.Recv(); err == nil {
            log.Println(tem)
        } else {
            log.Println("break, err :", err)
            break
        }
    }

    return nil
}

//客户端服务端 双向流
func(this *server) AllStream(allStr pro.Greeter_AllStreamServer) error {

    wg := sync.WaitGroup{}
    wg.Add(2)
    go func() {
        for {
            data, _ := allStr.Recv()
            log.Println(data)
        }
        wg.Done()
    }()

    go func() {
        for {
            allStr.Send(&pro.StreamResData{Data:"ssss"})
            time.Sleep(time.Second)
        }
        wg.Done()
    }()

    wg.Wait()
    return nil
}

func main(){
    //监听端口
    lis,err := net.Listen("tcp",PORT)
    if err != nil{
        return
    }
    //创建一个grpc 服务器
    s := grpc.NewServer()
    //注册事件
    pro.RegisterGreeterServer(s,&server{})
    //处理链接
    s.Serve(lis)
}

知识点:

  1. 每个函数都对应着 完成了 protobuf 里面的 定义。
  2. 每个函数 形参都有对应的 推送 或者 接收 对象,我们只要 不断循环 Recv(),或者 Send() 就能接收或者推送了!
  3. 当return出函数,就说明此次 推送 或者 接收 结束了,client 会 对应的 收到消息!

客户端调用:

package main

import (
    "google.golang.org/grpc"

    "grpc/pro"
    "log"
    "context"
    "time"
    _ "google.golang.org/grpc/balancer/grpclb"
)

const (
    ADDRESS = "localhost:50051"
)


func main(){
    //通过grpc 库 建立一个连接
    conn ,err := grpc.Dial(ADDRESS,grpc.WithInsecure())
    if err != nil{
        return
    }
    defer conn.Close()
    //通过刚刚的连接 生成一个client对象。
    c := pro.NewGreeterClient(conn)
    //调用服务端推送流
    reqstreamData := &pro.StreamReqData{Data:"aaa"}
    res,_ := c.GetStream(context.Background(),reqstreamData)
    for {
        aa,err := res.Recv()
        if err != nil {
            log.Println(err)
            break
        }
        log.Println(aa)
    }
    //客户端 推送 流
    putRes, _ := c.PutStream(context.Background())
    i := 1
    for {
        i++
        putRes.Send(&pro.StreamReqData{Data:"ss"})
        time.Sleep(time.Second)
        if i > 10 {
            break
        }
    }
    //服务端 客户端 双向流
    allStr,_ := c.AllStream(context.Background())
    go func() {
        for {
            data,_ := allStr.Recv()
            log.Println(data)
        }
    }()

    go func() {
        for {
            allStr.Send(&pro.StreamReqData{Data:"ssss"})
            time.Sleep(time.Second)
        }
    }()

    select {
    }

}

 

client 调用 流的函数, 就会 返回一个 流对象,只要 不断地 对它进行读取或者写入,对应方就能收到。

总结:

grpc 的 stream 和 go的协程 配合 简直完美。通过流 我们 可以更加 灵活的 实现自己的业务。如 订阅,大数据传输等。



作者:xyt001
链接:https://www.jianshu.com/p/85e9cfa16247
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

 

 

 

标签:err,stream,grpc,pro,go,推送,服务端
来源: https://www.cnblogs.com/pebblecome/p/15043292.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有