Skip to content

Commit

Permalink
Display error messages and enable multiple caching status for adapters
Browse files Browse the repository at this point in the history
  • Loading branch information
tuncpolat committed Aug 29, 2023
1 parent f8d198c commit 5bacf82
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2019-2023 The Polypheny Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.polypheny.db.adapter.ethereum;

public class CacheException extends RuntimeException {

public CacheException( String message ) {
super( message );
}


public CacheException( String message, Throwable cause ) {
super( message, cause );
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ public class CachingStatus {
public BigInteger toBlock;
public BigInteger currentBlock;
public BigInteger currentEndBlock;

public int sourceAdapterId;
public String errorMessage;


public enum ProcessingState {
INITIALIZED, PROCESSING, DONE
INITIALIZED, PROCESSING, DONE, ERROR
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.polypheny.db.adapter.DataSource.ExportedColumn;
import org.polypheny.db.adapter.ethereum.CachingStatus.ProcessingState;
import org.polypheny.db.ddl.DdlManager.FieldInformation;
import org.web3j.abi.EventEncoder;
import org.web3j.abi.FunctionReturnDecoder;
Expand All @@ -48,6 +49,9 @@ public class ContractCache {
private final BigInteger fromBlock;
private final BigInteger toBlock;
private BigInteger currentBlock;
private boolean hasError = false;
private String errorMessage;


private final Map<String, EventCache> cache = new ConcurrentHashMap<>(); // a cache for each event
private final Map<String, List<EventData>> eventsPerContract;
Expand Down Expand Up @@ -107,7 +111,19 @@ public void startCaching() {
for ( Map.Entry<String, EventCache> entry : cache.entrySet() ) {
String address = entry.getKey();
EventCache eventCache = entry.getValue();
eventCache.addToCache( address, currentBlock, endBlock, targetAdapterId );
try {
eventCache.addToCache(address, currentBlock, endBlock, targetAdapterId);
} catch (CacheException e) {
log.error("Error occurred while adding to cache: " + e.getMessage());
hasError = true;
errorMessage = e.getMessage();
throw e;
} catch (Throwable t) {
log.error("Unexpected error during caching: " + t.getMessage(), t);
hasError = true;
errorMessage = t.getMessage();
return;
}
}

currentBlock = endBlock.add( BigInteger.ONE ); // avoid overlapping block numbers
Expand All @@ -122,6 +138,7 @@ public CachingStatus getStatus() {
status.toBlock = toBlock;
status.currentBlock = currentBlock;
status.currentEndBlock = currentBlock.add(BigInteger.valueOf(batchSizeInBlocks));
status.sourceAdapterId = sourceAdapterId;

if ( currentBlock.add( BigInteger.valueOf( batchSizeInBlocks ) ).compareTo( toBlock ) > 0 ) {
status.percent = 100;
Expand All @@ -139,6 +156,11 @@ public CachingStatus getStatus() {
}
}

if (hasError) {
status.state = ProcessingState.ERROR;
status.errorMessage = errorMessage;
}

return status;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@
@AdapterSettingBoolean(name = "Caching", description = "Cache event data", defaultValue = true, position = 9, modifiable = true)
@AdapterSettingInteger(name = "batchSizeInBlocks", description = "Batch size for caching in blocks", defaultValue = 50, position = 10, modifiable = true)
@AdapterSettingString(name = "CachingAdapterTargetName", description = "Adapter Target Name", defaultValue = "hsqldb", position = 11, modifiable = true) // todo DL: list
@AdapterSettingBoolean(name = "UseManualABI", description = "Cache event data", defaultValue = false, position = 12, modifiable = true)
@AdapterSettingString(name = "ContractABI", description = "Contract ABI", defaultValue = "", position = 13, modifiable = true)
@AdapterSettingString(name = "ContractName", description = "Contract name", defaultValue = "", position = 14, modifiable = true)
public class EthereumDataSource extends DataSource {

public static final String SCHEMA_NAME = "public";
Expand All @@ -101,6 +104,10 @@ public class EthereumDataSource extends DataSource {

private Map<String, List<ExportedColumn>> map;

private final boolean useManualABI;
private final String contractABI;
private final String contractName;


public EthereumDataSource( final int storeId, final String uniqueName, final Map<String, String> settings ) {
super( storeId, uniqueName, settings, true );
Expand All @@ -120,6 +127,9 @@ public EthereumDataSource( final int storeId, final String uniqueName, final Map
this.eventDataMap = new HashMap<>();
this.caching = Boolean.parseBoolean( settings.get( "Caching" ) );
this.cachingAdapterTargetName = settings.get( "CachingAdapterTargetName" );
this.useManualABI = Boolean.parseBoolean( settings.get( "UseManualABI" ) );
this.contractABI = settings.get( "ContractABI" );
this.contractName = settings.get( "ContractName" );
// todo DL
new Thread( () -> {
createInformationPage();
Expand Down Expand Up @@ -326,11 +336,20 @@ private void createExportedColumnsForEvents( Map<String, List<ExportedColumn>> m
for ( String address : smartContractAddresses ) {
String contractName = null;
List<JSONObject> contractEvents = null;
try {
contractName = callWithExponentialBackoff( () -> getContractName( address ) );
contractEvents = callWithExponentialBackoff( () -> getEventsFromABI( etherscanApiKey, address ) );
} catch ( Exception e ) {
throw new RuntimeException( e );
if ( useManualABI == true && !contractABI.isEmpty() && !this.contractName.isEmpty() ) {
if (smartContractAddresses.size() > 1) {
throw new IllegalArgumentException("Only one smart contract address should be provided when using a manual ABI.");
}
JSONArray abiArray = new JSONArray(contractABI);
contractEvents = getEventsFromABIArray(abiArray);
contractName = this.contractName;
} else {
try {
contractName = callWithExponentialBackoff( () -> getContractName( address ) );
contractEvents = callWithExponentialBackoff( () -> getEventsFromABI( etherscanApiKey, address ) );
} catch ( Exception e ) {
throw new RuntimeException( e );
}
}

for ( JSONObject event : contractEvents ) {
Expand Down Expand Up @@ -451,6 +470,21 @@ protected List<JSONObject> getEventsFromABI( String etherscanApiKey, String cont
return events;
}

protected List<JSONObject> getEventsFromABIArray(JSONArray abiArray) {
List<JSONObject> events = new ArrayList<>();

// Loop through the ABI
for (int i = 0; i < abiArray.length(); i++) {
JSONObject item = abiArray.getJSONObject(i);

// Check if the item is of type 'event'
if (item.has("type") && "event".equals(item.getString("type"))) {
events.add(item);
}
}

return events;
}

private String getContractName( String contractAddress ) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,22 +83,17 @@ private void addLogsToCache( String address, EventData eventData, BigInteger sta
try {
EthLog ethLog = web3j.ethGetLogs( filter ).send(); // Get the EthLog response

// todo: show on screen and update
/*if ( startBlock.equals( BigInteger.valueOf( 17669096 ) ) ) {
throw new RuntimeException( "Error fetching logs for block range: " + startBlock + " to " + endBlock ); // just start new caching from startBlock
}*/

if ( ethLog.hasError() ) {
Response.Error error = ethLog.getError();
log.error( "Error fetching logs: " + error.getMessage() );
throw new RuntimeException( "Error fetching logs for block range: " + startBlock + " to " + endBlock + ". Message: " + error.getMessage() ); // just start new caching from startBlock
throw new CacheException( "Error occurred while fetching logs for block range: " + startBlock + " to " + endBlock + ". Please retry starting from block " + startBlock + " and continue to your intended final block. Error Message: " + error.getMessage() );
}
List<EthLog.LogResult> rawLogs = ethLog.getLogs();
List<List<Object>> structuredLogs = normalizeLogs( event, rawLogs );
cache.put( eventData, structuredLogs );

} catch ( IOException e ) {
throw new RuntimeException( "IO Error fetching logs", e );
throw new CacheException( "IO Error fetching logs", e );
}
}

Expand Down

0 comments on commit 5bacf82

Please sign in to comment.