Go 语言中使用 ETCD
Go语言中使用 ETCD
> 关于ETCD的简介可以在百度上或者google上查找。可能我们对他的应用环境更加感兴趣,下面我列举一些经典的应用场景:
服务发现(Service Discovery)
服务发现要解决的也是分布式系统中最常见的问题之一,即在同一个分布式集群中的进程或服务,要如何才能找到对方并建立连接。
消息发布与订阅
在分布式系统中,最适用的一种组件间通信方式就是消息发布与订阅
负载均衡
分布式系统中,为了保证服务的高可用以及数据的一致性,通常都会把数据和服务部署多份,以此达到对等服务,即使其中的某一个服务失效了,也不影响使用。由此带来的坏处是数据写入性能下降,而好处则是数据访问时的负载均衡。
分布式通知与协调
与消息发布和订阅有些相似
分布式锁
因为etcd使用Raft算法保持了数据的强一致性,某次操作存储到集群中的值必然是全局一致的,所以很容易实现分布式锁。锁服务有两种使用方式,一是保持独占,二是控制时序
分布式队列
分布式队列的常规用法与场景五中所描述的分布式锁的控制时序用法类似,即创建一个先进先出的队列,保证顺序。
集群监控与Leader竞
通过etcd来进行监控实现起来非常简单并且实时性强。
更多应用场景需要我们自己多探索
一. 安装 ETCD
由于 etcd 是Golang编写,所以安装和部署都非常简单,直接启动编译好的文件即可。但我们一般都会使用集群的etcd,这里推荐使用Docker。下面是我在GitHub上找到的Docker启动单机版etcd的docker-compose.yaml文件,如果项目中还有使用其他服务,可以自己添加在文件中一起启动。集群版的可以自己在GitHub上找对应的版本。
version: '3.8'
networks:
app-tier:
driver: bridge
services:
etcd:
image: 'bitnami/etcd:latest'
environment:
- ALLOW_NONE_AUTHENTICATION=yes
- ETCD_ADVERTISE_CLIENT_URLS=http://etcd:2379
ports:
- 2379:2379
- 2380:2380
networks:
- app-tier
myapp:
image: 'bitnami/etcd:latest'
networks:
- app-tier
以上文件保存为 docker-compose.yaml
然后使用命令: docker-compse up -d
启动服务。
一个不错的一直维护的ETCD的Dockerfile和docker-compose的GitHub仓库:
> Github 地址:github.com/bitnami/bitnami-docker-...
二. 安装 Go 语言 ETCD 包
> 这里我使用Mac作为开发环境,Golang版本为 1.14.3
我们使用ETCD官方提供的ETCD的包,因为本身ETCD就是使用GOlang编写的,所以在Go中使用也是更加方便。
go get github.com/coreos/etcd
这里因为etcd依赖的包版本变更,导致不能运行,所以需要修改 go.mod 文件让ETCD跑起来。go.mod 文件如下:
github.com/coreos/etcd v3.3.22+incompatible
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf // indirect
//github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf // indirect
github.com/coreos/go-systemd/v22 v22.1.0
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/gogo/protobuf v1.3.1 // indirect
github.com/google/uuid v1.1.1 // indirect
github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75
go.etcd.io/bbolt v1.3.4 // indirect
go.etcd.io/etcd v3.3.22+incompatible
go.uber.org/zap v1.15.0 // indirect
//google.golang.org/grpc v1.29.1 // indirect
google.golang.org/grpc v1.26.0
> 上面被注释掉的两个包就是版本过高,导致etcd
不能正常运行而报错,所以换成对应的版本。
三. 创建 ETCD 连接
func mian(){
var (
client *clientv3.Client
config clientv3.Config
err error
)
config = clientv3.Config{
// 这里的 Endpoints 是一个字符串数组切片,支持配合多个节点
Endpoints: []string{"127.0.0.1:2379"},
// DialTimeout 连接超时设置
DialTimeout: time.Duration(5) * time.Millisecond,
}
if client, err = clientv3.New(config); err != nil {
return
}
}
这样就可以得到一个etcd
的客户端实例
四. ETCD 的 KV 操作
ETCD 中 KV
的操作需要一个KV实例,创建KV实例需要 etcd 的客户端实例来创建
kv := clientv3.NewKV(client)
拿到 KV
实例之后就可以对 KV 进行操作了,下面就是 KV 的三个基本操作, etcd 里面没有update操作,因为PUT操作已经满足我们创建和更新操作了,所以只有PUT。
1. KV 的 PUT 操作
我们先尝试往 etcd 中 put
一个数据
// 第一个参数为上下文,第二个为 KEY, 第三个为 VALUE。也可以传第四个参数 option,比如:给这个KEY 加一个过期时间, 在KEY过期机制里面会有详细记录
putResponse, err := kv.Put(context.TODO(),"/testDir/User/user1","user info")
在 etcd
的各种操作中都会有对应的 Response, put 操作也不例外,同样返回 putResponse,这是一个对象,里面包含Header, PrevKv。 PrevKv提供了 Put
之前的这个 key
的 KV 值。Header 提供了etcd 的 Revision 和其他 etcd 的信息和方法。
2. KV 的 GET 操作
我们依然使用刚刚拿到的 KV 实例进行操作
// 用法一: 第一个参数为上下文,第二个为 KEY,即可获取对应 Key 的 Value
getResponse, err := kv.Gut(context.TODO(),"/testDir/User/user1")
// 用法二: 第一个参数为上下文,第二个为 KEY,第三个为可选参数 option,这个操作会返回前缀为 "/testDir/User/" 下所有 Key 的 Value, 相当于获取一个列表
gutResponse, err := kv.Gut(context.TODO(),"/testDir/User/", clientv3.WithPrefix())
同样,getResponse 也返回了很多信息,但最主要的是我们获取的 Key 的 Value,如果是使用第二种用法,会返回一个 KVs,是一个 KV 的数组切片。也包含了Header, Count, More, More是一个bool
值,指示是否有更多键可以返回要求的范围。 Count 返回返回数据的数量
3. KV 的 DELETE 操作
// 用法一: 第一个参数为上下文,第二个为 KEY,即可获取对应 Key 的 Value
deleteResponse, err := kv.Delete(context.TODO(),"/testDir/User/user1")
// delete 也可以删除 "/testDir/User/" 下所有的key,类似get的操作,但第三个参数要传 WithPrefix()
deleteResponse 返回 Header, Deleted,PrevKvs, Header中包含信息与之前差不多,后面的Deleted 是一个int64值,代表删除数量。PrevKvs 返回删除之前的 KV 值
五. ETCD 的租约机制
Etcd 中支持类似 Redis 中的 key 过期机制,使用这一功能配合其他etcd功能可以实现非常多的强大的功能,例如:分布式锁,服务发现等。
1. 获取租约实例
> 要使用 etcd 的租约需要获得租约的 Lease 实例,我们先创建一个:
// 使用clientv3 创建一个lease的实例,传入 etcd 的 client 实例
lease := clientv3.New(client)
2. 申请租约
获得实例申请一个租约,然后拿到租约 ID
// 申请租约使用lease实例的Grant方法,第一个参数还是上下文,第二个是TTL,过期时间
grant, err := lease,Grant(context.TODO(), 5)
// 拿到租约 ID, 拿到这个租约 ID 之后,就可以使用 kv 实例进行 put 操作,加上option参数的 WithLease, 就可以给一个 key 设置过期时间
leaseID := grant.ID
3. 续租
既然是租约,当然是可以续约的,续租有两种方式,一种是自动续租,一种是手动续租。
自动续租
// 自动续租时需要传入要续租的租约 ID,lease的 KeepAlive 会启动一个协程执行自动续约,每次续约事件是我们第一次申请租约时设置的时间。
aliveChan, err := lease.KeepAlive(context.TODO(), leaseID)
> 自动续租返回一个续租的结果,是一个channel,里面放着续租应答。下面是一个完整的续租代码
package main
import (
"context"
"fmt"
"go.etcd.io/etcd/clientv3"
"time"
)
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
fmt.Println(err.Error())
}
// Get a lease instance
lease := clientv3.NewLease(cli)
grant, err := lease.Grant(context.TODO(), 10)
if err != nil {
fmt.Println(err.Error())
return
}
// lease id
leaseID := grant.ID
// 自动续租
alive, err := lease.KeepAlive(context.TODO(), leaseID)
if err != nil {
fmt.Println(err)
return
}
// 处理续租应答的协程
go func() {
for {
select {
case res := <-alive:
if res == nil {
fmt.Println("租约已经失效")
goto END
} else {
fmt.Println("自动续租应答:", res.ID)
}
}
}
END:
}()
// Get KV of client
kv := clientv3.NewKV(cli)
put, err := kv.Put(context.TODO(), "/testDir/User/user1", "11", clientv3.WithLease(leaseID))
if err != nil {
fmt.Println(err)
return
}
fmt.Println("写入成功:", put.Header.Revision)
for {
get, err := kv.Get(context.TODO(),"/testDir/User/user1)
if err != nil {
fmt.Println(err)
return
}
if get.Count == 0 {
fmt.Println("租约过期了")
break
}
fmt.Println("还没过期", get.Kvs)
time.Sleep(time.Second * 2)
}
}
手动续租就不去记录了。
六. ETCD 的 WATCHER
etcd 的 Watcher 可以监听指定的 key 的各种操作,我们先获取 watcher 实例
// 获取 watcher 实例的方法跟之前的 lease 和 kv 一样
watcher := clientv3.NewWatcher(cli)
拿到 watcher 实例之后,还需要指定从哪个 Revision 开始监听哪个 Key,所以还需要拿一个Key的Revision。比如我们还是想监听 “/testDir/User/”下面所有Key的变化,我们可以这样。
// 先 Get 我们需要监听的 "/testDir/User/" 的getResponse,
get, err = jm.Kv.Get(context.TODO(), "/testDir/User/", clientv3.WithPrefix())
// 从get操作的下一个Revision开始监听,这就是下一个Revision
watchStartRevision = get.Header.Revision + 1
// 然后使用watcher对象对这个目录进行监听
watchChan := watcher.Watch(context.TODO(), "/testDir/User/", clientv3.WithRev(watchStartRevision), clientv3.WithPrefix())
// 监听后返回一个 Channel 里面传回监听到的 "/testDir/User/" 下面的 key 的事件变化,下面是对事件变化的处理
for w := range watchChan {
for _, event := range w.Events {
switch event.Type {
case mvccpb.PUT:
fmt.Println("修改为:", string(event.Kv.Value), "revision:", event.Kv.CreateRevision)
case mvccpb.DELETE:
fmt.Println("删除:", event.Kv.ModRevision)
}
}
}
七. ETCD 分布式锁原理
因为etcd使用Raft算法保持了数据的强一致性,某次操作存储到集群中的值必然是全局一致的,所以很容易实现分布式锁。锁服务有两种使用方式,一是保持独占,二是控制时序。
保持独占即所有获取锁的用户最终只有一个可以得到。etcd为此提供了一套实现分布式锁原子操作CAS(CompareAndSwap)的API。通过设置prevExist值,可以保证在多个节点同时去创建某个目录时,只有一个成功。而创建成功的用户就可以认为是获得了锁。
控制时序,即所有想要获得锁的用户都会被安排执行,但是获得锁的顺序也是全局唯一的,同时决定了执行顺序。etcd为此也提供了一套API(自动创建有序键),对一个目录建值时指定为POST动作,这样etcd会自动在目录下生成一个当前最大的值为键,存储这个新的值(客户端编号)。同时还可以使用API按顺序列出所有当前目录下的键值。此时这些键的值就是客户端的时序,而这些键中存储的值可以是代表客户端的编号。
八. ETCD 的服务发现
服务发现要解决的也是分布式系统中最常见的问题之一,即在同一个分布式集群中的进程或服务,要如何才能找到对方并建立连接。本质上来说,服务发现就是想要了解集群中是否有进程在监听udp或tcp端口,并且通过名字就可以查找和连接。通过服务发现机制,在etcd中注册某个服务名字的目录,在该目录下存储可用的服务节点的IP。在使用服务的过程中,只要从服务目录下查找可用的服务节点去使用即可。
本作品采用《CC 协议》,转载必须注明作者和本文链接