gRPC流模式
gRPC
中有四种数据流,分别是简单模式、服务端数据流模式、客户端数据流模式、双向数据流模式。
简单模式:客户端发起一次请求,服务端返回一个响应。
服务端数据流模式:客户端发起一次请求,服务端返回一段连续的数据流,如获取股票实时数据。
客户端数据流模式:客户端不断像服务器发送数据流,发送结束后,由服务端返回一个响应。如物联网终端向服务器发送数据。
双向数据流模式:客户端和服务端都可以向对方发送数据流,如实时聊天。
Proto文件
syntax = "proto3";
package proto;
option go_package = "/stream;stream";
message RequestInfo {
string data = 1;
}
message ResponseInfo {
string data = 1;
}
service Stream {
rpc ServerStream (RequestInfo) returns (stream ResponseInfo) {} // 服务端数据流模式
rpc ClientStream (stream RequestInfo) returns (ResponseInfo) {} // 客户端数据流模式
rpc AllStream (stream RequestInfo) returns (stream ResponseInfo) {} // 双向数据流模式
}
使用 stream
声明数据流模式。
服务端数据流模式
服务端
package main
import (
"fmt"
"google.golang.org/grpc"
"net"
"test/stream"
"time"
)
type Stream struct {
stream.UnimplementedStreamServer
}
func (s *Stream) ServerStream(req *stream.RequestInfo, res stream.Stream_ServerStreamServer) error {
fmt.Println(req.Data)
for i := 0; i < 10; i++ {
// 往客户端发送数据
_ = res.Send(&stream.ResponseInfo{
Data: fmt.Sprintf("%v", time.Now().Unix()),
})
time.Sleep(time.Second)
}
return nil
}
func main() {
// 监听
listen, _ := net.Listen("tcp", ":8080")
// 实例化grpc服务
s := grpc.NewServer()
// 注册服务
stream.RegisterStreamServer(s, &Stream{})
// 启动
s.Serve(listen)
}
客户端
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"test/stream"
)
func main() {
// 建立连接
conn, _ := grpc.Dial("127.0.0.1:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
// 实例化客户端
client := stream.NewStreamClient(conn)
// 调用服务
res, _ := client.ServerStream(context.Background(), &stream.RequestInfo{
Data: "I am coming!",
})
for {
// 接受服务端传送的数据
data, err := res.Recv()
if err != nil {
break
}
fmt.Println(data)
}
}
客户端数据流模式
服务端
package main
import (
"fmt"
"google.golang.org/grpc"
"net"
"test/stream"
"time"
)
type Stream struct {
stream.UnimplementedStreamServer
}
func (s *Stream) ClientStream(cli stream.Stream_ClientStreamServer) error {
for {
// 接收客户端传输的数据
if res, err := cli.Recv(); err != nil {
fmt.Println(err)
break
} else {
fmt.Println(res.Data)
}
}
return nil
}
func main() {
// 监听
listen, _ := net.Listen("tcp", ":8080")
// 实例化grpc服务
s := grpc.NewServer()
// 注册服务
stream.RegisterStreamServer(s, &Stream{})
// 启动
s.Serve(listen)
}
客户端
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"test/stream"
)
func main() {
// 建立连接
conn, _ := grpc.Dial("127.0.0.1:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
// 实例化客户端
client := stream.NewStreamClient(conn)
// 调用服务
res, _ := client.ClientStream(context.Background())
for i := 0; i < 10; i++ {
// 往服务端发送数据
_ = res.Send(&stream.RequestInfo{
Data: fmt.Sprintf("%v", time.Now().Unix()),
})
time.Sleep(time.Second)
}
}
双向数据流模式
服务端
package main
import (
"fmt"
"google.golang.org/grpc"
"net"
"sync"
"test/stream"
"time"
)
type Stream struct {
stream.UnimplementedStreamServer
}
func (s *Stream) AllStream(all stream.Stream_AllStreamServer) error {
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
for {
// 接收客户端消息
if res, err := all.Recv(); err != nil {
fmt.Println(err)
break
} else {
fmt.Println(res.Data)
}
}
}()
go func() {
defer wg.Done()
for i := 0; i < 10; i++ {
// 往客户端发送消息
_ = all.Send(&stream.ResponseInfo{
Data: fmt.Sprintf("服务端消息:%v", time.Now().Unix()),
})
time.Sleep(time.Second)
}
}()
wg.Wait()
return nil
}
func main() {
// 监听
listen, _ := net.Listen("tcp", ":8080")
// 实例化grpc服务
s := grpc.NewServer()
// 注册服务
stream.RegisterStreamServer(s, &Stream{})
// 启动
s.Serve(listen)
}
客户端
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"test/stream"
)
func main() {
// 建立连接
conn, _ := grpc.Dial("127.0.0.1:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
// 实例化客户端
client := stream.NewStreamClient(conn)
// 调用服务
wg := sync.WaitGroup{}
wg.Add(2)
all, _ := client.AllStream(context.Background())
go func() {
defer wg.Done()
for {
// 接收服务端消息
if res, err := all.Recv(); err != nil {
fmt.Println(err)
break
} else {
fmt.Println(res.Data)
}
}
}()
go func() {
defer wg.Done()
for i := 0; i < 10; i++ {
// 往服务端发送消息
_ = all.Send(&stream.RequestInfo{
Data: fmt.Sprintf("客户端消息:%v", time.Now().Unix()),
})
time.Sleep(time.Second)
}
}()
wg.Wait()
}
本作品采用《CC 协议》,转载必须注明作者和本文链接