源码解析|registry meta 选主

编辑
更新时间: 2024-04-10 · Webster-Yang

背景

SOFARgeistry 分为 Session、Data 和 Meta 三个模块。Session 模块用于 client 交互,可以横向扩容,可以承受大量 client 连接请求;Data 是数据存储模块,利用 slot 分片机制来均衡负载,使用备份来解决高可用问题;Meta 是 Session、Data 的注册中心,采用分布式锁来选举 leader,本文详细阐述 Meta 如何选主。

基于 MySQL 的分布式锁

MySQL 表

drop table if exists distribute_lock;
CREATE TABLE distribute_lock (
  id bigint(20) NOT NULL AUTO_INCREMENT primary key,
  data_center varchar(128) NOT NULL,
  lock_name varchar(1024) NOT NULL,
  owner varchar(512) NOT NULL,
  duration bigint(20) NOT NULL,
  term     bigint(20) unsigned NOT NULL DEFAULT 0 COMMENT '任期',
  term_duration    bigint(20) unsigned NOT NULL DEFAULT 0 COMMENT '租期',
  gmt_create timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
  gmt_modified timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
  UNIQUE KEY `uk_data_center_lock` (`data_center`, `lock_name`),
  KEY `idx_lock_owner` (`owner`)
);

表的增改查等操作

public interface DistributeLockMapper {

  /**
   * query by dataCenter and lockName
   *
   * @param dataCenter
   * @param lockName
   * @return
   */
  public DistributeLockDomain queryDistLock(
      @Param("dataCenter") String dataCenter, @Param("lockName") String lockName);

  /**
   * compete lock, it will throw exception if lockName existed
   *
   * @param lock
   */
  public void competeLockOnInsert(DistributeLockDomain lock) throws Exception;

  /**
   * compete lock with cas
   *
   * @param competeLock
   * @return
   */
  public void competeLockOnUpdate(FollowCompeteLockDomain competeLock);

  /** renew lock last update time */
  public void ownerHeartbeat(DistributeLockDomain lock);

  /** force reset owner and duration */
  public void forceRefresh(DistributeLockDomain lock);
}

整体流程

step1:启动时创建锁记录,

<insert id="competeLockOnInsert" parameterType="com.alipay.sofa.registry.jdbc.domain.DistributeLockDomain">
        <![CDATA[
       INSERT /*+ QUERY_TIMEOUT(2000000) */ INTO distribute_lock
       (
           data_center,
           lock_name,
           owner,
           duration,
           gmt_create,
           gmt_modified,
           `term`,
           `term_duration`
       )
       VALUES
       (
           #{dataCenter},
           #{lockName},
           #{owner},
           #{duration},
           NOW(3),
           NOW(3),
           1,
           0
       )
       ON DUPLICATE KEY UPDATE lock_name = #{lockName}
       ]]>
    </insert>

step2:leader 每秒提交心跳,更新表

    <update id="ownerHeartbeat" parameterType="com.alipay.sofa.registry.jdbc.domain.DistributeLockDomain">
        <![CDATA[
          update /*+ QUERY_TIMEOUT(2000000) */ distribute_lock set owner = #{owner}, gmt_modified =  NOW(3), `term_duration` = (`term_duration` + 1)
          where data_center = #{dataCenter} and lock_name = #{lockName} and owner = #{owner} and term = #{term}  and `term_duration` = #{termDuration} and timestampdiff(SECOND, gmt_modified, NOW()) < #{duration}/1000
        ]]>
    </update>

step3:follower 每秒判断锁是否过期,如果过期,则 cas 竞选 leader

  public DistributeLockDomain onFollowWorking(DistributeLockDomain lock, String myself) {
    /** as follow, do compete if lock expire */
    if (lock.expire()) {
      LOG.info("lock expire: {}, meta elector start: {}", lock, myself);
      distributeLockMapper.competeLockOnUpdate(
          new FollowCompeteLockDomain(
              lock.getDataCenter(),
              lock.getLockName(),
              lock.getOwner(),
              lock.getGmtModified(),
              myself,
              lock.getDuration(),
              lock.getTerm(),
              lock.getTermDuration()));
      DistributeLockDomain newLock =
          distributeLockMapper.queryDistLock(lock.getDataCenter(), lock.getLockName());
      LOG.info("elector finish, new lock: {}", lock);
      return newLock;
    }
    return lock;
  }
  public boolean expire() {

    return gmtDbServerTime.getTime() > gmtModified.getTime() + duration;
  }
  <update id="competeLockOnUpdate">
        <!-- update cas with dataCenter,lockName,owner,gmtModified  -->
        <![CDATA[
       update /*+ QUERY_TIMEOUT(2000000) */ distribute_lock set owner = #{newOwner}, gmt_modified =  NOW(3), term = (term + 1) , `term_duration` = 0
       where data_center = #{dataCenter} and lock_name = #{lockName} and owner = #{owner} and term = #{term} and `term_duration` = #{termDuration} and timestampdiff(SECOND, gmt_modified, NOW()) > #{duration}/1000
       ]]>
    </update>

step4:如果 leader 发生切换,通知 xxx

时序图

💡 Tips:输入/画板或点击上方工具栏img,选择「画板」、绘制流程图、架构图等各种图形。

img

类图

img

主要源代码解析

  public void elect() {
    synchronized (this) {
      if (isObserver) {//如果是Observer,不参与选主
        leaderInfo = doQuery();
      } else {
        leaderInfo = doElect();
      }

      if (amILeader()) {//我是leader
        onIamLeader();//我从follower变成leader,通知xxx
      } else {//我不是leader
        onIamNotLeader();//我从leader变成follower,通知xxx
      }
    }
  }
@Override
  protected LeaderInfo doElect() {
    //1、查询锁
    DistributeLockDomain lock =
        distributeLockMapper.queryDistLock(defaultCommonConfig.getClusterId(tableName()), lockName);
   //2、不存在则创建锁
    /** compete and return leader */
    if (lock == null) {
      return competeLeader(defaultCommonConfig.getClusterId(tableName()));
    }
   //3、判断角色
    ElectorRole role = amILeader(lock.getOwner()) ? ElectorRole.LEADER : ElectorRole.FOLLOWER;
    if (role == ElectorRole.LEADER) {
      lock = onLeaderWorking(lock, myself());//4、提交心跳
    } else {
      lock = onFollowWorking(lock, myself());//5、判断过期与否,如过期,则cas竞争锁
    }
    LeaderInfo result = leaderFrom(lock);//6、锁信息转换为LeaderInfo
    LOG.info("meta role : {}, leaderInfo: {}", role, result);
    return result;
  }

Meta 集群 leader 变更

1、初始化

ip 角色 备注
10.177.41.99 follower
10.177.41.100 leader
10.181.152.223 follower

img

img

2、kill 10.177.41.100 节点后

ip 角色 备注
10.177.41.99 follower competeLockOnUpdate 执行失败,即 term 、term_duration 匹配不上,sql 没有修改行数据
10.177.41.100 kill
10.181.160.18 leader competeLockOnUpdate 执行成功,即 term 、term_duration 匹配的上,即当选 leader,之后 term 递增

img

img

3、kill 10.181.160.18 节点后

ip 角色 备注
10.177.41.99 leader competeLockOnUpdate 执行成功,当选 leader
10.177.41.100 kill
10.181.160.18 kill

img

img