Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDFS-17688: Add Option to limit Balancer aboveAvgUtilized nodes num in each iteration. #7225

Open
wants to merge 4 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,20 @@ StateStoreMetrics shows the statistics of the State Store component in Router-ba
| `Cache`*BaseRecord*`LoadNumOps` | Number of times store records are loaded in the State Store Cache from State Store |
| `Cache`*BaseRecord*`LoadAvgTime` | Average time of loading State Store Cache from State Store in milliseconds |

BalancerMetrics
-----------------
The metrics present statistics from the Balancer's perspective.

| Name | Description |
|:---------------------------------|:---------------------------------------------------------------------|
| `IterateRunning` | If a balancer iteration is running, the value is 1; otherwise, it is 0 |
| `BytesLeftToMove` | Bytes left to move to make cluster balanced |
| `BytesMovedInCurrentRun` | Bytes that already moved in current doBalance run |
| `NumOfUnderUtilizedNodes` | Number of under utilized nodes |
| `NumOfOverUtilizedNodes` | Number of over utilized nodes |
| `NumOfAboveAverageUtilizedNodes` | Number of above avg utilized nodes |
| `NumOfBelowAvgUtilizedNodes` | Number of below avg utilized nodes |

yarn context
============

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ public class Balancer {
+ "that highly utilized datanodes get scheduled first."
+ "\n\t[-limitOverUtilizedNum <specified maximum number of overUtilized datanodes>]"
+ "\tLimit the maximum number of overUtilized datanodes."
+ "\n\t[-limitAboveAvgUtilizedNum <specified maximum number of aboveAvgUtilized datanodes>]"
+ "\tLimit the maximum number of aboveAvgUtilized datanodes."
+ "\n\t[-hotBlockTimeInterval]\tprefer to move cold blocks.";

@VisibleForTesting
Expand All @@ -239,6 +241,7 @@ public class Balancer {
private final long defaultBlockSize;
private final boolean sortTopNodes;
private final int limitOverUtilizedNum;
private final int limitAboveAvgUtilizedNum;
private final BalancerMetrics metrics;

// all data node lists
Expand Down Expand Up @@ -368,6 +371,7 @@ static int getFailedTimesSinceLastSuccessfulBalance() {
this.runDuringUpgrade = p.getRunDuringUpgrade();
this.sortTopNodes = p.getSortTopNodes();
this.limitOverUtilizedNum = p.getLimitOverUtilizedNum();
this.limitAboveAvgUtilizedNum = p.getLimitAboveAvgUtilizedNum();

this.maxSizeToMove = getLongBytes(conf,
DFSConfigKeys.DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY,
Expand Down Expand Up @@ -487,11 +491,22 @@ private long init(List<DatanodeStorageReport> reports) {
limitOverUtilizedNum();
}

// Limit the maximum number of aboveAvgUtilized datanodes.
// If excludedAboveAvgUtilizedNum is greater than 0, the number of aboveAvgUtilized nodes is limited.
int excludedAboveAvgUtilizedNum =
Math.max(aboveAvgUtilized.size() - limitAboveAvgUtilizedNum, 0);
if (excludedAboveAvgUtilizedNum > 0) {
limitAboveAvgUtilizedNum();
}

logUtilizationCollections();
metrics.setNumOfOverUtilizedNodes(overUtilized.size());
metrics.setNumOfUnderUtilizedNodes(underUtilized.size());

Preconditions.checkState(dispatcher.getStorageGroupMap().size() - excludedOverUtilizedNum
metrics.setNumOfAboveAvgUtilizedNodes(aboveAvgUtilized.size());
metrics.setNumOfBelowAvgUtilizedNodes(belowAvgUtilized.size());

Preconditions.checkState(dispatcher.getStorageGroupMap().size()
- excludedOverUtilizedNum - excludedAboveAvgUtilizedNum
== overUtilized.size() + underUtilized.size() + aboveAvgUtilized.size()
+ belowAvgUtilized.size(),
"Mismatched number of storage groups");
Expand Down Expand Up @@ -529,6 +544,20 @@ private void limitOverUtilizedNum() {
}
}

/**
 * Trims the {@code aboveAvgUtilized} collection so that it contains at most
 * {@code limitAboveAvgUtilizedNum} sources, dropping entries from the tail.
 * Requires the collection to be a {@link LinkedList} so tail removal is O(1).
 */
private void limitAboveAvgUtilizedNum() {
  Preconditions.checkState(aboveAvgUtilized instanceof LinkedList,
      "Collection aboveAvgUtilized is not a LinkedList.");

  final LinkedList<Source> nodes = (LinkedList<Source>) aboveAvgUtilized;

  LOG.info("Limiting aboveAvgUtilized nodes num");

  // Remove exactly (original size - limit) sources from the end of the list.
  int excess = aboveAvgUtilized.size() - limitAboveAvgUtilizedNum;
  while (excess > 0) {
    nodes.removeLast();
    excess--;
  }
}

private static long computeMaxSize2Move(final long capacity, final long remaining,
final double utilizationDiff, final long max) {
final double diff = Math.abs(utilizationDiff);
Expand Down Expand Up @@ -1144,6 +1173,14 @@ static BalancerParameters parse(String[] args) {
"limitOverUtilizedNum must be non-negative");
LOG.info("Using a limitOverUtilizedNum of {}", limitNum);
b.setLimitOverUtilizedNum(limitNum);
} else if ("-limitAboveAvgUtilizedNum".equalsIgnoreCase(args[i])) {
Preconditions.checkArgument(++i < args.length,
"limitAboveAvgUtilizedNum value is missing: args = " + Arrays.toString(args));
int limitNum = Integer.parseInt(args[i]);
Preconditions.checkArgument(limitNum >= 0,
"limitAboveAvgUtilizedNum must be non-negative");
LOG.info("Using a limitAboveAvgUtilizedNum of {}", limitNum);
b.setLimitAboveAvgUtilizedNum(limitNum);
} else {
throw new IllegalArgumentException("args = "
+ Arrays.toString(args));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ final class BalancerMetrics {
@Metric("Number of over utilized nodes")
private MutableGaugeInt numOfOverUtilizedNodes;

@Metric("Number of above avg utilized nodes")
private MutableGaugeInt numOfAboveAvgUtilizedNodes;

@Metric("Number of below avg utilized nodes")
private MutableGaugeInt numOfBelowAvgUtilizedNodes;

private BalancerMetrics(Balancer b) {
this.balancer = b;
}
Expand Down Expand Up @@ -77,4 +83,12 @@ void setNumOfUnderUtilizedNodes(int numOfUnderUtilizedNodes) {
/**
 * Updates the gauge reporting the number of over-utilized datanodes.
 * @param numOfOverUtilizedNodes current count of over-utilized nodes
 */
void setNumOfOverUtilizedNodes(int numOfOverUtilizedNodes) {
this.numOfOverUtilizedNodes.set(numOfOverUtilizedNodes);
}

/**
 * Updates the gauge reporting the number of above-average-utilized datanodes.
 * @param numOfAboveAvgUtilizedNodes current count of above-average-utilized nodes
 */
void setNumOfAboveAvgUtilizedNodes(int numOfAboveAvgUtilizedNodes) {
this.numOfAboveAvgUtilizedNodes.set(numOfAboveAvgUtilizedNodes);
}

/**
 * Updates the gauge reporting the number of below-average-utilized datanodes.
 * @param numOfBelowAvgUtilizedNodes current count of below-average-utilized nodes
 */
void setNumOfBelowAvgUtilizedNodes(int numOfBelowAvgUtilizedNodes) {
this.numOfBelowAvgUtilizedNodes.set(numOfBelowAvgUtilizedNodes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ final class BalancerParameters {
private final boolean sortTopNodes;

private final int limitOverUtilizedNum;
private final int limitAboveAvgUtilizedNum;

static final BalancerParameters DEFAULT = new BalancerParameters();

Expand All @@ -88,6 +89,7 @@ private BalancerParameters(Builder builder) {
this.runAsService = builder.runAsService;
this.sortTopNodes = builder.sortTopNodes;
this.limitOverUtilizedNum = builder.limitOverUtilizedNum;
this.limitAboveAvgUtilizedNum = builder.limitAboveAvgUtilizedNum;
this.hotBlockTimeInterval = builder.hotBlockTimeInterval;
}

Expand Down Expand Up @@ -147,6 +149,10 @@ int getLimitOverUtilizedNum() {
return this.limitOverUtilizedNum;
}

/**
 * @return the maximum number of aboveAvgUtilized datanodes the balancer may
 *         consider in one iteration (Integer.MAX_VALUE when unlimited).
 */
int getLimitAboveAvgUtilizedNum() {
return this.limitAboveAvgUtilizedNum;
}

/**
 * @return the time interval used to prefer moving cold blocks
 *         (0 means the feature is disabled).
 */
long getHotBlockTimeInterval() {
return this.hotBlockTimeInterval;
}
Expand Down Expand Up @@ -185,6 +191,7 @@ static class Builder {
private boolean runAsService = false;
private boolean sortTopNodes = false;
private int limitOverUtilizedNum = Integer.MAX_VALUE;
private int limitAboveAvgUtilizedNum = Integer.MAX_VALUE;
private long hotBlockTimeInterval = 0;

Builder() {
Expand Down Expand Up @@ -265,6 +272,11 @@ Builder setLimitOverUtilizedNum(int overUtilizedNum) {
return this;
}

/**
 * Sets the maximum number of aboveAvgUtilized datanodes to balance.
 * @param aboveAvgUtilizedNum non-negative cap on aboveAvgUtilized nodes
 * @return this builder, for chaining
 */
Builder setLimitAboveAvgUtilizedNum(int aboveAvgUtilizedNum) {
this.limitAboveAvgUtilizedNum = aboveAvgUtilizedNum;
return this;
}

/** @return an immutable {@link BalancerParameters} built from this builder's state. */
BalancerParameters build() {
return new BalancerParameters(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,12 +288,16 @@ Usage:
[-exclude [-f <hosts-file> | <comma-separated list of hosts>]]
[-include [-f <hosts-file> | <comma-separated list of hosts>]]
[-source [-f <hosts-file> | <comma-separated list of hosts>]]
[-excludeSource [-f <hosts-file> | <comma-separated list of hosts>]]
[-target [-f <hosts-file> | <comma-separated list of hosts>]]
[-excludeTarget [-f <hosts-file> | <comma-separated list of hosts>]]
[-blockpools <comma-separated list of blockpool ids>]
[-idleiterations <idleiterations>]
[-runDuringUpgrade]
[-asService]
[-sortTopNodes]
[-limitOverUtilizedNum <specified maximum number of overUtilized datanodes>]
[-limitAboveAvgUtilizedNum <specified maximum number of aboveAvgUtilized datanodes>]
[-hotBlockTimeInterval <specified time interval>]

| COMMAND\_OPTION | Description |
Expand All @@ -303,12 +307,16 @@ Usage:
| `-exclude -f` \<hosts-file\> \| \<comma-separated list of hosts\> | Excludes the specified datanodes from being balanced by the balancer. |
| `-include -f` \<hosts-file\> \| \<comma-separated list of hosts\> | Includes only the specified datanodes to be balanced by the balancer. |
| `-source -f` \<hosts-file\> \| \<comma-separated list of hosts\> | Pick only the specified datanodes as source nodes. |
| `-excludeSource -f` \<hosts-file\> \| \<comma-separated list of hosts\> | Excludes the specified datanodes to be selected as a source. |
| `-target -f` \<hosts-file\> \| \<comma-separated list of hosts\> | Pick only the specified datanodes as target nodes. |
| `-excludeTarget -f` \<hosts-file\> \| \<comma-separated list of hosts\> | Excludes the specified datanodes from being selected as a target. |
| `-blockpools` \<comma-separated list of blockpool ids\> | The balancer will only run on blockpools included in this list. |
| `-idleiterations` \<iterations\> | Maximum number of idle iterations before exit. This overwrites the default idleiterations(5). |
| `-runDuringUpgrade` | Whether to run the balancer during an ongoing HDFS upgrade. This is usually not desired since it will not affect used space on over-utilized machines. |
| `-asService` | Run Balancer as a long running service. |
| `-sortTopNodes` | Sort datanodes based on the utilization so that highly utilized datanodes get scheduled first. |
| `-limitOverUtilizedNum` | Limit the maximum number of overUtilized datanodes. |
| `-limitAboveAvgUtilizedNum` | Limit the maximum number of aboveAvgUtilized datanodes. |
| `-hotBlockTimeInterval` | Prefer moving cold blocks i.e blocks associated with files accessed or modified before the specified time interval. |
| `-h`\|`--help` | Display the tool usage and help information and exit. |

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -767,6 +767,62 @@ public void testBalancerWithLimitOverUtilizedNum() throws Exception {
}
}

/**
 * Verifies that {@code -limitAboveAvgUtilizedNum 0} excludes every
 * aboveAvgUtilized datanode from a balancer iteration, so no bytes or
 * blocks are moved even though the cluster is imbalanced.
 * Fix over the original: the {@link NameNodeConnector}s are now closed in a
 * finally block so the test does not leak RPC resources.
 */
@Test(timeout = 60000)
public void testBalancerWithLimitAboveAvgUtilizedNum() throws Exception {
  final Configuration conf = new HdfsConfiguration();
  // Init the config (block size to 100)
  initConf(conf);
  conf.setInt(DFS_HEARTBEAT_INTERVAL_KEY, 30000);

  final long totalCapacity = 1000L;
  final int diffBetweenNodes = 50;

  // 5 nodes with 80%, 85%, 90%, 95%, 100% usage
  final int numOfDn = 5;
  final long[] capacityArray = new long[numOfDn];
  Arrays.fill(capacityArray, totalCapacity);

  try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
      .numDataNodes(numOfDn)
      .simulatedCapacities(capacityArray)
      .build()) {
    cluster.setDataNodesDead();
    List<DataNode> dataNodes = cluster.getDataNodes();
    // Create top used nodes: only one node is alive at a time so each file's
    // single replica lands on that node, producing the staircase utilization.
    for (int i = 0; i < numOfDn; i++) {
      // Bring one node alive
      DataNodeTestUtils.triggerHeartbeat(dataNodes.get(i));
      DataNodeTestUtils.triggerBlockReport(dataNodes.get(i));
      // Create nodes with: 80%, 85%, 90%, 95%, 100%
      int nodeCapacity = (int) totalCapacity - diffBetweenNodes * (numOfDn - i - 1);

      TestBalancer.createFile(cluster, new Path("test_file" + i), nodeCapacity, (short) 1, 0);
      cluster.setDataNodesDead();
    }

    // Bring all nodes alive
    cluster.triggerHeartbeats();
    cluster.triggerBlockReports();
    cluster.waitFirstBRCompleted(0, 6000);

    final BalancerParameters balancerParameters = Balancer.Cli.parse(new String[] {
        "-policy", BalancingPolicy.Node.INSTANCE.getName(),
        "-threshold", "10",
        "-limitAboveAvgUtilizedNum", "0"
    });

    final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
    List<NameNodeConnector> connectors =
        NameNodeConnector.newNameNodeConnectors(namenodes, Balancer.class.getSimpleName(),
            Balancer.BALANCER_ID_PATH, conf, BalancerParameters.DEFAULT.getMaxIdleIteration());
    try {
      final Balancer balancer = new Balancer(connectors.get(0), balancerParameters, conf);
      Balancer.Result balancerResult = balancer.runOneIteration();
      assertTrue("BalancerResult is not as expected. " + balancerResult,
          (balancerResult.getBytesAlreadyMoved() == 0 && balancerResult.getBlocksMoved() == 0));
    } finally {
      // Close connectors so the balancer ID path and RPC proxies are released.
      for (NameNodeConnector nnc : connectors) {
        nnc.close();
      }
    }
  }
}

@Test(timeout = 60000)
public void testBalancerMetricsDuplicate() throws Exception {
final Configuration conf = new HdfsConfiguration();
Expand Down
Loading