gRPC的metadata

概念

metadata是以 key-value的形式存储数据的,其中 key是字符串类型,value是字符串数组类型,类似于 http请求中的 header

文档

github.com/grpc/grpc-go/blob/maste...

新建metadata

// 使用metadata.New创建
md := metadata.New(map[string]string{"key": "value"})
// 使用metadata.New创建
md := metadata.Pairs(
    "key1", "value1",
    "key1", "value2",
    "key2", "value2",
)

MD本质上是一个 map[string][]string类型。

客户端发送metadata

使用 metadata.NewOutgoingContext设置。

// 设置metadata
md := metadata.New(map[string]string{"origin": "client"})
ctx := metadata.NewOutgoingContext(context.Background(), md)

服务端接收metadata

使用 metadata.FromIncomingContext设置。

// 接收metadata
if md, ok := metadata.FromIncomingContext(ctx); ok {
    fmt.Println("server:", md["origin"])
}

简单模式下metadata示例

proto文件

syntax = "proto3";

package proto;

option go_package = "/cal;cal";

message RequestInfo {
  int64 number1 = 1;
  int64 number2 = 2;
}

message ResponseInfo {
  int64 res = 1;
}

service Cal {
  rpc Add (RequestInfo) returns (ResponseInfo) {}
}

客户端

package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/grpc/metadata"
    "test/cal"
)

func main() {
    // 建立连接
    conn, _ := grpc.Dial("127.0.0.1:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))

    // 实例化客户端
    client := cal.NewCalClient(conn)

    // 设置metadata
    md := metadata.New(map[string]string{"origin": "client"})
    ctx := metadata.NewOutgoingContext(context.Background(), md)

    // 调用服务
    res, _ := client.Add(ctx, &cal.RequestInfo{
        Number1: 1,
        Number2: 1,
    })
    fmt.Println(res.Res)
}

服务端

package main

import (
    "fmt"
    "golang.org/x/net/context"
    "google.golang.org/grpc"
    "google.golang.org/grpc/metadata"
    "net"
    "test/cal"
)

type Cal struct {
    cal.UnimplementedCalServer
}

func (c *Cal) Add(ctx context.Context, req *cal.RequestInfo) (*cal.ResponseInfo, error) {
    // 接收metadata
    if md, ok := metadata.FromIncomingContext(ctx); ok {
        fmt.Println(md["origin"])
    }
    return &cal.ResponseInfo{Res: req.Number1 + req.Number2}, nil
}

func main() {
    fmt.Println("start")
    // 监听
    listen, _ := net.Listen("tcp", ":8080")
    // 实例化grpc服务
    s := grpc.NewServer()
    // 注册服务
    cal.RegisterCalServer(s, &Cal{})
    // 启动
    s.Serve(listen)
}

流模式下metadata实例

proto文件

syntax = "proto3";

package proto;

option go_package = "/stream;stream";

message RequestInfo {
  string data = 1;
}

message ResponseInfo {
  string data = 1;
}

service Stream {
  rpc AllStream (stream RequestInfo) returns (stream ResponseInfo) {}
}

客户端

流模式下客户端接收服务端的 metadata使用 Header()方法。

package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/grpc/metadata"
    "sync"
    "test/stream"
    "time"
)

func main() {
    // 建立连接
    conn, _ := grpc.Dial("127.0.0.1:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))

    // 实例化客户端
    client := stream.NewStreamClient(conn)

    // 调用服务
    wg := sync.WaitGroup{}
    wg.Add(2)
    // 设置请求的metadata
    md := metadata.New(map[string]string{
        "origin": "client",
    })
    ctx := metadata.NewOutgoingContext(context.Background(), md)
    all, _ := client.AllStream(ctx)
    go func() {
        defer wg.Done()
        for {
            if res, err := all.Recv(); err != nil {
                fmt.Println(err)
                break
            } else {
                // 打印服务端的metadata
                header, _ := all.Header()
                fmt.Println("header", header)
                fmt.Println(res.Data)
            }
        }
    }()
    go func() {
        defer wg.Done()
        for i := 0; i < 10; i++ {
            _ = all.Send(&stream.RequestInfo{
                Data: fmt.Sprintf("客户端消息:%v", time.Now().Unix()),
            })
            time.Sleep(time.Second)
        }
    }()
    wg.Wait()
}

服务端

流模式下服务端发送 metadata使用 SetHeader()方法。

package main

import (
    "fmt"
    "google.golang.org/grpc"
    "google.golang.org/grpc/metadata"
    "net"
    "sync"
    "test/stream"
    "time"
)

type Stream struct {
    stream.UnimplementedStreamServer
}

func (s *Stream) AllStream(all stream.Stream_AllStreamServer) error {
    // 打印客户端的metadata
    if md, ok := metadata.FromIncomingContext(all.Context()); ok {
        fmt.Println(md["origin"])
    }
    wg := sync.WaitGroup{}
    wg.Add(2)
    go func() {
        defer wg.Done()
        for {
            if res, err := all.Recv(); err != nil {
                fmt.Println(err)
                break
            } else {
                fmt.Println(res.Data)
            }
        }
    }()
    go func() {
        defer wg.Done()
        // 设置服务端的metadata
        md := metadata.New(map[string]string{
            "origin": "server",
        })
        all.SetHeader(md)
        for i := 0; i < 10; i++ {
            _ = all.Send(&stream.ResponseInfo{
                Data: fmt.Sprintf("服务端消息:%v", time.Now().Unix()),
            })
            time.Sleep(time.Second)
        }
    }()
    wg.Wait()
    return nil
}

func main() {
    fmt.Println("start")
    // 监听
    listen, _ := net.Listen("tcp", ":8080")
    // 实例化grpc服务
    s := grpc.NewServer()
    // 注册服务
    stream.RegisterStreamServer(s, &Stream{})
    // 启动
    s.Serve(listen)
}
本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!