Observability: Logging

Logging-related practices from day-to-day development:

1. The zap logging library

2. Implementing the log interfaces of middleware clients

3. EFK: log collection and display

4. Log collection and display for multiple pods on k8s

5. Alerting

1. The zap Logging Library

github.com/luxun9527/zaplog

The logging library is one of the first libraries to settle on when a project starts, and zap is arguably the most widely used one in Go. Here is my configuration:

package zaplog

import (
    "github.com/luxun9527/zaplog/report"
    "github.com/mitchellh/mapstructure"
    "go.uber.org/zap"
    "go.uber.org/zap/zapcore"
    "gopkg.in/natefinch/lumberjack.v2"
    "log"
    "os"
    "reflect"
    "time"
)

const (
    // _defaultBufferSize specifies the default size used by Buffer.
    _defaultBufferSize = 256 * 1024 // 256 kB

    // _defaultFlushInterval specifies the default flush interval for
    // Buffer.
    _defaultFlushInterval = 30 * time.Second
)
const (
    _file    = "file"
    _console = "console"
)

type Config struct {
    Name string `json:",optional" mapstructure:"name"`
    // Log level: debug, info, warn, panic
    Level zap.AtomicLevel `json:"Level" mapstructure:"level"`
    // Whether to show a stack trace at error level
    Stacktrace bool `json:",default=true" mapstructure:"stacktrace"`
    // Add caller information
    AddCaller bool `json:",default=true" mapstructure:"addCaller"`
    // How many call-stack frames to skip for the caller. Some middleware wraps the logger; use this option to point at the real call site.
    CallerShip int `json:",default=3" mapstructure:"callerShip"`
    // Where to write: standard output ("console") or a file ("file")
    Mode string `json:",default=console" mapstructure:"mode"`
    // File name, including the path
    FileName string `json:",optional" mapstructure:"filename"`
    // Send error-level logs to a separate destination. By default ("console") they go to standard error; in "file" mode a file can be specified.
    ErrorFileName string `json:",optional" mapstructure:"errorFileName"`
    // Maximum log file size in MB; defaults to 500MB
    MaxSize int `json:",optional" mapstructure:"maxSize"`
    // Days to retain log files
    MaxAge int `json:",optional" mapstructure:"maxAge"`
    // Maximum number of rotated files to keep
    MaxBackup int `json:",optional" mapstructure:"maxBackUp"`
    // Async logging: entries are buffered in memory and flushed to disk in batches. If enabled, make sure Sync() is called on program exit. Keep it off during development.
    Async bool `json:",optional" mapstructure:"async"`
    // Output in JSON format
    Json bool `json:",optional" mapstructure:"json"`
    // Compress rotated log files
    Compress bool `json:",optional" mapstructure:"compress"`
    // In file mode, whether to also write to the console
    Console bool `json:"console" mapstructure:"console"`
    // For the non-JSON format, whether to colorize output
    Color bool `json:",default=true" mapstructure:"color"`
    // Whether to report (push) logs
    IsReport bool `json:",optional" mapstructure:"isReport"`
    // Report configuration
    ReportConfig report.ReportConfig `json:",optional" mapstructure:"reportConfig"`
    options      []zap.Option
}

func (lc *Config) UpdateLevel(level zapcore.Level) {
    lc.Level.SetLevel(level)
}

func (lc *Config) Build() *zap.Logger {
    if lc.Mode != _file && lc.Mode != _console {
        log.Panicln("mode must be console or file")
    }

    if lc.Mode == _file && lc.FileName == "" {
        log.Panicln("file mode, but file name is empty")
    }
    var (
        ws      zapcore.WriteSyncer
        errorWs zapcore.WriteSyncer
        encoder zapcore.Encoder
    )
    encoderConfig := zapcore.EncoderConfig{
        // When the output format is JSON, these are used as the keys
        MessageKey:    "message",
        LevelKey:      "level",
        TimeKey:       "time",
        NameKey:       "logger",
        CallerKey:     "caller",
        StacktraceKey: "stacktrace",
        LineEnding:    zapcore.DefaultLineEnding,
        // How the fields above are encoded
        EncodeLevel:    zapcore.LowercaseLevelEncoder,
        EncodeTime:     CustomTimeEncoder,
        EncodeDuration: zapcore.SecondsDurationEncoder,
        EncodeCaller:   zapcore.FullCallerEncoder,
    }
    if lc.Mode == _console {
        ws = zapcore.Lock(os.Stdout)
    } else {
        normalConfig := &lumberjack.Logger{
            Filename:   lc.FileName,
            MaxSize:    lc.MaxSize,
            MaxAge:     lc.MaxAge,
            MaxBackups: lc.MaxBackup,
            LocalTime:  true,
            Compress:   lc.Compress,
        }
        if lc.ErrorFileName != "" {
            errorConfig := &lumberjack.Logger{
                Filename:   lc.ErrorFileName,
                MaxSize:    lc.MaxSize,
                MaxAge:     lc.MaxAge,
                MaxBackups: lc.MaxBackup,
                LocalTime:  true,
                Compress:   lc.Compress,
            }
            errorWs = zapcore.Lock(zapcore.AddSync(errorConfig))
        }
        ws = zapcore.Lock(zapcore.AddSync(normalConfig))
    }
    // Whether to colorize level names (non-JSON output only)
    if lc.Color && !lc.Json {
        encoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder
    }
    encoder = zapcore.NewConsoleEncoder(encoderConfig)
    if lc.Json {
        encoder = zapcore.NewJSONEncoder(encoderConfig)
    }

    if lc.Async {
        ws = &zapcore.BufferedWriteSyncer{
            WS:            ws,
            Size:          _defaultBufferSize,
            FlushInterval: _defaultFlushInterval,
        }
        if errorWs != nil {
            errorWs = &zapcore.BufferedWriteSyncer{
                WS:            errorWs,
                Size:          _defaultBufferSize,
                FlushInterval: _defaultFlushInterval,
            }
        }
    }

    var c = []zapcore.Core{zapcore.NewCore(encoder, ws, lc.Level)}
    if errorWs != nil {
        highCore := zapcore.NewCore(encoder, errorWs, zapcore.ErrorLevel)
        c = append(c, highCore)
    }
    // In file mode, also write error-level logs to the console
    if lc.Mode == _file && lc.Console {
        consoleWs := zapcore.NewCore(encoder, zapcore.Lock(os.Stdout), zapcore.ErrorLevel)
        c = append(c, consoleWs)
    }
    if lc.IsReport {
        // Reported logs are always JSON
        if !lc.Json {
            encoderConfig.EncodeLevel = zapcore.LowercaseLevelEncoder
            encoder = zapcore.NewJSONEncoder(encoderConfig)
        }
        // Report logs at the configured level and above
        highCore := zapcore.NewCore(encoder, report.NewReportWriterBuffer(lc.ReportConfig), lc.ReportConfig.Level)
        c = append(c, highCore)
    }

    core := zapcore.NewTee(c...)

    logger := zap.New(core)
    // Optionally add caller information
    if lc.AddCaller {
        lc.options = append(lc.options, zap.AddCaller())
        if lc.CallerShip != 0 {
            lc.options = append(lc.options, zap.AddCallerSkip(lc.CallerShip))
        }
    }
    // Optionally attach stack traces to errors
    if lc.Stacktrace {
        // Attach stack traces at error level and above
        lc.options = append(lc.options, zap.AddStacktrace(zap.ErrorLevel))
    }
    if lc.Name != "" {
        logger = logger.With(zap.String("project", lc.Name))
    }

    return logger.WithOptions(lc.options...)

}

func CustomTimeEncoder(t time.Time, enc zapcore.PrimitiveArrayEncoder) {
    enc.AppendString(t.Format("2006-01-02-15:04:05"))
}

// StringToLogLevelHookFunc is a viper/mapstructure decode hook that converts a string into a zap.AtomicLevel.
func StringToLogLevelHookFunc() mapstructure.DecodeHookFunc {
    return func(
        f reflect.Type,
        t reflect.Type,
        data interface{}) (interface{}, error) {
        if f.Kind() != reflect.String {
            return data, nil
        }
        // Only convert when the target field really is a zap.AtomicLevel,
        // so that ordinary string fields (e.g. a name like "info") pass
        // through untouched.
        if t != reflect.TypeOf(zap.AtomicLevel{}) {
            return data, nil
        }
        atomicLevel, err := zap.ParseAtomicLevel(data.(string))
        if err != nil {
            return data, nil
        }
        // Convert it by parsing
        return atomicLevel, nil
    }
}
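
A minimal usage sketch (assuming the package above; the config values are illustrative): build a logger from the Config and adjust the level at runtime. When loading the Config through viper, register StringToLogLevelHookFunc via viper.DecodeHook so that a plain string such as "debug" decodes into the zap.AtomicLevel field.

package main

import (
    zaplog "github.com/luxun9527/zaplog"
    "go.uber.org/zap"
)

func main() {
    cfg := &zaplog.Config{
        Name:       "test",
        Level:      zap.NewAtomicLevelAt(zap.DebugLevel),
        Stacktrace: true,
        AddCaller:  true,
        Mode:       "console",
        Json:       true,
    }
    logger := cfg.Build()
    defer logger.Sync() // flush buffered entries; required when Async is enabled

    logger.Debug("test level debug")
    logger.Info("info level info")

    // The level can be changed at runtime without rebuilding the logger.
    cfg.UpdateLevel(zap.WarnLevel)
}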

JSON format: recommended for testing and production, as it is friendlier for later analysis.

{"level":"debug","time":"2024-08-31-22:26:39","caller":"E:/openproject/zaplog/zap_config_test.go:43","message":"test level debug","project":"test"}
{"level":"info","time":"2024-08-31-22:26:39","caller":"E:/openproject/zaplog/zap_config_test.go:44","message":"info level info","project":"test"}
{"level":"warn","time":"2024-08-31-22:26:39","caller":"E:/openproject/zaplog/zap_config_test.go:45","message":"warn level warn","project":"test"}
{"level":"error","time":"2024-08-31-22:26:39","caller":"E:/openproject/zaplog/zap_config_test.go:46","message":"error level error","project":"test"}
{"level":"panic","time":"2024-08-31-22:26:39","caller":"E:/openproject/zaplog/zap_config_test.go:47","message":"panic level panic","project":"test","stacktrace":"github.com/luxun9527/zaplog.TestConsoleJson\n\tE:/openproject/zaplog/zap_config_test.go:47\ntesting.tRunner\n\tE:/goroot/src/testing/testing.go:1689"}
--- FAIL: TestConsoleJson (0.00s)
panic: panic level panic [recovered]
    panic: panic level panic

Non-JSON format: recommended during development, as it is easier to read.

2024-08-31-22:26:07    DEBUG    E:/openproject/zaplog/zap_config_test.go:43    test level debug    {"project": "test"}
2024-08-31-22:26:07    INFO    E:/openproject/zaplog/zap_config_test.go:44    info level info    {"project": "test"}
2024-08-31-22:26:07    WARN    E:/openproject/zaplog/zap_config_test.go:45    warn level warn    {"project": "test"}
2024-08-31-22:26:07    ERROR    E:/openproject/zaplog/zap_config_test.go:46    error level error    {"project": "test"}
2024-08-31-22:26:07    PANIC    E:/openproject/zaplog/zap_config_test.go:47    panic level panic    {"project": "test"}
github.com/luxun9527/zaplog.TestConsoleJson
    E:/openproject/zaplog/zap_config_test.go:47
testing.tRunner
    E:/goroot/src/testing/testing.go:1689
--- FAIL: TestConsoleJson (0.00s)
panic: panic level panic [recovered]
    panic: panic level panic

2. Middleware Logs in Go

In day-to-day development, some logs are not emitted by our own code but by middleware SDKs, such as those for ES, GORM, and Kafka.

These SDKs all provide interfaces for plugging in a custom logger; wrapping them properly is something to settle early in development.

github.com/luxun9527/zaplog/blob/m...

gorm

gorm.io/zh_CN/docs/logger.html

github.com/moul/zapgorm2
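
For GORM, the zapgorm2 adapter linked above can be dropped in directly. A minimal sketch (the DSN and the MySQL driver are assumptions):

package main

import (
    "github.com/moul/zapgorm2"
    "go.uber.org/zap"
    "gorm.io/driver/mysql"
    "gorm.io/gorm"
)

func main() {
    zapLogger, _ := zap.NewProduction()
    gormLogger := zapgorm2.New(zapLogger)
    gormLogger.SetAsDefault() // optional: also use it as gorm's global logger

    dsn := "user:pass@tcp(127.0.0.1:3306)/db?parseTime=true" // assumption
    db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{Logger: gormLogger})
    if err != nil {
        panic(err)
    }
    _ = db
}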

An example for ES:

/*
Wire these into the client options, e.g.:
elastic.SetErrorLog(ErrorEsLogger), // enable error logging
elastic.SetInfoLog(InfoEsLogger),   // enable info logging
*/

// logger and EsModuleKey are assumed to be defined elsewhere in the package.
var (
    ErrorEsLogger = &esLogger{
        logger: logger.With(zap.String("module", EsModuleKey)).Sugar(),
        level:  zapcore.ErrorLevel,
    }
    InfoEsLogger = &esLogger{
        logger: logger.With(zap.String("module", EsModuleKey)).Sugar(),
        level:  zapcore.InfoLevel,
    }
)

// esLogger adapts a zap logger to the SDK's Printf-style logging interface.
type esLogger struct {
    logger *zap.SugaredLogger
    level  zapcore.Level
}

func (esLog *esLogger) Printf(format string, v ...interface{}) {
    if esLog.level == zapcore.InfoLevel {
        esLog.logger.Infof(format, v...)
    } else {
        esLog.logger.Errorf(format, v...)
    }
}
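
A sketch of plugging the adapters in. The ES options below are from github.com/olivere/elastic (an assumption: the article does not name its ES client, but SetErrorLog/SetInfoLog matches that client's API). The last line shows the same idea for Kafka with sarama, bridging zap through the standard library logger.

// Assumes ErrorEsLogger and InfoEsLogger from the snippet above are in scope;
// imports: github.com/IBM/sarama (formerly github.com/Shopify/sarama),
// github.com/olivere/elastic/v7, go.uber.org/zap.
client, err := elastic.NewClient(
    elastic.SetURL("http://localhost:9200"), // address is an assumption
    elastic.SetErrorLog(ErrorEsLogger),
    elastic.SetInfoLog(InfoEsLogger),
)
if err != nil {
    panic(err)
}
_ = client

// Kafka: sarama's package-level logger accepts any Print/Printf/Println
// implementation, and zap.NewStdLog converts a *zap.Logger into a
// *log.Logger that satisfies it.
sarama.Logger = zap.NewStdLog(logger.Named("sarama"))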

3. EFK: Log Collection and Display

Installing EFK

gitee.com/zhengqingya/docker-compo...

E: Elasticsearch, which stores the logs.

F: Filebeat, which collects logs from log files.
yuweizzz.github.io/post/detail_abo...
www.elastic.co/guide/en/beats/file...

K: Kibana, which displays the logs.

Filebeat configuration:

# See https://www.elastic.co/guide/en/beats/filebeat/7.14/filebeat-reference-yml.html

# Collect logs from files
filebeat:
  inputs:
    - type: log
      enabled: true
      paths:
        - /usr/share/filebeat/my-log/demo*.log
      tags: [ "es" ]
      # JSON parsing
      json.keys_under_root: true         # put the parsed key/value pairs directly under the root object
      json.add_error_key: true           # on parse failure, add a field carrying the error
      json.message_key: "message"        # name of the JSON field that carries the log message, if any
#      multiline:
#        pattern: '^\{'                  # match JSON lines starting with `{`
#        negate: false                   # negate controls whether matched or unmatched lines are merged into the multiline event: true merges unmatched lines, false merges matched lines
#        match: after                    # merge continuation lines after the previous line

processors:
  - drop_fields:
      fields: ["host", "ecs", "log", "prospector", "agent", "input", "beat", "offset"]
      ignore_missing: true
# Output to Elasticsearch
output.elasticsearch:
  hosts: [ "elasticsearch:9200" ] # ES addresses; separate multiple entries with commas
  username: "elastic"             # ES username
  password: "123456"              # ES password
  indices:
    - index: "%{[project]}-%{+yyyy.MM.dd}"     # use a field from the log itself in the index name
      when.contains:
        tags: "es"

After installing with docker-compose you can see results right away; this is how it looks in Kibana.

[screenshot: logs displayed in Kibana]

4. Log Collection and Display for Multiple Pods on k8s

Prerequisites:

On k8s, pod logs live under the /var/log/containers/ directory.

EFK can be installed on k8s via Helm.

References:

blog.csdn.net/jmmyhans/article/det...

cloud.tencent.com/developer/invent...

Installing Elasticsearch

1. Pull the chart

helm repo add elastic https://helm.elastic.co

helm pull elastic/elasticsearch --version 7.17.1

Unpacking the pulled chart shows that ES is deployed by default as a StatefulSet with a headless Service. As a simple example, we install just a single node and do not configure HTTPS certificates.

2. Create a Secret for the username and password

This effectively reduces the risk of exposing the password.

kubectl create secret generic elastic-credentials \
  --from-literal=username=elastic --from-literal=password=admin123

3. Create a PV

ES requests a 30Gi PV; here we simply create a hostPath volume as disk storage.

apiVersion: "v1"
kind: "PersistentVolume"
metadata:
  name: "es-0"
spec:
  capacity:
    storage: "50Gi"
  accessModes:
    - "ReadWriteOnce"
  persistentVolumeReclaimPolicy: Retain
  hostPath:
    path: /root/smb/k8s/es-0
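
Apply it (the file name here is an assumption):

kubectl apply -f es-pv.yaml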

4. Custom Helm values
---
## Cluster name
clusterName: "elasticsearch"
## Node group name
nodeGroup: "master"
## Roles of this node; it can be a data node and a master node at the same time
roles:
  master: "true" 
  ingest: "true"
  data: "true"
  remote_cluster_client: "true"
  ml: "true"

## Image and tag; many mirror registries in China no longer work
image: "docker.1panel.dev/library/elasticsearch"
imageTag: "7.17.1"
imagePullPolicy: "IfNotPresent"
## Replica count
replicas: 1

## JVM options
esJavaOpts: "-Xmx1g -Xms1g"
## Resource configuration (set these higher in production)
resources:
  requests:
    cpu: "1000m"
    memory: "2Gi"
  limits:
    cpu: "1000m"
    memory: "2Gi"
## Persistent volume configuration
persistence:
  enabled: true
volumeClaimTemplate:
  accessModes: [ "ReadWriteOnce" ]
  resources:
    requests:
      storage: 30Gi
# ============ Security ============
# A single-node cluster only reaches yellow status
clusterHealthCheckParams: "wait_for_status=yellow&timeout=1s"

# Mount the certificates secret used for transport-layer SSL (assumes an elastic-certificates secret exists in the cluster)
secretMounts:
  - name: elastic-certificates
    secretName: elastic-certificates
    path: /usr/share/elasticsearch/config/certs
protocol: http

esConfig:
  elasticsearch.yml: |
    xpack.security.enabled: true
    xpack.security.transport.ssl.enabled: true
    xpack.security.authc.api_key.enabled: true

service:
  enabled: true
  labels: {}
  labelsHeadless: {}
  type: NodePort
  # Consider that all endpoints are considered "ready" even if the Pods themselves are not
  # https://kubernetes.io/docs/reference/kubernetes-api/service-resources/service-v1/#ServiceSpec
  publishNotReadyAddresses: false
  nodePort: "32003"
  annotations: {}
  httpPortName: http
  transportPortName: transport
  loadBalancerIP: ""
  loadBalancerSourceRanges: []
  externalTrafficPolicy: ""

extraEnvs:
  - name: ELASTIC_USERNAME
    valueFrom:
      secretKeyRef:
        name: elastic-credentials
        key: username
  - name: ELASTIC_PASSWORD
    valueFrom:
      secretKeyRef:
        name: elastic-credentials
        key: password
# ============ Scheduling ============
## Anti-affinity policy
## - hard: pods are scheduled only if there are enough nodes, and never land on the same node
## - soft: best-effort scheduling
antiAffinity: "hard"
## Tolerations (nodes with taints, such as the kubernetes master, only accept pods that tolerate them; on a three-node test cluster you can enable this to schedule onto the master)
#tolerations:
#  - operator: "Exists"  ## tolerate all taints

Install with:

helm install es elasticsearch -f ./extra.yaml
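
Once the pod is ready, the cluster can be checked through the NodePort defined above (the node IP depends on your environment):

curl -u elastic:admin123 http://<node-ip>:32003/_cluster/health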

Installing Kibana

Similar to the steps above, pull the chart first.

helm pull elastic/kibana --version 7.17.1

Then supply our custom extra.yaml:

---
## Image and tag
image: "docker.1panel.dev/library/kibana"
imageTag: "7.17.1"
## In-cluster Elasticsearch address
elasticsearchHosts: "http://elasticsearch-master-0.elasticsearch-master-headless.default:9200"
# ============ Environment variables ============
## Inject the username and password from the Secret created earlier
extraEnvs:
  - name: ELASTICSEARCH_USERNAME
    valueFrom:
      secretKeyRef:
        name: elastic-credentials
        key: username
  - name: ELASTICSEARCH_PASSWORD
    valueFrom:
      secretKeyRef:
        name: elastic-credentials
        key: password
# ============ Resources ============
resources:
  requests:
    cpu: "1000m"
    memory: "2Gi"
  limits:
    cpu: "1000m"
    memory: "2Gi"
# ============ Kibana settings ============
kibanaConfig:
  kibana.yml: |
    i18n.locale: "zh-CN"

# ============ Service ============
service:
  type: NodePort
  nodePort: "30601"

Kibana itself is deployed as a Deployment.

helm install kibana kibana -f ./extra.yaml
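
Once the pod is ready, Kibana should be reachable in a browser at http://<node-ip>:30601 (the node IP depends on your environment), logging in with the elastic / admin123 credentials from the Secret.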

Installing Filebeat

helm pull elastic/filebeat --version 7.17.1

Then the custom extra.yaml (note the mirror image below is pinned to tag 7.14.1 even though the chart is 7.17.1):

image: "registry.cn-hangzhou.aliyuncs.com/zhengqing/filebeat"
imageTag: "7.14.1"


daemonset:
  filebeatConfig:
    filebeat.yml: |
      logging.level: debug

      filebeat.inputs:
      - type: container
        paths:
           - /var/log/containers/*.log
      processors:
        - script:
            lang: javascript
            id: clean_log_field
            source: >
               function process(event) {
                  /* Placeholder for cleaning up the raw message before the
                     decode_json_fields processor below runs; as written it
                     only reads the field and changes nothing. */
                  var message = event.Get("message");
                }

        - decode_json_fields:  # parse the JSON in "message"
            fields: ["message"]
            add_error_key: true
            target: ""  # parsed fields land at the event root
        - add_kubernetes_metadata: 
            # enrich events with k8s metadata fields
            default_indexers.enabled: true
            default_matchers.enabled: true
            host: ${NODE_NAME} 
            matchers:
            - logs_path:
                logs_path: "/var/log/containers/"
        - drop_event:
            when:
              not:
               equals: 
                kubernetes.namespace: "app"  # only collect logs from this namespace
        - drop_fields:
            # redundant fields to drop
            fields: ["host", "tags", "ecs", "log", "prospector", "agent","input", "beat", "offset","message","container","stream","container","kubernetes.namespace_labels","kubernetes.node.labels","kubernetes.node.uid","kubernetes.deployment","kubernetes.namespace_uid"]
            ignore_missing: true

      output.elasticsearch:
        host: '${NODE_NAME}'
        hosts: 'http://elasticsearch-master-0.elasticsearch-master-headless.default:9200'
        username: "elastic"             # ES用户名
        password: "admin123"              # ES密码
        indices:    # 自动创建索引,使用命名空间名+pod名字+时间的格式。
             - index: "%{[kubernetes.namespace]}-%{[kubernetes.pod.name]}-%{+yyyy.MM.dd}"   # 自定义索引格式

# ============ Resources ============
resources:
  requests:
    cpu: "1000m"
    memory: "2Gi"
  limits:
    cpu: "1000m"
    memory: "2Gi"
Install with:

helm install filebeat filebeat -f ./extra.yaml
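
To confirm that per-pod indices are being created, the ES NodePort from earlier can be queried (node IP depends on your environment):

curl -u elastic:admin123 'http://<node-ip>:32003/_cat/indices?v'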

The final result:

[screenshot: per-pod indices and logs displayed in Kibana]

5. Alerting

Alert on logs at a specified level and send the alerts to your office IM.

elastic.ac.cn/guide/en/kibana/curr...

Alerting from the Elastic stack to an office IM via webhook appears to be a paid feature. I recommend instead adding an extra core to the zap logger to implement alerting; for a concrete implementation see github.com/luxun9527/zaplog/tree/m... Your comments and stars are all encouragement for me.
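
A minimal sketch of that idea (not the author's implementation from the repo above, just the shape of it): a custom zapcore.Core whose Write forwards entries at or above a chosen level to an IM webhook. The URL and plain-JSON payload are assumptions; a real IM webhook expects its own message format.

package alert

import (
    "bytes"
    "net/http"

    "go.uber.org/zap/zapcore"
)

// webhookCore forwards entries that pass the LevelEnabler to an IM webhook.
type webhookCore struct {
    zapcore.LevelEnabler
    enc zapcore.Encoder
    url string // webhook address (assumption)
}

func NewWebhookCore(enab zapcore.LevelEnabler, enc zapcore.Encoder, url string) zapcore.Core {
    return &webhookCore{LevelEnabler: enab, enc: enc, url: url}
}

func (c *webhookCore) With(fields []zapcore.Field) zapcore.Core {
    clone := *c
    clone.enc = c.enc.Clone()
    for i := range fields {
        fields[i].AddTo(clone.enc)
    }
    return &clone
}

func (c *webhookCore) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
    if c.Enabled(ent.Level) {
        return ce.AddCore(ent, c)
    }
    return ce
}

func (c *webhookCore) Write(ent zapcore.Entry, fields []zapcore.Field) error {
    buf, err := c.enc.EncodeEntry(ent, fields)
    if err != nil {
        return err
    }
    defer buf.Free()
    // Fire a plain POST; production code should batch, rate-limit, and
    // wrap the payload in the IM's expected message format.
    resp, err := http.Post(c.url, "application/json", bytes.NewReader(buf.Bytes()))
    if err != nil {
        return err
    }
    return resp.Body.Close()
}

func (c *webhookCore) Sync() error { return nil }

Such a core would be attached alongside the normal cores with zapcore.NewTee, the same way Build() above attaches the report core.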

This work is licensed under a CC license; reposts must credit the author and link to this article.