diff --git a/gobblin-runtime/src/main/java/gobblin/runtime/local/LocalJobManager.java b/gobblin-runtime/src/main/java/gobblin/runtime/local/LocalJobManager.java index 041ea62cbaa..5506d342109 100644 --- a/gobblin-runtime/src/main/java/gobblin/runtime/local/LocalJobManager.java +++ b/gobblin-runtime/src/main/java/gobblin/runtime/local/LocalJobManager.java @@ -13,7 +13,6 @@ import java.io.File; import java.io.FileInputStream; -import java.io.FileReader; import java.io.IOException; import java.io.InputStreamReader; import java.lang.reflect.Constructor; @@ -28,6 +27,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentMap; +import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.io.monitor.FileAlterationListener; import org.apache.commons.io.monitor.FileAlterationListenerAdaptor; import org.apache.commons.io.monitor.FileAlterationMonitor; @@ -491,7 +491,7 @@ public int compare(FileStatus fileStatus1, FileStatus fileStatus2) { * Schedule locally configured Gobblin jobs. */ private void scheduleLocallyConfiguredJobs() - throws IOException, JobException { + throws ConfigurationException, JobException { LOG.info("Scheduling locally configured jobs"); for (Properties jobProps : loadLocalJobConfigs()) { boolean runOnce = Boolean.valueOf(jobProps.getProperty(ConfigurationKeys.JOB_RUN_ONCE_KEY, "false")); @@ -503,7 +503,7 @@ private void scheduleLocallyConfiguredJobs() * Load local job configurations. */ private List loadLocalJobConfigs() - throws IOException { + throws ConfigurationException { List jobConfigs = SchedulerUtils.loadJobConfigs(this.properties); LOG.info(String.format(jobConfigs.size() <= 1 ? "Loaded %d job configuration" : "Loaded %d job configurations", jobConfigs.size())); diff --git a/gobblin-scheduler/src/main/java/gobblin/scheduler/JobScheduler.java b/gobblin-scheduler/src/main/java/gobblin/scheduler/JobScheduler.java index 1ab191e2d10..5c4e89480a2 100644 --- a/gobblin-scheduler/src/main/java/gobblin/scheduler/JobScheduler.java +++ b/gobblin-scheduler/src/main/java/gobblin/scheduler/JobScheduler.java @@ -12,10 +12,7 @@ package gobblin.scheduler; import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.nio.charset.Charset; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Properties; @@ -23,10 +20,10 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.io.monitor.FileAlterationListener; import org.apache.commons.io.monitor.FileAlterationListenerAdaptor; import org.apache.commons.io.monitor.FileAlterationMonitor; -import org.apache.commons.io.monitor.FileAlterationObserver; import org.quartz.CronScheduleBuilder; import org.quartz.DisallowConcurrentExecution; import org.quartz.Job; @@ -50,6 +47,7 @@ import com.google.common.base.Strings; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.google.common.io.Files; import com.google.common.util.concurrent.AbstractIdleService; import gobblin.configuration.ConfigurationKeys; @@ -273,11 +271,20 @@ public void runJob(Properties jobProps, JobListener jobListener) } } + /** + * Get the names of the scheduled jobs. + * + * @return names of the scheduled jobs + */ + public Collection getScheduledJobs() { + return this.scheduledJobs.keySet(); + } + /** * Schedule locally configured Gobblin jobs. */ private void scheduleLocallyConfiguredJobs() - throws IOException, JobException { + throws ConfigurationException, JobException { LOG.info("Scheduling locally configured jobs"); for (Properties jobProps : loadLocalJobConfigs()) { boolean runOnce = Boolean.valueOf(jobProps.getProperty(ConfigurationKeys.JOB_RUN_ONCE_KEY, "false")); @@ -289,7 +296,7 @@ private void scheduleLocallyConfiguredJobs() * Load local job configurations. */ private List loadLocalJobConfigs() - throws IOException { + throws ConfigurationException { List jobConfigs = SchedulerUtils.loadJobConfigs(this.properties); LOG.info(String.format(jobConfigs.size() <= 1 ? "Loaded %d job configuration" : "Loaded %d job configurations", jobConfigs.size())); @@ -301,40 +308,47 @@ private List loadLocalJobConfigs() * Start the job configuration file monitor. * *

- * The job configuration file monitor currently only supports monitoring - * newly added job configuration files. + * The job configuration file monitor currently only supports monitoring the following types of changes: + * + *

    + *
  • New job configuration files.
  • + *
  • Changes to existing job configuration files.
  • + *
  • Changes to existing common properties file with a .properties extension.
  • + *
+ *

+ * + *

+ * This monitor has one limitation: in case more than one file including at least one common properties + * file are changed between two adjacent checks, the reloading of affected job configuration files may + * be intermixed and applied in an order that is not desirable. This is because the order the listener + * is called on the changes is not controlled by Gobblin, but instead by the monitor itself. *

*/ private void startJobConfigFileMonitor() throws Exception { - File jobConfigFileDir = new File(this.properties.getProperty(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY)); - FileAlterationObserver observer = new FileAlterationObserver(jobConfigFileDir); + final File jobConfigFileDir = new File(this.properties.getProperty(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY)); FileAlterationListener listener = new FileAlterationListenerAdaptor() { /** * Called when a new job configuration file is dropped in. */ @Override public void onFileCreate(File file) { - int pos = file.getName().lastIndexOf("."); - String fileExtension = pos >= 0 ? file.getName().substring(pos + 1) : ""; + String fileExtension = Files.getFileExtension(file.getName()); if (!jobConfigFileExtensions.contains(fileExtension)) { // Not a job configuration file, ignore. return; } - LOG.info("Detected new job configuration file " + file.getAbsolutePath()); - Properties jobProps = new Properties(); - // First add framework configuration properties - jobProps.putAll(properties); - // Then load job configuration properties from the new job configuration file - loadJobConfig(jobProps, file); - - // Schedule the new job + // Load the new job configuration and schedule the new job try { + LOG.info("Detected new job configuration file " + file.getAbsolutePath()); + Properties jobProps = SchedulerUtils.loadJobConfig(properties, file, jobConfigFileDir); boolean runOnce = Boolean.valueOf(jobProps.getProperty(ConfigurationKeys.JOB_RUN_ONCE_KEY, "false")); scheduleJob(jobProps, runOnce ? new RunOnceJobListener() : new EmailNotificationJobListener()); - } catch (Throwable t) { - LOG.error("Failed to schedule new job loaded from job configuration file " + file.getAbsolutePath(), t); + } catch (ConfigurationException ce) { + LOG.error("Failed to load from job configuration file " + file.getAbsolutePath(), ce); + } catch (JobException je) { + LOG.error("Failed to schedule new job loaded from job configuration file " + file.getAbsolutePath(), je); } } @@ -343,45 +357,51 @@ public void onFileCreate(File file) { */ @Override public void onFileChange(File file) { - int pos = file.getName().lastIndexOf("."); - String fileExtension = pos >= 0 ? file.getName().substring(pos + 1) : ""; + String fileExtension = Files.getFileExtension(file.getName()); + if (fileExtension.equalsIgnoreCase(SchedulerUtils.JOB_PROPS_FILE_EXTENSION)) { + LOG.info("Detected change to common properties file " + file.getAbsolutePath()); + try { + for (Properties jobProps : SchedulerUtils.loadJobConfigs(properties, file, jobConfigFileDir)) { + try { + rescheduleJob(jobProps); + } catch (JobException je) { + LOG.error("Failed to reschedule job reloaded from job configuration file " + jobProps + .getProperty(ConfigurationKeys.JOB_CONFIG_FILE_PATH_KEY), je); + } + } + } catch (ConfigurationException ce) { + LOG.error("Failed to reload job configuration files affected by changes to " + file.getAbsolutePath(), ce); + } + return; + } + if (!jobConfigFileExtensions.contains(fileExtension)) { // Not a job configuration file, ignore. return; } - LOG.info("Detected change to job configuration file " + file.getAbsolutePath()); - Properties jobProps = new Properties(); - // First add framework configuration properties - jobProps.putAll(properties); - // Then load the updated job configuration properties - loadJobConfig(jobProps, file); - - String jobName = jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY); try { - // First unschedule and delete the old job - unscheduleJob(jobName); - boolean runOnce = Boolean.valueOf(jobProps.getProperty(ConfigurationKeys.JOB_RUN_ONCE_KEY, "false")); - // Reschedule the job with the new job configuration - scheduleJob(jobProps, runOnce ? new RunOnceJobListener() : new EmailNotificationJobListener()); - } catch (Throwable t) { - LOG.error("Failed to update existing job " + jobName, t); + LOG.info("Detected change to job configuration file " + file.getAbsolutePath()); + Properties jobProps = SchedulerUtils.loadJobConfig(properties, file, jobConfigFileDir); + rescheduleJob(jobProps); + } catch (ConfigurationException ce) { + LOG.error("Failed to reload from job configuration file " + file.getAbsolutePath(), ce); + } catch (JobException je) { + LOG.error("Failed to reschedule job reloaded from job configuration file " + file.getAbsolutePath(), je); } } - private void loadJobConfig(Properties jobProps, File file) { - try { - jobProps.load(new InputStreamReader(new FileInputStream(file), Charset.forName( - ConfigurationKeys.DEFAULT_CHARSET_ENCODING))); - jobProps.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_PATH_KEY, file.getAbsolutePath()); - } catch (Exception e) { - LOG.error("Failed to load job configuration from file " + file.getAbsolutePath(), e); - } + private void rescheduleJob(Properties jobProps) throws JobException { + String jobName = jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY); + // First unschedule and delete the old job + unscheduleJob(jobName); + boolean runOnce = Boolean.valueOf(jobProps.getProperty(ConfigurationKeys.JOB_RUN_ONCE_KEY, "false")); + // Reschedule the job with the new job configuration + scheduleJob(jobProps, runOnce ? new RunOnceJobListener() : new EmailNotificationJobListener()); } }; - observer.addListener(listener); - this.fileAlterationMonitor.addObserver(observer); + SchedulerUtils.addFileAlterationObserver(this.fileAlterationMonitor, listener, jobConfigFileDir); this.fileAlterationMonitor.start(); } diff --git a/gobblin-scheduler/src/test/java/gobblin/scheduler/JobConfigFileMonitorTest.java b/gobblin-scheduler/src/test/java/gobblin/scheduler/JobConfigFileMonitorTest.java index daf02d2f445..807766db787 100644 --- a/gobblin-scheduler/src/test/java/gobblin/scheduler/JobConfigFileMonitorTest.java +++ b/gobblin-scheduler/src/test/java/gobblin/scheduler/JobConfigFileMonitorTest.java @@ -29,15 +29,10 @@ import com.google.common.util.concurrent.ServiceManager; import gobblin.configuration.ConfigurationKeys; -import gobblin.runtime.TaskExecutor; -import gobblin.runtime.TaskStateTracker; -import gobblin.runtime.WorkUnitManager; -import gobblin.runtime.local.LocalJobManager; -import gobblin.runtime.local.LocalTaskStateTracker; /** - * Unit tests for the job configuration file monitor in {@link LocalJobManager}. + * Unit tests for the job configuration file monitor in {@link gobblin.scheduler.JobScheduler}. * * @author ynli */ @@ -47,7 +42,7 @@ public class JobConfigFileMonitorTest { private static final String JOB_CONFIG_FILE_DIR = "gobblin-test/resource/job-conf"; private ServiceManager serviceManager; - private LocalJobManager jobManager; + private JobScheduler jobScheduler; private File newJobConfigFile; @BeforeClass @@ -59,16 +54,8 @@ public void setUp() properties.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_MONITOR_POLLING_INTERVAL_KEY, "1000"); properties.setProperty(ConfigurationKeys.METRICS_ENABLED_KEY, "false"); - TaskExecutor taskExecutor = new TaskExecutor(properties); - TaskStateTracker taskStateTracker = new LocalTaskStateTracker(properties, taskExecutor); - WorkUnitManager workUnitManager = new WorkUnitManager(taskExecutor, taskStateTracker); - this.jobManager = new LocalJobManager(workUnitManager, properties); - ((LocalTaskStateTracker) taskStateTracker).setJobManager(this.jobManager); - - this.serviceManager = new ServiceManager(Lists.newArrayList( - // The order matters due to dependencies between services - taskExecutor, taskStateTracker, workUnitManager, this.jobManager)); - + this.jobScheduler = new JobScheduler(properties); + this.serviceManager = new ServiceManager(Lists.newArrayList(this.jobScheduler)); this.serviceManager.startAsync(); } @@ -77,7 +64,7 @@ public void testAddNewJobConfigFile() throws Exception { Thread.sleep(2000); - Assert.assertEquals(this.jobManager.getScheduledJobs().size(), 3); + Assert.assertEquals(this.jobScheduler.getScheduledJobs().size(), 3); // Create a new job configuration file by making a copy of an existing // one and giving a different job name @@ -89,7 +76,7 @@ public void testAddNewJobConfigFile() Thread.sleep(2000); - Set jobNames = Sets.newHashSet(this.jobManager.getScheduledJobs()); + Set jobNames = Sets.newHashSet(this.jobScheduler.getScheduledJobs()); Assert.assertEquals(jobNames.size(), 4); Assert.assertTrue(jobNames.contains("GobblinTest1")); Assert.assertTrue(jobNames.contains("GobblinTest2")); @@ -101,7 +88,7 @@ public void testAddNewJobConfigFile() @Test(dependsOnMethods = {"testAddNewJobConfigFile"}) public void testChangeJobConfigFile() throws Exception { - Assert.assertEquals(this.jobManager.getScheduledJobs().size(), 4); + Assert.assertEquals(this.jobScheduler.getScheduledJobs().size(), 4); // Make a change to the new job configuration file Properties jobProps = new Properties(); @@ -111,7 +98,7 @@ public void testChangeJobConfigFile() Thread.sleep(2000); - Set jobNames = Sets.newHashSet(this.jobManager.getScheduledJobs()); + Set jobNames = Sets.newHashSet(this.jobScheduler.getScheduledJobs()); Assert.assertEquals(jobNames.size(), 4); Assert.assertTrue(jobNames.contains("GobblinTest1")); Assert.assertTrue(jobNames.contains("GobblinTest2")); @@ -123,7 +110,7 @@ public void testChangeJobConfigFile() @Test(dependsOnMethods = {"testChangeJobConfigFile"}) public void testUnscheduleJob() throws Exception { - Assert.assertEquals(this.jobManager.getScheduledJobs().size(), 4); + Assert.assertEquals(this.jobScheduler.getScheduledJobs().size(), 4); // Disable the new job by setting job.disabled=true Properties jobProps = new Properties(); @@ -133,7 +120,7 @@ public void testUnscheduleJob() Thread.sleep(2000); - Set jobNames = Sets.newHashSet(this.jobManager.getScheduledJobs()); + Set jobNames = Sets.newHashSet(this.jobScheduler.getScheduledJobs()); Assert.assertEquals(jobNames.size(), 3); Assert.assertTrue(jobNames.contains("GobblinTest1")); Assert.assertTrue(jobNames.contains("GobblinTest2")); diff --git a/gobblin-utility/build.gradle b/gobblin-utility/build.gradle index bb4079bf40b..0f22f28e243 100644 --- a/gobblin-utility/build.gradle +++ b/gobblin-utility/build.gradle @@ -16,6 +16,7 @@ dependencies { compile externalDependency.commonsConfiguration compile externalDependency.commonsEmail + compile externalDependency.commonsIO compile externalDependency.commonsLang compile externalDependency.guava compile externalDependency.slf4j diff --git a/gobblin-utility/src/main/java/gobblin/util/SchedulerUtils.java b/gobblin-utility/src/main/java/gobblin/util/SchedulerUtils.java index 5e38a4c491c..405c27adb06 100644 --- a/gobblin-utility/src/main/java/gobblin/util/SchedulerUtils.java +++ b/gobblin-utility/src/main/java/gobblin/util/SchedulerUtils.java @@ -12,17 +12,18 @@ package gobblin.util; import java.io.File; -import java.io.FileInputStream; +import java.io.FileFilter; import java.io.FilenameFilter; -import java.io.IOException; -import java.io.InputStreamReader; -import java.nio.charset.Charset; import java.util.List; import java.util.Properties; import java.util.Set; import org.apache.commons.configuration.ConfigurationConverter; +import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.commons.io.monitor.FileAlterationListener; +import org.apache.commons.io.monitor.FileAlterationMonitor; +import org.apache.commons.io.monitor.FileAlterationObserver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,7 +32,6 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import com.google.common.io.Closer; import com.google.common.io.Files; import gobblin.configuration.ConfigurationKeys; @@ -47,101 +47,206 @@ public class SchedulerUtils { private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerUtils.class); // Extension of properties files - private static final String JOB_PROPS_FILE_EXTENSION = ".properties"; + public static final String JOB_PROPS_FILE_EXTENSION = "properties"; + // A filter for properties files + private static final FilenameFilter PROPERTIES_FILE_FILTER = new FilenameFilter() { + @Override + public boolean accept(File file, String name) { + return Files.getFileExtension(name).equalsIgnoreCase(JOB_PROPS_FILE_EXTENSION); + } + }; /** * Load job configurations from job configuration files stored under the - * root job configuration directory. + * root job configuration file directory. * * @param properties Gobblin framework configuration properties - * @return list of job configuration properties + * @return a list of job configurations in the form of {@link java.util.Properties} */ public static List loadJobConfigs(Properties properties) - throws IOException { - Iterable jobConfigFileExtensionsIterable = Splitter.on(",").omitEmptyStrings().trimResults().split( - properties.getProperty(ConfigurationKeys.JOB_CONFIG_FILE_EXTENSIONS_KEY, - ConfigurationKeys.DEFAULT_JOB_CONFIG_FILE_EXTENSIONS)); - Set jobConfigFileExtensions = Sets.newHashSet( - Iterables.transform(jobConfigFileExtensionsIterable, new Function() { - @Override - public String apply(String input) { - return input.toLowerCase(); - } - })); + throws ConfigurationException { List jobConfigs = Lists.newArrayList(); - loadJobConfigsRecursive(jobConfigs, properties, jobConfigFileExtensions, + loadJobConfigsRecursive(jobConfigs, properties, getJobConfigurationFileExtensions(properties), new File(properties.getProperty(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY))); return jobConfigs; } + /** + * Load job configurations from job configuration files affected by changes to the given common properties file. + * + * @param properties Gobblin framework configuration properties + * @param commonPropsFile the common properties file with changes + * @param jobConfigFileDir root job configuration file directory + * @return a list of job configurations in the form of {@link java.util.Properties} + */ + public static List loadJobConfigs(Properties properties, File commonPropsFile, File jobConfigFileDir) + throws ConfigurationException { + List commonPropsList = Lists.newArrayList(); + // Start from the parent of parent of the changed common properties file to avoid + // loading the common properties file here since it will be loaded below anyway + getCommonProperties(commonPropsList, jobConfigFileDir, commonPropsFile.getParentFile().getParentFile()); + // Add the framework configuration properties to the end + commonPropsList.add(properties); + + Properties commonProps = new Properties(); + // Include common properties in reverse order + for (Properties pros : Lists.reverse(commonPropsList)) { + commonProps.putAll(pros); + } + + List jobConfigs = Lists.newArrayList(); + // The common properties file will be loaded here + loadJobConfigsRecursive(jobConfigs, commonProps, getJobConfigurationFileExtensions(properties), + commonPropsFile.getParentFile()); + return jobConfigs; + } + + /** + * Load a given job configuration file. + * + * @param properties Gobblin framework configuration properties + * @param jobConfigFile job configuration file to be loaded + * @param jobConfigFileDir root job configuration file directory + * @return a job configuration in the form of {@link java.util.Properties} + */ + public static Properties loadJobConfig(Properties properties, File jobConfigFile, File jobConfigFileDir) + throws ConfigurationException { + List commonPropsList = Lists.newArrayList(); + getCommonProperties(commonPropsList, jobConfigFileDir, jobConfigFile.getParentFile()); + // Add the framework configuration properties to the end + commonPropsList.add(properties); + + Properties jobProps = new Properties(); + // Include common properties in reverse order + for (Properties commonProps : Lists.reverse(commonPropsList)) { + jobProps.putAll(commonProps); + } + + // Then load the job configuration properties defined in the job configuration file + jobProps.putAll(ConfigurationConverter.getProperties(new PropertiesConfiguration(jobConfigFile))); + jobProps.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_PATH_KEY, jobConfigFile.getAbsolutePath()); + return jobProps; + } + + /** + * Add {@link org.apache.commons.io.monitor.FileAlterationMonitor}s for the given + * root directory and any nested subdirectories under the root directory to the given + * {@link org.apache.commons.io.monitor.FileAlterationMonitor}. + * + * @param monitor a {@link org.apache.commons.io.monitor.FileAlterationMonitor} + * @param listener a {@link org.apache.commons.io.monitor.FileAlterationListener} + * @param rootDir root directory + */ + public static void addFileAlterationObserver(FileAlterationMonitor monitor, FileAlterationListener listener, + File rootDir) { + // Add a observer for the current root directory + FileAlterationObserver observer = new FileAlterationObserver(rootDir); + observer.addListener(listener); + monitor.addObserver(observer); + + // List subdirectories under the current root directory + File[] subDirs = rootDir.listFiles(new FileFilter() { + @Override + public boolean accept(File file) { + return file.isDirectory(); + } + }); + + if (subDirs == null || subDirs.length == 0) { + return; + } + + // Recursively add a observer for each subdirectory + for (File subDir : subDirs) { + addFileAlterationObserver(monitor, listener, subDir); + } + } + /** * Recursively load job configuration files under the given directory. */ private static void loadJobConfigsRecursive(List jobConfigs, Properties rootProps, Set jobConfigFileExtensions, File jobConfigDir) - throws IOException { + throws ConfigurationException { // Get the properties file that ends with .properties if any - String[] propertiesFiles = jobConfigDir.list(new FilenameFilter() { - @Override - public boolean accept(File file, String name) { - return name.toLowerCase().endsWith(JOB_PROPS_FILE_EXTENSION); + String[] propertiesFiles = jobConfigDir.list(PROPERTIES_FILE_FILTER); + if (propertiesFiles != null && propertiesFiles.length > 0) { + // There should be a single properties file in each directory (or sub directory) + if (propertiesFiles.length != 1) { + throw new RuntimeException("Found more than one .properties file in directory: " + jobConfigDir); } - }); - Closer closer = Closer.create(); - try { - if (propertiesFiles != null && propertiesFiles.length > 0) { - // There should be a single properties file in each directory (or sub directory) - if (propertiesFiles.length != 1) { - throw new RuntimeException("Found more than one .properties file in directory: " + jobConfigDir); + // Load the properties, which may overwrite the same properties defined in the parent or ancestor directories. + rootProps.putAll(ConfigurationConverter + .getProperties(new PropertiesConfiguration(new File(jobConfigDir, propertiesFiles[0])))); + } + + String[] names = jobConfigDir.list(); + if (names == null || names.length == 0) { + return; + } + + for (String name : names) { + File file = new File(jobConfigDir, name); + if (file.isDirectory()) { + Properties rootPropsCopy = new Properties(); + rootPropsCopy.putAll(rootProps); + loadJobConfigsRecursive(jobConfigs, rootPropsCopy, jobConfigFileExtensions, file); + } else { + if (!jobConfigFileExtensions.contains(Files.getFileExtension(file.getName()).toLowerCase())) { + LOGGER.warn("Skipped file " + file + " that has an unsupported extension"); + continue; } - // Load the properties, which may overwrite the same properties defined in the parent or ancestor directories. - rootProps.load(closer.register( - new InputStreamReader(new FileInputStream(new File(jobConfigDir, propertiesFiles[0])), - Charset.forName(ConfigurationKeys.DEFAULT_CHARSET_ENCODING)))); - } + File doneFile = new File(file + ".done"); + if (doneFile.exists()) { + // Skip the job configuration file when a .done file with the same name exists, + // which means the job configuration file is for a one-time job and the job has + // already run and finished. + LOGGER.info("Skipped job configuration file " + file + " for which a .done file exists"); + continue; + } - String[] names = jobConfigDir.list(); - if (names == null || names.length == 0) { - return; + Properties jobProps = new Properties(); + // Put all parent/ancestor properties first + jobProps.putAll(rootProps); + // Then load the job configuration properties defined in the job configuration file + jobProps.putAll(ConfigurationConverter.getProperties(new PropertiesConfiguration(file))); + jobProps.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_PATH_KEY, file.getAbsolutePath()); + jobConfigs.add(jobProps); } + } + } - for (String name : names) { - File file = new File(jobConfigDir, name); - if (file.isDirectory()) { - Properties rootPropsCopy = new Properties(); - rootPropsCopy.putAll(rootProps); - loadJobConfigsRecursive(jobConfigs, rootPropsCopy, jobConfigFileExtensions, file); - } else { - if (!jobConfigFileExtensions.contains(Files.getFileExtension(file.getName()).toLowerCase())) { - LOGGER.warn("Skipped file " + file + " that has an unsupported extension"); - continue; - } - - File doneFile = new File(file + ".done"); - if (doneFile.exists()) { - // Skip the job configuration file when a .done file with the same name exists, - // which means the job configuration file is for a one-time job and the job has - // already run and finished. - LOGGER.info("Skipped job configuration file " + file + " for which a .done file exists"); - continue; + private static Set getJobConfigurationFileExtensions(Properties properties) { + Iterable jobConfigFileExtensionsIterable = Splitter.on(",").omitEmptyStrings().trimResults().split( + properties.getProperty(ConfigurationKeys.JOB_CONFIG_FILE_EXTENSIONS_KEY, + ConfigurationKeys.DEFAULT_JOB_CONFIG_FILE_EXTENSIONS)); + return Sets.newHashSet(Iterables.transform(jobConfigFileExtensionsIterable, new Function() { + @Override + public String apply(String input) { + return input.toLowerCase(); } + })); + } - Properties jobProps = new Properties(); - // Put all parent/ancestor properties first - jobProps.putAll(rootProps); - // Then load the job configuration properties defined in the job configuration file - jobProps.putAll(ConfigurationConverter.getProperties(new PropertiesConfiguration(file))); - jobProps.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_PATH_KEY, file.getAbsolutePath()); - jobConfigs.add(jobProps); + private static void getCommonProperties(List commonPropsList, File jobConfigFileDir, File dir) + throws ConfigurationException { + // Traversal backward until the parent of the root job configuration file directory is reached + while (!dir.equals(jobConfigFileDir.getParentFile())) { + // Get the properties file that ends with .properties if any + String[] propertiesFiles = dir.list(PROPERTIES_FILE_FILTER); + if (propertiesFiles != null && propertiesFiles.length > 0) { + // There should be a single properties file in each directory (or sub directory) + if (propertiesFiles.length != 1) { + throw new RuntimeException("Found more than one .properties file in directory: " + dir); } + commonPropsList.add( + ConfigurationConverter.getProperties(new PropertiesConfiguration(new File(dir, propertiesFiles[0])))); } - } catch (Throwable t) { - throw closer.rethrow(t); - } finally { - closer.close(); + + dir = dir.getParentFile(); } } } diff --git a/gobblin-utility/src/test/java/gobblin/util/SchedulerUtilsTest.java b/gobblin-utility/src/test/java/gobblin/util/SchedulerUtilsTest.java index e218e7ce0ff..48ea2b66ba7 100644 --- a/gobblin-utility/src/test/java/gobblin/util/SchedulerUtilsTest.java +++ b/gobblin-utility/src/test/java/gobblin/util/SchedulerUtilsTest.java @@ -14,15 +14,23 @@ import java.io.File; import java.io.FileWriter; import java.io.IOException; +import java.nio.charset.Charset; import java.util.List; import java.util.Properties; +import java.util.Set; +import java.util.concurrent.Semaphore; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.io.monitor.FileAlterationListener; +import org.apache.commons.io.monitor.FileAlterationListenerAdaptor; +import org.apache.commons.io.monitor.FileAlterationMonitor; import org.apache.hadoop.fs.FileUtil; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import com.google.common.collect.Sets; import com.google.common.io.Files; import gobblin.configuration.ConfigurationKeys; @@ -34,20 +42,17 @@ @Test(groups = {"gobblin.util"}) public class SchedulerUtilsTest { - private static final String JOB_CONF_ROOT_DIR = "gobblin-test/test-job-conf-dir"; + private static final File JOB_CONF_ROOT_DIR = new File("gobblin-test/test-job-conf-dir"); + private static final File SUB_DIR1 = new File(JOB_CONF_ROOT_DIR, "test1"); + private static final File SUB_DIR11 = new File(SUB_DIR1, "test11"); + private static final File SUB_DIR2 = new File(JOB_CONF_ROOT_DIR, "test2"); @BeforeClass public void setUp() throws IOException { - // test-job-conf-dir/test1 - File subDir1 = new File(JOB_CONF_ROOT_DIR, "test1"); - subDir1.mkdirs(); - // test-job-conf-dir/test1/test11 - File subDir11 = new File(subDir1, "test11"); - subDir11.mkdirs(); - // test-job-conf-dir/test2 - File subDir2 = new File(JOB_CONF_ROOT_DIR, "test2"); - subDir2.mkdirs(); + SUB_DIR1.mkdirs(); + SUB_DIR11.mkdirs(); + SUB_DIR2.mkdirs(); Properties rootProps = new Properties(); rootProps.setProperty("k1", "a1"); @@ -59,44 +64,44 @@ public void setUp() props1.setProperty("k1", "b1"); props1.setProperty("k3", "a3"); // test-job-conf-dir/test1/test.properties - props1.store(new FileWriter(new File(subDir1, "test.properties")), ""); + props1.store(new FileWriter(new File(SUB_DIR1, "test.properties")), ""); Properties jobProps1 = new Properties(); jobProps1.setProperty("k1", "c1"); jobProps1.setProperty("k3", "b3"); jobProps1.setProperty("k6", "a6"); // test-job-conf-dir/test1/test11.pull - jobProps1.store(new FileWriter(new File(subDir1, "test11.pull")), ""); + jobProps1.store(new FileWriter(new File(SUB_DIR1, "test11.pull")), ""); Properties jobProps2 = new Properties(); jobProps2.setProperty("k7", "a7"); // test-job-conf-dir/test1/test12.PULL - jobProps2.store(new FileWriter(new File(subDir1, "test12.PULL")), ""); + jobProps2.store(new FileWriter(new File(SUB_DIR1, "test12.PULL")), ""); Properties jobProps3 = new Properties(); jobProps3.setProperty("k1", "d1"); jobProps3.setProperty("k8", "a8"); jobProps3.setProperty("k9", "${k8}"); // test-job-conf-dir/test1/test11/test111.pull - jobProps3.store(new FileWriter(new File(subDir11, "test111.pull")), ""); + jobProps3.store(new FileWriter(new File(SUB_DIR11, "test111.pull")), ""); Properties props2 = new Properties(); props2.setProperty("k2", "b2"); props2.setProperty("k5", "a5"); // test-job-conf-dir/test2/test.properties - props2.store(new FileWriter(new File(subDir2, "test.properties")), ""); + props2.store(new FileWriter(new File(SUB_DIR2, "test.PROPERTIES")), ""); Properties jobProps4 = new Properties(); jobProps4.setProperty("k5", "b5"); // test-job-conf-dir/test2/test21.PULL - jobProps4.store(new FileWriter(new File(subDir2, "test21.PULL")), ""); + jobProps4.store(new FileWriter(new File(SUB_DIR2, "test21.PULL")), ""); } @Test public void testLoadJobConfigs() - throws IOException { + throws ConfigurationException { Properties properties = new Properties(); - properties.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY, JOB_CONF_ROOT_DIR); + properties.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY, JOB_CONF_ROOT_DIR.getAbsolutePath()); List jobConfigs = SchedulerUtils.loadJobConfigs(properties); Assert.assertEquals(jobConfigs.size(), 4); @@ -143,13 +148,12 @@ public void testLoadJobConfigs() @Test(dependsOnMethods = {"testLoadJobConfigs"}) public void testLoadJobConfigsWithDoneFile() - throws IOException { - File subDir2 = new File(JOB_CONF_ROOT_DIR, "test2"); + throws ConfigurationException, IOException { // Create a .done file for test21.pull so it should not be loaded - Files.copy(new File(subDir2, "test21.PULL"), new File(subDir2, "test21.PULL.done")); + Files.copy(new File(SUB_DIR2, "test21.PULL"), new File(SUB_DIR2, "test21.PULL.done")); Properties properties = new Properties(); - properties.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY, JOB_CONF_ROOT_DIR); + properties.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY, JOB_CONF_ROOT_DIR.getAbsolutePath()); List jobConfigs = SchedulerUtils.loadJobConfigs(properties); Assert.assertEquals(jobConfigs.size(), 3); @@ -181,10 +185,114 @@ public void testLoadJobConfigsWithDoneFile() Assert.assertNull(getJobConfigForFile(jobConfigs, "test21.PULL")); } + @Test + public void testLoadJobConfigsForCommonPropsFile() + throws ConfigurationException { + File commonPropsFile = new File(SUB_DIR1, "test.properties"); + + Properties properties = new Properties(); + properties.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY, JOB_CONF_ROOT_DIR.getAbsolutePath()); + List jobConfigs = + SchedulerUtils.loadJobConfigs(properties, commonPropsFile, JOB_CONF_ROOT_DIR); + Assert.assertEquals(jobConfigs.size(), 3); + + // test-job-conf-dir/test1/test11/test111.pull + Properties jobProps1 = getJobConfigForFile(jobConfigs, "test111.pull"); + Assert.assertEquals(jobProps1.stringPropertyNames().size(), 7); + Assert.assertEquals(jobProps1.getProperty("k1"), "d1"); + Assert.assertEquals(jobProps1.getProperty("k2"), "a2"); + Assert.assertEquals(jobProps1.getProperty("k3"), "a3"); + Assert.assertEquals(jobProps1.getProperty("k8"), "a8"); + Assert.assertEquals(jobProps1.getProperty("k9"), "a8"); + + // test-job-conf-dir/test1/test11.pull + Properties jobProps2 = getJobConfigForFile(jobConfigs, "test11.pull"); + Assert.assertEquals(jobProps2.stringPropertyNames().size(), 6); + Assert.assertEquals(jobProps2.getProperty("k1"), "c1"); + Assert.assertEquals(jobProps2.getProperty("k2"), "a2"); + Assert.assertEquals(jobProps2.getProperty("k3"), "b3"); + Assert.assertEquals(jobProps2.getProperty("k6"), "a6"); + + // test-job-conf-dir/test1/test12.PULL + Properties jobProps3 = getJobConfigForFile(jobConfigs, "test12.PULL"); + Assert.assertEquals(jobProps3.stringPropertyNames().size(), 6); + Assert.assertEquals(jobProps3.getProperty("k1"), "b1"); + Assert.assertEquals(jobProps3.getProperty("k2"), "a2"); + Assert.assertEquals(jobProps3.getProperty("k3"), "a3"); + Assert.assertEquals(jobProps3.getProperty("k7"), "a7"); + } + + @Test + public void testLoadJobConfig() + throws ConfigurationException { + File jobConfigFile = new File(SUB_DIR11, "test111.pull"); + Properties properties = new Properties(); + properties.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY, JOB_CONF_ROOT_DIR.getAbsolutePath()); + Properties jobProps = SchedulerUtils.loadJobConfig(properties, jobConfigFile, JOB_CONF_ROOT_DIR); + + Assert.assertEquals(jobProps.stringPropertyNames().size(), 7); + Assert.assertTrue(jobProps.containsKey(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY)); + Assert.assertTrue(jobProps.containsKey(ConfigurationKeys.JOB_CONFIG_FILE_PATH_KEY)); + Assert.assertEquals(jobProps.getProperty("k1"), "d1"); + Assert.assertEquals(jobProps.getProperty("k2"), "a2"); + Assert.assertEquals(jobProps.getProperty("k3"), "a3"); + Assert.assertEquals(jobProps.getProperty("k8"), "a8"); + Assert.assertEquals(jobProps.getProperty("k9"), "a8"); + } + + @Test(dependsOnMethods = { + "testLoadJobConfigsWithDoneFile", + "testLoadJobConfigsForCommonPropsFile", + "testLoadJobConfig"}) + public void testFileAlterationObserver() throws Exception { + FileAlterationMonitor monitor = new FileAlterationMonitor(3000); + final Set fileAltered = Sets.newHashSet(); + final Semaphore semaphore = new Semaphore(0); + FileAlterationListener listener = new FileAlterationListenerAdaptor() { + + @Override + public void onFileCreate(File file) { + fileAltered.add(file); + semaphore.release(); + } + + @Override + public void onFileChange(File file) { + fileAltered.add(file); + semaphore.release(); + } + }; + + SchedulerUtils.addFileAlterationObserver(monitor, listener, JOB_CONF_ROOT_DIR); + + try { + monitor.start(); + // Give the monitor some time to start + Thread.sleep(1000); + + File jobConfigFile = new File(SUB_DIR11, "test111.pull"); + Files.touch(jobConfigFile); + + File commonPropsFile = new File(SUB_DIR1, "test.properties"); + Files.touch(commonPropsFile); + + File newJobConfigFile = new File(SUB_DIR11, "test112.pull"); + Files.append("k1=v1", newJobConfigFile, Charset.forName(ConfigurationKeys.DEFAULT_CHARSET_ENCODING)); + + semaphore.acquire(3); + Assert.assertEquals(fileAltered.size(), 3); + Assert.assertTrue(fileAltered.contains(jobConfigFile)); + Assert.assertTrue(fileAltered.contains(commonPropsFile)); + Assert.assertTrue(fileAltered.contains(newJobConfigFile)); + } finally { + monitor.stop(); + } + } + @AfterClass public void tearDown() throws IOException { - FileUtil.fullyDelete(new File(JOB_CONF_ROOT_DIR)); + FileUtil.fullyDelete(JOB_CONF_ROOT_DIR); } private Properties getJobConfigForFile(List jobConfigs, String fileName) {