Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 39 additions & 3 deletions src/Jobs/MonitorRebalancesJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ public async Task Execute(IJobExecutionContext context)
var window = TimeSpan.FromHours(Constants.REBALANCE_RECONCILE_TERMINAL_WINDOW_HOURS);
var rebalances = await _rebalanceRepository.GetReconcilable(window);

_logger.LogInformation(
"{JobName} found {Count} reconcilable rebalance(s) within window {WindowHours}h",
nameof(MonitorRebalancesJob), rebalances.Count, window.TotalHours);

foreach (var rebalance in rebalances)
{
try
Expand All @@ -85,7 +89,12 @@ public async Task Execute(IJobExecutionContext context)
private async Task ReconcileAsync(Rebalance rebalance, CancellationToken ct)
{
if (string.IsNullOrEmpty(rebalance.PaymentHashHex))
{
_logger.LogDebug(
"Rebalance {RebalanceId} (status={Status}) has no PaymentHashHex; nothing to reconcile against LND",
rebalance.Id, rebalance.Status);
return;
}

if (rebalance.Node == null)
{
Expand All @@ -100,11 +109,15 @@ private async Task ReconcileAsync(Rebalance rebalance, CancellationToken ct)
}
catch (FormatException)
{
_logger.LogWarning("Rebalance {RebalanceId} has malformed PaymentHashHex; skipping reconciliation",
rebalance.Id);
_logger.LogWarning("Rebalance {RebalanceId} has malformed PaymentHashHex {PaymentHashHex}; skipping reconciliation",
rebalance.Id, rebalance.PaymentHashHex);
return;
}

_logger.LogInformation(
"Reconciling rebalance {RebalanceId} (node={NodeId}, status={Status}, hash={PaymentHashHex}, attempt={Attempt}) against LND",
rebalance.Id, rebalance.NodeId, rebalance.Status, rebalance.PaymentHashHex, rebalance.AttemptNumber);

var payment = await _lightningService.TrackPaymentV2Async(rebalance.Node, paymentHash, ct);

if (payment == null)
Expand All @@ -118,6 +131,9 @@ private async Task ReconcileAsync(Rebalance rebalance, CancellationToken ct)
var oldStatus = rebalance.Status;
rebalance.Status = RebalanceStatus.Failed;
_rebalanceRepository.Update(rebalance);
_logger.LogWarning(
"Rebalance {RebalanceId} has no record in LND for hash {PaymentHashHex}; flipping {OldStatus} -> Failed",
rebalance.Id, rebalance.PaymentHashHex, oldStatus);
await _auditService.LogSystemAsync(
AuditActionType.RebalanceCompleted,
AuditEventType.Failure,
Expand All @@ -131,28 +147,48 @@ await _auditService.LogSystemAsync(
NewStatus = rebalance.Status.ToString(),
});
}
else
{
_logger.LogInformation(
"Rebalance {RebalanceId} TrackPaymentV2 returned no payment; row already terminal ({Status}), leaving as-is",
rebalance.Id, rebalance.Status);
}

return;
}

if (IsNonTerminalLndStatus(payment.Status))
{
// LND still in flight — nothing to reconcile yet.
// LND still in flight — emitted at info so an operator can spot rebalances stuck
// in flight across monitor sweeps (same RebalanceId appearing tick after tick).
_logger.LogInformation(
"Rebalance {RebalanceId} still {LndStatus} in LND (db status={DbStatus}, attempt={Attempt}, hash={PaymentHashHex}); skipping update",
rebalance.Id, payment.Status, rebalance.Status, rebalance.AttemptNumber, rebalance.PaymentHashHex);
return;
}

var previousStatus = rebalance.Status;
RebalanceService.ApplyTerminalPayment(rebalance, payment);

if (rebalance.Status == previousStatus)
{
_logger.LogInformation(
"Rebalance {RebalanceId} terminal status {Status} matches LND ({LndStatus}); no update needed",
rebalance.Id, rebalance.Status, payment.Status);
return;
}

_rebalanceRepository.Update(rebalance);

var eventType = rebalance.Status == RebalanceStatus.Succeeded
? AuditEventType.Success
: AuditEventType.Failure;

_logger.LogInformation(
"Rebalance {RebalanceId} reconciled: {OldStatus} -> {NewStatus} (LND status={LndStatus}, failureReason={LndFailureReason}, feeSats={FeeSats}, ppm={Ppm})",
rebalance.Id, previousStatus, rebalance.Status, payment.Status, payment.FailureReason,
rebalance.FeePaidSats, rebalance.EffectivePpm);

await _auditService.LogSystemAsync(
AuditActionType.RebalanceCompleted,
eventType,
Expand Down
63 changes: 61 additions & 2 deletions src/Services/RebalanceService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ public async Task<Rebalance> RebalanceAsync(RebalanceRequest request, Cancellati
throw new InvalidOperationException($"Failed to persist rebalance: {addError}");
}

_logger.LogInformation(
"Rebalance {RebalanceId} created (node={NodeId} '{NodeName}', amount={AmountSats} sats, maxFeePct={MaxFeePct}, sourceChanId={SourceChanIdLnd}, targetPubkey={TargetPubkey}, isManual={IsManual})",
rebalance.Id, node.Id, node.Name, rebalance.RequestedAmountSats, rebalance.MaxFeePct,
rebalance.SourceChanIdLnd, rebalance.TargetPubkey, rebalance.IsManual);

await _auditService.LogAsync(
AuditActionType.RebalanceInitiated,
AuditEventType.Attempt,
Expand Down Expand Up @@ -179,13 +184,21 @@ public async Task<Rebalance> ExecuteAsync(int rebalanceId, CancellationToken ct
if (node == null)
throw new InvalidOperationException($"Node {rebalance.NodeId} not found for rebalance {rebalanceId}");

_logger.LogInformation(
"Executing rebalance {RebalanceId} attempt {Attempt}/{MaxAttempts} (node={NodeId} '{NodeName}', amount={AmountSats} sats, maxFeePct={MaxFeePct}, timeoutSeconds={TimeoutSeconds})",
rebalance.Id, rebalance.AttemptNumber, rebalance.MaxAttempts ?? Constants.REBALANCE_MAX_ATTEMPTS,
node.Id, node.Name, rebalance.SatsAmount, rebalance.MaxFeePct, rebalance.TimeoutSeconds);

try
{
var memo = $"NG rebalance #{rebalance.Id} attempt {rebalance.AttemptNumber}";
var invoice = await _lightningService.AddInvoiceAsync(node, rebalance.SatsAmount, memo,
ComputeInvoiceExpirySeconds(rebalance));
var invoiceExpiry = ComputeInvoiceExpirySeconds(rebalance);
var invoice = await _lightningService.AddInvoiceAsync(node, rebalance.SatsAmount, memo, invoiceExpiry);
if (invoice == null || string.IsNullOrEmpty(invoice.PaymentRequest))
{
_logger.LogWarning(
"Rebalance {RebalanceId} failed to create self-invoice on node {NodeId}",
rebalance.Id, node.Id);
rebalance.Status = RebalanceStatus.Failed;
_rebalanceRepository.Update(rebalance);
await _auditService.LogAsync(AuditActionType.RebalanceCompleted, AuditEventType.Failure,
Expand All @@ -200,10 +213,18 @@ await _auditService.LogAsync(AuditActionType.RebalanceCompleted, AuditEventType.
rebalance.PaymentHashHex = Convert.ToHexString(invoice.RHash.ToByteArray()).ToLowerInvariant();
_rebalanceRepository.Update(rebalance);

_logger.LogInformation(
"Rebalance {RebalanceId} self-invoice created (hash={PaymentHashHex}, expirySeconds={ExpirySeconds})",
rebalance.Id, rebalance.PaymentHashHex, invoiceExpiry);

var feeLimitMsat = ComputeFeeLimitMsat(rebalance.SatsAmount, rebalance.MaxFeePct);

rebalance.Status = RebalanceStatus.Probing;
_rebalanceRepository.Update(rebalance);
_logger.LogInformation(
"Rebalance {RebalanceId} probing for route (amount={AmountSats} sats, feeLimitMsat={FeeLimitMsat}, probeBackoffRatio={ProbeBackoffRatio})",
rebalance.Id, rebalance.SatsAmount, feeLimitMsat,
rebalance.ProbeBackoffRatio ?? Constants.REBALANCE_PROBE_BACKOFF_RATIO);
await _auditService.LogAsync(AuditActionType.RebalanceProbing, AuditEventType.Attempt,
AuditObjectType.Rebalance, rebalance.Id.ToString(),
new { rebalance.AttemptNumber, rebalance.SatsAmount, FeeLimitMsat = feeLimitMsat });
Expand All @@ -214,6 +235,9 @@ await _auditService.LogAsync(AuditActionType.RebalanceProbing, AuditEventType.At

if (probe is ProbeResult.NoRoute noRoute)
{
_logger.LogInformation(
"Rebalance {RebalanceId} probe returned NoRoute: {Reason}",
rebalance.Id, noRoute.Reason);
rebalance.Status = RebalanceStatus.NoRoute;
_rebalanceRepository.Update(rebalance);
await _auditService.LogAsync(AuditActionType.RebalanceProbing, AuditEventType.Failure,
Expand All @@ -227,12 +251,18 @@ await _auditService.LogAsync(AuditActionType.RebalanceCompleted, AuditEventType.
}

var success = (ProbeResult.Success)probe;
_logger.LogInformation(
"Rebalance {RebalanceId} probe succeeded (probedAmountSats={ProbedAmountSats}, routeHops={RouteHops})",
rebalance.Id, success.AmountSats, success.Route.Hops.Count);
await _auditService.LogAsync(AuditActionType.RebalanceProbing, AuditEventType.Success,
AuditObjectType.Rebalance, rebalance.Id.ToString(),
new { ProbedAmountSats = success.AmountSats, RouteHops = success.Route.Hops.Count });

if (success.AmountSats < rebalance.SatsAmount)
{
_logger.LogInformation(
"Rebalance {RebalanceId} probe shrunk amount from {OriginalSats} to {ProbedSats} sats",
rebalance.Id, rebalance.SatsAmount, success.AmountSats);
rebalance.SatsAmount = success.AmountSats;
feeLimitMsat = ComputeFeeLimitMsat(rebalance.SatsAmount, rebalance.MaxFeePct);
}
Expand All @@ -244,6 +274,10 @@ await _auditService.LogAsync(AuditActionType.RebalanceProbing, AuditEventType.Su
? [rebalance.SourceChanIdLnd.Value]
: Array.Empty<ulong>();

_logger.LogInformation(
"Rebalance {RebalanceId} dispatching SendPaymentV2 (amount={AmountSats} sats, feeLimitMsat={FeeLimitMsat}, timeoutSeconds={TimeoutSeconds}, hash={PaymentHashHex})",
rebalance.Id, rebalance.SatsAmount, feeLimitMsat, rebalance.TimeoutSeconds, rebalance.PaymentHashHex);

// Probe gave us feasibility + (possibly shrunk) amount; the actual settle goes
// through LND's full pathfinder via SendPaymentV2 — that gives us MPP and
// built-in per-route retries inside one payment, which the SendToRouteV2 path
Expand All @@ -259,20 +293,32 @@ await _auditService.LogAsync(AuditActionType.RebalanceProbing, AuditEventType.Su

ApplyTerminalPayment(rebalance, payment);

_logger.LogInformation(
"Rebalance {RebalanceId} payment terminal: lndStatus={LndStatus}, failureReason={FailureReason}, dbStatus={DbStatus}, feeSats={FeeSats}, ppm={Ppm}",
rebalance.Id, payment.Status, payment.FailureReason, rebalance.Status,
rebalance.FeePaidSats, rebalance.EffectivePpm);

// Defensive post-hoc ppm guard. Should never fire because feeLimitMsat clamps,
// but if it does we surface a distinct status for operators / Grafana.
var maxFeePpmCap = (long)Math.Round(rebalance.MaxFeePct * 10_000d, MidpointRounding.AwayFromZero);
if (rebalance.Status == RebalanceStatus.Succeeded
&& rebalance.EffectivePpm.HasValue
&& rebalance.EffectivePpm.Value > maxFeePpmCap)
{
_logger.LogWarning(
"Rebalance {RebalanceId} effective ppm {EffectivePpm} exceeded cap {MaxFeePpmCap}; flipping Succeeded -> ExceededFeeLimit",
rebalance.Id, rebalance.EffectivePpm.Value, maxFeePpmCap);
rebalance.Status = RebalanceStatus.ExceededFeeLimit;
}

_rebalanceRepository.Update(rebalance);

if (rebalance.Status == RebalanceStatus.Succeeded)
{
_logger.LogInformation(
"Rebalance {RebalanceId} SUCCEEDED on attempt {Attempt} (feeSats={FeeSats}, ppm={Ppm}, actualAmountSats={ActualAmountSats})",
rebalance.Id, rebalance.AttemptNumber, rebalance.FeePaidSats, rebalance.EffectivePpm,
rebalance.SatsAmount);
await _auditService.LogAsync(AuditActionType.RebalanceCompleted, AuditEventType.Success,
AuditObjectType.Rebalance, rebalance.Id.ToString(),
new
Expand All @@ -285,6 +331,9 @@ await _auditService.LogAsync(AuditActionType.RebalanceCompleted, AuditEventType.
}
else
{
_logger.LogInformation(
"Rebalance {RebalanceId} attempt {Attempt} ended status={Status} (lndFailureReason={FailureReason})",
rebalance.Id, rebalance.AttemptNumber, rebalance.Status, payment.FailureReason);
await _auditService.LogAsync(AuditActionType.RebalanceCompleted, AuditEventType.Failure,
AuditObjectType.Rebalance, rebalance.Id.ToString(),
new
Expand Down Expand Up @@ -380,12 +429,18 @@ private async Task ScheduleRetryIfEligibleAsync(Rebalance rebalance)
or RebalanceStatus.InsufficientBalance
or RebalanceStatus.ExceededFeeLimit)
{
_logger.LogInformation(
"Rebalance {RebalanceId} not eligible for retry due to terminal status {Status}",
rebalance.Id, rebalance.Status);
return;
}

var maxAttempts = rebalance.MaxAttempts ?? Constants.REBALANCE_MAX_ATTEMPTS;
if (rebalance.AttemptNumber >= maxAttempts)
{
_logger.LogInformation(
"Rebalance {RebalanceId} exhausted retry budget at attempt {Attempt}/{MaxAttempts} (status={Status})",
rebalance.Id, rebalance.AttemptNumber, maxAttempts, rebalance.Status);
return;
}

Expand Down Expand Up @@ -421,6 +476,10 @@ or RebalanceStatus.InsufficientBalance

await scheduler.ScheduleJob(job, trigger);

_logger.LogInformation(
"Rebalance {RebalanceId} retry scheduled: attempt {NextAttempt}/{MaxAttempts} in {DelaySeconds}s (newMaxFeePct={NewMaxFeePct}, fireAt={FireAt:O})",
rebalance.Id, nextAttempt, maxAttempts, delaySeconds, rebalance.MaxFeePct, fireAt);

await _auditService.LogAsync(AuditActionType.RebalanceRetryScheduled, AuditEventType.Attempt,
AuditObjectType.Rebalance, rebalance.Id.ToString(),
new
Expand Down
Loading