Skip to content

Commit

Permalink
feat(streams): implement zip
Browse files Browse the repository at this point in the history
  • Loading branch information
DerYeger committed Aug 11, 2023
1 parent f212ebc commit 530aed4
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 13 deletions.
2 changes: 1 addition & 1 deletion packages/streams/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
"devDependencies": {
"@yeger/tsconfig": "workspace:*",
"typescript": "5.1.6",
"vite": "4.4.2",
"vite": "4.4.4",
"vite-plugin-lib": "workspace:*"
},
"publishConfig": {
Expand Down
34 changes: 34 additions & 0 deletions packages/streams/src/async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ export abstract class AsyncStream<T> implements AsyncIterable<T> {
return AsyncFlatMapStream.ofPrevious(this, fn)
}

public zip<R>(other: Iterable<R> | AsyncIterable<R>) {
return AsyncZipStream.ofPrevious(this, other)
}

public limit(limit: number) {
return AsyncLimitStream.ofPrevious(this, limit)
}
Expand Down Expand Up @@ -205,6 +209,36 @@ class AsyncFlatMapStream<Input, Output> extends AsyncStream<Output> {
}
}

class AsyncZipStream<T, R> extends AsyncStream<[T, R]> {
private constructor(
private readonly previous: AsyncStream<T>,
private readonly other: Iterable<R> | AsyncIterable<R>,
) {
super()
}

public static ofPrevious<T, R>(
previous: AsyncStream<T>,
other: Iterable<R> | AsyncIterable<R>,
) {
return new AsyncZipStream(previous, other)
}

public async *[Symbol.asyncIterator](): AsyncIterableIterator<[T, R]> {
const otherIterator =
Symbol.asyncIterator in this.other
? this.other[Symbol.asyncIterator]()
: this.other[Symbol.iterator]()
for await (const item of this.previous) {
const otherItem = await otherIterator.next()
if (otherItem.done) {
break
}
yield [item, otherItem.value]
}
}
}

class AsyncLimitStream<T> extends AsyncStream<T> {
private constructor(
private readonly previous: AsyncStream<T>,
Expand Down
31 changes: 31 additions & 0 deletions packages/streams/src/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ export abstract class Stream<T> implements Iterable<T> {
return FlatMapStream.ofPrevious(this, fn)
}

public zip<R>(other: Iterable<R>) {
return ZipStream.ofPrevious(this, other)
}

public limit(limit: number) {
return LimitStream.ofPrevious(this, limit)
}
Expand Down Expand Up @@ -192,6 +196,33 @@ class FlatMapStream<Input, Output> extends Stream<Output> {
}
}

class ZipStream<T, R> extends Stream<[T, R]> {
private constructor(
private readonly previous: Stream<T>,
private readonly other: Iterable<R>,
) {
super()
}

public static ofPrevious<T, R>(
previous: Stream<T>,
other: Iterable<R>,
): ZipStream<T, R> {
return new ZipStream(previous, other)
}

public *[Symbol.iterator](): IterableIterator<[T, R]> {
const otherIterator = this.other[Symbol.iterator]()
for (const item of this.previous) {
const otherItem = otherIterator.next()
if (otherItem.done) {
break
}
yield [item, otherItem.value]
}
}
}

class LimitStream<T> extends Stream<T> {
private constructor(
private readonly previous: Stream<T>,
Expand Down
11 changes: 11 additions & 0 deletions packages/streams/test/async.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,17 @@ describe('async streams', () => {
expect(streamResult).toEqual([1, 1, 2, 2, 3, 3])
})

it('can zip', async () => {
const streamResult = await AsyncStream.from([1, 2, 3])
.zip([4, 5, 6])
.toArray()
expect(streamResult).toEqual([
[1, 4],
[2, 5],
[3, 6],
])
})

it('can transform to async', async () => {
const a = await AsyncStream.from([1, 2, 3])
.map((x) => 2 * x)
Expand Down
11 changes: 11 additions & 0 deletions packages/streams/test/sync.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,15 @@ describe('sync streams', () => {
.toArray()
expect(streamResult).toEqual([1, 2, 2, 3, 3, 3])
})

it('can zip', () => {
const streamResult = Stream.from([1, 2, 3])
.zip(Stream.from(['a', 'b', 'c']))
.toArray()
expect(streamResult).toEqual([
[1, 'a'],
[2, 'b'],
[3, 'c'],
])
})
})
17 changes: 5 additions & 12 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 530aed4

Please sign in to comment.