Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
lihaoyi committed Dec 16, 2024
1 parent 09c551b commit 8d13cdb
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 103 deletions.
190 changes: 89 additions & 101 deletions main/eval/src/mill/eval/GroupEvaluator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,112 +54,102 @@ private[mill] trait GroupEvaluator {
allTransitiveClassMethods: Map[Class[_], Map[String, Method]],
executionContext: mill.api.Ctx.Fork.Api,
exclusive: Boolean
): GroupEvaluator.Results = {
logger.withPrompt {
val externalInputsHash = MurmurHash3.orderedHash(
group.items.flatMap(_.inputs).filter(!group.contains(_))
.flatMap(results(_).result.asSuccess.map(_.value._2))
): GroupEvaluator.Results = logger.withPrompt {
val externalInputsHash = MurmurHash3.orderedHash(
group.items.flatMap(_.inputs).filter(!group.contains(_))
.flatMap(results(_).result.asSuccess.map(_.value._2))
)

val sideHashes = MurmurHash3.orderedHash(group.iterator.map(_.sideHash))

val scriptsHash =
if (disableCallgraph) 0
else MurmurHash3.orderedHash(
group
.iterator
.collect { case namedTask: NamedTask[_] =>
CodeSigUtils.codeSigForTask(
namedTask,
classToTransitiveClasses,
allTransitiveClassMethods,
methodCodeHashSignatures,
constructorHashSignatures
)
}
.flatten
)

val sideHashes = MurmurHash3.orderedHash(group.iterator.map(_.sideHash))

val scriptsHash =
if (disableCallgraph) 0
else MurmurHash3.orderedHash(
group
.iterator
.collect { case namedTask: NamedTask[_] =>
CodeSigUtils.codeSigForTask(
namedTask,
classToTransitiveClasses,
allTransitiveClassMethods,
methodCodeHashSignatures,
constructorHashSignatures
)
}
.flatten
)
val inputsHash = externalInputsHash + sideHashes + classLoaderSigHash + scriptsHash

val inputsHash = externalInputsHash + sideHashes + classLoaderSigHash + scriptsHash
val labelled = terminal.asInstanceOf[Terminal.Labelled[_]]
val out = if (!labelled.task.ctx.external) outPath else externalOutPath
val paths = EvaluatorPaths.resolveDestPaths(out, Terminal.destSegments(labelled))
val cached = loadCachedJson(logger, inputsHash, labelled, paths)

val labelled = terminal.asInstanceOf[Terminal.Labelled[_]]
val out = if (!labelled.task.ctx.external) outPath else externalOutPath
val paths = EvaluatorPaths.resolveDestPaths(out, Terminal.destSegments(labelled))
val cached = loadCachedJson(logger, inputsHash, labelled, paths)
// `cached.isEmpty` means worker metadata file removed by user so recompute the worker
val upToDateWorker = loadUpToDateWorker(logger, inputsHash, labelled, cached.isEmpty)

// `cached.isEmpty` means worker metadata file removed by user so recompute the worker
val upToDateWorker = loadUpToDateWorker(logger, inputsHash, labelled, cached.isEmpty)
val cachedValueAndHash = upToDateWorker
.map((_, inputsHash))
.orElse(
cached.flatMap { case (inputHash, valOpt, valueHash) => valOpt.map((_, valueHash)) }
)

val cachedValueAndHash = upToDateWorker
.map((_, inputsHash))
.orElse(cached.flatMap { case (inputHash, valOpt, valueHash) =>
valOpt.map((_, valueHash))
})
cachedValueAndHash match {
case Some((v, hashCode)) =>
val res = Result.Success((v, hashCode))
val newResults: Map[Task[_], TaskResult[(Val, Int)]] =
Map(labelled.task -> TaskResult(res, () => res))

GroupEvaluator.Results(
newResults,
Nil,
cached = true,
inputsHash,
-1,
valueHashChanged = false
)

cachedValueAndHash match {
case Some((v, hashCode)) =>
val res = Result.Success((v, hashCode))
val newResults: Map[Task[_], TaskResult[(Val, Int)]] =
Map(labelled.task -> TaskResult(res, () => res))
case _ =>
// uncached
if (labelled.task.flushDest) os.remove.all(paths.dest)

GroupEvaluator.Results(
newResults,
Nil,
cached = true,
val (newResults, newEvaluated) =
evaluateGroup(
group,
results,
inputsHash,
-1,
valueHashChanged = false
paths = Some(paths),
maybeTargetLabel = Some(terminal.render),
counterMsg = countMsg,
verboseKeySuffix = verboseKeySuffix,
zincProblemReporter,
testReporter,
logger,
executionContext,
exclusive
)

case _ =>
// uncached
if (labelled.task.flushDest) os.remove.all(paths.dest)

val (newResults, newEvaluated) =
evaluateGroup(
group,
results,
inputsHash,
paths = Some(paths),
maybeTargetLabel = Some(terminal.render),
counterMsg = countMsg,
verboseKeySuffix = verboseKeySuffix,
zincProblemReporter,
testReporter,
logger,
executionContext,
exclusive
)

val valueHash = newResults(labelled.task) match {
case TaskResult(Result.Failure(_, Some((v, _))), _) =>
val valueHash = getValueHash(v, terminal.task, inputsHash)
handleTaskResult(v, valueHash, paths.meta, inputsHash, labelled)
valueHash

case TaskResult(Result.Success((v, _)), _) =>
val valueHash = getValueHash(v, terminal.task, inputsHash)
handleTaskResult(v, valueHash, paths.meta, inputsHash, labelled)
valueHash

case _ =>
// Wipe out any cached meta.json file that exists, so
// a following run won't look at the cached metadata file and
// assume it's associated with the possibly-borked state of the
// destPath after an evaluation failure.
os.remove.all(paths.meta)
0
}
val valueHash = newResults(labelled.task).result match {
case Result.Failure(_, Some((v, _))) => handleResult(v, paths.meta, inputsHash, labelled)
case Result.Success((v, _)) => handleResult(v, paths.meta, inputsHash, labelled)
case _ =>
// Wipe out any cached meta.json file that exists, so
// a following run won't look at the cached metadata file and
// assume it's associated with the possibly-borked state of the
// destPath after an evaluation failure.
os.remove.all(paths.meta)
0
}

GroupEvaluator.Results(
newResults,
newEvaluated.toSeq,
cached = if (labelled.task.isInstanceOf[InputImpl[_]]) null else false,
inputsHash,
cached.map(_._1).getOrElse(-1),
!cached.map(_._3).contains(valueHash)
)
}
GroupEvaluator.Results(
newResults,
newEvaluated.toSeq,
cached = if (labelled.task.isInstanceOf[InputImpl[_]]) null else false,
inputsHash,
cached.map(_._1).getOrElse(-1),
!cached.map(_._3).contains(valueHash)
)
}
}

Expand Down Expand Up @@ -294,13 +284,13 @@ private[mill] trait GroupEvaluator {
// classloader/class is the same or different doesn't matter.
def workerCacheHash(inputHash: Int): Int = inputHash + classLoaderIdentityHash

private def handleTaskResult(
private def handleResult(
v: Val,
hashCode: Int,
metaPath: os.Path,
inputsHash: Int,
labelled: Terminal.Labelled[_]
): Unit = {
): Int = {
val valueHash = getValueHash(v, labelled.task, inputsHash)
for (w <- labelled.task.asWorker)
workerCache.synchronized {
workerCache.update(w.ctx.segments, (workerCacheHash(inputsHash), v))
Expand All @@ -325,13 +315,11 @@ private[mill] trait GroupEvaluator {
for (json <- terminalResult) {
os.write.over(
metaPath,
upickle.default.stream(
Evaluator.Cached(json, hashCode, inputsHash),
indent = 4
),
upickle.default.stream(Evaluator.Cached(json, valueHash, inputsHash), indent = 4),
createFolders = true
)
}
valueHash
}

def resolveLogger(logPath: Option[os.Path], logger: mill.api.Logger): mill.api.Logger =
Expand Down
15 changes: 13 additions & 2 deletions main/eval/src/mill/eval/JsonArrayLogger.scala
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,18 @@ private[eval] class ChromeProfileLogger(outPath: os.Path)
extends JsonArrayLogger[ChromeProfileLogger.TraceEvent](outPath, indent = -1) {

def log(
terminal: Terminal,
term: Terminal,
cat: String,
startTime: Long,
duration: Long,
threadId: Int,
cached: Boolean
): Unit = {
log(term.render, cat, startTime, duration, threadId, cached)
}

def log(
task: String,
cat: String,
startTime: Long,
duration: Long,
Expand All @@ -86,7 +97,7 @@ private[eval] class ChromeProfileLogger(outPath: os.Path)
): Unit = {

val event = ChromeProfileLogger.TraceEvent(
name = terminal.render,
name = task,
cat = cat,
ph = "X",
ts = startTime,
Expand Down

0 comments on commit 8d13cdb

Please sign in to comment.