Skip to content

Commit

Permalink
Merge branch 'main' into url-macro-2
Browse files Browse the repository at this point in the history
  • Loading branch information
vigoo committed Jul 28, 2023
2 parents 5d0c693 + 812ac8e commit 0a0b6ab
Show file tree
Hide file tree
Showing 55 changed files with 221 additions and 130 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ jobs:
scala: [2.12.18, 2.13.10, 3.3.0]
java: [graal_21.1.0@11, temurin@8]
runs-on: ${{ matrix.os }}
timeout-minutes: 60

steps:
- name: Checkout current branch (full)
uses: actions/checkout@v3
Expand Down
3 changes: 3 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import BuildHelper._
import Dependencies._
import sbt.librarymanagement.ScalaArtifacts.isScala3
import scala.concurrent.duration._

val releaseDrafterVersion = "5"

Expand Down Expand Up @@ -101,6 +102,8 @@ inThisBuild(
),
)

ThisBuild / githubWorkflowBuildTimeout := Some(60.minutes)

lazy val root = (project in file("."))
.settings(stdSettings("zio-http-root"))
.settings(publishSetting(false))
Expand Down
10 changes: 6 additions & 4 deletions zio-http-testkit/src/main/scala/zio/http/TestChannel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ case class TestChannel(
out: Queue[WebSocketChannelEvent],
promise: Promise[Nothing, Unit],
) extends WebSocketChannel {
def awaitShutdown: UIO[Unit] =
def awaitShutdown: UIO[Unit] =
promise.await
def receive: Task[WebSocketChannelEvent] =
def receive: Task[WebSocketChannelEvent] =
in.take
def send(in: WebSocketChannelEvent): Task[Unit] =
def send(in: WebSocketChannelEvent): Task[Unit] =
out.offer(in).unit
def shutdown: UIO[Unit] =
def sendAll(in: Iterable[WebSocketChannelEvent]): Task[Unit] =
out.offerAll(in).unit
def shutdown: UIO[Unit] =
in.offer(ChannelEvent.Unregistered) *>
out.offer(ChannelEvent.Unregistered) *>
promise.succeed(()).unit
Expand Down
9 changes: 5 additions & 4 deletions zio-http/src/main/scala/zio/http/Body.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.nio.charset._
import java.nio.file._

import zio._
import zio.stacktracer.TracingImplicits.disableAutoTrace

import zio.stream.ZStream

Expand Down Expand Up @@ -161,7 +162,7 @@ object Body {
def fromMultipartForm(
form: Form,
specificBoundary: Boundary,
): Body = {
)(implicit trace: Trace): Body = {
val bytes = form.multipartBytes(specificBoundary)

StreamBody(bytes, Some(MediaType.multipart.`form-data`), Some(specificBoundary))
Expand All @@ -174,7 +175,7 @@ object Body {
*/
def fromMultipartFormUUID(
form: Form,
): UIO[Body] =
)(implicit trace: Trace): UIO[Body] =
form.multipartBytesUUID.map { case (boundary, bytes) =>
StreamBody(bytes, Some(MediaType.multipart.`form-data`), Some(boundary))
}
Expand Down Expand Up @@ -331,8 +332,8 @@ object Body {
copy(mediaType = Some(newMediaType), boundary = boundary.orElse(newBoundary))
}

private val zioEmptyArray = ZIO.succeed(Array.empty[Byte])
private val zioEmptyArray = ZIO.succeed(Array.empty[Byte])(Trace.empty)

private val zioEmptyChunk = ZIO.succeed(Chunk.empty[Byte])
private val zioEmptyChunk = ZIO.succeed(Chunk.empty[Byte])(Trace.empty)

}
5 changes: 3 additions & 2 deletions zio-http/src/main/scala/zio/http/Boundary.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ package zio.http

import java.nio.charset.Charset

import zio.Chunk
import zio.stacktracer.TracingImplicits.disableAutoTrace
import zio.{Chunk, Trace}

/**
* A multipart boundary, which consists of both the boundary and its charset.
Expand Down Expand Up @@ -72,7 +73,7 @@ object Boundary {
def fromString(content: String, charset: Charset): Option[Boundary] =
fromContent(Chunk.fromArray(content.getBytes(charset)), charset)

def randomUUID: zio.UIO[Boundary] =
def randomUUID(implicit trace: Trace): zio.UIO[Boundary] =
zio.Random.nextUUID.map { id =>
Boundary(s"(((${id.toString()})))")
}
Expand Down
30 changes: 20 additions & 10 deletions zio-http/src/main/scala/zio/http/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package zio.http

import zio._
import zio.stacktracer.TracingImplicits.disableAutoTrace

/**
* A `Channel` is an asynchronous communication channel that supports receiving
Expand All @@ -40,6 +41,11 @@ trait Channel[-In, +Out] { self =>
*/
def send(in: In): Task[Unit]

/**
* Send all messages to the channel.
*/
def sendAll(in: Iterable[In]): Task[Unit]

/**
* Shut down the channel.
*/
Expand All @@ -51,36 +57,40 @@ trait Channel[-In, +Out] { self =>
*/
final def contramap[In2](f: In2 => In): Channel[In2, Out] =
new Channel[In2, Out] {
def awaitShutdown: UIO[Unit] =
def awaitShutdown: UIO[Unit] =
self.awaitShutdown
def receive: Task[Out] =
def receive: Task[Out] =
self.receive
def send(in: In2): Task[Unit] =
def send(in: In2): Task[Unit] =
self.send(f(in))
def shutdown: UIO[Unit] =
def sendAll(in: Iterable[In2]): Task[Unit] =
self.sendAll(in.map(f))
def shutdown: UIO[Unit] =
self.shutdown
}

/**
* Constructs a new channel that automatically transforms messages received
* from this channel using the specified function.
*/
final def map[Out2](f: Out => Out2): Channel[In, Out2] =
final def map[Out2](f: Out => Out2)(implicit trace: Trace): Channel[In, Out2] =
new Channel[In, Out2] {
def awaitShutdown: UIO[Unit] =
def awaitShutdown: UIO[Unit] =
self.awaitShutdown
def receive: Task[Out2] =
def receive: Task[Out2] =
self.receive.map(f)
def send(in: In): Task[Unit] =
def send(in: In): Task[Unit] =
self.send(in)
def shutdown: UIO[Unit] =
def sendAll(in: Iterable[In]): Task[Unit] =
self.sendAll(in)
def shutdown: UIO[Unit] =
self.shutdown
}

/**
* Reads all messages from the channel, handling them with the specified
* function.
*/
final def receiveAll[Env](f: Out => ZIO[Env, Throwable, Any]): ZIO[Env, Throwable, Nothing] =
final def receiveAll[Env](f: Out => ZIO[Env, Throwable, Any])(implicit trace: Trace): ZIO[Env, Throwable, Nothing] =
receive.flatMap(f).forever
}
6 changes: 4 additions & 2 deletions zio-http/src/main/scala/zio/http/ClientDriver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@

package zio.http

import zio.stacktracer.TracingImplicits.disableAutoTrace
import zio.{Promise, Scope, Trace, ZIO, ZLayer}

import zio.http.ClientDriver.ChannelInterface
import zio.http.netty.client.ChannelState

trait ClientDriver {
type Connection

Expand All @@ -46,11 +46,13 @@ object ClientDriver {
def interrupt: ZIO[Any, Throwable, Unit]
}

val shared: ZLayer[Driver, Throwable, ClientDriver] =
val shared: ZLayer[Driver, Throwable, ClientDriver] = {
implicit val trace: Trace = Trace.empty
ZLayer.scoped {
for {
driver <- ZIO.service[Driver]
clientDriver <- driver.createClientDriver()
} yield clientDriver
}
}
}
45 changes: 29 additions & 16 deletions zio-http/src/main/scala/zio/http/DnsResolver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@ import java.net.{InetAddress, UnknownHostException}
import java.time.Instant

import zio._
import zio.stacktracer.TracingImplicits.disableAutoTrace

trait DnsResolver {
def resolve(host: String): ZIO[Any, UnknownHostException, Chunk[InetAddress]]
def resolve(host: String)(implicit trace: Trace): ZIO[Any, UnknownHostException, Chunk[InetAddress]]
}

object DnsResolver {
def resolve(host: String): ZIO[DnsResolver, UnknownHostException, Chunk[InetAddress]] =
def resolve(host: String)(implicit trace: Trace): ZIO[DnsResolver, UnknownHostException, Chunk[InetAddress]] =
ZIO.serviceWithZIO(_.resolve(host))

private final case class SystemResolver() extends DnsResolver {
override def resolve(host: String): ZIO[Any, UnknownHostException, Chunk[InetAddress]] =
override def resolve(host: String)(implicit trace: Trace): ZIO[Any, UnknownHostException, Chunk[InetAddress]] =
ZIO
.attemptBlocking(InetAddress.getAllByName(host))
.refineToOrDie[UnknownHostException]
Expand Down Expand Up @@ -71,7 +72,7 @@ object DnsResolver {
semaphore: Semaphore,
entries: Ref[Map[String, CacheEntry]],
) extends DnsResolver {
override def resolve(host: String): ZIO[Any, UnknownHostException, Chunk[InetAddress]] =
override def resolve(host: String)(implicit trace: Trace): ZIO[Any, UnknownHostException, Chunk[InetAddress]] =
for {
now <- Clock.instant
fiberId <- ZIO.fiberId
Expand Down Expand Up @@ -102,18 +103,18 @@ object DnsResolver {
} yield result

/** Gets a snapshot of the cache state, for testing only */
private[http] def snapshot(): ZIO[DnsResolver, Nothing, Map[String, CacheEntry]] =
private[http] def snapshot()(implicit trace: Trace): ZIO[DnsResolver, Nothing, Map[String, CacheEntry]] =
entries.get

private def startResolvingHost(
host: String,
targetPromise: Promise[UnknownHostException, Chunk[InetAddress]],
): ZIO[Any, Nothing, Unit] =
)(implicit trace: Trace): ZIO[Any, Nothing, Unit] =
semaphore.withPermit {
resolver.resolve(host).intoPromise(targetPromise)
}.fork.unit

private def refreshAndCleanup(): ZIO[Any, Nothing, Unit] =
private def refreshAndCleanup()(implicit trace: Trace): ZIO[Any, Nothing, Unit] =
// Resolve only adds new entries for unseen hosts, so it is safe to do this non-atomically
for {
fiberId <- ZIO.fiberId
Expand All @@ -126,7 +127,7 @@ object DnsResolver {
private def refreshOrDropEntries(
fiberId: FiberId,
entries: Map[String, CacheEntry],
): ZIO[Any, Nothing, Chunk[CachePatch]] =
)(implicit trace: Trace): ZIO[Any, Nothing, Chunk[CachePatch]] =
Clock.instant.flatMap { now =>
ZIO
.foreach(Chunk.fromIterable(entries)) { case (host, entry) =>
Expand Down Expand Up @@ -177,7 +178,9 @@ object DnsResolver {
.map(_.flatten)
}

private def ensureMaxSize(entries: Map[String, CacheEntry]): ZIO[Any, Nothing, Chunk[CachePatch]] =
private def ensureMaxSize(
entries: Map[String, CacheEntry],
)(implicit trace: Trace): ZIO[Any, Nothing, Chunk[CachePatch]] =
if (entries.size > maxCount) {
val toDrop = Chunk.fromIterable(entries).sortBy(_._2.lastUpdatedAt).take(entries.size - maxCount)
ZIO
Expand Down Expand Up @@ -208,7 +211,7 @@ object DnsResolver {
maxConcurrentResolutions: Int,
expireAction: ExpireAction,
refreshRate: Duration,
): ZIO[Scope, Nothing, DnsResolver] =
)(implicit trace: Trace): ZIO[Scope, Nothing, DnsResolver] =
for {
semaphore <- Semaphore.make(maxConcurrentResolutions)
entries <- Ref.make(Map.empty[String, CacheEntry])
Expand All @@ -217,7 +220,7 @@ object DnsResolver {
} yield cachingResolver
}

private[http] def snapshot(): ZIO[DnsResolver, Nothing, Map[String, CacheEntry]] =
private[http] def snapshot()(implicit trace: Trace): ZIO[DnsResolver, Nothing, Map[String, CacheEntry]] =
ZIO.service[DnsResolver].flatMap {
case cachingResolver: CachingResolver => cachingResolver.snapshot()
case _ => ZIO.dieMessage(s"Unexpected DnsResolver implementation: ${getClass.getName}")
Expand Down Expand Up @@ -256,11 +259,13 @@ object DnsResolver {

def configured(
path: NonEmptyChunk[String] = NonEmptyChunk("zio", "http", "dns"),
): ZLayer[Any, zio.Config.Error, DnsResolver] =
)(implicit trace: Trace): ZLayer[Any, zio.Config.Error, DnsResolver] =
ZLayer(ZIO.config(Config.config.nested(path.head, path.tail: _*))) >>> live

val default: ZLayer[Any, Nothing, DnsResolver] =
val default: ZLayer[Any, Nothing, DnsResolver] = {
implicit val trace: Trace = Trace.empty
ZLayer.succeed(Config.default) >>> live
}

private[http] def explicit(
ttl: Duration = 10.minutes,
Expand All @@ -270,7 +275,8 @@ object DnsResolver {
expireAction: ExpireAction = ExpireAction.Refresh,
refreshRate: Duration = 2.seconds,
implementation: DnsResolver = SystemResolver(),
): ZLayer[Any, Nothing, DnsResolver] =
): ZLayer[Any, Nothing, DnsResolver] = {
implicit val trace: Trace = Trace.empty
ZLayer.scoped {
CachingResolver
.make(
Expand All @@ -283,8 +289,11 @@ object DnsResolver {
refreshRate,
)
}
}

lazy val live: ZLayer[DnsResolver.Config, Nothing, DnsResolver] = {
implicit val trace: Trace = Trace.empty

lazy val live: ZLayer[DnsResolver.Config, Nothing, DnsResolver] =
ZLayer.scoped {
for {
config <- ZIO.service[Config]
Expand All @@ -299,6 +308,10 @@ object DnsResolver {
)
} yield resolver
}
}

val system: ZLayer[Any, Nothing, DnsResolver] = ZLayer.succeed(SystemResolver())
val system: ZLayer[Any, Nothing, DnsResolver] = {
implicit val trace: Trace = Trace.empty
ZLayer.succeed(SystemResolver())
}
}
2 changes: 1 addition & 1 deletion zio-http/src/main/scala/zio/http/Driver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ package zio.http
import java.util.concurrent.atomic.LongAdder

import zio._
import zio.stacktracer.TracingImplicits.disableAutoTrace

import zio.http.Driver.StartResult
import zio.http.netty.server.NettyDriver

trait Driver {
def start(implicit trace: Trace): RIO[Scope, StartResult]

Expand Down
9 changes: 5 additions & 4 deletions zio-http/src/main/scala/zio/http/Form.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import java.io.UnsupportedEncodingException
import java.nio.charset.Charset

import zio._
import zio.stacktracer.TracingImplicits.disableAutoTrace

import zio.stream._

Expand All @@ -46,7 +47,7 @@ final case class Form(formData: Chunk[FormField]) {
* Runs all streaming form data and stores them in memory, returning a Form
* that has no streaming parts.
*/
def collectAll: ZIO[Any, Throwable, Form] =
def collectAll(implicit trace: Trace): ZIO[Any, Throwable, Form] =
ZIO
.foreach(formData) {
case streamingBinary: StreamingBinary =>
Expand All @@ -71,7 +72,7 @@ final case class Form(formData: Chunk[FormField]) {
* Encodes the form using multipart encoding, choosing a random UUID as the
* boundary.
*/
def multipartBytesUUID: zio.UIO[(Boundary, ZStream[Any, Nothing, Byte])] =
def multipartBytesUUID(implicit trace: Trace): zio.UIO[(Boundary, ZStream[Any, Nothing, Byte])] =
Boundary.randomUUID.map { boundary =>
boundary -> multipartBytes(boundary)
}
Expand All @@ -81,7 +82,7 @@ final case class Form(formData: Chunk[FormField]) {
*/
def multipartBytes(
boundary: Boundary,
): ZStream[Any, Nothing, Byte] = {
)(implicit trace: Trace): ZStream[Any, Nothing, Byte] = {

val encapsulatingBoundary = FormAST.EncapsulatingBoundary(boundary)
val closingBoundary = FormAST.ClosingBoundary(boundary)
Expand Down Expand Up @@ -201,7 +202,7 @@ object Form {
def fromMultipartBytes(
bytes: Chunk[Byte],
charset: Charset = Charsets.Utf8,
): ZIO[Any, Throwable, Form] =
)(implicit trace: Trace): ZIO[Any, Throwable, Form] =
for {
boundary <- ZIO
.fromOption(Boundary.fromContent(bytes, charset))
Expand Down
Loading

0 comments on commit 0a0b6ab

Please sign in to comment.