Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JobActor::WorkerManager's logs contain the WorkerID that they correspond to #411

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.mantisrx.master.events.LifecycleEventsProto;
import io.mantisrx.master.jobcluster.WorkerInfoListHolder;
import io.mantisrx.master.jobcluster.job.worker.*;
import io.mantisrx.master.jobcluster.job.worker.WorkerManagerLogger;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.*;
import io.mantisrx.master.jobcluster.proto.JobClusterProto;
Expand Down Expand Up @@ -147,6 +148,7 @@ public class JobActor extends AbstractActorWithTimers implements IMantisJobManag
private boolean hasJobMaster;
private volatile boolean allWorkersCompleted = false;


/**
* Used by the JobCluster Actor to create this Job Actor.
*
Expand Down Expand Up @@ -1286,6 +1288,8 @@ class WorkerManager implements IWorkerManager {
.build();
private volatile boolean stageAssignmentPotentiallyChanged;

private WorkerManagerLogger workerLogger;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Introducing extra lifecycle for these loggers doesn't seem to provide enough benefits. Adding the extra metadata to the logger message is sufficient enough.

Copy link
Author

@GautamGottipati GautamGottipati Apr 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @Andyz26 can you please give me some hints, like what metadata I could add and how. In the issue it was mentioned to implement something like this workerManager.info("Whatever"); where by default workerId should be added. Taking reference from issue #384

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue is about jobId at the job actor level and that is a different structure comparing to this workerManager. Here WorkerManager is not 1 to 1 mapping with a specific worker logically. There is no need to maintain a worker-level logger as long as the information needed is logged.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so you mean, I can directly add worker Id to Logger.info, where ever worker id is not mentioned.


/**
* Creates an instance of this class.
*
Expand Down Expand Up @@ -1362,6 +1366,9 @@ private void initializeRunningWorkers() {
for (JobWorker worker : stageMeta.getAllWorkers()) {
IMantisWorkerMetadata wm = worker.getMetadata();

// Initializing workerLogger
this.workerLogger = new WorkerManagerLogger(wm.getWorkerId());

if (WorkerState.isRunningState(wm.getState())) {
// send fake heartbeat
try {
Expand Down Expand Up @@ -1413,7 +1420,10 @@ private void initializeRunningWorkers() {
for (JobWorker jobWorker : workersToResubmit) {
LOGGER.warn("discovered workers with missing ports during initialization: {}", jobWorker);
try {
WorkerManagerLogger oldWorkerLogger = this.workerLogger;
this.workerLogger = new WorkerManagerLogger(jobWorker.getMetadata().getWorkerId());
resubmitWorker(jobWorker);
this.workerLogger = oldWorkerLogger;
} catch (Exception e) {
LOGGER.warn("Exception resubmitting worker {} during initializeRunningWorkers due to {}",
jobWorker, e.getMessage(), e);
Expand Down Expand Up @@ -1551,7 +1561,8 @@ private void submitInitialWorkers() throws Exception {

private void queueTask(final IMantisWorkerMetadata workerRequest, final Optional<Long> readyAt) {
final ScheduleRequest schedulingRequest = createSchedulingRequest(workerRequest, readyAt);
LOGGER.info("Queueing up scheduling request {} ", schedulingRequest);
// LOGGER.info("Queueing up scheduling request {} ", schedulingRequest);
workerLogger.info("Queueing up scheduling request "+ schedulingRequest);
try {
scheduler.scheduleWorker(schedulingRequest);
} catch (Exception e) {
Expand All @@ -1572,6 +1583,8 @@ private ScheduleRequest createSchedulingRequest(

final WorkerId workerId = workerRequest.getWorkerId();

this.workerLogger = new WorkerManagerLogger(workerId);

// setup constraints
final List<ConstraintEvaluator> hardConstraints = new ArrayList<>();
final List<VMTaskFitnessCalculator> softConstraints = new ArrayList<>();
Expand Down Expand Up @@ -2201,7 +2214,8 @@ public BehaviorSubject<JobSchedulingInfo> getJobStatusSubject() {
}

private void resubmitWorker(JobWorker oldWorker) throws Exception {
LOGGER.info("Resubmitting worker {}", oldWorker.getMetadata());
// LOGGER.info("Resubmitting worker {}", oldWorker.getMetadata());
workerLogger.info("Resubmitting worker "+oldWorker.getMetadata());
Map<Integer, Integer> workerToStageMap = mantisJobMetaData.getWorkerNumberToStageMap();

IMantisWorkerMetadata oldWorkerMetadata = oldWorker.getMetadata();
Expand Down Expand Up @@ -2250,9 +2264,12 @@ private void resubmitWorker(JobWorker oldWorker) throws Exception {
Optional<Long> delayDuration = of(workerResubmitTime);
// publish a refresh before enqueuing new Task to Scheduler
markStageAssignmentsChanged(true);

this.workerLogger = new WorkerManagerLogger(newWorker.getMetadata().getWorkerId());
// queue the new worker for execution
queueTask(newWorker.getMetadata(), delayDuration);
LOGGER.info("Worker {} successfully queued for scheduling", newWorker);
// LOGGER.info("Worker {} successfully queued for scheduling", newWorker);
workerLogger.info(String.format("Worker %s successfully queued for scheduling",newWorker));
numWorkerResubmissions.increment();
} else {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2023 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.mantisrx.master.jobcluster.job.worker;

import io.mantisrx.server.core.domain.WorkerId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WorkerManagerLogger {
private static WorkerId workerId;
private static final Logger logger = LoggerFactory.getLogger(WorkerManagerLogger.class);

public WorkerManagerLogger(WorkerId workerId){
this.workerId = workerId;
}

public void info(){
logger.info("WorkerID = {}",workerId);
}

public void info(String message){
logger.info("WorkerID: {} -- {}",workerId,message);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void execute() throws CommandException {
}
}
return true;
}).collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
}).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

parameterInfo.putAll(sysParams);
// create source/sink info
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@

public class CreateZipFile implements Command {

private File jobJarFile;
private File jobDescriptor;
private File zipfileName;
private final File jobJarFile;
private final File jobDescriptor;
private final File zipfileName;

public CreateZipFile(File zipfileName,
File jobJarFile, File jobDescriptor) {
Expand All @@ -45,15 +45,17 @@ private void readBytesFromFile(File file, ZipOutputStream os) throws CommandExce
try {
is = new BufferedInputStream(Files.newInputStream(Paths.get(file.toURI())));
byte[] in = new byte[1024];
int bytesRead = -1;
int bytesRead;
while ((bytesRead = is.read(in)) > 0) {
os.write(in, 0, bytesRead);
}
} catch (IOException e) {
throw new CommandException(e);
} finally {
try {
is.close();
if (is != null) {
is.close();
}
} catch (IOException e) {
throw new CommandException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@

public class LoadValidateCreate implements Command {

private String jobJarFile;
private String artifactName;
private String version;
private String project;
private String outputLocation;
private final String jobJarFile;
private final String artifactName;
private final String version;
private final String project;
private final String outputLocation;

public LoadValidateCreate(String jobJarFile, String artifactName,
String version, String outputLocation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

public class LoadValidateCreateDir implements Command {

private String jarPath;
private final String jarPath;

public LoadValidateCreateDir(String jarPath) {
this.jarPath = jarPath;
Expand Down Expand Up @@ -96,7 +96,7 @@ public void execute() throws CommandException {
File jobDescriptor = new File(fileLoop.getParent() + "/" + jsonFile);
new CreateJobDescriptorFile(job, jobDescriptor, fileVersion, fileBase).execute();
} catch (Exception e) {
System.out.println("Got an error " + e.toString());
System.out.println("Got an error " + e);
System.exit(1);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

public class ReadJobFromJar implements Command {

private String jobJarFile;
private final String jobJarFile;
@SuppressWarnings("rawtypes")
private Job job;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@

public class ReadJobFromZip implements Command {

private static Logger logger = LoggerFactory.getLogger(ReadJobFromZip.class);
private String jobZipFile;
private String artifactName;
private String version;
private static final Logger logger = LoggerFactory.getLogger(ReadJobFromZip.class);
private final String jobZipFile;
private final String artifactName;
private final String version;
@SuppressWarnings("rawtypes")
private Job job;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
public class ValidateJob implements Command {

@SuppressWarnings("rawtypes")
private Job job;
private final Job job;

@SuppressWarnings("rawtypes")
public ValidateJob(Job job) {
Expand Down