From 14e608b31dc44a6a806558cdf6cf22d78aac1d90 Mon Sep 17 00:00:00 2001 From: Cal Bera Date: Mon, 7 Oct 2024 16:05:48 -0400 Subject: [PATCH] chore(transactor): Allow async forcing tx requests (#119) --- baseapp/runners.go | 2 +- core/transactor/transactor.go | 24 ++++++++++++++++++------ x/jobs/block_header_job.go | 2 -- 3 files changed, 19 insertions(+), 9 deletions(-) diff --git a/baseapp/runners.go b/baseapp/runners.go index 749f899f..f2129057 100644 --- a/baseapp/runners.go +++ b/baseapp/runners.go @@ -164,7 +164,7 @@ func (jm *JobManager) retryableHeaderSubscriber( // Handle error while subscribing. if shouldRetry = (err != nil); shouldRetry { jm.Logger(ctx).Error( - "error subscribing to filter logs, retrying...", + "error subscribing to block headers, retrying...", "job", blockHeaderJob.RegistryKey(), "err", err, ) return shouldRetry diff --git a/core/transactor/transactor.go b/core/transactor/transactor.go index 5dea1720..c83774fd 100644 --- a/core/transactor/transactor.go +++ b/core/transactor/transactor.go @@ -159,16 +159,28 @@ func (t *TxrV2) SendTxRequest(txReq *types.Request) (string, error) { // ForceTxRequest immediately (whenever the sender is free from any previous sends) builds and // sends the tx request to the chain, after validating it. // NOTE: this bypasses the queue and batching even if configured to do so. -func (t *TxrV2) ForceTxRequest(ctx context.Context, txReq *types.Request) (string, error) { +func (t *TxrV2) ForceTxRequest(ctx context.Context, txReq *types.Request, async bool) (string, error) { if err := txReq.Validate(); err != nil { return "", err } - go t.fire( - ctx, - &tracker.Response{MsgIDs: []string{txReq.MsgID}, InitialTimes: []time.Time{txReq.Time()}}, - true, txReq.CallMsg, - ) + if async { + go t.fire( + ctx, + &tracker.Response{ + MsgIDs: []string{txReq.MsgID}, InitialTimes: []time.Time{txReq.Time()}, + }, + true, txReq.CallMsg, + ) + } else { + t.fire( + ctx, + &tracker.Response{ + MsgIDs: []string{txReq.MsgID}, InitialTimes: []time.Time{txReq.Time()}, + }, + true, txReq.CallMsg, + ) + } return txReq.MsgID, nil } diff --git a/x/jobs/block_header_job.go b/x/jobs/block_header_job.go index 0610be71..4efae6fa 100644 --- a/x/jobs/block_header_job.go +++ b/x/jobs/block_header_job.go @@ -40,8 +40,6 @@ func (w *BlockHeaderWatcher) Subscribe( return nil, nil, err } w.sub = sub - - sCtx.Logger().Info("Subscribed to new block headers") return sub, headerCh, nil }