Skip to content

Commit

Permalink
=doc document client-side for streaming JSON (#1964)
Browse files Browse the repository at this point in the history
* =doc document client-side for streaming JSON

* add ref link
  • Loading branch information
ktoso authored Mar 27, 2018
1 parent 3f6f141 commit 16c9add
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import akka.http.scaladsl.common.{ EntityStreamingSupport, JsonEntityStreamingSu
import akka.http.scaladsl.marshalling.{ Marshaller, Marshalling, ToResponseMarshallable }
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.scaladsl._
import akka.testkit.EventFilter
import akka.util.ByteString
import org.scalatest.concurrent.ScalaFutures

import scala.concurrent.Future

class EntityStreamingSpec extends RoutingSpec {
class EntityStreamingSpec extends RoutingSpec with ScalaFutures {

//#models
case class Tweet(uid: Int, txt: String)
Expand Down Expand Up @@ -95,6 +97,75 @@ class EntityStreamingSpec extends RoutingSpec {
}
}

"client-consume-streaming-json" in {
//#json-streaming-client-example
import MyJsonProtocol._
import akka.http.scaladsl.unmarshalling._
import akka.http.scaladsl.common.EntityStreamingSupport
import akka.http.scaladsl.common.JsonEntityStreamingSupport

implicit val jsonStreamingSupport: JsonEntityStreamingSupport =
EntityStreamingSupport.json()

val input = """{"uid":1,"txt":"#Akka rocks!"}""" + "\n" +
"""{"uid":2,"txt":"Streaming is so hot right now!"}""" + "\n" +
"""{"uid":3,"txt":"You cannot enter the same river twice."}"""

val response = HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, input))

// unmarshal:
val unmarshalled: Future[Source[Tweet, NotUsed]] =
Unmarshal(response).to[Source[Tweet, NotUsed]]

// flatten the Future[Source[]] into a Source[]:
val source: Source[Tweet, Future[NotUsed]] =
Source.fromFutureSource(unmarshalled)

//#json-streaming-client-example
// tests ------------------------------------------------------------
val all = source.runWith(Sink.seq).futureValue
all.head.uid should ===(1)
all.head.txt should ===("#Akka rocks!")
all.drop(1).head.uid should ===(2)
all.drop(1).head.txt should ===("Streaming is so hot right now!")
all.drop(2).head.uid should ===(3)
all.drop(2).head.txt should ===("You cannot enter the same river twice.")
}

"client-consume-streaming-json-raw" in {
//#json-streaming-client-example-raw
import MyJsonProtocol._
import akka.http.scaladsl.unmarshalling._
import akka.http.scaladsl.common.EntityStreamingSupport
import akka.http.scaladsl.common.JsonEntityStreamingSupport

implicit val jsonStreamingSupport: JsonEntityStreamingSupport =
EntityStreamingSupport.json()

val input = """{"uid":1,"txt":"#Akka rocks!"}""" + "\n" +
"""{"uid":2,"txt":"Streaming is so hot right now!"}""" + "\n" +
"""{"uid":3,"txt":"You cannot enter the same river twice."}"""

val response = HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, input))

val value: Source[Tweet, Any] =
response.entity.dataBytes
.via(jsonStreamingSupport.framingDecoder) // pick your Framing (could be "\n" etc)
.mapAsync(1)(bytes Unmarshal(bytes).to[Tweet]) // unmarshal one by one

//#json-streaming-client-example-raw

// tests ------------------------------------------------------------
val all = value.runWith(Sink.seq).futureValue
all.head.uid should ===(1)
all.head.txt should ===("#Akka rocks!")
all.drop(1).head.uid should ===(2)
all.drop(1).head.txt should ===("Streaming is so hot right now!")
all.drop(2).head.uid should ===(3)
all.drop(2).head.txt should ===("You cannot enter the same river twice.")

}

"csv-example" in {
implicit val tweetAsCsv = Marshaller.strict[Tweet, ByteString] { t
Marshalling.WithFixedContentType(ContentTypes.`text/csv(UTF-8)`, () {
Expand Down
84 changes: 63 additions & 21 deletions docs/src/main/paradox/common/json-support.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,34 @@ Integration with @scala[[spray-json]]@java[[Jackson]] is provided out of the box
Integration with other JSON libraries are supported by the community.
See [the list of current community extensions for Akka HTTP](https://akka.io/community/#extensions-to-akka-http).

@@@ div { .group-java }

<a id="json-jackson-support-java"></a>
## Jackson Support

To make use of the support module for (un)marshalling from and to JSON with [Jackson], add a library dependency onto:

@@dependency [sbt,Gradle,Maven] {
group="com.typesafe.akka"
artifact="akka-http-jackson_$scala.binary.version$"
version="$project.version$"
}

Use `akka.http.javadsl.marshallers.jackson.Jackson.unmarshaller(T.class)` to create an @unidoc[Unmarshaller[HttpEntity,T]] which expects the request
body (HttpEntity) to be of type `application/json` and converts it to `T` using Jackson.

@@snip [PetStoreExample.java]($akka-http$/akka-http-tests/src/main/java/akka/http/javadsl/server/examples/petstore/PetStoreExample.java) { #imports #unmarshall }

Use `akka.http.javadsl.marshallers.jackson.Jackson.marshaller(T.class)` to create a @unidoc[Marshaller[T,RequestEntity]] which can be used with
`RequestContext.complete` or `RouteDirectives.complete` to convert a POJO to an HttpResponse.

@@snip [PetStoreExample.java]($akka-http$/akka-http-tests/src/main/java/akka/http/javadsl/server/examples/petstore/PetStoreExample.java) { #imports #marshall }

Refer to @github[this file](/akka-http-tests/src/main/java/akka/http/javadsl/server/examples/petstore/PetStoreExample.java) in the sources for the complete example.

@@@


@@@ div { .group-scala }

## spray-json Support
Expand All @@ -28,44 +56,58 @@ Once you have done this (un)marshalling between JSON and your type `T` should wo

@@snip [SprayJsonExampleSpec.scala]($test$/scala/docs/http/scaladsl/SprayJsonExampleSpec.scala) { #minimal-spray-json-example }

### Pretty printing
@@@

By default, spray-json marshals your types to compact printed JSON by implicit conversion using `CompactPrinter`, as defined in:
<a id="json-streaming-client-side"></a>
## Consuming JSON Streaming style APIs

@@snip [SprayJsonSupport.scala]($akka-http$/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonSupport.scala) { #sprayJsonMarshallerConverter }
A popular way of implementing streaming APIs is [JSON Streaming](https://en.wikipedia.org/wiki/JSON_Streaming) (see [Source Streaming](./routing-dsl/source-streaming-support.md)
for documentation on building server-side of such API).

Alternatively to marshal your types to pretty printed JSON, bring a `PrettyPrinter` in scope to perform implicit conversion.
Depending on the way the API returns the streamed JSON (newline delimited, raw sequence of objects, or "infinite array")
you may have to apply a different framing mechanism, but the general idea remains the same: consuming the infinite entity stream
and applying a framing to it, such that the single objects can be easily deserialized using the usual marshalling infrastructure:

@@snip [SprayJsonPrettyMarshalSpec.scala]($test$/scala/docs/http/scaladsl/SprayJsonPrettyMarshalSpec.scala) { #example }
Scala
: @@snip [EntityStreamingSpec.scala]($akka-http$/akka-http-tests/src/test/scala/akka/http/scaladsl/server/EntityStreamingSpec.scala) { #json-streaming-client-example }

Java
: @@snip [HttpClientExampleDocTest.java]($test$/java/docs/http/javadsl/server/JsonStreamingExamplesTest.java) { #json-streaming-client-example-raw }

To learn more about how spray-json works please refer to its [documentation][spray-json].
@@@ div { .group-scala }

In the above example the marshalling is handled by the implicitly provided `JsonEntityStreamingSupport`, which is also used when building server-side streaming APIs.
You can also achieve the same more explicitly, by manually connecting the entity byte stream through a framing and then deserialization stage:

Scala
: @@snip [EntityStreamingSpec.scala]($akka-http$/akka-http-tests/src/test/scala/akka/http/scaladsl/server/EntityStreamingSpec.scala) { #json-streaming-client-example-raw }

@@@

@@@ div { .group-java }

<a id="json-jackson-support-java"></a>
## Jackson Support
In the above example the `JsonEntityStreamingSupport` class is used to obtain the proper framing, though you could also
pick the framing manually by using `akka.stream.javadsl.Framing` or `akka.stream.javadsl.JsonFraming`.
Framing stages are used to "chunk up" the pieces of incoming bytes into appropriately sized pieces of valid JSON,
which then can be handled easily by a not-streaming JSON serializer such as jackson in the example. This technique is simpler to use
and often good enough rather than writing a fully streaming JSON parser (which also is possible).

To make use of the support module for (un)marshalling from and to JSON with [Jackson], add a library dependency onto:
@@@

@@dependency [sbt,Gradle,Maven] {
group="com.typesafe.akka"
artifact="akka-http-jackson_$scala.binary.version$"
version="$project.version$"
}

Use `akka.http.javadsl.marshallers.jackson.Jackson.unmarshaller(T.class)` to create an @unidoc[Unmarshaller[HttpEntity,T]] which expects the request
body (HttpEntity) to be of type `application/json` and converts it to `T` using Jackson.
@@@ div { .group-scala }

@@snip [PetStoreExample.java]($akka-http$/akka-http-tests/src/main/java/akka/http/javadsl/server/examples/petstore/PetStoreExample.java) { #imports #unmarshall }
## Pretty printing

Use `akka.http.javadsl.marshallers.jackson.Jackson.marshaller(T.class)` to create a @unidoc[Marshaller[T,RequestEntity]] which can be used with
`RequestContext.complete` or `RouteDirectives.complete` to convert a POJO to an HttpResponse.
By default, spray-json marshals your types to compact printed JSON by implicit conversion using `CompactPrinter`, as defined in:

@@snip [PetStoreExample.java]($akka-http$/akka-http-tests/src/main/java/akka/http/javadsl/server/examples/petstore/PetStoreExample.java) { #imports #marshall }
@@snip [SprayJsonSupport.scala]($akka-http$/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonSupport.scala) { #sprayJsonMarshallerConverter }

Refer to @github[this file](/akka-http-tests/src/main/java/akka/http/javadsl/server/examples/petstore/PetStoreExample.java) in the sources for the complete example.
Alternatively to marshal your types to pretty printed JSON, bring a `PrettyPrinter` in scope to perform implicit conversion.

@@snip [SprayJsonPrettyMarshalSpec.scala]($test$/scala/docs/http/scaladsl/SprayJsonPrettyMarshalSpec.scala) { #example }

To learn more about how spray-json works please refer to its [documentation][spray-json].

@@@

Expand Down
5 changes: 5 additions & 0 deletions docs/src/main/paradox/routing-dsl/source-streaming-support.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,8 @@ Scala
: @@snip [JsonStreamingFullExamples.scala]($test$/scala/docs/http/scaladsl/server/directives/JsonStreamingFullExamples.scala) { #custom-content-type }

@@@

## Consuming streaming JSON on client-side

For consuming such streaming APIs with, for example, JSON responses refer to @ref[Consuming JSON Streaming style APIs](../common/json-support.md#consuming-json-streaming-style-apis)
documentation in the JSON support section.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ final Route tweets() {
//#formats

//#response-streaming

// Step 1: Enable JSON streaming
// we're not using this in the example, but it's the simplest way to start:
// The default rendering is a JSON array: `[el, el, el , ...]`
Expand All @@ -43,9 +43,9 @@ final Route tweets() {
final ByteString start = ByteString.fromString("[");
final ByteString between = ByteString.fromString(",");
final ByteString end = ByteString.fromString("]");
final Flow<ByteString, ByteString, NotUsed> compactArrayRendering =
final Flow<ByteString, ByteString, NotUsed> compactArrayRendering =
Flow.of(ByteString.class).intersperse(start, between, end);

final JsonEntityStreamingSupport compactJsonSupport = EntityStreamingSupport.json()
.withFramingRendererFlow(compactArrayRendering);

Expand All @@ -56,7 +56,7 @@ final Route tweets() {
parameter(StringUnmarshallers.INTEGER, "n", n -> {
final Source<JavaTweet, NotUsed> tws =
Source.repeat(new JavaTweet(12, "Hello World!")).take(n);

// Step 3: call complete* with your source, marshaller, and stream rendering mode
return completeOKWithSource(tws, Jackson.marshaller(), compactJsonSupport);
})
Expand All @@ -76,17 +76,17 @@ final Route tweets() {
});
}
)
)
)
);
//#incoming-request-streaming

return responseStreaming.orElse(incomingStreaming);
}

final Route csvTweets() {
//#csv-example
final Marshaller<JavaTweet, ByteString> renderAsCsv =
Marshaller.withFixedContentType(ContentTypes.TEXT_CSV_UTF8, t ->
final Marshaller<JavaTweet, ByteString> renderAsCsv =
Marshaller.withFixedContentType(ContentTypes.TEXT_CSV_UTF8, t ->
ByteString.fromString(t.getId() + "," + t.getMessage())
);

Expand All @@ -102,17 +102,39 @@ final Route csvTweets() {
)
);
//#csv-example

return responseStreaming;
}
//#routes


final void clientStreamingJsonExample() {
//#json-streaming-client-example-raw
Unmarshaller<ByteString, JavaTweet> unmarshal = Jackson.byteStringUnmarshaller(JavaTweet.class);
JsonEntityStreamingSupport support = EntityStreamingSupport.json();

// imagine receiving such response from a service:
String payload = "{\"uid\":1,\"txt\":\"#Akka rocks!\"}\n" +
"{\"uid\":2,\"txt\":\"Streaming is so hot right now!\"}\n" +
"{\"uid\":3,\"txt\":\"You cannot enter the same river twice.\"}";
HttpEntity.Strict entity = HttpEntities.create(ContentTypes.APPLICATION_JSON, payload);
HttpResponse response = HttpResponse.create().withEntity(entity);

Source<JavaTweet, Object> tweets =
response.entity().getDataBytes()
.via(support.framingDecoder()) // apply JSON framing
.mapAsync(1, // unmarshal each element
bs -> unmarshal.unmarshal(bs, materializer())
);

//#json-streaming-client-example-raw
}

@Test
public void getTweetsTest() {
//#response-streaming
// tests:
final TestRoute routes = testRoute(tweets());

// test happy path
final Accept acceptApplication = Accept.create(MediaRanges.create(MediaTypes.APPLICATION_JSON));
routes.run(HttpRequest.GET("/tweets?n=2").addHeader(acceptApplication))
Expand All @@ -126,13 +148,13 @@ public void getTweetsTest() {
.assertEntity("Resource representation is only available with these types:\napplication/json");
//#response-streaming
}

@Test
public void csvExampleTweetsTest() {
//#response-streaming
// tests --------------------------------------------
final TestRoute routes = testRoute(csvTweets());

// test happy path
final Accept acceptCsv = Accept.create(MediaRanges.create(MediaTypes.TEXT_CSV));
routes.run(HttpRequest.GET("/tweets?n=2").addHeader(acceptCsv))
Expand All @@ -150,7 +172,7 @@ public void csvExampleTweetsTest() {

//#models
private static final class JavaTweet {
private int id;
private int id;
private String message;

public JavaTweet(int id, String message) {
Expand Down

0 comments on commit 16c9add

Please sign in to comment.