前言
此次源码解析均在 sofa-registry:6.1.4-SNAPSHOT 版本下分析
一、架构流程图
二、订阅流程
以客户端首次订阅,且服务发布方已注册的场景为例,订阅流程主要分为三步,
- 客户端发起订阅
- session server 处理订阅任务,从缓存(或 data server)拉取地址列表
- 向客户端推送地址列表
2.1 客户端发起订阅
客户端发起订阅的方式是异步的,首先将订阅注册的任务添加到客户端的内存队列中。
2.2 session server 处理订阅请求
- session server 接收到订阅注册任务后,主要是通过 SessionRegistry#regsiter 方法处理的,判断当前是服务消费方,添加到订阅者缓存中;
case SUBSCRIBER:
Subscriber subscriber = (Subscriber) storeData;
//
if (!sessionInterests.add(subscriber)) {
break;
}
sessionRegistryStrategy.afterSubscriberRegister(subscriber);
break;
- 触发RegProcessor#fireOnReg方法,将订阅者放入buffer中,参考源码如下:
boolean fireOnReg(Subscriber subscriber) {
final String dataInfoId = subscriber.getDataInfoId();
// 从若干个BufferWorker数组找到其中一个
BufferWorker worker = indexOf(subscriber.getDataInfoId());
// 将dataInfoId和subscriber存到BufferWorker线程中的subMap中
// subMap的key为dataInfoId,value为SubBuffer
SubBuffer buffer = worker.subMap.computeIfAbsent(dataInfoId, k -> new SubBuffer());
return buffer.add(subscriber);
}
2.3 session server 拉取地址列表
BufferWorker 线程循环处理 map 缓存中的订阅注册任务,处理流程如下:
- 从 worker 的 subMap 取出所有 dataInfoId 和订阅者列表,并对每个 dataInfoId 分别处理
- 通过 RegProcessor#processBuffer 方法处理每个 dataInfoId 和对应的订阅者
int processBuffer(Ref ref, int hitSize) {
List<Subscriber> subscribers = Lists.newArrayListWithCapacity(hitSize);
for (Map.Entry<String, Subscriber> e : ref.subscriberMap.entrySet()) {
final Subscriber sub = e.getValue();
// 若订阅者已经推送过,直接忽略
if (!sub.hasPushed()) {
subscribers.add(sub);
}
// 这里因为subscriberMap是引用,没有锁保护,所以sub可能已经被新的subscriber替换掉
// try to remove the sub, but subs maybe changes
ref.subscriberMap.remove(sub.getRegisterId(), sub);
}
if (!subscribers.isEmpty()) {
// 从缓存中获取dataInfoId的地址列表,并推送给subscribers
regHandler.onReg(ref.dataInfoId, subscribers);
}
// 返回推送地址列表的订阅者数量
return subscribers.size();
}
通过 FirePushService#getDatum 方法从缓存中获取地址列表。该缓存使用 Guava Cache 的LoadingCache,当缓存中没有 dataInfoId 的地址列表时,会自动从 data server 获取地址列表,并放在缓存中。
通过 FirePushService#processPush 方法将地址列表推送给所有订阅者
- 首先通过 firePush 方法将 PushTas k放入 buffer
- 等待 PushTaskBuffer.BufferWorker 线程异步处理任务
2.4 session server 推送地址列表
- PushProcessor 初始化时默认创建 4 个 PushTaskBuffer.BufferWorker 线程;
- BufferWorker 线程循环执行 watchBuffer 方法,将 worker 中缓存的过期任务删除后进行处理,具体逻辑见下边源码;
int watchBuffer(BufferWorker worker) {
int bufferedSize = worker.bufferMap.size();
if (bufferedSize >= MAX_BUFFERED_SIZE) {
LOGGER.warn("arrived max buffered size: buffered={}", bufferedSize);
}
// 获取推送任务
List<PushTask> pending = worker.transferAndMerge();
int count = 0;
for (PushTask task : pending) {
// 将任务放进线程池执行
if (task.commit()) {
count++;
}
}
if (pending.size() > 0 || count > 0) {
LOGGER.info("buffers={},commits={}", pending.size(), count);
}
return count;
}
- 推送地址列表给客户端。
三、发布流程
服务发布流程主要分为下面5步:
- 客户端服务注册
- session server 处理服务发布请求
- data server 保存服务注册数据,并生成数据变更通知
- session server 接收数据变更通知,拉取数据
- session server 推送地址列表
3.1 服务注册
客户端进行发布注册,与上面客户端订阅的逻辑一样,都是先将请求放在队列里,等待异步处理,此处不再赘述。
3.2 session server 处理服务发布请求
- SessionRegistry#register 方法判断请求来自服务发布方;
- 将服务发布方放进 SessionDataStore 缓存中;
- 由于服务发布的数据最终要写入 data server,这里首先通过 DataNodeService#register 方法放到阻塞队列中异步处理;
具体代码逻辑如下:
case PUBLISHER:
Publisher publisher = (Publisher) storeData;
publisher.setSessionProcessId(ServerEnv.PROCESS_ID);
// 放入缓存
if (!sessionDataStore.add(publisher)) {
break;
}
// 存入data服务器中
// All write operations to DataServer (pub/unPub/clientoff/renew/snapshot)
// are handed over to WriteDataAcceptor
writeDataAcceptor.accept(
new PublisherWriteDataRequest(
publisher, WriteDataRequest.WriteDataRequestType.PUBLISHER));
sessionRegistryStrategy.afterPublisherRegister(publisher);
break;
3.3 data server 保存服务注册数据
3.3.1 存储 publisher
- 通过 DataNodeServiceImpl.Worker#run 循环处理队列中的请求
public void run() {
for (; ; ) {
try {
// 从队列中取出第一个
final Req firstReq = queue.poll(200, TimeUnit.MILLISECONDS);
if (firstReq != null) {
// 批量获取请求,默认最多取100个
Map<Integer, LinkedList<Object>> reqs =
drainReq(queue, sessionServerConfig.getDataNodeMaxBatchSize());
// send by order, firstReq.slotId is the first one
LinkedList<Object> firstBatch = reqs.remove(firstReq.slotId);
if (firstBatch == null) {
firstBatch = Lists.newLinkedList();
}
firstBatch.addFirst(firstReq.req);
request(firstReq.slotId, firstBatch);
for (Map.Entry<Integer, LinkedList<Object>> batch : reqs.entrySet()) {
// 批量发起请求
request(batch.getKey(), batch.getValue());
}
}
// check the retry
// 重试逻辑
if (!retryBatches.isEmpty()) {
final Iterator<RetryBatch> it = retryBatches.iterator();
List<RetryBatch> retries = Lists.newArrayList();
while (it.hasNext()) {
RetryBatch batch = it.next();
it.remove();
if (!DataNodeServiceImpl.this.request(batch.batch)) {
retries.add(batch);
}
}
for (RetryBatch retry : retries) {
retry(retry);
}
}
} catch (Throwable e) {
LOGGER.safeError("failed to request batch", e);
}
}
}
- data server 的 BatchPutDataHandler 收到请求
- tryAddPublisher:当 registerId 对应服务没有发布过或传入发布方 version 最新时,存入LocalDatumStorage;仅当新增发布方或已有发布方地址列表发生改变时,返回 true,并生成 DataChange 事件;
private boolean tryAddPublisher(Publisher publisher) {
PublisherEnvelope exist = pubMap.get(publisher.getRegisterId());
final RegisterVersion registerVersion = publisher.registerVersion();
if (exist == null) {
PublisherEnvelope envelope = PublisherEnvelope.of(publisher);
pubMap.put(publisher.getRegisterId(), envelope);
// 当服务发布时,添加成功
return envelope.isPub();
}
// 版本号没有变化,添加失败
if (exist.registerVersion.equals(registerVersion)) {
...
return false;
}
// 版本号是旧的,直接忽略,添加失败
if (!exist.registerVersion.orderThan(registerVersion)) {
...
return false;
}
PublisherEnvelope envelope = PublisherEnvelope.of(publisher);
// 存储
pubMap.put(publisher.getRegisterId(), envelope);
if (exist.publisher == null) {
// publisher is null after client_off
...
// 首次发布,添加成功
return envelope.isPub();
}
try {
// 判断地址列表是否变化,未变化返回false
boolean same =
exist.publisher.getDataList() == null
? publisher.getDataList() == null
: exist.publisher.getDataList().equals(publisher.getDataList());
.
...
return !same;
} catch (Throwable t) {
...
return true;
}
}
- 存储成功,产生 DataChange 事件,
public Object doHandle(Channel channel, BatchRequest request) {
final ProcessId sessionProcessId = request.getSessionProcessId();
processSessionProcessId(channel, sessionProcessId);
final SlotAccess slotAccess =
checkAccess(request.getSlotId(), request.getSlotTableEpoch(), request.getSlotLeaderEpoch());
if (slotAccess.isMoved() || slotAccess.isMisMatch()) {
// only reject the when moved
return SlotAccessGenericResponse.failedResponse(slotAccess);
}
final String slotIdStr = String.valueOf(request.getSlotId());
final Set<String> changeDataInfoIds = Sets.newHashSetWithExpectedSize(128);
try {
for (Object req : request.getRequest()) {
// contains publisher and unPublisher
if (req instanceof Publisher) {
Publisher publisher = (Publisher) req;
DatumVersion updatedVersion = doHandle(publisher);
if (updatedVersion != null) {
// // 存储成功就加入列表,用于生成DataChange事件
changeDataInfoIds.add(publisher.getDataInfoId());
}
. ...
}
...
}
} finally {
// if has exception, try to notify the req which was handled
if (!changeDataInfoIds.isEmpty()) {
// 生成DataChange事件
dataChangeEventCenter.onChange(
changeDataInfoIds, DataChangeType.PUT, dataServerConfig.getLocalDataCenter());
}
}
private DatumVersion doHandle(Publisher publisher) {
publisher = Publisher.internPublisher(publisher);
...
// 存储服务发布方地址
.
return localDatumStorage.put(publisher);
}
- DataChangeEventCenter#onChange 方法将 dataInfoId 列表加到缓存 map 中,每个数据中心对应一个 dataInfoId 列表,等待异步处理。
3.3.2 生成数据变更通知
- dataServer 启动时创建了一个 DataChangeEventCenter.ChangeMerger 线程,用于处理数据变更通知的缓存;
- 获取服务发布方的最新版本号
- 生成变更通知交给线程池执行,发送给 session server
boolean handleChanges(Map<String, List<Channel>> channelsMap) {
// first clean the event
final int maxItems = dataServerConfig.getNotifyMaxItems();
final List<DataChangeEvent> events = transferChangeEvent(maxItems);
if (events.isEmpty()) {
return false;
}
if (channelsMap.isEmpty()) {
LOGGER.error("session conn is empty when change");
return false;
}
// for循环遍历处理所有events
for (DataChangeEvent event : events) {
final Map<String, DatumVersion> changes =
Maps.newHashMapWithExpectedSize(event.getDataInfoIds().size());
final String dataCenter = event.getDataCenter();
for (String dataInfoId : event.getDataInfoIds()) {
// 获取最新版本号
DatumVersion datumVersion = datumCache.getVersion(dataCenter, dataInfoId);
if (datumVersion != null) {
changes.put(dataInfoId, datumVersion);
}
}
if (changes.isEmpty()) {
continue;
}
for (Map.Entry<String, DatumVersion> entry : changes.entrySet()) {
LOGGER.info("datum change notify: {},{}", entry.getKey(), entry.getValue());
}
for (Map.Entry<String, List<Channel>> entry : channelsMap.entrySet()) {
// 随机获取一个session server 推送数据变更通知
Channel channel = CollectionUtils.getRandom(entry.getValue());
try {
// 放入线程池中执行
notifyExecutor.execute(
channel.getRemoteAddress(),
new ChangeNotifier(channel, event.getDataCenter(), changes, event.getTraceTimes()));
CHANGE_COMMIT_COUNTER.inc();
} catch (FastRejectedExecutionException e) {
CHANGE_SKIP_COUNTER.inc();
LOGGER.warn("commit notify full, {}, {}, {}", channel, changes.size(), e.getMessage());
} catch (Throwable e) {
CHANGE_SKIP_COUNTER.inc();
LOGGER.error("commit notify failed, {}, {}", channel, changes.size(), e);
}
}
}
return true;
}
- session server 收到数据变更通知,由 DataChangeRequestHandler#doHandle 方法处理;
public Object doHandle(Channel channel, DataChangeRequest dataChangeRequest) {
if (!pushSwitchService.canPush()) {
return null;
}
final String dataNode = RemotingHelper.getRemoteHostAddress(channel);
final String dataCenter = dataChangeRequest.getDataCenter();
final long changeTimestamp = System.currentTimeMillis();
for (Map.Entry<String, DatumVersion> e : dataChangeRequest.getDataInfoIds().entrySet()) {
final String dataInfoId = e.getKey();
final DatumVersion version = e.getValue();
// 判断版本号,忽略旧版本数据
Interests.InterestVersionCheck check =
sessionInterests.checkInterestVersion(dataCenter, dataInfoId, version.getValue());
if (!check.interested) {
if (check != Interests.InterestVersionCheck.NoSub) {
// log exclude NoSub
LOGGER.info("[SkipChange]{},{}, ver={}, {}", dataInfoId, dataCenter, version, check);
}
continue;
}
final TriggerPushContext changeCtx =
new TriggerPushContext(
dataCenter,
version.getValue(),
dataNode,
changeTimestamp,
dataChangeRequest.getTimes());
// 加入缓存map
firePushService.fireOnChange(dataInfoId, changeCtx);
}
return null;
}
3.5 session server 拉取地址列表
同样地,此处采用生产者消费者模型,通过 worker 线程异步循环处理 ChangeTask,获取地址列表,更新地址列表缓存,推送给客户端,流程图如下。
doExecuteOnChange,获取地址列表源码:
boolean doExecuteOnChange(String changeDataInfoId, TriggerPushContext changeCtx) {
final long expectVersion = changeCtx.getExpectDatumVersion();
// 获取地址列表
final SubDatum datum = getDatum(changeCtx.dataCenter, changeDataInfoId, expectVersion);
if (datum == null) {
// datum change, but get null datum, should not happen
LOGGER.error("[changeNil] {},{},{}", changeCtx.dataCenter, changeDataInfoId, expectVersion);
return false;
}
// 判断版本号,获取到的最新版本号不应该比预期的小
if (datum.getVersion() < expectVersion) {
LOGGER.error(
"[changeLessVer] {},{},{}<{}",
changeCtx.dataCenter,
changeDataInfoId,
datum.getVersion(),
expectVersion);
return false;
}
onDatumChange(changeCtx, datum);
return true;
}
// 获取地址列表
SubDatum getDatum(String dataCenter, String dataInfoId, long expectVersion) {
Key key = new Key(DatumKey.class.getName(), new DatumKey(dataInfoId, dataCenter));
Value value = sessionCacheService.getValueIfPresent(key);
if (value != null) {
SubDatum datum = (SubDatum) value.getPayload();
// 缓存中的数据更新,以缓存中的数据为准
if (datum != null && datum.getVersion() >= expectVersion) {
// the expect version got
CACHE_HIT_COUNTER.inc();
return datum;
}
}
CACHE_MISS_COUNTER.inc();
// invalidate缓存中的数据
// the cache is too old
sessionCacheService.invalidate(key);
// 获取并更新缓存
value = sessionCacheService.getValue(key);
return value == null ? null : (SubDatum) value.getPayload();
}
3.6 session server 推送地址列表
- 逻辑与发起订阅后的推送逻辑一样,这里不再展开。
四、性能优化
异步处理
SOFARegistry 是 AP 的,订阅发布流程基本上都采用了异步处理的方式,牺牲了一部分一致性。内部大量采用了 Map 或队列作为缓存,解耦生产者和消费者,极大地提升了客户端 session server和 data server 的性能; 缓存:
SessionServer 通过 SessionCacheService 缓存地址列表,避免频繁请求给 data server 造成较大压力,影响存储稳定性;内部通过推拉结合的方式解决地址更新的问题,一方面通过数据变更通知 session server 更新地址列表,另一方面通过 VersionWatchDog 定时扫描版本号,拉取 data server 变更的地址列表;
合并 merge
发布服务写入 data server 时采用了批量处理的方式,减少与 data server 的网络交互,避免频繁请求给 data server 造成较大压力,影响存储稳定性。
五、并发控制
SOFARegistry 并发控制的方式主要有两种:
- 乐观并发控制:通过版本号避免旧版本数据写入,如 Publisher 的 registerVersion 可以防止 data server 将旧版本的 publisher 存入;Datum 的 version 可以防止将旧版本的地址列表写入缓存;
- 悲观并发控制:如 PublisherGroup 中通过读写锁在更改版本号,或增删发布者时进行写锁控制,防止并发写入;对查询等进行读锁控制。