Stream blockchain data into your backend via webhooks. Ethereum, Polygon, Avalanche, BNB Chain, Fantom, Cronos and all testnets are supported. More networks to be added soon.
Get events streamed to your backend whenever:
- An address sends, receives, stakes, swaps or burns assets
- An asset is being sent, received, staked, swapped or burned
- A battle starts in your web3 game
- Someone participates in your token sale
- Any other smart contract event fires on-chain based on your filters
- Fully typed
- Contract Factories supported
- Filters supported 🔥 see examples
- Internal transactions supported 🥃
This README will introduce you to Moralis Streams API.
Chain | Streams | Confirmations Until Confirmed read more | Internal Tx (all will be supported at launch) |
---|---|---|---|
ETH | ✅ | 12 | ✅ |
ROPSTEN | ✅ | 12 | ❌ |
GOERLI | ✅ | 12 | ❌ |
BSC | ✅ | 18 | ✅ |
BSC TEST | ✅ | 18 | ✅ |
POLYGON | ✅ | 100 | ✅ |
MUMBAI | ✅ | 100 | ✅ |
FANTOM | ✅ | 100 | ❌ |
AVAX | ✅ | 100 | ❌ |
AVAX TEST | ✅ | 100 | ❌ |
CRONOS | ✅ | 100 | ❌ |
CRONOS TEST | ✅ | 100 | ❌ |
- This guide will be using Moralis SDK to interact with Moralis Streams API. But you can interact directly with with Streams API using these Swagger Docs
- NestJS Demo
- Create an account on Moralis.io
- Get your API key here
This is not a finished product but a preview.
- High availability is not deployed yet
- We may have to purge the history and logs
- You may experience some bugs
- The UI is not fully built out or polished - slick dashboards soon ftw 🤩
- The response structure may change
- We are still collecting feedback and may adjust according to it
- You can create maximum 100 streams
- Pricing and limitaions based on pricing will be announced in October - during beta there are no limitations outside of the one mentioned above
We expect a full production ready roll-out in October.
If you experience any issues - please let us know in this dedicated Moralis Forum.
These are features that are not included in this beta but that will be out in the near future:
- Including multiple assets or addresses in the same stream
- Mempool support
- Automatic retries (browsing failed deliveries and manually retrying is possible in beta)
We will be using Moralis SDK to interact with Moralis Streams API.
You can install the Moralis SDK via npm/pnpm/yarn:
npm install moralis
In this example we will monitor a wallet.
Meaning all incoming and outgoing transactions of that wallet will be monitored!
We will use Moralis SDK to create a stream.
You can install the Moralis SDK via npm/pnpm/yarn:
npm install moralis
import Moralis from "moralis";
import { EvmChain } from "@moralisweb3/evm-utils";
Moralis.start({
apiKey: "YOUR_API_KEY",
});
const stream = {
chains: [EvmChain.ETHEREUM, EvmChain.POLYGON], // list of blockchains to monitor
description: "monitor Bobs wallet", // your description
tag: "bob", // give it a tag
webhookUrl: "https://YOUR_WEBHOOK_URL", // webhook url to receive events,
};
const newStream = await Moralis.Streams.add(stream);
const { id } = newStream.toJSON(); // { id: 'YOUR_STREAM_ID', ...newStream }
// Now we attach bobs address to the stream
const address = "0x68b3f12d6e8d85a8d3dbbc15bba9dc5103b888a4";
await Moralis.Streams.addAddress({ address, id });
- Go to http://admin.moralis.io/streams
- Click on
New Stream
button. - Fill out the Form:
- Address:
0x68b3f12d6e8d85a8d3dbbc15bba9dc5103b888a4
- Description:
monitor Bobs wallet
- Webhook URL:
https://YOUR_WEBHOOK_URL
- Tag:
bob
- Select Blockchain (e.g. Ethereum Mainnet)
- Address:
- Click on
Create Stream
If you want to add multiple addresses to another stream you can always attach them to an existing stream.
await Moralis.Streams.addAddress({
id: streamId,
address: [
"0xf977814e90da44bfa03b6295a0616a897441acec",
"0x5754284f345afc66a98fbb0a0afe71e0f007b949",
], // Charlies and Douglas address
});
You can attach millions of addresses to stream. You can remove and get addresses aswell as adding new ones at any time.
// Add addresses
await Moralis.Streams.addAddress({
id: streamId,
address: [
"0xCFDF6Aaae9f6B927E3736FBD327853B622c5060E",
"0xAb5801a7D398351b8bE11C439e05C5B3259aeC9B",
], // Can also be a single string
});
// Delete an address
await Moralis.Streams.deleteAddress({
id,
address,
});
// Get all addresses
await Moralis.Streams.getAddresses({ id });
Now let us monitor a specific event happening on a specific smart contract. In this example we will monitor all CryptoPunks Transfers.
import Moralis from "moralis";
import { EvmChain } from "@moralisweb3/evm-utils";
Moralis.start({
apiKey: "YOUR_API_KEY",
});
const stream = {
chains: [EvmChain.ETHEREUM], // punks are on ethereum mainnet
description: "all cryptopunk transfers", // your description
tag: "cryptoPunks", // give it a tag
topic0: ["PunkTransfer(address,address,uint256)"], // topic0 is the event signature
includeContractLogs: true, // we want to include contract logs
abi: [{
"anonymous": false,
"inputs": [
{ "indexed": true, "name": "from", "type": "address" },
{ "indexed": true, "name": "to", "type": "address" },
{ "indexed": false, "name": "punkIndex", "type": "uint256" },
],
"name": "PunkTransfer",
"type": "event",
}],
webhookUrl: "https://YOUR_WEBHOOK_URL", // webhook url to receive events,
};
const newStream = await Moralis.Streams.add(stream);
const { id } = newStream.toJSON(); // id: StreamId
// Now we attach the contract address to the stream
const address = "0xb47e3cd837dDF8e4c57F05d70Ab865de6e193BBB";
await Moralis.Streams.addAddress({ address, id });
You can get typings for webhooks from the @moralisweb3/streams
package.
import { Types } from "@moralisweb3/streams";
- Go to Streams
- Click on
New Stream
button. - Fill out the Form:
- Address:
0xb47e3cd837dDF8e4c57F05d70Ab865de6e193BBB
- Webhook URL:
https://YOUR_WEBHOOK_URL
- Paste in a valid ABI and select the topic
- Descripition:
all cryptopunk transfers
- Tag:
bob
- Select Blockchain (e.g. Ethereum Mainnet)
- Address:
- Click on
Create Stream
Whenever you create or update a stream you will receive a test webhook.
You have to return status code 200 for the stream to be start.
The test body will look like this:
{
"abi": [],
"block": {
"hash": "",
"number": "",
"timestamp": ""
},
"txs": [],
"txsInternal": [],
"tag": "",
"streamId": "",
"logs": [],
"chainId": "",
"confirmed": true,
"retries": 0,
"erc20Approvals": [],
"erc20Transfers": [],
"nftApprovals": {
"ERC721": [],
"ERC1155": []
},
"nftTransfers": []
}
The includeContractLogs
option will include all contract logs in the webhook,
should be set to true if you are monitoring a contract. If you are monitoring a
wallet address you can set this to true
to also get the contract logs if a
wallet interacts with a contract
You can also monitor all internal transactions happening on chain by setting the
includeInternalTxs
to true
.
If you are monitoring contract events you can decide to also include the native transaction details
The Create Stream endpoint also supports advanced options. With this option you
can have more control over the topics you are passing to the stream. For each
topic you can have advanced options that include the topic
, a
filter and a includeNativeTxs
field which indicates if you
want to include native transactions.
Say you have the following topic Transfer(address,address,uint256)
. You can
get every transfer happening on chain by setting the topic to
Transfer(address,address,uint256)
and setting allAddresses
to true
. That
means you will get all transactions that match this topic.
Now whenever an ingoing or outgoing transaction involving the address you are monitoring occurs, you will receive a webhook with the transaction details.
You will receive two webhooks for each block that contains the events you are interested in.
The first webhook will come as soon as the block is mined and have
confirmed:false
. This means that the block is still running the risk of being
dropped due to a reorganization of the blockchain.
The second webhook will come once enough blocks have been mined after the block
containing your events and have confirmed:true
. This number of blocks is also
called number of confirmations
.
This table shows the number of confirmations required for Moralis to consider a block confirmed.
In rare cases the webhook with confirmed: true
may come before the one with
confirmed:false
, please ensure to handle this scenario on your end.
The Webhook will set a x-signature
in the header. It is for verifying if the
data you will receive is from Moralis. Essentially it is a hash (sha3) of the
body and your API Key.
You can verify the webhook with the following code:
import Moralis from "moralis";
const { headers, body } = request;
Moralis.Streams.verifySignature({
body,
signature: headers["x-signature"],
}); // throws error if not valid
The body contains the data you are interested in. Logs is in array containing raw events and stream information such as tag and the streamId. The body also contains a chainId, the blocknumber, internal transactions, the abis and a confirmed field that indicates if the block is confirmed.
{
"txs": [
{
"hash": "0x9e77b2e848c5bfa67cdd46a4fsd12df0daa2e8fde18f35db58c0406fe43e766f",
"gas": "21000",
"gasPrice": "1000000006",
"nonce": "13",
"input": "0x",
"transactionIndex": "3",
"fromAddress": "0x32ba4825204dce15c7147ea89b31178a00750f81",
"toAddress": "0x0e1458c4b529f564a14ec8fc3ae7369786a1625d",
"value": "241000000000000000",
"type": "2",
"v": "0",
"r": "576982745954673806006784990416278341842152496504273944233060992947236722288",
"s": "22542532798662050842266238043505315399614658322386149246105065480312118160986",
"streamId": "4f08e7df-3753-41f0-83ff-3fb24f7d0266"
},
...
],
"logs": [],
"tag": "WALLET_1",
"streamType": "wallet",
"chainId": "0x1",
"confirmed": true,
"block": {
"hash": "0x9e77b2e848c5bfa67cdd46a4fsd12df0daa2e8fde18f35db58c0406fe43e766f",
"timestamp": "1627400000",
"number": "12990639",
}
}
{
"abis": {
"d63fff7a-1f49-45d8-ab99-1fe1f3aee449": [{
// ... event abi
}]
}, // abis is ab object with the streamId as key and an array of abi items as value
"logs": [
{
"logIndex": "48",
"transactionHash": "0xb9730dd1b49061f3b5a6f93e0a66a03be199cad6f21ba5e8747a8087754e3e",
"transactionIndex": "40",
"transactionValue": "0",
"address": "0x95ad61b0a150d79219dcf64e1e6cc01f0b64c4ce",
"data": "0x0000000000000000000000000000000000000000204f8a5f22b432605d238000",
"topic0": "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
"topic1": "0x00000000000000000000000068b3f12d6e8d85a8d3dbbc15bba9dc5103b888a4",
"topic2": "0x0000000000000000000000002faf487a4414fe77e2327f0bf4ae2a264a776ad2",
"topic3": null
}
],
"txs": [],
"streamId": "d63fff7a-1f49-45d8-ab99-1fe1f3aee449",
"tag": "bob",
"txsInternal": [],
"chainId": "0x1",
"confirmed": true,
"block": {
"hash": "0x9e77b2e848c5bfa67cdd46a4fsd12df0daa2e8fde18f35db58c0406fe43e766f",
"timestamp": "1627400000",
"number": "12990639"
}
}
Moralis automatically parses the following data for you:
- ERC20 Transfers and Approvals
- NFT Transfers and Approvals
Example Body:
{
"logs": [
{
...rawLogs,
}
],
...,
"erc20Transfers": [
{
"from": "0x68b3f12d6e8d85a8d3dbbc15bba9dc5103b888a4",
"to": "0x2faf487a4414fe77e2327f0bf4ae2a264a776ad2",
"value": "9999678895548600000000000000",
"transactionHash": "0xb9730dd1b49061f3b5a6f93e0a66a03be199cad6f21ba5e8747a8087754e3e",
"transactionIndex": "40",
"logIndex": "48",
"tokenAddress": "0x95ad61b0a150d79219dcf64e1e6cc01f0b64c4ce",
"tokenName": "SHIBA INU",
"tokenSymbol": "SHIB",
"tokenDecimals": "18",
"valueWithDecimals": "9999678895.5486"
}
],
"tag": "shib_transfers",
"streamId": "d63fff7a-1f49-45d8-ab99-1fe1f3aee449",
"erc20Approvals": [],
"nftTransfers": [],
"nftApprovals": {
"ERC721": [],
"ERC1155": []
}
}
If you are streaming a smart contract event you can use the code below to extract the data from the webhook into a typed data structure.
// event MyEvent(address indexed from, address indexed to);
interface MyEvent {
from: string;
to: string;
}
const decodedLogs = Moralis.Streams.parsedLogs<MyEvent>({ webhook, tag });
decodedLogs[0]; // { from: '0x...', to: '0x...' }
You can see a list of all your streams by calling the following method:
const streams = await Moralis.Streams.getAll({
limit: 100, // limit the number of streams to return
});
Or you can see all streams in the Admin Panel
{
"result": [
{
"webhookUrl": "string",
"description": "string",
"tag": "string",
"topic0": [],
"includeNativeTxs": true,
"allAddresses": false,
"includeContractLogs": true,
"advancedOptions": [{
"topic0": "string",
"includeNativeTxs": true,
"filter": {}
}],
"abi": [],
"filter": "string",
"address": "string",
"chainIds": [
"string"
],
"id": "3fa84f64-5717-4562-b3fc-2c963f66afa6",
"status": "active",
"statusMessage": "string"
}
],
"cursor": "string",
"total": 1
}
Sometimes you want to check if a stream is still active or if something went
wrong. When you query all your streams you can see the status of
the stream. There are three possible states: active
, paused
and error
.
Moralis sets a default region for your stream. You can change the region anytime. Choose the region that is closest to your backend for the best performance.
import Moralis from "moralis";
Moralis.start({
apiKey: "YOUR_API_KEY",
});
await Moralis.Streams.setSettings({
region: "eu-central-1", // 'us-east-1' | 'us-west-2' | 'eu-central-1'
});
- Go to Settings
- Choose a region which is closest to your backend
- Click on Save Changes
In some cases you want to add a chain to an already existing stream or change the webhook url. Luckily you can easily update your streams.
Example on how to update the webhook url of a stream:
import Moralis from "moralis";
Moralis.start({
apiKey: "YOUR_API_KEY",
});
await Moralis.Streams.update({
id: "STREAM_ID",
webhook: "https://YOUR_NEW_WEBHOOK_URL",
});
- Go to Streams.
- Hover on the last column of the streams table. You will be able to see more options. (Edit, Delete, Pause Stream).
- Select
Edit
to go to edit form page. - Change the things you want to update
- Click on
Edit Stream
Filter | Function | Note | Example | Demo |
---|---|---|---|---|
or | either ... or ... | Need at least 2 filters | { "or" : [ {..filter1}, {...filter2} ]} | Mint/Burn USDC |
and | all filters must satisfy | Need at least 2 filters | { "and" : [ {..filter1}, {...filter2} ]} | ENS Registration |
eq | checks for equality | { "eq": ["value", "1000"] } | Specifc NFT | |
ne | checks for inequality | { "ne": ["address", "0x...325"] } | ||
lt | value is less than | Value must be a number | { "lt": ["amount", "50"] } | |
gt | value is greater than | Value must be a number | { "gt": ["price", "500000"] } | USDT Whales |
lte | value is less than or equal to | Value must be a number | { "lte": ["value", "100"] } | |
gte | value is greater than or equal to | Value must be a number | { "gte": ["value", "100"] } | Mint/Burn USDC |
in | value is in array | Must provide an array | { "in": ["city": ["berlin", "paris"]]} | Specifc CryptoPunks |
nin | value is not in array | Must provide an array | { "nin": ["name": ["bob", "alice"]]} |
In some cases you might want to filter the data you receive from the webhook. You can do this by adding a filter to the stream. Important: You must add a (valid!) ABI of the event you want to filter! Otherwise the stream will not work
import { Moralis } from "moralis";
import { EvmChain } from "@moralisweb3/evm-utils";
const options = {
chains: [EvmChain.ETHEREUM], // list of blockchains to monitor
description: "monitor one NFT from Collection", // your description
tag: "mySpecialNft", // give it a tag
topic0: ["Transfer(address,address,uint256)"], // the event you want to monitor
includeContractLogs: true,
abi: [{
"anonymous": false,
"inputs": [
{ "indexed": true, "name": "from", "type": "address" },
{ "indexed": true, "name": "to", "type": "address" },
{ "indexed": false, "name": "tokenId", "type": "uint256" },
],
"name": "Transfer",
"type": "event",
}], // valid abi of the event
advancedOptions: [
{
topic0: "Transfer(address,address,uint256)",
filter: {
eq: ["tokenId", "1"], // only receive events where tokenId is 1
},
includeNativeTxs: true,
},
],
webhookUrl: "https://YOUR_WEBHOOK_URL", // webhook url to receive events,
};
const stream = await Moralis.Streams.add(stream);
// Attach the contract address to the stream
await Moralis.Streams.addAddress({
id: stream.id,
address: "0x68b3f12d6e8d85a8d3dbbc15bba9dc5103b888a4", // contract address of the NFT
});
- Create a new Stream
- Fill out the form
- Switch on event emittance and add the Abi and select the topic.
- Add the below to Advanced options
[
{
"topic0": "Transfer(address,address,uint256)",
"filter": { "eq": ["tokenId", "1"] }
}
]
- Click on create stream.
const transferAbi = {
"anonymous": false,
"inputs": [
{ "indexed": true, "name": "from", "type": "address" },
{ "indexed": true, "name": "to", "type": "address" },
{ "indexed": false, "name": "value", "type": "uint256" },
],
"name": "Transfer",
"type": "event",
}; // valid abi of the event
const approvalAbi = {
"anonymous": false,
"inputs": [{ "indexed": true, "name": "owner", "type": "address" }, {
"indexed": true,
"name": "spender",
"type": "address",
}, { "indexed": false, "name": "value", "type": "uint256" }],
"name": "Approval",
"type": "event",
};
const options = {
chains: [EvmChain.ETHEREUM], // list of blockchains to monitor
description: "whale transactions", // your description
tag: "usdtwhale", // give it a tag
topic0: ["Transfer(address,address,uint256)", "Approval(address,address,uint256)], // topic of the event
includeContractLogs: true,
abi: [transferAbi, approvalAbi],
advancedOptions: [
{
topic0: "Transfer(address,address,uint256)",
filter: { "gt": ["value", "100000000000"] }, // only receive events where the value is above 100k USDT
includeNativeTxs: true,
},
{
topic0: "Approval(address,address,uint256)",
filter: { "gt": ["value", "100000000000"] },
includeNativeTxs: true,
}
],
webhookUrl: "https://YOUR_WEBHOOK_URL", // webhook url to receive events,
};
const stream = await Moralis.Streams.add(stream);
await Moralis.Streams.addAddress({
id: stream.id,
address: "0xdAC17F958D2ee523a2206206994597C13D831ec7"
})
- Create a new Stream
- Fill out the form
- Switch on Event Emittance and Add the Abi and select the topic
- Add below value to advanced options
[
{
"topic0": "Approval(address,address,uint256)",
"filter": { "gt": ["value", "100000000000"] }
}
]
- Click on create stream button.
const punkTransferAbi = [{
"anonymous": false,
"inputs": [
{ "indexed": true, "name": "from", "type": "address" },
{ "indexed": true, "name": "to", "type": "address" },
{ "indexed": false, "name": "punkIndex", "type": "uint256" },
],
"name": "PunkTransfer",
"type": "event",
}]; // valid abi of the event
const options = {
chains: [EvmChain.ETHEREUM], // list of blockchains to monitor
description: "1000 to 1002 cryptopunks", // your description
tag: "cryptoPunks", // give it a tag
abi: punkTransferAbi,
includeContractLogs: true,
topic0: ["PunkTransfer(address,address,uint256)"], // topic of the event
advancedOptions: [
{
topic0: "PunkTransfer(address,address,uint256)",
filter: { "in": ["punkIndex", ["1000", "1001", "1002"]] }, // only receive transfer events if the token id is 1000/1001/1002
includeNativeTxs: true,
},
],
webhookUrl: "https://YOUR_WEBHOOK_URL", // webhook url to receive events,
};
const stream = await Moralis.Streams.add(stream);
// Attach the contract address to the stream
await Moralis.Streams.addAddress({
id: stream.id,
address: "0xb47e3cd837dDF8e4c57F05d70Ab865de6e193BBB", // crypto punks address
});
- Create a new Stream
- Fill out the form
- Switch on Event Emittance and Add the Abi and select the topic
- Add below value to advanced options
[
{
"topic0": "PunkTransfer(address,address,uint256)",
"filter": { "in": ["punkIndex", ["1000", "1001", "1002"]] }
}
]
- Click on create stream button.
Lets check if a specific address has registered an ENS name that costs higher than 1 ETH.
const ensNameRegisteredAbi = [{
"anonymous": false,
"inputs": [
{
"indexed": false,
"internalType": "string",
"name": "name",
"type": "string",
},
{
"indexed": true,
"internalType": "bytes32",
"name": "label",
"type": "bytes32",
},
{
"indexed": true,
"internalType": "address",
"name": "owner",
"type": "address",
},
{
"indexed": false,
"internalType": "uint256",
"name": "cost",
"type": "uint256",
},
{
"indexed": false,
"internalType": "uint256",
"name": "expires",
"type": "uint256",
},
],
"name": "NameRegistered",
"type": "event",
}]; // valid abi of the event
const filter = {
"and": [
{ "eq": ["owner", "0x283Af0B28c62C092C9727F1Ee09c02CA627EB7F5"] },
{ "gt": ["cost", "1000000000000000000"] },
],
}; // only receive registration events if the owner is the address and the cost is higher than 1 ETH
const options = {
chains: [EvmChain.ETHEREUM], // Ethereum Name Service so we only monitor Ethereum
description: "ENS Name Registrations", // your description
tag: "ensRegistrationByBob", // give it a tag
abi: ensNameRegisteredAbi,
filter,
topic0: ["NameRegistered(string,bytes32,address,uint256,uint256)"],
includeContractLogs: true,
advancedOptions: [
{
topic0: "NameRegistered(string,bytes32,address,uint256,uint256)",
filter,
includeNativeTxs: true,
},
],
webhookUrl: "https://YOUR_WEBHOOK_URL", // webhook url to receive events,
};
const stream = await Moralis.Streams.add(options);
// Attach the contract address to the stream
await Moralis.Streams.addAddress({
id: stream.id,
address: "0x00000000000C2E074eC69A0dFb2997BA6C7d2e1e", // ENS Registry address
});
- Create a new Stream
- Fill out the form
- Switch on Event Emittance and Add the Abi and select the topic
- Add below value to advanced options
[
{
"topic0": "NameRegistered(string,bytes32,address,uint256,uint256)",
"filter": {
"and": [
{ "eq": ["owner", "0x283Af0B28c62C092C9727F1Ee09c02CA627EB7F5"] },
{ "gt": ["cost", "1000000000000000000"] }
]
}
}
]
- Click on create stream button.
Lets check if a specific wallet burns or mints at least 100K USDC
Lets check all USDC transfers but filter transaction where the recipent or the sender is the zero address and if the amount is greater or equal to 10000 USDC
const transferUsdcAbi = [{
"anonymous": false,
"inputs": [
{
"indexed": true,
"internalType": "address",
"name": "from",
"type": "address",
},
{
"indexed": true,
"internalType": "address",
"name": "to",
"type": "address",
},
{
"indexed": false,
"internalType": "uint256",
"name": "value",
"type": "uint256",
},
],
"name": "Transfer",
"type": "event",
}];
const filter = {
"or": [
{
"and": [
{ "eq": ["sender", "0x00000...00000"] },
{ "gte": ["amount", "10000000000"] },
],
},
{
"and": [
{ "eq": ["receiver", "0x00000...00000"] },
{ "gte": ["amount", "10000000000"] },
],
},
],
}; // we will only receive events when the transfer recipent or the sender is the zero address meaning we are filtering mints and burn
const options = {
chains: [EvmChain.ETHEREUM], // Monitor USDC on ethereum
description: "ENS Name Registrations", // your description
tag: "mintsAndBurns", // give it a tag
abi: transferUsdcAbi,
includeContractLogs: true,
topic0: ["Transfer(address,address,uint256)"],
advancedOptions: [
{
topic0: "Transfer(address,address,uint256)",
filter,
includeNativeTxs: true,
},
],
filter,
webhookUrl: "https://YOUR_WEBHOOK_URL", // webhook url to receive events,
};
const stream = await Moralis.Streams.add(options);
// Attach the contract address to the stream
await Moralis.Streams.addAddress({
id: stream.id,
address: "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48", // USDC address
});
- Create a new Stream
- Fill out the form
- Switch on Event Emittance and Add the Abi and select the topic
- Add below value to advanced options
[
{
"topic0": "Transfer(address,address,uint256)",
"filter": {
"or": [
{
"and": [
{ "eq": ["sender", "0x00000...00000"] },
{ "gte": ["amount", "10000000000"] }
]
},
{
"and": [
{ "eq": ["receiver", "0x00000...00000"] },
{ "gte": ["amount", "10000000000"] }
]
}
]
}
}
]
- Click on create stream button.
You can update the status of a stream at any time. Possible values for status
are active
, paused
and error
.
For example: In some cases you might want to pause a stream. You can do this by calling the specific endpoint.
await Moralis.Streams.updateStatus({
id: "YOUR_STREAM_ID",
status: "paused",
});
- Go to Streams.
- Hover on the last column of the streams table. You will be able to see more options. (Edit, Delete, Pause Stream).
- Select
Pause Stream
to change the status of your stream
curl -X 'POST' \
'https://api.moralis-streams.com/streams/evm/STREAM_ID/status' \
-H 'accept: application/json' \
-H 'x-api-key: YOUR_API_KEY' \
-H 'Content-Type: application/json' \
-d '{"status": "paused"}'
Moralis guarantees 100% delivery of webhooks. Still you can have errors in your streams for example if your server is down. Moralis will retry to send the webhook in intervals. If the webhook is not delivered after 24 hours but you still want to receive the failed webhooks after some time you can manuallay replay the failed webhooks.
Your Stream can enter into error state on the following 2 scenarios:
-
Your webhook success-rate is below 70%
-
Your webhook is not consuming the webhooks fast enough so a queue start to build up and reached the limit of 10k queued, you can monitor the size of the queue with the header x-queue-size when you receive the webhook, You need to ensure your server can consume all webhooks fast enough so a queue doesnt build up, moving the streams to a closer region will help reduce the time.
A stream in Error state will stop sending webhook but will continue saving them in history and the retry schedule will resume when the stream is enabled again.
If your Stream enters Error State you will receive a Email notification so you can act before it gets terminated.
If your Stream stays in Error State for 24 Hours it will be terminated.
A Terminated Stream will not send webhooks and will not process any new blocks, this means the blocks will be dropped.
If your Stream is terminated you will receive a Email notification
Retry | Interval |
---|---|
0 | 1 min |
1 | 10 min |
2 | 1 hour |
3 | 2 hours |
4 | 6 hours |
5 | 12 hours |
6 | 1 day |
The Streams API provides an endpoint to get all failed webhooks. It is useful to replay the failed webhooks.
While in BETA, all failed webhooks are stored for 48 Hours, once Streams is in production different plans will allow up to 2 weeks retention.
const history = await Moralis.Streams.getHistory({ limit: 100 });
- Go to Failed Deliveries. All your failed deliveries are listed here.
The Response is a list of failed webhooks that are uniquely identified by id. The payload contains the webhook details.
{
"result": [
{
"id": "HISTORY_ID",
"date": "string",
"payload": {
// the failed webhook
},
"tinyPayload": {
"block": {},
"chainId": "",
"amount": 0
},
"streamId": "STREAM_ID",
"tag": "TAG",
"errorMessage": "string",
"webhookUrl": "string"
}
],
"total": "number",
"cursor": "string"
}
You can replay (retry) a failed webhook by calling the specific endpoint.
await Moralis.Streams.retryWebhook({ id: "HISTORY_ID", streamId: "STREAM_ID" });
- Go to Failed Deliveries.
- Click on the
replay
icon against your failed delivery
You can use the Swagger UI or make an API call to the endpoint.
curl -X 'POST' \
'https://api.moralis-streams.com/history/replay/WEBHOOK_ID' \
-H 'accept: application/json' \
-H 'x-api-key: YOUR_API_KEY' \
-H 'Content-Type: application/json' \
Every Stream starts with a successrate of 100% per webhook URL. For every failed
webhook the rate decreases by 1% per webhook URL. For every successful webhook
the rate increases by 1% per webhook URL where the maximum is 100%. If the
success rate is below 70% per webhook URL the stream will be discontinued and an
email is sent to you. The status of your stream will be set to error
.
If you want to continue receiving events you can
update the status of a stream to active
.
You can get stats about your streams usage
The UI for this feature is currently under development
You can use the Swagger UI or make an API call to the endpoint.
curl -X 'GET' \
'https://api.moralis-streams.com/beta/stats' \
-H 'accept: application/json' \
-H 'x-api-key: YOUR_API_KEY' \
-H 'Content-Type: application/json' \
{
"totalWebhooksDelivered": 15,
"totalWebhooksFailed": 32,
"totalLogsProcessed": 26,
"totalTxsProcessed": 45,
"totalTxsInternalProcessed": 3
}