From 46ea59bc0f0c535cf53ef66d9c270702ccd27bcd Mon Sep 17 00:00:00 2001 From: fanjianye Date: Mon, 23 Sep 2024 15:43:50 +0800 Subject: [PATCH] fix broker lost rack information --- .../bookkeeper/PulsarRegistrationClient.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java index be945d988fb88..89dbf2be990b0 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java @@ -181,8 +181,13 @@ private CompletableFuture>> getBookiesThenFreshCache(Str @Override public CompletableFuture watchWritableBookies(RegistrationListener registrationListener) { writableBookiesWatchers.add(registrationListener); + // trigger all listeners in writableBookiesWatchers one by one. It aims to keep a sync way + // to make sure the previous listener has finished when a new listener is register. + // Though it would bring duplicate trigger listener problem, but since watchWritableBookies + // is only executed when bookieClient construct, the duplicate problem is acceptable. return getWritableBookies() - .thenAcceptAsync(registrationListener::onBookiesChanged, executor); + .thenAcceptAsync(bookies -> + writableBookiesWatchers.forEach(w -> w.onBookiesChanged(bookies)), executor); } @Override @@ -193,8 +198,13 @@ public void unwatchWritableBookies(RegistrationListener registrationListener) { @Override public CompletableFuture watchReadOnlyBookies(RegistrationListener registrationListener) { readOnlyBookiesWatchers.add(registrationListener); + // trigger all listeners in readOnlyBookiesWatchers one by one. It aims to keep a sync way + // to make sure the previous listener has finished when a new listener is register. + // Though it would bring duplicate trigger listener problem, but since watchReadOnlyBookies + // is only executed when bookieClient construct, the duplicate problem is acceptable. return getReadOnlyBookies() - .thenAcceptAsync(registrationListener::onBookiesChanged, executor); + .thenAcceptAsync(bookies -> + readOnlyBookiesWatchers.forEach(w -> w.onBookiesChanged(bookies)), executor); } @Override