-
Notifications
You must be signed in to change notification settings - Fork 30
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: introduce intersperse
function
#18
Conversation
04e57f4
to
d77e952
Compare
* An element to be injected between the stream elements. | ||
* @return | ||
* A source, onto which elements will be injected. | ||
* @example |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion: Please add the necessary imports to the example (probably import ox.*
and import ox.channels.Source
) so that it's copy-paste'able (also in the other variant of intersperse
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🥇 Updated examples and confirmed in Scala Worksheet
file
val c = StageCapacity.newChannel[U] | ||
forkDaemon { | ||
var firstEmitted = false | ||
start.foreach(c.send(_)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick: Could be just start.foreach(c.send)
, also applies to case ChannelClosed.Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: In the two other places we only continue processing when send
succeeds, i.e. when send(...).isValue
is true
- shouldn't this be the case here as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could be just start.foreach(c.send), also applies to case ChannelClosed.Done
👍 method reference rulez - fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: In the two other places we only continue processing...
That is necessary to properly implement the situation when sink cannot accept anything anymore, right? Fixed.
class SourceOpsIntersperseTest extends AnyFlatSpec with Matchers { | ||
behavior of "Source.intersperse" | ||
|
||
it should "intersperse with inject only over an empty source" in { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick: You can avoid one level of the curly braces by writing
it should "..." in scoped {
...
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💪 done
s.receive() shouldBe "foo" | ||
s.receive() shouldBe ", " | ||
s.receive() shouldBe "bar" | ||
s.receive() shouldBe ChannelClosed.Done |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion: If you want to receive all elements from the source, you can use s.toList
and make a single assertion on the list instead of aseerting individual receive
s.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💯 done
d77e952
to
767a962
Compare
private def intersperse[U >: T](start: Option[U], inject: U, end: Option[U])(using Ox, StageCapacity): Source[U] = | ||
val c = StageCapacity.newChannel[U] | ||
forkDaemon { | ||
if (start.map(c.send(_).isValue).getOrElse(true)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we just created the c
channel, it can't bet done/errored (we're the only ones sending to it)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That would be true as long as the downstream has no way to close the channel (which is the current state). So the question is whether we plan to change this at some point soon - if so, this code would already handle such case, otherwise it's YAGNI.
Since the "repeat while send succeeds" pattern originates from treating SourceOps.map
as a reference implementation. Thus if we decide to keep the downstream unable to close the channel, we should probably also update some of the recently added combinators so that they don't use isValue
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rucek yes, that's a good question, but I think it's the current design's assumption, that the streams are one-way - data always flows downstream. There's no backchannel which would allow propagating information. There are two ways of implementing such a backchannel, if needed:
(a) when the downstream receives an error/done, simply close the scope. This will close all intermediate components. My suspicion is that this will cover 90% of the use-cases, but I might be proven wrong of course. But so far I didn't come up with a compelling use-case to actually have the backchannel, in presence of structured concurrency & scopes
(b) for the other 10%, you can implement a backchannel using a channel. Then, in you original source or some intermediate component you can do select(backchannel.receiveClause, s.sendClause(x))
to send data while being informed if anything is sent on the backchannel
The docs kind of try to communicate the same as above, but maybe they need extending :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
receive() match | ||
case ChannelClosed.Done => end.foreach(c.send); c.done(); false | ||
case ChannelClosed.Error(e) => c.error(e); false | ||
case v: U @unchecked if !firstEmitted => firstEmitted = true; c.send(v).isValue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here, we know that the channel can't be closed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
val c = Channel[String]() | ||
fork { | ||
c.done() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick: This is equivalent to Source.empty[String]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
val c = Channel[String]() | ||
fork { | ||
c.send("foo") | ||
c.done() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick: This is equivalent to Source.fromValues("foo")
, you could also use fromValues
in the other test cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
767a962
to
d16440d
Compare
@@ -34,6 +34,67 @@ trait SourceOps[+T] { this: Source[T] => | |||
} | |||
c2 | |||
|
|||
/** Intersperses stream with provided element. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick: Updated function description to match the other operators
The intersperse function behaves as `List.mkString` e.g.: * called with an `inject` element only it produces Source.fromValues("f", "b").intersperse(", ") // ("f", ", ", "b") * called with `start`, `inject` and `end` it produces Source.fromValues("f", "b").intersperse("[", ", ", "]") // ("[", "f", ", ", "b", "]")
d16440d
to
a4de618
Compare
The intersperse function behaves as
List.mkString
e.g.:inject
element only it producesstart
,inject
andend
it produces