Skip to content

Commit

Permalink
Merge branch 'main' into accept-absolute-url-strings
Browse files Browse the repository at this point in the history
  • Loading branch information
vigoo committed Aug 5, 2023
2 parents d27d0e9 + 538761f commit f6314c5
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 63 deletions.
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ object Dependencies {
val NettyVersion = "4.1.94.Final"
val NettyIncubatorVersion = "0.0.20.Final"
val ScalaCompactCollectionVersion = "2.11.0"
val ZioVersion = "2.0.13"
val ZioVersion = "2.0.15+81-32928b56-SNAPSHOT"
val ZioCliVersion = "0.5.0"
val ZioSchemaVersion = "0.4.13"
val SttpVersion = "3.3.18"
Expand Down
14 changes: 8 additions & 6 deletions zio-http-testkit/src/main/scala/zio/http/TestChannel.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package zio.http

import zio._
import zio.stacktracer.TracingImplicits.disableAutoTrace

import zio.http.ChannelEvent.{Unregistered, UserEvent, UserEventTriggered}

Expand All @@ -8,15 +10,15 @@ case class TestChannel(
out: Queue[WebSocketChannelEvent],
promise: Promise[Nothing, Unit],
) extends WebSocketChannel {
def awaitShutdown: UIO[Unit] =
def awaitShutdown(implicit trace: Trace): UIO[Unit] =
promise.await
def receive: Task[WebSocketChannelEvent] =
def receive(implicit trace: Trace): Task[WebSocketChannelEvent] =
in.take
def send(in: WebSocketChannelEvent): Task[Unit] =
def send(in: WebSocketChannelEvent)(implicit trace: Trace): Task[Unit] =
out.offer(in).unit
def sendAll(in: Iterable[WebSocketChannelEvent]): Task[Unit] =
def sendAll(in: Iterable[WebSocketChannelEvent])(implicit trace: Trace): Task[Unit] =
out.offerAll(in).unit
def shutdown: UIO[Unit] =
def shutdown(implicit trace: Trace): UIO[Unit] =
in.offer(ChannelEvent.Unregistered) *>
out.offer(ChannelEvent.Unregistered) *>
promise.succeed(()).unit
Expand All @@ -27,7 +29,7 @@ object TestChannel {
in: Queue[WebSocketChannelEvent],
out: Queue[WebSocketChannelEvent],
promise: Promise[Nothing, Unit],
): ZIO[Any, Nothing, TestChannel] =
)(implicit trace: Trace): ZIO[Any, Nothing, TestChannel] =
for {
_ <- out.offer(UserEventTriggered(UserEvent.HandshakeComplete))
} yield TestChannel(in, out, promise)
Expand Down
30 changes: 15 additions & 15 deletions zio-http/src/main/scala/zio/http/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,44 +28,44 @@ trait Channel[-In, +Out] { self =>
/**
* Await shutdown of the channel.
*/
def awaitShutdown: UIO[Unit]
def awaitShutdown(implicit trace: Trace): UIO[Unit]

/**
* Read a message from the channel, suspending until the next message is
* available.
*/
def receive: Task[Out]
def receive(implicit trace: Trace): Task[Out]

/**
* Send a message to the channel.
*/
def send(in: In): Task[Unit]
def send(in: In)(implicit trace: Trace): Task[Unit]

/**
* Send all messages to the channel.
*/
def sendAll(in: Iterable[In]): Task[Unit]
def sendAll(in: Iterable[In])(implicit trace: Trace): Task[Unit]

/**
* Shut down the channel.
*/
def shutdown: UIO[Unit]
def shutdown(implicit trace: Trace): UIO[Unit]

/**
* Constructs a new channel that automatically transforms messages sent to
* this channel using the specified function.
*/
final def contramap[In2](f: In2 => In): Channel[In2, Out] =
new Channel[In2, Out] {
def awaitShutdown: UIO[Unit] =
def awaitShutdown(implicit trace: Trace): UIO[Unit] =
self.awaitShutdown
def receive: Task[Out] =
def receive(implicit trace: Trace): Task[Out] =
self.receive
def send(in: In2): Task[Unit] =
def send(in: In2)(implicit trace: Trace): Task[Unit] =
self.send(f(in))
def sendAll(in: Iterable[In2]): Task[Unit] =
def sendAll(in: Iterable[In2])(implicit trace: Trace): Task[Unit] =
self.sendAll(in.map(f))
def shutdown: UIO[Unit] =
def shutdown(implicit trace: Trace): UIO[Unit] =
self.shutdown
}

Expand All @@ -75,15 +75,15 @@ trait Channel[-In, +Out] { self =>
*/
final def map[Out2](f: Out => Out2)(implicit trace: Trace): Channel[In, Out2] =
new Channel[In, Out2] {
def awaitShutdown: UIO[Unit] =
def awaitShutdown(implicit trace: Trace): UIO[Unit] =
self.awaitShutdown
def receive: Task[Out2] =
def receive(implicit trace: Trace): Task[Out2] =
self.receive.map(f)
def send(in: In): Task[Unit] =
def send(in: In)(implicit trace: Trace): Task[Unit] =
self.send(in)
def sendAll(in: Iterable[In]): Task[Unit] =
def sendAll(in: Iterable[In])(implicit trace: Trace): Task[Unit] =
self.sendAll(in)
def shutdown: UIO[Unit] =
def shutdown(implicit trace: Trace): UIO[Unit] =
self.shutdown
}

Expand Down
12 changes: 6 additions & 6 deletions zio-http/src/main/scala/zio/http/WebSocketChannel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package zio.http

import zio._
import zio.stacktracer.TracingImplicits.disableAutoTrace

import zio.http.ChannelEvent.{ExceptionCaught, Read, Registered, Unregistered, UserEventTriggered}
import zio.http.netty.NettyChannel
Expand All @@ -31,31 +32,30 @@ private[http] object WebSocketChannel {
queue: Queue[WebSocketChannelEvent],
): WebSocketChannel =
new WebSocketChannel {
def awaitShutdown: UIO[Unit] =
def awaitShutdown(implicit trace: Trace): UIO[Unit] =
nettyChannel.awaitClose

def receive: Task[WebSocketChannelEvent] =
def receive(implicit trace: Trace): Task[WebSocketChannelEvent] =
queue.take

def send(in: WebSocketChannelEvent): Task[Unit] = {
def send(in: WebSocketChannelEvent)(implicit trace: Trace): Task[Unit] = {
in match {
case Read(message) => nettyChannel.writeAndFlush(frameToNetty(message))
case _ => ZIO.unit
}
}

def sendAll(in: Iterable[WebSocketChannelEvent]): Task[Unit] =
def sendAll(in: Iterable[WebSocketChannelEvent])(implicit trace: Trace): Task[Unit] =
ZIO.suspendSucceed {
val iterator = in.iterator.collect { case Read(message) => message }

println(s"sendAll")
ZIO.whileLoop(iterator.hasNext) {
val message = iterator.next()
if (iterator.hasNext) nettyChannel.write(frameToNetty(message))
else nettyChannel.writeAndFlush(frameToNetty(message))
}(_ => ())
}
def shutdown: UIO[Unit] =
def shutdown(implicit trace: Trace): UIO[Unit] =
nettyChannel.close(false).orDie
}

Expand Down
1 change: 1 addition & 0 deletions zio-http/src/main/scala/zio/http/WebSocketConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package zio.http

import zio.Duration
import zio.stacktracer.TracingImplicits.disableAutoTrace

/**
* Server side websocket configuration
Expand Down
1 change: 1 addition & 0 deletions zio-http/src/main/scala/zio/http/WebSocketFrame.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package zio.http

import zio.Chunk
import zio.stacktracer.TracingImplicits.disableAutoTrace

sealed trait WebSocketFrame extends Product with Serializable { self =>
def isFinal: Boolean = true
Expand Down
36 changes: 1 addition & 35 deletions zio-http/src/test/scala/zio/http/WebSocketSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,35 +26,6 @@ import zio.http.ChannelEvent.{Read, Unregistered, UserEvent, UserEventTriggered}
import zio.http.internal.{DynamicServer, HttpRunnableSpec, severTestLayer}

object WebSocketSpec extends HttpRunnableSpec {
implicit class scopeDisconnect(scope: zio.Scope) {
def disconnect(label: String): Scope =
new Scope {
def addFinalizerExit(finalizer: Exit[Any, Any] => UIO[Any])(implicit trace: zio.Trace): UIO[Unit] =
scope.addFinalizerExit { (exit: Exit[Any, Any]) =>
val warn =
ZIO
.logWarning(
s"A finalizer for layer ${label} has taken more than 1 minute to complete. Skipping this finalizer and moving onto the next one.",
)
.delay(1.minute)
.unit

finalizer(exit).disconnect.race(warn)
}

def forkWith(executionStrategy: => zio.ExecutionStrategy)(implicit trace: zio.Trace): UIO[Scope.Closeable] =
scope.forkWith(executionStrategy)
}
}
implicit class layerDisconnect[I, E, O](layer: ZLayer[I, E, O]) {
def disconnect(label: String): ZLayer[I, E, O] =
ZLayer.scopedEnvironment[I] {
for {
scope <- ZIO.scope
zenv <- layer.build(scope.disconnect(label))
} yield zenv
}
}

private val websocketSpec = suite("WebsocketSpec")(
test("channel events between client and server") {
Expand Down Expand Up @@ -241,12 +212,7 @@ object WebSocketSpec extends HttpRunnableSpec {
serve.as(List(websocketSpec))
}
}
.provideShared(
DynamicServer.live.disconnect("DynamicServer.live"),
severTestLayer.disconnect("serverTestLayer"),
Client.default.disconnect("Client.default"),
Scope.default,
) @@
.provideShared(DynamicServer.live, severTestLayer, Client.default, Scope.default) @@
timeout(30 seconds) @@ diagnose(30.seconds) @@ withLiveClock @@ sequential

final class MessageCollector[A](ref: Ref[List[A]], promise: Promise[Nothing, Unit]) {
Expand Down

0 comments on commit f6314c5

Please sign in to comment.