diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java index b2472cd6e..e91c69772 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java @@ -36,6 +36,7 @@ import io.mantisrx.master.events.LifecycleEventsProto; import io.mantisrx.master.jobcluster.WorkerInfoListHolder; import io.mantisrx.master.jobcluster.job.worker.*; +import io.mantisrx.master.jobcluster.job.worker.WorkerManagerLogger; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.*; import io.mantisrx.master.jobcluster.proto.JobClusterProto; @@ -147,6 +148,7 @@ public class JobActor extends AbstractActorWithTimers implements IMantisJobManag private boolean hasJobMaster; private volatile boolean allWorkersCompleted = false; + /** * Used by the JobCluster Actor to create this Job Actor. * @@ -1286,6 +1288,8 @@ class WorkerManager implements IWorkerManager { .build(); private volatile boolean stageAssignmentPotentiallyChanged; + private WorkerManagerLogger workerLogger; + /** * Creates an instance of this class. * @@ -1362,6 +1366,9 @@ private void initializeRunningWorkers() { for (JobWorker worker : stageMeta.getAllWorkers()) { IMantisWorkerMetadata wm = worker.getMetadata(); + // Initializing workerLogger + this.workerLogger = new WorkerManagerLogger(wm.getWorkerId()); + if (WorkerState.isRunningState(wm.getState())) { // send fake heartbeat try { @@ -1413,7 +1420,10 @@ private void initializeRunningWorkers() { for (JobWorker jobWorker : workersToResubmit) { LOGGER.warn("discovered workers with missing ports during initialization: {}", jobWorker); try { + WorkerManagerLogger oldWorkerLogger = this.workerLogger; + this.workerLogger = new WorkerManagerLogger(jobWorker.getMetadata().getWorkerId()); resubmitWorker(jobWorker); + this.workerLogger = oldWorkerLogger; } catch (Exception e) { LOGGER.warn("Exception resubmitting worker {} during initializeRunningWorkers due to {}", jobWorker, e.getMessage(), e); @@ -1551,7 +1561,8 @@ private void submitInitialWorkers() throws Exception { private void queueTask(final IMantisWorkerMetadata workerRequest, final Optional readyAt) { final ScheduleRequest schedulingRequest = createSchedulingRequest(workerRequest, readyAt); - LOGGER.info("Queueing up scheduling request {} ", schedulingRequest); +// LOGGER.info("Queueing up scheduling request {} ", schedulingRequest); + workerLogger.info("Queueing up scheduling request "+ schedulingRequest); try { scheduler.scheduleWorker(schedulingRequest); } catch (Exception e) { @@ -1572,6 +1583,8 @@ private ScheduleRequest createSchedulingRequest( final WorkerId workerId = workerRequest.getWorkerId(); + this.workerLogger = new WorkerManagerLogger(workerId); + // setup constraints final List hardConstraints = new ArrayList<>(); final List softConstraints = new ArrayList<>(); @@ -2201,7 +2214,8 @@ public BehaviorSubject getJobStatusSubject() { } private void resubmitWorker(JobWorker oldWorker) throws Exception { - LOGGER.info("Resubmitting worker {}", oldWorker.getMetadata()); +// LOGGER.info("Resubmitting worker {}", oldWorker.getMetadata()); + workerLogger.info("Resubmitting worker "+oldWorker.getMetadata()); Map workerToStageMap = mantisJobMetaData.getWorkerNumberToStageMap(); IMantisWorkerMetadata oldWorkerMetadata = oldWorker.getMetadata(); @@ -2250,9 +2264,12 @@ private void resubmitWorker(JobWorker oldWorker) throws Exception { Optional delayDuration = of(workerResubmitTime); // publish a refresh before enqueuing new Task to Scheduler markStageAssignmentsChanged(true); + + this.workerLogger = new WorkerManagerLogger(newWorker.getMetadata().getWorkerId()); // queue the new worker for execution queueTask(newWorker.getMetadata(), delayDuration); - LOGGER.info("Worker {} successfully queued for scheduling", newWorker); +// LOGGER.info("Worker {} successfully queued for scheduling", newWorker); + workerLogger.info(String.format("Worker %s successfully queued for scheduling",newWorker)); numWorkerResubmissions.increment(); } else { diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/worker/WorkerManagerLogger.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/worker/WorkerManagerLogger.java new file mode 100644 index 000000000..35d2aa9a8 --- /dev/null +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/worker/WorkerManagerLogger.java @@ -0,0 +1,39 @@ +/* + * Copyright 2023 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mantisrx.master.jobcluster.job.worker; + +import io.mantisrx.server.core.domain.WorkerId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class WorkerManagerLogger { + private static WorkerId workerId; + private static final Logger logger = LoggerFactory.getLogger(WorkerManagerLogger.class); + + public WorkerManagerLogger(WorkerId workerId){ + this.workerId = workerId; + } + + public void info(){ + logger.info("WorkerID = {}",workerId); + } + + public void info(String message){ + logger.info("WorkerID: {} -- {}",workerId,message); + } + +} diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/command/CreateJobDescriptorFile.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/command/CreateJobDescriptorFile.java index cafab1fc6..d39084059 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/command/CreateJobDescriptorFile.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/command/CreateJobDescriptorFile.java @@ -110,7 +110,7 @@ public void execute() throws CommandException { } } return true; - }).collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())); + }).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); parameterInfo.putAll(sysParams); // create source/sink info diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/command/CreateZipFile.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/command/CreateZipFile.java index 96bd5c7fe..e49abbf1a 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/command/CreateZipFile.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/command/CreateZipFile.java @@ -29,9 +29,9 @@ public class CreateZipFile implements Command { - private File jobJarFile; - private File jobDescriptor; - private File zipfileName; + private final File jobJarFile; + private final File jobDescriptor; + private final File zipfileName; public CreateZipFile(File zipfileName, File jobJarFile, File jobDescriptor) { @@ -45,7 +45,7 @@ private void readBytesFromFile(File file, ZipOutputStream os) throws CommandExce try { is = new BufferedInputStream(Files.newInputStream(Paths.get(file.toURI()))); byte[] in = new byte[1024]; - int bytesRead = -1; + int bytesRead; while ((bytesRead = is.read(in)) > 0) { os.write(in, 0, bytesRead); } @@ -53,7 +53,9 @@ private void readBytesFromFile(File file, ZipOutputStream os) throws CommandExce throw new CommandException(e); } finally { try { - is.close(); + if (is != null) { + is.close(); + } } catch (IOException e) { throw new CommandException(e); } diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/command/LoadValidateCreate.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/command/LoadValidateCreate.java index 12e508a12..62f543b4a 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/command/LoadValidateCreate.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/command/LoadValidateCreate.java @@ -22,11 +22,11 @@ public class LoadValidateCreate implements Command { - private String jobJarFile; - private String artifactName; - private String version; - private String project; - private String outputLocation; + private final String jobJarFile; + private final String artifactName; + private final String version; + private final String project; + private final String outputLocation; public LoadValidateCreate(String jobJarFile, String artifactName, String version, String outputLocation, diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/command/LoadValidateCreateDir.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/command/LoadValidateCreateDir.java index 185063cd0..ff592ac06 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/command/LoadValidateCreateDir.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/command/LoadValidateCreateDir.java @@ -22,7 +22,7 @@ public class LoadValidateCreateDir implements Command { - private String jarPath; + private final String jarPath; public LoadValidateCreateDir(String jarPath) { this.jarPath = jarPath; @@ -96,7 +96,7 @@ public void execute() throws CommandException { File jobDescriptor = new File(fileLoop.getParent() + "/" + jsonFile); new CreateJobDescriptorFile(job, jobDescriptor, fileVersion, fileBase).execute(); } catch (Exception e) { - System.out.println("Got an error " + e.toString()); + System.out.println("Got an error " + e); System.exit(1); } diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/command/ReadJobFromJar.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/command/ReadJobFromJar.java index beaeef49e..3ddb54379 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/command/ReadJobFromJar.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/command/ReadJobFromJar.java @@ -30,7 +30,7 @@ public class ReadJobFromJar implements Command { - private String jobJarFile; + private final String jobJarFile; @SuppressWarnings("rawtypes") private Job job; diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/command/ReadJobFromZip.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/command/ReadJobFromZip.java index 4a8fd4b94..9bdc5311e 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/command/ReadJobFromZip.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/command/ReadJobFromZip.java @@ -31,10 +31,10 @@ public class ReadJobFromZip implements Command { - private static Logger logger = LoggerFactory.getLogger(ReadJobFromZip.class); - private String jobZipFile; - private String artifactName; - private String version; + private static final Logger logger = LoggerFactory.getLogger(ReadJobFromZip.class); + private final String jobZipFile; + private final String artifactName; + private final String version; @SuppressWarnings("rawtypes") private Job job; diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/command/ValidateJob.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/command/ValidateJob.java index a684f2870..0b47a8feb 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/command/ValidateJob.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/command/ValidateJob.java @@ -26,7 +26,7 @@ public class ValidateJob implements Command { @SuppressWarnings("rawtypes") - private Job job; + private final Job job; @SuppressWarnings("rawtypes") public ValidateJob(Job job) {