gRPC流模式

gRPC中有四种数据流,分别是简单模式、服务端数据流模式、客户端数据流模式、双向数据流模式。

简单模式:客户端发起一次请求,服务端返回一个响应。

服务端数据流模式:客户端发起一次请求,服务端返回一段连续的数据流,如获取股票实时数据。

客户端数据流模式:客户端不断像服务器发送数据流,发送结束后,由服务端返回一个响应。如物联网终端向服务器发送数据。

双向数据流模式:客户端和服务端都可以向对方发送数据流,如实时聊天。

Proto文件

syntax = "proto3";

package proto;

option go_package = "/stream;stream";

message RequestInfo {
  string data = 1;
}

message ResponseInfo {
  string data = 1;
}

service Stream {
  rpc ServerStream (RequestInfo) returns (stream ResponseInfo) {} // 服务端数据流模式
  rpc ClientStream (stream RequestInfo) returns (ResponseInfo) {} // 客户端数据流模式
  rpc AllStream (stream RequestInfo) returns (stream ResponseInfo) {} // 双向数据流模式
}

使用 stream声明数据流模式。

服务端数据流模式

服务端

package main

import (
    "fmt"
    "google.golang.org/grpc"
    "net"
    "test/stream"
    "time"
)

type Stream struct {
    stream.UnimplementedStreamServer
}

func (s *Stream) ServerStream(req *stream.RequestInfo, res stream.Stream_ServerStreamServer) error {
    fmt.Println(req.Data)
    for i := 0; i < 10; i++ {
        // 往客户端发送数据
        _ = res.Send(&stream.ResponseInfo{
            Data: fmt.Sprintf("%v", time.Now().Unix()),
        })
        time.Sleep(time.Second)
    }
    return nil
}

func main() {
    // 监听
    listen, _ := net.Listen("tcp", ":8080")
    // 实例化grpc服务
    s := grpc.NewServer()
    // 注册服务
    stream.RegisterStreamServer(s, &Stream{})
    // 启动
    s.Serve(listen)
}

客户端

package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "test/stream"
)

func main() {
    // 建立连接
    conn, _ := grpc.Dial("127.0.0.1:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))

    // 实例化客户端
    client := stream.NewStreamClient(conn)

    // 调用服务
    res, _ := client.ServerStream(context.Background(), &stream.RequestInfo{
        Data: "I am coming!",
    })
    for {
        // 接受服务端传送的数据
        data, err := res.Recv()
        if err != nil {
            break
        }
        fmt.Println(data)
    }
}

客户端数据流模式

服务端

package main

import (
    "fmt"
    "google.golang.org/grpc"
    "net"
    "test/stream"
    "time"
)

type Stream struct {
    stream.UnimplementedStreamServer
}

func (s *Stream) ClientStream(cli stream.Stream_ClientStreamServer) error {
    for {
        // 接收客户端传输的数据
        if res, err := cli.Recv(); err != nil {
            fmt.Println(err)
            break
        } else {
            fmt.Println(res.Data)
        }
    }
    return nil
}

func main() {
    // 监听
    listen, _ := net.Listen("tcp", ":8080")
    // 实例化grpc服务
    s := grpc.NewServer()
    // 注册服务
    stream.RegisterStreamServer(s, &Stream{})
    // 启动
    s.Serve(listen)
}

客户端

package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "test/stream"
)

func main() {
    // 建立连接
    conn, _ := grpc.Dial("127.0.0.1:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))

    // 实例化客户端
    client := stream.NewStreamClient(conn)

    // 调用服务
    res, _ := client.ClientStream(context.Background())
    for i := 0; i < 10; i++ {
        // 往服务端发送数据
        _ = res.Send(&stream.RequestInfo{
            Data: fmt.Sprintf("%v", time.Now().Unix()),
        })

        time.Sleep(time.Second)
    }
}

双向数据流模式

服务端

package main

import (
    "fmt"
    "google.golang.org/grpc"
    "net"
    "sync"
    "test/stream"
    "time"
)

type Stream struct {
    stream.UnimplementedStreamServer
}

func (s *Stream) AllStream(all stream.Stream_AllStreamServer) error {
    wg := sync.WaitGroup{}
    wg.Add(2)
    go func() {
        defer wg.Done()
        for {
            // 接收客户端消息
            if res, err := all.Recv(); err != nil {
                fmt.Println(err)
                break
            } else {
                fmt.Println(res.Data)
            }
        }
    }()
    go func() {
        defer wg.Done()
        for i := 0; i < 10; i++ {
            // 往客户端发送消息
            _ = all.Send(&stream.ResponseInfo{
                Data: fmt.Sprintf("服务端消息:%v", time.Now().Unix()),
            })
            time.Sleep(time.Second)
        }
    }()
    wg.Wait()
    return nil
}

func main() {
    // 监听
    listen, _ := net.Listen("tcp", ":8080")
    // 实例化grpc服务
    s := grpc.NewServer()
    // 注册服务
    stream.RegisterStreamServer(s, &Stream{})
    // 启动
    s.Serve(listen)
}

客户端

package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "test/stream"
)

func main() {
    // 建立连接
    conn, _ := grpc.Dial("127.0.0.1:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))

    // 实例化客户端
    client := stream.NewStreamClient(conn)

    // 调用服务
    wg := sync.WaitGroup{}
    wg.Add(2)
    all, _ := client.AllStream(context.Background())
    go func() {
        defer wg.Done()
        for {
            // 接收服务端消息
            if res, err := all.Recv(); err != nil {
                fmt.Println(err)
                break
            } else {
                fmt.Println(res.Data)
            }
        }
    }()
    go func() {
        defer wg.Done()
        for i := 0; i < 10; i++ {
            // 往服务端发送消息
            _ = all.Send(&stream.RequestInfo{
                Data: fmt.Sprintf("客户端消息:%v", time.Now().Unix()),
            })
            time.Sleep(time.Second)
        }
    }()
    wg.Wait()
}
本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!