From 350fbb06de61899c1f3e377fbca4ab6a4f095d6c Mon Sep 17 00:00:00 2001 From: Martynas Kazlauskas Date: Thu, 19 Dec 2024 09:43:20 +0200 Subject: [PATCH 1/3] chore(tx-construction): add missing tsconfig reference to util-rxjs --- packages/tx-construction/src/tsconfig.json | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/tx-construction/src/tsconfig.json b/packages/tx-construction/src/tsconfig.json index 77bfb985f5b..4b9f9590a94 100644 --- a/packages/tx-construction/src/tsconfig.json +++ b/packages/tx-construction/src/tsconfig.json @@ -7,6 +7,9 @@ { "path": "../../util/src" }, + { + "path": "../../util-rxjs/src" + }, { "path": "../../core/src" }, From 9bad2df58d48e920881da68adf51c20ee1d7c886 Mon Sep 17 00:00:00 2001 From: Martynas Kazlauskas Date: Thu, 19 Dec 2024 09:44:21 +0200 Subject: [PATCH 2/3] refactor!: rename 'coldObservableProvider' util to 'poll' this util is not Provider-aware it resides in 'util-rxjs' package, which does not depend on 'core' follow-up commit will add a new Provider-aware util that knows about retryable ProviderErrors BREAKING CHANGE: rename poll props 'provider' to 'sample' --- .../src/tx-builder/TxBuilder.ts | 8 +- packages/util-rxjs/src/index.ts | 2 +- .../{coldObservableProvider.ts => poll.ts} | 12 +-- ...bservableProvider.test.ts => poll.test.ts} | 90 +++++++++---------- packages/wallet/src/Wallets/BaseWallet.ts | 34 +++---- packages/wallet/src/services/AssetsTracker.ts | 8 +- .../DelegationTracker/DelegationTracker.ts | 8 +- .../DelegationTracker/RewardAccounts.ts | 12 +-- .../DelegationTracker/RewardsHistory.ts | 10 +-- .../wallet/src/services/DrepInfoTracker.ts | 6 +- .../src/services/SupplyDistributionTracker.ts | 10 +-- .../src/services/TransactionsTracker.ts | 9 +- packages/wallet/src/services/UtxoTracker.ts | 8 +- 13 files changed, 109 insertions(+), 108 deletions(-) rename packages/util-rxjs/src/{coldObservableProvider.ts => poll.ts} (92%) rename packages/util-rxjs/test/{coldObservableProvider.test.ts => poll.test.ts} (53%) diff --git a/packages/tx-construction/src/tx-builder/TxBuilder.ts b/packages/tx-construction/src/tx-builder/TxBuilder.ts index 12560ac6583..0f6405538e3 100644 --- a/packages/tx-construction/src/tx-builder/TxBuilder.ts +++ b/packages/tx-construction/src/tx-builder/TxBuilder.ts @@ -39,11 +39,11 @@ import { validateValidityInterval } from './utils'; import { SelectionSkeleton } from '@cardano-sdk/input-selection'; -import { coldObservableProvider } from '@cardano-sdk/util-rxjs'; import { contextLogger, deepEquals } from '@cardano-sdk/util'; import { createOutputValidator } from '../output-validation'; import { initializeTx } from './initializeTx'; import { lastValueFrom } from 'rxjs'; +import { poll } from '@cardano-sdk/util-rxjs'; import omit from 'lodash/omit.js'; import uniq from 'lodash/uniq.js'; @@ -499,12 +499,12 @@ export class GenericTxBuilder implements TxBuilder { allRewardAccounts = uniq([...knownAddresses, ...newAddresses]).map(({ rewardAccount }) => rewardAccount); } - const rewardAccounts$ = coldObservableProvider({ + const rewardAccounts$ = poll({ logger: contextLogger(this.#logger, 'getOrCreateRewardAccounts'), pollUntil: (rewardAccounts) => allRewardAccounts.every((newAccount) => rewardAccounts.some((acct) => acct.address === newAccount)), - provider: this.#dependencies.txBuilderProviders.rewardAccounts, - retryBackoffConfig: { initialInterval: 10, maxInterval: 100, maxRetries: 10 } + retryBackoffConfig: { initialInterval: 10, maxInterval: 100, maxRetries: 10 }, + sample: this.#dependencies.txBuilderProviders.rewardAccounts }); try { diff --git a/packages/util-rxjs/src/index.ts b/packages/util-rxjs/src/index.ts index 5fcb56e4e30..04b5f96e362 100644 --- a/packages/util-rxjs/src/index.ts +++ b/packages/util-rxjs/src/index.ts @@ -5,5 +5,5 @@ export * from './passthrough'; export * from './finalizeWithLatest'; export * from './concatAndCombineLatest'; export * from './shareRetryBackoff'; -export * from './coldObservableProvider'; +export * from './poll'; export * from './types'; diff --git a/packages/util-rxjs/src/coldObservableProvider.ts b/packages/util-rxjs/src/poll.ts similarity index 92% rename from packages/util-rxjs/src/coldObservableProvider.ts rename to packages/util-rxjs/src/poll.ts index b27fddfb5d0..a66a34131f5 100644 --- a/packages/util-rxjs/src/coldObservableProvider.ts +++ b/packages/util-rxjs/src/poll.ts @@ -18,8 +18,8 @@ import { } from 'rxjs'; import { RetryBackoffConfig, retryBackoff } from 'backoff-rxjs'; -export interface ColdObservableProviderProps { - provider: () => Promise; +export interface PollProps { + sample: () => Promise; retryBackoffConfig: RetryBackoffConfig; onFatalError?: (value: unknown) => void; trigger$?: Observable; @@ -30,8 +30,8 @@ export interface ColdObservableProviderProps { logger: Logger; } -export const coldObservableProvider = ({ - provider, +export const poll = ({ + sample, retryBackoffConfig, onFatalError, trigger$ = of(true), @@ -40,7 +40,7 @@ export const coldObservableProvider = ({ cancel$ = NEVER, pollUntil = () => true, logger -}: ColdObservableProviderProps) => +}: PollProps) => new Observable((subscriber) => { const cancelOnFatalError$ = new Subject(); const internalCancel$ = merge(cancel$, cancelOnFatalError$); @@ -48,7 +48,7 @@ export const coldObservableProvider = ({ .pipe( combinator(() => defer(() => - from(provider()).pipe( + from(sample()).pipe( mergeMap((v) => pollUntil(v) ? of(v) diff --git a/packages/util-rxjs/test/coldObservableProvider.test.ts b/packages/util-rxjs/test/poll.test.ts similarity index 53% rename from packages/util-rxjs/test/coldObservableProvider.test.ts rename to packages/util-rxjs/test/poll.test.ts index 84ab33fa26a..5751c0bca23 100644 --- a/packages/util-rxjs/test/coldObservableProvider.test.ts +++ b/packages/util-rxjs/test/poll.test.ts @@ -2,14 +2,14 @@ import { BehaviorSubject, EmptyError, Subject, firstValueFrom, lastValueFrom, ta import { InvalidStringError } from '@cardano-sdk/util'; import { RetryBackoffConfig, retryBackoff } from 'backoff-rxjs'; import { TestLogger, createLogger } from '@cardano-sdk/util-dev'; -import { coldObservableProvider } from '../src'; +import { poll } from '../src'; // There might be a more elegant way to mock with original implementation (spy) jest.mock('backoff-rxjs', () => ({ retryBackoff: jest.fn().mockImplementation((...args) => jest.requireActual('backoff-rxjs').retryBackoff(...args)) })); -describe('coldObservableProvider', () => { +describe('poll', () => { let logger: TestLogger; const testErrorStr = 'Test error'; @@ -17,53 +17,53 @@ describe('coldObservableProvider', () => { logger = createLogger({ record: true }); }); - it('returns an observable that calls underlying provider on each subscription and uses retryBackoff', async () => { - const underlyingProvider = jest.fn().mockResolvedValue(true); + it('returns an observable that calls sample on each subscription and uses retryBackoff', async () => { + const sample = jest.fn().mockResolvedValue(true); const backoffConfig: RetryBackoffConfig = { initialInterval: 1 }; - const provider$ = coldObservableProvider({ + const values$ = poll({ logger, - provider: underlyingProvider, - retryBackoffConfig: backoffConfig + retryBackoffConfig: backoffConfig, + sample }); - expect(await firstValueFrom(provider$)).toBe(true); - expect(await firstValueFrom(provider$)).toBe(true); - expect(underlyingProvider).toBeCalledTimes(2); + expect(await firstValueFrom(values$)).toBe(true); + expect(await firstValueFrom(values$)).toBe(true); + expect(sample).toBeCalledTimes(2); expect(retryBackoff).toBeCalledTimes(2); }); - it('provider is unsubscribed on cancel emit', async () => { - const fakeProviderSubject = new Subject(); - const underlyingProvider = () => firstValueFrom(fakeProviderSubject); + it('completes on cancel emit', async () => { + const fakeSampleSubject = new Subject(); + const sample = () => firstValueFrom(fakeSampleSubject); const backoffConfig: RetryBackoffConfig = { initialInterval: 1 }; const cancel$ = new BehaviorSubject(true); - const provider$ = coldObservableProvider({ + const values$ = poll({ cancel$, logger, - provider: underlyingProvider, - retryBackoffConfig: backoffConfig + retryBackoffConfig: backoffConfig, + sample }); try { - await firstValueFrom(provider$); + await firstValueFrom(values$); } catch (error) { expect(error).toBeInstanceOf(EmptyError); } expect.assertions(1); }); - it('retries using retryBackoff, when underlying provider rejects', async () => { - const underlyingProvider = jest.fn().mockRejectedValueOnce(false).mockResolvedValue(true); + it('retries using retryBackoff, when sample rejects', async () => { + const sample = jest.fn().mockRejectedValueOnce(false).mockResolvedValue(true); const retryBackoffConfig: RetryBackoffConfig = { initialInterval: 1 }; - const provider$ = coldObservableProvider({ logger, provider: underlyingProvider, retryBackoffConfig }); - const resolvedValue = await firstValueFrom(provider$); - expect(underlyingProvider).toBeCalledTimes(2); + const values$ = poll({ logger, retryBackoffConfig, sample }); + const resolvedValue = await firstValueFrom(values$); + expect(sample).toBeCalledTimes(2); expect(resolvedValue).toBeTruthy(); }); - it('does not retry, when underlying provider rejects with InvalidStringError', async () => { + it('does not retry, when sample rejects with InvalidStringError', async () => { const testValue = { test: 'value' }; const testError = new InvalidStringError('Test invalid string error'); - const underlyingProvider = jest + const sample = jest .fn() .mockRejectedValueOnce(new Error(testErrorStr)) .mockResolvedValueOnce(testValue) @@ -71,15 +71,15 @@ describe('coldObservableProvider', () => { .mockResolvedValueOnce(testValue); const onFatalError = jest.fn(); const retryBackoffConfig: RetryBackoffConfig = { initialInterval: 1, shouldRetry: () => true }; - const provider$ = coldObservableProvider({ + const values$ = poll({ logger, onFatalError, - provider: underlyingProvider, - retryBackoffConfig + retryBackoffConfig, + sample }); - await expect(firstValueFrom(provider$)).resolves.toBe(testValue); - await expect(firstValueFrom(provider$)).rejects.toThrow(EmptyError); - expect(underlyingProvider).toBeCalledTimes(3); + await expect(firstValueFrom(values$)).resolves.toBe(testValue); + await expect(firstValueFrom(values$)).rejects.toThrow(EmptyError); + expect(sample).toBeCalledTimes(3); expect(onFatalError).toBeCalledWith(testError); expect(logger.messages).toStrictEqual([ { level: 'error', message: [new Error(testErrorStr)] }, @@ -89,8 +89,8 @@ describe('coldObservableProvider', () => { ]); }); - it('polls the provider until the pollUntil condition is satisfied', async () => { - const underlyingProvider = jest + it('polls sample until the pollUntil condition is satisfied', async () => { + const sample = jest .fn() .mockResolvedValueOnce('a') .mockResolvedValueOnce('b') @@ -98,36 +98,36 @@ describe('coldObservableProvider', () => { .mockResolvedValue('Never reached'); const backoffConfig: RetryBackoffConfig = { initialInterval: 1 }; - const provider$ = coldObservableProvider({ + const values$ = poll({ logger, pollUntil: (v) => v === 'c', - provider: underlyingProvider, - retryBackoffConfig: backoffConfig + retryBackoffConfig: backoffConfig, + sample }); - const providerValues: unknown[] = []; - await lastValueFrom(provider$.pipe(tap((v) => providerValues.push(v)))); - expect(providerValues).toEqual(['a', 'b', 'c']); - expect(underlyingProvider).toBeCalledTimes(3); + const sampleValues: unknown[] = []; + await lastValueFrom(values$.pipe(tap((v) => sampleValues.push(v)))); + expect(sampleValues).toEqual(['a', 'b', 'c']); + expect(sample).toBeCalledTimes(3); }); it('stops retrying after maxRetries attempts and handles the error in catchError', async () => { const testError = new Error(testErrorStr); - const underlyingProvider = jest.fn().mockRejectedValue(testError); + const sample = jest.fn().mockRejectedValue(testError); const maxRetries = 3; const retryBackoffConfig: RetryBackoffConfig = { initialInterval: 1, maxRetries }; const onFatalError = jest.fn(); - const provider$ = coldObservableProvider({ + const values$ = poll({ logger, onFatalError, - provider: underlyingProvider, - retryBackoffConfig + retryBackoffConfig, + sample }); - await expect(firstValueFrom(provider$)).rejects.toThrow(testError); + await expect(firstValueFrom(values$)).rejects.toThrow(testError); - expect(underlyingProvider).toBeCalledTimes(maxRetries + 1); + expect(sample).toBeCalledTimes(maxRetries + 1); expect(onFatalError).toBeCalledWith(expect.any(Error)); expect(logger.messages).toStrictEqual([ { level: 'error', message: [testError] }, diff --git a/packages/wallet/src/Wallets/BaseWallet.ts b/packages/wallet/src/Wallets/BaseWallet.ts index 4935e604561..d50a660c443 100644 --- a/packages/wallet/src/Wallets/BaseWallet.ts +++ b/packages/wallet/src/Wallets/BaseWallet.ts @@ -71,7 +71,7 @@ import { TxSubmitProvider, UtxoProvider } from '@cardano-sdk/core'; -import { BehaviorObservable, TrackerSubject, coldObservableProvider } from '@cardano-sdk/util-rxjs'; +import { BehaviorObservable, TrackerSubject, poll } from '@cardano-sdk/util-rxjs'; import { BehaviorSubject, EMPTY, @@ -369,15 +369,15 @@ export class BaseWallet implements ObservableWallet { if (isBip32PublicCredentialsManager(this.#publicCredentialsManager)) { this.#addressTracker = createAddressTracker({ - addressDiscovery$: coldObservableProvider({ + addressDiscovery$: poll({ cancel$, logger: contextLogger(this.#logger, 'addressDiscovery$'), onFatalError, - provider: () => { + retryBackoffConfig, + sample: () => { const credManager = this.#publicCredentialsManager as Bip32PublicCredentialsManager; return credManager.addressDiscovery.discover(credManager.bip32Account); - }, - retryBackoffConfig + } }).pipe( take(1), catchError((error) => { @@ -403,12 +403,12 @@ export class BaseWallet implements ObservableWallet { logger: contextLogger(this.#logger, 'tip$'), maxPollInterval: maxInterval, minPollInterval: pollInterval, - provider$: coldObservableProvider({ + provider$: poll({ cancel$, logger: contextLogger(this.#logger, 'tip$'), onFatalError, - provider: this.networkInfoProvider.ledgerTip, - retryBackoffConfig + retryBackoffConfig, + sample: this.networkInfoProvider.ledgerTip }), store: stores.tip, syncStatus: this.syncStatus @@ -426,13 +426,13 @@ export class BaseWallet implements ObservableWallet { // Era summaries const eraSummariesTrigger = new BehaviorSubject(void 0); this.eraSummaries$ = new PersistentDocumentTrackerSubject( - coldObservableProvider({ + poll({ cancel$, equals: deepEquals, logger: contextLogger(this.#logger, 'eraSummaries$'), onFatalError, - provider: this.networkInfoProvider.eraSummaries, retryBackoffConfig, + sample: this.networkInfoProvider.eraSummaries, trigger$: eraSummariesTrigger.pipe(tap(() => 'Trigger request era summaries')) }), stores.eraSummaries @@ -450,25 +450,25 @@ export class BaseWallet implements ObservableWallet { tap((epoch) => this.#logger.debug(`Current epoch is ${epoch}`)) ); this.protocolParameters$ = new PersistentDocumentTrackerSubject( - coldObservableProvider({ + poll({ cancel$, equals: isEqual, logger: contextLogger(this.#logger, 'protocolParameters$'), onFatalError, - provider: this.networkInfoProvider.protocolParameters, retryBackoffConfig, + sample: this.networkInfoProvider.protocolParameters, trigger$: epoch$ }), stores.protocolParameters ); this.genesisParameters$ = new PersistentDocumentTrackerSubject( - coldObservableProvider({ + poll({ cancel$, equals: isEqual, logger: contextLogger(this.#logger, 'genesisParameters$'), onFatalError, - provider: this.networkInfoProvider.genesisParameters, retryBackoffConfig, + sample: this.networkInfoProvider.genesisParameters, trigger$: epoch$ }), stores.genesisParameters @@ -602,13 +602,13 @@ export class BaseWallet implements ObservableWallet { this.handles$ = this.handleProvider ? this.initializeHandles( new PersistentDocumentTrackerSubject( - coldObservableProvider({ + poll({ cancel$, equals: isEqual, logger: contextLogger(this.#logger, 'handles$'), onFatalError, - provider: () => this.handleProvider.getPolicyIds(), - retryBackoffConfig + retryBackoffConfig, + sample: () => this.handleProvider.getPolicyIds() }), stores.policyIds ) diff --git a/packages/wallet/src/services/AssetsTracker.ts b/packages/wallet/src/services/AssetsTracker.ts index 6d9c7023384..cbc5e554c8a 100644 --- a/packages/wallet/src/services/AssetsTracker.ts +++ b/packages/wallet/src/services/AssetsTracker.ts @@ -20,7 +20,7 @@ import { } from 'rxjs'; import { RetryBackoffConfig } from 'backoff-rxjs'; import { TrackedAssetProvider } from './ProviderTracker'; -import { coldObservableProvider, concatAndCombineLatest } from '@cardano-sdk/util-rxjs'; +import { concatAndCombineLatest, poll } from '@cardano-sdk/util-rxjs'; import { deepEquals, isNotNil } from '@cardano-sdk/util'; import { newTransactions$ } from './TransactionsTracker'; import chunk from 'lodash/chunk.js'; @@ -139,13 +139,13 @@ export const createAssetService = (assetIds: Cardano.AssetId[]) => concatAndCombineLatest( chunk(assetIds, ASSET_INFO_FETCH_CHUNK_SIZE).map((assetIdsChunk) => - coldObservableProvider({ + poll({ logger, onFatalError, pollUntil: isEveryAssetInfoComplete, - provider: () => - getAssetsWithCache(assetIdsChunk, assetCache$, totalBalance$, assetProvider, maxAssetInfoCacheAge), retryBackoffConfig, + sample: () => + getAssetsWithCache(assetIdsChunk, assetCache$, totalBalance$, assetProvider, maxAssetInfoCacheAge), trigger$: of(true) // fetch only once }) ) diff --git a/packages/wallet/src/services/DelegationTracker/DelegationTracker.ts b/packages/wallet/src/services/DelegationTracker/DelegationTracker.ts index 716211b08e8..469e700e211 100644 --- a/packages/wallet/src/services/DelegationTracker/DelegationTracker.ts +++ b/packages/wallet/src/services/DelegationTracker/DelegationTracker.ts @@ -21,7 +21,7 @@ import { RetryBackoffConfig } from 'backoff-rxjs'; import { RewardsHistoryProvider, createRewardsHistoryProvider, createRewardsHistoryTracker } from './RewardsHistory'; import { Shutdown, contextLogger } from '@cardano-sdk/util'; import { TrackedRewardsProvider, TrackedStakePoolProvider } from '../ProviderTracker'; -import { TrackerSubject, coldObservableProvider } from '@cardano-sdk/util-rxjs'; +import { TrackerSubject, poll } from '@cardano-sdk/util-rxjs'; import { TxWithEpoch } from './types'; import { WalletStores } from '../../persistence'; import { createDelegationDistributionTracker } from './DelegationDistributionTracker'; @@ -35,11 +35,11 @@ export const createBlockEpochProvider = onFatalError?: (value: unknown) => void ) => (ids: Cardano.BlockId[]) => - coldObservableProvider({ + poll({ logger, onFatalError, - provider: () => chainHistoryProvider.blocksByHashes({ ids }), - retryBackoffConfig + retryBackoffConfig, + sample: () => chainHistoryProvider.blocksByHashes({ ids }) }).pipe(map((blocks) => blocks.map(({ epoch }) => epoch))); export type BlockEpochProvider = ReturnType; diff --git a/packages/wallet/src/services/DelegationTracker/RewardAccounts.ts b/packages/wallet/src/services/DelegationTracker/RewardAccounts.ts index 073f2fc5f12..769a327a2e6 100644 --- a/packages/wallet/src/services/DelegationTracker/RewardAccounts.ts +++ b/packages/wallet/src/services/DelegationTracker/RewardAccounts.ts @@ -25,9 +25,9 @@ import { PAGE_SIZE } from '../TransactionsTracker'; import { RetryBackoffConfig } from 'backoff-rxjs'; import { TrackedStakePoolProvider } from '../ProviderTracker'; import { TxWithEpoch } from './types'; -import { coldObservableProvider } from '@cardano-sdk/util-rxjs'; import { drepsToDelegatees, drepsToDrepIds } from '../DrepInfoTracker'; import { lastStakeKeyCertOfType } from './transactionCertificates'; +import { poll } from '@cardano-sdk/util-rxjs'; import findLast from 'lodash/findLast.js'; import isEqual from 'lodash/isEqual.js'; import uniq from 'lodash/uniq.js'; @@ -67,11 +67,11 @@ export const createQueryStakePoolsProvider = } return merge( store.getValues(poolIds), - coldObservableProvider({ + poll({ logger, onFatalError, - provider: () => allStakePoolsByPoolIds(stakePoolProvider, { poolIds }), - retryBackoffConfig + retryBackoffConfig, + sample: () => allStakePoolsByPoolIds(stakePoolProvider, { poolIds }) }).pipe( tap((pageResults) => { for (const stakePool of pageResults) { @@ -119,12 +119,12 @@ export const createRewardsProvider = (rewardAccounts: Cardano.RewardAccount[], equals = isEqual): Observable => combineLatest( rewardAccounts.map((rewardAccount) => - coldObservableProvider({ + poll({ equals, logger, onFatalError, - provider: () => rewardsProvider.rewardAccountBalance({ rewardAccount }), retryBackoffConfig, + sample: () => rewardsProvider.rewardAccountBalance({ rewardAccount }), trigger$: fetchRewardsTrigger$(epoch$, txOnChain$, rewardAccount) }) ) diff --git a/packages/wallet/src/services/DelegationTracker/RewardsHistory.ts b/packages/wallet/src/services/DelegationTracker/RewardsHistory.ts index 10a397785c6..4140f5ce9e5 100644 --- a/packages/wallet/src/services/DelegationTracker/RewardsHistory.ts +++ b/packages/wallet/src/services/DelegationTracker/RewardsHistory.ts @@ -7,7 +7,7 @@ import { RetryBackoffConfig } from 'backoff-rxjs'; import { RewardsHistory } from '../types'; import { TrackedRewardsProvider } from '../ProviderTracker'; import { TxWithEpoch } from './types'; -import { coldObservableProvider } from '@cardano-sdk/util-rxjs'; +import { poll } from '@cardano-sdk/util-rxjs'; import first from 'lodash/first.js'; import flatten from 'lodash/flatten.js'; import sortBy from 'lodash/sortBy.js'; @@ -28,15 +28,15 @@ export const createRewardsHistoryProvider = onFatalError?: (value: unknown) => void ): Observable> => { if (lowerBound) { - return coldObservableProvider({ + return poll({ logger, onFatalError, - provider: () => + retryBackoffConfig, + sample: () => rewardsProvider.rewardsHistory({ epochs: { lowerBound }, rewardAccounts - }), - retryBackoffConfig + }) }); } rewardsProvider.setStatInitialized(rewardsProvider.stats.rewardsHistory$); diff --git a/packages/wallet/src/services/DrepInfoTracker.ts b/packages/wallet/src/services/DrepInfoTracker.ts index f1b1961deae..0b8a1fd43be 100644 --- a/packages/wallet/src/services/DrepInfoTracker.ts +++ b/packages/wallet/src/services/DrepInfoTracker.ts @@ -2,9 +2,9 @@ import { Cardano, DRepInfo, DRepProvider } from '@cardano-sdk/core'; import { Logger } from 'ts-log'; import { Observable, map, merge, of, withLatestFrom } from 'rxjs'; import { RetryBackoffConfig } from 'backoff-rxjs'; -import { coldObservableProvider } from '@cardano-sdk/util-rxjs'; import { distinctBlock } from './util'; import { isNotNil } from '@cardano-sdk/util'; +import { poll } from '@cardano-sdk/util-rxjs'; type DrepInfoObservableProps = { drepProvider: DRepProvider; @@ -17,10 +17,10 @@ type DrepInfoObservableProps = { export const createDrepInfoColdObservable = ({ drepProvider, retryBackoffConfig, refetchTrigger$, logger }: DrepInfoObservableProps) => (drepIds: Cardano.DRepID[]) => - coldObservableProvider({ + poll({ logger, - provider: () => drepProvider.getDRepsInfo({ ids: drepIds }), retryBackoffConfig, + sample: () => drepProvider.getDRepsInfo({ ids: drepIds }), trigger$: merge(of(true), refetchTrigger$) }); diff --git a/packages/wallet/src/services/SupplyDistributionTracker.ts b/packages/wallet/src/services/SupplyDistributionTracker.ts index 51cc20462c8..ec2022f4fba 100644 --- a/packages/wallet/src/services/SupplyDistributionTracker.ts +++ b/packages/wallet/src/services/SupplyDistributionTracker.ts @@ -4,7 +4,7 @@ import { Observable } from 'rxjs'; import { PersistentDocumentTrackerSubject } from './util'; import { RetryBackoffConfig } from 'backoff-rxjs'; import { SupplyDistributionStores } from '../persistence'; -import { coldObservableProvider } from '@cardano-sdk/util-rxjs'; +import { poll } from '@cardano-sdk/util-rxjs'; import isEqual from 'lodash/isEqual.js'; export type SupplyDistributionNetworkInfoProvider = Pick; @@ -36,24 +36,24 @@ export const createSupplyDistributionTracker = ( { logger, stores, networkInfoProvider }: SupplyDistributionTrackerDependencies ) => { const stake$ = new PersistentDocumentTrackerSubject( - coldObservableProvider({ + poll({ equals: isEqual, logger, onFatalError, - provider: networkInfoProvider.stake, retryBackoffConfig, + sample: networkInfoProvider.stake, trigger$ }), stores.stake ); const lovelaceSupply$ = new PersistentDocumentTrackerSubject( - coldObservableProvider({ + poll({ equals: isEqual, logger, onFatalError, - provider: networkInfoProvider.lovelaceSupply, retryBackoffConfig, + sample: networkInfoProvider.lovelaceSupply, trigger$ }), stores.lovelaceSupply diff --git a/packages/wallet/src/services/TransactionsTracker.ts b/packages/wallet/src/services/TransactionsTracker.ts index 8feb09ab28c..430fa430367 100644 --- a/packages/wallet/src/services/TransactionsTracker.ts +++ b/packages/wallet/src/services/TransactionsTracker.ts @@ -32,7 +32,7 @@ import { FailedTx, OutgoingOnChainTx, OutgoingTx, TransactionFailure, Transactio import { Logger } from 'ts-log'; import { Range, Shutdown, contextLogger } from '@cardano-sdk/util'; import { RetryBackoffConfig } from 'backoff-rxjs'; -import { TrackerSubject, coldObservableProvider } from '@cardano-sdk/util-rxjs'; +import { TrackerSubject, poll } from '@cardano-sdk/util-rxjs'; import { distinctBlock, signedTxsEquals, transactionsEquals, txEquals, txInEquals } from './util'; import { WitnessedTx } from '@cardano-sdk/key-management'; @@ -204,7 +204,7 @@ const findIntersectionAndUpdateTxStore = ({ rollback$: Subject; addresses: Cardano.PaymentAddress[]; }) => - coldObservableProvider({ + poll({ // Do not re-fetch transactions twice on load when tipBlockHeight$ loads from storage first // It should also help when using poor internet connection. // Caveat is that local transactions might get out of date... @@ -212,8 +212,10 @@ const findIntersectionAndUpdateTxStore = ({ equals: transactionsEquals, logger, onFatalError, + + retryBackoffConfig, // eslint-disable-next-line sonarjs/cognitive-complexity,complexity - provider: async () => { + sample: async () => { let rollbackOcurred = false; // eslint-disable-next-line no-constant-condition while (true) { @@ -290,7 +292,6 @@ const findIntersectionAndUpdateTxStore = ({ return localTransactions; } }, - retryBackoffConfig, trigger$: tipBlockHeight$ }); diff --git a/packages/wallet/src/services/UtxoTracker.ts b/packages/wallet/src/services/UtxoTracker.ts index 13bd749dede..1bc62863926 100644 --- a/packages/wallet/src/services/UtxoTracker.ts +++ b/packages/wallet/src/services/UtxoTracker.ts @@ -5,7 +5,7 @@ import { PersistentCollectionTrackerSubject, txInEquals, utxoEquals } from './ut import { RetryBackoffConfig } from 'backoff-rxjs'; import { TxInFlight, UtxoTracker } from './types'; import { WalletStores } from '../persistence'; -import { coldObservableProvider } from '@cardano-sdk/util-rxjs'; +import { poll } from '@cardano-sdk/util-rxjs'; import { sortUtxoByTxIn } from '@cardano-sdk/input-selection'; import chunk from 'lodash/chunk.js'; import uniqWith from 'lodash/uniqWith.js'; @@ -40,11 +40,12 @@ export const createUtxoProvider = ( ) => addresses$.pipe( switchMap((paymentAddresses) => - coldObservableProvider({ + poll({ equals: utxoEquals, logger, onFatalError, - provider: async () => { + retryBackoffConfig, + sample: async () => { let utxos = new Array(); const addressesSubGroups = chunk(paymentAddresses, PAGE_SIZE); @@ -55,7 +56,6 @@ export const createUtxoProvider = ( return utxos.sort(sortUtxoByTxIn); }, - retryBackoffConfig, trigger$: history$ }) ) From bf4a8b99b4e7af58021c8081d5011eacb65f3422 Mon Sep 17 00:00:00 2001 From: Martynas Kazlauskas Date: Thu, 19 Dec 2024 16:23:51 +0200 Subject: [PATCH 3/3] fix: retry all ProviderErrors except BadRequest and NotImplemented BaseWallet was retrying all errors, which could potentially hide bugs by retrying something that will never recover BREAKING CHANGE: BaseWallet observables error instead of emitting fatalError$ - remove ObservableError.fatalError$ - 'poll' util observable errors instead of calling onFatalError - remove PollProps.onFatalError - 'poll' no longer checks for InvalidStringError, it's up to consumer --- packages/util-rxjs/src/poll.ts | 100 +++++++----------- packages/util-rxjs/test/poll.test.ts | 17 ++- packages/wallet/src/Wallets/BaseWallet.ts | 33 ++---- packages/wallet/src/services/AssetsTracker.ts | 10 +- .../DelegationTracker/DelegationTracker.ts | 24 ++--- .../DelegationTracker/RewardAccounts.ts | 14 +-- .../DelegationTracker/RewardsHistory.ts | 16 +-- .../src/services/SupplyDistributionTracker.ts | 16 +-- .../src/services/TransactionsTracker.ts | 17 +-- packages/wallet/src/services/UtxoTracker.ts | 24 +---- packages/wallet/src/services/util/index.ts | 1 + .../wallet/src/services/util/pollProvider.ts | 28 +++++ packages/wallet/src/types.ts | 6 -- .../wallet/test/PersonalWallet/load.test.ts | 60 +---------- .../CustomObservableWallet.test.ts | 19 ++-- .../test/services/AssetsTracker.test.ts | 37 +------ .../DelegationTracker.test.ts | 15 ++- .../DelegationTracker/RewardAccounts.test.ts | 22 ++-- .../DelegationTracker/RewardsHistory.test.ts | 7 +- .../test/services/util/pollProvider.test.ts | 35 ++++++ .../src/observableWallet/util.ts | 1 - 21 files changed, 188 insertions(+), 314 deletions(-) create mode 100644 packages/wallet/src/services/util/pollProvider.ts create mode 100644 packages/wallet/test/services/util/pollProvider.test.ts diff --git a/packages/util-rxjs/src/poll.ts b/packages/util-rxjs/src/poll.ts index a66a34131f5..6e54e6a2496 100644 --- a/packages/util-rxjs/src/poll.ts +++ b/packages/util-rxjs/src/poll.ts @@ -1,15 +1,11 @@ -import { InvalidStringError, strictEquals } from '@cardano-sdk/util'; import { Logger } from 'ts-log'; import { NEVER, Observable, - Subject, - catchError, concat, defer, distinctUntilChanged, from, - merge, mergeMap, of, switchMap, @@ -17,11 +13,13 @@ import { throwError } from 'rxjs'; import { RetryBackoffConfig, retryBackoff } from 'backoff-rxjs'; +import { strictEquals } from '@cardano-sdk/util'; + +const POLL_UNTIL_RETRY = Symbol('POLL_UNTIL_RETRY'); export interface PollProps { sample: () => Promise; retryBackoffConfig: RetryBackoffConfig; - onFatalError?: (value: unknown) => void; trigger$?: Observable; equals?: (t1: T, t2: T) => boolean; combinator?: typeof switchMap; @@ -33,7 +31,6 @@ export interface PollProps { export const poll = ({ sample, retryBackoffConfig, - onFatalError, trigger$ = of(true), equals = strictEquals, combinator = switchMap, @@ -41,63 +38,42 @@ export const poll = ({ pollUntil = () => true, logger }: PollProps) => - new Observable((subscriber) => { - const cancelOnFatalError$ = new Subject(); - const internalCancel$ = merge(cancel$, cancelOnFatalError$); - const sub = trigger$ - .pipe( - combinator(() => - defer(() => - from(sample()).pipe( - mergeMap((v) => - pollUntil(v) - ? of(v) - : // Emit value, but also throw error to force retryBackoff to kick in - concat( - of(v), - throwError(() => new Error('polling')) - ) - ) - ) - ).pipe( - retryBackoff({ - ...retryBackoffConfig, - shouldRetry: (error) => { - logger.error(error); - - if (retryBackoffConfig.shouldRetry) { - const shouldRetry = retryBackoffConfig.shouldRetry(error); - logger.debug(`Should retry: ${shouldRetry}`); - - if (!shouldRetry) { - return false; - } - } + trigger$.pipe( + combinator(() => + defer(() => + from(sample()).pipe( + mergeMap((v) => + pollUntil(v) + ? of(v) + : // Emit value, but also throw error to force retryBackoff to kick in + concat( + of(v), + throwError(() => POLL_UNTIL_RETRY) + ) + ) + ) + ).pipe( + retryBackoff({ + ...retryBackoffConfig, + shouldRetry: (error) => { + if (error === POLL_UNTIL_RETRY) { + logger.warn('"pollUntil" condition not met, will retry'); + return true; + } - if (error instanceof InvalidStringError) { - onFatalError?.(error); - cancelOnFatalError$.next(true); - return false; - } + logger.error(error); - return true; - } - }), - catchError((error) => { - onFatalError?.(error); + if (retryBackoffConfig.shouldRetry) { + const shouldRetry = retryBackoffConfig.shouldRetry(error); + logger.debug(`Should retry: ${shouldRetry}`); + return shouldRetry; + } - // Re-throw the error to propagate it to the subscriber and complete the observable - return throwError(() => error); - }) - ) - ), - distinctUntilChanged(equals), - takeUntil(internalCancel$) + return true; + } + }) ) - .subscribe(subscriber); - - return () => { - sub.unsubscribe(); - cancelOnFatalError$.complete(); - }; - }); + ), + distinctUntilChanged(equals), + takeUntil(cancel$) + ); diff --git a/packages/util-rxjs/test/poll.test.ts b/packages/util-rxjs/test/poll.test.ts index 5751c0bca23..3f86bace8fc 100644 --- a/packages/util-rxjs/test/poll.test.ts +++ b/packages/util-rxjs/test/poll.test.ts @@ -60,7 +60,7 @@ describe('poll', () => { expect(resolvedValue).toBeTruthy(); }); - it('does not retry, when sample rejects with InvalidStringError', async () => { + it('does not retry, when shouldRetry returns false', async () => { const testValue = { test: 'value' }; const testError = new InvalidStringError('Test invalid string error'); const sample = jest @@ -69,23 +69,23 @@ describe('poll', () => { .mockResolvedValueOnce(testValue) .mockRejectedValueOnce(testError) .mockResolvedValueOnce(testValue); - const onFatalError = jest.fn(); - const retryBackoffConfig: RetryBackoffConfig = { initialInterval: 1, shouldRetry: () => true }; + const retryBackoffConfig: RetryBackoffConfig = { + initialInterval: 1, + shouldRetry: (error) => !(error instanceof InvalidStringError) + }; const values$ = poll({ logger, - onFatalError, retryBackoffConfig, sample }); await expect(firstValueFrom(values$)).resolves.toBe(testValue); - await expect(firstValueFrom(values$)).rejects.toThrow(EmptyError); + await expect(firstValueFrom(values$)).rejects.toThrow(testError); expect(sample).toBeCalledTimes(3); - expect(onFatalError).toBeCalledWith(testError); expect(logger.messages).toStrictEqual([ { level: 'error', message: [new Error(testErrorStr)] }, { level: 'debug', message: ['Should retry: true'] }, { level: 'error', message: [testError] }, - { level: 'debug', message: ['Should retry: true'] } + { level: 'debug', message: ['Should retry: false'] } ]); }); @@ -116,11 +116,9 @@ describe('poll', () => { const sample = jest.fn().mockRejectedValue(testError); const maxRetries = 3; const retryBackoffConfig: RetryBackoffConfig = { initialInterval: 1, maxRetries }; - const onFatalError = jest.fn(); const values$ = poll({ logger, - onFatalError, retryBackoffConfig, sample }); @@ -128,7 +126,6 @@ describe('poll', () => { await expect(firstValueFrom(values$)).rejects.toThrow(testError); expect(sample).toBeCalledTimes(maxRetries + 1); - expect(onFatalError).toBeCalledWith(expect.any(Error)); expect(logger.messages).toStrictEqual([ { level: 'error', message: [testError] }, { level: 'error', message: [testError] }, diff --git a/packages/wallet/src/Wallets/BaseWallet.ts b/packages/wallet/src/Wallets/BaseWallet.ts index d50a660c443..5d061f18111 100644 --- a/packages/wallet/src/Wallets/BaseWallet.ts +++ b/packages/wallet/src/Wallets/BaseWallet.ts @@ -53,7 +53,8 @@ import { createUtxoTracker, createWalletUtil, currentEpochTracker, - distinctEraSummaries + distinctEraSummaries, + pollProvider } from '../services'; import { AddressType, Bip32Account, GroupedAddress, WitnessedTx, Witnesser, util } from '@cardano-sdk/key-management'; import { @@ -71,7 +72,7 @@ import { TxSubmitProvider, UtxoProvider } from '@cardano-sdk/core'; -import { BehaviorObservable, TrackerSubject, poll } from '@cardano-sdk/util-rxjs'; +import { BehaviorObservable, TrackerSubject } from '@cardano-sdk/util-rxjs'; import { BehaviorSubject, EMPTY, @@ -282,7 +283,6 @@ export class BaseWallet implements ObservableWallet { readonly protocolParameters$: TrackerSubject; readonly genesisParameters$: TrackerSubject; readonly assetInfo$: TrackerSubject; - readonly fatalError$: Subject; readonly syncStatus: SyncStatus; readonly name: string; readonly util: WalletUtil; @@ -357,10 +357,6 @@ export class BaseWallet implements ObservableWallet { this.witnesser = witnesser; - this.fatalError$ = new Subject(); - - const onFatalError = this.fatalError$.next.bind(this.fatalError$); - this.name = name; const cancel$ = connectionStatusTracker$.pipe( tap((status) => (status === ConnectionStatus.up ? 'Connection UP' : 'Connection DOWN')), @@ -369,10 +365,9 @@ export class BaseWallet implements ObservableWallet { if (isBip32PublicCredentialsManager(this.#publicCredentialsManager)) { this.#addressTracker = createAddressTracker({ - addressDiscovery$: poll({ + addressDiscovery$: pollProvider({ cancel$, logger: contextLogger(this.#logger, 'addressDiscovery$'), - onFatalError, retryBackoffConfig, sample: () => { const credManager = this.#publicCredentialsManager as Bip32PublicCredentialsManager; @@ -403,10 +398,9 @@ export class BaseWallet implements ObservableWallet { logger: contextLogger(this.#logger, 'tip$'), maxPollInterval: maxInterval, minPollInterval: pollInterval, - provider$: poll({ + provider$: pollProvider({ cancel$, logger: contextLogger(this.#logger, 'tip$'), - onFatalError, retryBackoffConfig, sample: this.networkInfoProvider.ledgerTip }), @@ -426,11 +420,10 @@ export class BaseWallet implements ObservableWallet { // Era summaries const eraSummariesTrigger = new BehaviorSubject(void 0); this.eraSummaries$ = new PersistentDocumentTrackerSubject( - poll({ + pollProvider({ cancel$, equals: deepEquals, logger: contextLogger(this.#logger, 'eraSummaries$'), - onFatalError, retryBackoffConfig, sample: this.networkInfoProvider.eraSummaries, trigger$: eraSummariesTrigger.pipe(tap(() => 'Trigger request era summaries')) @@ -450,11 +443,10 @@ export class BaseWallet implements ObservableWallet { tap((epoch) => this.#logger.debug(`Current epoch is ${epoch}`)) ); this.protocolParameters$ = new PersistentDocumentTrackerSubject( - poll({ + pollProvider({ cancel$, equals: isEqual, logger: contextLogger(this.#logger, 'protocolParameters$'), - onFatalError, retryBackoffConfig, sample: this.networkInfoProvider.protocolParameters, trigger$: epoch$ @@ -462,11 +454,10 @@ export class BaseWallet implements ObservableWallet { stores.protocolParameters ); this.genesisParameters$ = new PersistentDocumentTrackerSubject( - poll({ + pollProvider({ cancel$, equals: isEqual, logger: contextLogger(this.#logger, 'genesisParameters$'), - onFatalError, retryBackoffConfig, sample: this.networkInfoProvider.genesisParameters, trigger$: epoch$ @@ -487,7 +478,6 @@ export class BaseWallet implements ObservableWallet { inFlightTransactionsStore: stores.inFlightTransactions, logger: contextLogger(this.#logger, 'transactions'), newTransactions: this.#newTransactions, - onFatalError, retryBackoffConfig, signedTransactionsStore: stores.signedTransactions, tip$: this.tip$, @@ -521,7 +511,6 @@ export class BaseWallet implements ObservableWallet { addresses$, history$: this.transactions.history$, logger: contextLogger(this.#logger, 'utxo'), - onFatalError, retryBackoffConfig, stores, transactionsInFlight$: this.transactions.outgoing.inFlight$, @@ -546,7 +535,6 @@ export class BaseWallet implements ObservableWallet { eraSummaries$, knownAddresses$: this.addresses$, logger: contextLogger(this.#logger, 'delegation'), - onFatalError, retryBackoffConfig, rewardAccountAddresses$: this.addresses$.pipe( map((addresses) => uniq(addresses.map((groupedAddress) => groupedAddress.rewardAccount))) @@ -592,7 +580,6 @@ export class BaseWallet implements ObservableWallet { balanceTracker: this.balance, logger: contextLogger(this.#logger, 'assets$'), maxAssetInfoCacheAge, - onFatalError, retryBackoffConfig, transactionsTracker: this.transactions }), @@ -602,11 +589,10 @@ export class BaseWallet implements ObservableWallet { this.handles$ = this.handleProvider ? this.initializeHandles( new PersistentDocumentTrackerSubject( - poll({ + pollProvider({ cancel$, equals: isEqual, logger: contextLogger(this.#logger, 'handles$'), - onFatalError, retryBackoffConfig, sample: () => this.handleProvider.getPolicyIds() }), @@ -798,7 +784,6 @@ export class BaseWallet implements ObservableWallet { this.currentEpoch$.complete(); this.delegation.shutdown(); this.assetInfo$.complete(); - this.fatalError$.complete(); this.syncStatus.shutdown(); this.#newTransactions.failedToSubmit$.complete(); this.#newTransactions.pending$.complete(); diff --git a/packages/wallet/src/services/AssetsTracker.ts b/packages/wallet/src/services/AssetsTracker.ts index cbc5e554c8a..42b93b001e6 100644 --- a/packages/wallet/src/services/AssetsTracker.ts +++ b/packages/wallet/src/services/AssetsTracker.ts @@ -20,9 +20,10 @@ import { } from 'rxjs'; import { RetryBackoffConfig } from 'backoff-rxjs'; import { TrackedAssetProvider } from './ProviderTracker'; -import { concatAndCombineLatest, poll } from '@cardano-sdk/util-rxjs'; +import { concatAndCombineLatest } from '@cardano-sdk/util-rxjs'; import { deepEquals, isNotNil } from '@cardano-sdk/util'; import { newTransactions$ } from './TransactionsTracker'; +import { pollProvider } from './util'; import chunk from 'lodash/chunk.js'; import uniq from 'lodash/uniq.js'; @@ -132,16 +133,14 @@ export const createAssetService = totalBalance$: Observable, retryBackoffConfig: RetryBackoffConfig, logger: Logger, - onFatalError?: (value: unknown) => void, maxAssetInfoCacheAge: Milliseconds = ONE_WEEK // eslint-disable-next-line max-params ) => (assetIds: Cardano.AssetId[]) => concatAndCombineLatest( chunk(assetIds, ASSET_INFO_FETCH_CHUNK_SIZE).map((assetIdsChunk) => - poll({ + pollProvider({ logger, - onFatalError, pollUntil: isEveryAssetInfoComplete, retryBackoffConfig, sample: () => @@ -160,7 +159,6 @@ export interface AssetsTrackerProps { logger: Logger; assetsCache$: Observable; balanceTracker: BalanceTracker; - onFatalError?: (value: unknown) => void; maxAssetInfoCacheAge?: Milliseconds; } @@ -182,7 +180,6 @@ export const createAssetsTracker = ( }, retryBackoffConfig, logger, - onFatalError, maxAssetInfoCacheAge }: AssetsTrackerProps, { @@ -192,7 +189,6 @@ export const createAssetsTracker = ( total$, retryBackoffConfig, logger, - onFatalError, maxAssetInfoCacheAge ) }: AssetsTrackerInternals = {} diff --git a/packages/wallet/src/services/DelegationTracker/DelegationTracker.ts b/packages/wallet/src/services/DelegationTracker/DelegationTracker.ts index 469e700e211..f7ebdf29f74 100644 --- a/packages/wallet/src/services/DelegationTracker/DelegationTracker.ts +++ b/packages/wallet/src/services/DelegationTracker/DelegationTracker.ts @@ -21,23 +21,18 @@ import { RetryBackoffConfig } from 'backoff-rxjs'; import { RewardsHistoryProvider, createRewardsHistoryProvider, createRewardsHistoryTracker } from './RewardsHistory'; import { Shutdown, contextLogger } from '@cardano-sdk/util'; import { TrackedRewardsProvider, TrackedStakePoolProvider } from '../ProviderTracker'; -import { TrackerSubject, poll } from '@cardano-sdk/util-rxjs'; +import { TrackerSubject } from '@cardano-sdk/util-rxjs'; import { TxWithEpoch } from './types'; import { WalletStores } from '../../persistence'; import { createDelegationDistributionTracker } from './DelegationDistributionTracker'; +import { pollProvider } from '../util'; import { transactionsWithCertificates } from './transactionCertificates'; export const createBlockEpochProvider = - ( - chainHistoryProvider: ChainHistoryProvider, - retryBackoffConfig: RetryBackoffConfig, - logger: Logger, - onFatalError?: (value: unknown) => void - ) => + (chainHistoryProvider: ChainHistoryProvider, retryBackoffConfig: RetryBackoffConfig, logger: Logger) => (ids: Cardano.BlockId[]) => - poll({ + pollProvider({ logger, - onFatalError, retryBackoffConfig, sample: () => chainHistoryProvider.blocksByHashes({ ids }) }).pipe(map((blocks) => blocks.map(({ epoch }) => epoch))); @@ -63,7 +58,6 @@ export interface DelegationTrackerProps { slotEpochCalc$?: Observable; }; logger: Logger; - onFatalError?: (value: unknown) => void; } export const certificateTransactionsWithEpochs = ( @@ -127,14 +121,12 @@ export const createDelegationTracker = ({ utxoTracker, stores, logger, - onFatalError, internals: { queryStakePoolsProvider = createQueryStakePoolsProvider( stakePoolProvider, stores.stakePools, retryBackoffConfig, - logger, - onFatalError + logger ), rewardsHistoryProvider = createRewardsHistoryProvider(rewardsTracker, retryBackoffConfig), rewardsProvider = createRewardsProvider( @@ -142,8 +134,7 @@ export const createDelegationTracker = ({ transactionsTracker.outgoing.onChain$, rewardsTracker, retryBackoffConfig, - logger, - onFatalError + logger ), slotEpochCalc$ = eraSummaries$.pipe(map((eraSummaries) => createSlotEpochCalc(eraSummaries))) } = {} @@ -167,8 +158,7 @@ export const createDelegationTracker = ({ rewardAccountAddresses$, rewardsHistoryProvider, stores.rewardsHistory, - contextLogger(logger, 'rewardsHistory$'), - onFatalError + contextLogger(logger, 'rewardsHistory$') ) ); diff --git a/packages/wallet/src/services/DelegationTracker/RewardAccounts.ts b/packages/wallet/src/services/DelegationTracker/RewardAccounts.ts index 769a327a2e6..7453cf6dd06 100644 --- a/packages/wallet/src/services/DelegationTracker/RewardAccounts.ts +++ b/packages/wallet/src/services/DelegationTracker/RewardAccounts.ts @@ -27,7 +27,7 @@ import { TrackedStakePoolProvider } from '../ProviderTracker'; import { TxWithEpoch } from './types'; import { drepsToDelegatees, drepsToDrepIds } from '../DrepInfoTracker'; import { lastStakeKeyCertOfType } from './transactionCertificates'; -import { poll } from '@cardano-sdk/util-rxjs'; +import { pollProvider } from '../util'; import findLast from 'lodash/findLast.js'; import isEqual from 'lodash/isEqual.js'; import uniq from 'lodash/uniq.js'; @@ -57,8 +57,7 @@ export const createQueryStakePoolsProvider = stakePoolProvider: TrackedStakePoolProvider, store: KeyValueStore, retryBackoffConfig: RetryBackoffConfig, - logger: Logger, - onFatalError?: (value: unknown) => void + logger: Logger ) => (poolIds: Cardano.PoolId[]) => { if (poolIds.length === 0) { @@ -67,9 +66,8 @@ export const createQueryStakePoolsProvider = } return merge( store.getValues(poolIds), - poll({ + pollProvider({ logger, - onFatalError, retryBackoffConfig, sample: () => allStakePoolsByPoolIds(stakePoolProvider, { poolIds }) }).pipe( @@ -112,17 +110,15 @@ export const createRewardsProvider = txOnChain$: Observable, rewardsProvider: RewardsProvider, retryBackoffConfig: RetryBackoffConfig, - logger: Logger, - onFatalError?: (value: unknown) => void + logger: Logger // eslint-disable-next-line max-params ) => (rewardAccounts: Cardano.RewardAccount[], equals = isEqual): Observable => combineLatest( rewardAccounts.map((rewardAccount) => - poll({ + pollProvider({ equals, logger, - onFatalError, retryBackoffConfig, sample: () => rewardsProvider.rewardAccountBalance({ rewardAccount }), trigger$: fetchRewardsTrigger$(epoch$, txOnChain$, rewardAccount) diff --git a/packages/wallet/src/services/DelegationTracker/RewardsHistory.ts b/packages/wallet/src/services/DelegationTracker/RewardsHistory.ts index 4140f5ce9e5..9a54c3b5880 100644 --- a/packages/wallet/src/services/DelegationTracker/RewardsHistory.ts +++ b/packages/wallet/src/services/DelegationTracker/RewardsHistory.ts @@ -7,7 +7,7 @@ import { RetryBackoffConfig } from 'backoff-rxjs'; import { RewardsHistory } from '../types'; import { TrackedRewardsProvider } from '../ProviderTracker'; import { TxWithEpoch } from './types'; -import { poll } from '@cardano-sdk/util-rxjs'; +import { pollProvider } from '../util'; import first from 'lodash/first.js'; import flatten from 'lodash/flatten.js'; import sortBy from 'lodash/sortBy.js'; @@ -24,13 +24,11 @@ export const createRewardsHistoryProvider = ( rewardAccounts: Cardano.RewardAccount[], lowerBound: Cardano.EpochNo | null, - logger: Logger, - onFatalError?: (value: unknown) => void + logger: Logger ): Observable> => { if (lowerBound) { - return poll({ + return pollProvider({ logger, - onFatalError, retryBackoffConfig, sample: () => rewardsProvider.rewardsHistory({ @@ -63,9 +61,7 @@ export const createRewardsHistoryTracker = ( rewardAccounts$: Observable, rewardsHistoryProvider: RewardsHistoryProvider, rewardsHistoryStore: KeyValueStore, - logger: Logger, - onFatalError?: (value: unknown) => void - // eslint-disable-next-line max-params + logger: Logger ): Observable => rewardAccounts$ .pipe( @@ -77,9 +73,7 @@ export const createRewardsHistoryTracker = ( .pipe(map((rewards) => new Map(rewardAccounts.map((rewardAccount, i) => [rewardAccount, rewards[i]])))), firstDelegationEpoch$(transactions$, rewardAccounts).pipe( tap((firstEpoch) => logger.debug(`Fetching history rewards since epoch ${firstEpoch}`)), - switchMap((firstEpoch) => - rewardsHistoryProvider(rewardAccounts, Cardano.EpochNo(firstEpoch!), logger, onFatalError) - ), + switchMap((firstEpoch) => rewardsHistoryProvider(rewardAccounts, Cardano.EpochNo(firstEpoch!), logger)), tap((allRewards) => rewardsHistoryStore.setAll([...allRewards.entries()].map(([key, value]) => ({ key, value }))) ) diff --git a/packages/wallet/src/services/SupplyDistributionTracker.ts b/packages/wallet/src/services/SupplyDistributionTracker.ts index ec2022f4fba..ef6e58e99e0 100644 --- a/packages/wallet/src/services/SupplyDistributionTracker.ts +++ b/packages/wallet/src/services/SupplyDistributionTracker.ts @@ -1,10 +1,9 @@ import { Logger } from 'ts-log'; import { NetworkInfoProvider } from '@cardano-sdk/core'; import { Observable } from 'rxjs'; -import { PersistentDocumentTrackerSubject } from './util'; +import { PersistentDocumentTrackerSubject, pollProvider } from './util'; import { RetryBackoffConfig } from 'backoff-rxjs'; import { SupplyDistributionStores } from '../persistence'; -import { poll } from '@cardano-sdk/util-rxjs'; import isEqual from 'lodash/isEqual.js'; export type SupplyDistributionNetworkInfoProvider = Pick; @@ -14,7 +13,6 @@ export interface SupplyDistributionTrackerProps { trigger$: Observable; /** Failed request retry strategy */ retryBackoffConfig?: RetryBackoffConfig; - onFatalError?: (value: unknown) => void; } export interface SupplyDistributionTrackerDependencies { @@ -28,18 +26,13 @@ export interface SupplyDistributionTrackerDependencies { * @returns object that continuously fetches and emits network stats (StakeSummary and SupplySummary) */ export const createSupplyDistributionTracker = ( - { - trigger$, - retryBackoffConfig = { initialInterval: 1000, maxInterval: 60_000 }, - onFatalError - }: SupplyDistributionTrackerProps, + { trigger$, retryBackoffConfig = { initialInterval: 1000, maxInterval: 60_000 } }: SupplyDistributionTrackerProps, { logger, stores, networkInfoProvider }: SupplyDistributionTrackerDependencies ) => { const stake$ = new PersistentDocumentTrackerSubject( - poll({ + pollProvider({ equals: isEqual, logger, - onFatalError, retryBackoffConfig, sample: networkInfoProvider.stake, trigger$ @@ -48,10 +41,9 @@ export const createSupplyDistributionTracker = ( ); const lovelaceSupply$ = new PersistentDocumentTrackerSubject( - poll({ + pollProvider({ equals: isEqual, logger, - onFatalError, retryBackoffConfig, sample: networkInfoProvider.lovelaceSupply, trigger$ diff --git a/packages/wallet/src/services/TransactionsTracker.ts b/packages/wallet/src/services/TransactionsTracker.ts index 430fa430367..e23c7fc1572 100644 --- a/packages/wallet/src/services/TransactionsTracker.ts +++ b/packages/wallet/src/services/TransactionsTracker.ts @@ -32,8 +32,8 @@ import { FailedTx, OutgoingOnChainTx, OutgoingTx, TransactionFailure, Transactio import { Logger } from 'ts-log'; import { Range, Shutdown, contextLogger } from '@cardano-sdk/util'; import { RetryBackoffConfig } from 'backoff-rxjs'; -import { TrackerSubject, poll } from '@cardano-sdk/util-rxjs'; -import { distinctBlock, signedTxsEquals, transactionsEquals, txEquals, txInEquals } from './util'; +import { TrackerSubject } from '@cardano-sdk/util-rxjs'; +import { distinctBlock, pollProvider, signedTxsEquals, transactionsEquals, txEquals, txInEquals } from './util'; import { WitnessedTx } from '@cardano-sdk/key-management'; import { newAndStoredMulticast } from './util/newAndStoredMulticast'; @@ -56,7 +56,6 @@ export interface TransactionsTrackerProps { }; failedFromReemitter$?: Observable; logger: Logger; - onFatalError?: (value: unknown) => void; } export interface TransactionsTrackerInternals { @@ -71,7 +70,6 @@ export interface TransactionsTrackerInternalsProps { tipBlockHeight$: Observable; store: OrderedCollectionStore; logger: Logger; - onFatalError?: (value: unknown) => void; } // Temporarily hardcoded. Will be replaced with ChainHistoryProvider 'maxPageSize' value once ADP-2249 is implemented @@ -191,28 +189,25 @@ const findIntersectionAndUpdateTxStore = ({ logger, store, retryBackoffConfig, - onFatalError, tipBlockHeight$, rollback$, localTransactions, addresses }: Pick< TransactionsTrackerInternalsProps, - 'chainHistoryProvider' | 'logger' | 'store' | 'retryBackoffConfig' | 'onFatalError' | 'tipBlockHeight$' + 'chainHistoryProvider' | 'logger' | 'store' | 'retryBackoffConfig' | 'tipBlockHeight$' > & { localTransactions: Cardano.HydratedTx[]; rollback$: Subject; addresses: Cardano.PaymentAddress[]; }) => - poll({ + pollProvider({ // Do not re-fetch transactions twice on load when tipBlockHeight$ loads from storage first // It should also help when using poor internet connection. // Caveat is that local transactions might get out of date... combinator: exhaustMap, equals: transactionsEquals, logger, - onFatalError, - retryBackoffConfig, // eslint-disable-next-line sonarjs/cognitive-complexity,complexity sample: async () => { @@ -363,14 +358,12 @@ export const createTransactionsTracker = ( inFlightTransactionsStore: newTransactionsStore, signedTransactionsStore, logger, - failedFromReemitter$, - onFatalError + failedFromReemitter$ }: TransactionsTrackerProps, { transactionsSource$: txSource$, rollback$ }: TransactionsTrackerInternals = createAddressTransactionsProvider({ addresses$, chainHistoryProvider, logger: contextLogger(logger, 'AddressTransactionsProvider'), - onFatalError, retryBackoffConfig, store: transactionsStore, tipBlockHeight$: distinctBlock(tip$) diff --git a/packages/wallet/src/services/UtxoTracker.ts b/packages/wallet/src/services/UtxoTracker.ts index 1bc62863926..4c836dbf8d4 100644 --- a/packages/wallet/src/services/UtxoTracker.ts +++ b/packages/wallet/src/services/UtxoTracker.ts @@ -1,11 +1,10 @@ import { Cardano, UtxoProvider } from '@cardano-sdk/core'; import { Logger } from 'ts-log'; import { NEVER, Observable, combineLatest, concat, distinctUntilChanged, map, of, shareReplay, switchMap } from 'rxjs'; -import { PersistentCollectionTrackerSubject, txInEquals, utxoEquals } from './util'; +import { PersistentCollectionTrackerSubject, pollProvider, txInEquals, utxoEquals } from './util'; import { RetryBackoffConfig } from 'backoff-rxjs'; import { TxInFlight, UtxoTracker } from './types'; import { WalletStores } from '../persistence'; -import { poll } from '@cardano-sdk/util-rxjs'; import { sortUtxoByTxIn } from '@cardano-sdk/input-selection'; import chunk from 'lodash/chunk.js'; import uniqWith from 'lodash/uniqWith.js'; @@ -21,7 +20,6 @@ export interface UtxoTrackerProps { history$: Observable; retryBackoffConfig: RetryBackoffConfig; logger: Logger; - onFatalError?: (value: unknown) => void; } export interface UtxoTrackerInternals { @@ -34,16 +32,13 @@ export const createUtxoProvider = ( addresses$: Observable, history$: Observable, retryBackoffConfig: RetryBackoffConfig, - logger: Logger, - onFatalError?: (value: unknown) => void - // eslint-disable-next-line max-params + logger: Logger ) => addresses$.pipe( switchMap((paymentAddresses) => - poll({ + pollProvider({ equals: utxoEquals, logger, - onFatalError, retryBackoffConfig, sample: async () => { let utxos = new Array(); @@ -62,19 +57,10 @@ export const createUtxoProvider = ( ); export const createUtxoTracker = ( - { - utxoProvider, - addresses$, - stores, - transactionsInFlight$, - retryBackoffConfig, - history$, - logger, - onFatalError - }: UtxoTrackerProps, + { utxoProvider, addresses$, stores, transactionsInFlight$, retryBackoffConfig, history$, logger }: UtxoTrackerProps, { utxoSource$ = new PersistentCollectionTrackerSubject( - () => createUtxoProvider(utxoProvider, addresses$, history$, retryBackoffConfig, logger, onFatalError), + () => createUtxoProvider(utxoProvider, addresses$, history$, retryBackoffConfig, logger), stores.utxo ), unspendableUtxoSource$ = new PersistentCollectionTrackerSubject( diff --git a/packages/wallet/src/services/util/index.ts b/packages/wallet/src/services/util/index.ts index 60c13f31784..c7065a4467c 100644 --- a/packages/wallet/src/services/util/index.ts +++ b/packages/wallet/src/services/util/index.ts @@ -3,3 +3,4 @@ export * from './equals'; export * from './trigger'; export * from './connectionStatusTracker'; export * from './newAndStoredMulticast'; +export * from './pollProvider'; diff --git a/packages/wallet/src/services/util/pollProvider.ts b/packages/wallet/src/services/util/pollProvider.ts new file mode 100644 index 00000000000..a291cc46d86 --- /dev/null +++ b/packages/wallet/src/services/util/pollProvider.ts @@ -0,0 +1,28 @@ +import { PollProps, poll } from '@cardano-sdk/util-rxjs'; +import { ProviderError, ProviderFailure } from '@cardano-sdk/core'; +import { catchError } from 'rxjs'; + +export type PollProviderProps = Omit, 'retryBackoffConfig'> & { + retryBackoffConfig: Omit['retryBackoffConfig'], 'shouldRetry'>; +}; + +export const pollProvider = (props: PollProviderProps) => + poll({ + ...props, + retryBackoffConfig: { + ...props.retryBackoffConfig, + shouldRetry: (error) => { + if (error instanceof ProviderError) { + return ![ProviderFailure.NotImplemented, ProviderFailure.BadRequest].includes(error.reason); + } + return false; + } + } + }).pipe( + catchError((error) => { + if (error instanceof ProviderError) { + throw error; + } + throw new ProviderError(ProviderFailure.Unknown, error); + }) + ); diff --git a/packages/wallet/src/types.ts b/packages/wallet/src/types.ts index 86334ebbbca..32ce0498fc6 100644 --- a/packages/wallet/src/types.ts +++ b/packages/wallet/src/types.ts @@ -113,12 +113,6 @@ export interface ObservableWallet { }; /** All owned and historical assets */ readonly assetInfo$: Observable; - /** - * This is the catch all Observable for fatal errors emitted by the Wallet. - * Once errors are emitted, probably the only available recovery action is to - * shutdown the Wallet and to create a new one. - */ - readonly fatalError$: Observable; readonly syncStatus: SyncStatus; getName(): Promise; diff --git a/packages/wallet/test/PersonalWallet/load.test.ts b/packages/wallet/test/PersonalWallet/load.test.ts index 7f98e025237..ba67c9596e6 100644 --- a/packages/wallet/test/PersonalWallet/load.test.ts +++ b/packages/wallet/test/PersonalWallet/load.test.ts @@ -17,6 +17,8 @@ import { ChainHistoryProvider, HandleProvider, NetworkInfoProvider, + ProviderError, + ProviderFailure, RewardsProvider, UtxoProvider, coalesceValueQuantities @@ -30,7 +32,6 @@ import { somePartialStakePools } from '@cardano-sdk/util-dev'; import { InvalidConfigurationError } from '@cardano-sdk/tx-construction'; -import { InvalidStringError } from '@cardano-sdk/util'; import { ReplaySubject, firstValueFrom } from 'rxjs'; import { WalletStores, createInMemoryWalletStores } from '../../src/persistence'; import { dummyLogger as logger } from 'ts-log'; @@ -76,7 +77,7 @@ export class MockAddressDiscovery implements AddressDiscovery { public async discover(): Promise { if (this.#currentAttempt <= this.#resolveAfterAttempts) { ++this.#currentAttempt; - throw new Error('An error occurred during the discovery process.'); + throw new ProviderError(ProviderFailure.Unknown, 'An error occurred during the discovery process.'); } return this.#addresses; @@ -519,58 +520,3 @@ describe('BaseWallet.AddressDiscovery', () => { wallet.shutdown(); }); }); - -describe('BaseWallet.fatalError$', () => { - it('emits non retryable errors', async () => { - const stores = createInMemoryWalletStores(); - const tipHandler = jest.fn(); - const utxoSet = generateUtxos(30, 10); - - const wallet = await createWallet({ - providers: { - chainHistoryProvider: mocks.mockChainHistoryProvider(), - networkInfoProvider: { - ...mocks.mockNetworkInfoProvider(), - ledgerTip: jest.fn().mockRejectedValue(new InvalidStringError('Test invalid string error')) - }, - rewardsProvider: mocks.mockRewardsProvider(), - utxoProvider: mocks.mockUtxoProvider({ utxoSet }) - }, - stores - }); - - // wallet.fatalError$ must be observed till the beginning of time - const errorPromise = expect(firstValueFrom(wallet.fatalError$)).resolves.toBeInstanceOf(InvalidStringError); - - wallet.tip$.subscribe(tipHandler); - - await errorPromise; - - wallet.shutdown(); - - expect(tipHandler).not.toBeCalled(); - }); - - it('Observables work even if BaseWallet.fatalError$ is not observed', async () => { - const stores = createInMemoryWalletStores(); - const testValue = { test: 'value' }; - const utxoSet = generateUtxos(30, 10); - - const wallet = await createWallet({ - providers: { - chainHistoryProvider: mocks.mockChainHistoryProvider(), - networkInfoProvider: { - ...mocks.mockNetworkInfoProvider(), - ledgerTip: jest.fn().mockResolvedValue(testValue) - }, - rewardsProvider: mocks.mockRewardsProvider(), - utxoProvider: mocks.mockUtxoProvider({ utxoSet }) - }, - stores - }); - - await expect(firstValueFrom(wallet.tip$)).resolves.toBe(testValue); - - wallet.shutdown(); - }); -}); diff --git a/packages/wallet/test/integration/CustomObservableWallet.test.ts b/packages/wallet/test/integration/CustomObservableWallet.test.ts index 4b7543d1a05..a53c7fa88ac 100644 --- a/packages/wallet/test/integration/CustomObservableWallet.test.ts +++ b/packages/wallet/test/integration/CustomObservableWallet.test.ts @@ -1,7 +1,7 @@ /* eslint-disable @typescript-eslint/no-unused-expressions */ /* eslint-disable sonarjs/no-extra-arguments */ /* eslint-disable unicorn/consistent-function-scoping */ -import { BaseWallet, ObservableWallet, createPersonalWallet } from '../../src'; +import { BaseWallet, ObservableWallet, createPersonalWallet, pollProvider } from '../../src'; import { Bip32Account, GroupedAddress, util } from '@cardano-sdk/key-management'; import { Cardano, Serialization } from '@cardano-sdk/core'; import { @@ -10,7 +10,6 @@ import { createOutputValidator } from '@cardano-sdk/tx-construction'; import { RetryBackoffConfig } from 'backoff-rxjs'; -import { coldObservableProvider } from '@cardano-sdk/util-rxjs'; import { createStubStakePoolProvider, mockProviders as mocks } from '@cardano-sdk/util-dev'; import { firstValueFrom, of, timer } from 'rxjs'; import { dummyLogger as logger } from 'ts-log'; @@ -87,37 +86,37 @@ describe('CustomObservableWallet', () => { // for the sake of simplicity, let's say we don't care about wallet's persistence/restoration // and want to just re-fetch data upon every subscription and then update using some interval. // If we did want more features, there are other SDK utils we could use - addresses$: coldObservableProvider({ + addresses$: pollProvider({ logger, - provider: getAddresses, retryBackoffConfig, + sample: getAddresses, trigger$: walletUpdateTrigger$ }), balance: { rewardAccounts: { // can entirely bypass SDK and it's utils, providing custom observables deposit$: of(200_000n), - rewards$: coldObservableProvider({ + rewards$: pollProvider({ logger, - provider: getAvailableRewardAccountsDeposit, retryBackoffConfig, + sample: getAvailableRewardAccountsDeposit, trigger$: walletUpdateTrigger$ }) }, utxo: { - available$: coldObservableProvider({ + available$: pollProvider({ logger, - provider: getAvailableUtxoBalance, retryBackoffConfig, + sample: getAvailableUtxoBalance, trigger$: walletUpdateTrigger$ }) } }, delegation: { - rewardAccounts$: coldObservableProvider({ + rewardAccounts$: pollProvider({ logger, - provider: getRewardAccountsDelegation, retryBackoffConfig, + sample: getRewardAccountsDelegation, trigger$: walletUpdateTrigger$ }) }, diff --git a/packages/wallet/test/services/AssetsTracker.test.ts b/packages/wallet/test/services/AssetsTracker.test.ts index 4a4917f0a43..e86d6d89eee 100644 --- a/packages/wallet/test/services/AssetsTracker.test.ts +++ b/packages/wallet/test/services/AssetsTracker.test.ts @@ -397,7 +397,6 @@ describe('createAssetsTracker', () => { describe('createAssetService', () => { let assetProvider: TrackedAssetProvider; const retryBackoffConfig: RetryBackoffConfig = { initialInterval: 2 }; - const onFatalError = jest.fn(); beforeEach(() => { assetProvider = { @@ -440,14 +439,7 @@ describe('createAssetService', () => { coins: 0n }); - const assetService = createAssetService( - assetProvider, - assetCache$, - totalBalance$, - retryBackoffConfig, - logger, - onFatalError - ); + const assetService = createAssetService(assetProvider, assetCache$, totalBalance$, retryBackoffConfig, logger); const result$ = assetService([AssetId.TSLA, AssetId.PXL]); @@ -483,14 +475,7 @@ describe('createAssetService', () => { const assetCache$ = of(cachedAssets); const totalBalance$ = of({ assets: new Map([[AssetId.TSLA, 1000n]]), coins: 0n }); - const assetService = createAssetService( - assetProvider, - assetCache$, - totalBalance$, - retryBackoffConfig, - logger, - onFatalError - ); + const assetService = createAssetService(assetProvider, assetCache$, totalBalance$, retryBackoffConfig, logger); const result$ = assetService([AssetId.TSLA, AssetId.PXL, AssetId.Unit]); @@ -514,14 +499,7 @@ describe('createAssetService', () => { const assetCache$ = of(new Map()); const totalBalance$ = of({ assets: new Map(), coins: 0n }); - const assetService = createAssetService( - assetProvider, - assetCache$, - totalBalance$, - retryBackoffConfig, - logger, - onFatalError - ); + const assetService = createAssetService(assetProvider, assetCache$, totalBalance$, retryBackoffConfig, logger); const result$ = assetService([AssetId.TSLA, AssetId.PXL]); @@ -573,14 +551,7 @@ describe('createAssetService', () => { coins: 0n }); - const assetService = createAssetService( - assetProvider, - assetCache$, - totalBalance$, - retryBackoffConfig, - logger, - onFatalError - ); + const assetService = createAssetService(assetProvider, assetCache$, totalBalance$, retryBackoffConfig, logger); const result$ = assetService([AssetId.TSLA, AssetId.PXL]); diff --git a/packages/wallet/test/services/DelegationTracker/DelegationTracker.test.ts b/packages/wallet/test/services/DelegationTracker/DelegationTracker.test.ts index bdfd03788e0..53d47f35ca6 100644 --- a/packages/wallet/test/services/DelegationTracker/DelegationTracker.test.ts +++ b/packages/wallet/test/services/DelegationTracker/DelegationTracker.test.ts @@ -1,23 +1,22 @@ import * as Crypto from '@cardano-sdk/crypto'; import { Cardano, ChainHistoryProvider, metadatum } from '@cardano-sdk/core'; import { RetryBackoffConfig } from 'backoff-rxjs'; -import { TransactionsTracker, createDelegationPortfolioTracker } from '../../../src/services'; +import { TransactionsTracker, createDelegationPortfolioTracker, pollProvider } from '../../../src/services'; import { certificateTransactionsWithEpochs, createBlockEpochProvider } from '../../../src/services/DelegationTracker'; -import { coldObservableProvider } from '@cardano-sdk/util-rxjs'; import { createStubTxWithCertificates, createStubTxWithSlot } from './stub-tx'; import { createTestScheduler, logger } from '@cardano-sdk/util-dev'; -jest.mock('@cardano-sdk/util-rxjs', () => { - const originalModule = jest.requireActual('@cardano-sdk/util-rxjs'); - return { ...originalModule, coldObservableProvider: jest.fn() }; +jest.mock('../../../src/services/util/pollProvider', () => { + const originalModule = jest.requireActual('../../../src/services/util/pollProvider'); + return { ...originalModule, pollProvider: jest.fn() }; }); describe('DelegationTracker', () => { - const coldObservableProviderMock = coldObservableProvider as jest.MockedFunction; + const pollProviderMock = pollProvider as jest.MockedFunction; test('createBlockEpochProvider', () => { createTestScheduler().run(({ cold, expectObservable, flush }) => { - coldObservableProviderMock.mockReturnValue( + pollProviderMock.mockReturnValue( cold('a-b', { a: [{ epoch: 100 }], b: [{ epoch: 100 }, { epoch: 101 }] @@ -34,7 +33,7 @@ describe('DelegationTracker', () => { b: [100, 101] }); flush(); - expect(coldObservableProviderMock).toBeCalledTimes(1); + expect(pollProviderMock).toBeCalledTimes(1); }); }); diff --git a/packages/wallet/test/services/DelegationTracker/RewardAccounts.test.ts b/packages/wallet/test/services/DelegationTracker/RewardAccounts.test.ts index d6202331985..e4baef516dd 100644 --- a/packages/wallet/test/services/DelegationTracker/RewardAccounts.test.ts +++ b/packages/wallet/test/services/DelegationTracker/RewardAccounts.test.ts @@ -18,26 +18,26 @@ import { createQueryStakePoolsProvider, createRewardsProvider, fetchRewardsTrigger$, - getStakePoolIdAtEpoch + getStakePoolIdAtEpoch, + pollProvider } from '../../../src'; import { RetryBackoffConfig } from 'backoff-rxjs'; import { TxWithEpoch } from '../../../src/services/DelegationTracker/types'; -import { coldObservableProvider } from '@cardano-sdk/util-rxjs'; import { createTestScheduler, logger, mockProviders } from '@cardano-sdk/util-dev'; import { dummyCbor } from '../../util'; -const { currentEpoch, generateStakePools, mockStakePoolsProvider } = mockProviders; - -jest.mock('@cardano-sdk/util-rxjs', () => { - const actual = jest.requireActual('@cardano-sdk/util-rxjs'); +jest.mock('../../../src/services/util/pollProvider', () => { + const actual = jest.requireActual('../../../src/services/util/pollProvider'); return { ...actual, - coldObservableProvider: jest.fn().mockImplementation((...args) => actual.coldObservableProvider(...args)) + pollProvider: jest.fn().mockImplementation((...args) => actual.pollProvider(...args)) }; }); +const { currentEpoch, generateStakePools, mockStakePoolsProvider } = mockProviders; + describe('RewardAccounts', () => { - const coldObservableProviderMock = coldObservableProvider as jest.MockedFunction; + const pollProviderMock = pollProvider as jest.MockedFunction; const txId1 = Cardano.TransactionId('0000000000000000000000000000000000000000000000000000000000000000'); const txId2 = Cardano.TransactionId('295d5e0f7ee182426eaeda8c9f1c63502c72cdf4afd6e0ee0f209adf94a614e7'); const poolId1 = Cardano.PoolId('pool1zuevzm3xlrhmwjw87ec38mzs02tlkwec9wxpgafcaykmwg7efhh'); @@ -63,7 +63,7 @@ describe('RewardAccounts', () => { drepInfo$ = jest.fn( (drepIds: Cardano.DRepID[]): Observable => of(drepIds.map((id) => ({ active: true, id } as DRepInfo))) ); - coldObservableProviderMock.mockClear(); + pollProviderMock.mockClear(); }); test.todo('createQueryStakePoolsProvider emits stored values if they exist, updates storage when provider resolves'); @@ -492,7 +492,7 @@ describe('RewardAccounts', () => { const epoch$ = null as unknown as Observable; // not used in this test const onChainTx$ = EMPTY as Observable; createTestScheduler().run(({ cold, expectObservable, flush }) => { - coldObservableProviderMock + pollProviderMock .mockReturnValueOnce( cold('a-b-c', { a: 0n, @@ -512,7 +512,7 @@ describe('RewardAccounts', () => { c: [5n, 3n] // duplicates are filtered in the coldObservable and this one is fake and emits duplicates }); flush(); - expect(coldObservableProviderMock).toBeCalledTimes(2); + expect(pollProviderMock).toBeCalledTimes(2); }); }); diff --git a/packages/wallet/test/services/DelegationTracker/RewardsHistory.test.ts b/packages/wallet/test/services/DelegationTracker/RewardsHistory.test.ts index f49f71d9267..37ba58e3e4c 100644 --- a/packages/wallet/test/services/DelegationTracker/RewardsHistory.test.ts +++ b/packages/wallet/test/services/DelegationTracker/RewardsHistory.test.ts @@ -1,4 +1,3 @@ -/* eslint-disable unicorn/no-useless-undefined */ import * as Crypto from '@cardano-sdk/crypto'; import { Cardano } from '@cardano-sdk/core'; import { InMemoryRewardsHistoryStore } from '../../../src/persistence'; @@ -88,8 +87,7 @@ describe('RewardsHistory', () => { expect(getRewardsHistory).toBeCalledWith( rewardAccounts, Cardano.EpochNo(calcFirstDelegationEpoch(epoch)), - logger, - undefined + logger ); }); } @@ -143,8 +141,7 @@ describe('RewardsHistory', () => { expect(getRewardsHistory).toBeCalledWith( rewardAccounts, Cardano.EpochNo(calcFirstDelegationEpoch(epoch)), - logger, - undefined + logger ); }); } diff --git a/packages/wallet/test/services/util/pollProvider.test.ts b/packages/wallet/test/services/util/pollProvider.test.ts new file mode 100644 index 00000000000..84aeb49a43f --- /dev/null +++ b/packages/wallet/test/services/util/pollProvider.test.ts @@ -0,0 +1,35 @@ +import { ProviderError, ProviderFailure } from '@cardano-sdk/core'; +import { RetryBackoffConfig } from 'backoff-rxjs'; +import { firstValueFrom } from 'rxjs'; +import { logger } from '@cardano-sdk/util-dev'; +import { pollProvider } from '../../../src'; + +describe('pollProvider', () => { + it('retries retryable ProviderError', async () => { + const sample = jest + .fn() + .mockRejectedValueOnce(new ProviderError(ProviderFailure.ConnectionFailure)) + .mockResolvedValue(true); + const retryBackoffConfig: RetryBackoffConfig = { initialInterval: 1 }; + const values$ = pollProvider({ logger, retryBackoffConfig, sample }); + const resolvedValue = await firstValueFrom(values$); + expect(sample).toBeCalledTimes(2); + expect(resolvedValue).toBe(true); + }); + + it('does not retry non-retryable ProviderError', async () => { + const sample = jest.fn().mockRejectedValueOnce(new ProviderError(ProviderFailure.BadRequest)); + const retryBackoffConfig: RetryBackoffConfig = { initialInterval: 1 }; + const values$ = pollProvider({ logger, retryBackoffConfig, sample }); + await expect(firstValueFrom(values$)).rejects.toThrowError(ProviderError); + expect(sample).toBeCalledTimes(1); + }); + + it('does not retry errors other than ProviderError and wraps them in ProviderError', async () => { + const sample = jest.fn().mockRejectedValueOnce(new Error('other error')); + const retryBackoffConfig: RetryBackoffConfig = { initialInterval: 1 }; + const values$ = pollProvider({ logger, retryBackoffConfig, sample }); + await expect(firstValueFrom(values$)).rejects.toThrowError(ProviderError); + expect(sample).toBeCalledTimes(1); + }); +}); diff --git a/packages/web-extension/src/observableWallet/util.ts b/packages/web-extension/src/observableWallet/util.ts index 211636da25c..fba0b01b7ef 100644 --- a/packages/web-extension/src/observableWallet/util.ts +++ b/packages/web-extension/src/observableWallet/util.ts @@ -125,7 +125,6 @@ export const observableWalletProperties: RemoteApiProperties = }, discoverAddresses: RemoteApiPropertyType.MethodReturningPromise, eraSummaries$: RemoteApiPropertyType.HotObservable, - fatalError$: RemoteApiPropertyType.HotObservable, finalizeTx: RemoteApiPropertyType.MethodReturningPromise, genesisParameters$: RemoteApiPropertyType.HotObservable, getName: RemoteApiPropertyType.MethodReturningPromise,