Skip to content

Commit

Permalink
Add required traces to the http package (#2326)
Browse files Browse the repository at this point in the history
* Add required traces to the http package

* removed comments
  • Loading branch information
jaliss authored Jul 28, 2023
1 parent 2ef18c4 commit e8dd351
Show file tree
Hide file tree
Showing 20 changed files with 136 additions and 90 deletions.
9 changes: 5 additions & 4 deletions zio-http/src/main/scala/zio/http/Body.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.nio.charset._
import java.nio.file._

import zio._
import zio.stacktracer.TracingImplicits.disableAutoTrace

import zio.stream.ZStream

Expand Down Expand Up @@ -161,7 +162,7 @@ object Body {
def fromMultipartForm(
form: Form,
specificBoundary: Boundary,
): Body = {
)(implicit trace: Trace): Body = {
val bytes = form.multipartBytes(specificBoundary)

StreamBody(bytes, Some(MediaType.multipart.`form-data`), Some(specificBoundary))
Expand All @@ -174,7 +175,7 @@ object Body {
*/
def fromMultipartFormUUID(
form: Form,
): UIO[Body] =
)(implicit trace: Trace): UIO[Body] =
form.multipartBytesUUID.map { case (boundary, bytes) =>
StreamBody(bytes, Some(MediaType.multipart.`form-data`), Some(boundary))
}
Expand Down Expand Up @@ -331,8 +332,8 @@ object Body {
copy(mediaType = Some(newMediaType), boundary = boundary.orElse(newBoundary))
}

private val zioEmptyArray = ZIO.succeed(Array.empty[Byte])
private val zioEmptyArray = ZIO.succeed(Array.empty[Byte])(Trace.empty)

private val zioEmptyChunk = ZIO.succeed(Chunk.empty[Byte])
private val zioEmptyChunk = ZIO.succeed(Chunk.empty[Byte])(Trace.empty)

}
5 changes: 3 additions & 2 deletions zio-http/src/main/scala/zio/http/Boundary.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ package zio.http

import java.nio.charset.Charset

import zio.Chunk
import zio.stacktracer.TracingImplicits.disableAutoTrace
import zio.{Chunk, Trace}

/**
* A multipart boundary, which consists of both the boundary and its charset.
Expand Down Expand Up @@ -72,7 +73,7 @@ object Boundary {
def fromString(content: String, charset: Charset): Option[Boundary] =
fromContent(Chunk.fromArray(content.getBytes(charset)), charset)

def randomUUID: zio.UIO[Boundary] =
def randomUUID(implicit trace: Trace): zio.UIO[Boundary] =
zio.Random.nextUUID.map { id =>
Boundary(s"(((${id.toString()})))")
}
Expand Down
5 changes: 3 additions & 2 deletions zio-http/src/main/scala/zio/http/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package zio.http

import zio._
import zio.stacktracer.TracingImplicits.disableAutoTrace

/**
* A `Channel` is an asynchronous communication channel that supports receiving
Expand Down Expand Up @@ -72,7 +73,7 @@ trait Channel[-In, +Out] { self =>
* Constructs a new channel that automatically transforms messages received
* from this channel using the specified function.
*/
final def map[Out2](f: Out => Out2): Channel[In, Out2] =
final def map[Out2](f: Out => Out2)(implicit trace: Trace): Channel[In, Out2] =
new Channel[In, Out2] {
def awaitShutdown: UIO[Unit] =
self.awaitShutdown
Expand All @@ -90,6 +91,6 @@ trait Channel[-In, +Out] { self =>
* Reads all messages from the channel, handling them with the specified
* function.
*/
final def receiveAll[Env](f: Out => ZIO[Env, Throwable, Any]): ZIO[Env, Throwable, Nothing] =
final def receiveAll[Env](f: Out => ZIO[Env, Throwable, Any])(implicit trace: Trace): ZIO[Env, Throwable, Nothing] =
receive.flatMap(f).forever
}
6 changes: 4 additions & 2 deletions zio-http/src/main/scala/zio/http/ClientDriver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@

package zio.http

import zio.stacktracer.TracingImplicits.disableAutoTrace
import zio.{Promise, Scope, Trace, ZIO, ZLayer}

import zio.http.ClientDriver.ChannelInterface
import zio.http.netty.client.ChannelState

trait ClientDriver {
type Connection

Expand All @@ -46,11 +46,13 @@ object ClientDriver {
def interrupt: ZIO[Any, Throwable, Unit]
}

val shared: ZLayer[Driver, Throwable, ClientDriver] =
val shared: ZLayer[Driver, Throwable, ClientDriver] = {
implicit val trace: Trace = Trace.empty
ZLayer.scoped {
for {
driver <- ZIO.service[Driver]
clientDriver <- driver.createClientDriver()
} yield clientDriver
}
}
}
45 changes: 29 additions & 16 deletions zio-http/src/main/scala/zio/http/DnsResolver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@ import java.net.{InetAddress, UnknownHostException}
import java.time.Instant

import zio._
import zio.stacktracer.TracingImplicits.disableAutoTrace

trait DnsResolver {
def resolve(host: String): ZIO[Any, UnknownHostException, Chunk[InetAddress]]
def resolve(host: String)(implicit trace: Trace): ZIO[Any, UnknownHostException, Chunk[InetAddress]]
}

object DnsResolver {
def resolve(host: String): ZIO[DnsResolver, UnknownHostException, Chunk[InetAddress]] =
def resolve(host: String)(implicit trace: Trace): ZIO[DnsResolver, UnknownHostException, Chunk[InetAddress]] =
ZIO.serviceWithZIO(_.resolve(host))

private final case class SystemResolver() extends DnsResolver {
override def resolve(host: String): ZIO[Any, UnknownHostException, Chunk[InetAddress]] =
override def resolve(host: String)(implicit trace: Trace): ZIO[Any, UnknownHostException, Chunk[InetAddress]] =
ZIO
.attemptBlocking(InetAddress.getAllByName(host))
.refineToOrDie[UnknownHostException]
Expand Down Expand Up @@ -71,7 +72,7 @@ object DnsResolver {
semaphore: Semaphore,
entries: Ref[Map[String, CacheEntry]],
) extends DnsResolver {
override def resolve(host: String): ZIO[Any, UnknownHostException, Chunk[InetAddress]] =
override def resolve(host: String)(implicit trace: Trace): ZIO[Any, UnknownHostException, Chunk[InetAddress]] =
for {
now <- Clock.instant
fiberId <- ZIO.fiberId
Expand Down Expand Up @@ -102,18 +103,18 @@ object DnsResolver {
} yield result

/** Gets a snapshot of the cache state, for testing only */
private[http] def snapshot(): ZIO[DnsResolver, Nothing, Map[String, CacheEntry]] =
private[http] def snapshot()(implicit trace: Trace): ZIO[DnsResolver, Nothing, Map[String, CacheEntry]] =
entries.get

private def startResolvingHost(
host: String,
targetPromise: Promise[UnknownHostException, Chunk[InetAddress]],
): ZIO[Any, Nothing, Unit] =
)(implicit trace: Trace): ZIO[Any, Nothing, Unit] =
semaphore.withPermit {
resolver.resolve(host).intoPromise(targetPromise)
}.fork.unit

private def refreshAndCleanup(): ZIO[Any, Nothing, Unit] =
private def refreshAndCleanup()(implicit trace: Trace): ZIO[Any, Nothing, Unit] =
// Resolve only adds new entries for unseen hosts, so it is safe to do this non-atomically
for {
fiberId <- ZIO.fiberId
Expand All @@ -126,7 +127,7 @@ object DnsResolver {
private def refreshOrDropEntries(
fiberId: FiberId,
entries: Map[String, CacheEntry],
): ZIO[Any, Nothing, Chunk[CachePatch]] =
)(implicit trace: Trace): ZIO[Any, Nothing, Chunk[CachePatch]] =
Clock.instant.flatMap { now =>
ZIO
.foreach(Chunk.fromIterable(entries)) { case (host, entry) =>
Expand Down Expand Up @@ -177,7 +178,9 @@ object DnsResolver {
.map(_.flatten)
}

private def ensureMaxSize(entries: Map[String, CacheEntry]): ZIO[Any, Nothing, Chunk[CachePatch]] =
private def ensureMaxSize(
entries: Map[String, CacheEntry],
)(implicit trace: Trace): ZIO[Any, Nothing, Chunk[CachePatch]] =
if (entries.size > maxCount) {
val toDrop = Chunk.fromIterable(entries).sortBy(_._2.lastUpdatedAt).take(entries.size - maxCount)
ZIO
Expand Down Expand Up @@ -208,7 +211,7 @@ object DnsResolver {
maxConcurrentResolutions: Int,
expireAction: ExpireAction,
refreshRate: Duration,
): ZIO[Scope, Nothing, DnsResolver] =
)(implicit trace: Trace): ZIO[Scope, Nothing, DnsResolver] =
for {
semaphore <- Semaphore.make(maxConcurrentResolutions)
entries <- Ref.make(Map.empty[String, CacheEntry])
Expand All @@ -217,7 +220,7 @@ object DnsResolver {
} yield cachingResolver
}

private[http] def snapshot(): ZIO[DnsResolver, Nothing, Map[String, CacheEntry]] =
private[http] def snapshot()(implicit trace: Trace): ZIO[DnsResolver, Nothing, Map[String, CacheEntry]] =
ZIO.service[DnsResolver].flatMap {
case cachingResolver: CachingResolver => cachingResolver.snapshot()
case _ => ZIO.dieMessage(s"Unexpected DnsResolver implementation: ${getClass.getName}")
Expand Down Expand Up @@ -256,11 +259,13 @@ object DnsResolver {

def configured(
path: NonEmptyChunk[String] = NonEmptyChunk("zio", "http", "dns"),
): ZLayer[Any, zio.Config.Error, DnsResolver] =
)(implicit trace: Trace): ZLayer[Any, zio.Config.Error, DnsResolver] =
ZLayer(ZIO.config(Config.config.nested(path.head, path.tail: _*))) >>> live

val default: ZLayer[Any, Nothing, DnsResolver] =
val default: ZLayer[Any, Nothing, DnsResolver] = {
implicit val trace: Trace = Trace.empty
ZLayer.succeed(Config.default) >>> live
}

private[http] def explicit(
ttl: Duration = 10.minutes,
Expand All @@ -270,7 +275,8 @@ object DnsResolver {
expireAction: ExpireAction = ExpireAction.Refresh,
refreshRate: Duration = 2.seconds,
implementation: DnsResolver = SystemResolver(),
): ZLayer[Any, Nothing, DnsResolver] =
): ZLayer[Any, Nothing, DnsResolver] = {
implicit val trace: Trace = Trace.empty
ZLayer.scoped {
CachingResolver
.make(
Expand All @@ -283,8 +289,11 @@ object DnsResolver {
refreshRate,
)
}
}

lazy val live: ZLayer[DnsResolver.Config, Nothing, DnsResolver] = {
implicit val trace: Trace = Trace.empty

lazy val live: ZLayer[DnsResolver.Config, Nothing, DnsResolver] =
ZLayer.scoped {
for {
config <- ZIO.service[Config]
Expand All @@ -299,6 +308,10 @@ object DnsResolver {
)
} yield resolver
}
}

val system: ZLayer[Any, Nothing, DnsResolver] = ZLayer.succeed(SystemResolver())
val system: ZLayer[Any, Nothing, DnsResolver] = {
implicit val trace: Trace = Trace.empty
ZLayer.succeed(SystemResolver())
}
}
2 changes: 1 addition & 1 deletion zio-http/src/main/scala/zio/http/Driver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ package zio.http
import java.util.concurrent.atomic.LongAdder

import zio._
import zio.stacktracer.TracingImplicits.disableAutoTrace

import zio.http.Driver.StartResult
import zio.http.netty.server.NettyDriver

trait Driver {
def start(implicit trace: Trace): RIO[Scope, StartResult]

Expand Down
9 changes: 5 additions & 4 deletions zio-http/src/main/scala/zio/http/Form.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import java.io.UnsupportedEncodingException
import java.nio.charset.Charset

import zio._
import zio.stacktracer.TracingImplicits.disableAutoTrace

import zio.stream._

Expand All @@ -46,7 +47,7 @@ final case class Form(formData: Chunk[FormField]) {
* Runs all streaming form data and stores them in memory, returning a Form
* that has no streaming parts.
*/
def collectAll: ZIO[Any, Throwable, Form] =
def collectAll(implicit trace: Trace): ZIO[Any, Throwable, Form] =
ZIO
.foreach(formData) {
case streamingBinary: StreamingBinary =>
Expand All @@ -71,7 +72,7 @@ final case class Form(formData: Chunk[FormField]) {
* Encodes the form using multipart encoding, choosing a random UUID as the
* boundary.
*/
def multipartBytesUUID: zio.UIO[(Boundary, ZStream[Any, Nothing, Byte])] =
def multipartBytesUUID(implicit trace: Trace): zio.UIO[(Boundary, ZStream[Any, Nothing, Byte])] =
Boundary.randomUUID.map { boundary =>
boundary -> multipartBytes(boundary)
}
Expand All @@ -81,7 +82,7 @@ final case class Form(formData: Chunk[FormField]) {
*/
def multipartBytes(
boundary: Boundary,
): ZStream[Any, Nothing, Byte] = {
)(implicit trace: Trace): ZStream[Any, Nothing, Byte] = {

val encapsulatingBoundary = FormAST.EncapsulatingBoundary(boundary)
val closingBoundary = FormAST.ClosingBoundary(boundary)
Expand Down Expand Up @@ -201,7 +202,7 @@ object Form {
def fromMultipartBytes(
bytes: Chunk[Byte],
charset: Charset = Charsets.Utf8,
): ZIO[Any, Throwable, Form] =
)(implicit trace: Trace): ZIO[Any, Throwable, Form] =
for {
boundary <- ZIO
.fromOption(Boundary.fromContent(bytes, charset))
Expand Down
12 changes: 6 additions & 6 deletions zio-http/src/main/scala/zio/http/FormField.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
*/

package zio.http

import java.nio.charset._

import zio._
import zio.stacktracer.TracingImplicits.disableAutoTrace

import zio.stream.{Take, ZPipeline, ZStream}

Expand Down Expand Up @@ -49,7 +49,7 @@ sealed trait FormField {
* Gets the value of this form field as a String. If it is a binary field, the
* value is interpreted as an UTF-8 byte stream.
*/
final def asText: ZIO[Any, CharacterCodingException, String] = this match {
final def asText(implicit trace: Trace): ZIO[Any, CharacterCodingException, String] = this match {
case FormField.Text(_, value, _, _) =>
ZIO.succeed(value)
case FormField.Binary(_, value, _, _, _) =>
Expand All @@ -64,7 +64,7 @@ sealed trait FormField {
* Gets the value of this form field as a chunk of bytes. If it is a text
* field, the value gets encoded as an UTF-8 byte stream.
*/
final def asChunk: ZIO[Any, Nothing, Chunk[Byte]] = this match {
final def asChunk(implicit trace: Trace): ZIO[Any, Nothing, Chunk[Byte]] = this match {
case FormField.Text(_, value, _, _) =>
ZIO.succeed(Chunk.fromArray(value.getBytes(Charsets.Utf8)))
case FormField.Binary(_, value, _, _, _) =>
Expand Down Expand Up @@ -123,7 +123,7 @@ object FormField {
filename: Option[String] = None,
data: ZStream[Any, Nothing, Byte],
) extends FormField {
def collect: ZIO[Any, Nothing, Binary] = {
def collect(implicit trace: Trace): ZIO[Any, Nothing, Binary] = {
data.runCollect.map { bytes =>
Binary(name, bytes, contentType, transferEncoding, filename)
}
Expand All @@ -145,7 +145,7 @@ object FormField {
private[http] def fromFormAST(
ast: Chunk[FormAST],
defaultCharset: Charset = StandardCharsets.UTF_8,
): ZIO[Any, FormDecodingError, FormField] = {
)(implicit trace: Trace): ZIO[Any, FormDecodingError, FormField] = {
val extract =
ast.foldLeft(
(
Expand Down Expand Up @@ -197,7 +197,7 @@ object FormField {
private[http] def incomingStreamingBinary(
ast: Chunk[FormAST],
queue: Queue[Take[Nothing, Byte]],
): ZIO[Any, FormDecodingError, FormField] = {
)(implicit trace: Trace): ZIO[Any, FormDecodingError, FormField] = {
val extract =
ast.foldLeft((Option.empty[FormAST.Header], Option.empty[FormAST.Header], Option.empty[FormAST.Header])) {
case (accum, header: FormAST.Header) if header.name == "Content-Disposition" =>
Expand Down
Loading

0 comments on commit e8dd351

Please sign in to comment.