-
Notifications
You must be signed in to change notification settings - Fork 7
/
substrateetl
executable file
·152 lines (131 loc) · 6.06 KB
/
substrateetl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
#!/usr/bin/env node
const { Command } = require('commander');
const { BigQuery } = require('@google-cloud/bigquery')
const fs = require('fs');
class SubstrateETL {
project = "substrate-etl";
async execute_bqJob(sqlQuery, fn = false) {
// run bigquery job with suitable credentials
const bigqueryClient = new BigQuery();
const options = {
query: sqlQuery,
location: 'us-central1',
};
try {
let f = fn ? await fs.openSync(fn, "w", 0o666) : false;
const response = await bigqueryClient.createQueryJob(options);
const job = response[0];
const [rows] = await job.getQueryResults();
let recs = 0;
if ( fn ) console.log("writing to ", fn);
rows.forEach(r => {
if ( f ) {
// write to fn
fs.writeSync(f, JSON.stringify(r) + "\r\n");
recs++;
} else {
console.log(JSON.stringify(r));
}
})
if ( fn ) console.log(recs, " records written")
} catch (err) {
console.log(err);
throw new Error(`An error has occurred.`);
}
}
async chains(o) {
let { relayChain } = o;
let sql = `select * from ${this.project}.${relayChain}.chains`
await this.execute_bqJob(sql);
}
async export_chain_data(output, o, fn) {
let { relayChain, paraId, startDate, endDate, startBlock, endBlock } = o;
let sql = `select * from \`${this.project}.${relayChain}.${output}${paraId}\` where DATE(block_time) >= "${startDate}" and DATE(block_time) <= "${endDate}"`
let fld = ( output == "blocks" ) ? "number" : "block_number";
if ( startBlock ) sql += ` and ${fld} >= ${startBlock}`;
if ( endBlock ) sql += ` and ${fld} <= ${endBlock}`;
if ( output == "traces" ) {
sql += ` order by block_number, trace_id`;
} else if ( output == "extrinsics" ) {
sql += ` order by block_number, extrinsic_id`;
} else if ( output == "events" ) {
sql += ` order by block_number, event_id`;
} else if ( output == "logs" ) {
sql += ` order by block_number`;
} else if ( output == "blocks" ) {
sql += ` order by number`;
}
await this.execute_bqJob(sql, fn);
}
async export_xcmtransfers(o) {
let { relayChain, startDate, endDate, xcmtransfersOutput } = o;
let output = "xcmtransfers";
let sql = `select * from \`${this.project}.${relayChain}.${output}\` where DATE(block_time) >= "${startDate}" and DATE(block_time) <= "${endDate}"`
await this.execute_bqJob(sql, xcmtransfersOutput);
}
}
/**
 * Defines the `substrateetl` command-line interface (the `chains`, `export`
 * and `xcmtransfers` sub-commands) and runs whichever one `process.argv`
 * selects.
 *
 * @returns {Promise<void>} Resolves once the selected sub-command finishes.
 */
async function main() {
    const program = new Command();

    // Default date range is yesterday, computed entirely in UTC so that the
    // result does not depend on the local time zone or on DST changes.
    const DAY_MS = 24 * 60 * 60 * 1000;
    const yesterday = new Date(Date.now() - DAY_MS).toISOString().slice(0, 10);

    program
        .name('substrateetl')
        .description('Substrate ETL CLI for Polkadot + Kusama Network Substrate chains (C) 2023 Colorful Notion, Inc.')
        .version('0.0.1');

    program.command('chains')
        .description('Dump all chains of relaychain')
        .option('-r, --relay-chain <relaychain>', 'relay chain (polkadot or kusama)', 'polkadot')
        .action(async (o) => {
            const substrateetl = new SubstrateETL();
            await substrateetl.chains(o);
        });

    program.command('export')
        .description('Export chain data')
        .option('-r, --relay-chain <relaychain>', 'relay chain (polkadot or kusama)', 'polkadot')
        .option('-p, --para-id <paraid>', 'The paraid (if not supplied paraid=0 is the relaychain)', '0')
        .option('-s, --start-date <startdate>', 'Start date', yesterday)
        .option('-e, --end-date <enddate>', 'End date', yesterday)
        .option('--start-block <startblock>', 'Start block')
        .option('--end-block <endblock>', 'End block')
        .option('--blocks-output <output>', 'The output file for blocks. If not provided blocks data will not be exported.')
        .option('--extrinsics-output <output>', 'The output file for extrinsics. If not provided extrinsics data will not be exported.')
        .option('--traces-output <output>', 'The output file for traces. If not provided traces data will not be exported.')
        .option('--events-output <output>', 'The output file for events. If not provided events data will not be exported.')
        .option('--logs-output <output>', 'The output file for logs. If not provided logs data will not be exported.')
        .option('--transfers-output <output>', 'The output file for transfers. If not provided transfers data will not be exported.')
        .option('--specversions-output <output>', 'The output file for spec versions. If not provided spec versions data will not be exported.')
        .action(async (o) => {
            const substrateetl = new SubstrateETL();
            // Each table kind is exported only when its output file was given.
            if (o.blocksOutput) await substrateetl.export_chain_data("blocks", o, o.blocksOutput);
            if (o.extrinsicsOutput) await substrateetl.export_chain_data("extrinsics", o, o.extrinsicsOutput);
            if (o.tracesOutput) await substrateetl.export_chain_data("traces", o, o.tracesOutput);
            if (o.eventsOutput) await substrateetl.export_chain_data("events", o, o.eventsOutput);
            if (o.logsOutput) await substrateetl.export_chain_data("logs", o, o.logsOutput);
            if (o.transfersOutput) await substrateetl.export_chain_data("transfers", o, o.transfersOutput);
            if (o.specversionsOutput) await substrateetl.export_chain_data("specversions", o, o.specversionsOutput);
        });

    program.command('xcmtransfers')
        .description('Export xcmtransfers data')
        .option('-r, --relay-chain <relaychain>', 'relay chain (polkadot or kusama)', 'polkadot')
        .option('-s, --start-date <startdate>', 'Start date', yesterday)
        .option('-e, --end-date <enddate>', 'End date', yesterday)
        .option('--xcmtransfers-output <output>', 'The output file for xcmtransfers. If not provided xcmtransfers data will not be exported.', "xcmtransfers.json")
        .action(async (o) => {
            const substrateetl = new SubstrateETL();
            if (o.xcmtransfersOutput) await substrateetl.export_xcmtransfers(o);
        });

    await program.parseAsync(process.argv);
}
// Script entry point: run the CLI, then terminate the process explicitly,
// with status 0 on success or status 1 (after logging the failure) on error.
(async () => {
    try {
        await main();
    } catch (e) {
        console.error('ERROR', e);
        process.exit(1);
    }
    process.exit(0);
})();