Go-kratos 框架商城微服务实战之商城服务 (五) BFF API

Go-kratos 框架商城微服务实战之用户服务 (五)

这篇主要给服务加入链路追踪,完善 consul,并测试 shop 的 http api 接口 文章写的不清晰的地方可通过 GitHub 源码进行查看, 也感谢您指出不足之处,欢迎大佬指教。

注:竖排 … 代码省略,为了保持文章的篇幅简洁,我会将一些不必要的代码使用竖排的 . 来代替,你在复制本文代码块的时候,切记不要将 . 也一同复制进去。

准备工作

安装 consul

# 这里使用的是 docker 工具进行创建的
docker run -d -p 8500:8500 -p 8300:8300 -p 8301:8301 -p 8302:8302 -p 8600:8600/udp consul consul agent -dev -client=0.0.0.0

jaeger 的安装

# 这里使用的是 docker 工具进行创建的
docker run --rm --name jaeger -p14268:14268 -p16686:16686 jaegertracing/all-in-one
// 执行完毕之后,切记别退出服务
  • 浏览器访问 http://127.0.0.1:16686/ 验证是否安装成功

user 服务添加配置代码

consul 的配置前几篇文章都已经添加过了,这里就不重复添加了

  • user 项目中添加
 # user/configs/config.yaml 配置文件新增

...

trace:
  endpoint: http://127.0.0.1:14268/api/traces
  • 修改 user 的配置文件
...

message Bootstrap {
  Server server = 1;
  Data data = 2;
  Trace trace = 3; // 此处为新增的配置
}

...

message Trace {
  string endpoint = 1;
}
  • 生成 user 的 conf 文件
user 根目录执行命令,生成新的配置文件
make config
  • 修改 grpc.go 文件
package server

import (
    .
    .
    .
    "github.com/go-kratos/kratos/v2/middleware/tracing"     // 新增引入
)

// NewGRPCServer new a gRPC server.
func NewGRPCServer(c *conf.Server, u *service.UserService, logger log.Logger) *grpc.Server {
    var opts = []grpc.ServerOption{
        grpc.Middleware(
            recovery.Recovery(),
            tracing.Server(), // 新增 tracing
        ),
    }
    .
    .
    .
    v1.RegisterUserServer(srv, u)
    return srv
}
  • 修改 main.go 文件
package main

import (
    "flag"
    "github.com/go-kratos/kratos/v2/registry"
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "os"

    "github.com/go-kratos/kratos/v2"
    "github.com/go-kratos/kratos/v2/config"
    "github.com/go-kratos/kratos/v2/config/file"
    "github.com/go-kratos/kratos/v2/log"
    "github.com/go-kratos/kratos/v2/middleware/tracing"
    "github.com/go-kratos/kratos/v2/transport/grpc"
    "go.opentelemetry.io/otel/exporters/jaeger"
    "go.opentelemetry.io/otel/sdk/resource"
    tracesdk "go.opentelemetry.io/otel/sdk/trace"
    semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
    "user/internal/conf"
)

// go build -ldflags "-X main.Version=x.y.z"
var (
    // Name is the name of the compiled software.
    Name = "shop.user.service"
    // Version is the version of the compiled software.
    Version "user.v1"
    // flagconf is the config flag.
    flagconf string

    id, _ = os.Hostname()
)

func init() {
    flag.StringVar(&flagconf, "conf", "../../configs", "config path, eg: -conf config.yaml")
}

func newApp(logger log.Logger, gs *grpc.Server, rr registry.Registrar) *kratos.App {
    return kratos.New(
        kratos.ID(id+"user service"),
        kratos.Name(Name),
        kratos.Version(Version),
        kratos.Metadata(map[string]string{}),
        kratos.Logger(logger),
        kratos.Server(
            gs,
        ),
        kratos.Registrar(rr), // 服务注册与发现
    )
}

// Set global trace provider 设置链路追逐的方法
func setTracerProvider(url string) error {
    // Create the Jaeger exporter
    exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url)))
    if err != nil {
        return err
    }
    tp := tracesdk.NewTracerProvider(
        // Set the sampling rate based on the parent span to 100%
        tracesdk.WithSampler(tracesdk.ParentBased(tracesdk.TraceIDRatioBased(1.0))),
        // Always be sure to batch in production.
        tracesdk.WithBatcher(exp),
        // Record information about this application in an Resource.
        tracesdk.WithResource(resource.NewSchemaless(
            semconv.ServiceNameKey.String(Name),
            attribute.String("env", "dev"),
        )),
    )
    otel.SetTracerProvider(tp)
    return nil
}

func main() {
    flag.Parse()
    logger := log.With(log.NewStdLogger(os.Stdout),
        "ts", log.DefaultTimestamp,
        "caller", log.DefaultCaller,
        "service.id", id,
        "service.name", Name,
        "service.version", Version,
        "trace_id", tracing.TraceID(),
        "span_id", tracing.SpanID(),
    )
    c := config.New(
        config.WithSource(
            file.NewSource(flagconf),
        ),
    )
    defer c.Close()

    if err := c.Load(); err != nil {
        panic(err)
    }

    var bc conf.Bootstrap
    if err := c.Scan(&bc); err != nil {
        panic(err)
    }
  // 加入链路追踪的配置
    if err := setTracerProvider(bc.Trace.Endpoint); err != nil {
        panic(err)
    }

    var rc conf.Registry
    if err := c.Scan(&rc); err != nil {
        panic(err)
    }

    app, cleanup, err := initApp(bc.Server, &rc, bc.Data, logger)
    if err != nil {
        panic(err)
    }
    defer cleanup()

    // start and wait for stop signal
    if err := app.Run(); err != nil {
        panic(err)
    }
}

修改 wire.go 文件

根目录执行命令,生成新的 wire_gen.go 文件
 make wire

shop 项目中添加配置代码

前几篇已经把 consul service 的一些配置加入到了 config 文件中,这里就不重复添加了

  • 修改 config.yaml 文件

    考虑到这个配置文件的重要性,这里贴出来了全部的配置

 name: shop.api
server:
  http:
    addr: 0.0.0.0:8097
    timeout: 1s
  grpc:
    addr: 0.0.0.0:9001
    timeout: 1s
data:
  database:
    driver: mysql
    source: root:root@tcp(127.0.0.1:3306)/test
  redis:
    addr: 127.0.0.1:6379
    read_timeout: 0.2s
    write_timeout: 0.2s
trace:
  endpoint: http://127.0.0.1:14268/api/traces
auth:
  jwt_key: hqFr%3ddt32DGlSTOI5cO6@TH#fFwYnP$S
service:
  user:
    endpoint: discovery:///shop.user.service
  goods:
    endpoint: discovery:///shop.goods.service
  • 修改 conf.proto 文件

    考虑到这个配置文件的重要性,这里贴出来了全部的配置

syntax = "proto3";
package shop.api;

option go_package = "shop/internal/conf;conf";

import "google/protobuf/duration.proto";

message Bootstrap {
  Server server = 1;
  Data data = 2;
  Trace trace = 3;
  Auth auth = 4;
  Service service = 5;
}

message Server {
  message HTTP {
    string network = 1;
    string addr = 2;
    google.protobuf.Duration timeout = 3;
  }
  message GRPC {
    string network = 1;
    string addr = 2;
    google.protobuf.Duration timeout = 3;
  }
  HTTP http = 1;
  GRPC grpc = 2;
}

message Data {
  message Database {
    string driver = 1;
    string source = 2;
  }
  message Redis {
    string network = 1;
    string addr = 2;
    google.protobuf.Duration read_timeout = 3;
    google.protobuf.Duration write_timeout = 4;
  }
  Database database = 1;
  Redis redis = 2;
}

message Service {
  message User {
    string endpoint = 1;
  }
  message Goods {
    string endpoint = 1;
  }
  User user = 1;
  Goods goods = 2;
}

message Trace {
  string endpoint = 1;
}


message Registry {
  message Consul {
    string address = 1;
    string scheme = 2;
  }
  Consul consul = 1;
}

message Auth {
  string jwt_key = 1;
}
  • 生成新的配置
user 根目录执行命令,生成新的配置文件
make config
  • 修改链接用户服务的连接

    shop/internal/data/data.go

package data

import (
    "context"
    consul "github.com/go-kratos/kratos/contrib/registry/consul/v2"
    "github.com/go-kratos/kratos/v2/log"
    "github.com/go-kratos/kratos/v2/middleware/recovery"
    "github.com/go-kratos/kratos/v2/middleware/tracing"
    "github.com/go-kratos/kratos/v2/registry"
    "github.com/go-kratos/kratos/v2/transport/grpc"
    "github.com/google/wire"
    consulAPI "github.com/hashicorp/consul/api"
    grpcx "google.golang.org/grpc"
    userV1 "shop/api/service/user/v1"
    "shop/internal/conf"
    "time"
)

// ProviderSet is data providers.
var ProviderSet = wire.NewSet(NewData, NewUserRepo, NewUserServiceClient, NewRegistrar, NewDiscovery)

// Data .
type Data struct {
    log *log.Helper
    uc  userV1.UserClient
}

// NewData .
func NewData(c *conf.Data, uc userV1.UserClient, logger log.Logger) (*Data, error) {
    l := log.NewHelper(log.With(logger, "module", "data"))
    return &Data{log: l, uc: uc}, nil
}

// NewUserServiceClient 链接用户服务 grpc
func NewUserServiceClient(ac *conf.Auth, sr *conf.Service, rr registry.Discovery) userV1.UserClient {
    conn, err := grpc.DialInsecure(
        context.Background(),
        grpc.WithEndpoint(sr.User.Endpoint),
        grpc.WithDiscovery(rr),
        grpc.WithMiddleware(
            tracing.Client(), // 链路追踪
            recovery.Recovery(),
        ),
        grpc.WithTimeout(2*time.Second),
        grpc.WithOptions(grpcx.WithStatsHandler(&tracing.ClientHandler{})),
    )
    if err != nil {
        panic(err)
    }
    c := userV1.NewUserClient(conn)
    return c
}

// NewRegistrar add consul
func NewRegistrar(conf *conf.Registry) registry.Registrar {
    c := consulAPI.DefaultConfig()
    c.Address = conf.Consul.Address
    c.Scheme = conf.Consul.Scheme
    cli, err := consulAPI.NewClient(c)
    if err != nil {
        panic(err)
    }
    r := consul.New(cli, consul.WithHealthCheck(false))
    return r
}

func NewDiscovery(conf *conf.Registry) registry.Discovery {
    c := consulAPI.DefaultConfig()
    c.Address = conf.Consul.Address
    c.Scheme = conf.Consul.Scheme
    cli, err := consulAPI.NewClient(c)
    if err != nil {
        panic(err)
    }
    r := consul.New(cli, consul.WithHealthCheck(false))
    return r
}
  • 修改 server http 服务
package server

import (
    "context"
    "github.com/go-kratos/kratos/v2/log"
    "github.com/go-kratos/kratos/v2/middleware/auth/jwt"
    "github.com/go-kratos/kratos/v2/middleware/logging"
    "github.com/go-kratos/kratos/v2/middleware/recovery"
    "github.com/go-kratos/kratos/v2/middleware/selector"
    "github.com/go-kratos/kratos/v2/middleware/tracing"
    "github.com/go-kratos/kratos/v2/middleware/validate"
    "github.com/go-kratos/kratos/v2/transport/http"
    jwt2 "github.com/golang-jwt/jwt/v4"
    "github.com/gorilla/handlers"
    v1 "shop/api/shop/v1"
    "shop/internal/conf"
    "shop/internal/service"
)

// NewHTTPServer new an HTTP server.
func NewHTTPServer(c *conf.Server, ac *conf.Auth, s *service.ShopService, logger log.Logger) *http.Server {
    var opts = []http.ServerOption{
        http.Middleware(
            recovery.Recovery(),
            validate.Validator(),
            tracing.Server(), // 这里是本篇新增的
            selector.Server(
                jwt.Server(func(token *jwt2.Token) (interface{}, error) {
                    return []byte(ac.JwtKey), nil
                }, jwt.WithSigningMethod(jwt2.SigningMethodHS256)),
            ).Match(NewWhiteListMatcher()).Build(),
            logging.Server(logger),
        ),
        http.Filter(handlers.CORS(
            handlers.AllowedHeaders([]string{"X-Requested-With", "Content-Type", "Authorization"}),
            handlers.AllowedMethods([]string{"GET", "POST", "PUT", "HEAD", "OPTIONS"}),
            handlers.AllowedOrigins([]string{"*"}),
        )),
    }

  ...

    return srv
}

...
  • 修改启动文件
package main

import (
    "flag"
    "os"

    "github.com/go-kratos/kratos/v2"
    "github.com/go-kratos/kratos/v2/config"
    "github.com/go-kratos/kratos/v2/config/file"
    "github.com/go-kratos/kratos/v2/log"
    "github.com/go-kratos/kratos/v2/middleware/tracing"
    "github.com/go-kratos/kratos/v2/registry"
    "github.com/go-kratos/kratos/v2/transport/grpc"
    "github.com/go-kratos/kratos/v2/transport/http"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/exporters/jaeger"
    "go.opentelemetry.io/otel/sdk/resource"
    tracesdk "go.opentelemetry.io/otel/sdk/trace"
    semconv "go.opentelemetry.io/otel/semconv/v1.7.0"

    "shop/internal/conf"
)

// go build -ldflags "-X main.Version=x.y.z"
var (
    // Name is the name of the compiled software.
    Name = "shop.api"
    // Version is the version of the compiled software.
    Version = "shop.api.v1"
    // flagconf is the config flag.
    flagconf string

    id, _ = os.Hostname()
)

func init() {
    flag.StringVar(&flagconf, "conf", "../../configs", "config path, eg: -conf config.yaml")
}

func newApp(logger log.Logger, hs *http.Server, gs *grpc.Server, rr registry.Registrar) *kratos.App {
    return kratos.New(
        kratos.ID(id+"shop.api"),
        kratos.Name(Name),
        kratos.Version(Version),
        kratos.Metadata(map[string]string{}),
        kratos.Logger(logger),
        kratos.Server(
            hs,
            //gs,
        ),
        kratos.Registrar(rr),
    )
}

func main() {
    flag.Parse()
    logger := log.With(log.NewStdLogger(os.Stdout),
        "ts", log.DefaultTimestamp,
        "caller", log.DefaultCaller,
        "service.id", id,
        "service.name", Name,
        "service.version", Version,
        "trace_id", tracing.TraceID(),
        "span_id", tracing.SpanID(),
    )
    c := config.New(
        config.WithSource(
            file.NewSource(flagconf),
        ),
    )
    defer c.Close()

    if err := c.Load(); err != nil {
        panic(err)
    }

    var bc conf.Bootstrap
    if err := c.Scan(&bc); err != nil {
        panic(err)
    }

    var rc conf.Registry
    if err := c.Scan(&rc); err != nil {
        panic(err)
    }

    err := setTracerProvider(bc.Trace.Endpoint)
    if err != nil {
        panic(err)
    }

    app, cleanup, err := initApp(bc.Server, bc.Data, bc.Auth, bc.Service, &rc, logger)
    if err != nil {
        panic(err)
    }
    defer cleanup()

    // start and wait for stop signal
    if err := app.Run(); err != nil {
        panic(err)
    }
}

func setTracerProvider(url string) error {
    // Create the Jaeger exporter
    exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url)))
    if err != nil {
        return err
    }
    tp := tracesdk.NewTracerProvider(
        // Set the sampling rate based on the parent span to 100%
        tracesdk.WithSampler(tracesdk.ParentBased(tracesdk.TraceIDRatioBased(1.0))),
        // Always be sure to batch in production.
        tracesdk.WithBatcher(exp),
        // Record information about this application in an Resource.
        tracesdk.WithResource(resource.NewSchemaless(
            semconv.ServiceNameKey.String(Name),
            attribute.String("env", "dev"),
        )),
    )
    otel.SetTracerProvider(tp)
    return nil
}
  • 修改 wire.go 文件
根目录执行命令,生成新的 wire_gen.go 文件
 make wire

完整流程测试

  • 启动 user 服务
user 目录下执行命令
kratos run
  • 启动 shop 服务
shop 目录下执行命令
kratos run
  • consul 验证是不是两个服务都启动了

    浏览器访问 127.0.0.1:8500/

访问用户创建接口

这里使用的是 apipost 接口测试工具, 具体操作看图示,注意传入的参数类型为 json

如图正确返回了,就证明接口访问成功,可以去数据库表中验证,是否是同样的数据插入

如图所示选择 shop.api 然后点击下方的 Find Traces

搜索之后点击进去,看到如图

这里需要注意,在咱们的代码中并未设置成统一的 SpanId,只有 TraceId 是一样的。

结束语

整个服务流程已经通了,这里需要注意的点是,访问用户登陆接口的时候,需要先获取一个验证码接口,然后拿到验证码给的 ID 和 code 进行登陆请求。code 是个 url 需要通过浏览器访问才能看到具体 code 是什么。还有查询用户详细信息的时候,需要携带注册或登陆给的 token。携带的方式是 bearer auth 的方式。

接下来会开始完善用户服务的其他信息,如:用户的收获地址之类的。感谢您的耐心阅读,动动手指点个赞吧。

本作品采用《CC 协议》,转载必须注明作者和本文链接
微信搜索:上帝喜爱笨人
本帖由系统于 1年前 自动加精
讨论数量: 22

为什么还要在shop模块把user相关的proto再复制生成一遍呢?不可以在user模块注册服务,shop模块调用服务吗?这样设计的话感觉shop和user就不能分项目了。

1年前 评论
Aliliin (楼主) 1年前
Martin567 (作者) 1年前

还有一个问题,大佬。这里的shop模块是不是可以不要 kratos.Registrar(rr)呢?因为user模块是注册,shop模块是发现。

1年前 评论
Martin567 (作者) 1年前
Aliliin (楼主) 1年前
changyue123 2个月前

注册shop.api后 在consul看到原来注册好的shop.user.service消失 请问有碰到这种情况吗

1年前 评论
Aliliin (楼主) 1年前
fenghe415 (作者) 1年前

咨询一个问题: 期望根据请求中的版本参数来读取对应的配置。 如:请求体中 version 的值是 1.0.1, 就读取 1.0.1 目录下的配置文件。每个版本目录下的配置文件格式都是一样的,但是值会有变化。

根据文档中配置来看, 相同的 key 会进行覆盖,也就是说:不可以通过添加多个path 来解决,因为path 下key都是相同的。如下:

path1 = path.Join(confPath, "1.0.1")
path2 = path.Join(confPath, "1.0.2")
config.WithSource(
   // 添加配置文件
   file.NewSource(path1),
   file.NewSource(path2),
))

如果将版本号作为key,直接写在配置文件中是能达到效果,但是这种方式感觉不太好,并且后面维护也比较麻烦。

期望的是,如果可以在配置中生成 map[version]interface{} 这种格式,不知道有没有办法。或者别的解决方案。谢谢🙏

1年前 评论
Aliliin (楼主) 1年前

感谢🙏,楼主这种方式看起来更加优雅。

我当时是对于这种需要版本控制的配置文件做了重写,也可以达到预期的效果。代码如下:

//目录结构
├── data
│   └── 1.0.0
│       ├── init.json
│       └── reward.json
├── config.yaml
//代码块
func assertVersion(vc *conf.VersionConf) error{
    versionConfData := make(map[string]map[string]interface{})
    versionConfData["data"] = make(map[string]interface{})
    dirs, err := os.ReadDir(versionConfPath)
    if err != nil {
        return err
    }
    for _, dir := range dirs {
        sub := make(map[string]interface{})

        c := config.New(
            config.WithSource(
                file.NewSource(path.Join(versionConfPath, dir.Name())),
            ),
            config.WithDecoder(xconfig.Decoder),  //decoder ,map包含文件名
        )
        if err := c.Load(); err != nil {
            panic(err)
        }
        c.Scan(&sub)

        versionConfData["data"][dir.Name()] = sub
    }

    byteVersionConf, _ := json.Marshal(versionConfData)
    return json.Unmarshal(byteVersionConf, &vc)

}
1年前 评论

为什么我运行出来的会报错呢。

file

file

1年前 评论
lisus2000 (作者) 1年前
Aliliin (楼主) 1年前

'service' parameter must be provided once

1年前 评论
Aliliin (楼主) 1年前

我用的是ETCD做注册和发现的,如果我只是在user服务里面做个注册,而在shop服务里面做发现,这样我是不是就不用复制user下的proto文件那一步了,但是shop的服务发现的conn链接怎么给配置进去newapp 里面呢 :grin:

1年前 评论

file
如果是ETCD做的服务注册和发现 不能像这样写了
file 如果在user端做注册,在shop做发现 你的endpoint应该是你需要发现的服务名字(shop服务目前看是没必要写注册的)

1年前 评论
Aliliin (楼主) 1年前

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!