From fb3155083364ef0e99923364e5a340dd06452742 Mon Sep 17 00:00:00 2001 From: Matthew R Becker Date: Fri, 10 Jan 2020 09:06:56 -0600 Subject: [PATCH 1/9] ENH allow variable number of jobs for making psfs --- meds/maker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/meds/maker.py b/meds/maker.py index 6c4fb5e..9fc0bb5 100644 --- a/meds/maker.py +++ b/meds/maker.py @@ -340,7 +340,7 @@ def _write_psf_cutouts_joblib(self): # run them all in parallel with joblib.Parallel( - n_jobs=-1, + n_jobs=self.get('psf', {}).get('n_jobs', -1), backend='multiprocessing', max_nbytes=None, verbose=50) as parallel: From 15e8b19c7a7c85852de67f485df8c128602ce35c Mon Sep 17 00:00:00 2001 From: Matthew R Becker Date: Fri, 10 Jan 2020 09:17:35 -0600 Subject: [PATCH 2/9] try this --- meds/maker.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/meds/maker.py b/meds/maker.py index 9fc0bb5..40f4881 100644 --- a/meds/maker.py +++ b/meds/maker.py @@ -340,7 +340,8 @@ def _write_psf_cutouts_joblib(self): # run them all in parallel with joblib.Parallel( - n_jobs=self.get('psf', {}).get('n_jobs', -1), + n_jobs=self.get('max_joblib_workers', -1), + inner_max_num_threads=1, backend='multiprocessing', max_nbytes=None, verbose=50) as parallel: @@ -895,6 +896,9 @@ def _do_sky2image(self, wcs, ra, dec, color=None): import joblib n_jobs = joblib.externals.loky.cpu_count() + if self.get('max_joblib_workers', -1) > 0: + n_jobs = min(self.get('max_joblib_workers', -1), n_jobs) + n_per_job = len(ra) // n_jobs if n_jobs * n_per_job < len(ra): n_per_job += 1 @@ -926,6 +930,7 @@ def _do_sky2image(self, wcs, ra, dec, color=None): with joblib.Parallel( n_jobs=n_jobs, + inner_max_num_threads=1, backend='multiprocessing', max_nbytes=None, verbose=50) as parallel: From 00c045c5850d4f4c2e9a6412ce6d0456e8df8a36 Mon Sep 17 00:00:00 2001 From: Matthew R Becker Date: Fri, 10 Jan 2020 09:22:26 -0600 Subject: [PATCH 3/9] defaults in one spot --- meds/maker.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/meds/maker.py b/meds/maker.py index 40f4881..52d2d65 100644 --- a/meds/maker.py +++ b/meds/maker.py @@ -340,7 +340,7 @@ def _write_psf_cutouts_joblib(self): # run them all in parallel with joblib.Parallel( - n_jobs=self.get('max_joblib_workers', -1), + n_jobs=self._max_joblib_workers, inner_max_num_threads=1, backend='multiprocessing', max_nbytes=None, @@ -896,8 +896,8 @@ def _do_sky2image(self, wcs, ra, dec, color=None): import joblib n_jobs = joblib.externals.loky.cpu_count() - if self.get('max_joblib_workers', -1) > 0: - n_jobs = min(self.get('max_joblib_workers', -1), n_jobs) + if self._max_joblib_workers > 0: + n_jobs = min(self._max_joblib_workers, n_jobs) n_per_job = len(ra) // n_jobs if n_jobs * n_per_job < len(ra): @@ -1339,6 +1339,8 @@ def _load_config(self, config): if 'psf_type' in self: self['psf'] = {'type': self['psf_type']} + self._max_joblib_workers = self.get('max_joblib_workers', -1) + def _psf_rec_func(output_path, psf_data, file_ids, rows, cols): import joblib From 00df73e80135d276d396c02e52aadc37a276c03f Mon Sep 17 00:00:00 2001 From: Matthew R Becker Date: Fri, 10 Jan 2020 11:12:35 -0600 Subject: [PATCH 4/9] try this API --- meds/maker.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/meds/maker.py b/meds/maker.py index 52d2d65..68f1494 100644 --- a/meds/maker.py +++ b/meds/maker.py @@ -339,13 +339,11 @@ def _write_psf_cutouts_joblib(self): psf_data, file_ids, rows, cols)) # run them all in parallel - with joblib.Parallel( + with joblib.parallel_backend("multiprocessing", inner_max_num_threads=1): + outputs = joblib.Parallel( n_jobs=self._max_joblib_workers, - inner_max_num_threads=1, - backend='multiprocessing', max_nbytes=None, - verbose=50) as parallel: - outputs = parallel(jobs) + verbose=50)(jobs) # write to disk # at this point all of the PSFs we need are in memory on a @@ -928,13 +926,11 @@ def _do_sky2image(self, wcs, ra, dec, color=None): ) ) - with joblib.Parallel( + with joblib.parallel_backend("multiprocessing", inner_max_num_threads=1): + outputs = joblib.Parallel( n_jobs=n_jobs, - inner_max_num_threads=1, - backend='multiprocessing', max_nbytes=None, - verbose=50) as parallel: - outputs = parallel(jobs) + verbose=50)(jobs) col = [] row = [] From 45b9cdb711da2f176cb52a05c448b1bb580a59a4 Mon Sep 17 00:00:00 2001 From: Erin Sheldon Date: Fri, 10 Jan 2020 12:24:02 -0500 Subject: [PATCH 5/9] use loky --- meds/maker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/meds/maker.py b/meds/maker.py index 68f1494..06c7857 100644 --- a/meds/maker.py +++ b/meds/maker.py @@ -339,7 +339,7 @@ def _write_psf_cutouts_joblib(self): psf_data, file_ids, rows, cols)) # run them all in parallel - with joblib.parallel_backend("multiprocessing", inner_max_num_threads=1): + with joblib.parallel_backend("loky", inner_max_num_threads=1): outputs = joblib.Parallel( n_jobs=self._max_joblib_workers, max_nbytes=None, @@ -926,7 +926,7 @@ def _do_sky2image(self, wcs, ra, dec, color=None): ) ) - with joblib.parallel_backend("multiprocessing", inner_max_num_threads=1): + with joblib.parallel_backend("loky", inner_max_num_threads=1): outputs = joblib.Parallel( n_jobs=n_jobs, max_nbytes=None, From 3bb0f368b8051eee46447d156b44bfada16e96ae Mon Sep 17 00:00:00 2001 From: Erin Sheldon Date: Fri, 10 Jan 2020 12:28:22 -0500 Subject: [PATCH 6/9] just remove inner_max_num_threads --- meds/maker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/meds/maker.py b/meds/maker.py index 06c7857..25cf39d 100644 --- a/meds/maker.py +++ b/meds/maker.py @@ -339,7 +339,7 @@ def _write_psf_cutouts_joblib(self): psf_data, file_ids, rows, cols)) # run them all in parallel - with joblib.parallel_backend("loky", inner_max_num_threads=1): + with joblib.parallel_backend("multiprocessing"): outputs = joblib.Parallel( n_jobs=self._max_joblib_workers, max_nbytes=None, @@ -926,7 +926,7 @@ def _do_sky2image(self, wcs, ra, dec, color=None): ) ) - with joblib.parallel_backend("loky", inner_max_num_threads=1): + with joblib.parallel_backend("multiprocessing"): outputs = joblib.Parallel( n_jobs=n_jobs, max_nbytes=None, From fce0464bdffbabc6a04f35b9cf2ba388ba16ce17 Mon Sep 17 00:00:00 2001 From: Matthew R Becker Date: Fri, 10 Jan 2020 11:52:09 -0600 Subject: [PATCH 7/9] ENH do this thing --- meds/maker.py | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/meds/maker.py b/meds/maker.py index 68f1494..ac2c82a 100644 --- a/meds/maker.py +++ b/meds/maker.py @@ -339,9 +339,11 @@ def _write_psf_cutouts_joblib(self): psf_data, file_ids, rows, cols)) # run them all in parallel - with joblib.parallel_backend("multiprocessing", inner_max_num_threads=1): + with joblib.parallel_backend( + self._joblib_backend, + inner_max_num_threads=self._joblib_threads): outputs = joblib.Parallel( - n_jobs=self._max_joblib_workers, + n_jobs=self._joblib_max_workers, max_nbytes=None, verbose=50)(jobs) @@ -894,8 +896,8 @@ def _do_sky2image(self, wcs, ra, dec, color=None): import joblib n_jobs = joblib.externals.loky.cpu_count() - if self._max_joblib_workers > 0: - n_jobs = min(self._max_joblib_workers, n_jobs) + if self._joblib_max_workers > 0: + n_jobs = min(self._joblib_max_workers, n_jobs) n_per_job = len(ra) // n_jobs if n_jobs * n_per_job < len(ra): @@ -926,7 +928,9 @@ def _do_sky2image(self, wcs, ra, dec, color=None): ) ) - with joblib.parallel_backend("multiprocessing", inner_max_num_threads=1): + with joblib.parallel_backend( + self._joblib_backend, + inner_max_num_threads=self._joblib_threads): outputs = joblib.Parallel( n_jobs=n_jobs, max_nbytes=None, @@ -1335,7 +1339,14 @@ def _load_config(self, config): if 'psf_type' in self: self['psf'] = {'type': self['psf_type']} - self._max_joblib_workers = self.get('max_joblib_workers', -1) + self._joblib_backend = self.get( + 'joblib', {}).get('backend', 'multiprocessing') + self._joblib_max_workers = self.get( + 'joblib', {}).get('max_joblib_workers', -1) + if self._joblib_backend == 'loky': + self._joblib_threads = 1 + else: + self._joblib_threads = None def _psf_rec_func(output_path, psf_data, file_ids, rows, cols): From aa3ec2f0309b6ec4a5a7b89d82dfb83d9098ad33 Mon Sep 17 00:00:00 2001 From: Matthew R Becker Date: Fri, 10 Jan 2020 12:00:35 -0600 Subject: [PATCH 8/9] wrong key --- meds/maker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/meds/maker.py b/meds/maker.py index ac2c82a..940cb9e 100644 --- a/meds/maker.py +++ b/meds/maker.py @@ -1342,7 +1342,7 @@ def _load_config(self, config): self._joblib_backend = self.get( 'joblib', {}).get('backend', 'multiprocessing') self._joblib_max_workers = self.get( - 'joblib', {}).get('max_joblib_workers', -1) + 'joblib', {}).get('max_workers', -1) if self._joblib_backend == 'loky': self._joblib_threads = 1 else: From bf094d782ed52bc221b207b5e35177c383d4c92e Mon Sep 17 00:00:00 2001 From: Matthew R Becker Date: Fri, 10 Jan 2020 12:09:29 -0600 Subject: [PATCH 9/9] again --- meds/maker.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/meds/maker.py b/meds/maker.py index 940cb9e..58141ea 100644 --- a/meds/maker.py +++ b/meds/maker.py @@ -422,7 +422,7 @@ def _write_psf_cutouts(self): print('writing psf cutouts') - if self.get('use_joblib', False): + if self._use_joblib: self._write_psf_cutouts_joblib() else: self._write_psf_cutouts_serial() @@ -892,7 +892,7 @@ def _do_sky2image(self, wcs, ra, dec, color=None): """ # the cut at 250 eliminates cases where multiprocessing is # slower or the same due to overheads - if self.get('use_joblib', False) and len(ra) > 250: + if self._use_joblib and len(ra) > 250: import joblib n_jobs = joblib.externals.loky.cpu_count() @@ -1339,6 +1339,11 @@ def _load_config(self, config): if 'psf_type' in self: self['psf'] = {'type': self['psf_type']} + if 'joblib' in self: + self._use_joblib = True + else: + self._use_joblib = self.get('use_joblib', False) + self._joblib_backend = self.get( 'joblib', {}).get('backend', 'multiprocessing') self._joblib_max_workers = self.get(