From 530aed47e2d389e658d729367aff12a18e888e1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Mu=CC=88ller?= Date: Thu, 13 Jul 2023 05:56:05 +0200 Subject: [PATCH] feat(streams): implement zip --- packages/streams/package.json | 2 +- packages/streams/src/async.ts | 34 +++++++++++++++++++++++++++++ packages/streams/src/sync.ts | 31 ++++++++++++++++++++++++++ packages/streams/test/async.test.ts | 11 ++++++++++ packages/streams/test/sync.test.ts | 11 ++++++++++ pnpm-lock.yaml | 17 +++++---------- 6 files changed, 93 insertions(+), 13 deletions(-) diff --git a/packages/streams/package.json b/packages/streams/package.json index dffc61efa..0b73629a2 100644 --- a/packages/streams/package.json +++ b/packages/streams/package.json @@ -38,7 +38,7 @@ "devDependencies": { "@yeger/tsconfig": "workspace:*", "typescript": "5.1.6", - "vite": "4.4.2", + "vite": "4.4.4", "vite-plugin-lib": "workspace:*" }, "publishConfig": { diff --git a/packages/streams/src/async.ts b/packages/streams/src/async.ts index e48dbdffe..60c294740 100644 --- a/packages/streams/src/async.ts +++ b/packages/streams/src/async.ts @@ -55,6 +55,10 @@ export abstract class AsyncStream implements AsyncIterable { return AsyncFlatMapStream.ofPrevious(this, fn) } + public zip(other: Iterable | AsyncIterable) { + return AsyncZipStream.ofPrevious(this, other) + } + public limit(limit: number) { return AsyncLimitStream.ofPrevious(this, limit) } @@ -205,6 +209,36 @@ class AsyncFlatMapStream extends AsyncStream { } } +class AsyncZipStream extends AsyncStream<[T, R]> { + private constructor( + private readonly previous: AsyncStream, + private readonly other: Iterable | AsyncIterable, + ) { + super() + } + + public static ofPrevious( + previous: AsyncStream, + other: Iterable | AsyncIterable, + ) { + return new AsyncZipStream(previous, other) + } + + public async *[Symbol.asyncIterator](): AsyncIterableIterator<[T, R]> { + const otherIterator = + Symbol.asyncIterator in this.other + ? this.other[Symbol.asyncIterator]() + : this.other[Symbol.iterator]() + for await (const item of this.previous) { + const otherItem = await otherIterator.next() + if (otherItem.done) { + break + } + yield [item, otherItem.value] + } + } +} + class AsyncLimitStream extends AsyncStream { private constructor( private readonly previous: AsyncStream, diff --git a/packages/streams/src/sync.ts b/packages/streams/src/sync.ts index 80b94caf3..64c2dd7f4 100644 --- a/packages/streams/src/sync.ts +++ b/packages/streams/src/sync.ts @@ -42,6 +42,10 @@ export abstract class Stream implements Iterable { return FlatMapStream.ofPrevious(this, fn) } + public zip(other: Iterable) { + return ZipStream.ofPrevious(this, other) + } + public limit(limit: number) { return LimitStream.ofPrevious(this, limit) } @@ -192,6 +196,33 @@ class FlatMapStream extends Stream { } } +class ZipStream extends Stream<[T, R]> { + private constructor( + private readonly previous: Stream, + private readonly other: Iterable, + ) { + super() + } + + public static ofPrevious( + previous: Stream, + other: Iterable, + ): ZipStream { + return new ZipStream(previous, other) + } + + public *[Symbol.iterator](): IterableIterator<[T, R]> { + const otherIterator = this.other[Symbol.iterator]() + for (const item of this.previous) { + const otherItem = otherIterator.next() + if (otherItem.done) { + break + } + yield [item, otherItem.value] + } + } +} + class LimitStream extends Stream { private constructor( private readonly previous: Stream, diff --git a/packages/streams/test/async.test.ts b/packages/streams/test/async.test.ts index 0b6e02bc0..c60bd0144 100644 --- a/packages/streams/test/async.test.ts +++ b/packages/streams/test/async.test.ts @@ -84,6 +84,17 @@ describe('async streams', () => { expect(streamResult).toEqual([1, 1, 2, 2, 3, 3]) }) + it('can zip', async () => { + const streamResult = await AsyncStream.from([1, 2, 3]) + .zip([4, 5, 6]) + .toArray() + expect(streamResult).toEqual([ + [1, 4], + [2, 5], + [3, 6], + ]) + }) + it('can transform to async', async () => { const a = await AsyncStream.from([1, 2, 3]) .map((x) => 2 * x) diff --git a/packages/streams/test/sync.test.ts b/packages/streams/test/sync.test.ts index 21c31d2b5..c12f61006 100644 --- a/packages/streams/test/sync.test.ts +++ b/packages/streams/test/sync.test.ts @@ -65,4 +65,15 @@ describe('sync streams', () => { .toArray() expect(streamResult).toEqual([1, 2, 2, 3, 3, 3]) }) + + it('can zip', () => { + const streamResult = Stream.from([1, 2, 3]) + .zip(Stream.from(['a', 'b', 'c'])) + .toArray() + expect(streamResult).toEqual([ + [1, 'a'], + [2, 'b'], + [3, 'c'], + ]) + }) }) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 76fe3c960..30fb9ebcc 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -547,11 +547,11 @@ importers: specifier: workspace:* version: link:../tsconfig typescript: - specifier: 5.1.3 - version: 5.1.3 + specifier: 5.1.6 + version: 5.1.6 vite: - specifier: 4.3.9 - version: 4.3.9(@types/node@18.16.18) + specifier: 4.4.4 + version: 4.4.4(@types/node@18.16.19) vite-plugin-lib: specifier: workspace:* version: link:../vite-plugin-lib @@ -9099,7 +9099,7 @@ packages: dependencies: foreground-child: 3.1.1 jackspeak: 2.2.0 - minimatch: 9.0.0 + minimatch: 9.0.2 minipass: 5.0.0 path-scurry: 1.7.0 dev: true @@ -10962,13 +10962,6 @@ packages: brace-expansion: 2.0.1 dev: true - /minimatch@9.0.0: - resolution: {integrity: sha512-0jJj8AvgKqWN05mrwuqi8QYKx1WmYSUoKSxu5Qhs9prezTz10sxAHGNZe9J9cqIJzta8DWsleh2KaVaLl6Ru2w==} - engines: {node: '>=16 || 14 >=14.17'} - dependencies: - brace-expansion: 2.0.1 - dev: true - /minimatch@9.0.1: resolution: {integrity: sha512-0jWhJpD/MdhPXwPuiRkCbfYfSKp2qnn2eOc279qI7f+osl/l+prKSrvhg157zSYvx/1nmgn2NqdT6k2Z7zSH9w==} engines: {node: '>=16 || 14 >=14.17'}