From 892f744baae158f47e5294e26d97b47ec49f9692 Mon Sep 17 00:00:00 2001 From: Yisheng Zhou Date: Mon, 25 Nov 2024 14:28:20 -0800 Subject: [PATCH 1/5] Stop service before broker recovery action --- .../actions/kafka/BrokerRecoveryAction.java | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/orion-server/src/main/java/com/pinterest/orion/core/actions/kafka/BrokerRecoveryAction.java b/orion-server/src/main/java/com/pinterest/orion/core/actions/kafka/BrokerRecoveryAction.java index a5e6f47e..1f63ad24 100644 --- a/orion-server/src/main/java/com/pinterest/orion/core/actions/kafka/BrokerRecoveryAction.java +++ b/orion-server/src/main/java/com/pinterest/orion/core/actions/kafka/BrokerRecoveryAction.java @@ -20,6 +20,7 @@ import com.pinterest.orion.core.actions.alert.AlertLevel; import com.pinterest.orion.core.actions.alert.AlertMessage; import com.pinterest.orion.core.actions.aws.ReplaceEC2InstanceAction; +import com.pinterest.orion.core.actions.generic.GenericActions.ServiceStopAction; import com.pinterest.orion.core.actions.generic.NodeAction; import com.pinterest.orion.core.actions.generic.ServiceStabilityCheckAction; import com.pinterest.orion.server.OrionServer; @@ -31,6 +32,7 @@ import java.net.InetSocketAddress; import java.net.Socket; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.logging.Logger; public class BrokerRecoveryAction extends NodeAction { @@ -42,6 +44,7 @@ public class BrokerRecoveryAction extends NodeAction { private static final int retryIntervalMilliseconds = 1000; public static final String ATTR_TRY_TO_RESTART_KEY = "try_restart"; + public static final String ATTR_STOP_SERVICE_BEFORE_ACTION = "stop_service_before_action"; public static final String ATTR_NODE_EXISTS_KEY = "node_exists"; public static final String ATTR_NONEXISTENT_HOST_KEY = "nonexistent_host"; public static final String CONF_DRY_RUN_REPLACEMENT_KEY = "dry_run"; @@ -86,6 +89,37 @@ public void runAction() { nodeId = node.getCurrentNodeInfo().getNodeId(); String hostname = node.getCurrentNodeInfo().getHostname(); int port = node.getCurrentNodeInfo().getServicePort(); + + // Try to gracefully shutdown service before recovery actions. + // This can make sure the broker is not the leader of any partition. + boolean stopServiceBeforeAction = true; + if (containsAttribute(ATTR_STOP_SERVICE_BEFORE_ACTION)) { + stopServiceBeforeAction = getAttribute(ATTR_STOP_SERVICE_BEFORE_ACTION).getValue(); + } + if (stopServiceBeforeAction) { + // Metrics + try { + ServiceStopAction stopServiceAction = new ServiceStopAction(); + stopServiceAction.copyAttributeFrom(this, OrionConstants.NODE_ID); + getEngine().dispatchChild(this, stopServiceAction); + getResult().appendOut("Attempt to stop service before recovery action."); + try { + stopServiceAction.get(30, TimeUnit.SECONDS); + } catch (Exception e) { + getResult().appendErr( + "Unable to stop service before broker replacement. Will keep moving forward. Error message:" + + e.getMessage()); + } + } catch (Exception e) { + getResult().appendErr( + "Encounter unknown error when stopping service. Will keep moving forward. Error message:" + + e.getMessage()); + } + } else { + getResult().appendOut( + "Skip stopping service before recovery actions. The configuration is set to false."); + } + boolean tryRestart = false; if (containsAttribute(ATTR_TRY_TO_RESTART_KEY)) { tryRestart = getAttribute(ATTR_TRY_TO_RESTART_KEY).getValue(); From dacaec3ec6bf924e245470f2894c14887c725bdb Mon Sep 17 00:00:00 2001 From: Yisheng Zhou Date: Mon, 25 Nov 2024 14:42:03 -0800 Subject: [PATCH 2/5] Add metrics call --- .../orion/core/actions/kafka/BrokerRecoveryAction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orion-server/src/main/java/com/pinterest/orion/core/actions/kafka/BrokerRecoveryAction.java b/orion-server/src/main/java/com/pinterest/orion/core/actions/kafka/BrokerRecoveryAction.java index 1f63ad24..522c08a2 100644 --- a/orion-server/src/main/java/com/pinterest/orion/core/actions/kafka/BrokerRecoveryAction.java +++ b/orion-server/src/main/java/com/pinterest/orion/core/actions/kafka/BrokerRecoveryAction.java @@ -97,7 +97,7 @@ public void runAction() { stopServiceBeforeAction = getAttribute(ATTR_STOP_SERVICE_BEFORE_ACTION).getValue(); } if (stopServiceBeforeAction) { - // Metrics + OrionServer.METRICS.counter(metricPrefix.resolve("stop_service_before_action")).inc(); try { ServiceStopAction stopServiceAction = new ServiceStopAction(); stopServiceAction.copyAttributeFrom(this, OrionConstants.NODE_ID); From b1e2ddc71fa18f96197599f69037fb42b3726aab Mon Sep 17 00:00:00 2001 From: Yisheng Zhou Date: Tue, 26 Nov 2024 09:13:57 -0800 Subject: [PATCH 3/5] Move the logic before replacement --- .../actions/kafka/BrokerRecoveryAction.java | 63 ++++++++++--------- 1 file changed, 32 insertions(+), 31 deletions(-) diff --git a/orion-server/src/main/java/com/pinterest/orion/core/actions/kafka/BrokerRecoveryAction.java b/orion-server/src/main/java/com/pinterest/orion/core/actions/kafka/BrokerRecoveryAction.java index 522c08a2..4a59c9de 100644 --- a/orion-server/src/main/java/com/pinterest/orion/core/actions/kafka/BrokerRecoveryAction.java +++ b/orion-server/src/main/java/com/pinterest/orion/core/actions/kafka/BrokerRecoveryAction.java @@ -44,7 +44,7 @@ public class BrokerRecoveryAction extends NodeAction { private static final int retryIntervalMilliseconds = 1000; public static final String ATTR_TRY_TO_RESTART_KEY = "try_restart"; - public static final String ATTR_STOP_SERVICE_BEFORE_ACTION = "stop_service_before_action"; + public static final String ATTR_STOP_SERVICE_BEFORE_REPLACEMENT = "stop_service_before_replacement"; public static final String ATTR_NODE_EXISTS_KEY = "node_exists"; public static final String ATTR_NONEXISTENT_HOST_KEY = "nonexistent_host"; public static final String CONF_DRY_RUN_REPLACEMENT_KEY = "dry_run"; @@ -90,36 +90,6 @@ public void runAction() { String hostname = node.getCurrentNodeInfo().getHostname(); int port = node.getCurrentNodeInfo().getServicePort(); - // Try to gracefully shutdown service before recovery actions. - // This can make sure the broker is not the leader of any partition. - boolean stopServiceBeforeAction = true; - if (containsAttribute(ATTR_STOP_SERVICE_BEFORE_ACTION)) { - stopServiceBeforeAction = getAttribute(ATTR_STOP_SERVICE_BEFORE_ACTION).getValue(); - } - if (stopServiceBeforeAction) { - OrionServer.METRICS.counter(metricPrefix.resolve("stop_service_before_action")).inc(); - try { - ServiceStopAction stopServiceAction = new ServiceStopAction(); - stopServiceAction.copyAttributeFrom(this, OrionConstants.NODE_ID); - getEngine().dispatchChild(this, stopServiceAction); - getResult().appendOut("Attempt to stop service before recovery action."); - try { - stopServiceAction.get(30, TimeUnit.SECONDS); - } catch (Exception e) { - getResult().appendErr( - "Unable to stop service before broker replacement. Will keep moving forward. Error message:" - + e.getMessage()); - } - } catch (Exception e) { - getResult().appendErr( - "Encounter unknown error when stopping service. Will keep moving forward. Error message:" - + e.getMessage()); - } - } else { - getResult().appendOut( - "Skip stopping service before recovery actions. The configuration is set to false."); - } - boolean tryRestart = false; if (containsAttribute(ATTR_TRY_TO_RESTART_KEY)) { tryRestart = getAttribute(ATTR_TRY_TO_RESTART_KEY).getValue(); @@ -160,6 +130,37 @@ public void runAction() { getResult().appendOut("Start replacing broker"); + // Try to gracefully shutdown service before replacing broker. + // This can make sure the broker is not the leader of any partition. + boolean stopServiceBeforeAction = true; + if (containsAttribute(ATTR_STOP_SERVICE_BEFORE_REPLACEMENT)) { + stopServiceBeforeAction = getAttribute(ATTR_STOP_SERVICE_BEFORE_REPLACEMENT).getValue(); + } + if (stopServiceBeforeAction) { + OrionServer.METRICS.counter(metricPrefix.resolve("stop_service_before_replacement")).inc(); + try { + ServiceStopAction stopServiceAction = new ServiceStopAction(); + stopServiceAction.copyAttributeFrom(this, OrionConstants.NODE_ID); + getEngine().dispatchChild(this, stopServiceAction); + getResult().appendOut("Attempt to stop service before broker replacement."); + try { + stopServiceAction.get(30, TimeUnit.SECONDS); + } catch (Exception e) { + getResult().appendErr( + "Unable to stop service before broker replacement. Will keep moving forward. Error message:" + + e.getMessage()); + } + } catch (Exception e) { + getResult().appendErr( + "Encounter unknown error when stopping service. Will keep moving forward. Error message:" + + e.getMessage()); + } + } else { + getResult().appendOut( + "Skip stopping service before broker replacement. " + + "The stop_service_before_replacement configuration is set to false."); + } + try { // if dry run skip dispatching the replacement action and report success, but still set the cooldown flag for the cluster if (!isDryRun) { From 8e23b94ec83cdb1301d7074a4f7430cc13718f66 Mon Sep 17 00:00:00 2001 From: Yisheng Zhou Date: Tue, 26 Nov 2024 09:16:02 -0800 Subject: [PATCH 4/5] Rename variables --- .../orion/core/actions/kafka/BrokerRecoveryAction.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/orion-server/src/main/java/com/pinterest/orion/core/actions/kafka/BrokerRecoveryAction.java b/orion-server/src/main/java/com/pinterest/orion/core/actions/kafka/BrokerRecoveryAction.java index 4a59c9de..c772f7c9 100644 --- a/orion-server/src/main/java/com/pinterest/orion/core/actions/kafka/BrokerRecoveryAction.java +++ b/orion-server/src/main/java/com/pinterest/orion/core/actions/kafka/BrokerRecoveryAction.java @@ -89,7 +89,6 @@ public void runAction() { nodeId = node.getCurrentNodeInfo().getNodeId(); String hostname = node.getCurrentNodeInfo().getHostname(); int port = node.getCurrentNodeInfo().getServicePort(); - boolean tryRestart = false; if (containsAttribute(ATTR_TRY_TO_RESTART_KEY)) { tryRestart = getAttribute(ATTR_TRY_TO_RESTART_KEY).getValue(); @@ -132,12 +131,12 @@ public void runAction() { // Try to gracefully shutdown service before replacing broker. // This can make sure the broker is not the leader of any partition. - boolean stopServiceBeforeAction = true; + boolean stopServiceBeforeReplacement = true; if (containsAttribute(ATTR_STOP_SERVICE_BEFORE_REPLACEMENT)) { - stopServiceBeforeAction = getAttribute(ATTR_STOP_SERVICE_BEFORE_REPLACEMENT).getValue(); + stopServiceBeforeReplacement = getAttribute(ATTR_STOP_SERVICE_BEFORE_REPLACEMENT).getValue(); } - if (stopServiceBeforeAction) { - OrionServer.METRICS.counter(metricPrefix.resolve("stop_service_before_replacement")).inc(); + if (stopServiceBeforeReplacement) { + OrionServer.METRICS.counter(metricPrefix.resolve(ATTR_STOP_SERVICE_BEFORE_REPLACEMENT)).inc(); try { ServiceStopAction stopServiceAction = new ServiceStopAction(); stopServiceAction.copyAttributeFrom(this, OrionConstants.NODE_ID); From 752d9d697b3be5bc4bb6585eebd366a1136e7fca Mon Sep 17 00:00:00 2001 From: Yisheng Zhou Date: Tue, 26 Nov 2024 10:05:15 -0800 Subject: [PATCH 5/5] Address comments --- .../actions/kafka/BrokerRecoveryAction.java | 36 +++++++------------ 1 file changed, 12 insertions(+), 24 deletions(-) diff --git a/orion-server/src/main/java/com/pinterest/orion/core/actions/kafka/BrokerRecoveryAction.java b/orion-server/src/main/java/com/pinterest/orion/core/actions/kafka/BrokerRecoveryAction.java index c772f7c9..c1ecdec5 100644 --- a/orion-server/src/main/java/com/pinterest/orion/core/actions/kafka/BrokerRecoveryAction.java +++ b/orion-server/src/main/java/com/pinterest/orion/core/actions/kafka/BrokerRecoveryAction.java @@ -44,7 +44,6 @@ public class BrokerRecoveryAction extends NodeAction { private static final int retryIntervalMilliseconds = 1000; public static final String ATTR_TRY_TO_RESTART_KEY = "try_restart"; - public static final String ATTR_STOP_SERVICE_BEFORE_REPLACEMENT = "stop_service_before_replacement"; public static final String ATTR_NODE_EXISTS_KEY = "node_exists"; public static final String ATTR_NONEXISTENT_HOST_KEY = "nonexistent_host"; public static final String CONF_DRY_RUN_REPLACEMENT_KEY = "dry_run"; @@ -130,34 +129,23 @@ public void runAction() { getResult().appendOut("Start replacing broker"); // Try to gracefully shutdown service before replacing broker. - // This can make sure the broker is not the leader of any partition. - boolean stopServiceBeforeReplacement = true; - if (containsAttribute(ATTR_STOP_SERVICE_BEFORE_REPLACEMENT)) { - stopServiceBeforeReplacement = getAttribute(ATTR_STOP_SERVICE_BEFORE_REPLACEMENT).getValue(); - } - if (stopServiceBeforeReplacement) { - OrionServer.METRICS.counter(metricPrefix.resolve(ATTR_STOP_SERVICE_BEFORE_REPLACEMENT)).inc(); + // This ensures partition leaderships are moved away from the broker before the replacement process starts. + try { + ServiceStopAction stopServiceAction = new ServiceStopAction(); + stopServiceAction.copyAttributeFrom(this, OrionConstants.NODE_ID); + getEngine().dispatchChild(this, stopServiceAction); + getResult().appendOut("Attempt to stop service before broker replacement."); try { - ServiceStopAction stopServiceAction = new ServiceStopAction(); - stopServiceAction.copyAttributeFrom(this, OrionConstants.NODE_ID); - getEngine().dispatchChild(this, stopServiceAction); - getResult().appendOut("Attempt to stop service before broker replacement."); - try { - stopServiceAction.get(30, TimeUnit.SECONDS); - } catch (Exception e) { - getResult().appendErr( - "Unable to stop service before broker replacement. Will keep moving forward. Error message:" - + e.getMessage()); - } + stopServiceAction.get(30, TimeUnit.SECONDS); } catch (Exception e) { getResult().appendErr( - "Encounter unknown error when stopping service. Will keep moving forward. Error message:" + "Unable to stop service before broker replacement. Will keep moving forward. Error message:" + e.getMessage()); } - } else { - getResult().appendOut( - "Skip stopping service before broker replacement. " + - "The stop_service_before_replacement configuration is set to false."); + } catch (Exception e) { + getResult().appendErr( + "Encounter unknown error when stopping service. Will keep moving forward. Error message:" + + e.getMessage()); } try {