Skip to content

Commit

Permalink
clean up result closing
Browse files Browse the repository at this point in the history
  • Loading branch information
Tobias Hafner committed Mar 25, 2024
1 parent 9eaebd8 commit 0de8a3d
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ public PIClient(
public Transaction getCurrentOrCreateNewTransaction() {
synchronized ( this ) {
if ( currentTransaction == null || !currentTransaction.isActive() ) {
//TODO TH: can a single transaction contain changes to different namespaces -> use null
currentTransaction = transactionManager.startTransaction( catalogUser.id, namespace.id, false, "ProtoInterface" );
}
return currentTransaction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,12 @@ public PIPreparedIndexedStatement(

public List<Long> executeBatch( List<List<PolyValue>> valuesBatch ) {
List<Long> updateCounts = new LinkedList<>();
synchronized ( client ) {
synchronized ( this ) {
if ( statement == null ) {
statement = client.getCurrentOrCreateNewTransaction().createStatement();
} else {
statement.getDataContext().resetParameterValues();
}
closeResults();
List<AlgDataType> types = valuesBatch.stream()
.map( v -> v.get( 0 ).getType() )
.map( v -> statement.getTransaction().getTypeFactory().createPolyType( v ) )
Expand All @@ -79,7 +78,7 @@ public List<Long> executeBatch( List<List<PolyValue>> valuesBatch ) {

@SuppressWarnings("Duplicates")
public StatementResult execute( List<PolyValue> values, int fetchSize ) {
synchronized ( client ) {
synchronized ( this ) {
if ( statement == null ) {
statement = client.getCurrentOrCreateNewTransaction().createStatement();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,12 @@ public PIPreparedNamedStatement(

@SuppressWarnings("Duplicates")
public StatementResult execute( Map<String, PolyValue> values, int fetchSize ) throws Exception {
synchronized ( client ) {
synchronized ( this ) {
if ( statement == null ) {
statement = client.getCurrentOrCreateNewTransaction().createStatement();
} else {
statement.getDataContext().resetParameterValues();
}
closeResults();
List<PolyValue> valueList = namedValueProcessor.transformValueMap( values );
for ( int i = 0; i < valueList.size(); i++ ) {
statement.getDataContext().addParameterValues( i, null, List.of( valueList.get( i ) ) );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public void closeResults() {
}
try {
iterator.close();
iterator = null;
} catch ( Exception e ) {
throw new GenericRuntimeException( "Closing of open result iterator failed" );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public PIUnparameterizedStatement( int id, PIClient client, QueryLanguage langua

public StatementResult execute( int fetchSize ) {
statement = client.getCurrentOrCreateNewTransaction().createStatement();
synchronized ( client ) {
synchronized ( this ) {
StatementProcessor.implement( this );
return StatementProcessor.executeAndGetResult( this, fetchSize );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,140 +59,139 @@ private LogicalNamespace getNamespace( String namespaceName ) {


public PIUnparameterizedStatement createUnparameterizedStatement( ExecuteUnparameterizedStatementRequest request ) throws PIServiceException {
synchronized ( client ) {
String languageName = request.getLanguageName();
if ( !isSupportedLanguage( languageName ) ) {
throw new PIServiceException( "Language " + languageName + " not supported." );
}
int statementId = statementIdGenerator.getAndIncrement();
LogicalNamespace namespace = client.getNamespace();
if ( request.hasNamespaceName() ) {
namespace = getNamespace( request.getNamespaceName() );
}
assert namespace != null;
PIUnparameterizedStatement statement = new PIUnparameterizedStatement(
statementId,
client,
QueryLanguage.from( languageName ),
namespace,
request.getStatement()
);
openStatements.put( statementId, statement );
if ( log.isTraceEnabled() ) {
log.trace( "created request {}", statement );
}
return statement;
//synchronized ( client ) {
String languageName = request.getLanguageName();
if ( !isSupportedLanguage( languageName ) ) {
throw new PIServiceException( "Language " + languageName + " not supported." );
}
int statementId = statementIdGenerator.getAndIncrement();
LogicalNamespace namespace = client.getNamespace();
if ( request.hasNamespaceName() ) {
namespace = getNamespace( request.getNamespaceName() );
}
assert namespace != null;
PIUnparameterizedStatement statement = new PIUnparameterizedStatement(
statementId,
client,
QueryLanguage.from( languageName ),
namespace,
request.getStatement()
);
openStatements.put( statementId, statement );
if ( log.isTraceEnabled() ) {
log.trace( "created request {}", statement );
}
return statement;
//}
}


public PIUnparameterizedStatementBatch createUnparameterizedStatementBatch( List<ExecuteUnparameterizedStatementRequest> statements ) {
synchronized ( client ) {
List<PIUnparameterizedStatement> PIUnparameterizedStatements = statements.stream()
.map( this::createUnparameterizedStatement )
.collect( Collectors.toList() );
final int batchId = statementIdGenerator.getAndIncrement();
final PIUnparameterizedStatementBatch batch = new PIUnparameterizedStatementBatch( batchId, client, PIUnparameterizedStatements );
openUnparameterizedBatches.put( batchId, batch );
if ( log.isTraceEnabled() ) {
log.trace( "created batch {}", batch );
}
return batch;
//synchronized ( client ) {
List<PIUnparameterizedStatement> PIUnparameterizedStatements = statements.stream()
.map( this::createUnparameterizedStatement )
.collect( Collectors.toList() );
final int batchId = statementIdGenerator.getAndIncrement();
final PIUnparameterizedStatementBatch batch = new PIUnparameterizedStatementBatch( batchId, client, PIUnparameterizedStatements );
openUnparameterizedBatches.put( batchId, batch );
if ( log.isTraceEnabled() ) {
log.trace( "created batch {}", batch );
}
return batch;
//}
}


public PIPreparedIndexedStatement createIndexedPreparedInterfaceStatement( PrepareStatementRequest request ) throws PIServiceException {
synchronized ( client ) {
String languageName = request.getLanguageName();
if ( !isSupportedLanguage( languageName ) ) {
throw new PIServiceException( "Language " + languageName + " not supported." );
}
int statementId = statementIdGenerator.getAndIncrement();
LogicalNamespace namespace = client.getNamespace();
if ( request.hasNamespaceName() ) {
namespace = getNamespace( request.getNamespaceName() );
}
assert namespace != null;
PIPreparedIndexedStatement statement = new PIPreparedIndexedStatement(
statementId,
client,
QueryLanguage.from( languageName ),
namespace,
request.getStatement()
);
openStatements.put( statementId, statement );
if ( log.isTraceEnabled() ) {
log.trace( "created named prepared statement {}", statement );
}
return statement;
//synchronized ( client ) {
String languageName = request.getLanguageName();
if ( !isSupportedLanguage( languageName ) ) {
throw new PIServiceException( "Language " + languageName + " not supported." );
}
int statementId = statementIdGenerator.getAndIncrement();
LogicalNamespace namespace = client.getNamespace();
if ( request.hasNamespaceName() ) {
namespace = getNamespace( request.getNamespaceName() );
}
assert namespace != null;
PIPreparedIndexedStatement statement = new PIPreparedIndexedStatement(
statementId,
client,
QueryLanguage.from( languageName ),
namespace,
request.getStatement()
);
openStatements.put( statementId, statement );
if ( log.isTraceEnabled() ) {
log.trace( "created named prepared statement {}", statement );
}
return statement;
//}
}


public PIPreparedNamedStatement createNamedPreparedInterfaceStatement( PrepareStatementRequest request ) throws PIServiceException {
synchronized ( client ) {
String languageName = request.getLanguageName();
if ( !isSupportedLanguage( languageName ) ) {
throw new PIServiceException( "Language " + languageName + " not supported." );
}
final int statementId = statementIdGenerator.getAndIncrement();
LogicalNamespace namespace = client.getNamespace();
if ( request.hasNamespaceName() ) {
namespace = getNamespace( request.getNamespaceName() );
}
assert namespace != null;
PIPreparedNamedStatement statement = new PIPreparedNamedStatement(
statementId,
client,
QueryLanguage.from( languageName ),
namespace,
request.getStatement()
);
openStatements.put( statementId, statement );
if ( log.isTraceEnabled() ) {
log.trace( "created named prepared statement {}", statement );
}
return statement;
//synchronized ( client ) {
String languageName = request.getLanguageName();
if ( !isSupportedLanguage( languageName ) ) {
throw new PIServiceException( "Language " + languageName + " not supported." );
}
final int statementId = statementIdGenerator.getAndIncrement();
LogicalNamespace namespace = client.getNamespace();
if ( request.hasNamespaceName() ) {
namespace = getNamespace( request.getNamespaceName() );
}
assert namespace != null;
PIPreparedNamedStatement statement = new PIPreparedNamedStatement(
statementId,
client,
QueryLanguage.from( languageName ),
namespace,
request.getStatement()
);
openStatements.put( statementId, statement );
if ( log.isTraceEnabled() ) {
log.trace( "created named prepared statement {}", statement );
}
return statement;
//}
}


public void closeAll() {
synchronized ( client ) {
openUnparameterizedBatches.values().forEach( this::closeBatch );
openStatements.values().forEach( s -> closeStatement( s.getId() ) );
}
//synchronized ( client ) {
openUnparameterizedBatches.values().forEach( this::closeBatch );
openStatements.values().forEach( s -> closeStatement( s.getId() ) );
//}
}


public void closeBatch( PIUnparameterizedStatementBatch toClose ) {
synchronized ( client ) {
//synchronized ( toClose ) {
toClose.getStatements().forEach( s -> closeStatementOrBatch( s.getId() ) );
}
//}
}


private void closeStatement( int statementId ) {
synchronized ( client ) {
PIStatement statementToClose = openStatements.remove( statementId );
if ( statementToClose == null ) {
return;
}
statementToClose.closeResults();
PIStatement statementToClose = openStatements.get( statementId );
if ( statementToClose == null ) {
return;
}
//synchronized ( statementToClose ) {
openStatements.remove( statementId );
statementToClose.closeResults();
//}
}


public void closeStatementOrBatch( int statementId ) {
synchronized ( client ) {
PIUnparameterizedStatementBatch batchToClose = openUnparameterizedBatches.remove( statementId );
if ( batchToClose != null ) {
closeBatch( batchToClose );
return;
}
closeStatement( statementId );
PIUnparameterizedStatementBatch batchToClose = openUnparameterizedBatches.remove( statementId );
if ( batchToClose != null ) {
closeBatch( batchToClose );
return;
}
closeStatement( statementId );
}


Expand Down

0 comments on commit 0de8a3d

Please sign in to comment.