1. etcd 架构
网络层:提供网络数据读写功能,监听服务端口,完成集群节点之间数据通信,收发客户端数据。
Raft模块:Raft强一致性算法的具体实现。
存储模块:涉及KV存储、WAL文件、Snapshot管理等,用户处理etcd支持的各类功能的事务,包括数据索引 节点状态变更、监控与反馈、事件处理与执行 ,是 etcd 对用户提供的大多数 API 功能的具体实现。
复制状态机:这是一个抽象的模块,状态机的数据维护在内存中,定期持久化到磁盘,每次写请求都会持久化到 WAL 文件,并根据写请求的内容修改状态机数据。
1.1. etcd 整体架构
从大体上可以将其划分为以下4个模块
- http:负责对外提供http访问接口和http client
- raft 状态机:根据接受的raft消息进行状态转移,调用各状态下的动作。
- wal 日志存储:持久化存储日志条目。
- kv数据存储:kv数据的存储引擎,v3支持不同的后端存储,当前采用boltdb。通过boltdb支持事务操作。 相对于v2,v3的主要改动点为:
- 使用grpc进行peer之间和与客户端之间通信
- v2的store是在内存中的一棵树,v3采用抽象了一个kvstore,支持不同的后端存储数据库。增强了事务能力。 去除单元测试代码,etcd v2的代码行数约40k,v3的代码行数约70k。
典型内部处理流程
我们将上面架构图的各个部分进行编号,以便下文的处理流程介绍中,对应找到每个流程处理的组件位置。
- 消息入口 一个etcd节点运行以后,有3个通道接收外界消息,以kv数据的增删改查请求处理为例,介绍这3个通道的工作机制。
- client的http调用:会通过注册到http模块的keysHandler的ServeHTTP方法处理。解析好的消息调用EtcdServer的Do()方法处理。(图中2)
- client的grpc调用:启动时会向grpc server注册quotaKVServer对象,quotaKVServer是以组合的方式增强了kvServer这个数据结构。grpc消息解析完以后会调用kvServer的Range、Put、DeleteRange、Txn、Compact等方法。kvServer中包含有一个RaftKV的接口,由EtcdServer这个结构实现。所以最后就是调用到EtcdServer的Range、Put、DeleteRange、Txn、Compact等方法。(图中1)
节点之间的grpc消息:每个EtcdServer中包含有Transport结构,Transport中会有一个peers的map,每个peer封装了节点到其他某个节点的通信方式。包括streamReader、streamWriter等,用于消息的发送和接收。streamReader中有recvc和propc队列,streamReader处理完接收到的消息会将消息推到这连个队列中。由peer去处理,peer调用raftNode的Process方法处理消息。(图中3、4)
EtcdServer消息处理 对于客户端消息,调用到EtcdServer处理时,一般都是先注册一个等待队列,调用node的Propose方法,然后用等待队列阻塞等待消息处理完成。Propose方法会往propc队列中推送一条MsgProp消息。 对于节点间的消息,raftNode的Process是直接调用node的step方法,将消息推送到node的recvc或者propc队列中。 可以看到,外界所有消息这时候都到了node结构中的recvc队列或者propc队列中。(图中5)
node处理消息 node启动时会启动一个协程,处理node的各个队列中的消息,当然也包括recvc和propc队列。从propc和recvc队列中拿到消息,会调用raft对象的Step方法,raft对象封装了raft的协议数据和操作,其对外的Step方法是真正raft协议状态机的步进方法。当接收到消息以后,根据协议类型、Term字段做相应的状态改变处理,或者对选举请求做相应处理。对于一般的kv增删改查数据请求消息,会调用内部的step方法。内部的step方法是一个可动态改变的方法,将随状态机的状态变化而变化。当状态机处于leader状态时,该方法就是stepLeader;当状态机处于follower状态时,该方法就是stepFollower;当状态机处于Candidate状态时,该方法就是stepCandidate。leader状态会直接处理MsgProp消息。将消息中的日志条目存入本地缓存。follower则会直接将MsgProp消息转发给leader,转发的过程是将先将消息推送到raft的msgs数组中。 node处理完消息以后,要么生成了缓存中的日志条目,要么生成了将要发送出去的消息。缓存中的日志条目需要进一步处理(比如同步和持久化),而消息需要进一步处理发送出去。处理过程还是在node的这个协程中,在循环开始会调用newReady,将需要进一步处理的日志和需要发送出去的消息,以及状态改变信息,都封装在一个Ready消息中。Ready消息会推行到readyc队列中。(图中5)
raftNode的处理 raftNode的start()方法另外启动了一个协程,处理readyc队列(图中6)。取出需要发送的message,调用transport的Send方法并将其发送出去(图中4)。调用storage的Save方法持久化存储日志条目或者快照(图中9、10),更新kv缓存。 另外需要将已经同步好的日志应用到状态机中,让状态机更新状态和kv存储,通知等待请求完成的客户端。因此需要将已经确定同步好的日志、快照等信息封装在一个apply消息中推送到applyc队列。
EtcdServer的apply处理 EtcdServer会处理这个applyc队列,会将snapshot和entries都apply到kv存储中去(图中8)。最后调用applyWait的Trigger,唤醒客户端请求的等待线程,返回客户端的请求。
重要的数据结构 1.EtcdServer: 是整个etcd节点的功能的入口,包含etcd节点运行过程中需要的大部分成员。
type EtcdServer struct { // 当前正在发送的snapshot数量 inflightSnapshots int64 //已经apply到状态机的日志index appliedIndex uint64 //已经提交的日志index,也就是leader确认多数成员已经同步了的日志index committedIndex uint64 //已经持久化到kvstore的index consistIndex consistentIndex //配置项 Cfg _ServerConfig //启动成功并注册了自己到cluster,关闭这个通道。 readych chan struct{} //重要的数据结果,存储了raft的状态机信息。 r raftNode //满多少条日志需要进行snapshot snapCount uint64 //为了同步调用情况下让调用者阻塞等待调用结果的。 w wait.Wait //下面3个结果都是为了实现linearizable 读使用的 readMu sync.RWMutex readwaitc chan struct{} readNotifier _notifier //停止通道 stop chan struct{} //停止时关闭这个通道 stopping chan struct{} //etcd的start函数中的循环退出,会关闭这个通道 done chan struct{} //错误通道,用以传入不可恢复的错误,关闭raft状态机。 errorc chan error //etcd实例id id types.ID //etcd实例属性 attributes membership.Attributes //集群信息 cluster _membership.RaftCluster //v2的kv存储 store store.Store //用以snapshot snapshotter _snap.Snapshotter //v2的applier,用于将commited index apply到raft状态机 applyV2 ApplierV2 //v3的applier,用于将commited index apply到raft状态机 applyV3 applierV3 //剥去了鉴权和配额功能的applyV3 applyV3Base applierV3 //apply的等待队列,等待某个index的日志apply完成 applyWait wait.WaitTime //v3用的kv存储 kv mvcc.ConsistentWatchableKV //v3用,作用是实现过期时间 lessor lease.Lessor //守护后端存储的锁,改变后端存储和获取后端存储是使用 bemu sync.Mutex //后端存储 be backend.Backend //存储鉴权数据 authStore auth.AuthStore //存储告警数据 alarmStore _alarm.AlarmStore //当前节点状态 stats _stats.ServerStats //leader状态 lstats _stats.LeaderStats //v2用,实现ttl数据过期的 SyncTicker _time.Ticker //压缩数据的周期任务 compactor _compactor.Periodic //用于发送远程请求 peerRt http.RoundTripper //用于生成请求id reqIDGen _idutil.Generator // forceVersionC is used to force the version monitor loop // to detect the cluster version immediately. forceVersionC chan struct{} // wgMu blocks concurrent waitgroup mutation while server stopping wgMu sync.RWMutex // wg is used to wait for the go routines that depends on the server state // to exit when stopping the server. wg sync.WaitGroup // ctx is used for etcd-initiated requests that may need to be canceled // on etcd server shutdown. ctx context.Context cancel context.CancelFunc leadTimeMu sync.RWMutex leadElectedTime time.Time } 2. raftNode:raft状态机,维护raft状态机的步进和状态迁移。
type raftNode struct { // Cache of the latest raft index and raft term the server has seen. // These three unit64 fields must be the first elements to keep 64-bit // alignment for atomic access to the fields. //状态机当前状态,index代表当前已经apply到状态机的日志index,term是最新日志条目的term,lead是当前的leader id index uint64 term uint64 lead uint64 //包含了node、storage等重要数据结构 raftNodeConfig // a chan to send/receive snapshot msgSnapC chan raftpb.Message // a chan to send out apply applyc chan apply // a chan to send out readState readStateC chan raft.ReadState // utility ticker _time.Ticker // contention detectors for raft heartbeat message td _contention.TimeoutDetector stopped chan struct{} done chan struct{} } 3. node:包含在raftNode中,是Node接口的实现。里面包含一个协程和多个队列,是状态机消息处理的入口。
type node struct { //Propose队列,调用raftNode的Propose即把Propose消息塞到这个队列里 propc chan pb.Message //Message队列,除Propose消息以外其他消息塞到这个队列里 recvc chan pb.Message //集群配置信息队列,当集群节点改变时,需要将修改信息塞到这个队列里 confc chan pb.ConfChange //外部通过这个队列获取修改后集群配置信息 confstatec chan pb.ConfState //已经准备好apply的信息队列 readyc chan Ready //每次apply好了以后往这个队列里塞个空对象。通知可以继续准备Ready消息。 advancec chan struct{} //tick信息队列,用于调用心跳 tickc chan struct{} done chan struct{} stop chan struct{} status chan chan Status logger Logger }