Skip to content

Commit

Permalink
experiment with layer disconnect (#2339)
Browse files Browse the repository at this point in the history
* experiment with layer disconnect

* improve diagnostics
  • Loading branch information
jdegoes authored Jul 28, 2023
1 parent 500cbb7 commit 3f1fb27
Showing 1 changed file with 35 additions and 1 deletion.
36 changes: 35 additions & 1 deletion zio-http/src/test/scala/zio/http/WebSocketSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,35 @@ import zio.http.ChannelEvent.{Read, Unregistered, UserEvent, UserEventTriggered}
import zio.http.internal.{DynamicServer, HttpRunnableSpec, severTestLayer}

object WebSocketSpec extends HttpRunnableSpec {
implicit class scopeDisconnect(scope: zio.Scope) {
def disconnect(label: String): Scope =
new Scope {
def addFinalizerExit(finalizer: Exit[Any, Any] => UIO[Any])(implicit trace: zio.Trace): UIO[Unit] =
scope.addFinalizerExit { (exit: Exit[Any, Any]) =>
val warn =
ZIO
.logWarning(
s"A finalizer for layer ${label} has taken more than 1 minute to complete. Skipping this finalizer and moving onto the next one.",
)
.delay(1.minute)
.unit

finalizer(exit).disconnect.race(warn)
}

def forkWith(executionStrategy: => zio.ExecutionStrategy)(implicit trace: zio.Trace): UIO[Scope.Closeable] =
scope.forkWith(executionStrategy)
}
}
implicit class layerDisconnect[I, E, O](layer: ZLayer[I, E, O]) {
def disconnect(label: String): ZLayer[I, E, O] =
ZLayer.scopedEnvironment[I] {
for {
scope <- ZIO.scope
zenv <- layer.build(scope.disconnect(label))
} yield zenv
}
}

private val websocketSpec = suite("WebsocketSpec")(
test("channel events between client and server") {
Expand Down Expand Up @@ -212,7 +241,12 @@ object WebSocketSpec extends HttpRunnableSpec {
serve.as(List(websocketSpec))
}
}
.provideShared(DynamicServer.live, severTestLayer, Client.default, Scope.default) @@
.provideShared(
DynamicServer.live.disconnect("DynamicServer.live"),
severTestLayer.disconnect("serverTestLayer"),
Client.default.disconnect("Client.default"),
Scope.default,
) @@
timeout(30 seconds) @@ diagnose(30.seconds) @@ withLiveClock @@ sequential

final class MessageCollector[A](ref: Ref[List[A]], promise: Promise[Nothing, Unit]) {
Expand Down

0 comments on commit 3f1fb27

Please sign in to comment.