[WIP][vpj] Add a way to run DataWriter jobs in an isolated environment #1265

Draft · wants to merge 1 commit into base: main
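This change lets the VPJ driver run the data writer compute job outside its own process. As a minimal sketch (illustrative only, not the exact wiring inside VenicePushJob; the wrapper class and helper method below are hypothetical), the forked-process proxy introduced in this diff would be driven like this, using only classes and methods that appear in the diff:

import com.linkedin.venice.hadoop.PushJobSetting;
import com.linkedin.venice.proxyjob.datawriter.jobs.ForkedProcessProxyJob;
import com.linkedin.venice.utils.VeniceProperties;

public class DataWriterProxyJobUsageSketch {
  // Illustrative only: run the data writer job in a forked JVM instead of inside the VPJ driver process.
  public static void runIsolated(VeniceProperties props, PushJobSetting pushJobSetting) {
    // props is expected to carry DATA_WRITER_PROXY_COMPUTE_JOB_CLASS so the forked
    // DataWriterProxyDriver knows which DataWriterComputeJob implementation to run.
    ForkedProcessProxyJob proxyJob = new ForkedProcessProxyJob();
    proxyJob.configure(props, pushJobSetting);
    // runJob() forks DataWriterProxyDriver, waits for it, then reads the ProxyJobStatus
    // the forked process wrote into the proxy job state directory.
    proxyJob.runJob();
    // Counters from the isolated run are surfaced through the delegating task tracker.
    long outputRecords = proxyJob.getTaskTracker().getOutputRecordsCount();
  }
}

The same flow would apply to any other AbstractDataWriterProxyJob subclass; only runProxyJob() differs.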
1 change: 1 addition & 0 deletions clients/venice-push-job/build.gradle
@@ -91,6 +91,7 @@ dependencies {
implementation project(':clients:venice-thin-client') // Needed by the KME SchemaReader

implementation libraries.commonsIo
implementation libraries.commonsCli
implementation libraries.fastUtil
implementation libraries.jacksonCore
implementation libraries.jdom
@@ -178,6 +178,10 @@ public static void incrTotalPutOrDeleteRecordCount(Reporter reporter, long amount) {
incrAmountWithGroupCounterName(reporter, TOTAL_PUT_OR_DELETE_COUNT_GROUP_COUNTER_NAME, amount);
}

public static long getEmptyRecordCount(Reporter reporter) {
return getCountWithGroupCounterName(reporter, EMPTY_RECORD_COUNTER_NAME);
}

public static long getWriteAclAuthorizationFailureCount(Reporter reporter) {
return getCountWithGroupCounterName(reporter, WRITE_ACL_FAILURE_GROUP_COUNTER_NAME);
}
@@ -214,10 +218,18 @@ public static long getOutputRecordsCount(Counters counters) {
return getCountFromCounters(counters, OUTPUT_RECORD_COUNT_GROUP_COUNTER_NAME);
}

public static long getEmptyRecordCount(Counters counters) {
return getCountFromCounters(counters, EMPTY_RECORD_COUNTER_NAME);
}

public static long getWriteAclAuthorizationFailureCount(Counters counters) {
return getCountFromCounters(counters, WRITE_ACL_FAILURE_GROUP_COUNTER_NAME);
}

public static long getDuplicateKeyWithIdenticalCount(Counters counters) {
return getCountFromCounters(counters, DUP_KEY_WITH_IDENTICAL_VALUE_GROUP_COUNTER_NAME);
}

public static long getDuplicateKeyWithDistinctCount(Counters counters) {
return getCountFromCounters(counters, DUP_KEY_WITH_DISTINCT_VALUE_GROUP_COUNTER_NAME);
}
@@ -347,6 +347,11 @@ protected void runComputeJob() {
}
}

@Override
public VeniceProperties getJobProperties() {
return vpjProperties;
}

@Override
public PushJobSetting getPushJobSetting() {
return pushJobSetting;
@@ -20,6 +20,11 @@ public long getSprayAllPartitionsCount() {
return MRJobCounterHelper.getMapperSprayAllPartitionsTriggeredCount(counters);
}

@Override
public long getEmptyRecordCount() {
return MRJobCounterHelper.getEmptyRecordCount(counters);
}

@Override
public long getTotalKeySize() {
return MRJobCounterHelper.getTotalKeySize(counters);
@@ -55,6 +60,11 @@ public long getWriteAclAuthorizationFailureCount() {
return MRJobCounterHelper.getWriteAclAuthorizationFailureCount(counters);
}

@Override
public long getDuplicateKeyWithIdenticalValueCount() {
return MRJobCounterHelper.getDuplicateKeyWithIdenticalCount(counters);
}

@Override
public long getDuplicateKeyWithDistinctValueCount() {
return MRJobCounterHelper.getDuplicateKeyWithDistinctCount(counters);
@@ -56,6 +56,10 @@ default long getSprayAllPartitionsCount() {
return 0;
}

default long getEmptyRecordCount() {
return 0;
}

default long getTotalKeySize() {
return 0;
}
@@ -84,6 +88,10 @@ default long getWriteAclAuthorizationFailureCount() {
return 0;
}

default long getDuplicateKeyWithIdenticalValueCount() {
return 0;
}

default long getDuplicateKeyWithDistinctValueCount() {
return 0;
}
@@ -42,6 +42,11 @@ public boolean isTerminal() {

Throwable getFailureReason();

/**
* Returns the job properties that were used to configure the job in {@link #configure(VeniceProperties)}.
*/
VeniceProperties getJobProperties();

default void kill() {
}

@@ -114,7 +114,7 @@ protected void validateJob() {
protected abstract void runComputeJob();

@Override
public void configure(VeniceProperties properties) {
public final void configure(VeniceProperties properties) {
LOGGER.warn("Data writer compute job needs additional configs to be configured.");
}

@@ -0,0 +1,99 @@
package com.linkedin.venice.proxyjob.datawriter.jobs;

import static com.linkedin.venice.vpj.VenicePushJobConstants.PERMISSION_700;

import com.linkedin.venice.hadoop.PushJobSetting;
import com.linkedin.venice.hadoop.task.datawriter.DataWriterTaskTracker;
import com.linkedin.venice.hadoop.utils.HadoopUtils;
import com.linkedin.venice.jobs.DataWriterComputeJob;
import com.linkedin.venice.proxyjob.datawriter.task.DelegatingReadOnlyDataWriterTaskTracker;
import com.linkedin.venice.proxyjob.datawriter.utils.ProxyJobCliUtils;
import com.linkedin.venice.proxyjob.datawriter.utils.ProxyJobStatus;
import com.linkedin.venice.proxyjob.datawriter.utils.ProxyJobUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;


/**
* This is the proxy job for the data writer job. It launches the data writer job in an isolated environment, such as a
* new process or a remote execution environment, and acts as a proxy between the data writer job and the VPJ driver.
*/
public abstract class AbstractDataWriterProxyJob extends DataWriterComputeJob {
private static final Logger LOGGER = LogManager.getLogger(AbstractDataWriterProxyJob.class);
private VeniceProperties props;
private PushJobSetting pushJobSetting;
private DelegatingReadOnlyDataWriterTaskTracker taskTracker;

@Override
public DataWriterTaskTracker getTaskTracker() {
return taskTracker;
}

protected abstract void runProxyJob(List<String> args);

@Override
protected final void runComputeJob() {
String proxyJobStateDir = Utils.escapeFilePathComponent(Utils.getUniqueString("data_writer_proxy_job"));
Path proxyJobStateDirPath = new Path(pushJobSetting.jobTmpDir, proxyJobStateDir);
try {
HadoopUtils.createDirectoryWithPermission(proxyJobStateDirPath, PERMISSION_700);
List<String> args =
ProxyJobCliUtils.toCliArgs(props.toProperties(), pushJobSetting, proxyJobStateDirPath.toUri().toString());
runProxyJob(args);

ProxyJobStatus jobStatus = ProxyJobUtils.getJobStatus(proxyJobStateDirPath);
if (jobStatus == null) {
throw new RuntimeException("Failed to get job status from proxy job state directory: " + proxyJobStateDirPath);
}

taskTracker.setDelegate(jobStatus.getTaskTracker());
logJobMetrics(jobStatus.getTaskTracker());
if (jobStatus.getFailureReason() != null) {
throw new RuntimeException(jobStatus.getFailureReason());
}
} catch (IOException e) {
throw new RuntimeException("Failed to create proxy job state directory: " + proxyJobStateDirPath, e);
}
}

private void logJobMetrics(DataWriterTaskTracker taskTracker) {
LOGGER.info("Job metrics for data writer job:");
LOGGER.info(" Total Output Records: {}", taskTracker.getOutputRecordsCount());
LOGGER.info(" Empty Records: {}", taskTracker.getEmptyRecordCount());
LOGGER.info(" Total Key Size: {}", taskTracker.getTotalKeySize());
LOGGER.info(" Total Uncompressed Value Size: {}", taskTracker.getTotalUncompressedValueSize());
LOGGER.info(" Total Compressed Value Size: {}", taskTracker.getTotalValueSize());
LOGGER.info(" Total Gzip Compressed Value Size: {}", taskTracker.getTotalGzipCompressedValueSize());
LOGGER.info(" Total Zstd Compressed Value Size: {}", taskTracker.getTotalZstdCompressedValueSize());
LOGGER.info(" Spray All Partitions Triggered: {}", taskTracker.getSprayAllPartitionsCount());
LOGGER.info(" Partition Writers Closed: {}", taskTracker.getPartitionWriterCloseCount());
LOGGER.info(" Repush TTL Filtered Records: {}", taskTracker.getRepushTtlFilterCount());
LOGGER.info(" ACL Authorization Failures: {}", taskTracker.getWriteAclAuthorizationFailureCount());
LOGGER.info(" Record Too Large Failures: {}", taskTracker.getRecordTooLargeFailureCount());
LOGGER.info(" Duplicate Key With Identical Value: {}", taskTracker.getDuplicateKeyWithIdenticalValueCount());
LOGGER.info(" Duplicate Key With Distinct Value: {}", taskTracker.getDuplicateKeyWithDistinctValueCount());
}

@Override
public VeniceProperties getJobProperties() {
return props;
}

@Override
protected PushJobSetting getPushJobSetting() {
return pushJobSetting;
}

@Override
public void configure(VeniceProperties props, PushJobSetting pushJobSetting) {
this.props = props;
this.pushJobSetting = pushJobSetting;
this.taskTracker = new DelegatingReadOnlyDataWriterTaskTracker();
}
}
@@ -0,0 +1,74 @@
package com.linkedin.venice.proxyjob.datawriter.jobs;

import static com.linkedin.venice.vpj.VenicePushJobConstants.DATA_WRITER_PROXY_COMPUTE_JOB_CLASS;

import com.linkedin.venice.hadoop.PushJobSetting;
import com.linkedin.venice.jobs.DataWriterComputeJob;
import com.linkedin.venice.proxyjob.datawriter.utils.ProxyJobCliUtils;
import com.linkedin.venice.proxyjob.datawriter.utils.ProxyJobStatus;
import com.linkedin.venice.proxyjob.datawriter.utils.ProxyJobUtils;
import com.linkedin.venice.utils.ReflectUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import java.util.Properties;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.lang3.Validate;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;


public class DataWriterProxyDriver {
private static final Logger LOGGER = LogManager.getLogger(DataWriterProxyDriver.class);

public static void main(String[] args) throws Exception {
runDataWriterJob(args, true);
}

public static void runDataWriterJob(String[] args, boolean killFromShutdownHook) throws Exception {
CommandLine commandLine = ProxyJobCliUtils.parseArgs(args);
Properties properties = ProxyJobCliUtils.getPropertiesFromCliArgs(commandLine);
VeniceProperties props = new VeniceProperties(properties);

PushJobSetting pushJobSetting = ProxyJobCliUtils.getPushJobSettingFromCliArgs(commandLine);

String outputPathStr = ProxyJobCliUtils.getOutputDirFromCliArgs(commandLine);
Path outputPath = new Path(outputPathStr);

String computeJobClassName = props.getString(DATA_WRITER_PROXY_COMPUTE_JOB_CLASS);

Class objectClass = ReflectUtils.loadClass(computeJobClassName);
Validate.isAssignableFrom(DataWriterComputeJob.class, objectClass);
Class<? extends DataWriterComputeJob> computeJobClass = (Class<? extends DataWriterComputeJob>) objectClass;
DataWriterComputeJob computeJob = ReflectUtils.callConstructor(computeJobClass, new Class[0], new Object[0]);

computeJob.configure(props, pushJobSetting);

// A shutdown hook is needed to kill the compute job in case this driver process itself is killed by the environment.
Thread computeJobKiller = new Thread(() -> {
LOGGER.info("Killing compute job from shutdown hook");
try {
computeJob.kill();
} catch (Exception e) {
throw new RuntimeException(e);
}
});

if (killFromShutdownHook) {
Runtime.getRuntime().addShutdownHook(computeJobKiller);
}

computeJob.runJob();

if (killFromShutdownHook) {
// Job completed, remove the shutdown hook
Runtime.getRuntime().removeShutdownHook(computeJobKiller);
}

ProxyJobStatus status =
new ProxyJobStatus(computeJob.getTaskTracker(), computeJob.getStatus(), computeJob.getFailureReason());
ProxyJobUtils.writeJobStatus(outputPath, status);

Utils.closeQuietlyWithErrorLogged(computeJob);
}
}
@@ -0,0 +1,19 @@
package com.linkedin.venice.proxyjob.datawriter.jobs;

import com.linkedin.venice.utils.ForkedJavaProcess;
import java.util.Collections;
import java.util.List;


public class ForkedProcessProxyJob extends AbstractDataWriterProxyJob {
@Override
protected void runProxyJob(List<String> args) {
try {
ForkedJavaProcess javaProcess =
ForkedJavaProcess.exec(DataWriterProxyDriver.class, args, Collections.emptyList());
javaProcess.waitFor();
} catch (Exception e) {
throw new RuntimeException("Failed to run data writer job", e);
}
}
}
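ForkedProcessProxyJob is the only concrete backend in this change. As a hedged sketch of what a second isolation backend could look like (RemoteEnvironmentProxyJob and its submission logic are hypothetical, not part of this PR), a subclass only needs to implement runProxyJob() and get DataWriterProxyDriver executed with the CLI args it is handed:

package com.linkedin.venice.proxyjob.datawriter.jobs;

import java.util.List;

public class RemoteEnvironmentProxyJob extends AbstractDataWriterProxyJob {
  @Override
  protected void runProxyJob(List<String> args) {
    // Hypothetical: submit DataWriterProxyDriver with these args to a remote execution
    // environment and block until it finishes. The driver reports its outcome by writing
    // a ProxyJobStatus into the state directory encoded in the args, which the parent
    // AbstractDataWriterProxyJob then reads back.
    throw new UnsupportedOperationException("Remote submission is environment-specific");
  }
}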