ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

gRPC

2021-09-10 22:03:02  阅读:146  来源: 互联网

标签:return err gRPC fmt pb grpc func


  • 安装
  • 流模式

安装

方法1:
go get -u -v google.golang.org/grpc
方法2:

git clone https://github.com/grpc/grpc-go.git $GOPATH/pkg/mod/google.golang.org/grpc

git clone https://github.com/golang/net.git $GOPATH/pkg/mod/google.golang.org/x/net

git clone https://github.com/golang/text.git $GOPATH/pkg/mod/google.golang.org/x/text

git clone https://github.com/google/go-genproto.git $GOPATH/pkg/mod/google.golang.org/genproto

cd $GOPATH/pkg/mod
go install google.golang.org/grpc

stream模式

流模式可以源源不断的推送数据,很适合传输一些大数据,或服务端和客户端长时间数据交互。

简单模式(Simple RPC)

客户端发起一次请求,服务端响应一次数据

服务端
//protobuf定义的服务函数
type OutMsg struct{}
func (om *OutMsg) SayHello(ctx context.Context, p *pb.Person) (*pb.PhoneNumber, error) {
	p.Age = 18
	p.Name = "wang"
	var pn pb.PhoneNumber = pb.PhoneNumber{Number: "11", Type: 2}
	fmt.Println("rotem call")
	return &pn, nil
}
/**
 * @func: CreateGrpcSer
 * @msg: 创建grpc服务端
 * @param {*}
 * @return {*}
 */
func CreateGrpcSer() {
	//初始化对象
	grpcSer := gRPC.NewServer()
	//注册服务
	pb.RegisterHelloServer(grpcSer, new(OutMsg))
	//设置监听,ip,port
	listener, err := net.Listen("tcp", "0.0.0.0:38000")
	if err != nil {
		log.Fatal(err)
		return
	}
	defer listener.Close()
	//启动
	grpcSer.Serve(listener)
}
客户端
/**
 * @func: CreateGrpcCli
 * @msg: 创建grpc客户端
 * @param {*}
 * @return {*}
 */
func CreateGrpcCli() {
	//连接grpc服务
	grpcConn, err := gRPC.Dial("192.168.11.140:38000", gRPC.WithInsecure()) // gRPC.WithInsecure()以安全的方式操作
	if err != nil {
		log.Fatal(err)
		return
	}
	defer grpcConn.Close()
	//初始化grpc客户端
	grpcClient := pb.NewHelloClient(grpcConn)
	//调用远程服务
	pNumber, err := grpcClient.SayHello(context.TODO(), &pb.Person{})
	if err != nil {
		log.Fatal(err)
		return
	}
	fmt.Println(pNumber)
}

服务端数据流模式(Server-side streaming RPC)

客户端发起一次请求,服务端返回一段连续的数据流。
结束传输服务端会传送EOF

客户端数据流模式(Client-side streaming RPC)

与服务端数据流模式相反,客户端源源不断的向服务端发送数据流,而在发送结束后,由服务端返回一个响应

双向数据流模式(Bidirectional streamin RPC)

客户端和服务端都可以同时向对方发送数据流

关键字 stream

stream.proto

syntax="proto3";

option go_package="../;pb";

//关键字 stream
service Greeter{
	rpc GetStream(StreamReqData) returns (stream StreamResData){} // 服务端流模式
	rpc PostStream(stream StreamReqData) returns (StreamResData); //客户端流模式
	rpc AllStream(stream StreamReqData)returns(stream StreamResData)//双向流模式
}
message StreamReqData{
	string data=1;
}
message StreamResData{
	string data=1;
}
server.go

package main

import (
	"fmt"
	"grpcStream/pb"
	"log"
	"net"
	"sync"
	"time"

	"google.golang.org/grpc"
)

const PORT = ":50052"

/*注意 服务端接口函数定义
// GreeterServer is the server API for Greeter service.
type GreeterServer interface {
	GetStream(*StreamReqData, Greeter_GetStreamServer) error
	PostStream(Greeter_PostStreamServer) error
	AllStream(Greeter_AllStreamServer) error
}
*/
type Serv struct{}

/**
 * @func:
 * @msg: 向客户端推送数据流
 * @param {*pb.StreamReqData} req
 * @param {pb.Greeter_GetStreamServer} res
 * @return {error}
 */
func (s *Serv) GetStream(req *pb.StreamReqData, res pb.Greeter_GetStreamServer) (err error) {
	fmt.Println(req.Data)
	for i := 0; i < 10; i++ {
		err := res.Send(&pb.StreamResData{
			Data: fmt.Sprintf("%v,%d", time.Now().Unix(), i),
		})
		if err != nil {
			log.Fatalln(err)
			break
		}
		// time.Sleep(time.Second)
	}
	return err
}

/**
 * @func:
 * @msg:接收客户端推送数据
 * @param {pb.Greeter_PostStreamServer} cliStr
 * @return {*}
 */
func (s *Serv) PostStream(cliStr pb.Greeter_PostStreamServer) (err error) {
	for {
		if a, err := cliStr.Recv(); err != nil {
			fmt.Printf("err:%v", err)
			break
		} else {
			fmt.Println(a.Data)
		}
	}
	return err
}

/**
 * @func:
 * @msg: 双向推流服务端
 * @param {pb.Greeter_AllStreamServer} allStr
 * @return {*}
 */
func (s *Serv) AllStream(allStr pb.Greeter_AllStreamServer) (err error) {
	wg := sync.WaitGroup{}
	wg.Add(2)
	go func() {
		defer wg.Done()
		for {
			if a, err := allStr.Recv(); err != nil {
				fmt.Printf("err:%v", err)
				break
			} else {
				fmt.Println(a.Data)
			}
		}
	}()
	go func() {
		defer wg.Done()
		for {
			err = allStr.Send(&pb.StreamResData{
				Data: fmt.Sprintf("server_send:%v", time.Now().Unix()),
			})
			if err != nil {
				fmt.Printf("err:%v", err)
				break
			}
			time.Sleep(time.Second)
		}
	}()
	defer wg.Wait()
	return err
}
func main() {
	lis, err := net.Listen("tcp", PORT)
	if err != nil {
		panic(err)
	}
	defer lis.Close()
	s := grpc.NewServer()
	pb.RegisterGreeterServer(s, new(Serv))
	err = s.Serve(lis)
	if err != nil {
		panic(err)
	}
}
client.go

package main

import (
	"context"
	"fmt"
	"grpcStream/pb"
	"log"
	"sync"
	"time"

	"google.golang.org/grpc"
)

/**
 * @func: GetGrpcCli
 * @msg: 创建grpc客户端-服务端流模式
 * @param {*}
 * @return {*}
 */
func GetGrpcCli() error {
	//连接grpc服务
	grpcConn, err := grpc.Dial("192.168.11.140:50052", grpc.WithInsecure()) // gRPC.WithInsecure()以安全的方式操作
	if err != nil {
		log.Fatal(err)
		panic(err)
	}
	defer grpcConn.Close()
	//初始化grpc客户端
	grpcClient := pb.NewGreeterClient(grpcConn)
	//调用远程服务
	res, err := grpcClient.GetStream(context.TODO(), &pb.StreamReqData{Data: "req"})
	if err != nil {
		log.Fatal(err)
		panic(err)
	}
	for {
		a, err := res.Recv()
		if err != nil {
			fmt.Println(err)
			break
		}
		fmt.Println(a)
	}
	panic(err)
}

/**
 * @func:
 * @msg: 创建grpc客户端-客户端流模式
 * @param {*}
 * @return {*}
 */
func PostGrpcCli() error {
	//连接grpc服务
	grpcConn, err := grpc.Dial("192.168.11.140:50052", grpc.WithInsecure()) // gRPC.WithInsecure()以安全的方式操作
	if err != nil {
		log.Fatal(err)
		return err
	}
	defer grpcConn.Close()
	//初始化grpc客户端
	grpcClient := pb.NewGreeterClient(grpcConn)
	//调用远程服务
	postS, err := grpcClient.PostStream(context.TODO())
	if err != nil {
		log.Fatal(err)
		return err
	}
	for i := 0; i < 5; i++ {
		err = postS.Send(&pb.StreamReqData{
			Data: fmt.Sprintf("%v,%d", time.Now().Unix(), i),
		})
		if err != nil {
			log.Fatalln(err)
			break
		}
		time.Sleep(time.Second)
	}
	return err
}

/**
 * @func:
 * @msg: 双向推流客户端
 * @param {*}
 * @return {*}
 */
func AllGrpcCli() error {
	grpcConn, err := grpc.Dial("192.168.11.140:50052", grpc.WithInsecure()) // gRPC.WithInsecure()以安全的方式操作
	if err != nil {
		log.Fatal(err)
		return err
	}
	defer grpcConn.Close()
	//初始化grpc客户端
	grpcClient := pb.NewGreeterClient(grpcConn)
	//调用远程服务
	allStr, err := grpcClient.AllStream(context.Background())
	wg := sync.WaitGroup{}
	wg.Add(2)
	go func() {
		defer wg.Done()
		for {
			if a, err := allStr.Recv(); err != nil {
				fmt.Printf("err:%v", err)
				break
			} else {
				fmt.Println(a.Data)
			}
		}
	}()
	go func() {
		defer wg.Done()
		for {
			err = allStr.Send(&pb.StreamReqData{
				Data: fmt.Sprintf("client_send:%v", time.Now().Unix()),
			})
			if err != nil {
				fmt.Printf("err:%v", err)
				break
			}
			time.Sleep(time.Second)
		}
	}()
	defer wg.Wait()
	return err
}
func main() {
	GetGrpcCli()
	PostGrpcCli()
	AllGrpcCli()
}

标签:return,err,gRPC,fmt,pb,grpc,func
来源: https://www.cnblogs.com/wangzhilei-src/p/15253165.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有