Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix compile warnings in mantis control plane core #434

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

public abstract class BaseService implements Service {

private static AtomicInteger SERVICES_COUNTER = new AtomicInteger(0);
private static final AtomicInteger SERVICES_COUNTER = new AtomicInteger(0);
private final boolean awaitsActiveMode;
private final ActiveMode activeMode = new ActiveMode();
private final int myServiceCount;
Expand Down Expand Up @@ -104,23 +104,20 @@ private void awaitLeaderMode() {
}

public void waitAndStart(final Action0 onActive, final BaseService predecessor) {
logger.info(myServiceCount + ": Setting up thread to wait for entering leader mode");
Runnable runnable = new Runnable() {
@Override
public void run() {
awaitLeaderMode();
logger.info(myServiceCount + ": done waiting for leader mode");
if (predecessor != null) {
predecessor.activeMode.awaitInit();
}
logger.info(myServiceCount + ": done waiting for predecessor init");
if (onActive != null) {
onActive.call();
}
synchronized (isInited) {
isInited.set(true);
isInited.notify();
}
logger.info("{}: Setting up thread to wait for entering leader mode", myServiceCount);
Runnable runnable = () -> {
awaitLeaderMode();
logger.info("{}: done waiting for leader mode", myServiceCount);
if (predecessor != null) {
predecessor.activeMode.awaitInit();
}
logger.info("{}: done waiting for predecessor init", myServiceCount);
if (onActive != null) {
onActive.call();
}
synchronized (isInited) {
isInited.set(true);
isInited.notify();
}
};
Thread thr = new Thread(runnable, "BaseService-LeaderModeWaitThread-" + myServiceCount);
Expand All @@ -145,7 +142,7 @@ private void awaitInit() {
}

public void enterActiveMode() {
logger.info(myServiceCount + ": Entering leader mode");
logger.info("{}: Entering leader mode", myServiceCount);
synchronized (isLeaderMode) {
isLeaderMode.set(true);
isLeaderMode.notify();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ public boolean isIdentical(JobAssignmentResult that) {

public static class Failure {

private int workerNumber;
private String type;
private double asking;
private double used;
private double available;
private final int workerNumber;
private final String type;
private final double asking;
private final double used;
private final double available;

@JsonCreator
@JsonIgnoreProperties(ignoreUnknown = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ public class JobSchedulingInfo implements JsonType {
public static final String SendHBParam = "sendHB";


private String jobId;
private Map<Integer, WorkerAssignments> workerAssignments; // index by stage num
private final String jobId;
private final Map<Integer, WorkerAssignments> workerAssignments; // index by stage num

@JsonCreator
@JsonIgnoreProperties(ignoreUnknown = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

public class MetricsCoercer implements Coercible<MetricsPublisher> {

private Properties props;
private final Properties props;

public MetricsCoercer(Properties props) {
this.props = props;
Expand All @@ -34,32 +34,29 @@ public MetricsCoercer(Properties props) {
@Override
public Coercer<MetricsPublisher> accept(Class<?> clazz) {
if (MetricsPublisher.class.isAssignableFrom(clazz)) {
return new Coercer<MetricsPublisher>() {
@Override
public MetricsPublisher coerce(String className) {
try {
// get properties for publisher
Properties publishProperties = new Properties();
String configPrefix =
(String) props.get("mantis.metricsPublisher.config.prefix");
if (configPrefix != null && configPrefix.length() > 0) {
for (Entry<Object, Object> entry : props.entrySet()) {
if (entry.getKey() instanceof String &&
((String) entry.getKey()).startsWith(configPrefix)) {
publishProperties.put(entry.getKey(), entry.getValue());
}
return className -> {
try {
// get properties for publisher
Properties publishProperties = new Properties();
String configPrefix =
(String) props.get("mantis.metricsPublisher.config.prefix");
if (configPrefix != null && configPrefix.length() > 0) {
for (Entry<Object, Object> entry : props.entrySet()) {
if (entry.getKey() instanceof String &&
((String) entry.getKey()).startsWith(configPrefix)) {
publishProperties.put(entry.getKey(), entry.getValue());
}
}
return (MetricsPublisher) Class.forName(className).getConstructor(Properties.class)
.newInstance(publishProperties);
} catch (Exception e) {
throw new IllegalArgumentException(
String.format(
"The value %s is not a valid class name for %s implementation. ",
className,
MetricsPublisher.class.getName()
), e);
}
return (MetricsPublisher) Class.forName(className).getConstructor(Properties.class)
.newInstance(publishProperties);
} catch (Exception e) {
throw new IllegalArgumentException(
String.format(
"The value %s is not a valid class name for %s implementation. ",
className,
MetricsPublisher.class.getName()
), e);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@

public class PostJobStatusRequest {

private String jobId;
private Status status;
private final String jobId;
private final Status status;

@JsonCreator
@JsonIgnoreProperties(ignoreUnknown = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
* metrics such as sourcejob drops. Volume may have high variation over time, buffering by time will eliminate the variant.
*/
public class TimeBufferedWorkerOutlier extends WorkerOutlier {
private Map<Integer, CumulatedValue> workerValues = new HashMap<>();
private long bufferedSecs;
private final Map<Integer, CumulatedValue> workerValues = new HashMap<>();
private final long bufferedSecs;

public TimeBufferedWorkerOutlier(long cooldownSecs, long bufferedSecs, Action1<Integer> outlierTrigger) {
super(cooldownSecs, outlierTrigger);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
@EqualsAndHashCode
public class WorkerAssignments {

private int stage;
private final int stage;
private int numWorkers;
private int activeWorkers;
private Map<Integer, WorkerHost> hosts; // lookup by workerNumber
private final Map<Integer, WorkerHost> hosts; // lookup by workerNumber

@JsonCreator
@JsonIgnoreProperties(ignoreUnknown = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ public class WorkerOutlier {
private final Observer<DataPoint> observer = new SerializedObserver<>(subject);
private final long cooldownSecs;
private final Action1<Integer> outlierTrigger;
private final long minDataPoints = 16;
private final long maxDataPoints = 20;
private long lastTriggeredAt = 0L;
private long minDataPoints = 16;
private long maxDataPoints = 20;
public WorkerOutlier(long cooldownSecs, Action1<Integer> outlierTrigger) {
this.cooldownSecs = cooldownSecs;
if (outlierTrigger == null)
Expand All @@ -52,40 +52,33 @@ private void start() {
final Map<Integer, Double> values = new HashMap<>();
final Map<Integer, List<Boolean>> isOutlierMap = new HashMap<>();
subject
.doOnNext(new Action1<DataPoint>() {
@Override
public void call(DataPoint dataPoint) {
values.put(dataPoint.index, dataPoint.value);
final int currSize = values.size();
if (currSize > dataPoint.numWorkers) {
for (int i = dataPoint.numWorkers; i < currSize; i++) {
values.remove(i);
isOutlierMap.remove(i);
}
}
SimpleStats simpleStats = new SimpleStats(values.values());
List<Boolean> booleans = isOutlierMap.get(dataPoint.index);
if (booleans == null) {
booleans = new ArrayList<>();
isOutlierMap.put(dataPoint.index, booleans);
.doOnNext(dataPoint -> {
values.put(dataPoint.index, dataPoint.value);
final int currSize = values.size();
if (currSize > dataPoint.numWorkers) {
for (int i = dataPoint.numWorkers; i < currSize; i++) {
values.remove(i);
isOutlierMap.remove(i);
}
if (booleans.size() >= maxDataPoints) // for now hard code to 20 items
booleans.remove(0);
booleans.add(dataPoint.value > simpleStats.getOutlierThreshold());
if ((System.currentTimeMillis() - lastTriggeredAt) > cooldownSecs * 1000) {
if (booleans.size() > minDataPoints) {
int total = 0;
int outlierCnt = 0;
for (boolean b : booleans) {
total++;
if (b)
outlierCnt++;
}
if (outlierCnt > (Math.round((double) total * 0.7))) { // again, hardcode for now
outlierTrigger.call(dataPoint.index);
lastTriggeredAt = System.currentTimeMillis();
booleans.clear();
}
}
SimpleStats simpleStats = new SimpleStats(values.values());
List<Boolean> booleans = isOutlierMap.computeIfAbsent(dataPoint.index, k -> new ArrayList<>());
if (booleans.size() >= maxDataPoints) // for now hard code to 20 items
booleans.remove(0);
booleans.add(dataPoint.value > simpleStats.getOutlierThreshold());
if ((System.currentTimeMillis() - lastTriggeredAt) > cooldownSecs * 1000) {
if (booleans.size() > minDataPoints) {
int total = 0;
int outlierCnt = 0;
for (boolean b : booleans) {
total++;
if (b)
outlierCnt++;
}
if (outlierCnt > (Math.round((double) total * 0.7))) { // again, hardcode for now
outlierTrigger.call(dataPoint.index);
lastTriggeredAt = System.currentTimeMillis();
booleans.clear();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ public class WorkerTopologyInfo {

public static class Data {

private String jobName;
private String JobId;
private int workerIndex;
private int workerNumber;
private int stageNumber;
private int numStages;
private final String jobName;
private final String JobId;
private final int workerIndex;
private final int workerNumber;
private final int stageNumber;
private final int numStages;
private int prevStageInitialNumWorkers = -1;
private int nextStageInitialNumWorkers = -1;
private int metricsPort;
Expand Down Expand Up @@ -139,7 +139,7 @@ public Map<String, String> getEnvVars() {

public static class Reader {

private static Data data;
private static final Data data;

static {
data = new Data(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

public class WrappedExecuteStageRequest {

private PublishSubject<Boolean> requestSubject;
private ExecuteStageRequest request;
private final PublishSubject<Boolean> requestSubject;
private final ExecuteStageRequest request;

public WrappedExecuteStageRequest(PublishSubject<Boolean> subject, ExecuteStageRequest request) {
this.requestSubject = subject;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@

public class DefaultObjectMapper extends ObjectMapper {

private static DefaultObjectMapper INSTANCE = new DefaultObjectMapper();
private static final DefaultObjectMapper INSTANCE = new DefaultObjectMapper();

private DefaultObjectMapper() {
this(null);
}

public DefaultObjectMapper(JsonFactory factory) {
super(factory);
SimpleModule serializerModule = new SimpleModule("Mantis Default JSON Serializer", new Version(1, 0, 0, null));
SimpleModule serializerModule = new SimpleModule("Mantis Default JSON Serializer", new Version(1, 0, 0, null, null, null));
registerModule(serializerModule);

configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@
import io.mantisrx.shaded.com.google.common.base.Preconditions;
import io.mantisrx.shaded.com.google.common.util.concurrent.AbstractIdleService;
import io.mantisrx.shaded.org.apache.curator.framework.CuratorFramework;
import io.mantisrx.shaded.org.apache.curator.framework.api.BackgroundCallback;
import io.mantisrx.shaded.org.apache.curator.framework.api.CuratorEvent;
import io.mantisrx.shaded.org.apache.curator.framework.recipes.cache.NodeCache;
import io.mantisrx.shaded.org.apache.curator.framework.recipes.cache.NodeCacheListener;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.slf4j.Logger;
Expand Down Expand Up @@ -57,12 +54,7 @@ public ZookeeperMasterMonitor(CuratorFramework curator, String masterPath) {

@Override
public void startUp() throws Exception {
nodeMonitor.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
retrieveMaster();
}
});
nodeMonitor.getListenable().addListener(this::retrieveMaster);

nodeMonitor.start(true);

Expand All @@ -89,18 +81,13 @@ private void retrieveMaster() {
.inBackground(
curator
.getData()
.inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
onMasterNodeUpdated(event.getData());
}
})
.inBackground((client, event) -> onMasterNodeUpdated(event.getData()))
.forPath(masterPath)
)
.forPath(masterPath);

} catch (Exception e) {
logger.error("Failed to retrieve updated master information: " + e.getMessage(), e);
logger.error("Failed to retrieve updated master information: {}", e.getMessage(), e);
}

}
Expand All @@ -112,7 +99,7 @@ public Observable<MasterDescription> getMasterObservable() {

/**
*
* @return
* @return the latest master description, or null if the master monitor is not running.
*/
@Override
@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@

public class MetricsPublisherService implements Service {

private MetricsPublisher publisher;
private int publishFrequency;
private Map<String, String> commonTags = new HashMap<>();
private final MetricsPublisher publisher;
private final int publishFrequency;
private final Map<String, String> commonTags = new HashMap<>();

public MetricsPublisherService(MetricsPublisher publisher, int publishFrequency,
Map<String, String> commonTags) {
Expand All @@ -36,7 +36,7 @@ public MetricsPublisherService(MetricsPublisher publisher, int publishFrequency,
}

public MetricsPublisherService(MetricsPublisher publisher, int publishFrequency) {
this(publisher, publishFrequency, new HashMap<String, String>());
this(publisher, publishFrequency, new HashMap<>());
}

@Override
Expand Down
Loading