Skip to content

Commit

Permalink
code cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
hmottestad committed Oct 2, 2023
1 parent 2cc42df commit 475aa5d
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ private void startTransaction(boolean preferThreading) throws SailException {
if (!running.get()) {
logger.warn(
"LmdbSailStore was closed while active transaction was waiting for the next operation. Forcing a rollback!");
opQueue.add(ROLLBACK_TRANSACTION);
rollback();
} else if (Thread.interrupted()) {
throw new InterruptedException();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
*******************************************************************************/
package org.eclipse.rdf4j.testsuite.sail;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

Expand Down Expand Up @@ -323,15 +324,15 @@ public void testConcurrentConnectionsShutdown() throws InterruptedException {
}
}

CountDownLatch countDownLatch = new CountDownLatch(1);
Thread thread = new Thread(() -> {
CountDownLatch countDownLatch1 = new CountDownLatch(1);
Thread thread1 = new Thread(() -> {
SailConnection connection = store.getConnection();
countDownLatch.countDown();
countDownLatch1.countDown();
connection.begin(IsolationLevels.NONE);
connection.addStatement(RDF.FIRST, RDF.TYPE, RDF.PROPERTY);
});
thread.setName("Thread 1");
thread.start();
thread1.setName("Thread 1");
thread1.start();

CountDownLatch countDownLatch2 = new CountDownLatch(1);
Thread thread2 = new Thread(() -> {
Expand All @@ -344,15 +345,68 @@ public void testConcurrentConnectionsShutdown() throws InterruptedException {
thread2.setName("Thread 2");
thread2.start();

countDownLatch.await();
countDownLatch1.await();
countDownLatch2.await();

Thread.sleep(1000);
while (thread1.isAlive() && thread2.isAlive()) {
Thread.yield();
}

store.shutDown();

}

@Test
public void testConcurrentConnectionsShutdownReadCommitted() throws InterruptedException {
if (store instanceof AbstractSail) {
((AbstractSail) store).setConnectionTimeOut(200);
} else if (store instanceof SailWrapper) {
Sail baseSail = ((SailWrapper) store).getBaseSail();
if (baseSail instanceof AbstractSail) {
((AbstractSail) baseSail).setConnectionTimeOut(200);
}
}

CountDownLatch countDownLatch1 = new CountDownLatch(1);
Thread thread1 = new Thread(() -> {
SailConnection connection = store.getConnection();
countDownLatch1.countDown();
connection.begin(IsolationLevels.READ_COMMITTED);
connection.addStatement(RDF.FIRST, RDF.TYPE, RDF.PROPERTY);
});
thread1.setName("Thread 1");
thread1.start();

CountDownLatch countDownLatch2 = new CountDownLatch(1);
Thread thread2 = new Thread(() -> {
SailConnection connection = store.getConnection();
countDownLatch2.countDown();
connection.begin(IsolationLevels.READ_COMMITTED);
connection.addStatement(RDF.REST, RDF.TYPE, RDF.PROPERTY);

});
thread2.setName("Thread 2");
thread2.start();

countDownLatch1.await();
countDownLatch2.await();

while (thread1.isAlive() && thread2.isAlive()) {
Thread.yield();
}
store.shutDown();

store.init();

try (SailConnection connection = store.getConnection()) {
connection.begin();
long size = connection.size();
assertEquals(0, size);
connection.commit();
}

}

@Test
public void testConcurrentConnectionsShutdownAndClose() throws InterruptedException {
if (store instanceof AbstractSail) {
Expand All @@ -368,15 +422,15 @@ public void testConcurrentConnectionsShutdownAndClose() throws InterruptedExcept
AtomicReference<SailConnection> connection1 = new AtomicReference<>();
AtomicReference<SailConnection> connection2 = new AtomicReference<>();

CountDownLatch countDownLatch = new CountDownLatch(1);
Thread thread = new Thread(() -> {
CountDownLatch countDownLatch1 = new CountDownLatch(1);
Thread thread1 = new Thread(() -> {
connection1.set(store.getConnection());
countDownLatch.countDown();
countDownLatch1.countDown();
connection1.get().begin(IsolationLevels.NONE);
connection1.get().clear();
});
thread.setName("Thread 1");
thread.start();
thread1.setName("Thread 1");
thread1.start();

CountDownLatch countDownLatch2 = new CountDownLatch(1);
Thread thread2 = new Thread(() -> {
Expand All @@ -389,16 +443,76 @@ public void testConcurrentConnectionsShutdownAndClose() throws InterruptedExcept
thread2.setName("Thread 2");
thread2.start();

countDownLatch.await();
countDownLatch1.await();
countDownLatch2.await();

Thread.sleep(1000);
while (thread1.isAlive() && thread2.isAlive()) {
Thread.yield();
}

Thread thread3 = new Thread(() -> {
try {
if (thread2.isAlive()) {
connection2.get().close();
connection1.get().close();
} else {
connection1.get().close();
connection2.get().close();
}
} catch (SailException ignored) {
}

try (SailConnection connection = store.getConnection()) {
connection.begin();
long size = connection.size();
connection.commit();
assertThat(size).isLessThanOrEqualTo(1);
}

store.shutDown();
}

@Test
public void testConcurrentConnectionsShutdownAndCloseRollback() throws InterruptedException {
if (store instanceof AbstractSail) {
((AbstractSail) store).setConnectionTimeOut(200);
}

try (SailConnection connection = store.getConnection()) {
connection.begin();
connection.addStatement(RDF.TYPE, RDF.TYPE, RDF.PROPERTY);
connection.commit();
}

AtomicReference<SailConnection> connection1 = new AtomicReference<>();
AtomicReference<SailConnection> connection2 = new AtomicReference<>();

CountDownLatch countDownLatch1 = new CountDownLatch(1);
Thread thread1 = new Thread(() -> {
connection1.set(store.getConnection());
countDownLatch1.countDown();
connection1.get().begin(IsolationLevels.READ_UNCOMMITTED);
connection1.get().clear();
});
thread3.setName("Thread 3");
thread3.start();
thread1.setName("Thread 1");
thread1.start();

CountDownLatch countDownLatch2 = new CountDownLatch(1);
Thread thread2 = new Thread(() -> {
connection2.set(store.getConnection());
countDownLatch2.countDown();
connection2.get().begin(IsolationLevels.READ_UNCOMMITTED);
connection2.get().clear();

});
thread2.setName("Thread 2");
thread2.start();

countDownLatch1.await();
countDownLatch2.await();

while (thread1.isAlive() && thread2.isAlive()) {
Thread.yield();
}

try {
if (thread2.isAlive()) {
Expand All @@ -411,6 +525,13 @@ public void testConcurrentConnectionsShutdownAndClose() throws InterruptedExcept
} catch (SailException ignored) {
}

try (SailConnection connection = store.getConnection()) {
connection.begin();
long size = connection.size();
connection.commit();
assertThat(size).isEqualTo(1);
}

store.shutDown();
}

Expand Down

0 comments on commit 475aa5d

Please sign in to comment.