源码解析|无损运维

编辑
更新时间: 2022-05-06 · 周书伟

源码解析:无损运维

SOFARegistry 是一个基于内存存储的分布式注册中心,数据是分散存储在各个节点,为了做到注册中心自身运维期间依然能够对外正常提供服务,需要进行节点下线快速感知和数据迁移。

  • session 存储 client 发送的发布和订阅数据,由于 client 断连重放的特性,session 单机下线后 client 会重放数据到其他节点。

  • data 会接收 session 发送的发布数据,data 单机下线后,meta 会通过 SlotTable 的变更把对应 Slot 的所有权移交其他 data,data 启动后会进行数据同步到达数据完整可对外提供服务的状态。

session、data下线的过程,如何避免下线期间数据丢失和抖动

SessionServer 和 DataServer 下线的相关代码都位于各自 bootstrap 类的 dostop 方法中。

首先讨论 SessionServer 的下线过程:

private void doStop() {
  try {
    LOGGER.info("{} Shutting down Session Server..", new Date().toString());
    stopHttpServer();
    clientNodeConnectionHandler.stop(); // stop process disconnect event
    stopServer();
    // stop http server and client bolt server before add blacklist
    // make sure client reconnect to other sessions and data
    gracefulShutdown();
    stopDataSyncServer();
    stopConsoleServer();
    executorManager.stopScheduler();
  } catch (Throwable e) {
    LOGGER.error("Shutting down Session Server error!", e);
  }
  LOGGER.info("{} Session server is now shutdown...", new Date().toString());
}

从代码中可以看出,SessionServer 服务在下线时,首先会停掉 HttpServer,HttpServer 提供一系列 REST 接口,用于 dashboard 管理、数据查询等;然后停掉 clientNodeConnectionHandler,clientNodeConnectionHandler 是 Exchange 设置的一系列 ChannelHandler 之一,Exchange 作为 Client / Server 连接的抽象,负责节点之间的连接。gracefulShutdown 方法则是通知 Meta(元数据服务器)将本节点加入 Meta 的黑名单中。HttpServer 和客户端的 bolt Server 是在本节点加入黑名单前关停,以保证 client 已经重连到了其他 session 节点上。这样就保证了运维的 session 节点下线期间,client 的数据不会因为 session 节点的不可用,导致数据丢失和抖动。接下来又依次关停了 DataSyncServer,ConsoleServer。

DataServer 下线过程的代码如下:

private void doStop() {
  try {
    LOGGER.info("{} Shutting down Data Server..", new Date().toString());

    gracefulShutdown();

    stopHttpServer();
    stopServer();
    stopDataSyncServer();
    stopNotifyServer();
  } catch (Throwable e) {
    LOGGER.error("Shutting down Data Server error!", e);
  }
  LOGGER.info("{} Data server is now shutdown...", new Date().toString());
}

DataServer 的下线则和 Session 的下线有些区别,由于 DataServer 数据服务器,负责存储具体的服务数据,而且 Slot 均匀地分配给每个节点上,所以下线前需要检测 DataServer 上的插槽状态,所以 doStop 的方法中,首先调用了如下的优雅下线的方法,其中代码中的主要内容如下:

addBlacklistRetryer.call(
    () -> {
      LOGGER.info("[GracefulShutdown] add self to blacklist");
      metaServerService.addSelfToMetaBlacklist();
      return true;
    });
addBlacklistRetryer.call(
    () -> {
      if (fetchStopPushService.isStopPushSwitch()) {
        return true;
      }
      SlotTableStatusResponse statusResponse = metaServerService.getSlotTableStatus();
      if (statusResponse.isProtectionMode()) {
        return true;
      }
      LOGGER.info("[GracefulShutdown] wait no slot");
      if (slotManager.hasSlot()) {
        throw new RuntimeException("current data server still own slot, waiting...");
      }
      return true;
    });
LOGGER.info("add data self to blacklist successfully");

首先节点通知 Meta 加入黑名单,stopmetaServerService.getSlotTableStatus() 获取节点上 SlotTable 的状态,当 Slot 重新分配给其他节点后,该 Data 节点才会成功加入黑名单,并进行接下来 HttpServer、DataSyncServer、NotifyServer 的下线动作。

整体流程是 Data 上的 Slot 逐步迁移,期间仍然对外提供服务。

迁移完成后主动从列表中剔除并设置节点黑名单并下线。

以下图为例,假设要下线的节点是 DataServer-1:将 Data-server 添加到注册中心迁移名单中;再将 Slot-2 对应的 follower(DataServer-2)提升为 Leader;等待 Data 节点达到稳定;继续将 Slot-3 对应的 follower(DataServer-3) 提升为 Leader;按照以上步骤分配掉剩余的 follower Slots;Slot 迁移完毕,Data-server 从 Data 列表中剔除,同时添加为黑名单;从迁移名单中设置为 succeed。

img.png

Session 和 Data下线中的优雅关闭和数据迁移保证了下线期间数据丢失和抖动,期间仍然对外提供服务。

Meta 如何处理 Session、Data 主动下线的请求通知

Meta 集群服务于 SOFARegistry 内部的 Session 集群和 Data 集群,Meta 层能够感知到 Session 节点和 Data 节点的变化,并通知集群的其它节点。RegistryForbiddenServerHandler 注册在sessionServerHandlers、dataServerHandlers :

@Bean(name = "sessionServerHandlers")
public Collection<AbstractServerHandler> sessionServerHandlers() {
  Collection<AbstractServerHandler> list = new ArrayList<>();
  list.add(heartbeatRequestHandler());
  list.add(fetchProvideDataRequestHandler());
  list.add(fetchSystemPropertyRequestHandler());
  list.add(registryForbiddenServerHandler());
  list.add(getSlotTableStatusRequestHandler());
  return list;
}

@Bean(name = "dataServerHandlers")
public Collection<AbstractServerHandler> dataServerHandlers() {
  Collection<AbstractServerHandler> list = new ArrayList<>();
  list.add(heartbeatRequestHandler());
  list.add(fetchProvideDataRequestHandler());
  list.add(registryForbiddenServerHandler());
  list.add(fetchSystemPropertyRequestHandler());
  list.add(getSlotTableStatusRequestHandler());
  return list;
}

当有 Data 和 Session 上下线时,会触发 registryForbiddenServerHandler,具体的 doHandle 方法将下线的节点添加到黑名单中:

boolean success = false;
switch (operation) {
  case ADD:
    success = registryForbiddenServerManager.addToBlacklist(ip);
    break;
  case REMOVE:
    success = registryForbiddenServerManager.removeFromBlacklist(ip);
    break;
  default:
    break;
}

Meta 的心跳检测 HeartbeatRequestHandler 会在心跳检测时,获取 Session 的状态,

@Override
public VersionedList<SessionNode> getSessionServerMetaInfo() {
  VersionedList<Lease<SessionNode>> leaseMetaInfo = getLeaseMeta();
  List<SessionNode> sessionNodes = Lists.newArrayList();
  leaseMetaInfo
      .getClusterMembers()
      .forEach(
          lease -> {
            sessionNodes.add(lease.getRenewal());
          });
  return new VersionedList<>(leaseMetaInfo.getEpoch(), sessionNodes);
}

获取 Meta 租约中最新的 Session 节点的信息,更新 Session 节点的信息。

Data 的节点信息也会通过类似的过程更新到 Meta 中,MetaServerManager 会根据节点类型,同步节点的在线信息:

@Override
public VersionedList<DataNode> getDataServerMetaInfo() {
  VersionedList<Lease<DataNode>> leaseMetaInfo = getLeaseMeta();
  List<DataNode> dataNodes = Lists.newArrayList();
  leaseMetaInfo
      .getClusterMembers()
      .forEach(
          lease -> {
            dataNodes.add(lease.getRenewal());
          });
  return new VersionedList<>(leaseMetaInfo.getEpoch(), dataNodes);
}

client 重连回放机制

public void ensureConnected() throws InterruptedException {
  if (isConnected()) {
    return;
  }
  while (!connect()) {
    Thread.sleep(ClientConnection.RECONNECTING_DELAY);
  }
}

在 client 的工作线程中,会通过上述代码以 5s 为间隔时间,反复检查链接状态,client 的检查函数位于工作线程的一个无限循环的检查语句中:

/** Handle. */
@Override
public void handle() {
  //noinspection InfiniteLoopStatement
  while (true) {
    try {
      // check connection status, try to reconnect to the server when connection lose
      client.ensureConnected();

      if (requestQueue.isEmpty()) {
        await(config.getRecheckInterval());
        continue;
      }

      Iterator<TaskEvent> lt = requestQueue.iterator();

      while (lt.hasNext()) {
        client.ensureConnected();
        TaskEvent ev = lt.next();
        lt.remove();
/**.........**/

Data 启动期以及 Data 的启动过程的状态转化,如何确保新分配的 Slot 的数据完整性

启动入口

Data 启动的过程代码位于com.alipay.sofa.registry.server.data.bootstrap.DataServerBootstrap#start

/** start dataserver */
public void start() {
  try {
    LOGGER.info("begin start server");
    LOGGER.info("release properties: {}", ServerEnv.getReleaseProps());
    LOGGER.info("the configuration items are as follows: " + dataServerConfig.toString());

    ReporterUtils.enablePrometheusDefaultExports();

    openDataServer();

    openDataSyncServer();

    openHttpServer();

    renewNode();
    fetchProviderData();

    systemPropertyProcessorManager.startFetchMetaSystemProperty();

    startScheduler();

    TaskMetrics.getInstance().registerBolt();

    postStart();
    Runtime.getRuntime().addShutdownHook(new Thread(this::doStop));

    LOGGER.info("start server success");
  } catch (Throwable e) {
    throw new RuntimeException("start server error", e);
  }
}

DataServer 模块的各个 bean 在 JavaConfig 中统一配置,JavaConfig 类为 DataServerBeanConfiguration, 启动入口类为 DataServerInitializer,该类不由 JavaConfig 管理配置,而是继承了 SmartLifecycle 接口,在启动时由 Spring 框架调用其 start 方法。

该方法中调用了 DataServerBootstrap#start 方法(如上述代码所示),用于启动一系列的初始化服务。

sofaRegistry V6 版本引入了“路由表”(SlotTabel)的概念,“路由表”负责存放每个节点和 N 个 Slot 的映射关系,并保证尽量把所有 Slot 均匀地分配给每个节点。这样,当节点上下线时,只需要修改路由表内容即可。

从代码中可以看出,DataServer 服务在启动时,会启动 DataServer、DataSyncServer、HttpServer 三个 bolt 服务。在启动这些 Server 之时,DataServer 注册了一系列 Handler 来处理各类消息。启动这些 Server 后,调用了renewNode()方法,renewNode 调用 metaServerService的 renewNode,通过获取在心跳请求中将返回 SlotTable 路由表信息、Data 节点将路由表 SlotTable 保存在本地中,具体代码可以参考com.alipay.sofa.registry.server.meta.remoting.meta.MetaServerRenewService#renewNode,下一步又刷新了 Session 的信息,并创建了一个 renew meta 的守护进程,用来持续刷新节点上存储的集群信息:

  private void renewNode() {
    metaServerService.renewNode();
    // init session lease with first renew
    for (ProcessId processId : metaServerService.getSessionProcessIds()) {
      sessionLeaseManager.renewSession(processId);
    }
    metaServerService.startRenewer();
  }

slotManagerImpl 是 DataServer 的一个单例 ConditionalOnMissingBean 对象。当 Data 启动时,SlotManage 负责 Slot 数据的更新和迁移(com.alipay.sofa.registry.server.data.slot.SlotManagerImpl#syncMigrating):

private void syncMigrating(
    SlotState slotState,
    Collection<String> sessions,
    int syncSessionIntervalMs,
    long slotTableEpoch) {
  final Slot slot = slotState.slot;
  if (slotState.migratingStartTime == 0) {
    slotState.migratingStartTime = System.currentTimeMillis();
    slotState.migratingTasks.clear();
    observeLeaderMigratingStart(slot.getId());
    LOGGER.info(
        "start migrating, slotId={}, sessionSize={}, sessions={}",
        slotState.slotId,
        sessions.size(),
        sessions);
  }
  final int notSyncedCount = sessions.size() - slotState.countSyncSuccess(sessions);
  for (String sessionIp : sessions) {
    MigratingTask mtask = slotState.migratingTasks.get(sessionIp);
    if (mtask == null) {
      KeyedTask<SyncSessionTask> ktask =
          commitSyncSessionTask(slot, slotTableEpoch, sessionIp, null, true);
      mtask = new MigratingTask(sessionIp, ktask);
      slotState.migratingTasks.put(sessionIp, mtask);
      LOGGER.info("migrating start,slotId={},session={}", slot.getId(), sessionIp);
      continue;
    }

    if (mtask.task.isFailed() && !mtask.forceSuccess) {
      // failed and not force Success, try to trigger emergency
      if (triggerEmergencyMigrating(slotState, sessions, mtask, notSyncedCount)) {
        LOGGER.info("[emergency]{},session={}", slotState.slotId, mtask.sessionIp);
      } else {
        KeyedTask<SyncSessionTask> ktask =
            commitSyncSessionTask(slot, slotTableEpoch, sessionIp, null, true);
        mtask.task = ktask;
        mtask.tryCount++;
        LOGGER.error(
            "migrating retry,slotId={},try={},session={},create={}/{}",
            slot.getId(),
            mtask.tryCount,
            sessionIp,
            mtask.createTimestamp,
            System.currentTimeMillis() - mtask.createTimestamp);
        continue;
      }
    }
    // force success or migrating finish. try to sync session
    // avoid the time of migrating is too long and block the syncing of session
    if (mtask.task.isOverAfter(syncSessionIntervalMs)) {
      if (syncSession(slotState, sessionIp, null, syncSessionIntervalMs, slotTableEpoch)) {
        LOGGER.info("slotId={}, sync session in migrating, session={}", slot.getId(), sessionIp);
      }
    }
  }
}

Slot 的数据的同步是一个 watchDog 的模式,获取 slotTabel 的变化,会自动同步节点上的 Slot 数据:

  void syncWatch() {
    final int syncSessionIntervalMs =
        dataServerConfig.getSlotLeaderSyncSessionIntervalSecs() * 1000;
    final int syncLeaderIntervalMs =
        dataServerConfig.getSlotFollowerSyncLeaderIntervalSecs() * 1000;
    final long slotTableEpoch = slotTableStates.table.getEpoch();
    for (SlotState slotState : slotTableStates.slotStates.values()) {
      try {
        sync(slotState, syncSessionIntervalMs, syncLeaderIntervalMs, slotTableEpoch);
      } catch (Throwable e) {
        SYNC_ERROR_LOGGER.error(
            "[syncCommit]failed to do sync slot {}, migrated={}",
            slotState.slot,
            slotState.migrated,
            e);
      }
    }
  }

由于节点上线后,Meta 会感知并获取最新的 DataServer 节点,leader Meta 对路由表进行了刷新,因为重新分配 SlotTable 本质上是 对 DataServer 节点和 Slot 槽位之间的映射关系进行重新分配。获取到当前存活的 DataServer 节点信息,从而方便的对之进行 Slot 分配。接下来对 Server 节点的 Slot 数据进行了同步。

上面的两部分代码总体可以概括为以下两个层次:

  • Data 和 Session 通过心跳的机制在 Meta 上进行续约,当 Data 发生节点变更的时候,Meta 此时会重新进行分配,生成新的 SlotTable,并通过广播和心跳的方式返回所有的节点,Session 就会用这份 SlotTable 寻址新的 Data 节点。存在一个时刻,集群中存在两份 SlotTable,分裂时间最大(1s)。

  • Session 上会缓存 Client 的 Pub 和 Sub 数据作为基准数据,在增量发送给 Data 的同时,Slot 的 leader 节点会定时和 Session 进行数据比对,Slot 的 follower 和 leader 也会定时进行数据对比,这样做到整个集群数据能快速达到最终一致,减少异常场景的不可服务时间。

Slot 数据完整度校验的作用在于当一个 Slot 数据所有副本所在节点全部宕机,数据发生丢失时,只能通过 Session 上的缓存数据进行数据回放。但在完全回放完成时,Slot 是不可对外提供读取服务的,以此避免推空或者推错,从而保证了数据的完整性。

流程如下图:

server 启动的最后一步,是通知 Meta 将上线的 Data 节点从 blacklist 移除:

private void postStart() throws Throwable {
  startupRetryer.call(
      () -> {
        LOGGER.info("successful start data server, remove self from blacklist");
        metaServerService.removeSelfFromMetaBlacklist();
        return true;
      });
}

至此,Server 的启动完成,Data 节点的 Slot 数据也根据变更后的 slotTable 同步完成,Data 节点开始在集群内工作。