Skip to content

Commit

Permalink
Feature/stream docs (#742)
Browse files Browse the repository at this point in the history
* adding documentation for the new streaming methods.

* Adding tests for the misisng query methods

* Removing the implicit guava adapter in favour of promise interface inference.

* Removing guava adapter references.

* Fixing streams test.

* Adding more tests for the newly added fun bits.

* Renaming and repurposing test
  • Loading branch information
alexflav23 authored Aug 29, 2017
1 parent 8a4c288 commit eb3c75d
Show file tree
Hide file tree
Showing 16 changed files with 217 additions and 140 deletions.
Empty file removed 1
Empty file.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ lazy val phantomSbtPlugin = (project in file("phantom-sbt"))
sbtPlugin := true,
publishArtifact := !Publishing.publishingToMaven && { scalaVersion.value.startsWith("2.10") },
libraryDependencies ++= Seq(
"com.datastax.cassandra" % "cassandra-driver-core" % Versions.datastax,
"org.cassandraunit" % "cassandra-unit" % Versions.cassandraUnit excludeAll (
ExclusionRule("org.slf4j", "slf4j-log4j12"),
ExclusionRule("org.slf4j", "slf4j-jdk14")
Expand Down
2 changes: 1 addition & 1 deletion build/run_tests.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env bash
if [ "${TRAVIS_SCALA_VERSION}" == "2.12.2" ] && [ "${TRAVIS_JDK_VERSION}" == "oraclejdk8" ];
if [ "${TRAVIS_SCALA_VERSION}" == "2.12.3" ] && [ "${TRAVIS_JDK_VERSION}" == "oraclejdk8" ];
then
echo "Running tests with coverage and report submission"
sbt ++$TRAVIS_SCALA_VERSION coverage test coverageReport coverageAggregate coveralls
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ object ScalaPromiseInterface extends PromiseInterface[Promise, Future] {

class ScalaQueryContext extends QueryContext[Promise, Future, Duration](10.seconds)(
ScalaFutureImplicits.monadInstance,
ScalaPromiseInterface,
ScalaGuavaAdapter
ScalaPromiseInterface
) {

override def await[T](f: Future[T], timeout: Duration): T = Await.result(f, timeout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ case class DeleteQuery[
ev: PS =:!= HNil,
rev: Reverse.Aux[PS, Rev],
monad: FutureMonad[F],
adapter: GuavaAdapter[F],
interface: PromiseInterface[P, F]
): F[PreparedBlock[Rev]] = {
val flatten = new PreparedFlattener(qb)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ case class InsertQuery[
ev: PS =:!= HNil,
rev: Reverse.Aux[PS, Rev],
fMonad: FutureMonad[F],
adapter: GuavaAdapter[F],
interface: PromiseInterface[P, F]
): F[PreparedBlock[Rev]] = {
val flatten = new PreparedFlattener(qb)
Expand Down Expand Up @@ -245,7 +244,6 @@ class InsertJsonQuery[
ev: PS =:!= HNil,
rev: Reverse.Aux[PS, Rev],
fMonad: FutureMonad[F],
adapter: GuavaAdapter[F],
interface: PromiseInterface[P, F]
): F[PreparedBlock[Rev]] = {
val flatten = new PreparedFlattener(qb)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ case class UpdateQuery[
ev: PS =:!= HNil,
rev: Reverse.Aux[PS, Rev],
fMonad: FutureMonad[F],
adapter: GuavaAdapter[F],
interface: PromiseInterface[P, F]
): F[PreparedBlock[Rev]] = {
val flatten = new PreparedFlattener(qb)
Expand Down Expand Up @@ -391,7 +390,6 @@ sealed case class ConditionalQuery[
ev: PS =:!= HNil,
rev: Reverse.Aux[PS, Rev],
fMonad: FutureMonad[F],
adapter: GuavaAdapter[F],
interface: PromiseInterface[P, F]
): F[PreparedBlock[Rev]] = {
val flatten = new PreparedFlattener(qb)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,10 @@ class PreparedFlattener(qb: CQLQuery)(
def async[P[_], F[_]]()(
implicit executor: ExecutionContextExecutor,
monad: FutureMonad[F],
adapter: GuavaAdapter[F],
interface: PromiseInterface[P, F]
): F[PreparedStatement] = {
new ExactlyOncePromise[P, F, PreparedStatement](
adapter.fromGuava(session.prepareAsync(qb.queryString))
interface.adapter.fromGuava(session.prepareAsync(qb.queryString))
).future
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ package object dsl extends ScalaQueryContext with DefaultImports {

implicit class ExecuteQueries[M[X] <: TraversableOnce[X]](val qc: QueryCollection[M]) extends AnyVal {
def executable(): ExecutableStatements[Future, M] = {
new ExecutableStatements[Future, M](qc)
new ExecutableStatements[Future, M](qc)(futureMonad, promiseInterface.adapter)
}

def future()(implicit session: Session,
Expand All @@ -47,7 +47,7 @@ package object dsl extends ScalaQueryContext with DefaultImports {
def cql(
str: CQLQuery,
options: QueryOptions
): QueryInterface[Future] = new QueryInterface[Future]() {
): QueryInterface[Future] = new QueryInterface[Future]()(promiseInterface.adapter) {
override def executableQuery: ExecutableCqlQuery = ExecutableCqlQuery(str, options)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ abstract class QueryContext[P[_], F[_], Timeout](
defaultTimeout: Timeout
)(
implicit fMonad: FutureMonad[F],
val promiseInterface: PromiseInterface[P, F],
val adapter: GuavaAdapter[F]
val promiseInterface: PromiseInterface[P, F]
) { outer =>

implicit val adapter: GuavaAdapter[F] = promiseInterface.adapter

def executeStatements[M[X] <: TraversableOnce[X]](
col: QueryCollection[M]
): ExecutableStatements[F, M] = {
Expand All @@ -48,7 +49,7 @@ abstract class QueryContext[P[_], F[_], Timeout](

implicit class BatchOps[Status <: ConsistencyBound](val query: BatchQuery[Status]) {
def future()(implicit session: Session, ctx: ExecutionContextExecutor): F[ResultSet] = {
adapter.executeBatch(query.makeBatch())
promiseInterface.adapter.executeBatch(query.makeBatch())
}
}

Expand All @@ -60,7 +61,7 @@ abstract class QueryContext[P[_], F[_], Timeout](
def future()(
implicit session: Session,
ctx: ExecutionContextExecutor
): F[ResultSet] = adapter.fromGuava(query.executableQuery)
): F[ResultSet] = promiseInterface.adapter.fromGuava(query.executableQuery)
}

implicit class RootSelectBlockOps[
Expand Down Expand Up @@ -98,7 +99,7 @@ abstract class QueryContext[P[_], F[_], Timeout](
PS <: HList
](
override val query: SelectQuery[Table, Record, Limit, Order, Status, Chain, PS]
) extends SelectQueryOps(query) {
) extends SelectQueryOps(query)(promiseInterface.adapter, fMonad) {
override def executableQuery: ExecutableCqlQuery = query.executableQuery
}

Expand Down Expand Up @@ -137,7 +138,7 @@ abstract class QueryContext[P[_], F[_], Timeout](
implicit session: Session,
ec: ExecutionContextExecutor
): F[ResultSet] = {
adapter.fromGuava(query.options(query.st))
promiseInterface.adapter.fromGuava(query.options(query.st))
}

/**
Expand All @@ -156,7 +157,7 @@ abstract class QueryContext[P[_], F[_], Timeout](
override def future(modifyStatement: Statement => Statement)(
implicit session: Session,
executor: ExecutionContextExecutor
): F[ResultSet] = adapter.fromGuava(modifyStatement(query.options(query.st)))
): F[ResultSet] = promiseInterface.adapter.fromGuava(modifyStatement(query.options(query.st)))
}

implicit class ExecutablePreparedSelect[
Expand All @@ -183,7 +184,7 @@ abstract class QueryContext[P[_], F[_], Timeout](
implicit session: Session,
ec: ExecutionContextExecutor
): F[ResultSet] = {
adapter.fromGuava(query.options(query.st))
promiseInterface.adapter.fromGuava(query.options(query.st))
}

/**
Expand Down Expand Up @@ -267,7 +268,7 @@ abstract class QueryContext[P[_], F[_], Timeout](
sg: SingleGeneric.Aux[V1, Repr, HL, Out],
ctx: ExecutionContextExecutor,
ev: Out ==:== Repr
): F[ResultSet] = adapter.fromGuava(table.store(input).executableQuery)
): F[ResultSet] = promiseInterface.adapter.fromGuava(table.store(input).executableQuery)

def storeRecords[M[X] <: TraversableOnce[X], V1, Repr <: HList, HL <: HList, Out <: HList](inputs: M[V1])(
implicit keySpace: KeySpace,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ class SelectQueryOps[
ev: PS =:!= HNil,
rev: Reverse.Aux[PS, Rev],
fMonad: FutureMonad[F],
adapter: GuavaAdapter[F],
interface: PromiseInterface[P, F]
): F[PreparedSelectBlock[Table, Record, Limit, Rev]] = {
val flatten = new PreparedFlattener(executableQuery.qb)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ object TwitterPromiseInterface extends PromiseInterface[Promise, Future] {

class TwitterQueryContext extends QueryContext[Promise, Future, Duration](10.seconds)(
TwitterFutureImplicits.monadInstance,
TwitterPromiseInterface,
TwitterGuavaAdapter
TwitterPromiseInterface
) {

override def await[T](f: Future[T], timeout: Duration): T = Await.result(f, timeout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,26 @@ package object streams {
}
}

/**
* Creates a Reactive Streams publisher from a root select block.
* This will create a reactive streams publisher containing all the records found by this [[SelectQuery]].
* @param session The database session in which to execute this.
* @param ctx The execution context to use.
* @return A streams publisher interface clients can subscribe to.
*/
def publisher()(
implicit session: Session,
ctx: ExecutionContextExecutor
): Publisher[R] = enumeratorToPublisher(query.fetchEnumerator())

/**
* Creates a Reactive Streams publisher from a root select block but also
* allows passing through a query modifier.
* This will create a reactive streams publisher containing all the records found by this [[SelectQuery]].
* @param session The database session in which to execute this.
* @param ctx The execution context to use.
* @return A streams publisher interface clients can subscribe to.
*/
def publisher(modifier: Statement => Statement)(
implicit session: Session,
ctx: ExecutionContextExecutor
Expand Down Expand Up @@ -283,12 +298,31 @@ package object streams {
}
}

/**
* Creates a Reactive Streams publisher from a root select block.
* Because this is a [[RootSelectBlock]], the default execution profile
* of this method will be to select all records in a table and stream them.
* @param session The database session in which to execute this.
* @param keySpace The keyspace in which to execute the query.
* @param ctx The execution context to use.
* @return A streams publisher interface clients can subscribe to.
*/
def publisher()(
implicit session: Session,
keySpace: KeySpace,
ctx: ExecutionContextExecutor
): Publisher[R] = enumeratorToPublisher(block.fetchEnumerator())

/**
* Creates a Reactive Streams publisher from a root select block but also
* allows passing through a query modifier.
* Because this is a [[RootSelectBlock]], the default execution profile
* of this method will be to select all records in a table and stream them.
* @param session The database session in which to execute this.
* @param keySpace The keyspace in which to execute the query.
* @param ctx The execution context to use.
* @return A streams publisher interface clients can subscribe to.
*/
def publisher(modifier: Statement => Statement)(
implicit session: Session,
keySpace: KeySpace,
Expand Down
Loading

0 comments on commit eb3c75d

Please sign in to comment.