diff --git a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/EntityStreamingSpec.scala b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/EntityStreamingSpec.scala index 1676d895eba..0f91e3c10e1 100644 --- a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/EntityStreamingSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/EntityStreamingSpec.scala @@ -9,13 +9,15 @@ import akka.http.scaladsl.common.{ EntityStreamingSupport, JsonEntityStreamingSu import akka.http.scaladsl.marshalling.{ Marshaller, Marshalling, ToResponseMarshallable } import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers._ +import akka.http.scaladsl.unmarshalling.Unmarshal import akka.stream.scaladsl._ import akka.testkit.EventFilter import akka.util.ByteString +import org.scalatest.concurrent.ScalaFutures import scala.concurrent.Future -class EntityStreamingSpec extends RoutingSpec { +class EntityStreamingSpec extends RoutingSpec with ScalaFutures { //#models case class Tweet(uid: Int, txt: String) @@ -95,6 +97,75 @@ class EntityStreamingSpec extends RoutingSpec { } } + "client-consume-streaming-json" in { + //#json-streaming-client-example + import MyJsonProtocol._ + import akka.http.scaladsl.unmarshalling._ + import akka.http.scaladsl.common.EntityStreamingSupport + import akka.http.scaladsl.common.JsonEntityStreamingSupport + + implicit val jsonStreamingSupport: JsonEntityStreamingSupport = + EntityStreamingSupport.json() + + val input = """{"uid":1,"txt":"#Akka rocks!"}""" + "\n" + + """{"uid":2,"txt":"Streaming is so hot right now!"}""" + "\n" + + """{"uid":3,"txt":"You cannot enter the same river twice."}""" + + val response = HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, input)) + + // unmarshal: + val unmarshalled: Future[Source[Tweet, NotUsed]] = + Unmarshal(response).to[Source[Tweet, NotUsed]] + + // flatten the Future[Source[]] into a Source[]: + val source: Source[Tweet, Future[NotUsed]] = + Source.fromFutureSource(unmarshalled) + + //#json-streaming-client-example + // tests ------------------------------------------------------------ + val all = source.runWith(Sink.seq).futureValue + all.head.uid should ===(1) + all.head.txt should ===("#Akka rocks!") + all.drop(1).head.uid should ===(2) + all.drop(1).head.txt should ===("Streaming is so hot right now!") + all.drop(2).head.uid should ===(3) + all.drop(2).head.txt should ===("You cannot enter the same river twice.") + } + + "client-consume-streaming-json-raw" in { + //#json-streaming-client-example-raw + import MyJsonProtocol._ + import akka.http.scaladsl.unmarshalling._ + import akka.http.scaladsl.common.EntityStreamingSupport + import akka.http.scaladsl.common.JsonEntityStreamingSupport + + implicit val jsonStreamingSupport: JsonEntityStreamingSupport = + EntityStreamingSupport.json() + + val input = """{"uid":1,"txt":"#Akka rocks!"}""" + "\n" + + """{"uid":2,"txt":"Streaming is so hot right now!"}""" + "\n" + + """{"uid":3,"txt":"You cannot enter the same river twice."}""" + + val response = HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, input)) + + val value: Source[Tweet, Any] = + response.entity.dataBytes + .via(jsonStreamingSupport.framingDecoder) // pick your Framing (could be "\n" etc) + .mapAsync(1)(bytes ⇒ Unmarshal(bytes).to[Tweet]) // unmarshal one by one + + //#json-streaming-client-example-raw + + // tests ------------------------------------------------------------ + val all = value.runWith(Sink.seq).futureValue + all.head.uid should ===(1) + all.head.txt should ===("#Akka rocks!") + all.drop(1).head.uid should ===(2) + all.drop(1).head.txt should ===("Streaming is so hot right now!") + all.drop(2).head.uid should ===(3) + all.drop(2).head.txt should ===("You cannot enter the same river twice.") + + } + "csv-example" in { implicit val tweetAsCsv = Marshaller.strict[Tweet, ByteString] { t ⇒ Marshalling.WithFixedContentType(ContentTypes.`text/csv(UTF-8)`, () ⇒ { diff --git a/docs/src/main/paradox/common/json-support.md b/docs/src/main/paradox/common/json-support.md index bfa2c5401f4..447af0e23e8 100644 --- a/docs/src/main/paradox/common/json-support.md +++ b/docs/src/main/paradox/common/json-support.md @@ -5,6 +5,34 @@ Integration with @scala[[spray-json]]@java[[Jackson]] is provided out of the box Integration with other JSON libraries are supported by the community. See [the list of current community extensions for Akka HTTP](https://akka.io/community/#extensions-to-akka-http). +@@@ div { .group-java } + + +## Jackson Support + +To make use of the support module for (un)marshalling from and to JSON with [Jackson], add a library dependency onto: + +@@dependency [sbt,Gradle,Maven] { + group="com.typesafe.akka" + artifact="akka-http-jackson_$scala.binary.version$" + version="$project.version$" +} + +Use `akka.http.javadsl.marshallers.jackson.Jackson.unmarshaller(T.class)` to create an @unidoc[Unmarshaller[HttpEntity,T]] which expects the request +body (HttpEntity) to be of type `application/json` and converts it to `T` using Jackson. + +@@snip [PetStoreExample.java]($akka-http$/akka-http-tests/src/main/java/akka/http/javadsl/server/examples/petstore/PetStoreExample.java) { #imports #unmarshall } + +Use `akka.http.javadsl.marshallers.jackson.Jackson.marshaller(T.class)` to create a @unidoc[Marshaller[T,RequestEntity]] which can be used with +`RequestContext.complete` or `RouteDirectives.complete` to convert a POJO to an HttpResponse. + +@@snip [PetStoreExample.java]($akka-http$/akka-http-tests/src/main/java/akka/http/javadsl/server/examples/petstore/PetStoreExample.java) { #imports #marshall } + +Refer to @github[this file](/akka-http-tests/src/main/java/akka/http/javadsl/server/examples/petstore/PetStoreExample.java) in the sources for the complete example. + +@@@ + + @@@ div { .group-scala } ## spray-json Support @@ -28,44 +56,58 @@ Once you have done this (un)marshalling between JSON and your type `T` should wo @@snip [SprayJsonExampleSpec.scala]($test$/scala/docs/http/scaladsl/SprayJsonExampleSpec.scala) { #minimal-spray-json-example } -### Pretty printing +@@@ -By default, spray-json marshals your types to compact printed JSON by implicit conversion using `CompactPrinter`, as defined in: + +## Consuming JSON Streaming style APIs -@@snip [SprayJsonSupport.scala]($akka-http$/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonSupport.scala) { #sprayJsonMarshallerConverter } +A popular way of implementing streaming APIs is [JSON Streaming](https://en.wikipedia.org/wiki/JSON_Streaming) (see [Source Streaming](./routing-dsl/source-streaming-support.md) +for documentation on building server-side of such API). -Alternatively to marshal your types to pretty printed JSON, bring a `PrettyPrinter` in scope to perform implicit conversion. +Depending on the way the API returns the streamed JSON (newline delimited, raw sequence of objects, or "infinite array") +you may have to apply a different framing mechanism, but the general idea remains the same: consuming the infinite entity stream +and applying a framing to it, such that the single objects can be easily deserialized using the usual marshalling infrastructure: -@@snip [SprayJsonPrettyMarshalSpec.scala]($test$/scala/docs/http/scaladsl/SprayJsonPrettyMarshalSpec.scala) { #example } +Scala +: @@snip [EntityStreamingSpec.scala]($akka-http$/akka-http-tests/src/test/scala/akka/http/scaladsl/server/EntityStreamingSpec.scala) { #json-streaming-client-example } + +Java +: @@snip [HttpClientExampleDocTest.java]($test$/java/docs/http/javadsl/server/JsonStreamingExamplesTest.java) { #json-streaming-client-example-raw } -To learn more about how spray-json works please refer to its [documentation][spray-json]. +@@@ div { .group-scala } + +In the above example the marshalling is handled by the implicitly provided `JsonEntityStreamingSupport`, which is also used when building server-side streaming APIs. +You can also achieve the same more explicitly, by manually connecting the entity byte stream through a framing and then deserialization stage: +Scala +: @@snip [EntityStreamingSpec.scala]($akka-http$/akka-http-tests/src/test/scala/akka/http/scaladsl/server/EntityStreamingSpec.scala) { #json-streaming-client-example-raw } + @@@ @@@ div { .group-java } - -## Jackson Support +In the above example the `JsonEntityStreamingSupport` class is used to obtain the proper framing, though you could also +pick the framing manually by using `akka.stream.javadsl.Framing` or `akka.stream.javadsl.JsonFraming`. +Framing stages are used to "chunk up" the pieces of incoming bytes into appropriately sized pieces of valid JSON, +which then can be handled easily by a not-streaming JSON serializer such as jackson in the example. This technique is simpler to use +and often good enough rather than writing a fully streaming JSON parser (which also is possible). -To make use of the support module for (un)marshalling from and to JSON with [Jackson], add a library dependency onto: +@@@ -@@dependency [sbt,Gradle,Maven] { - group="com.typesafe.akka" - artifact="akka-http-jackson_$scala.binary.version$" - version="$project.version$" -} -Use `akka.http.javadsl.marshallers.jackson.Jackson.unmarshaller(T.class)` to create an @unidoc[Unmarshaller[HttpEntity,T]] which expects the request -body (HttpEntity) to be of type `application/json` and converts it to `T` using Jackson. +@@@ div { .group-scala } -@@snip [PetStoreExample.java]($akka-http$/akka-http-tests/src/main/java/akka/http/javadsl/server/examples/petstore/PetStoreExample.java) { #imports #unmarshall } +## Pretty printing -Use `akka.http.javadsl.marshallers.jackson.Jackson.marshaller(T.class)` to create a @unidoc[Marshaller[T,RequestEntity]] which can be used with -`RequestContext.complete` or `RouteDirectives.complete` to convert a POJO to an HttpResponse. +By default, spray-json marshals your types to compact printed JSON by implicit conversion using `CompactPrinter`, as defined in: -@@snip [PetStoreExample.java]($akka-http$/akka-http-tests/src/main/java/akka/http/javadsl/server/examples/petstore/PetStoreExample.java) { #imports #marshall } +@@snip [SprayJsonSupport.scala]($akka-http$/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonSupport.scala) { #sprayJsonMarshallerConverter } -Refer to @github[this file](/akka-http-tests/src/main/java/akka/http/javadsl/server/examples/petstore/PetStoreExample.java) in the sources for the complete example. +Alternatively to marshal your types to pretty printed JSON, bring a `PrettyPrinter` in scope to perform implicit conversion. + +@@snip [SprayJsonPrettyMarshalSpec.scala]($test$/scala/docs/http/scaladsl/SprayJsonPrettyMarshalSpec.scala) { #example } + +To learn more about how spray-json works please refer to its [documentation][spray-json]. @@@ diff --git a/docs/src/main/paradox/routing-dsl/source-streaming-support.md b/docs/src/main/paradox/routing-dsl/source-streaming-support.md index 0636bb3a42f..2b0e067083c 100644 --- a/docs/src/main/paradox/routing-dsl/source-streaming-support.md +++ b/docs/src/main/paradox/routing-dsl/source-streaming-support.md @@ -183,3 +183,8 @@ Scala : @@snip [JsonStreamingFullExamples.scala]($test$/scala/docs/http/scaladsl/server/directives/JsonStreamingFullExamples.scala) { #custom-content-type } @@@ + +## Consuming streaming JSON on client-side + +For consuming such streaming APIs with, for example, JSON responses refer to @ref[Consuming JSON Streaming style APIs](../common/json-support.md#consuming-json-streaming-style-apis) +documentation in the JSON support section. diff --git a/docs/src/test/java/docs/http/javadsl/server/JsonStreamingExamplesTest.java b/docs/src/test/java/docs/http/javadsl/server/JsonStreamingExamplesTest.java index 6a642ca9ce7..f21972d7a78 100644 --- a/docs/src/test/java/docs/http/javadsl/server/JsonStreamingExamplesTest.java +++ b/docs/src/test/java/docs/http/javadsl/server/JsonStreamingExamplesTest.java @@ -33,7 +33,7 @@ final Route tweets() { //#formats //#response-streaming - + // Step 1: Enable JSON streaming // we're not using this in the example, but it's the simplest way to start: // The default rendering is a JSON array: `[el, el, el , ...]` @@ -43,9 +43,9 @@ final Route tweets() { final ByteString start = ByteString.fromString("["); final ByteString between = ByteString.fromString(","); final ByteString end = ByteString.fromString("]"); - final Flow compactArrayRendering = + final Flow compactArrayRendering = Flow.of(ByteString.class).intersperse(start, between, end); - + final JsonEntityStreamingSupport compactJsonSupport = EntityStreamingSupport.json() .withFramingRendererFlow(compactArrayRendering); @@ -56,7 +56,7 @@ final Route tweets() { parameter(StringUnmarshallers.INTEGER, "n", n -> { final Source tws = Source.repeat(new JavaTweet(12, "Hello World!")).take(n); - + // Step 3: call complete* with your source, marshaller, and stream rendering mode return completeOKWithSource(tws, Jackson.marshaller(), compactJsonSupport); }) @@ -76,17 +76,17 @@ final Route tweets() { }); } ) - ) + ) ); //#incoming-request-streaming - + return responseStreaming.orElse(incomingStreaming); } - + final Route csvTweets() { //#csv-example - final Marshaller renderAsCsv = - Marshaller.withFixedContentType(ContentTypes.TEXT_CSV_UTF8, t -> + final Marshaller renderAsCsv = + Marshaller.withFixedContentType(ContentTypes.TEXT_CSV_UTF8, t -> ByteString.fromString(t.getId() + "," + t.getMessage()) ); @@ -102,17 +102,39 @@ final Route csvTweets() { ) ); //#csv-example - + return responseStreaming; } //#routes - + + final void clientStreamingJsonExample() { + //#json-streaming-client-example-raw + Unmarshaller unmarshal = Jackson.byteStringUnmarshaller(JavaTweet.class); + JsonEntityStreamingSupport support = EntityStreamingSupport.json(); + + // imagine receiving such response from a service: + String payload = "{\"uid\":1,\"txt\":\"#Akka rocks!\"}\n" + + "{\"uid\":2,\"txt\":\"Streaming is so hot right now!\"}\n" + + "{\"uid\":3,\"txt\":\"You cannot enter the same river twice.\"}"; + HttpEntity.Strict entity = HttpEntities.create(ContentTypes.APPLICATION_JSON, payload); + HttpResponse response = HttpResponse.create().withEntity(entity); + + Source tweets = + response.entity().getDataBytes() + .via(support.framingDecoder()) // apply JSON framing + .mapAsync(1, // unmarshal each element + bs -> unmarshal.unmarshal(bs, materializer()) + ); + + //#json-streaming-client-example-raw + } + @Test public void getTweetsTest() { //#response-streaming // tests: final TestRoute routes = testRoute(tweets()); - + // test happy path final Accept acceptApplication = Accept.create(MediaRanges.create(MediaTypes.APPLICATION_JSON)); routes.run(HttpRequest.GET("/tweets?n=2").addHeader(acceptApplication)) @@ -126,13 +148,13 @@ public void getTweetsTest() { .assertEntity("Resource representation is only available with these types:\napplication/json"); //#response-streaming } - + @Test public void csvExampleTweetsTest() { //#response-streaming // tests -------------------------------------------- final TestRoute routes = testRoute(csvTweets()); - + // test happy path final Accept acceptCsv = Accept.create(MediaRanges.create(MediaTypes.TEXT_CSV)); routes.run(HttpRequest.GET("/tweets?n=2").addHeader(acceptCsv)) @@ -150,7 +172,7 @@ public void csvExampleTweetsTest() { //#models private static final class JavaTweet { - private int id; + private int id; private String message; public JavaTweet(int id, String message) {