分布式情况下最终都会面临一个资源抢占的问题,解决问题的方法为抽象一个分布式锁,持有锁则可以操作资源,本文使用etcd实现一个分布式锁

简介

在正式使用之前,先简单介绍一下etcd的clientv3,etcd的client和服务器通信在v3版本为grpc,所以我们在使用时实际上就是使用grpc和服务器通信.

etcd的github地址为: https://github.com/etcd-io/etcd

在github中源代码中存在两个目录:

$ tree -L 1
├── alarm
.....
├── client
├── clientv3
.....

其中 clientv3为我们需要使用的client,本文的主角就是clientv3/concurrency

下载依赖

在明确我们使用的包后,我们直接依赖clientv3包到我们的工程中.

go get github.com/etcd-io/etcd/clientv3

如果需要梯子,请参考: 终端中设置代理

连接到etcd

在很多环境中我们启动etcd都是通过配置tls方式进行的,所以在连接etcd的时候需要使用tls的方式连接(可是百度上很多文章居然都没写.),具体的连接方式如下:

	tlsInfo := transport.TLSInfo{
		CertFile:      "etcd-v3.3.12-linux-amd64/etcd.pem",
		KeyFile:       "etcd-v3.3.12-linux-amd64/etcd-key.pem",
		TrustedCAFile: "etcd-v3.3.12-linux-amd64/ca.pem",
	}
	tlsConfig, err := tlsInfo.ClientConfig()
	if err != nil {
		log.Fatal(err)
	}
	config := clientv3.Config{
		Endpoints: []string{"127.0.0.1:12379"},#此处为etcd server监听地址
		TLS:       tlsConfig,
	}
	client, e := clientv3.New(config)
	if e != nil {
		log.Fatal(e.Error())
	}
	defer client.Close()

使用Mutex获取锁

在使用证书连接到etcd的server后,我们使用clientv3/concurrency包中的Mutex进行分布式锁

	# 生成一个30s超时的上下文
	timeout, cancelFunc := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancelFunc()
	# 获取租约
	response, e := client.Grant(timeout, 30)
	if e != nil {
		log.Fatal(e.Error())
	}
	# 通过租约创建session
	session, e := concurrency.NewSession(client, concurrency.WithLease(response.ID))
	if e != nil {
		log.Fatal(e.Error())
	}
	defer session.Close()
	# 通过session和锁前缀
	mutex := concurrency.NewMutex(session, "/lock")
	e = mutex.Lock(timeout)
	if e != nil {
		log.Fatal(e.Error())
	}

	# 业务逻辑

	# 释放锁
	defer mutex.Unlock(timeout)

整体流程还是相对复杂的,接下来我们将一点一点解析:

1.生成30s超时的上下文(context)

锁的竞争是有时间的,不可能一直竞争下去,设定一个30s的超时时间,是让mutex.Lock()的时候有一个获取锁的时间,在此时间内,如果没有获取到锁则应该重试或者提示失败.

2.client.Grant获取租约

租约设置了一个和上下文一样的超时时间,保证租约有足够的时间

3.根据租约创建一个session回话,维护租约的过期时间

在client中抽象出了一个session对象来持续保持租约不过期,具体源码为:

//会话表示在客户端的生存期内保持活动的租约。
//应用程序可能会使用会话来解释活动性。
type Session struct {
	client *v3.Client
	opts   *sessionOptions
	id     v3.LeaseID

	cancel context.CancelFunc
	donec  <-chan struct{}
}

// NewSession gets the leased session for a client.
func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
	ops := &sessionOptions{ttl: defaultSessionTTL, ctx: client.Ctx()}
	for _, opt := range opts {
		opt(ops)
	}

	id := ops.leaseID
	if id == v3.NoLease {
		resp, err := client.Grant(ops.ctx, int64(ops.ttl))
		if err != nil {
			return nil, err
		}
		id = v3.LeaseID(resp.ID)
	}

	ctx, cancel := context.WithCancel(ops.ctx)
	keepAlive, err := client.KeepAlive(ctx, id)
	if err != nil || keepAlive == nil {
		cancel()
		return nil, err
	}

	donec := make(chan struct{})
	s := &Session{client: client, opts: ops, id: id, cancel: cancel, donec: donec}
	//在客户端错误或取消上下文之前保持租约的活动状态
	// keep the lease alive until client error or cancelled context
	go func() {
		defer close(donec)
		for range keepAlive {
            //在保持活动频道关闭前接收信息
			// eat messages until keep alive channel closes
		}
	}()

	return s, nil
}

其实就是在newSession中启动一个协程不断读取keepAlive这个channel的数据

4.concurrency.NewMutex创建一个锁,调用lock开始竞争锁

// Lock locks the mutex with a cancelable context. If the context is canceled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
func (m *Mutex) Lock(ctx context.Context) error {
	s := m.s
	client := m.s.Client()

    //生成锁的key
	m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
    //使用事务机制
    //比较key的revision为0(0标示没有key)
	cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
    //则put key,并设置租约
	// put self in lock waiters via myKey; oldest waiter holds lock
	put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
    //否则 获取这个key,重用租约中的锁(这里主要目的是在于重入)
    //通过第二次获取锁,判断锁是否存在来支持重入
    //所以只要租约一致,那么是可以重入的.
	// reuse key in case this session already holds the lock
	get := v3.OpGet(m.myKey)
    //通过前缀获取最先创建的key
	// fetch current holder to complete uncontended path with only one RPC
	getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
	resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
	if err != nil {
		return err
	}
    //获取到自身的revision(注意,此处CreateRevision和Revision不一定相等)
	m.myRev = resp.Header.Revision
	if !resp.Succeeded {
		m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
	}
    // 通过对比自身的revision和最先创建的key的revision得出谁获得了锁
    // 例如 自身revision:5,最先创建的key createRevision:3  那么不获得锁,进入waitDeletes
    //     自身revision:5,最先创建的key createRevision:5  那么获得锁
    
	// if no key on prefix / the minimum rev is key, already hold the lock
	ownerKey := resp.Responses[1].GetResponseRange().Kvs
	if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
		m.hdr = resp.Header
		return nil
	}
	// 等待其他程序释放锁,并删除其他revisions
	// wait for deletion revisions prior to myKey
	hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
	// release lock key if wait failed
	if werr != nil {
		m.Unlock(client.Ctx())
	} else {
		m.hdr = hdr
	}
	return werr
}
// 等待持有锁的key删除
// 内部实现为等其他所有比当前createRevision小的key,监听删除事件
// waitDeletes efficiently waits until all keys matching the prefix and no greater
// than the create revision.
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
    //WithLastCreate 按照CreateRevision排序,降序 例如 5 4 3 2 1 
    //WithMaxCreateRev 获取比maxCreateRev小的key
	getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
	for {
		resp, err := client.Get(ctx, pfx, getOpts...)
		if err != nil {
			return nil, err
		}
		if len(resp.Kvs) == 0 {
			return resp.Header, nil
		}
		lastKey := string(resp.Kvs[0].Key)
        //监听删除事件
		if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
			return nil, err
		}
	}
}
//从revision开始监听删除事件,因为revision存在,所以也避免了ABA问题
func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {
	cctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var wr v3.WatchResponse
	wch := client.Watch(cctx, key, v3.WithRev(rev))
	for wr = range wch {
		for _, ev := range wr.Events {
            //遇到删除事件才返回
			if ev.Type == mvccpb.DELETE {
				return nil
			}
		}
	}
	if err := wr.Err(); err != nil {
		return err
	}
	if err := ctx.Err(); err != nil {
		return err
	}
	return fmt.Errorf("lost watcher waiting for delete")
}

CreateRevision: 创建时的revision

Header.Revision: etcd server现在的revision

ABA问题: CAS和ABA问题

5.释放锁

这里释放锁,比较简单,就是删除此key

//释放锁
func (m *Mutex) Unlock(ctx context.Context) error {
	client := m.s.Client()
	if _, err := client.Delete(ctx, m.myKey); err != nil {
		return err
	}
	m.myKey = "\x00"
	m.myRev = -1
	return nil
}

revision

刚才说了那么多revision,这里我们需要详细的了解一下etcd中的mvcc多版本机制

那么我们先来明确几个概念:

main ID:在etcd中每个事务的唯一id,全局递增不重复.

sub ID: 在事务中的连续多个修改操作会从0开始编号,这个编号就是sub ID

revision: 由(mainID,subID)组成的唯一标识

所以现在我们的revision就是这么来的啦…

参照

etcd实现分布式锁流程

mutex.go

election.go