Skip to content

Commit

Permalink
Refactor route finding (#2974)
Browse files Browse the repository at this point in the history
Refactor the route finding to share the same code for payment routes and onion message routes.
  • Loading branch information
thomash-acinq authored Jan 8, 2025
1 parent a35a972 commit e99fa2e
Show file tree
Hide file tree
Showing 20 changed files with 643 additions and 670 deletions.
5 changes: 4 additions & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,10 @@ class EclairImpl(appKit: Kit) extends Eclair with Logging {
for {
ignoredChannels <- getChannelDescs(ignoreShortChannelIds.toSet)
ignore = Ignore(ignoreNodeIds.toSet, ignoredChannels)
response <- (appKit.router ? RouteRequest(sourceNodeId, target, routeParams1, ignore)).mapTo[RouteResponse]
response <- appKit.router.toTyped.ask[PaymentRouteResponse](replyTo => RouteRequest(replyTo, sourceNodeId, target, routeParams1, ignore)).flatMap {
case r: RouteResponse => Future.successful(r)
case PaymentRouteNotFound(error) => Future.failed(error)
}
} yield response
case Left(t) => Future.failed(t)
}
Expand Down
18 changes: 9 additions & 9 deletions eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import fr.acinq.eclair.message.OnionMessages.OnionMessageConfig
import fr.acinq.eclair.payment.relay.OnTheFlyFunding
import fr.acinq.eclair.payment.relay.Relayer.{AsyncPaymentsParams, RelayFees, RelayParams}
import fr.acinq.eclair.router.Announcements.AddressException
import fr.acinq.eclair.router.Graph.{HeuristicsConstants, WeightRatios}
import fr.acinq.eclair.router.Graph.{HeuristicsConstants, PaymentWeightRatios}
import fr.acinq.eclair.router.Router._
import fr.acinq.eclair.router.{Graph, PathFindingExperimentConf}
import fr.acinq.eclair.tor.Socks5ProxyParams
Expand Down Expand Up @@ -450,20 +450,20 @@ object NodeParams extends Logging {
maxFeeFlat = Satoshi(config.getLong("boundaries.max-fee-flat-sat")).toMilliSatoshi,
maxFeeProportional = config.getDouble("boundaries.max-fee-proportional-percent") / 100.0),
heuristics = if (config.getBoolean("use-ratios")) {
Left(WeightRatios(
PaymentWeightRatios(
baseFactor = config.getDouble("ratios.base"),
cltvDeltaFactor = config.getDouble("ratios.cltv"),
ageFactor = config.getDouble("ratios.channel-age"),
capacityFactor = config.getDouble("ratios.channel-capacity"),
hopCost = getRelayFees(config.getConfig("hop-cost")),
))
hopFees = getRelayFees(config.getConfig("hop-cost")),
)
} else {
Right(HeuristicsConstants(
HeuristicsConstants(
lockedFundsRisk = config.getDouble("locked-funds-risk"),
failureCost = getRelayFees(config.getConfig("failure-cost")),
hopCost = getRelayFees(config.getConfig("hop-cost")),
failureFees = getRelayFees(config.getConfig("failure-cost")),
hopFees = getRelayFees(config.getConfig("hop-cost")),
useLogProbability = config.getBoolean("use-log-probability"),
))
)
},
mpp = MultiPartParams(
Satoshi(config.getLong("mpp.min-amount-satoshis")).toMilliSatoshi,
Expand All @@ -482,7 +482,7 @@ object NodeParams extends Logging {
val ratioBase = config.getDouble("ratios.base")
val ratioAge = config.getDouble("ratios.channel-age")
val ratioCapacity = config.getDouble("ratios.channel-capacity")
MessageRouteParams(maxRouteLength, Graph.MessagePath.WeightRatios(ratioBase, ratioAge, ratioCapacity))
MessageRouteParams(maxRouteLength, Graph.MessageWeightRatios(ratioBase, ratioAge, ratioCapacity))
}

val unhandledExceptionStrategy = config.getString("channel.unhandled-exception-strategy") match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ package fr.acinq.eclair.payment.receive

import akka.actor.Actor.Receive
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.AskPattern.Askable
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter.ClassicActorContextOps
import akka.actor.typed.scaladsl.adapter.{ClassicActorContextOps, ClassicActorRefOps}
import akka.actor.{ActorContext, ActorRef, PoisonPill, typed}
import akka.event.{DiagnosticLoggingAdapter, LoggingAdapter}
import akka.pattern.ask
import akka.util.Timeout
import fr.acinq.bitcoin.scalacompat.Crypto.{PrivateKey, PublicKey}
import fr.acinq.bitcoin.scalacompat.{ByteVector32, Crypto}
Expand All @@ -37,7 +37,7 @@ import fr.acinq.eclair.payment._
import fr.acinq.eclair.payment.offer.OfferManager
import fr.acinq.eclair.router.BlindedRouteCreation.{aggregatePaymentInfo, createBlindedRouteFromHops, createBlindedRouteWithoutHops}
import fr.acinq.eclair.router.Router
import fr.acinq.eclair.router.Router.{ChannelHop, HopRelayParams}
import fr.acinq.eclair.router.Router.{ChannelHop, HopRelayParams, PaymentRouteResponse}
import fr.acinq.eclair.wire.protocol.OfferTypes.{InvoiceRequest, InvoiceTlv}
import fr.acinq.eclair.wire.protocol.PaymentOnion.FinalPayload
import fr.acinq.eclair.wire.protocol._
Expand Down Expand Up @@ -376,8 +376,7 @@ object MultiPartHandler {
val paymentInfo = aggregatePaymentInfo(r.amount, dummyHops, nodeParams.channelConf.minFinalExpiryDelta)
Future.successful(PaymentBlindedRoute(contactInfo, paymentInfo))
} else {
implicit val timeout: Timeout = 10.seconds
r.router.ask(Router.FinalizeRoute(Router.PredefinedNodeRoute(r.amount, route.nodes))).mapTo[Router.RouteResponse].map(routeResponse => {
r.router.toTyped.ask[PaymentRouteResponse](replyTo => Router.FinalizeRoute(replyTo, Router.PredefinedNodeRoute(r.amount, route.nodes)))(10.seconds, context.system.scheduler).mapTo[Router.RouteResponse].map(routeResponse => {
val clearRoute = routeResponse.routes.head
val blindedRoute = createBlindedRouteFromHops(clearRoute.hops ++ dummyHops, r.pathId, nodeParams.channelConf.htlcMinimum, route.maxFinalExpiryDelta.toCltvExpiry(nodeParams.currentBlockHeight))
val contactInfo = route.shortChannelIdDir_opt match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package fr.acinq.eclair.payment.send

import akka.actor.typed.scaladsl.adapter.ClassicActorRefOps
import akka.actor.{ActorRef, FSM, Props, Status}
import akka.event.Logging.MDC
import fr.acinq.bitcoin.scalacompat.ByteVector32
Expand Down Expand Up @@ -58,7 +59,7 @@ class MultiPartPaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig,
val routeParams = r.routeParams.copy(randomize = false) // we don't randomize the first attempt, regardless of configuration choices
log.debug("sending {} with maximum fee {}", r.recipient.totalAmount, r.routeParams.getMaxFee(r.recipient.totalAmount))
val d = PaymentProgress(r, r.maxAttempts, Map.empty, Ignore.empty, retryRouteRequest = false, failures = Nil)
router ! createRouteRequest(nodeParams, routeParams, d, cfg)
router ! createRouteRequest(self, nodeParams, routeParams, d, cfg)
goto(WAIT_FOR_ROUTES) using d
}

Expand All @@ -74,11 +75,11 @@ class MultiPartPaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig,
// remaining amount. In that case we discard these routes and send a new request to the router.
log.debug("discarding routes, another child payment failed so we need to recompute them ({} payments still pending for {})", d.pending.size, d.pending.values.map(_.amount).sum)
val routeParams = d.request.routeParams.copy(randomize = true) // we randomize route selection when we retry
router ! createRouteRequest(nodeParams, routeParams, d, cfg)
router ! createRouteRequest(self, nodeParams, routeParams, d, cfg)
stay() using d.copy(retryRouteRequest = false)
}

case Event(Status.Failure(t), d: PaymentProgress) =>
case Event(PaymentRouteNotFound(t), d: PaymentProgress) =>
log.warning("router error: {}", t.getMessage)
// If no route can be found, we will retry once with the channels that we previously ignored.
// Channels are mostly ignored for temporary reasons, likely because they didn't have enough balance to forward
Expand All @@ -87,7 +88,7 @@ class MultiPartPaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig,
if (d.ignore.channels.nonEmpty) {
log.debug("retry sending payment without ignoring channels {} ({} payments still pending for {})", d.ignore.channels.map(_.shortChannelId).mkString(","), d.pending.size, d.pending.values.map(_.amount).sum)
val routeParams = d.request.routeParams.copy(randomize = true) // we randomize route selection when we retry
router ! createRouteRequest(nodeParams, routeParams, d, cfg).copy(ignore = d.ignore.emptyChannels())
router ! createRouteRequest(self, nodeParams, routeParams, d, cfg).copy(ignore = d.ignore.emptyChannels())
retriedFailedChannels = true
stay() using d.copy(remainingAttempts = (d.remainingAttempts - 1).max(0), ignore = d.ignore.emptyChannels(), retryRouteRequest = false)
} else {
Expand Down Expand Up @@ -135,7 +136,7 @@ class MultiPartPaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig,
log.debug("child payment failed, retrying payment ({} payments still pending for {})", stillPending.size, stillPending.values.map(_.amount).sum)
val routeParams = d.request.routeParams.copy(randomize = true) // we randomize route selection when we retry
val d1 = d.copy(pending = stillPending, ignore = ignore1, failures = d.failures ++ pf.failures, request = d.request.copy(recipient = recipient1), retryRouteRequest = false)
router ! createRouteRequest(nodeParams, routeParams, d1, cfg)
router ! createRouteRequest(self, nodeParams, routeParams, d1, cfg)
goto(WAIT_FOR_ROUTES) using d1
}

Expand Down Expand Up @@ -164,7 +165,7 @@ class MultiPartPaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig,
gotoSucceededOrStop(PaymentSucceeded(d.request, ps.paymentPreimage, ps.parts, d.pending - ps.parts.head.id))

case Event(_: RouteResponse, _) => stay()
case Event(_: Status.Failure, _) => stay()
case Event(_: PaymentRouteNotFound, _) => stay()
}

when(PAYMENT_SUCCEEDED) {
Expand All @@ -190,7 +191,7 @@ class MultiPartPaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig,
}

case Event(_: RouteResponse, _) => stay()
case Event(_: Status.Failure, _) => stay()
case Event(_: PaymentRouteNotFound, _) => stay()
}

private def spawnChildPaymentFsm(childId: UUID): ActorRef = {
Expand Down Expand Up @@ -368,8 +369,8 @@ object MultiPartPaymentLifecycle {
*/
case class PaymentSucceeded(request: SendMultiPartPayment, preimage: ByteVector32, parts: Seq[PartialPayment], pending: Set[UUID]) extends Data

private def createRouteRequest(nodeParams: NodeParams, routeParams: RouteParams, d: PaymentProgress, cfg: SendPaymentConfig): RouteRequest = {
RouteRequest(nodeParams.nodeId, d.request.recipient, routeParams, d.ignore, allowMultiPart = true, d.pending.values.toSeq, Some(cfg.paymentContext))
private def createRouteRequest(replyTo: ActorRef, nodeParams: NodeParams, routeParams: RouteParams, d: PaymentProgress, cfg: SendPaymentConfig): RouteRequest = {
RouteRequest(replyTo.toTyped, nodeParams.nodeId, d.request.recipient, routeParams, d.ignore, allowMultiPart = true, d.pending.values.toSeq, Some(cfg.paymentContext))
}

private def createChildPayment(replyTo: ActorRef, route: Route, request: SendMultiPartPayment): SendPaymentToRoute = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
case Event(request: SendPaymentToRoute, WaitingForRequest) =>
log.debug("sending {} to route {}", request.amount, request.printRoute())
request.route.fold(
hops => router ! FinalizeRoute(hops, request.recipient.extraEdges, paymentContext = Some(cfg.paymentContext)),
hops => router ! FinalizeRoute(self, hops, request.recipient.extraEdges, paymentContext = Some(cfg.paymentContext)),
route => self ! RouteResponse(route :: Nil)
)
if (cfg.storeInDb) {
Expand All @@ -64,7 +64,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A

case Event(request: SendPaymentToNode, WaitingForRequest) =>
log.debug("sending {} to {}", request.amount, request.recipient.nodeId)
router ! RouteRequest(nodeParams.nodeId, request.recipient, request.routeParams, paymentContext = Some(cfg.paymentContext))
router ! RouteRequest(self, nodeParams.nodeId, request.recipient, request.routeParams, paymentContext = Some(cfg.paymentContext))
if (cfg.storeInDb) {
paymentsDb.addOutgoingPayment(OutgoingPayment(id, cfg.parentId, cfg.externalId, paymentHash, cfg.paymentType, request.amount, request.recipient.totalAmount, request.recipient.nodeId, TimestampMilli.now(), cfg.invoice, cfg.payerKey_opt, OutgoingPaymentStatus.Pending))
}
Expand All @@ -84,7 +84,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
myStop(request, Left(PaymentFailed(id, paymentHash, failures :+ LocalFailure(request.amount, route.fullRoute, error))))
}

case Event(Status.Failure(t), WaitingForRoute(request, failures, _)) =>
case Event(PaymentRouteNotFound(t), WaitingForRoute(request, failures, _)) =>
Metrics.PaymentError.withTag(Tags.Failure, Tags.FailureType(LocalFailure(request.amount, Nil, t))).increment()
myStop(request, Left(PaymentFailed(id, paymentHash, failures :+ LocalFailure(request.amount, Nil, t))))
}
Expand Down Expand Up @@ -135,7 +135,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
data.request match {
case request: SendPaymentToNode =>
val ignore1 = PaymentFailure.updateIgnored(failure, data.ignore)
router ! RouteRequest(nodeParams.nodeId, data.recipient, request.routeParams, ignore1, paymentContext = Some(cfg.paymentContext))
router ! RouteRequest(self, nodeParams.nodeId, data.recipient, request.routeParams, ignore1, paymentContext = Some(cfg.paymentContext))
goto(WAITING_FOR_ROUTE) using WaitingForRoute(data.request, data.failures :+ failure, ignore1)
case _: SendPaymentToRoute =>
log.error("unexpected retry during SendPaymentToRoute")
Expand Down Expand Up @@ -241,7 +241,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
log.error("unexpected retry during SendPaymentToRoute")
stop(FSM.Normal)
case request: SendPaymentToNode =>
router ! RouteRequest(nodeParams.nodeId, recipient1, request.routeParams, ignore1, paymentContext = Some(cfg.paymentContext))
router ! RouteRequest(self, nodeParams.nodeId, recipient1, request.routeParams, ignore1, paymentContext = Some(cfg.paymentContext))
goto(WAITING_FOR_ROUTE) using WaitingForRoute(request.copy(recipient = recipient1), failures :+ failure, ignore1)
}
} else {
Expand All @@ -252,7 +252,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
log.error("unexpected retry during SendPaymentToRoute")
stop(FSM.Normal)
case request: SendPaymentToNode =>
router ! RouteRequest(nodeParams.nodeId, recipient, request.routeParams, ignore + nodeId, paymentContext = Some(cfg.paymentContext))
router ! RouteRequest(self, nodeParams.nodeId, recipient, request.routeParams, ignore + nodeId, paymentContext = Some(cfg.paymentContext))
goto(WAITING_FOR_ROUTE) using WaitingForRoute(request, failures :+ failure, ignore + nodeId)
}
}
Expand All @@ -266,7 +266,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
log.error("unexpected retry during SendPaymentToRoute")
stop(FSM.Normal)
case request: SendPaymentToNode =>
router ! RouteRequest(nodeParams.nodeId, recipient, request.routeParams, ignore1, paymentContext = Some(cfg.paymentContext))
router ! RouteRequest(self, nodeParams.nodeId, recipient, request.routeParams, ignore1, paymentContext = Some(cfg.paymentContext))
goto(WAITING_FOR_ROUTE) using WaitingForRoute(request, failures :+ failure, ignore1)
}
case Right(e@Sphinx.DecryptedFailurePacket(nodeId, failureMessage)) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import fr.acinq.eclair.io.Peer.PeerRoutingMessage
import fr.acinq.eclair.io.Switchboard.RouterPeerConf
import fr.acinq.eclair.io.{ClientSpawner, Peer, PeerConnection, Switchboard}
import fr.acinq.eclair.payment.relay.Relayer.RelayFees
import fr.acinq.eclair.router.Graph.{HeuristicsConstants, WeightRatios}
import fr.acinq.eclair.router.Graph.{HeuristicsConstants, PaymentPathWeight, PaymentWeightRatios, WeightRatios}
import fr.acinq.eclair.router.Router._
import fr.acinq.eclair.router._
import fr.acinq.eclair.wire.protocol.CommonCodecs._
Expand Down Expand Up @@ -57,27 +57,32 @@ object EclairInternalsSerializer {
("feeBase" | millisatoshi) ::
("feeProportionalMillionths" | int64)).as[RelayFees]

val weightRatiosCodec: Codec[WeightRatios] = (
val paymentWeightRatiosCodec: Codec[PaymentWeightRatios] = (
("baseFactor" | double) ::
("cltvDeltaFactor" | double) ::
("ageFactor" | double) ::
("capacityFactor" | double) ::
("hopCost" | relayFeesCodec)).as[WeightRatios]
("hopCost" | relayFeesCodec)).as[PaymentWeightRatios]

val heuristicsConstantsCodec: Codec[HeuristicsConstants] = (
("lockedFundsRisk" | double) ::
("failureCost" | relayFeesCodec) ::
("hopCost" | relayFeesCodec) ::
("useLogProbability" | bool(8))).as[HeuristicsConstants]

val weightRatiosCodec: Codec[WeightRatios[PaymentPathWeight]] =
discriminated[WeightRatios[PaymentPathWeight]].by(uint8)
.typecase(0x00, paymentWeightRatiosCodec)
.typecase(0xff, heuristicsConstantsCodec)

val multiPartParamsCodec: Codec[MultiPartParams] = (
("minPartAmount" | millisatoshi) ::
("maxParts" | int32)).as[MultiPartParams]

val pathFindingConfCodec: Codec[PathFindingConf] = (
("randomize" | bool(8)) ::
("boundaries" | searchBoundariesCodec) ::
("heuristicsParams" | either(bool(8), weightRatiosCodec, heuristicsConstantsCodec)) ::
("heuristicsParams" | weightRatiosCodec) ::
("mpp" | multiPartParamsCodec) ::
("experimentName" | utf8_32) ::
("experimentPercentage" | int32)).as[PathFindingConf]
Expand All @@ -90,7 +95,7 @@ object EclairInternalsSerializer {
("maxRouteLength" | int32) ::
(("baseFactor" | double) ::
("ageFactor" | double) ::
("capacityFactor" | double)).as[Graph.MessagePath.WeightRatios]).as[MessageRouteParams]
("capacityFactor" | double)).as[Graph.MessageWeightRatios]).as[MessageRouteParams]

val routerConfCodec: Codec[RouterConf] = (
("watchSpentWindow" | finiteDurationCodec) ::
Expand Down
Loading

0 comments on commit e99fa2e

Please sign in to comment.