Skip to content

Commit

Permalink
Add granular locking for ddls
Browse files Browse the repository at this point in the history
  • Loading branch information
Tobias Hafner committed Nov 27, 2024
1 parent 268b3e0 commit 20be93a
Show file tree
Hide file tree
Showing 85 changed files with 891 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public int compareTo( @NotNull Entity o ) {
return Long.compare( this.id, o.id );
}

public ObjectType getObjectType() {
public ObjectType getLockableObjectType() {
throw new UnsupportedOperationException( "Should be overwritten by child" );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public Expression asExpression() {


@Override
public ObjectType getObjectType() {
public ObjectType getLockableObjectType() {
return ObjectType.ENTITY;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public Expression asExpression() {


@Override
public ObjectType getObjectType() {
public ObjectType getLockableObjectType() {
return ObjectType.ENTITY;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public int compareTo( LogicalNamespace o ) {


@Override
public ObjectType getObjectType() {
public ObjectType getLockableObjectType() {
return ObjectType.NAMESPACE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public List<Long> getConstraintIds() {


@Override
public ObjectType getObjectType() {
public ObjectType getLockableObjectType() {
return ObjectType.ENTITY;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public enum RuntimeConfig {
S2PL_LOCKING_LEVEL(
"runtime/s2plLockingLevel",
"Define the granularity of lock acquisition.",
S2plLockingLevel.GLOBAL,
S2plLockingLevel.ENTITY,
ConfigType.ENUM
),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
package org.polypheny.db.nodes;


import java.util.Map;
import org.polypheny.db.prepare.Context;
import org.polypheny.db.processing.QueryContext.ParsedQueryContext;
import org.polypheny.db.transaction.Statement;
import org.polypheny.db.transaction.locking.Lockable;
import org.polypheny.db.transaction.locking.Lockable.LockType;


/**
Expand All @@ -29,5 +32,5 @@ public interface ExecutableStatement {

void execute( Context context, Statement statement, ParsedQueryContext parsedQueryContext );


Map<Lockable, LockType> deriveLockables(Context context, ParsedQueryContext parsedQueryContext);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,14 @@

package org.polypheny.db.nodes;

import java.util.HashMap;
import java.util.Map;
import org.polypheny.db.catalog.entity.logical.LogicalNamespace;
import org.polypheny.db.prepare.Context;
import org.polypheny.db.processing.QueryContext.ParsedQueryContext;
import org.polypheny.db.transaction.Statement;
import org.polypheny.db.transaction.locking.Lockable;
import org.polypheny.db.transaction.locking.Lockable.LockType;


/**
Expand All @@ -31,4 +36,8 @@ default void execute( Context context, Statement statement, ParsedQueryContext p
throw new UnsupportedOperationException( "The operation is not supported by the used language." );
}

@Override
default Map<Lockable, LockType> deriveLockables( Context context, ParsedQueryContext parsedQueryContext ) {
return Map.of();
}
}
8 changes: 5 additions & 3 deletions core/src/main/java/org/polypheny/db/processing/Processor.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@


import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.polypheny.db.PolyImplementation;
import org.polypheny.db.algebra.AlgRoot;
Expand All @@ -32,6 +33,7 @@
import org.polypheny.db.transaction.Transaction;
import org.polypheny.db.transaction.TransactionException;
import org.polypheny.db.transaction.locking.Lockable;
import org.polypheny.db.transaction.locking.Lockable.LockType;
import org.polypheny.db.util.DeadlockException;
import org.polypheny.db.util.Pair;

Expand All @@ -47,9 +49,9 @@ public abstract class Processor {

public PolyImplementation prepareDdl( Statement statement, ExecutableStatement node, ParsedQueryContext context ) {
try {
// Acquire global schema lock
lock( statement.getTransaction(), context );
// Execute statement
Map<Lockable, LockType> requiredLockables = node.deriveLockables( statement.getPrepareContext(), context );
Transaction transaction = statement.getTransaction();
requiredLockables.forEach( transaction::acquireLockable );
return getImplementation( statement, node, context );
} catch ( DeadlockException e ) {
throw new DeadlockException( e.getMessage() + " Exception while acquiring global schema lock" );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ enum ObjectType {
ENTITY
}

ObjectType getObjectType();
ObjectType getLockableObjectType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.polypheny.db.algebra.AlgVisitor;
import org.polypheny.db.algebra.core.relational.RelAlg;
import org.polypheny.db.catalog.Catalog;
import org.polypheny.db.catalog.entity.Entity;
import org.polypheny.db.catalog.entity.logical.LogicalTable;
import org.polypheny.db.config.RuntimeConfig;
import org.polypheny.db.partition.properties.PartitionProperty;
Expand Down Expand Up @@ -60,7 +59,7 @@ private void visitRelationalNode( AlgNode currentNode ) {
if ( RuntimeConfig.FOREIGN_KEY_ENFORCEMENT.getBoolean() ) {
extractWriteConstraints( currentNode.getEntity().unwrap( LogicalTable.class ).orElseThrow() );
}
addResult( currentNode.getEntity(), lockType);
LockableUtils.updateMapOfDerivedLockables( currentNode.getEntity(), lockType, result );
}


Expand All @@ -73,36 +72,12 @@ private void extractWriteConstraints( LogicalTable logicalTable ) {
.filter( Optional::isPresent )
.map( Optional::get );
} )
.forEach( entry -> addResult( entry, LockType.SHARED ) );
.forEach( entry -> LockableUtils.updateMapOfDerivedLockables( entry, LockType.SHARED, result ) );
}


private void visitNonRelationalNode( AlgNode currentNode ) {
LockType lockType = currentNode.isDataModifying() ? LockType.EXCLUSIVE : LockType.SHARED;
result.put( LockablesRegistry.INSTANCE.getOrCreateLockable(LockableUtils.unwrapToLockableObject(currentNode.getEntity())) , lockType );
LockableUtils.updateMapOfDerivedLockables( currentNode.getEntity(), lockType, result );
}

private void addResult(Entity entity, LockType lockType) {
switch ((S2plLockingLevel) RuntimeConfig.S2PL_LOCKING_LEVEL.getEnum()) {
case GLOBAL -> {
LockType currentLockType = result.get( LockablesRegistry.GLOBAL_SCHEMA_LOCKABLE );
if ( currentLockType == null || currentLockType == LockType.EXCLUSIVE ) {
result.put( LockablesRegistry.GLOBAL_SCHEMA_LOCKABLE, lockType );

}
}
case NAMESPACE -> {
Lockable lockable = LockablesRegistry.INSTANCE.getOrCreateLockable(LockableUtils.getNamespaceLockableObjectOfEntity(entity));
LockType currentLockType = result.get( lockable );
if ( currentLockType == null || currentLockType == LockType.EXCLUSIVE ) {
result.put( lockable, lockType );
}
}
case ENTITY -> {
Lockable lockable = LockablesRegistry.INSTANCE.getOrCreateLockable(entity);
result.put(lockable, lockType);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

package org.polypheny.db.transaction.locking;

public class GlobalSchemaLockable extends LockableImpl {
public class GlobalLockable extends LockableImpl {

public GlobalSchemaLockable() {
public GlobalLockable() {
super( null );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,23 @@

package org.polypheny.db.transaction.locking;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import lombok.NonNull;
import org.polypheny.db.catalog.Catalog;
import org.polypheny.db.catalog.entity.Entity;
import org.polypheny.db.catalog.entity.logical.LogicalCollection;
import org.polypheny.db.catalog.entity.logical.LogicalNamespace;
import org.polypheny.db.config.RuntimeConfig;
import org.polypheny.db.prepare.Context;
import org.polypheny.db.processing.QueryContext.ParsedQueryContext;
import org.polypheny.db.transaction.locking.Lockable.LockType;

public class LockableUtils {

public static LockableObject unwrapToLockableObject( Entity entity ) {
Optional<LockableObject> lockableObject = entity.unwrap( LockableObject.class );
if ( lockableObject.isPresent() ) {
return lockableObject.get();
}
throw new RuntimeException( "Could not unwrap lockableObject" );
}


public static Lockable convertToLockable( @NonNull LockableObject lockableObject ) {
switch ( lockableObject.getObjectType() ) {
switch ( lockableObject.getLockableObjectType() ) {
case NAMESPACE -> {
return convertNamespaceToLockable( lockableObject );
}
Expand All @@ -43,11 +41,8 @@ public static Lockable convertToLockable( @NonNull LockableObject lockableObject
return convertEntityToLockable( lockableObject );
}

default -> {
throw new IllegalArgumentException( "Can not convert object of unknown type to lockable: " + lockableObject.getObjectType() );
}
default -> throw new IllegalArgumentException( "Can not convert object of unknown type to lockable: " + lockableObject.getLockableObjectType() );
}

}


Expand All @@ -68,4 +63,77 @@ public static LockableObject getNamespaceLockableObjectOfEntity( Entity entity )
return Catalog.getInstance().getSnapshot().getNamespace( entity.getNamespaceId() ).orElseThrow();
}


public static void updateMapOfDerivedLockables( Entity entity, LockType lockType, Map<Lockable, LockType> currentLockables ) {
switch ( (S2plLockingLevel) RuntimeConfig.S2PL_LOCKING_LEVEL.getEnum() ) {
case GLOBAL -> updateLockableMapEntry( LockablesRegistry.GLOBAL_SCHEMA_LOCKABLE, lockType, currentLockables );
case NAMESPACE -> {
Lockable lockable = LockablesRegistry.INSTANCE.getOrCreateLockable( LockableUtils.getNamespaceLockableObjectOfEntity( entity ) );
updateLockableMapEntry( lockable, lockType, currentLockables );
}
case ENTITY -> {
Lockable lockable = LockablesRegistry.INSTANCE.getOrCreateLockable( entity );
updateLockableMapEntry( lockable, lockType, currentLockables );
}
}
}


public static void updateMapOfDerivedLockables( LockableObject lockableObject, LockType lockType, Map<Lockable, LockType> currentLockables ) {
switch ( lockableObject.getLockableObjectType() ) {
case NAMESPACE -> {
S2plLockingLevel lockingLevel = (S2plLockingLevel) RuntimeConfig.S2PL_LOCKING_LEVEL.getEnum();
if ( lockingLevel == S2plLockingLevel.GLOBAL ) {
updateLockableMapEntry( LockablesRegistry.GLOBAL_SCHEMA_LOCKABLE, lockType, currentLockables );
return;
}
// this always returns a lockable on namespace level as we checked the lockable object type
Lockable lockable = LockablesRegistry.INSTANCE.getOrCreateLockable( lockableObject );
updateLockableMapEntry( lockable, lockType, currentLockables );
}
case ENTITY -> updateMapOfDerivedLockables( (Entity) lockableObject, lockType, currentLockables );
}
}


private static void updateLockableMapEntry( Lockable lockable, LockType lockType, Map<Lockable, LockType> currentLockables ) {
LockType currentLockType = currentLockables.get( lockable );
if ( currentLockType == null || currentLockType == LockType.EXCLUSIVE ) {
currentLockables.put( lockable, lockType );
}
}

public static Map<Lockable, LockType> getMapWithGlobalLockable(LockType lockType) {
HashMap<Lockable, LockType> lockableObjects = new HashMap<>();
lockableObjects.put( LockablesRegistry.GLOBAL_SCHEMA_LOCKABLE, lockType );
return lockableObjects;
}

public static Map<Lockable, LockType> getMapOfNamespaceLockable(String namespaceName, Context context, LockType lockType) {
Optional<LogicalNamespace> logicalNamespace = context.getSnapshot().getNamespace( namespaceName );
HashMap<Lockable, LockType> lockableObjects = new HashMap<>();
logicalNamespace.ifPresent( n -> LockableUtils.updateMapOfDerivedLockables( n, lockType, lockableObjects ) );
return lockableObjects;
}

public static Map<Lockable, LockType> getMapOfNamespaceLockableFromContext(Context context, ParsedQueryContext parsedQueryContext, LockType lockType) {
long namespaceId = parsedQueryContext.getNamespaceId();
LogicalNamespace namespace = context.getSnapshot().getNamespace( namespaceId ).orElseThrow();
return getMapOfLockableFromObject( namespace, lockType );
}

public static Map<Lockable, LockType> getMapOfCollectionLockableFromContext(Context context, ParsedQueryContext parsedQueryContext, LockType lockType) {
long namespaceId = parsedQueryContext.getQueryNode().orElseThrow().getNamespaceId();
LogicalCollection collection = context.getSnapshot().doc().getCollection( namespaceId ).orElseThrow();
return getMapOfLockableFromObject( collection, lockType );
}

public static Map<Lockable, LockType> getMapOfLockableFromObject(LockableObject lockableObject, LockType lockType) {
HashMap<Lockable, LockType> lockableObjects = new HashMap<>();
LockableUtils.updateMapOfDerivedLockables( lockableObject, lockType, lockableObjects );
return lockableObjects;
}



}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@

import java.util.concurrent.ConcurrentHashMap;
import lombok.NonNull;
import org.polypheny.db.catalog.Catalog;
import org.polypheny.db.catalog.entity.Entity;
import org.polypheny.db.catalog.entity.logical.LogicalNamespace;

public class LockablesRegistry {

public static final GlobalSchemaLockable GLOBAL_SCHEMA_LOCKABLE = new GlobalSchemaLockable();
public static final GlobalLockable GLOBAL_SCHEMA_LOCKABLE = new GlobalLockable();
public static final LockablesRegistry INSTANCE = new LockablesRegistry();

private final ConcurrentHashMap<LockableObject, Lockable> lockables = new ConcurrentHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

package org.polypheny.db.cypher.admin;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import lombok.Getter;
import org.polypheny.db.catalog.entity.logical.LogicalNamespace;
import org.polypheny.db.catalog.exceptions.GenericRuntimeException;
Expand All @@ -29,6 +32,9 @@
import org.polypheny.db.prepare.Context;
import org.polypheny.db.processing.QueryContext.ParsedQueryContext;
import org.polypheny.db.transaction.Statement;
import org.polypheny.db.transaction.locking.Lockable;
import org.polypheny.db.transaction.locking.Lockable.LockType;
import org.polypheny.db.transaction.locking.LockableUtils;


@Getter
Expand Down Expand Up @@ -65,4 +71,9 @@ public void execute( Context context, Statement statement, ParsedQueryContext pa
DdlManager.getInstance().replaceGraphAlias( graphs.get( 0 ).id, targetName, aliasName );
}

@Override
public Map<Lockable, LockType> deriveLockables( Context context, ParsedQueryContext parsedQueryContext ) {
return LockableUtils.getMapWithGlobalLockable( LockType.EXCLUSIVE );
}

}
Loading

0 comments on commit 20be93a

Please sign in to comment.