diff --git a/sats_receiver/gr_modules/decoders.py b/sats_receiver/gr_modules/decoders.py index 3c65f20..12e9046 100644 --- a/sats_receiver/gr_modules/decoders.py +++ b/sats_receiver/gr_modules/decoders.py @@ -67,6 +67,9 @@ def set_observation_key(self, observation_key: str): self.observation_key = observation_key self.base_kw.update(observation_key=observation_key) + def lock_reconf(self, detach=0): + pass + def start(self): pfx = '_'.join([*self.name().lower().split(), self.t.strftime('%Y%m%d%H%M%S')]) self.tmp_file = utils.mktmp(self.out_dir, pfx) @@ -139,6 +142,15 @@ def __init__(self, for ch in range(ch_n): self.connect((pre_sink, ch), (self.wav_sink, ch)) + def lock_reconf(self, detach=0): + self.wav_sink.close() + if detach: + self.wav_sink.set_append(1) + if not self.wav_sink.open(str(self.tmp_file)): + self.wav_sink.set_append(0) + self.finalize() + self.start() + def start(self): super(RawDecoder, self).start() @@ -147,7 +159,7 @@ def start(self): def finalize(self): self.wav_sink.close() if self.tmp_file.exists(): - self.recorder.satellite.executor.execute(self._raw_finalize, **self.base_kw) + self.recorder.satellite.executor.execute(self._raw_finalize, **self.base_kw.copy()) @staticmethod def _raw_finalize(log: logging.Logger, diff --git a/sats_receiver/gr_modules/modules.py b/sats_receiver/gr_modules/modules.py index 30298f6..7d20900 100644 --- a/sats_receiver/gr_modules/modules.py +++ b/sats_receiver/gr_modules/modules.py @@ -38,8 +38,6 @@ def __init__(self, self.resamp_gcd = resamp_gcd = math.gcd(bandwidth, samp_rate) - self.blocks_copy = gr.blocks.copy(gr.gr.sizeof_gr_complex) - self.blocks_copy.set_enabled(self.enabled) self.freqshifter = gr.blocks.rotator_cc(2 * math.pi * (main_tune - frequency) / samp_rate) self.resampler = gr.filter.rational_resampler_ccc( interpolation=bandwidth // resamp_gcd, @@ -50,7 +48,6 @@ def __init__(self, self.connect( self, - self.blocks_copy, self.freqshifter, self.resampler, self, @@ -58,7 +55,6 @@ def __init__(self, def set_enabled(self, enabled): self.enabled = enabled - self.blocks_copy.set_enabled(enabled) def set_freq_offset(self, new_freq: Union[int, float]): self.freqshifter.set_phase_inc(2 * math.pi * (self.main_tune - new_freq) / self.samp_rate) @@ -302,6 +298,10 @@ def stop(self): def set_freq_offset(self, new_freq: Union[int, float]): self.radio.set_freq_offset(new_freq) + def lock_reconf(self, detach=0): + for d in self.decoders: + d.lock_reconf(detach) + @property def is_runned(self) -> bool: return self.radio.enabled @@ -547,7 +547,8 @@ def stop(self): self.start_event = self.stop_event = None for r in self.recorders: - r.stop() + if r.is_runned: + r.stop() def correct_doppler(self, observer: ephem.Observer): if self.is_runned and self.doppler: @@ -557,6 +558,11 @@ def correct_doppler(self, observer: ephem.Observer): if r.is_runned: r.set_freq_offset(utils.doppler_shift(r.frequency, self.sat_ephem_tle[0].range_velocity)) + def lock_reconf(self, detach=0): + for r in self.recorders: + if r.is_runned: + r.lock_reconf(detach) + @property def name(self) -> str: return self.config['name'] diff --git a/sats_receiver/gr_modules/receiver.py b/sats_receiver/gr_modules/receiver.py index 2b301a2..bfcba72 100644 --- a/sats_receiver/gr_modules/receiver.py +++ b/sats_receiver/gr_modules/receiver.py @@ -112,13 +112,14 @@ def update_config(self, config: Mapping, force=False): if to_remove_sats or to_create_sats: self.lock() + self.lock_reconf() for sat_name in to_remove_sats: sat = self.satellites[sat_name] self.up.scheduler.cancel(*sat.events) - sat.stop() if sat.is_runned: self.disconnect(self.connector, sat) + sat.stop() del self.satellites[sat_name] for sat_name in to_create_sats: @@ -277,6 +278,10 @@ def start(self, max_noutput_items=10000000): utils.num_disp(self.tune, 3), utils.num_disp(self.samp_rate, 3), self.gain, self.biast) + for sat in self.satellites.values(): + if sat.is_runned: + self.disconnect(self.connector, sat) + try: self.set_source(gr.soapy.source(f'driver={self.source}{self.serial and f",serial={self.serial}"}', 'fc32', 1, '', '', [''], [''])) @@ -299,9 +304,9 @@ def start(self, max_noutput_items=10000000): self.soapy_apply(0) self.connect(*self.flow) - for sat in self.satellites.values(): - self.connect(self.connector, sat) + if sat.is_runned: + self.connect(self.connector, sat) super(SatsReceiver, self).start(max_noutput_items) self.is_runned = True @@ -318,18 +323,45 @@ def stop(self, sched_clear=True): super(SatsReceiver, self).stop() self.disconnect(*self.flow) - con = self.connector + conn = self.connector self.set_source(gr.blocks.null_source(gr.gr.sizeof_gr_complex)) for sat in self.satellites.values(): if sched_clear: self.up.scheduler.cancel(*sat.events) + if self.is_runned and sat.is_runned: + try: + self.disconnect(conn, sat) + except ValueError as e: + sat.log.warning('stop %s fail: %s', conn.name(), e) sat.stop() - if self.is_runned: - self.disconnect(con, sat) self.is_runned = False + def lock_reconf(self, detach=0): + for s in self.satellites.values(): + if s.is_runned: + s.lock_reconf(detach) + + def sat_attach(self, sat: modules.Satellite): + self.lock() + self.lock_reconf() + self.connect(self.connector, sat) + sat.start() + self.unlock() + + def sat_detach(self, sat: modules.Satellite): + try: + self.lock() + self.lock_reconf(1) + if sat.is_runned: + self.disconnect(self.connector, sat) + except ValueError as e: + sat.log.warning('detach %s fail: %s', self.connector.name(), e) + finally: + self.unlock() + sat.stop() + def action(self): if self.is_active and not self.start(): for sat in self.satellites.values(): @@ -371,8 +403,8 @@ def calculate_pass(self, sat: modules.Satellite): if set_t < rise_t: rise_t = t sat.events = [ - None if sat.is_runned else self.up.scheduler.plan(rise_t, sat.start), - self.up.scheduler.plan(set_t, sat.stop), + None if sat.is_runned else self.up.scheduler.plan(rise_t, self.sat_attach, sat), + self.up.scheduler.plan(set_t, self.sat_detach, sat), self.up.scheduler.plan(set_tt, self.calculate_pass, sat) ] self.log.info('Sat `%s` planned on %s <-> %s',