diff --git a/core/local/channel_watcher/index.js b/core/local/channel_watcher/index.js index 62c8a9fcd..3a889f09e 100644 --- a/core/local/channel_watcher/index.js +++ b/core/local/channel_watcher/index.js @@ -165,6 +165,18 @@ class ChannelWatcher { await scanDone } + resume() { + log.info('Resuming watcher...') + + return this.producer.resume() + } + + suspend() { + log.info('Suspending watcher...') + + return this.producer.suspend() + } + async stop() /*: Promise<*> */ { log.info('Stopping watcher...') diff --git a/core/local/channel_watcher/parcel_producer.js b/core/local/channel_watcher/parcel_producer.js index e2d9916dc..12fe4f164 100644 --- a/core/local/channel_watcher/parcel_producer.js +++ b/core/local/channel_watcher/parcel_producer.js @@ -102,15 +102,7 @@ class Producer { async start() { log.info('Starting producer...') - this.watcher = await parcel.subscribe( - this.config.syncPath, - async (err, events) => { - // FIXME: use async queue to run processEvents calls in order - await this.processEvents(events) - }, - { backend } - ) - if (!this.watcher) throw new Error('Could not start @parcel/watcher') + await this.subscribe() this.events.emit('buffering-start') @@ -123,6 +115,47 @@ class Producer { this.events.emit('buffering-end') } + async resume() { + log.info('Resuming producer...') + + await this.subscribe() + } + + async suspend() { + log.info('Suspending producer...') + + await this.unsubscribe() + } + + async stop() { + log.info('Stopping producer...') + + await this.unsubscribe() + } + + async subscribe() { + if (!this.watcher) { + this.watcher = await parcel.subscribe( + this.config.syncPath, + async (err, events) => { + // FIXME: use async queue to run processEvents calls in order + await this.processEvents(events) + }, + { backend } + ) + } + if (!this.watcher) throw new Error('Could not start @parcel/watcher') + } + + async unsubscribe() { + if (this.watcher) { + await this.watcher.unsubscribe() + // XXX: unsubscribe() resolves before it was actually finished + await Promise.delay(1000) + this.watcher = null + } + } + async scan(relPath /*: string */) { const stopParcelScanMeasure = measureTime('Parcel#scan') const scanEvents = await parcel.scan( @@ -187,17 +220,6 @@ class Producer { this.channel.push(batch) } } - - async stop() { - log.info('Stopping producer...') - - if (this.watcher) { - await this.watcher.unsubscribe() - // XXX: unsubscribe() resolves before it was actually finished - await Promise.delay(1000) - this.watcher = null - } - } } module.exports = Producer diff --git a/core/local/chokidar/event_buffer.js b/core/local/chokidar/event_buffer.js index 90a1ce224..eb782b8c4 100644 --- a/core/local/chokidar/event_buffer.js +++ b/core/local/chokidar/event_buffer.js @@ -106,7 +106,7 @@ class EventBuffer /*:: */ { } switchMode(mode /*: EventBufferMode */) /*: void */ { - this.flush() + this.clearTimeout() this.mode = mode } diff --git a/core/local/chokidar/watcher.js b/core/local/chokidar/watcher.js index 50923f2da..fe6f914d7 100644 --- a/core/local/chokidar/watcher.js +++ b/core/local/chokidar/watcher.js @@ -190,6 +190,7 @@ class LocalWatcher { .on('ready', () => { stopChokidarScanMeasure() log.info('Folder scan done') + this.buffer.flush() this.buffer.switchMode('timeout') }) .on('raw', async (event, path, details) => { @@ -223,6 +224,68 @@ class LocalWatcher { return started } + async resume() { + log.info('Resuming watcher...') + + if (this.watcher && this.watcher.getWatched().length === 0) { + this.watcher.add('.') + } + + // Flush previously buffered events + this.buffer.flush() + // Restart flushes loop + this.buffer.switchMode('timeout') + } + + async suspend() { + log.info('Suspending watcher...') + + // Stop flushes loop but keep buffered events + this.buffer.switchMode('idle') + + // Stop underlying Chokidar watcher + if (this.watcher) { + this.watcher.unwatch('.') + } + } + + async stop(force /*: ?bool */ = false) { + log.info('Stopping watcher...') + + if (!this.watcher) return + + if (force || !this.initialScanParams.flushed) { + // Drop buffered events + this.buffer.clear() + } else { + // XXX manually fire events for added file, because chokidar will cancel + // them if they are still in the awaitWriteFinish period + for (let relpath in this.watcher._pendingWrites) { + try { + const fullpath = path.join(this.watcher.options.cwd, relpath) + const curStat = await stater.stat(fullpath) + this.watcher.emit('add', relpath, curStat) + } catch (err) { + log.warn('Could not fire remaining add events', { err }) + } + } + } + + // Stop underlying Chokidar watcher + await this.watcher.close() + this.watcher = null + // Flush buffer and stop flushes loop + this.buffer.flush() + this.buffer.switchMode('idle') + + if (!force) { + // Give some time for awaitWriteFinish events to be managed + return new Promise(resolve => { + setTimeout(resolve, 1000) + }) + } + } + // TODO: Start checksuming as soon as an add/change event is buffered // TODO: Put flushed event batches in a queue async onFlush(rawEvents /*: ChokidarEvent[] */) { @@ -283,42 +346,6 @@ class LocalWatcher { this.endInitialScan() } - async stop(force /*: ?bool */ = false) { - log.info('Stopping watcher...') - - if (!this.watcher) return - - if (force || !this.initialScanParams.flushed) { - // Drop buffered events - this.buffer.clear() - } else { - // XXX manually fire events for added file, because chokidar will cancel - // them if they are still in the awaitWriteFinish period - for (let relpath in this.watcher._pendingWrites) { - try { - const fullpath = path.join(this.watcher.options.cwd, relpath) - const curStat = await stater.stat(fullpath) - this.watcher.emit('add', relpath, curStat) - } catch (err) { - log.warn('Could not fire remaining add events', { err }) - } - } - } - - // Stop underlying Chokidar watcher - await this.watcher.close() - this.watcher = null - // Flush buffer and stop flushes loop - this.buffer.switchMode('idle') - - if (!force) { - // Give some time for awaitWriteFinish events to be managed - return new Promise(resolve => { - setTimeout(resolve, 1000) - }) - } - } - resetInitialScanParams() { this.initialScanParams = { paths: [], diff --git a/core/local/index.js b/core/local/index.js index fdaaeac2d..4bd60ce9e 100644 --- a/core/local/index.js +++ b/core/local/index.js @@ -115,6 +115,17 @@ class Local /*:: implements Reader, Writer */ { return this.watcher.start() } + resume() { + syncDir.ensureExistsSync(this) + this.syncDirCheckInterval = syncDir.startIntervalCheck(this) + return this.watcher.resume() + } + + suspend() { + clearInterval(this.syncDirCheckInterval) + return this.watcher.suspend() + } + /** Stop watching the file system */ stop() { clearInterval(this.syncDirCheckInterval) diff --git a/core/local/watcher.js b/core/local/watcher.js index b6d476ff7..4d1c541ed 100644 --- a/core/local/watcher.js +++ b/core/local/watcher.js @@ -26,6 +26,8 @@ import type { Checksumer } from './checksumer' export interface Watcher { checksumer: Checksumer, start (): Promise<*>, + resume (): Promise<*>, + suspend (): Promise<*>, stop (force: ?bool): Promise<*>, onFatal (listener: Error => any): void, fatal (err: Error): void, diff --git a/core/remote/index.js b/core/remote/index.js index be097b7df..f8ad3df6e 100644 --- a/core/remote/index.js +++ b/core/remote/index.js @@ -117,6 +117,15 @@ class Remote /*:: implements Reader, Writer */ { return this.warningsPoller.start() } + async resume() { + await this.watcher.resume() + return this.warningsPoller.start() + } + + async suspend() { + await Promise.all([this.watcher.suspend(), this.warningsPoller.stop()]) + } + async stop() { await Promise.all([this.watcher.stop(), this.warningsPoller.stop()]) } diff --git a/core/remote/watcher/index.js b/core/remote/watcher/index.js index 3fcea43f0..5f916823b 100644 --- a/core/remote/watcher/index.js +++ b/core/remote/watcher/index.js @@ -110,7 +110,8 @@ class RemoteWatcher { async start() { if (!this.running) { - log.info('Starting watcher') + log.info('Starting watcher...') + this.running = true this.startClock() @@ -127,9 +128,21 @@ class RemoteWatcher { } } + resume() { + log.info('Resuming watcher...') + + return this.start() + } + + suspend() { + log.info('Suspending watcher...') + + return this.stop() + } + async stop() { if (this.running) { - log.info('Stopping watcher') + log.info('Stopping watcher...') if (this.realtimeManager) { this.realtimeManager.stop() diff --git a/core/sync/index.js b/core/sync/index.js index 9bc0a8aa7..fda6a404d 100644 --- a/core/sync/index.js +++ b/core/sync/index.js @@ -298,14 +298,21 @@ class Sync { } } - async started() { - await this.lifecycle.started() - } + async resume() { + log.info('resuming synchronization') - // Manually force a full synchronization - async forceSync() { - await this.stop() - await this.start() + try { + this.lifecycle.begin('start') + } catch (err) { + return + } + + this.events.once('power-suspend', this.suspend) + + this.lifecycle.unblockFor('all') + this.local.resume() + this.remote.resume() + this.lifecycle.end('start') } suspend() { @@ -319,19 +326,14 @@ class Sync { return } - this.local.stop() - this.remote.stop() + this.local.suspend() + this.remote.suspend() clearInterval(this.retryInterval) this.retryInterval = null this.lifecycle.unblockFor('all') this.lifecycle.end('stop') } - async resume() { - log.info('resuming synchronization') - await this.start() - } - // Stop the synchronization async stop() /*: Promise */ { // In case an interval timer was started, we clear it to make sure it won't @@ -361,10 +363,20 @@ class Sync { this.lifecycle.end('stop') } + async started() { + await this.lifecycle.started() + } + async stopped() { await this.lifecycle.stopped() } + // Manually force a full synchronization + async forceSync() { + await this.stop() + await this.start() + } + fatal(err /*: Error */) { log.fatal(`Sync fatal: ${err.message}`, { err, sentry: true }) diff --git a/test/unit/local/chokidar/event_buffer.js b/test/unit/local/chokidar/event_buffer.js index 0db9bfdaf..eab9ad0ab 100644 --- a/test/unit/local/chokidar/event_buffer.js +++ b/test/unit/local/chokidar/event_buffer.js @@ -83,7 +83,7 @@ onPlatform('darwin', () => { it('can be switched to timeout mode', () => { buffer.switchMode('timeout') - should(flushed).have.been.calledWith([event1, event2]) + should(flushed).not.have.been.called() }) }) @@ -96,7 +96,7 @@ onPlatform('darwin', () => { buffer.push(event1) buffer.switchMode('idle') clock.tick(TIMEOUT_IN_MS) - should(flushed).have.been.calledWith([event1]) + should(flushed).not.have.been.called() }) it('does not flush without events', () => {