Skip to content

Commit

Permalink
Merge pull request apache#52 from liyinan926/master
Browse files Browse the repository at this point in the history
Fixed the job configuration file monitor
  • Loading branch information
liyinan926 committed Mar 18, 2015
2 parents 8b8b3c3 + ec1e22e commit b40904e
Show file tree
Hide file tree
Showing 6 changed files with 390 additions and 169 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Constructor;
Expand All @@ -28,6 +27,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentMap;

import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.io.monitor.FileAlterationListener;
import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
import org.apache.commons.io.monitor.FileAlterationMonitor;
Expand Down Expand Up @@ -491,7 +491,7 @@ public int compare(FileStatus fileStatus1, FileStatus fileStatus2) {
* Schedule locally configured Gobblin jobs.
*/
private void scheduleLocallyConfiguredJobs()
throws IOException, JobException {
throws ConfigurationException, JobException {
LOG.info("Scheduling locally configured jobs");
for (Properties jobProps : loadLocalJobConfigs()) {
boolean runOnce = Boolean.valueOf(jobProps.getProperty(ConfigurationKeys.JOB_RUN_ONCE_KEY, "false"));
Expand All @@ -503,7 +503,7 @@ private void scheduleLocallyConfiguredJobs()
* Load local job configurations.
*/
private List<Properties> loadLocalJobConfigs()
throws IOException {
throws ConfigurationException {
List<Properties> jobConfigs = SchedulerUtils.loadJobConfigs(this.properties);
LOG.info(String.format(jobConfigs.size() <= 1 ? "Loaded %d job configuration" : "Loaded %d job configurations",
jobConfigs.size()));
Expand Down
120 changes: 70 additions & 50 deletions gobblin-scheduler/src/main/java/gobblin/scheduler/JobScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,18 @@
package gobblin.scheduler;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.io.monitor.FileAlterationListener;
import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
import org.apache.commons.io.monitor.FileAlterationMonitor;
import org.apache.commons.io.monitor.FileAlterationObserver;
import org.quartz.CronScheduleBuilder;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
Expand All @@ -50,6 +47,7 @@
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
import com.google.common.util.concurrent.AbstractIdleService;

import gobblin.configuration.ConfigurationKeys;
Expand Down Expand Up @@ -273,11 +271,20 @@ public void runJob(Properties jobProps, JobListener jobListener)
}
}

/**
* Get the names of the scheduled jobs.
*
* @return names of the scheduled jobs
*/
public Collection<String> getScheduledJobs() {
return this.scheduledJobs.keySet();
}

/**
* Schedule locally configured Gobblin jobs.
*/
private void scheduleLocallyConfiguredJobs()
throws IOException, JobException {
throws ConfigurationException, JobException {
LOG.info("Scheduling locally configured jobs");
for (Properties jobProps : loadLocalJobConfigs()) {
boolean runOnce = Boolean.valueOf(jobProps.getProperty(ConfigurationKeys.JOB_RUN_ONCE_KEY, "false"));
Expand All @@ -289,7 +296,7 @@ private void scheduleLocallyConfiguredJobs()
* Load local job configurations.
*/
private List<Properties> loadLocalJobConfigs()
throws IOException {
throws ConfigurationException {
List<Properties> jobConfigs = SchedulerUtils.loadJobConfigs(this.properties);
LOG.info(String.format(jobConfigs.size() <= 1 ? "Loaded %d job configuration" : "Loaded %d job configurations",
jobConfigs.size()));
Expand All @@ -301,40 +308,47 @@ private List<Properties> loadLocalJobConfigs()
* Start the job configuration file monitor.
*
* <p>
* The job configuration file monitor currently only supports monitoring
* newly added job configuration files.
* The job configuration file monitor currently only supports monitoring the following types of changes:
*
* <ul>
* <li>New job configuration files.</li>
* <li>Changes to existing job configuration files.</li>
* <li>Changes to existing common properties file with a .properties extension.</li>
* </ul>
* </p>
*
* <p>
* This monitor has one limitation: in case more than one file including at least one common properties
* file are changed between two adjacent checks, the reloading of affected job configuration files may
* be intermixed and applied in an order that is not desirable. This is because the order the listener
* is called on the changes is not controlled by Gobblin, but instead by the monitor itself.
* </p>
*/
private void startJobConfigFileMonitor()
throws Exception {
File jobConfigFileDir = new File(this.properties.getProperty(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY));
FileAlterationObserver observer = new FileAlterationObserver(jobConfigFileDir);
final File jobConfigFileDir = new File(this.properties.getProperty(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY));
FileAlterationListener listener = new FileAlterationListenerAdaptor() {
/**
* Called when a new job configuration file is dropped in.
*/
@Override
public void onFileCreate(File file) {
int pos = file.getName().lastIndexOf(".");
String fileExtension = pos >= 0 ? file.getName().substring(pos + 1) : "";
String fileExtension = Files.getFileExtension(file.getName());
if (!jobConfigFileExtensions.contains(fileExtension)) {
// Not a job configuration file, ignore.
return;
}

LOG.info("Detected new job configuration file " + file.getAbsolutePath());
Properties jobProps = new Properties();
// First add framework configuration properties
jobProps.putAll(properties);
// Then load job configuration properties from the new job configuration file
loadJobConfig(jobProps, file);

// Schedule the new job
// Load the new job configuration and schedule the new job
try {
LOG.info("Detected new job configuration file " + file.getAbsolutePath());
Properties jobProps = SchedulerUtils.loadJobConfig(properties, file, jobConfigFileDir);
boolean runOnce = Boolean.valueOf(jobProps.getProperty(ConfigurationKeys.JOB_RUN_ONCE_KEY, "false"));
scheduleJob(jobProps, runOnce ? new RunOnceJobListener() : new EmailNotificationJobListener());
} catch (Throwable t) {
LOG.error("Failed to schedule new job loaded from job configuration file " + file.getAbsolutePath(), t);
} catch (ConfigurationException ce) {
LOG.error("Failed to load from job configuration file " + file.getAbsolutePath(), ce);
} catch (JobException je) {
LOG.error("Failed to schedule new job loaded from job configuration file " + file.getAbsolutePath(), je);
}
}

Expand All @@ -343,45 +357,51 @@ public void onFileCreate(File file) {
*/
@Override
public void onFileChange(File file) {
int pos = file.getName().lastIndexOf(".");
String fileExtension = pos >= 0 ? file.getName().substring(pos + 1) : "";
String fileExtension = Files.getFileExtension(file.getName());
if (fileExtension.equalsIgnoreCase(SchedulerUtils.JOB_PROPS_FILE_EXTENSION)) {
LOG.info("Detected change to common properties file " + file.getAbsolutePath());
try {
for (Properties jobProps : SchedulerUtils.loadJobConfigs(properties, file, jobConfigFileDir)) {
try {
rescheduleJob(jobProps);
} catch (JobException je) {
LOG.error("Failed to reschedule job reloaded from job configuration file " + jobProps
.getProperty(ConfigurationKeys.JOB_CONFIG_FILE_PATH_KEY), je);
}
}
} catch (ConfigurationException ce) {
LOG.error("Failed to reload job configuration files affected by changes to " + file.getAbsolutePath(), ce);
}
return;
}

if (!jobConfigFileExtensions.contains(fileExtension)) {
// Not a job configuration file, ignore.
return;
}

LOG.info("Detected change to job configuration file " + file.getAbsolutePath());
Properties jobProps = new Properties();
// First add framework configuration properties
jobProps.putAll(properties);
// Then load the updated job configuration properties
loadJobConfig(jobProps, file);

String jobName = jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY);
try {
// First unschedule and delete the old job
unscheduleJob(jobName);
boolean runOnce = Boolean.valueOf(jobProps.getProperty(ConfigurationKeys.JOB_RUN_ONCE_KEY, "false"));
// Reschedule the job with the new job configuration
scheduleJob(jobProps, runOnce ? new RunOnceJobListener() : new EmailNotificationJobListener());
} catch (Throwable t) {
LOG.error("Failed to update existing job " + jobName, t);
LOG.info("Detected change to job configuration file " + file.getAbsolutePath());
Properties jobProps = SchedulerUtils.loadJobConfig(properties, file, jobConfigFileDir);
rescheduleJob(jobProps);
} catch (ConfigurationException ce) {
LOG.error("Failed to reload from job configuration file " + file.getAbsolutePath(), ce);
} catch (JobException je) {
LOG.error("Failed to reschedule job reloaded from job configuration file " + file.getAbsolutePath(), je);
}
}

private void loadJobConfig(Properties jobProps, File file) {
try {
jobProps.load(new InputStreamReader(new FileInputStream(file), Charset.forName(
ConfigurationKeys.DEFAULT_CHARSET_ENCODING)));
jobProps.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_PATH_KEY, file.getAbsolutePath());
} catch (Exception e) {
LOG.error("Failed to load job configuration from file " + file.getAbsolutePath(), e);
}
private void rescheduleJob(Properties jobProps) throws JobException {
String jobName = jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY);
// First unschedule and delete the old job
unscheduleJob(jobName);
boolean runOnce = Boolean.valueOf(jobProps.getProperty(ConfigurationKeys.JOB_RUN_ONCE_KEY, "false"));
// Reschedule the job with the new job configuration
scheduleJob(jobProps, runOnce ? new RunOnceJobListener() : new EmailNotificationJobListener());
}
};

observer.addListener(listener);
this.fileAlterationMonitor.addObserver(observer);
SchedulerUtils.addFileAlterationObserver(this.fileAlterationMonitor, listener, jobConfigFileDir);
this.fileAlterationMonitor.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,10 @@
import com.google.common.util.concurrent.ServiceManager;

import gobblin.configuration.ConfigurationKeys;
import gobblin.runtime.TaskExecutor;
import gobblin.runtime.TaskStateTracker;
import gobblin.runtime.WorkUnitManager;
import gobblin.runtime.local.LocalJobManager;
import gobblin.runtime.local.LocalTaskStateTracker;


/**
* Unit tests for the job configuration file monitor in {@link LocalJobManager}.
* Unit tests for the job configuration file monitor in {@link gobblin.scheduler.JobScheduler}.
*
* @author ynli
*/
Expand All @@ -47,7 +42,7 @@ public class JobConfigFileMonitorTest {
private static final String JOB_CONFIG_FILE_DIR = "gobblin-test/resource/job-conf";

private ServiceManager serviceManager;
private LocalJobManager jobManager;
private JobScheduler jobScheduler;
private File newJobConfigFile;

@BeforeClass
Expand All @@ -59,16 +54,8 @@ public void setUp()
properties.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_MONITOR_POLLING_INTERVAL_KEY, "1000");
properties.setProperty(ConfigurationKeys.METRICS_ENABLED_KEY, "false");

TaskExecutor taskExecutor = new TaskExecutor(properties);
TaskStateTracker taskStateTracker = new LocalTaskStateTracker(properties, taskExecutor);
WorkUnitManager workUnitManager = new WorkUnitManager(taskExecutor, taskStateTracker);
this.jobManager = new LocalJobManager(workUnitManager, properties);
((LocalTaskStateTracker) taskStateTracker).setJobManager(this.jobManager);

this.serviceManager = new ServiceManager(Lists.newArrayList(
// The order matters due to dependencies between services
taskExecutor, taskStateTracker, workUnitManager, this.jobManager));

this.jobScheduler = new JobScheduler(properties);
this.serviceManager = new ServiceManager(Lists.newArrayList(this.jobScheduler));
this.serviceManager.startAsync();
}

Expand All @@ -77,7 +64,7 @@ public void testAddNewJobConfigFile()
throws Exception {
Thread.sleep(2000);

Assert.assertEquals(this.jobManager.getScheduledJobs().size(), 3);
Assert.assertEquals(this.jobScheduler.getScheduledJobs().size(), 3);

// Create a new job configuration file by making a copy of an existing
// one and giving a different job name
Expand All @@ -89,7 +76,7 @@ public void testAddNewJobConfigFile()

Thread.sleep(2000);

Set<String> jobNames = Sets.newHashSet(this.jobManager.getScheduledJobs());
Set<String> jobNames = Sets.newHashSet(this.jobScheduler.getScheduledJobs());
Assert.assertEquals(jobNames.size(), 4);
Assert.assertTrue(jobNames.contains("GobblinTest1"));
Assert.assertTrue(jobNames.contains("GobblinTest2"));
Expand All @@ -101,7 +88,7 @@ public void testAddNewJobConfigFile()
@Test(dependsOnMethods = {"testAddNewJobConfigFile"})
public void testChangeJobConfigFile()
throws Exception {
Assert.assertEquals(this.jobManager.getScheduledJobs().size(), 4);
Assert.assertEquals(this.jobScheduler.getScheduledJobs().size(), 4);

// Make a change to the new job configuration file
Properties jobProps = new Properties();
Expand All @@ -111,7 +98,7 @@ public void testChangeJobConfigFile()

Thread.sleep(2000);

Set<String> jobNames = Sets.newHashSet(this.jobManager.getScheduledJobs());
Set<String> jobNames = Sets.newHashSet(this.jobScheduler.getScheduledJobs());
Assert.assertEquals(jobNames.size(), 4);
Assert.assertTrue(jobNames.contains("GobblinTest1"));
Assert.assertTrue(jobNames.contains("GobblinTest2"));
Expand All @@ -123,7 +110,7 @@ public void testChangeJobConfigFile()
@Test(dependsOnMethods = {"testChangeJobConfigFile"})
public void testUnscheduleJob()
throws Exception {
Assert.assertEquals(this.jobManager.getScheduledJobs().size(), 4);
Assert.assertEquals(this.jobScheduler.getScheduledJobs().size(), 4);

// Disable the new job by setting job.disabled=true
Properties jobProps = new Properties();
Expand All @@ -133,7 +120,7 @@ public void testUnscheduleJob()

Thread.sleep(2000);

Set<String> jobNames = Sets.newHashSet(this.jobManager.getScheduledJobs());
Set<String> jobNames = Sets.newHashSet(this.jobScheduler.getScheduledJobs());
Assert.assertEquals(jobNames.size(), 3);
Assert.assertTrue(jobNames.contains("GobblinTest1"));
Assert.assertTrue(jobNames.contains("GobblinTest2"));
Expand Down
1 change: 1 addition & 0 deletions gobblin-utility/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dependencies {

compile externalDependency.commonsConfiguration
compile externalDependency.commonsEmail
compile externalDependency.commonsIO
compile externalDependency.commonsLang
compile externalDependency.guava
compile externalDependency.slf4j
Expand Down
Loading

0 comments on commit b40904e

Please sign in to comment.