Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(PubSub): unbounded replay #2940

Closed
wants to merge 7 commits into from

Conversation

dilame
Copy link
Contributor

@dilame dilame commented Jun 6, 2024

Type

  • Refactor
  • Feature
  • Bug Fix
  • Optimization
  • Documentation Update

Description

Implementation of replay last N values feature for PubSub.unbounded(replayBufferSize: N = 0).
Can further be used to create shared Stream #2943.

Related

  • Related Issue #
  • Closes #

@dilame dilame requested a review from mikearnaldi as a code owner June 6, 2024 14:42
Copy link

changeset-bot bot commented Jun 6, 2024

🦋 Changeset detected

Latest commit: 7cdbf81

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 30 packages
Name Type
effect Minor
@effect/cli Major
@effect/cluster-browser Major
@effect/cluster-node Major
@effect/cluster-workflow Major
@effect/cluster Major
@effect/experimental Major
@effect/opentelemetry Major
@effect/platform-browser Major
@effect/platform-bun Major
@effect/platform-node-shared Major
@effect/platform-node Major
@effect/platform Major
@effect/printer-ansi Major
@effect/printer Major
@effect/rpc-http Major
@effect/rpc Major
@effect/schema Major
@effect/sql-d1 Major
@effect/sql-drizzle Major
@effect/sql-mssql Major
@effect/sql-mysql2 Major
@effect/sql-pg Major
@effect/sql-sqlite-bun Major
@effect/sql-sqlite-node Major
@effect/sql-sqlite-react-native Major
@effect/sql-sqlite-wasm Major
@effect/sql Major
@effect/typeclass Major
@effect/vitest Major

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

@github-actions github-actions bot changed the base branch from main to next-minor June 6, 2024 14:43
@dilame dilame changed the title feat(pubsub): unbounded replay feat(PubSub): unbounded replay Jun 6, 2024
@dilame dilame force-pushed the pubsub-unbounded-replay branch 2 times, most recently from 2aa3eeb to 62af9a6 Compare June 6, 2024 19:39
@github-actions github-actions bot force-pushed the next-minor branch 3 times, most recently from f45852b to 4fdcf95 Compare June 7, 2024 13:49
@github-actions github-actions bot force-pushed the next-minor branch 8 times, most recently from 13bbc9c to 1ad8ec6 Compare June 9, 2024 21:57
@tim-smart
Copy link
Member

I think if we were going to add a replay option, we would need to add it to every strategy.

Also switch to object arguments:

Pubsub.unbounded<string>({ replaySize: 16 })

@dilame
Copy link
Contributor Author

dilame commented Jun 9, 2024

I'm not sure how replay should behave for all the other strategies.
FMPOV it only makes sense for unbounded, because it's the only unlimited strategy.
For example, PubSub.bounded<string>({capacity: 2, replay: 3}) looks contradictory to me, same as PubSub.dropping<string>({capacity: 2, replay: 3}). But maybe i am wrong.
I would suggest to focus on unbounded strategy in this PR.

I like the idea about object argument. What do you think about replayBufferSize name? I think it exhaustively reflects the concept behind it

@tim-smart
Copy link
Member

PubSub.bounded<string>({capacity: 2, replay: 3}) looks contradictory to me

capacity and replay could be separate buffers, so you could have a higher replay buffer compared to the capacity. Or the capacity could be the ceiling.

Also I think the replay doesn't need to be tracked per strategy, but rather on the PubSubImpl. The replay buffer can be independent of the subscriber-based logic.

@dilame
Copy link
Contributor Author

dilame commented Jun 10, 2024

Also I think the replay doesn't need to be tracked per strategy, but rather on the PubSubImpl. The replay buffer can be independent of the subscriber-based logic.

I don't see a way to implement it in the PubSubImpl within the current architecture because we need to pass a pointer to the node from where to start to the new subscription (https://github.com/dilame/effect/blob/pubsub-unbounded-replay/packages/effect/src/internal/pubsub.ts#L754). The new subscription is being created at the UnboundedPubSub level, and UnboundedPubSub doesn't have any pointers to the PubSubImpl, so even if we track the replay in the PubSubImpl, we will not have access to it from the place where we need it.

I think it is possible to reimplement the whole PubSub module from scratch with a new architecture that will track replay on a higher abstraction level, but is it really needed?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
Archived in project
Development

Successfully merging this pull request may close these issues.

6 participants