Skip to content

Commit

Permalink
Merge pull request #309 from pinterest/stop_broker_before_recovery_action
Browse files Browse the repository at this point in the history

Stop Kafka Service Before Running Broker Recovery Process
  • Loading branch information
yisheng-zhou authored Nov 26, 2024
2 parents 5267c45 + 752d9d6 commit 869442a
Showing 1 changed file with 22 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.pinterest.orion.core.actions.alert.AlertLevel;
import com.pinterest.orion.core.actions.alert.AlertMessage;
import com.pinterest.orion.core.actions.aws.ReplaceEC2InstanceAction;
import com.pinterest.orion.core.actions.generic.GenericActions.ServiceStopAction;
import com.pinterest.orion.core.actions.generic.NodeAction;
import com.pinterest.orion.core.actions.generic.ServiceStabilityCheckAction;
import com.pinterest.orion.server.OrionServer;
Expand All @@ -31,6 +32,7 @@
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

public class BrokerRecoveryAction extends NodeAction {
Expand Down Expand Up @@ -126,6 +128,26 @@ public void runAction() {

getResult().appendOut("Start replacing broker");

// Try to gracefully shut down the service before replacing the broker.
// This is intended to move partition leadership away from the broker before the replacement
// process starts. It is best-effort: a failure or timeout is reported and the replacement proceeds.
try {
ServiceStopAction stopServiceAction = new ServiceStopAction();
stopServiceAction.copyAttributeFrom(this, OrionConstants.NODE_ID);
getEngine().dispatchChild(this, stopServiceAction);
getResult().appendOut("Attempt to stop service before broker replacement.");
try {
stopServiceAction.get(30, TimeUnit.SECONDS);
} catch (Exception e) {
getResult().appendErr(
"Unable to stop service before broker replacement. Will keep moving forward. Error message:"
+ e.getMessage());
}
} catch (Exception e) {
getResult().appendErr(
"Encounter unknown error when stopping service. Will keep moving forward. Error message:"
+ e.getMessage());
}

try {
// if dry run skip dispatching the replacement action and report success, but still set the cooldown flag for the cluster
if (!isDryRun) {
Expand Down

0 comments on commit 869442a

Please sign in to comment.