Skip to content

Commit

Permalink
增加注释
Browse files Browse the repository at this point in the history
  • Loading branch information
haohao0103 committed Jun 27, 2024
1 parent c364da0 commit d936ec5
Show file tree
Hide file tree
Showing 14 changed files with 212 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ public void connectPdWatch(String leader) {
partitionWatcher = pdWatch.watchPartition(new PDWatch.Listener<>() {
@Override
public void onNext(PartitionEvent response) {
// log.info("PDClient receive partition event {}-{} {}",
// response.getGraph(), response.getPartitionId(), response.getChangeType());
log.info("PDClient receive partition event {}-{} {}",
response.getGraph(), response.getPartitionId(), response.getChangeType());
invalidPartitionCache(response.getGraph(), response.getPartitionId());

if (response.getChangeType() == PartitionEvent.ChangeType.DEL) {
Expand Down Expand Up @@ -472,8 +472,9 @@ public KVPair<Metapb.Partition, Metapb.Shard> getPartition(String graphName, byt
public KVPair<Metapb.Partition, Metapb.Shard> getPartition(String graphName, byte[] key,
int code) throws
PDException {
KVPair<Metapb.Partition, Metapb.Shard> partShard =
cache.getPartitionByCode(graphName, code);
// KVPair<Metapb.Partition, Metapb.Shard> partShard =
// cache.getPartitionByCode(graphName, code);
KVPair<Metapb.Partition, Metapb.Shard> partShard = this.getPartitionByCode(graphName, code);
partShard = getKvPair(graphName, key, partShard);
return partShard;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,15 @@ private AbstractConnector(Listener<L> listener, PulseType pulseType) {
}

void init() {
// 创建一个PulseCreateRequest的Builder对象,并设置其pulseType属性为当前对象的pulseType属性
PulseCreateRequest.Builder builder = PulseCreateRequest.newBuilder()
.setPulseType(this.pulseType);

// 获取PDPulseImpl对象的stub属性,并调用其pulse方法,传入当前对象作为参数,将返回的RequestStream赋值给this.reqStream
this.reqStream = PDPulseImpl.this.stub.pulse(this);

// 调用reqBuilder的clear方法清空之前的设置,然后设置其createRequest属性为builder对象,
// 最后调用build方法生成一个PulseRequest对象,并将其发送给reqStream
this.reqStream.onNext(reqBuilder.clear().setCreateRequest(builder).build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -937,13 +937,16 @@ private boolean checkTargetCount(int fromCount, int toCount, int shardCount) {
*/
public void partitionHeartbeat(Metapb.PartitionStats stats) throws PDException {

// 是从pd 存储中获取的数据,stats是从store 侧传入的partition信息
Metapb.ShardGroup shardGroup = storeService.getShardGroup(stats.getId());
// shard group version changes
// (shard group 由pd控制, 在分裂等操作后,可能出现短暂不一致的情况,以pd为准)
// store控制shard leader
if (shardGroup != null &&
(shardGroup.getVersion() < stats.getLeaderTerm() ||
shardGroup.getConfVer() < stats.getConfVer())) {
// 如果进来说明partition 对应的shargroup 信息改变了
// onPartitionChanged();
storeService.updateShardGroup(stats.getId(),
stats.getShardList(), stats.getLeaderTerm(),
stats.getConfVer());
Expand All @@ -965,27 +968,39 @@ public void partitionHeartbeat(Metapb.PartitionStats stats) throws PDException {
* @param stats
*/
private void checkShardState(Metapb.Partition partition, Metapb.PartitionStats stats) {

try {
// 离线shard的计数器
int offCount = 0;
// 遍历分区统计信息中的每个shard统计信息
for (Metapb.ShardStats shard : stats.getShardStatsList()) {
// 如果shard状态为离线
if (shard.getState() == Metapb.ShardState.SState_Offline) {
// 离线shard计数器加一
offCount++;
}
}

// 如果分区状态不是离线状态
if (partition.getState() != Metapb.PartitionState.PState_Offline) {
// 如果离线shard计数为0
if (offCount == 0) {
// 更新分区状态为正常
updatePartitionState(partition.getGraphName(), partition.getId(),
Metapb.PartitionState.PState_Normal);
// 如果离线shard数量少于shard总数的一半
} else if (offCount * 2 < stats.getShardStatsCount()) {
// 更新分区状态为警告
updatePartitionState(partition.getGraphName(), partition.getId(),
Metapb.PartitionState.PState_Warn);
// 否则
} else {
// 更新分区状态为警告(此处的else分支和上面的else if分支代码相同,可能是冗余的)
updatePartitionState(partition.getGraphName(), partition.getId(),
Metapb.PartitionState.PState_Warn);
}
}
} catch (Exception e) {
// 打印异常日志
log.error("Partition {}-{} checkShardState exception {}",
partition.getGraphName(), partition.getId(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,31 @@ public class StoreNodeService {
private Metapb.ClusterStats clusterStats;

public StoreNodeService(PDConfig config) {
// 将传入的配置赋值给成员变量pdConfig
this.pdConfig = config;

// 调用MetadataFactory的newStoreInfoMeta方法创建StoreInfoMeta对象,并将配置作为参数传入
storeInfoMeta = MetadataFactory.newStoreInfoMeta(pdConfig);

// 调用MetadataFactory的newTaskInfoMeta方法创建TaskInfoMeta对象,并将配置作为参数传入
taskInfoMeta = MetadataFactory.newTaskInfoMeta(pdConfig);

// 创建一个线程安全的ArrayList,并将其赋值给shardGroupStatusListeners成员变量
shardGroupStatusListeners = Collections.synchronizedList(new ArrayList<>());

// 创建一个线程安全的ArrayList,并将其赋值给statusListeners成员变量
statusListeners = Collections.synchronizedList(new ArrayList<StoreStatusListener>());

// 创建一个Metapb.ClusterStats对象,并设置其状态为Cluster_Not_Ready,时间戳为当前时间戳
clusterStats = Metapb.ClusterStats.newBuilder()
.setState(Metapb.ClusterState.Cluster_Not_Ready)
.setTimestamp(System.currentTimeMillis())
.build();

// 创建一个KvService对象,并将配置作为参数传入
kvService = new KvService(pdConfig);

// 创建一个ConfigService对象,并将配置作为参数传入
configService = new ConfigService(pdConfig);
}

Expand Down Expand Up @@ -705,6 +720,13 @@ public synchronized void updateShardGroupState(int groupId, Metapb.PartitionStat
* @param storeStats
* @throws PDException
*/
/**
* 心跳函数,用于更新Store的状态信息,并返回集群状态信息。
*
* @param storeStats 存储节点的状态信息
* @return 集群状态信息
* @throws PDException 当Store不存在或者已经被移除时,抛出异常
*/
public Metapb.ClusterStats heartBeat(Metapb.StoreStats storeStats) throws PDException {
this.storeInfoMeta.updateStoreStats(storeStats);
Metapb.Store lastStore = this.getStore(storeStats.getStoreId());
Expand Down Expand Up @@ -813,7 +835,7 @@ public synchronized void checkStoreStatus() {
if (activeStores.size() < pdConfig.getMinStoreCount()) {
builder.setState(Metapb.ClusterState.Cluster_Not_Ready);
builder.setMessage("The number of active stores is " + activeStores.size()
+ ", less than pd.initial-store-count:" +
+ ", less than pd.min-store-count:" +
pdConfig.getMinStoreCount());
}
Map<Long, Metapb.Store> storeMap = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,21 @@ public List<Metapb.Store> patrolStores() throws PDException {
}

List<Metapb.Store> changedStores = new ArrayList<>();

// 获取所有store列表
// 检查store在线状态
List<Metapb.Store> stores = storeService.getStores("");

// 获取所有活跃的store的ID和Store对象映射关系
Map<Long, Metapb.Store> activeStores = storeService.getActiveStores("")
.stream().collect(
Collectors.toMap(Metapb.Store::getId, t -> t));
Collectors.toMap(Metapb.Store::getId, t -> t));

// 遍历所有store
for (Metapb.Store store : stores) {
Metapb.Store changeStore = null;

// 判断store是否在线(状态为Up或Unknown)且不在活跃store列表中
if ((store.getState() == Metapb.StoreState.Up
|| store.getState() == Metapb.StoreState.Unknown)
&& !activeStores.containsKey(store.getId())) {
Expand All @@ -214,14 +222,16 @@ public List<Metapb.Store> patrolStores() throws PDException {
.setState(Metapb.StoreState.Offline)
.build();

// 判断store状态为Exiting且不在活跃store列表中,
// 或者store状态为Offline且离线时间超过最大允许时间,并且集群启动时间也超过最大允许时间
} else if ((store.getState() == Metapb.StoreState.Exiting &&
!activeStores.containsKey(store.getId())) ||
(store.getState() == Metapb.StoreState.Offline &&
(System.currentTimeMillis() - store.getLastHeartbeat() >
pdConfig.getStore().getMaxDownTime() * 1000) &&
(System.currentTimeMillis() - clusterStartTime >
pdConfig.getStore().getMaxDownTime() * 1000))) {
//手工修改为下线或者离线达到时长
// 手工修改为下线或者离线达到时长
// 修改状态为关机, 增加 checkStoreCanOffline 检测
if (storeService.checkStoreCanOffline(store)) {
changeStore = Metapb.Store.newBuilder(store)
Expand All @@ -231,11 +241,15 @@ public List<Metapb.Store> patrolStores() throws PDException {
log.info("patrolStores store {} Offline", changeStore.getId());
}
}

// 如果store状态发生了变化,则更新store并添加到changedStores列表中
if (changeStore != null) {
storeService.updateStore(changeStore);
changedStores.add(changeStore);
}
}

// 返回状态发生变化的store列表
return changedStores;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,18 +337,24 @@ private void cancelObserver() {
}

private void addObserver(PulseCreateRequest request) {
// 如果subject不为空,则直接返回
if (this.subject != null) {
return;
}

// 获取脉冲类型
PulseType pulseType = getPulseType(request);
// 如果脉冲类型为空,则直接返回
if (pulseType == null) {
return;
}

// 根据脉冲类型获取subject
this.subject = getSubject(pulseType);
// 创建观察者ID
this.observerId = createObserverId();

// 向subject中添加观察者
this.subject.addObserver(this.observerId, this.responseObserver);
}

Expand All @@ -370,14 +376,19 @@ private PulseType getPulseType(PulseCreateRequest request) {
}

private AbstractObserverSubject getSubject(PulseType pulseType) {
// 从subjectHolder中根据脉冲类型的名称获取对应的观察者主体
AbstractObserverSubject subject = subjectHolder.get(pulseType.name());

// 如果观察者主体为空
if (subject == null) {
// 调用responseObserver的onError方法,传入一个异常对象,表示不支持的脉冲类型
responseObserver.onError(
new Exception("Unsupported pulse-type: " + pulseType.name()));
// 返回null
return null;
}

// 返回观察者主体
return subject;
}

Expand Down Expand Up @@ -406,26 +417,43 @@ private void handleNotice(PulseNoticeRequest noticeRequest) {
}
}

/**
* 接收来自pdpulseimpl 发送的请求
*
* @param pulseRequest 接收到的脉冲请求
*/
@Override
public void onNext(PulseRequest pulseRequest) {

// 如果请求包含创建请求
if (pulseRequest.hasCreateRequest()) {
// 添加观察者
this.addObserver(pulseRequest.getCreateRequest());
// 结束方法
return;
}

// 如果请求包含取消请求
if (pulseRequest.hasCancelRequest()) {
// 取消观察者
this.cancelObserver();
// 结束方法
return;
}

// 如果请求包含通知请求
if (pulseRequest.hasNoticeRequest()) {
// 处理通知
this.handleNotice(pulseRequest.getNoticeRequest());
}

// 如果请求包含确认请求
if (pulseRequest.hasAckRequest()) {
// 确认通知
this.ackNotice(pulseRequest.getAckRequest().getNoticeId()
, pulseRequest.getAckRequest().getObserverId());
// 通知ID
, pulseRequest.getAckRequest().getObserverId()
// 观察者ID
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public void init() throws PDException {
RaftEngine.getInstance().addStateListener(partitionService);
pdConfig.setIdService(idService);

// 接收心跳消息
// 接收脉搏消息,这个脉搏信息是来自哪里呢?
PDPulseSubject.listenPartitionHeartbeat(new PulseListener<PartitionHeartbeatRequest>() {
@Override
public void onNext(PartitionHeartbeatRequest request) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,18 @@ public static void notifyError(int code, String message){

private static Long createWatcherId() {
synchronized (lock) {
// 线程让步,给其他线程执行的机会
Thread.yield();

try {
// 线程休眠1毫秒
Thread.sleep(1);
} catch (InterruptedException e) {
// 捕获中断异常,并打印错误日志
log.error("Failed to sleep", e);
}

// 返回当前系统时间的毫秒数作为watcher的id
return System.currentTimeMillis();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,17 +322,29 @@ public void delete(@Context GraphManager manager,
}

public static Id checkAndParseVertexId(String idValue) {
// 如果idValue为空,则返回null
if (idValue == null) {
return null;
}

// 判断idValue是否以"U"开头
boolean uuid = idValue.startsWith("U\"");

// 如果idValue以"U"开头,则去掉开头的"U"
if (uuid) {
idValue = idValue.substring(1);
}

try {
// 将idValue解析为Object类型的对象
Object id = JsonUtil.fromJson(idValue, Object.class);

// 根据uuid的值返回对应的Id类型
// 如果uuid为true,则返回Text类型的UUID
// 如果uuid为false,则返回HugeVertex的id值
return uuid ? Text.uuid((String) id) : HugeVertex.getIdValue(id);
} catch (Exception e) {
// 如果解析过程中出现异常,则抛出IllegalArgumentException异常
throw new IllegalArgumentException(String.format(
"The vertex id must be formatted as Number/String/UUID" +
", but got '%s'", idValue));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,11 @@ private Consumer<PartitionFaultResponse> notifyPartitionConsumer(HgNodeStatus st

private Consumer<Throwable> notifyErrConsumer(HgNodeStatus status) {
return t -> {
// 调用 nodeManager 的 notifying 方法,传入图名、HgStoreNotice 对象
nodeManager.notifying(
// 传入图名
this.graphName,
// 创建一个 HgStoreNotice 对象,并传入节点 ID、节点状态、错误消息的字符串
HgStoreNotice.of(this.nodeSession.getStoreNode().getNodeId(), status,
t.getMessage())
);
Expand Down
Loading

0 comments on commit d936ec5

Please sign in to comment.