Skip to content

Commit

Permalink
Optimize execution of ZPure (#1306)
Browse files Browse the repository at this point in the history
* Optimizations for ZPure runloop

* Improve Fold

* Use `packed0 != 1`

* One more test for sanity's safe

* Final cleanups

* Reimplement runloop via a Runner class and use thread locals to cache runners

* Cleanup Fail loop
  • Loading branch information
kyri-petrou committed May 7, 2024
1 parent 7da87cb commit 4e52744
Show file tree
Hide file tree
Showing 4 changed files with 377 additions and 134 deletions.
64 changes: 64 additions & 0 deletions benchmarks/src/main/scala/zio/prelude/ZPureFullBenchmark.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package zio.prelude

import org.openjdk.jmh.annotations.{State => BenchmarkState, _}
import org.openjdk.jmh.infra.Blackhole
import zio.prelude.fx.ZPure
import zio.{Scope => _}

import java.util.concurrent.TimeUnit

@BenchmarkState(Scope.Thread)
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.SECONDS)
@Measurement(iterations = 3, timeUnit = TimeUnit.SECONDS, time = 1)
@Warmup(iterations = 3, timeUnit = TimeUnit.SECONDS, time = 1)
@Fork(value = 2)
class ZPureFullBenchmark {

var list: List[Int] = _

@Param(Array("1", "1000"))
var size: Int = _

@Setup(Level.Trial)
def setup(): Unit =
list = (1 to size).toList

@Benchmark
def fallibleBenchmark(bh: Blackhole): Unit = bh.consume(runFallible)

@Benchmark
def infallibleBenchmark(bh: Blackhole): Unit = bh.consume(runInfallible)

private def runFallible =
ZPure
.foreachDiscard(list)(_ =>
(for {
conf <- ZPure.environmentWith[Env](_.get.config)
_ <- ZPure.log(Event(s"Env = $conf"))
add = 1
_ <- if (true) ZPure.unit[State] else ZPure.fail(new Throwable("boom"))
_ <- ZPure.update[State, State](state => state.copy(value = state.value + add))
} yield ()).catchAll(e => ZPure.log[State, Event](Event(e.toString)))
)
.provideService(Env())
.run(State())

private def runInfallible =
ZPure
.foreachDiscard(list)(_ =>
(for {
conf <- ZPure.environmentWith[Env](_.get.config)
_ <- ZPure.log(Event(s"Env = $conf"))
add = 1
_ <- ZPure.unit[State]
_ <- ZPure.update[State, State](state => state.copy(value = state.value + add))
} yield ()) *> ZPure.unit[State]
)
.provideService(Env())
.run(State())

private case class Env(config: String = "foo")
private case class Event(value: String)
private case class State(value: Int = 0)
}
48 changes: 48 additions & 0 deletions core-tests/shared/src/test/scala/zio/prelude/fx/ZPureSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,26 @@ object ZPureSpec extends ZIOBaseSpec {
} yield ()
assert(zPure.provideState("").runLog)(equalTo((Chunk(1, 2), ())))
},
test("using clearLogOnError and keepLogOnError before the error is handled") {
def log(i: Int): ZPure[Int, String, String, Any, Nothing, Unit] = ZPure.log(i)
val zPure =
for {
_ <- (log(1) *> ZPure.fail("baz")).keepLogOnError.either
_ <- log(2)
_ <- (log(3) *> ZPure.fail("baz")).clearLogOnError.either
} yield ()
assert(zPure.provideState("").runLog)(equalTo((Chunk(1, 2), ())))
},
test("nested error handling with keepLogOnError / clearLogOnError") {
def log(i: Int): ZPure[Int, String, String, Any, Nothing, Unit] = ZPure.log(i)
val zPure =
for {
_ <- ((log(1) *> ZPure.fail("baz")).either *> log(11)).keepLogOnError
_ <- log(2)
_ <- ((log(3) *> ZPure.fail("baz")).either *> log(33)).clearLogOnError
} yield ()
assert(zPure.provideState("").runLog)(equalTo((Chunk(1, 11, 2, 33), ())))
},
test("log is not cleared after failure with keepLogOnError when the whole computation fails") {
def log(i: Int): ZPure[Int, String, String, Any, Nothing, Unit] = ZPure.log(i)
val zPure = log(1) *> ZPure.fail("baz")
Expand Down Expand Up @@ -952,6 +972,34 @@ object ZPureSpec extends ZIOBaseSpec {
res2 <- ZPure.get
} yield assert(res1)(isFalse) && assert(res2)(isTrue)
zPure.runResult(false)
},
suite("thread local caching") {
val computation: ZPure[String, Unit, Unit, Any, Nothing, Int] =
for {
a <- ZPure.succeed(1 + 1)
_ <- ZPure.log("plus")
b <- ZPure.succeed(a * 3)
_ <- ZPure.log("times")
} yield b

test("reentrant safe") {
val outer: ZPure[String, Unit, Unit, Any, Nothing, (Int, (Chunk[String], Int))] = for {
a <- ZPure.succeed(1 + 1)
_ <- ZPure.log("outerPlus")
innerResult = computation.runLog
b <- ZPure.succeed(a * 3)
_ <- ZPure.log("outerTimes")
} yield (b, innerResult)

val expected = (Chunk("outerPlus", "outerTimes"), (6, (Chunk("plus", "times"), 6)))
assert(outer.runLog)(equalTo(expected))
} +
test("runners are cleared after completion") {
val first = computation.runLog
val second = computation.runLog
val assertion = equalTo((Chunk("plus", "times"), 6))
assert(first)(assertion) && assert(second)(assertion)
}
}
)

Expand Down
65 changes: 65 additions & 0 deletions core/shared/src/main/scala/zio/prelude/fx/Stack.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package zio.prelude.fx

/**
* Lightweight port of zio.internal.Stack, optimized for usage with ZPure
*/
private final class Stack[A <: AnyRef] { self =>
import Stack._

private[this] var array = new Array[AnyRef](ArrSize)
private[this] var packed = 0

def clear(): Unit = {
var i = 0
while (i < ArrSize && (array(i) ne null)) {
array(i) = null
i += 1
}
packed = 0
}

/**
* Pushes an item onto the stack.
*/
def push(a: A): Unit = {
val packed0 = packed
val used = packed0 & 0xf
if (used == ArrSize) {
val newArr = new Array[AnyRef](ArrSize)
newArr(0) = array
newArr(1) = a
array = newArr
packed += 3
} else {
array(used) = a
packed += 1
}
}

/**
* Pops an item off the stack, or returns `null` if the stack is empty.
*/
def pop(): A = {
val packed0 = packed
if (packed0 == 0) {
null.asInstanceOf[A]
} else {
val used = packed0 & 0xf
val idx = used - 1
var a = array(idx)
if (idx == 0 && packed0 != 1) {
val arr0 = a.asInstanceOf[Array[AnyRef]]
a = arr0(ArrSize - 1)
array = arr0
packed -= 3
} else {
packed -= 1
}
a.asInstanceOf[A]
}
}
}

private object Stack {
private final val ArrSize = 15 // Can be made smaller, but not larger
}
Loading

0 comments on commit 4e52744

Please sign in to comment.