From 5514a4ddaeaf12961f037e38de33c4f8f2f62d92 Mon Sep 17 00:00:00 2001 From: Mikhail Sokolov Date: Tue, 3 Sep 2024 11:53:27 +0300 Subject: [PATCH] Drop 2.12 support, add scalafmt, scalafix, MiMa Source and binary compat preserved. The code changes: - scalafmt and scalafix formatting applied - added type annotations to public members and implicits - migrated to Scala 3 syntax for imports - since 2.12 is not supported anymore, replaced CurrentThreadExecutionContext wrapper with direct usage of ExecutionContext.parasitic. This as well leads to removal of the direct dependency on executor-tools as it is not used in the project anymore. --- .github/workflows/ci.yml | 3 +- .scalafix.conf | 7 +++ .scalafmt.conf | 40 ++++++++++++++++ .../concurrent/sequentially/Common.scala | 34 +++++++------- .../SequentiallyAsyncBenchmark.scala | 2 +- .../sequentially/SequentiallyBenchmark.scala | 7 ++- .../SequentiallyHandlerBenchmark.scala | 3 +- .../SequentiallyStreamBenchmark.scala | 2 +- build.sbt | 41 +++++++++++----- project/plugins.sbt | 10 ++-- .../concurrent/MeteredSequentiallyAsync.scala | 36 +++++++------- .../concurrent/SequentiallyMetrics.scala | 23 ++++----- .../sequentially/AsyncHandlerMap.scala | 31 ++++++------ .../concurrent/sequentially/AsyncMap.scala | 16 +++---- .../sequentially/MapDirective.scala | 2 - .../sequentially/SequentialMap.scala | 9 ++-- .../sequentially/Sequentially.scala | 47 ++++++++++--------- .../sequentially/SequentiallyAsync.scala | 33 +++++++------ .../sequentially/SequentiallyHandler.scala | 37 +++++++-------- .../sequentially/SourceQueueHelper.scala | 12 +++-- .../concurrent/sequentially/Substream.scala | 2 +- .../concurrent/sequentially/ActorSpec.scala | 3 +- .../sequentially/AsyncHandlerMapSpec.scala | 32 +++++++------ .../sequentially/AsyncMapSpec.scala | 10 ++-- .../sequentially/SequentialMapSpec.scala | 6 +-- .../sequentially/SequentiallyAsyncSpec.scala | 16 +++---- .../SequentiallyHandlerSpec.scala | 17 ++++--- .../sequentially/SequentiallySpec.scala | 30 ++++++------ .../sequentially/SubstreamSpec.scala | 3 +- 29 files changed, 294 insertions(+), 220 deletions(-) create mode 100644 .scalafix.conf create mode 100644 .scalafmt.conf diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index aa9adbd..3b7efde 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -11,7 +11,6 @@ jobs: matrix: scala: - 2.13.14 - - 2.12.19 - 3.3.3 steps: @@ -25,7 +24,7 @@ jobs: java-version: openjdk@1.11 - name: build ${{ matrix.scala }} - run: sbt ++${{ matrix.scala }} clean coverage test + run: sbt ++${{ matrix.scala }} clean check coverage test - name: test coverage if: success() diff --git a/.scalafix.conf b/.scalafix.conf new file mode 100644 index 0000000..7d8ffb7 --- /dev/null +++ b/.scalafix.conf @@ -0,0 +1,7 @@ +rules = [OrganizeImports] + +OrganizeImports { + preset = INTELLIJ_2020_3 + removeUnused = false # `true` is not supported in Scala 3.3.0 + targetDialect = Auto +} \ No newline at end of file diff --git a/.scalafmt.conf b/.scalafmt.conf new file mode 100644 index 0000000..9ee6fd3 --- /dev/null +++ b/.scalafmt.conf @@ -0,0 +1,40 @@ +version = 3.8.3 + +runner.dialect = scala213source3 + +maxColumn = 110 +trailingCommas = multiple + +align.preset = some +indent.defnSite = 2 +indent.caseSite = 5 +indent.extendSite = 2 +indentOperator.exclude = "^(&&|\\|\\||~)$" +newlines.avoidForSimpleOverflow = [punct, tooLong, slc] +newlines.source = keep +newlines.implicitParamListModifierForce = [after] +spaces.beforeContextBoundColon = always +spaces.inInterpolatedStringCurlyBraces = true +verticalMultiline.atDefnSite = true +verticalMultiline.arityThreshold = 3 +verticalMultiline.newlineAfterOpenParen = true +danglingParentheses.exclude = [] +docstrings.oneline = fold +docstrings.wrap = no +importSelectors = singleLine + +rewrite.rules = [ + RedundantParens, + SortModifiers, +] + +rewrite.sortModifiers.order = [ + "override", + "private", + "protected", + "implicit", + "final", + "sealed", + "abstract", + "lazy", +] diff --git a/benchmark/src/main/scala/com/evolutiongaming/concurrent/sequentially/Common.scala b/benchmark/src/main/scala/com/evolutiongaming/concurrent/sequentially/Common.scala index a374c98..1c93c5c 100644 --- a/benchmark/src/main/scala/com/evolutiongaming/concurrent/sequentially/Common.scala +++ b/benchmark/src/main/scala/com/evolutiongaming/concurrent/sequentially/Common.scala @@ -1,25 +1,27 @@ package com.evolutiongaming.concurrent.sequentially -import java.util.concurrent.TimeUnit +import org.openjdk.jmh.annotations.* -import org.openjdk.jmh.annotations._ +import java.util.concurrent.TimeUnit @State(Scope.Benchmark) @Warmup(iterations = 5) @Measurement(iterations = 10) -@Fork(value = 1, jvmArgs = Array( - "-server", - "-Xms1g", - "-Xmx1g", - "-XX:NewSize=512m", - "-XX:MaxNewSize=512m", - "-XX:InitialCodeCacheSize=256m", - "-XX:ReservedCodeCacheSize=256m", - "-XX:-UseBiasedLocking", - "-XX:+AlwaysPreTouch", - "-XX:+UseParallelGC")) +@Fork( + value = 1, + jvmArgs = Array( + "-server", + "-Xms1g", + "-Xmx1g", + "-XX:NewSize=512m", + "-XX:MaxNewSize=512m", + "-XX:InitialCodeCacheSize=256m", + "-XX:ReservedCodeCacheSize=256m", + "-XX:-UseBiasedLocking", + "-XX:+AlwaysPreTouch", + "-XX:+UseParallelGC", + ), +) @BenchmarkMode(Array(Mode.Throughput)) @OutputTimeUnit(TimeUnit.SECONDS) -abstract class Common { - -} +abstract class Common {} diff --git a/benchmark/src/main/scala/com/evolutiongaming/concurrent/sequentially/SequentiallyAsyncBenchmark.scala b/benchmark/src/main/scala/com/evolutiongaming/concurrent/sequentially/SequentiallyAsyncBenchmark.scala index 05011c3..7000ff3 100644 --- a/benchmark/src/main/scala/com/evolutiongaming/concurrent/sequentially/SequentiallyAsyncBenchmark.scala +++ b/benchmark/src/main/scala/com/evolutiongaming/concurrent/sequentially/SequentiallyAsyncBenchmark.scala @@ -5,7 +5,7 @@ import akka.stream.Materializer import org.openjdk.jmh.annotations.{Benchmark, Level, Setup, TearDown} import scala.concurrent.Await -import scala.concurrent.duration._ +import scala.concurrent.duration.* class SequentiallyAsyncBenchmark extends Common { diff --git a/benchmark/src/main/scala/com/evolutiongaming/concurrent/sequentially/SequentiallyBenchmark.scala b/benchmark/src/main/scala/com/evolutiongaming/concurrent/sequentially/SequentiallyBenchmark.scala index ba0b294..d95dc25 100644 --- a/benchmark/src/main/scala/com/evolutiongaming/concurrent/sequentially/SequentiallyBenchmark.scala +++ b/benchmark/src/main/scala/com/evolutiongaming/concurrent/sequentially/SequentiallyBenchmark.scala @@ -1,11 +1,10 @@ package com.evolutiongaming.concurrent.sequentially import akka.actor.ActorSystem -import org.openjdk.jmh.annotations._ +import org.openjdk.jmh.annotations.* import scala.concurrent.Await -import scala.concurrent.duration._ - +import scala.concurrent.duration.* class SequentiallyBenchmark extends Common { @@ -28,4 +27,4 @@ class SequentiallyBenchmark extends Common { Await.ready(system.terminate(), 15.seconds) () } -} \ No newline at end of file +} diff --git a/benchmark/src/main/scala/com/evolutiongaming/concurrent/sequentially/SequentiallyHandlerBenchmark.scala b/benchmark/src/main/scala/com/evolutiongaming/concurrent/sequentially/SequentiallyHandlerBenchmark.scala index b8b31fc..69f4671 100644 --- a/benchmark/src/main/scala/com/evolutiongaming/concurrent/sequentially/SequentiallyHandlerBenchmark.scala +++ b/benchmark/src/main/scala/com/evolutiongaming/concurrent/sequentially/SequentiallyHandlerBenchmark.scala @@ -5,7 +5,7 @@ import akka.stream.Materializer import org.openjdk.jmh.annotations.{Benchmark, Level, Setup, TearDown} import scala.concurrent.Await -import scala.concurrent.duration._ +import scala.concurrent.duration.* class SequentiallyHandlerBenchmark extends Common { @@ -32,4 +32,3 @@ class SequentiallyHandlerBenchmark extends Common { () } } - diff --git a/benchmark/src/main/scala/com/evolutiongaming/concurrent/sequentially/SequentiallyStreamBenchmark.scala b/benchmark/src/main/scala/com/evolutiongaming/concurrent/sequentially/SequentiallyStreamBenchmark.scala index 3b2ed93..b889c54 100644 --- a/benchmark/src/main/scala/com/evolutiongaming/concurrent/sequentially/SequentiallyStreamBenchmark.scala +++ b/benchmark/src/main/scala/com/evolutiongaming/concurrent/sequentially/SequentiallyStreamBenchmark.scala @@ -5,7 +5,7 @@ import akka.stream.Materializer import org.openjdk.jmh.annotations.{Benchmark, Level, Setup, TearDown} import scala.concurrent.Await -import scala.concurrent.duration._ +import scala.concurrent.duration.* class SequentiallyStreamBenchmark extends Common { diff --git a/build.sbt b/build.sbt index 03b79fa..5e90e07 100644 --- a/build.sbt +++ b/build.sbt @@ -1,39 +1,50 @@ lazy val commonSettings = Seq( organization := "com.evolutiongaming", - homepage := Some(new URL("http://github.com/evolution-gaming/sequentially")), + homepage := Some(url("https://github.com/evolution-gaming/sequentially")), startYear := Some(2018), organizationName := "Evolution", - organizationHomepage := Some(url("http://evolution.com")), + organizationHomepage := Some(url("https://evolution.com")), scalaVersion := crossScalaVersions.value.head, - crossScalaVersions := Seq("2.13.14", "2.12.19", "3.3.3"), + crossScalaVersions := Seq("2.13.14", "3.3.3"), + Compile / scalacOptions ++= { + if (scalaBinaryVersion.value == "2.13") { + Seq( + "-Xsource:3" + ) + } else Seq.empty + }, Compile / doc / scalacOptions += "-no-link-warnings", publishTo := Some(Resolver.evolutionReleases), licenses := Seq(("MIT", url("https://opensource.org/licenses/MIT"))), releaseCrossBuild := true, - versionScheme := Some("semver-spec")) + versionScheme := Some("semver-spec"), +) +// Your next release will be binary compatible with the previous one, +// but it may not be source compatible (ie, it will be a minor release). +ThisBuild / versionPolicyIntention := Compatibility.BinaryAndSourceCompatible lazy val root = (project in file(".") settings (name := "sequentially-root") settings commonSettings settings (publish / skip := true) - aggregate(sequentially, benchmark, `sequentially-metrics`)) + aggregate (sequentially, benchmark, `sequentially-metrics`)) lazy val sequentially = (project in file("sequentially") settings (name := "sequentially") settings commonSettings settings (libraryDependencies ++= Seq( - "com.typesafe.akka" %% "akka-stream" % "2.6.21", - "com.typesafe.akka" %% "akka-testkit" % "2.6.21" % Test, - "com.evolutiongaming" %% "executor-tools" % "1.0.4", - "com.evolutiongaming" %% "future-helper" % "1.0.7", - "org.scalatest" %% "scalatest" % "3.2.10" % Test))) + "com.typesafe.akka" %% "akka-stream" % "2.6.21", + "com.typesafe.akka" %% "akka-testkit" % "2.6.21" % Test, + "com.evolutiongaming" %% "future-helper" % "1.0.7", + "org.scalatest" %% "scalatest" % "3.2.10" % Test, + ))) lazy val benchmark = (project in file("benchmark") - enablePlugins(JmhPlugin) + enablePlugins JmhPlugin settings (name := "benchmark") settings commonSettings dependsOn sequentially) @@ -47,4 +58,10 @@ lazy val `sequentially-metrics` = (project "com.evolutiongaming" %% "prometheus-tools" % "1.0.8" ))) -addCommandAlias("check", "show version") +//used by evolution-gaming/scala-github-actions +addCommandAlias( + "check", + "all versionPolicyCheck Compile/doc scalafmtCheckAll scalafmtSbtCheck; scalafixEnable; scalafixAll --check", +) + +addCommandAlias("fmtAll", "all scalafmtAll scalafmtSbt; scalafixEnable; scalafixAll") diff --git a/project/plugins.sbt b/project/plugins.sbt index f820650..3984fcf 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,11 +1,9 @@ addSbtPlugin("com.evolution" % "sbt-artifactory-plugin" % "0.0.2") - addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.0.12") - addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.3.11") - addSbtPlugin("com.github.sbt" % "sbt-release" % "1.4.0") - +addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.3") +addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.12.1") addSbtPlugin("com.evolution" % "sbt-scalac-opts-plugin" % "0.0.9") - -addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.4.3") \ No newline at end of file +addSbtPlugin("ch.epfl.scala" % "sbt-version-policy" % "3.2.1") +addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.4.3") diff --git a/sequentially-metrics/src/main/scala/com/evolutiongaming/concurrent/MeteredSequentiallyAsync.scala b/sequentially-metrics/src/main/scala/com/evolutiongaming/concurrent/MeteredSequentiallyAsync.scala index 5e44d22..e54aefd 100644 --- a/sequentially-metrics/src/main/scala/com/evolutiongaming/concurrent/MeteredSequentiallyAsync.scala +++ b/sequentially-metrics/src/main/scala/com/evolutiongaming/concurrent/MeteredSequentiallyAsync.scala @@ -6,27 +6,30 @@ import scala.concurrent.Future object MeteredSequentiallyAsync { - def apply[K](sequentially: SequentiallyAsync[K], - name: String, - sequentiallyMetrics: SequentiallyMetrics.Factory, + def apply[K]( + sequentially: SequentiallyAsync[K], + name: String, + sequentiallyMetrics: SequentiallyMetrics.Factory, ): SequentiallyAsync[K] = { apply(sequentially, sequentiallyMetrics(name)) } - def apply[K](sequentially: SequentiallyAsync[K], - metrics: SequentiallyMetrics, - ): SequentiallyAsync[K] = new SequentiallyAsync[K] { - def async[KK <: K, T](key: K)(task: => Future[T]): Future[T] = { - val start = System.nanoTime() + def apply[K]( + sequentially: SequentiallyAsync[K], + metrics: SequentiallyMetrics, + ): SequentiallyAsync[K] = + new SequentiallyAsync[K] { + def async[KK <: K, T](key: K)(task: => Future[T]): Future[T] = { + val start = System.nanoTime() - def run(): Future[T] = { - metrics.queue(start) - metrics.run(task) - } + def run(): Future[T] = { + metrics.queue(start) + metrics.run(task) + } - sequentially.async(key)(run()) + sequentially.async(key)(run()) + } } - } trait Factory { def apply[K](name: String): SequentiallyAsync[K] @@ -38,8 +41,9 @@ object MeteredSequentiallyAsync { def apply[K]: SequentiallyAsync[K] } - def apply(provider: Provider, - sequentiallyMetrics: SequentiallyMetrics.Factory, + def apply( + provider: Provider, + sequentiallyMetrics: SequentiallyMetrics.Factory, ): Factory = new Factory { override def apply[K](name: String): SequentiallyAsync[K] = MeteredSequentiallyAsync(provider.apply[K], sequentiallyMetrics(name)) diff --git a/sequentially-metrics/src/main/scala/com/evolutiongaming/concurrent/SequentiallyMetrics.scala b/sequentially-metrics/src/main/scala/com/evolutiongaming/concurrent/SequentiallyMetrics.scala index 171b9eb..39a0abc 100644 --- a/sequentially-metrics/src/main/scala/com/evolutiongaming/concurrent/SequentiallyMetrics.scala +++ b/sequentially-metrics/src/main/scala/com/evolutiongaming/concurrent/SequentiallyMetrics.scala @@ -1,6 +1,6 @@ package com.evolutiongaming.concurrent -import com.evolutiongaming.prometheus.PrometheusHelper._ +import com.evolutiongaming.prometheus.PrometheusHelper.* import io.prometheus.client.{CollectorRegistry, Summary} import scala.concurrent.Future @@ -13,30 +13,27 @@ trait SequentiallyMetrics { object SequentiallyMetrics { def empty: SequentiallyMetrics = new SequentiallyMetrics { - def queue(startNanos: Long): Unit = () + def queue(startNanos: Long): Unit = () def run[T](future: => Future[T]): Future[T] = future } - /** - * name: String => SequentiallyMetrics - */ + /** name: String => SequentiallyMetrics */ type Factory = String => SequentiallyMetrics object Factory { def empty: Factory = _ => SequentiallyMetrics.empty - /** - * @note Must be singleton as metric names must be unique. - * @see CollectorRegistry#register - */ + /** @note Must be singleton as metric names must be unique. + * @see CollectorRegistry#register + */ def apply( - prometheusRegistry: CollectorRegistry, - prefix: String = "sequentially", - ): Factory = { + prometheusRegistry: CollectorRegistry, + prefix: String = "sequentially", + ): Factory = { val time = Summary .build() - .name(s"${prefix}_time") + .name(s"${ prefix }_time") .help("Latency of Sequentially operations (queue, run) (by name)") .labelNames("name", "operation") .defaultQuantiles() diff --git a/sequentially/src/main/scala/com/evolutiongaming/concurrent/sequentially/AsyncHandlerMap.scala b/sequentially/src/main/scala/com/evolutiongaming/concurrent/sequentially/AsyncHandlerMap.scala index 37adf31..73ccc94 100644 --- a/sequentially/src/main/scala/com/evolutiongaming/concurrent/sequentially/AsyncHandlerMap.scala +++ b/sequentially/src/main/scala/com/evolutiongaming/concurrent/sequentially/AsyncHandlerMap.scala @@ -1,14 +1,13 @@ package com.evolutiongaming.concurrent.sequentially -import com.evolutiongaming.concurrent.CurrentThreadExecutionContext -import com.evolutiongaming.concurrent.sequentially.TrieMapHelper._ -import com.evolutiongaming.concurrent.FutureHelper._ +import com.evolutiongaming.concurrent.FutureHelper.* +import com.evolutiongaming.concurrent.sequentially.TrieMapHelper.* import scala.collection.concurrent.TrieMap -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} trait AsyncHandlerMap[K, V] extends AsyncMap[K, V] { - + def updateHandler[T](key: K)(f: Opt => Future[Opt => Future[(Directive, T)]]): Future[T] override def toString: String = s"AsyncHandlerMap${ values mkString "," }" @@ -18,24 +17,24 @@ object AsyncHandlerMap { def apply[K, V]( sequentially: SequentiallyHandler[K], - map: TrieMap[K, V] = TrieMap.empty[K, V]): AsyncHandlerMap[K, V] = { + map: TrieMap[K, V] = TrieMap.empty[K, V], + ): AsyncHandlerMap[K, V] = { - implicit val ec = CurrentThreadExecutionContext + implicit val ec: ExecutionContext = ExecutionContext.parasitic new AsyncHandlerMap[K, V] { - def values = map + def values: TrieMap[K, V] = map - def updateHandler[T](key: K)(f: Opt => Future[Opt => Future[(Directive, T)]]) = { + def updateHandler[T](key: K)(f: Opt => Future[Opt => Future[(Directive, T)]]): Future[T] = { - def value() = map.get(key) + def value(): Option[V] = map.get(key) - def task = f(value()) map { task => - () => - task(value()) map { case (directive, result) => - map.apply(key, directive) - result - } + def task: Future[() => Future[T]] = f(value()) map { task => () => + task(value()) map { case (directive, result) => + map.apply(key, directive) + result + } } sequentially.handler(key)(task) diff --git a/sequentially/src/main/scala/com/evolutiongaming/concurrent/sequentially/AsyncMap.scala b/sequentially/src/main/scala/com/evolutiongaming/concurrent/sequentially/AsyncMap.scala index 938f0ab..0eb42e5 100644 --- a/sequentially/src/main/scala/com/evolutiongaming/concurrent/sequentially/AsyncMap.scala +++ b/sequentially/src/main/scala/com/evolutiongaming/concurrent/sequentially/AsyncMap.scala @@ -1,11 +1,10 @@ package com.evolutiongaming.concurrent.sequentially -import com.evolutiongaming.concurrent.CurrentThreadExecutionContext -import com.evolutiongaming.concurrent.sequentially.TrieMapHelper._ -import com.evolutiongaming.concurrent.FutureHelper._ +import com.evolutiongaming.concurrent.FutureHelper.* +import com.evolutiongaming.concurrent.sequentially.TrieMapHelper.* import scala.collection.concurrent.TrieMap -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} trait AsyncMap[K, V] extends SequentialMap[K, V] { @@ -18,16 +17,17 @@ object AsyncMap { def apply[K, V]( sequentially: SequentiallyAsync[K], - map: TrieMap[K, V] = TrieMap.empty[K, V]): AsyncMap[K, V] = { + map: TrieMap[K, V] = TrieMap.empty[K, V], + ): AsyncMap[K, V] = { - implicit val ec = CurrentThreadExecutionContext + implicit val ec: ExecutionContext = ExecutionContext.parasitic new AsyncMap[K, V] { - def values = map + def values: TrieMap[K, V] = map def updateAsync[T](key: K)(f: Opt => Future[(Directive, T)]): Future[T] = { - def task = f(map.get(key)) map { case (directive, result) => + def task: Future[T] = f(map.get(key)) map { case (directive, result) => map.apply(key, directive) result } diff --git a/sequentially/src/main/scala/com/evolutiongaming/concurrent/sequentially/MapDirective.scala b/sequentially/src/main/scala/com/evolutiongaming/concurrent/sequentially/MapDirective.scala index 61d835e..c4022a2 100644 --- a/sequentially/src/main/scala/com/evolutiongaming/concurrent/sequentially/MapDirective.scala +++ b/sequentially/src/main/scala/com/evolutiongaming/concurrent/sequentially/MapDirective.scala @@ -1,6 +1,5 @@ package com.evolutiongaming.concurrent.sequentially - sealed trait MapDirective[+T] object MapDirective { @@ -11,7 +10,6 @@ object MapDirective { def ignore[T]: MapDirective[T] = Ignore - final case class Update[+T](newValue: T) extends MapDirective[T] case object Remove extends MapDirective[Nothing] diff --git a/sequentially/src/main/scala/com/evolutiongaming/concurrent/sequentially/SequentialMap.scala b/sequentially/src/main/scala/com/evolutiongaming/concurrent/sequentially/SequentialMap.scala index 36b5056..6cfda2a 100644 --- a/sequentially/src/main/scala/com/evolutiongaming/concurrent/sequentially/SequentialMap.scala +++ b/sequentially/src/main/scala/com/evolutiongaming/concurrent/sequentially/SequentialMap.scala @@ -1,6 +1,6 @@ package com.evolutiongaming.concurrent.sequentially -import com.evolutiongaming.concurrent.sequentially.TrieMapHelper._ +import com.evolutiongaming.concurrent.sequentially.TrieMapHelper.* import scala.collection.concurrent.TrieMap import scala.concurrent.Future @@ -41,7 +41,7 @@ trait SequentialMap[K, V] { def getOrUpdate(key: K)(newValue: => V): Future[V] = { update(key) { case Some(value) => (MapDirective.ignore, value) - case None => + case None => val value = newValue (MapDirective.update(value), value) } @@ -61,7 +61,8 @@ object SequentialMap { def apply[K, V]( sequentially: Sequentially[K], - map: TrieMap[K, V] = TrieMap.empty[K, V]): SequentialMap[K, V] = { + map: TrieMap[K, V] = TrieMap.empty[K, V], + ): SequentialMap[K, V] = { new SequentialMap[K, V] { @@ -78,4 +79,4 @@ object SequentialMap { } } } -} \ No newline at end of file +} diff --git a/sequentially/src/main/scala/com/evolutiongaming/concurrent/sequentially/Sequentially.scala b/sequentially/src/main/scala/com/evolutiongaming/concurrent/sequentially/Sequentially.scala index 12f2567..967715a 100644 --- a/sequentially/src/main/scala/com/evolutiongaming/concurrent/sequentially/Sequentially.scala +++ b/sequentially/src/main/scala/com/evolutiongaming/concurrent/sequentially/Sequentially.scala @@ -3,16 +3,14 @@ package com.evolutiongaming.concurrent.sequentially import akka.actor.{Actor, ActorRef, ActorRefFactory, Props} import akka.stream.scaladsl.{Sink, Source} import akka.stream.{Materializer, OverflowStrategy} -import com.evolutiongaming.concurrent.sequentially.SourceQueueHelper._ -import com.evolutiongaming.concurrent.{AvailableProcessors, CurrentThreadExecutionContext} +import com.evolutiongaming.concurrent.AvailableProcessors +import com.evolutiongaming.concurrent.sequentially.SourceQueueHelper.* -import scala.concurrent.duration._ -import scala.concurrent.{Await, Future, Promise} +import scala.concurrent.duration.* +import scala.concurrent.{Await, ExecutionContext, Future, Promise} import scala.util.Try -/** - * Runs tasks sequentially for the same key and in parallel - for different keys - */ +/** Runs tasks sequentially for the same key and in parallel - for different keys */ trait Sequentially[-K] { def apply[KK <: K, T](key: KK)(task: => T): Future[T] @@ -26,7 +24,6 @@ object Sequentially { val BufferSize: Int = Int.MaxValue val Timeout: FiniteDuration = 5.seconds - def apply[K](factory: ActorRefFactory): Sequentially[K] = { apply(factory, None, Substreams) } @@ -35,11 +32,20 @@ object Sequentially { apply(factory, name, Substreams) } - def apply[K](factory: ActorRefFactory, name: Option[String], substreams: Int): Sequentially[K] = { + def apply[K]( + factory: ActorRefFactory, + name: Option[String], + substreams: Int, + ): Sequentially[K] = { apply(factory, name, substreams, Timeout) } - def apply[K](factory: ActorRefFactory, name: Option[String], substreams: Int, timeout: FiniteDuration): Sequentially[K] = { + def apply[K]( + factory: ActorRefFactory, + name: Option[String], + substreams: Int, + timeout: FiniteDuration, + ): Sequentially[K] = { require(substreams > 0, s"substreams is $substreams") @@ -51,9 +57,9 @@ object Sequentially { val promise = Promise[Map[Int, ActorRef]]() - def supervisor() = new Actor { - val props = Props(actor()) - val refs = for { + def supervisor(): Actor = new Actor { + private val props = Props(actor()) + private val refs = for { substream <- 0 until substreams } yield { val ref = context.actorOf(props, name = substream.toString) @@ -61,7 +67,7 @@ object Sequentially { } promise.success(refs.toMap) - def receive = PartialFunction.empty + def receive: Receive = PartialFunction.empty } val props = Props(supervisor()) @@ -81,12 +87,13 @@ object Sequentially { } } - def apply[K]( substreams: Int = Substreams, bufferSize: Int = BufferSize, - overflowStrategy: OverflowStrategy = OverflowStrategy.backpressure) - (implicit materializer: Materializer): Sequentially[K] = { + overflowStrategy: OverflowStrategy = OverflowStrategy.backpressure, + )(implicit + materializer: Materializer + ): Sequentially[K] = { val queue = Source .queue[Elem](bufferSize, overflowStrategy) @@ -96,7 +103,7 @@ object Sequentially { .to(Sink.ignore) .run()(materializer) - implicit val ecNow = CurrentThreadExecutionContext + implicit val ecNow: ExecutionContext = ExecutionContext.parasitic val ec = materializer.executionContext case class Elem(substream: Int, apply: () => Future[Any]) @@ -119,7 +126,6 @@ object Sequentially { } } - def now[K]: Sequentially[K] = Now private object Now extends Sequentially[Any] { @@ -128,8 +134,7 @@ object Sequentially { } } - class Comap[A, B](tmp: A => B, sequentially: Sequentially[B]) extends Sequentially[A] { def apply[AA <: A, T](key: AA)(f: => T): Future[T] = sequentially(tmp(key))(f) } -} \ No newline at end of file +} diff --git a/sequentially/src/main/scala/com/evolutiongaming/concurrent/sequentially/SequentiallyAsync.scala b/sequentially/src/main/scala/com/evolutiongaming/concurrent/sequentially/SequentiallyAsync.scala index 67f0885..0d9e397 100644 --- a/sequentially/src/main/scala/com/evolutiongaming/concurrent/sequentially/SequentiallyAsync.scala +++ b/sequentially/src/main/scala/com/evolutiongaming/concurrent/sequentially/SequentiallyAsync.scala @@ -2,16 +2,14 @@ package com.evolutiongaming.concurrent.sequentially import akka.stream.scaladsl.{Sink, Source, SourceQueue} import akka.stream.{Materializer, OverflowStrategy} -import com.evolutiongaming.concurrent.FutureHelper._ +import com.evolutiongaming.concurrent.FutureHelper.* import com.evolutiongaming.concurrent.sequentially.Sequentially.{BufferSize, Substreams} -import com.evolutiongaming.concurrent.sequentially.SourceQueueHelper._ +import com.evolutiongaming.concurrent.sequentially.SourceQueueHelper.* import scala.concurrent.{ExecutionContext, Future, Promise} import scala.util.control.NonFatal -/** - * Runs tasks sequentially for the same key and in parallel - for different keys - */ +/** Runs tasks sequentially for the same key and in parallel - for different keys */ trait SequentiallyAsync[-K] extends Sequentially[K] { def async[KK <: K, T](key: K)(task: => Future[T]): Future[T] @@ -28,8 +26,10 @@ object SequentiallyAsync { def apply[K]( substreams: Int = Substreams, bufferSize: Int = BufferSize, - overflowStrategy: OverflowStrategy = OverflowStrategy.backpressure) - (implicit materializer: Materializer): SequentiallyAsync[K] = { + overflowStrategy: OverflowStrategy = OverflowStrategy.backpressure, + )(implicit + materializer: Materializer + ): SequentiallyAsync[K] = { require(substreams > 0, s"substreams is $substreams") require(bufferSize > 0, s"bufferSize is $bufferSize") @@ -42,13 +42,17 @@ object SequentiallyAsync { .to(Sink.ignore) .run() - implicit val ec = materializer.executionContext + implicit val ec: ExecutionContext = materializer.executionContext apply(substreams, queue) } - def apply[K](substreams: Int, queue: SourceQueue[Elem]) - (implicit ec: ExecutionContext): SequentiallyAsync[K] = { + def apply[K]( + substreams: Int, + queue: SourceQueue[Elem], + )(implicit + ec: ExecutionContext + ): SequentiallyAsync[K] = { val pf: PartialFunction[Throwable, Unit] = { case _ => () } @@ -71,15 +75,14 @@ object SequentiallyAsync { } } - def now[K]: SequentiallyAsync[K] = Now - private object Now extends SequentiallyAsync[Any] { - def async[KK <: Any, T](key: Any)(task: => Future[T]) = { - try task catch { case NonFatal(failure) => Future.failed(failure) } + def async[KK <: Any, T](key: Any)(task: => Future[T]): Future[T] = { + try task + catch { case NonFatal(failure) => Future.failed(failure) } } } final case class Elem(substream: Int, apply: () => Future[Any]) -} \ No newline at end of file +} diff --git a/sequentially/src/main/scala/com/evolutiongaming/concurrent/sequentially/SequentiallyHandler.scala b/sequentially/src/main/scala/com/evolutiongaming/concurrent/sequentially/SequentiallyHandler.scala index 6daf6d5..32be6c4 100644 --- a/sequentially/src/main/scala/com/evolutiongaming/concurrent/sequentially/SequentiallyHandler.scala +++ b/sequentially/src/main/scala/com/evolutiongaming/concurrent/sequentially/SequentiallyHandler.scala @@ -2,18 +2,16 @@ package com.evolutiongaming.concurrent.sequentially import akka.stream.scaladsl.{Sink, Source} import akka.stream.{Materializer, OverflowStrategy} -import com.evolutiongaming.concurrent.FutureHelper._ +import com.evolutiongaming.concurrent.AvailableProcessors +import com.evolutiongaming.concurrent.FutureHelper.* import com.evolutiongaming.concurrent.sequentially.Sequentially.{BufferSize, Substreams} -import com.evolutiongaming.concurrent.sequentially.SourceQueueHelper._ -import com.evolutiongaming.concurrent.{AvailableProcessors, CurrentThreadExecutionContext} +import com.evolutiongaming.concurrent.sequentially.SourceQueueHelper.* import scala.concurrent.{ExecutionContext, Future, Promise} import scala.util.control.NonFatal import scala.util.{Failure, Success} -/** - * Runs tasks sequentially for the same key and in parallel - for different keys - */ +/** Runs tasks sequentially for the same key and in parallel - for different keys */ trait SequentiallyHandler[-K] extends SequentiallyAsync[K] { def handler[KK <: K, T](key: KK)(task: => Future[() => Future[T]]): Future[T] @@ -33,8 +31,10 @@ object SequentiallyHandler { substreams: Int = Substreams, parallelism: Int = Parallelism, bufferSize: Int = BufferSize, - overflowStrategy: OverflowStrategy = OverflowStrategy.backpressure) - (implicit materializer: Materializer): SequentiallyHandler[K] = { + overflowStrategy: OverflowStrategy = OverflowStrategy.backpressure, + )(implicit + materializer: Materializer + ): SequentiallyHandler[K] = { require(substreams > 0, s"substreams is $substreams") require(bufferSize > 0, s"bufferSize is $bufferSize") @@ -51,7 +51,7 @@ object SequentiallyHandler { .to(Sink.ignore) .run() - implicit val ec = materializer.executionContext + implicit val ec: ExecutionContext = materializer.executionContext case class Elem(substream: Int, apply: () => Future[() => Future[Any]]) @@ -62,13 +62,14 @@ object SequentiallyHandler { val promise = Promise[T]() val safeTask = () => { - val safeTask = () => task.map { task => - () => { - val future = Future(task()).flatten - promise.completeWith(future) - future.recover[Any](pf) + val safeTask = () => + task.map { task => () => + { + val future = Future(task()).flatten + promise.completeWith(future) + future.recover[Any](pf) + } } - } Future .apply { safeTask() } @@ -77,7 +78,7 @@ object SequentiallyHandler { case Failure(e) => promise.failure(e) Success(() => Future.unit) - case a => a + case a => a } } @@ -91,13 +92,11 @@ object SequentiallyHandler { } } - def now[T]: SequentiallyHandler[T] = Now - private object Now extends SequentiallyHandler[Any] { - private implicit val ec: ExecutionContext = CurrentThreadExecutionContext + private implicit val ec: ExecutionContext = ExecutionContext.parasitic def handler[KK <: Any, T](key: KK)(task: => Future[() => Future[T]]): Future[T] = { try { diff --git a/sequentially/src/main/scala/com/evolutiongaming/concurrent/sequentially/SourceQueueHelper.scala b/sequentially/src/main/scala/com/evolutiongaming/concurrent/sequentially/SourceQueueHelper.scala index 680853a..efdd3be 100644 --- a/sequentially/src/main/scala/com/evolutiongaming/concurrent/sequentially/SourceQueueHelper.scala +++ b/sequentially/src/main/scala/com/evolutiongaming/concurrent/sequentially/SourceQueueHelper.scala @@ -1,7 +1,7 @@ package com.evolutiongaming.concurrent.sequentially +import akka.stream.QueueOfferResult as Result import akka.stream.scaladsl.SourceQueue -import akka.stream.{QueueOfferResult => Result} import scala.concurrent.{ExecutionContext, Future} @@ -9,7 +9,12 @@ object SourceQueueHelper { implicit class SourceQueueOps[T](val self: SourceQueue[T]) extends AnyVal { - def offerOrError(elem: T, errorMsg: => String)(implicit ec: ExecutionContext): Future[Unit] = { + def offerOrError( + elem: T, + errorMsg: => String, + )(implicit + ec: ExecutionContext + ): Future[Unit] = { for { result <- self.offer(elem) result <- result match { @@ -22,4 +27,5 @@ object SourceQueueHelper { } } -class QueueException(message: String, cause: Option[Throwable] = None) extends RuntimeException(message, cause.orNull) \ No newline at end of file +class QueueException(message: String, cause: Option[Throwable] = None) + extends RuntimeException(message, cause.orNull) diff --git a/sequentially/src/main/scala/com/evolutiongaming/concurrent/sequentially/Substream.scala b/sequentially/src/main/scala/com/evolutiongaming/concurrent/sequentially/Substream.scala index 8094290..a35ffd2 100644 --- a/sequentially/src/main/scala/com/evolutiongaming/concurrent/sequentially/Substream.scala +++ b/sequentially/src/main/scala/com/evolutiongaming/concurrent/sequentially/Substream.scala @@ -4,4 +4,4 @@ object Substream { def apply[T](key: T, substreams: Int): Int = { math.abs(key.hashCode() % substreams) } -} \ No newline at end of file +} diff --git a/sequentially/src/test/scala/com/evolutiongaming/concurrent/sequentially/ActorSpec.scala b/sequentially/src/test/scala/com/evolutiongaming/concurrent/sequentially/ActorSpec.scala index 710aa85..01d4421 100644 --- a/sequentially/src/test/scala/com/evolutiongaming/concurrent/sequentially/ActorSpec.scala +++ b/sequentially/src/test/scala/com/evolutiongaming/concurrent/sequentially/ActorSpec.scala @@ -9,10 +9,9 @@ trait ActorSpec extends BeforeAndAfterAll { implicit lazy val system: ActorSystem = ActorSystem(getClass.getSimpleName) - override protected def afterAll() = { + override protected def afterAll(): Unit = { TestKit.shutdownActorSystem(system) } abstract class ActorScope extends TestKit(system) with ImplicitSender with DefaultTimeout } - diff --git a/sequentially/src/test/scala/com/evolutiongaming/concurrent/sequentially/AsyncHandlerMapSpec.scala b/sequentially/src/test/scala/com/evolutiongaming/concurrent/sequentially/AsyncHandlerMapSpec.scala index 95a9044..12c9480 100644 --- a/sequentially/src/test/scala/com/evolutiongaming/concurrent/sequentially/AsyncHandlerMapSpec.scala +++ b/sequentially/src/test/scala/com/evolutiongaming/concurrent/sequentially/AsyncHandlerMapSpec.scala @@ -1,19 +1,18 @@ package com.evolutiongaming.concurrent.sequentially import akka.stream.Materializer -import com.evolutiongaming.concurrent.CurrentThreadExecutionContext -import com.evolutiongaming.concurrent.FutureHelper._ +import com.evolutiongaming.concurrent.FutureHelper.* +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec -import scala.concurrent.duration._ +import scala.concurrent.duration.* import scala.concurrent.{Await, ExecutionContext, Future, Promise} import scala.util.control.NoStackTrace import scala.util.{Failure, Success} -import org.scalatest.matchers.should.Matchers -import org.scalatest.wordspec.AnyWordSpec class AsyncHandlerMapSpec extends AnyWordSpec with Matchers with ActorSpec { - implicit val ec: ExecutionContext = CurrentThreadExecutionContext + implicit val ec: ExecutionContext = ExecutionContext.parasitic type K = Int type V = Int @@ -154,7 +153,9 @@ class AsyncHandlerMapSpec extends AnyWordSpec with Matchers with ActorSpec { map.update(key) { _ => throw TestException }.value shouldEqual Some(Failure(TestException)) map.updateAsync(key) { _ => Future.failed(TestException) }.value shouldEqual Some(Failure(TestException)) map.updateHandler(key) { _ => throw TestException }.value shouldEqual Some(Failure(TestException)) - map.updateHandler(key) { _ => Future.failed(TestException) }.value shouldEqual Some(Failure(TestException)) + map.updateHandler(key) { _ => Future.failed(TestException) }.value shouldEqual Some( + Failure(TestException) + ) } } @@ -164,14 +165,17 @@ class AsyncHandlerMapSpec extends AnyWordSpec with Matchers with ActorSpec { val key: K = 0 val map = AsyncHandlerMap[K, V](sequentially) - def update(idx: Int, p0: Promise[Unit], p1: Promise[Unit]): Future[Int] = { + def update( + idx: Int, + p0: Promise[Unit], + p1: Promise[Unit], + ): Future[Int] = { map.updateHandler[Int](key) { _ => - p0.future map { _ => - (_: Option[V]) => - p1.future map { _ => - val directive = MapDirective.update(idx) - (directive, idx) - } + p0.future map { _ => (_: Option[V]) => + p1.future map { _ => + val directive = MapDirective.update(idx) + (directive, idx) + } } } } diff --git a/sequentially/src/test/scala/com/evolutiongaming/concurrent/sequentially/AsyncMapSpec.scala b/sequentially/src/test/scala/com/evolutiongaming/concurrent/sequentially/AsyncMapSpec.scala index 9aac9e4..980dbbf 100644 --- a/sequentially/src/test/scala/com/evolutiongaming/concurrent/sequentially/AsyncMapSpec.scala +++ b/sequentially/src/test/scala/com/evolutiongaming/concurrent/sequentially/AsyncMapSpec.scala @@ -1,16 +1,15 @@ package com.evolutiongaming.concurrent.sequentially -import com.evolutiongaming.concurrent.CurrentThreadExecutionContext -import com.evolutiongaming.concurrent.FutureHelper._ +import com.evolutiongaming.concurrent.FutureHelper.* +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec import scala.concurrent.{ExecutionContext, Promise} import scala.util.Success -import org.scalatest.matchers.should.Matchers -import org.scalatest.wordspec.AnyWordSpec class AsyncMapSpec extends AnyWordSpec with Matchers { - implicit val ec: ExecutionContext = CurrentThreadExecutionContext + implicit val ec: ExecutionContext = ExecutionContext.parasitic "AsyncMap" should { @@ -96,4 +95,3 @@ class AsyncMapSpec extends AnyWordSpec with Matchers { val map = AsyncMap[Int, String](SequentiallyAsync.now) } } - diff --git a/sequentially/src/test/scala/com/evolutiongaming/concurrent/sequentially/SequentialMapSpec.scala b/sequentially/src/test/scala/com/evolutiongaming/concurrent/sequentially/SequentialMapSpec.scala index 6e0a1fc..3a00694 100644 --- a/sequentially/src/test/scala/com/evolutiongaming/concurrent/sequentially/SequentialMapSpec.scala +++ b/sequentially/src/test/scala/com/evolutiongaming/concurrent/sequentially/SequentialMapSpec.scala @@ -1,10 +1,10 @@ package com.evolutiongaming.concurrent.sequentially - -import scala.util.Success import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec +import scala.util.Success + class SequentialMapSpec extends AnyWordSpec with Matchers { "SequentialMap" should { @@ -65,6 +65,6 @@ class SequentialMapSpec extends AnyWordSpec with Matchers { } private trait Scope { - val map = SequentialMap[Int, String](Sequentially.now) + val map: SequentialMap[Int, String] = SequentialMap[Int, String](Sequentially.now) } } diff --git a/sequentially/src/test/scala/com/evolutiongaming/concurrent/sequentially/SequentiallyAsyncSpec.scala b/sequentially/src/test/scala/com/evolutiongaming/concurrent/sequentially/SequentiallyAsyncSpec.scala index 5eb9a85..ee3161b 100644 --- a/sequentially/src/test/scala/com/evolutiongaming/concurrent/sequentially/SequentiallyAsyncSpec.scala +++ b/sequentially/src/test/scala/com/evolutiongaming/concurrent/sequentially/SequentiallyAsyncSpec.scala @@ -1,14 +1,13 @@ package com.evolutiongaming.concurrent.sequentially import akka.stream.{Materializer, OverflowStrategy} - -import scala.concurrent.duration._ -import scala.concurrent.{Await, Future, Promise, TimeoutException} -import scala.util.control.NoStackTrace import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec import scala.annotation.nowarn +import scala.concurrent.duration.* +import scala.concurrent.{Await, Future, Promise, TimeoutException} +import scala.util.control.NoStackTrace @nowarn class SequentiallyAsyncSpec extends AnyWordSpec with ActorSpec with Matchers { @@ -89,7 +88,8 @@ class SequentiallyAsyncSpec extends AnyWordSpec with ActorSpec with Matchers { private class Scope( bufferSize: Int = Int.MaxValue, - overflowStrategy: OverflowStrategy = OverflowStrategy.backpressure) { + overflowStrategy: OverflowStrategy = OverflowStrategy.backpressure, + ) { implicit val materializer: Materializer = Materializer(system) @@ -98,18 +98,16 @@ class SequentiallyAsyncSpec extends AnyWordSpec with ActorSpec with Matchers { val promise1 = Promise[Unit]() val promise2 = Promise[Unit]() - def expectTimeout[T](future: Future[T]) = { + def expectTimeout[T](future: Future[T]): TimeoutException = { the[TimeoutException] thrownBy { Await.result(future, 100.millis) } } - def await[T](future: Future[T]) = { + def await[T](future: Future[T]): T = { Await.result(future, 300.millis) } case object TestException extends RuntimeException with NoStackTrace } } - - diff --git a/sequentially/src/test/scala/com/evolutiongaming/concurrent/sequentially/SequentiallyHandlerSpec.scala b/sequentially/src/test/scala/com/evolutiongaming/concurrent/sequentially/SequentiallyHandlerSpec.scala index 813ad99..b4899ca 100644 --- a/sequentially/src/test/scala/com/evolutiongaming/concurrent/sequentially/SequentiallyHandlerSpec.scala +++ b/sequentially/src/test/scala/com/evolutiongaming/concurrent/sequentially/SequentiallyHandlerSpec.scala @@ -1,15 +1,14 @@ package com.evolutiongaming.concurrent.sequentially import akka.stream.Materializer -import com.evolutiongaming.concurrent.CurrentThreadExecutionContext -import com.evolutiongaming.concurrent.FutureHelper._ - -import scala.concurrent.duration._ -import scala.concurrent.{Await, ExecutionContext, Future, Promise, TimeoutException} -import scala.util.control.NoStackTrace +import com.evolutiongaming.concurrent.FutureHelper.* import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec +import scala.concurrent.* +import scala.concurrent.duration.* +import scala.util.control.NoStackTrace + class SequentiallyHandlerSpec extends AnyWordSpec with ActorSpec with Matchers { "SequentiallyHandler" should { @@ -107,19 +106,19 @@ class SequentiallyHandlerSpec extends AnyWordSpec with ActorSpec with Matchers { } implicit val materializer: Materializer = Materializer(system) - implicit val ec: ExecutionContext = CurrentThreadExecutionContext + implicit val ec: ExecutionContext = ExecutionContext.parasitic private trait Scope { val sequentially = SequentiallyHandler[Int]() - def expectTimeout[T](future: Future[T]) = { + def expectTimeout[T](future: Future[T]): TimeoutException = { the[TimeoutException] thrownBy { Await.result(future, 100.millis) } } - def await[T](future: Future[T]) = { + def await[T](future: Future[T]): T = { Await.result(future, 300.millis) } } diff --git a/sequentially/src/test/scala/com/evolutiongaming/concurrent/sequentially/SequentiallySpec.scala b/sequentially/src/test/scala/com/evolutiongaming/concurrent/sequentially/SequentiallySpec.scala index 8cde5ba..a5d07d8 100644 --- a/sequentially/src/test/scala/com/evolutiongaming/concurrent/sequentially/SequentiallySpec.scala +++ b/sequentially/src/test/scala/com/evolutiongaming/concurrent/sequentially/SequentiallySpec.scala @@ -2,12 +2,12 @@ package com.evolutiongaming.concurrent.sequentially import akka.stream.Materializer import org.scalatest.concurrent.ScalaFutures - -import scala.concurrent.duration._ -import scala.concurrent.{Await, ExecutionContext, Future, Promise} import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec +import scala.concurrent.duration.* +import scala.concurrent.{Await, ExecutionContext, Future, Promise} + class SequentiallySpec extends AnyWordSpec with ActorSpec with Matchers with ScalaFutures { implicit val ec: ExecutionContext = system.dispatcher @@ -19,10 +19,11 @@ class SequentiallySpec extends AnyWordSpec with ActorSpec with Matchers with Sca "run sequentially for key" in new Scope { - val futures = for {_ <- 0 to n} yield Future { - val futures = for {key <- 0 to n} yield sequentially(key) { Thread.sleep(1); key } - Future sequence futures - } flatMap identity + val futures = + for { _ <- 0 to n } yield Future { + val futures = for { key <- 0 to n } yield sequentially(key) { Thread.sleep(1); key } + Future sequence futures + } flatMap identity for { xs <- (Future sequence futures).futureValue @@ -42,7 +43,7 @@ class SequentiallySpec extends AnyWordSpec with ActorSpec with Matchers with Sca future2.futureValue shouldEqual 2 } - "support case class as key" in new ActorScope { + "support case class as key" in new ActorScope { case class Key(value: String) val sequentially: Sequentially[Key] = Sequentially[Key](system) @@ -71,7 +72,7 @@ class SequentiallySpec extends AnyWordSpec with ActorSpec with Matchers with Sca "handle any key" in new Scope { val expected = (0 to 100).toSet - val futures = for { key <- expected} yield sequentially(key) { key } + val futures = for { key <- expected } yield sequentially(key) { key } val actual = Future.sequence(futures).futureValue actual shouldEqual expected } @@ -81,10 +82,11 @@ class SequentiallySpec extends AnyWordSpec with ActorSpec with Matchers with Sca "run sequentially for key" in new StreamScope { - val futures = for {_ <- 0 to n} yield Future { - val futures = for {key <- 0 to n} yield sequentially(key) { Thread.sleep(1); key } - Future sequence futures - } flatMap identity + val futures = + for { _ <- 0 to n } yield Future { + val futures = for { key <- 0 to n } yield sequentially(key) { Thread.sleep(1); key } + Future sequence futures + } flatMap identity for { xs <- (Future sequence futures).futureValue @@ -121,7 +123,7 @@ class SequentiallySpec extends AnyWordSpec with ActorSpec with Matchers with Sca val sequentially: Sequentially[Int] = Sequentially[Int]() } - def await[T](future: Future[T]) = { + def await[T](future: Future[T]): T = { Await.result(future, 5.seconds) } } diff --git a/sequentially/src/test/scala/com/evolutiongaming/concurrent/sequentially/SubstreamSpec.scala b/sequentially/src/test/scala/com/evolutiongaming/concurrent/sequentially/SubstreamSpec.scala index 92fdbc0..731c0b9 100644 --- a/sequentially/src/test/scala/com/evolutiongaming/concurrent/sequentially/SubstreamSpec.scala +++ b/sequentially/src/test/scala/com/evolutiongaming/concurrent/sequentially/SubstreamSpec.scala @@ -10,7 +10,8 @@ class SubstreamSpec extends AnyFunSuite with Matchers { (1, 1), (10, 0), (Int.MaxValue, 7), - (Int.MinValue, 8)) + (Int.MinValue, 8), + ) } { test(s"return $substream for $key") { Substream(key, 10) shouldEqual substream