Skip to content

Commit

Permalink
feat(EthereumRegistryWriter): onCidAdded + transaction receipts (#953)
Browse files Browse the repository at this point in the history
  • Loading branch information
lautarodragan authored Dec 2, 2019
1 parent 50028ec commit 81937ff
Show file tree
Hide file tree
Showing 11 changed files with 557 additions and 256 deletions.
480 changes: 258 additions & 222 deletions package-lock.json

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,22 @@
"bitcoin-core": "2.2.0",
"bs58": "4.0.1",
"form-data": "2.3.3",
"fp-ts": "2.2.0",
"ipfs-http-client": "28.1.0",
"joi": "14.3.0",
"koa": "2.6.2",
"koa-body": "4.0.4",
"koa-cors": "0.0.16",
"koa-helmet": "4.0.0",
"koa-router": "7.4.0",
"luxon": "1.21.2",
"mongodb": "3.1.10",
"node-fetch": "1.7.3",
"pino": "4.17.6",
"protobufjs": "6.8.8",
"ramda": "0.26.1",
"string-to-stream": "1.1.1",
"web3": "1.2.2"
"web3": "1.2.4"
},
"devDependencies": {
"@po.et/tslint-rules": "2.2.0",
Expand All @@ -84,6 +86,7 @@
"@types/koa": "2.0.47",
"@types/koa-helmet": "3.1.2",
"@types/koa-router": "7.0.35",
"@types/luxon": "1.21.0",
"@types/mongodb": "3.3.8",
"@types/node-fetch": "1.6.9",
"@types/pino": "4.16.1",
Expand Down
4 changes: 4 additions & 0 deletions src/Configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ export interface Configuration extends LoggingConfiguration, BitcoinRPCConfigura
readonly ethereumRegistryPrivateKey: string
readonly ethereumRegistryUploadAnchorReceiptIntervalInSeconds: number
readonly ethereumRegistryRegisterNextDirectoryIntervalInSeconds: number
readonly ethereumGasPrice: number
readonly ethereumMaximumUnconfirmedTransactionAgeInSeconds: number
}

export interface LoggingConfiguration {
Expand Down Expand Up @@ -141,6 +143,8 @@ export const DefaultConfiguration: Configuration = {
ethereumRegistryPrivateKey: '',
ethereumRegistryUploadAnchorReceiptIntervalInSeconds: 60,
ethereumRegistryRegisterNextDirectoryIntervalInSeconds: 60,
ethereumGasPrice: 1e9,
ethereumMaximumUnconfirmedTransactionAgeInSeconds: 5 * 60 * 1000,

exchangeAnchorNextHashRequest: 'ANCHOR_NEXT_HASH_REQUEST',
exchangeBatchReaderReadNextDirectoryRequest: 'BATCH_READER::READ_NEXT_DIRECTORY_REQUEST',
Expand Down
175 changes: 158 additions & 17 deletions src/EthereumRegistryWriter/Business.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import { PoetBlockAnchor } from '@po.et/poet-js'
import { DateTime } from 'luxon'
import { Collection, Db } from 'mongodb'
import * as Pino from 'pino'
import { identity } from 'ramda'
import { identity, map, filter, tap, complement } from 'ramda'
import { TransactionReceipt } from 'web3-core'

import { EthereumRegistryContract } from 'Helpers/EthereumRegistryContract'
import { IPFS } from 'Helpers/IPFS'
import { childWithFileName } from 'Helpers/Logging'
import { asyncPipe } from 'Helpers/asyncPipe'
import { ClaimIPFSHashPair } from 'Interfaces'

export interface Dependencies {
Expand All @@ -15,8 +18,13 @@ export interface Dependencies {
readonly ethereumRegistryContract: EthereumRegistryContract
}

export interface Configuration {
readonly maximumUnconfirmedTransactionAgeInSeconds: number
}

export interface Arguments {
readonly dependencies: Dependencies
readonly configuration: Configuration
}

export interface Business {
Expand All @@ -28,7 +36,15 @@ export interface Business {
readonly confirmClaimFiles: (claimIPFSHashPairs: ReadonlyArray<ClaimIPFSHashPair>) => Promise<void>
readonly uploadNextAnchorReceipt: () => Promise<void>
readonly uploadNextClaimFileAnchorReceiptPair: () => Promise<void>
readonly registerNextDirectory: () => Promise<void>
readonly writeNextDirectoryToEthereum: () => Promise<void>
readonly setRegistryIndex: (
confirmClaimAndAnchorReceiptDirectory: string,
registryIndex: number,
transactionHash: string,
blockHash: string,
blockNumber: number,
) => Promise<void>
readonly getEthereumTransactionReceipts: () => Promise<void>
}

interface DbEntry {
Expand All @@ -40,6 +56,17 @@ interface DbEntry {
readonly batchDirectoryConfirmed?: boolean
readonly claimAndAnchorReceiptDirectory?: string
readonly registryIndex?: number
readonly registryAdditionTransactionReceipt?: {
readonly transactionHash?: string
readonly transactionCreationDate?: Date
readonly blockHash?: string
readonly blockNumber?: number, // comma due to some tslint randomness, will move to eslint in the future
}
readonly onCidAdded?: {
readonly transactionHash?: string
readonly blockHash?: string
readonly blockNumber?: number, // comma due to some tslint randomness, will move to eslint in the future
}
}

export const Business = ({
Expand All @@ -49,6 +76,9 @@ export const Business = ({
ipfs,
ethereumRegistryContract,
},
configuration: {
maximumUnconfirmedTransactionAgeInSeconds,
},
}: Arguments): Business => {
const businessLogger: Pino.Logger = childWithFileName(logger, __filename)
const claimAnchorReceiptsCollection: Collection<DbEntry> = db.collection('claimAnchorReceipts')
Expand Down Expand Up @@ -187,26 +217,135 @@ export const Business = ({
await claimAnchorReceiptsCollection.updateOne({ claimId }, { $set: { claimAndAnchorReceiptDirectory } })
}

const registerNextDirectory = async () => {
const writeNextDirectoryToEthereum = async () => {
const logger = businessLogger.child({ method: 'registerNextDirectory' })
const entry = await claimAnchorReceiptsCollection.findOne({
claimAndAnchorReceiptDirectory: { $ne: null },
registryIndex: null,
})
if (!entry)

const entry = await claimAnchorReceiptsCollection.findOneAndUpdate(
{
claimAndAnchorReceiptDirectory: { $ne: null },
$or: [
{ 'registryAdditionTransactionReceipt.transactionCreationDate': null },
{ $and: [
{ 'registryAdditionTransactionReceipt.transactionCreationDate': {
$lt: DateTime.utc().minus({ seconds: maximumUnconfirmedTransactionAgeInSeconds }).toJSDate(),
} },
{ 'registryAdditionTransactionReceipt.blockHash': null },
]},
],
},
{
$set: { 'registryAdditionTransactionReceipt.transactionCreationDate': new Date() },
},
)
if (!entry.value)
return
const { claimId, claimAndAnchorReceiptDirectory } = entry
const cidCount = await ethereumRegistryContract.getCidCount()
const { claimId, claimAndAnchorReceiptDirectory } = entry.value
logger.debug(
{ claimId, claimAndAnchorReceiptDirectory, cidCount },
'Registering next (claim + anchor receipt) directory to Ethereum',
{ claimId, claimAndAnchorReceiptDirectory },
'Adding (claim + anchor receipt) directory to Ethereum',
)
await ethereumRegistryContract.addCid(claimAndAnchorReceiptDirectory)
const transactionHash = await ethereumRegistryContract.addCid(claimAndAnchorReceiptDirectory)

logger.info(
{ claimId, claimAndAnchorReceiptDirectory, cidCount },
'(claim + anchor receipt) directory added to Ethereum',
{ claimId, claimAndAnchorReceiptDirectory, transactionHash },
'(claim + anchor receipt) transaction sent',
)
await claimAnchorReceiptsCollection.updateOne({ claimId }, { $set: { registryIndex: cidCount } })

await claimAnchorReceiptsCollection.updateOne(
{ claimAndAnchorReceiptDirectory },
{ $set: { 'registryAdditionTransactionReceipt.transactionHash': transactionHash } },
)

}

const setRegistryIndex = async (
claimAndAnchorReceiptDirectory: string,
registryIndex: number,
transactionHash: string,
blockHash: string,
blockNumber: number,
) => {
const logger = businessLogger.child({ method: 'setRegistryIndex' })
await claimAnchorReceiptsCollection.updateOne(
{ claimAndAnchorReceiptDirectory },
{ $set: { registryIndex, onCidAdded: { transactionHash, blockHash, blockNumber } } },
)
logger.info(
{ claimAndAnchorReceiptDirectory, registryIndex, blockNumber, transactionHash },
'Registry index for (claim + anchor receipt) directory set',
)
}

const getEthereumTransactionReceipts = async () => {
const logger = businessLogger.child({ method: 'getEthereumTransactionReceipts' })

logger.trace('Looking for transactions without confirmation')

const entries = await claimAnchorReceiptsCollection.find(
{
'registryAdditionTransactionReceipt.transactionHash': { $ne: null },
'registryAdditionTransactionReceipt.blockHash': null,
},
{ projection: { 'registryAdditionTransactionReceipt.transactionHash': 1 } },
).toArray()

if (!entries.length) {
logger.trace('No transactions without confirmation found')
return
}

const transactionHashes = entries.map(_ => _.registryAdditionTransactionReceipt.transactionHash)

logger.debug({ transactionHashes }, 'These transactions have no known confirmations yet')

const receiptToSimplified = ({ transactionHash, blockHash, blockNumber }: TransactionReceipt) => ({
transactionHash,
blockHash,
blockNumber,
})

const logErrors = (transactionReceipt: TransactionReceipt) => {
logger.error({ transactionReceipt }, 'Error in transaction receipt')
}

const transactionReceiptIsOk = (transactionReceipt: TransactionReceipt) => transactionReceipt.status

const filterAndLogErrors = asyncPipe(
tap(asyncPipe(
filter(complement(transactionReceiptIsOk)),
map(logErrors),
)),
filter(transactionReceiptIsOk),
)

const getReceipts = asyncPipe(
map(ethereumRegistryContract.getTransactionReceipt),
Promise.all.bind(Promise),
filter(identity),
filterAndLogErrors,
map(receiptToSimplified),
) as (transactionHashes: ReadonlyArray<string>) => Promise<ReadonlyArray<Partial<TransactionReceipt>>>

const receipts = await getReceipts(transactionHashes)

if (!receipts.length) {
logger.trace({ transactionHashes }, 'No transactions receipts available yet for these transactions')
return
}

logger.debug({ receipts }, 'Got these transaction receipts')

await Promise.all(receipts.map(({ transactionHash, blockHash, blockNumber }) =>
claimAnchorReceiptsCollection.updateOne(
{ 'registryAdditionTransactionReceipt.transactionHash': transactionHash },
{ $set: {
'registryAdditionTransactionReceipt.blockHash': blockHash,
'registryAdditionTransactionReceipt.blockNumber': blockNumber,
} },
),
))

logger.info({ receipts }, 'Transaction receipts set')
}

return {
Expand All @@ -218,6 +357,8 @@ export const Business = ({
confirmClaimFiles,
uploadNextAnchorReceipt,
uploadNextClaimFileAnchorReceiptPair,
registerNextDirectory,
writeNextDirectoryToEthereum,
setRegistryIndex,
getEthereumTransactionReceipts,
}
}
12 changes: 12 additions & 0 deletions src/EthereumRegistryWriter/EthereumRegistryWriter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ export interface EthereumRegistryWriterConfiguration extends LoggingConfiguratio
readonly privateKey: string
readonly uploadAnchorReceiptIntervalInSeconds: number
readonly registerNextDirectoryIntervalInSeconds: number
readonly gasPrice?: number
readonly maximumUnconfirmedTransactionAgeInSeconds: number
}

type stop = () => Promise<void>
Expand All @@ -47,15 +49,21 @@ export const EthereumRegistryWriter = async (configuration: EthereumRegistryWrit
chainId: configuration.chainId,
contractAddress: configuration.contractAddress,
privateKey: configuration.privateKey,
gasPrice: configuration.gasPrice,
})

logger.info({ ethereumAccountAddress: ethereumRegistryContract.accountAddress })

const business = Business({
dependencies: {
logger,
db,
ipfs,
ethereumRegistryContract,
},
configuration: {
maximumUnconfirmedTransactionAgeInSeconds: configuration.maximumUnconfirmedTransactionAgeInSeconds,
},
})
await business.createDbIndices()

Expand All @@ -64,6 +72,7 @@ export const EthereumRegistryWriter = async (configuration: EthereumRegistryWrit
logger,
messaging,
business,
ethereumRegistryContract,
},
exchange: configuration.exchanges,
})
Expand Down Expand Up @@ -92,6 +101,9 @@ export const EthereumRegistryWriter = async (configuration: EthereumRegistryWrit
logger.debug('Closing database connection...')
await mongoClient.close()
logger.info('Database connection closed')
logger.debug('Closing WS connection to geth...')
ethereumRegistryContract.close()
logger.info('WS connection to geth closed')
logger.info('Stopped EthereumRegistryWriter...')
}

Expand Down
Loading

0 comments on commit 81937ff

Please sign in to comment.