Skip to content

Commit

Permalink
Minor cleanups for Evaluator logic (#4144)
Browse files Browse the repository at this point in the history
  • Loading branch information
lihaoyi authored Dec 17, 2024
1 parent 61c5f8b commit 2c6157d
Show file tree
Hide file tree
Showing 9 changed files with 183 additions and 172 deletions.
1 change: 1 addition & 0 deletions build.mill
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ trait MillStableScalaModule extends MillPublishScalaModule with Mima {
// (5x) MIMA doesn't properly ignore things which are nested inside other private things
// so we have to put explicit ignores here (https://github.com/lightbend/mima/issues/771)
ProblemFilter.exclude[Problem]("mill.eval.ProfileLogger*"),
ProblemFilter.exclude[Problem]("mill.eval.ChromeProfileLogger*"),
ProblemFilter.exclude[Problem]("mill.eval.GroupEvaluator*"),
ProblemFilter.exclude[Problem]("mill.eval.EvaluatorCore*"),
ProblemFilter.exclude[Problem]("mill.eval.Tarjans*"),
Expand Down
4 changes: 4 additions & 0 deletions main/define/src/mill/define/Task.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ abstract class Task[+T] extends Task.Ops[T] with Applyable[Task, T] {
def asCommand: Option[Command[T]] = None
def asWorker: Option[Worker[T]] = None
def self: Task[T] = this
def isExclusiveCommand: Boolean = this match {
case c: Command[_] if c.exclusive => true
case _ => false
}
}

object Task extends TaskBase {
Expand Down
134 changes: 31 additions & 103 deletions main/eval/src/mill/eval/EvaluatorCore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ import mill.api.Strict.Agg
import mill.api._
import mill.define._
import mill.eval.Evaluator.TaskResult
import mill.main.client.OutFiles
import mill.util._

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import scala.collection.mutable
import scala.concurrent._
import scala.jdk.CollectionConverters.EnumerationHasAsScala

/**
* Core logic of evaluating tasks, without any user-facing helper methods
Expand Down Expand Up @@ -79,26 +78,19 @@ private[mill] trait EvaluatorCore extends GroupEvaluator {
val count = new AtomicInteger(1)
val indexToTerminal = sortedGroups.keys().toArray
val terminalToIndex = indexToTerminal.zipWithIndex.toMap
val upstreamIndexEdges =
indexToTerminal.map(t => interGroupDeps.getOrElse(t, Nil).map(terminalToIndex).toArray)
os.write.over(
outPath / OutFiles.millDependencyTree,
SpanningForest.spanningTreeToJsonTree(
SpanningForest(upstreamIndexEdges, indexToTerminal.indices.toSet, true),
i => indexToTerminal(i).render
).render(indent = 2)
)

val futures = mutable.Map.empty[Terminal, Future[Option[GroupEvaluator.Results]]]
EvaluatorLogs.logDependencyTree(interGroupDeps, indexToTerminal, terminalToIndex, outPath)

// Prepare a lookup tables up front of all the method names that each class owns,
// and the class hierarchy, so during evaluation it is cheap to look up what class
// each target belongs to determine of the enclosing class code signature changed.
val (classToTransitiveClasses, allTransitiveClassMethods) =
CodeSigUtils.precomputeMethodNamesPerClass(sortedGroups)

val uncached = new java.util.concurrent.ConcurrentHashMap[Terminal, Unit]()
val changedValueHash = new java.util.concurrent.ConcurrentHashMap[Terminal, Unit]()
val uncached = new ConcurrentHashMap[Terminal, Unit]()
val changedValueHash = new ConcurrentHashMap[Terminal, Unit]()

val futures = mutable.Map.empty[Terminal, Future[Option[GroupEvaluator.Results]]]

def evaluateTerminals(
terminals: Seq[Terminal],
Expand All @@ -114,21 +106,19 @@ private[mill] trait EvaluatorCore extends GroupEvaluator {
// due to the topological order of traversal.
for (terminal <- terminals) {
val deps = interGroupDeps(terminal)
def isExclusiveCommand(t: Task[_]) = t match {
case c: Command[_] if c.exclusive => true
case _ => false
}

val group = sortedGroups.lookupKey(terminal)
val exclusiveDeps = deps.filter(d => isExclusiveCommand(d.task))
val exclusiveDeps = deps.filter(d => d.task.isExclusiveCommand)

if (!isExclusiveCommand(terminal.task) && exclusiveDeps.nonEmpty) {
if (!terminal.task.isExclusiveCommand && exclusiveDeps.nonEmpty) {
val failure = Result.Failure(
s"Non-exclusive task ${terminal.render} cannot depend on exclusive task " +
exclusiveDeps.map(_.render).mkString(", ")
)
val taskResults =
group.map(t => (t, TaskResult[(Val, Int)](failure, () => failure))).toMap
val taskResults = group
.map(t => (t, TaskResult[(Val, Int)](failure, () => failure)))
.toMap

futures(terminal) = Future.successful(
Some(GroupEvaluator.Results(taskResults, group.toSeq, false, -1, -1, false))
)
Expand All @@ -150,19 +140,13 @@ private[mill] trait EvaluatorCore extends GroupEvaluator {
.toMap

val startTime = System.nanoTime() / 1000
val targetLabel = terminal match {
case Terminal.Task(task) => None
case t: Terminal.Labelled[_] => Some(Terminal.printTerm(t))
}

// should we log progress?
val logRun = targetLabel.isDefined && {
val inputResults = for {
target <- group.indexed.filterNot(upstreamResults.contains)
item <- target.inputs.filterNot(group.contains)
} yield upstreamResults(item).map(_._1)
inputResults.forall(_.result.isInstanceOf[Result.Success[_]])
}
val inputResults = for {
target <- group.indexed.filterNot(upstreamResults.contains)
item <- target.inputs.filterNot(group.contains)
} yield upstreamResults(item).map(_._1)
val logRun = inputResults.forall(_.result.isInstanceOf[Result.Success[_]])

val tickerPrefix = terminal.render.collect {
case targetLabel if logRun && logger.enableTicker => targetLabel
Expand Down Expand Up @@ -195,31 +179,15 @@ private[mill] trait EvaluatorCore extends GroupEvaluator {
failed.set(true)

val endTime = System.nanoTime() / 1000

val duration = endTime - startTime

chromeProfileLogger.log(
task = Terminal.printTerm(terminal),
cat = "job",
startTime = startTime,
duration = duration,
threadId = threadNumberer.getThreadId(Thread.currentThread()),
cached = res.cached
)
val threadId = threadNumberer.getThreadId(Thread.currentThread())
chromeProfileLogger.log(terminal, "job", startTime, duration, threadId, res.cached)

if (!res.cached) uncached.put(terminal, ())
if (res.valueHashChanged) changedValueHash.put(terminal, ())

profileLogger.log(
ProfileLogger.Timing(
terminal.render,
(duration / 1000).toInt,
res.cached,
res.valueHashChanged,
deps.map(_.render),
res.inputsHash,
res.previousInputsHash
)
)
profileLogger.log(terminal, duration, res, deps)

Some(res)
}
Expand All @@ -241,12 +209,7 @@ private[mill] trait EvaluatorCore extends GroupEvaluator {

val tasksTransitive = tasksTransitive0.toSet
val (tasks, leafExclusiveCommands) = terminals0.partition {
case Terminal.Labelled(t, _) =>
if (tasksTransitive.contains(t)) true
else t match {
case t: Command[_] => !t.exclusive
case _ => false
}
case Terminal.Labelled(t, _) => tasksTransitive.contains(t) || !t.isExclusiveCommand
case _ => !serialCommandExec
}

Expand All @@ -260,6 +223,15 @@ private[mill] trait EvaluatorCore extends GroupEvaluator {

val finishedOptsMap = (nonExclusiveResults ++ exclusiveResults).toMap

EvaluatorLogs.logInvalidationTree(
interGroupDeps,
indexToTerminal,
terminalToIndex,
outPath,
uncached,
changedValueHash
)

val results0: Vector[(Task[_], TaskResult[(Val, Int)])] = terminals0
.flatMap { t =>
sortedGroups.lookupKey(t).flatMap { t0 =>
Expand All @@ -272,50 +244,6 @@ private[mill] trait EvaluatorCore extends GroupEvaluator {

val results: Map[Task[_], TaskResult[(Val, Int)]] = results0.toMap

val reverseInterGroupDeps = interGroupDeps
.iterator
.flatMap { case (k, vs) => vs.map(_ -> k) }
.toSeq
.groupMap(_._1)(_._2)

val changedTerminalIndices = changedValueHash.keys().asScala.toSet
val downstreamIndexEdges = indexToTerminal
.map(t =>
if (changedTerminalIndices(t))
reverseInterGroupDeps.getOrElse(t, Nil).map(terminalToIndex).toArray
else Array.empty[Int]
)

val edgeSourceIndices = downstreamIndexEdges
.zipWithIndex
.collect { case (es, i) if es.nonEmpty => i }
.toSet

os.write.over(
outPath / OutFiles.millInvalidationTree,
SpanningForest.spanningTreeToJsonTree(
SpanningForest(
downstreamIndexEdges,
uncached.keys().asScala
.flatMap { uncachedTask =>
val uncachedIndex = terminalToIndex(uncachedTask)
Option.when(
// Filter out input and source tasks which do not cause downstream invalidations
// from the invalidation tree, because most of them are un-interesting and the
// user really only cares about (a) inputs that cause downstream tasks to invalidate
// or (b) non-input tasks that were invalidated alone (e.g. due to a codesig change)
!uncachedTask.task.isInstanceOf[InputImpl[_]] || edgeSourceIndices(uncachedIndex)
) {
uncachedIndex
}
}
.toSet,
true
),
i => indexToTerminal(i).render
).render(indent = 2)
)

EvaluatorCore.Results(
goals.indexed.map(results(_).map(_._1).result),
// result of flatMap may contain non-distinct entries,
Expand Down
71 changes: 71 additions & 0 deletions main/eval/src/mill/eval/EvaluatorLogs.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package mill.eval

import mill.define.InputImpl
import mill.main.client.OutFiles
import mill.util.SpanningForest
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters.EnumerationHasAsScala

private[mill] object EvaluatorLogs {
def logDependencyTree(
interGroupDeps: Map[Terminal, Seq[Terminal]],
indexToTerminal: Array[Terminal],
terminalToIndex: Map[Terminal, Int],
outPath: os.Path
): Unit = {
SpanningForest.writeJsonFile(
outPath / OutFiles.millDependencyTree,
indexToTerminal.map(t => interGroupDeps.getOrElse(t, Nil).map(terminalToIndex).toArray),
indexToTerminal.indices.toSet,
indexToTerminal(_).render
)
}
def logInvalidationTree(
interGroupDeps: Map[Terminal, Seq[Terminal]],
indexToTerminal: Array[Terminal],
terminalToIndex: Map[Terminal, Int],
outPath: os.Path,
uncached: ConcurrentHashMap[Terminal, Unit],
changedValueHash: ConcurrentHashMap[Terminal, Unit]
): Unit = {

val reverseInterGroupDeps = interGroupDeps
.iterator
.flatMap { case (k, vs) => vs.map(_ -> k) }
.toSeq
.groupMap(_._1)(_._2)

val changedTerminalIndices = changedValueHash.keys().asScala.toSet
val downstreamIndexEdges = indexToTerminal
.map(t =>
if (changedTerminalIndices(t))
reverseInterGroupDeps.getOrElse(t, Nil).map(terminalToIndex).toArray
else Array.empty[Int]
)

val edgeSourceIndices = downstreamIndexEdges
.zipWithIndex
.collect { case (es, i) if es.nonEmpty => i }
.toSet

SpanningForest.writeJsonFile(
outPath / OutFiles.millInvalidationTree,
downstreamIndexEdges,
uncached.keys().asScala
.flatMap { uncachedTask =>
val uncachedIndex = terminalToIndex(uncachedTask)
Option.when(
// Filter out input and source tasks which do not cause downstream invalidations
// from the invalidation tree, because most of them are un-interesting and the
// user really only cares about (a) inputs that cause downstream tasks to invalidate
// or (b) non-input tasks that were invalidated alone (e.g. due to a codesig change)
!uncachedTask.task.isInstanceOf[InputImpl[_]] || edgeSourceIndices(uncachedIndex)
) {
uncachedIndex
}
}
.toSet,
indexToTerminal(_).render
)
}
}
Loading

0 comments on commit 2c6157d

Please sign in to comment.