gRPC的metadata
概念
metadata
是以 key-value
的形式存储数据的,其中 key
是字符串类型,value
是字符串数组类型,类似于 http
请求中的 header
。
文档
github.com/grpc/grpc-go/blob/maste...
新建metadata
// 使用metadata.New创建
md := metadata.New(map[string]string{"key": "value"})
// 使用metadata.New创建
md := metadata.Pairs(
"key1", "value1",
"key1", "value2",
"key2", "value2",
)
MD
本质上是一个 map[string][]string
类型。
客户端发送metadata
使用 metadata.NewOutgoingContext
设置。
// 设置metadata
md := metadata.New(map[string]string{"origin": "client"})
ctx := metadata.NewOutgoingContext(context.Background(), md)
服务端接收metadata
使用 metadata.FromIncomingContext
设置。
// 接收metadata
if md, ok := metadata.FromIncomingContext(ctx); ok {
fmt.Println("server:", md["origin"])
}
简单模式下metadata示例
proto文件
syntax = "proto3";
package proto;
option go_package = "/cal;cal";
message RequestInfo {
int64 number1 = 1;
int64 number2 = 2;
}
message ResponseInfo {
int64 res = 1;
}
service Cal {
rpc Add (RequestInfo) returns (ResponseInfo) {}
}
客户端
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"test/cal"
)
func main() {
// 建立连接
conn, _ := grpc.Dial("127.0.0.1:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
// 实例化客户端
client := cal.NewCalClient(conn)
// 设置metadata
md := metadata.New(map[string]string{"origin": "client"})
ctx := metadata.NewOutgoingContext(context.Background(), md)
// 调用服务
res, _ := client.Add(ctx, &cal.RequestInfo{
Number1: 1,
Number2: 1,
})
fmt.Println(res.Res)
}
服务端
package main
import (
"fmt"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"net"
"test/cal"
)
type Cal struct {
cal.UnimplementedCalServer
}
func (c *Cal) Add(ctx context.Context, req *cal.RequestInfo) (*cal.ResponseInfo, error) {
// 接收metadata
if md, ok := metadata.FromIncomingContext(ctx); ok {
fmt.Println(md["origin"])
}
return &cal.ResponseInfo{Res: req.Number1 + req.Number2}, nil
}
func main() {
fmt.Println("start")
// 监听
listen, _ := net.Listen("tcp", ":8080")
// 实例化grpc服务
s := grpc.NewServer()
// 注册服务
cal.RegisterCalServer(s, &Cal{})
// 启动
s.Serve(listen)
}
流模式下metadata实例
proto文件
syntax = "proto3";
package proto;
option go_package = "/stream;stream";
message RequestInfo {
string data = 1;
}
message ResponseInfo {
string data = 1;
}
service Stream {
rpc AllStream (stream RequestInfo) returns (stream ResponseInfo) {}
}
客户端
流模式下客户端接收服务端的 metadata
使用 Header()
方法。
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"sync"
"test/stream"
"time"
)
func main() {
// 建立连接
conn, _ := grpc.Dial("127.0.0.1:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
// 实例化客户端
client := stream.NewStreamClient(conn)
// 调用服务
wg := sync.WaitGroup{}
wg.Add(2)
// 设置请求的metadata
md := metadata.New(map[string]string{
"origin": "client",
})
ctx := metadata.NewOutgoingContext(context.Background(), md)
all, _ := client.AllStream(ctx)
go func() {
defer wg.Done()
for {
if res, err := all.Recv(); err != nil {
fmt.Println(err)
break
} else {
// 打印服务端的metadata
header, _ := all.Header()
fmt.Println("header", header)
fmt.Println(res.Data)
}
}
}()
go func() {
defer wg.Done()
for i := 0; i < 10; i++ {
_ = all.Send(&stream.RequestInfo{
Data: fmt.Sprintf("客户端消息:%v", time.Now().Unix()),
})
time.Sleep(time.Second)
}
}()
wg.Wait()
}
服务端
流模式下服务端发送 metadata
使用 SetHeader()
方法。
package main
import (
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"net"
"sync"
"test/stream"
"time"
)
type Stream struct {
stream.UnimplementedStreamServer
}
func (s *Stream) AllStream(all stream.Stream_AllStreamServer) error {
// 打印客户端的metadata
if md, ok := metadata.FromIncomingContext(all.Context()); ok {
fmt.Println(md["origin"])
}
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
for {
if res, err := all.Recv(); err != nil {
fmt.Println(err)
break
} else {
fmt.Println(res.Data)
}
}
}()
go func() {
defer wg.Done()
// 设置服务端的metadata
md := metadata.New(map[string]string{
"origin": "server",
})
all.SetHeader(md)
for i := 0; i < 10; i++ {
_ = all.Send(&stream.ResponseInfo{
Data: fmt.Sprintf("服务端消息:%v", time.Now().Unix()),
})
time.Sleep(time.Second)
}
}()
wg.Wait()
return nil
}
func main() {
fmt.Println("start")
// 监听
listen, _ := net.Listen("tcp", ":8080")
// 实例化grpc服务
s := grpc.NewServer()
// 注册服务
stream.RegisterStreamServer(s, &Stream{})
// 启动
s.Serve(listen)
}
本作品采用《CC 协议》,转载必须注明作者和本文链接