Skip to content

Commit

Permalink
Merge pull request #383 from getAlby/fix/payment-finalizer-loop
Browse files Browse the repository at this point in the history
payment finalizer: never break loop
  • Loading branch information
kiwiidb committed Jun 20, 2023
2 parents ea0ab4e + bec0544 commit e6b270e
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 67 deletions.
21 changes: 0 additions & 21 deletions rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"io"
"os"
"sync"
"time"

"github.com/getAlby/lndhub.go/db/models"
"github.com/getsentry/sentry-go"
Expand Down Expand Up @@ -167,33 +166,13 @@ func (client *DefaultClient) FinalizeInitializedPayments(ctx context.Context, sv

client.logger.Infof("Payment finalizer: Found %d pending invoices", len(pendingInvoices))

ticker := time.NewTicker(time.Hour)
defer ticker.Stop()

client.logger.Info("Starting payment finalizer rabbitmq consumer")

for {
// Shortcircuit if no pending invoices are left
if len(pendingInvoices) == 0 {
client.logger.Info("Payment finalizer: Resolved all pending payments, exiting payment finalizer routine")

return nil
}

select {
case <-ctx.Done():
return context.Canceled

case <-ticker.C:
invoices, err := getInvoicesTable(ctx)
if err != nil {
return err
}

pendingInvoices = invoices

client.logger.Infof("Payment finalizer: Found %d pending invoices", len(pendingInvoices))

case delivery, ok := <-deliveryChan:
if !ok {
return err
Expand Down
73 changes: 27 additions & 46 deletions rabbitmq/rabbitmq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,18 @@ func TestFinalizedInitializedPayments(t *testing.T) {
Times(1).
Return(ch, nil)

firstHash := "69e5f0f0590be75e30f671d56afe1d55"
secondHash := "ffff0f0590be75e30f671d56afe1d55"
firstHash := "69e5f0f0590be75e30f671d56afe1d55"
secondHash := "ffff0f0590be75e30f671d56afe1d55"

invoices := []models.Invoice{
{
ID: 0,
RHash: firstHash,
ID: 0,
RHash: firstHash,
},
{
ID: 1,
RHash: secondHash,
},
{
ID: 1,
RHash: secondHash,
},
}

lndHubService.EXPECT().
Expand Down Expand Up @@ -75,48 +75,29 @@ func TestFinalizedInitializedPayments(t *testing.T) {
Return(models.TransactionEntry{InvoiceID: invoices[1].ID}, nil)

ctx := context.Background()
successPayment, err := json.Marshal(&lnrpc.Payment{PaymentHash: firstHash, Status: lnrpc.Payment_SUCCEEDED})
if err != nil {
t.Error(err)
}

failedPayment, err := json.Marshal(&lnrpc.Payment{PaymentHash: secondHash, Status: lnrpc.Payment_FAILED})
if err != nil {
t.Error(err)
}

ch <- amqp.Delivery{Body: successPayment}
ch <- amqp.Delivery{Body: failedPayment}

wg := sync.WaitGroup{}

wg.Add(1)
go func() {
err = client.FinalizeInitializedPayments(ctx, lndHubService)
successPayment, err := json.Marshal(&lnrpc.Payment{PaymentHash: firstHash, Status: lnrpc.Payment_SUCCEEDED})
if err != nil {
t.Error(err)
}

assert.NoError(t, err)
wg.Done()
}()
failedPayment, err := json.Marshal(&lnrpc.Payment{PaymentHash: secondHash, Status: lnrpc.Payment_FAILED})
if err != nil {
t.Error(err)
}

waitTimeout(&wg, time.Second * 3, t)
}
ch <- amqp.Delivery{Body: successPayment}
ch <- amqp.Delivery{Body: failedPayment}

// waitTimeout waits for the waitgroup for the specified max timeout.
// Returns true if waiting timed out.
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration, t *testing.T) bool {
c := make(chan struct{})
go func() {
defer close(c)
wg.Wait()
}()
wg := sync.WaitGroup{}

select {
case <-c:
return false // completed normally
wg.Add(1)
go func() {
err = client.FinalizeInitializedPayments(ctx, lndHubService)

case <-time.After(timeout):
t.Errorf("Waiting on waitgroup timed out during test")
assert.NoError(t, err)
wg.Done()
}()

return true // timed out
}
//wait a bit for payments to be processed
time.Sleep(time.Second)
}

0 comments on commit e6b270e

Please sign in to comment.