Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor cleanups for Evaluator logic #4144

Merged
merged 15 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.mill
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ trait MillStableScalaModule extends MillPublishScalaModule with Mima {
// (5x) MIMA doesn't properly ignore things which are nested inside other private things
// so we have to put explicit ignores here (https://github.com/lightbend/mima/issues/771)
ProblemFilter.exclude[Problem]("mill.eval.ProfileLogger*"),
ProblemFilter.exclude[Problem]("mill.eval.ChromeProfileLogger*"),
ProblemFilter.exclude[Problem]("mill.eval.GroupEvaluator*"),
ProblemFilter.exclude[Problem]("mill.eval.EvaluatorCore*"),
ProblemFilter.exclude[Problem]("mill.eval.Tarjans*"),
Expand Down
4 changes: 4 additions & 0 deletions main/define/src/mill/define/Task.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ abstract class Task[+T] extends Task.Ops[T] with Applyable[Task, T] {
def asCommand: Option[Command[T]] = None
def asWorker: Option[Worker[T]] = None
def self: Task[T] = this
def isExclusiveCommand: Boolean = this match {
case c: Command[_] if c.exclusive => true
case _ => false
}
}

object Task extends TaskBase {
Expand Down
134 changes: 31 additions & 103 deletions main/eval/src/mill/eval/EvaluatorCore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ import mill.api.Strict.Agg
import mill.api._
import mill.define._
import mill.eval.Evaluator.TaskResult
import mill.main.client.OutFiles
import mill.util._

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import scala.collection.mutable
import scala.concurrent._
import scala.jdk.CollectionConverters.EnumerationHasAsScala

/**
* Core logic of evaluating tasks, without any user-facing helper methods
Expand Down Expand Up @@ -79,26 +78,19 @@ private[mill] trait EvaluatorCore extends GroupEvaluator {
val count = new AtomicInteger(1)
val indexToTerminal = sortedGroups.keys().toArray
val terminalToIndex = indexToTerminal.zipWithIndex.toMap
val upstreamIndexEdges =
indexToTerminal.map(t => interGroupDeps.getOrElse(t, Nil).map(terminalToIndex).toArray)
os.write.over(
outPath / OutFiles.millDependencyTree,
SpanningForest.spanningTreeToJsonTree(
SpanningForest(upstreamIndexEdges, indexToTerminal.indices.toSet, true),
i => indexToTerminal(i).render
).render(indent = 2)
)

val futures = mutable.Map.empty[Terminal, Future[Option[GroupEvaluator.Results]]]
EvaluatorLogs.logDependencyTree(interGroupDeps, indexToTerminal, terminalToIndex, outPath)

// Prepare a lookup tables up front of all the method names that each class owns,
// and the class hierarchy, so during evaluation it is cheap to look up what class
// each target belongs to determine of the enclosing class code signature changed.
val (classToTransitiveClasses, allTransitiveClassMethods) =
CodeSigUtils.precomputeMethodNamesPerClass(sortedGroups)

val uncached = new java.util.concurrent.ConcurrentHashMap[Terminal, Unit]()
val changedValueHash = new java.util.concurrent.ConcurrentHashMap[Terminal, Unit]()
val uncached = new ConcurrentHashMap[Terminal, Unit]()
val changedValueHash = new ConcurrentHashMap[Terminal, Unit]()

val futures = mutable.Map.empty[Terminal, Future[Option[GroupEvaluator.Results]]]

def evaluateTerminals(
terminals: Seq[Terminal],
Expand All @@ -114,21 +106,19 @@ private[mill] trait EvaluatorCore extends GroupEvaluator {
// due to the topological order of traversal.
for (terminal <- terminals) {
val deps = interGroupDeps(terminal)
def isExclusiveCommand(t: Task[_]) = t match {
case c: Command[_] if c.exclusive => true
case _ => false
}

val group = sortedGroups.lookupKey(terminal)
val exclusiveDeps = deps.filter(d => isExclusiveCommand(d.task))
val exclusiveDeps = deps.filter(d => d.task.isExclusiveCommand)

if (!isExclusiveCommand(terminal.task) && exclusiveDeps.nonEmpty) {
if (!terminal.task.isExclusiveCommand && exclusiveDeps.nonEmpty) {
val failure = Result.Failure(
s"Non-exclusive task ${terminal.render} cannot depend on exclusive task " +
exclusiveDeps.map(_.render).mkString(", ")
)
val taskResults =
group.map(t => (t, TaskResult[(Val, Int)](failure, () => failure))).toMap
val taskResults = group
.map(t => (t, TaskResult[(Val, Int)](failure, () => failure)))
.toMap

futures(terminal) = Future.successful(
Some(GroupEvaluator.Results(taskResults, group.toSeq, false, -1, -1, false))
)
Expand All @@ -150,19 +140,13 @@ private[mill] trait EvaluatorCore extends GroupEvaluator {
.toMap

val startTime = System.nanoTime() / 1000
val targetLabel = terminal match {
case Terminal.Task(task) => None
case t: Terminal.Labelled[_] => Some(Terminal.printTerm(t))
}

// should we log progress?
val logRun = targetLabel.isDefined && {
val inputResults = for {
target <- group.indexed.filterNot(upstreamResults.contains)
item <- target.inputs.filterNot(group.contains)
} yield upstreamResults(item).map(_._1)
inputResults.forall(_.result.isInstanceOf[Result.Success[_]])
}
val inputResults = for {
target <- group.indexed.filterNot(upstreamResults.contains)
item <- target.inputs.filterNot(group.contains)
} yield upstreamResults(item).map(_._1)
val logRun = inputResults.forall(_.result.isInstanceOf[Result.Success[_]])

val tickerPrefix = terminal.render.collect {
case targetLabel if logRun && logger.enableTicker => targetLabel
Expand Down Expand Up @@ -195,31 +179,15 @@ private[mill] trait EvaluatorCore extends GroupEvaluator {
failed.set(true)

val endTime = System.nanoTime() / 1000

val duration = endTime - startTime

chromeProfileLogger.log(
task = Terminal.printTerm(terminal),
cat = "job",
startTime = startTime,
duration = duration,
threadId = threadNumberer.getThreadId(Thread.currentThread()),
cached = res.cached
)
val threadId = threadNumberer.getThreadId(Thread.currentThread())
chromeProfileLogger.log(terminal, "job", startTime, duration, threadId, res.cached)

if (!res.cached) uncached.put(terminal, ())
if (res.valueHashChanged) changedValueHash.put(terminal, ())

profileLogger.log(
ProfileLogger.Timing(
terminal.render,
(duration / 1000).toInt,
res.cached,
res.valueHashChanged,
deps.map(_.render),
res.inputsHash,
res.previousInputsHash
)
)
profileLogger.log(terminal, duration, res, deps)

Some(res)
}
Expand All @@ -241,12 +209,7 @@ private[mill] trait EvaluatorCore extends GroupEvaluator {

val tasksTransitive = tasksTransitive0.toSet
val (tasks, leafExclusiveCommands) = terminals0.partition {
case Terminal.Labelled(t, _) =>
if (tasksTransitive.contains(t)) true
else t match {
case t: Command[_] => !t.exclusive
case _ => false
}
case Terminal.Labelled(t, _) => tasksTransitive.contains(t) || !t.isExclusiveCommand
case _ => !serialCommandExec
}

Expand All @@ -260,6 +223,15 @@ private[mill] trait EvaluatorCore extends GroupEvaluator {

val finishedOptsMap = (nonExclusiveResults ++ exclusiveResults).toMap

EvaluatorLogs.logInvalidationTree(
interGroupDeps,
indexToTerminal,
terminalToIndex,
outPath,
uncached,
changedValueHash
)

val results0: Vector[(Task[_], TaskResult[(Val, Int)])] = terminals0
.flatMap { t =>
sortedGroups.lookupKey(t).flatMap { t0 =>
Expand All @@ -272,50 +244,6 @@ private[mill] trait EvaluatorCore extends GroupEvaluator {

val results: Map[Task[_], TaskResult[(Val, Int)]] = results0.toMap

val reverseInterGroupDeps = interGroupDeps
.iterator
.flatMap { case (k, vs) => vs.map(_ -> k) }
.toSeq
.groupMap(_._1)(_._2)

val changedTerminalIndices = changedValueHash.keys().asScala.toSet
val downstreamIndexEdges = indexToTerminal
.map(t =>
if (changedTerminalIndices(t))
reverseInterGroupDeps.getOrElse(t, Nil).map(terminalToIndex).toArray
else Array.empty[Int]
)

val edgeSourceIndices = downstreamIndexEdges
.zipWithIndex
.collect { case (es, i) if es.nonEmpty => i }
.toSet

os.write.over(
outPath / OutFiles.millInvalidationTree,
SpanningForest.spanningTreeToJsonTree(
SpanningForest(
downstreamIndexEdges,
uncached.keys().asScala
.flatMap { uncachedTask =>
val uncachedIndex = terminalToIndex(uncachedTask)
Option.when(
// Filter out input and source tasks which do not cause downstream invalidations
// from the invalidation tree, because most of them are un-interesting and the
// user really only cares about (a) inputs that cause downstream tasks to invalidate
// or (b) non-input tasks that were invalidated alone (e.g. due to a codesig change)
!uncachedTask.task.isInstanceOf[InputImpl[_]] || edgeSourceIndices(uncachedIndex)
) {
uncachedIndex
}
}
.toSet,
true
),
i => indexToTerminal(i).render
).render(indent = 2)
)

EvaluatorCore.Results(
goals.indexed.map(results(_).map(_._1).result),
// result of flatMap may contain non-distinct entries,
Expand Down
71 changes: 71 additions & 0 deletions main/eval/src/mill/eval/EvaluatorLogs.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package mill.eval

import mill.define.InputImpl
import mill.main.client.OutFiles
import mill.util.SpanningForest
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters.EnumerationHasAsScala

private[mill] object EvaluatorLogs {
def logDependencyTree(
interGroupDeps: Map[Terminal, Seq[Terminal]],
indexToTerminal: Array[Terminal],
terminalToIndex: Map[Terminal, Int],
outPath: os.Path
): Unit = {
SpanningForest.writeJsonFile(
outPath / OutFiles.millDependencyTree,
indexToTerminal.map(t => interGroupDeps.getOrElse(t, Nil).map(terminalToIndex).toArray),
indexToTerminal.indices.toSet,
indexToTerminal(_).render
)
}
def logInvalidationTree(
interGroupDeps: Map[Terminal, Seq[Terminal]],
indexToTerminal: Array[Terminal],
terminalToIndex: Map[Terminal, Int],
outPath: os.Path,
uncached: ConcurrentHashMap[Terminal, Unit],
changedValueHash: ConcurrentHashMap[Terminal, Unit]
): Unit = {

val reverseInterGroupDeps = interGroupDeps
.iterator
.flatMap { case (k, vs) => vs.map(_ -> k) }
.toSeq
.groupMap(_._1)(_._2)

val changedTerminalIndices = changedValueHash.keys().asScala.toSet
val downstreamIndexEdges = indexToTerminal
.map(t =>
if (changedTerminalIndices(t))
reverseInterGroupDeps.getOrElse(t, Nil).map(terminalToIndex).toArray
else Array.empty[Int]
)

val edgeSourceIndices = downstreamIndexEdges
.zipWithIndex
.collect { case (es, i) if es.nonEmpty => i }
.toSet

SpanningForest.writeJsonFile(
outPath / OutFiles.millInvalidationTree,
downstreamIndexEdges,
uncached.keys().asScala
.flatMap { uncachedTask =>
val uncachedIndex = terminalToIndex(uncachedTask)
Option.when(
// Filter out input and source tasks which do not cause downstream invalidations
// from the invalidation tree, because most of them are un-interesting and the
// user really only cares about (a) inputs that cause downstream tasks to invalidate
// or (b) non-input tasks that were invalidated alone (e.g. due to a codesig change)
!uncachedTask.task.isInstanceOf[InputImpl[_]] || edgeSourceIndices(uncachedIndex)
) {
uncachedIndex
}
}
.toSet,
indexToTerminal(_).render
)
}
}
Loading
Loading