-
By using the Thread Bucket Selection Strategy and choosing appropriate values for the number of buckets and the number of async worker threads, it is possible to eliminate contention on insertion.
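For reference, a minimal sketch of that configuration, assuming the current embedded Java API (the exact names of setParallelLevel and the "thread" strategy may differ across ArcadeDB versions):

// Match the number of buckets to the number of async worker threads so
// each worker writes to its own bucket and never contends on a page.
GlobalConfiguration.TYPE_DEFAULT_BUCKETS.setValue(8);   // buckets created per type
final DocumentType type = database.getSchema().getOrCreateDocumentType("Order");
type.setBucketSelectionStrategy("thread");              // Thread Bucket Selection Strategy
database.async().setParallelLevel(8);                   // async worker threads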
-
Obviously, the "brute-force" solution is to synchronize the method that does the update, but I have tried this and it slows down the update by about 10%.
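(Something like the following sketch, where updateOrder and updateLock are illustrative names, not the actual code:)

// Brute-force: serialize all updates through a single lock. Correct, but
// measured here as roughly 10% slower.
private final Object updateLock = new Object();

void updateOrder(final Database database, final long id, final String status) {
  synchronized (updateLock) {
    database.command("sql", "UPDATE Order SET status = ? WHERE id = ?", status, id);
  }
}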
-
I use ArcadeDB embedded. In this particular test scenario the database contains very few records (< 1000, so it's a worst-case scenario for page contention), and I have a multi-threaded client (4 threads in this particular configuration) that sends HTTP requests to an Undertow handler, which executes the SQL update statement. It is guaranteed that each client thread sends requests to update a different record.
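The setup looks roughly like this minimal sketch (the handler logic and parameter names are hypothetical; Undertow's BlockingHandler is used so the SQL runs off the IO thread):

import io.undertow.Undertow;
import io.undertow.server.handlers.BlockingHandler;

// 4 client threads each request /update?id=<n>; every thread targets a different record.
final Undertow server = Undertow.builder()
    .addHttpListener(8080, "0.0.0.0")
    .setHandler(new BlockingHandler(exchange -> {
      final String id = exchange.getQueryParameters().get("id").getFirst();
      database.command("sql", "UPDATE Order SET status = 'processed' WHERE id = ?", id);
      exchange.getResponseSender().send("OK");
    }))
    .build();
server.start();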
-
How do I obtain the […]? This is what I've come up with, but it looks ugly, with all those lambdas:
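(The snippet itself didn't survive the copy; judging from the reply below, it wrapped the lookup and a nested async update in an async transaction, roughly like this reconstruction — a sketch, not the original code:)

// Hypothetical reconstruction: async transaction + nested async update,
// each with its own success/error lambda — hence "all those lambdas".
database.async().transaction(() -> {
  final IndexCursor cursor = database.lookupByKey("Order",
      new String[] { "processor", "vstart", "vstop" },
      new Object[] { order.getProcessor(), order.getStartTime(), order.getStopTime() });
  final MutableDocument record = cursor.hasNext() ?
      cursor.next().getRecord().asDocument(true).modify() : database.newDocument("Order");
  record.set("status", order.getStatus());
  database.async().updateRecord(record,
      updated -> counter.incrementAndGet(),   // success callback
      error -> error.printStackTrace());      // error callback
});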
-
I have tried the code that I posted, and I get a lot of ConcurrentModificationExceptions. These are far more than I got when I wasn't using the async API or the automatic transaction with retries, maybe because I inserted the records using the sync API, so they all ended up in the same bucket.
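For reference, the "automatic transaction with retries" mentioned here looks like the following sketch, assuming the Database.transaction(txBlock, joinCurrentTx, attempts) overload:

// Re-executes the closure automatically on ConcurrentModificationException,
// up to 10 attempts here.
database.transaction(() -> {
  final IndexCursor cursor = database.lookupByKey("Order",
      new String[] { "processor", "vstart", "vstop" },
      new Object[] { processor, vstart, vstop });
  if (cursor.hasNext()) {
    final MutableDocument doc = cursor.next().getRecord().asDocument(true).modify();
    doc.set("status", "updated");
    doc.save();
  }
}, false, 10);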
-
I thought that maybe the new API works well only if the records have been inserted asynchronously, so I rewrote my insert method, but now I don't get any records inserted at all. I attach my full method, which produces the following results when passed a list of 107 objects to insert:
I have also tried removing the transaction, but I get the same result.
-
I see you call an async update from an async transaction. The best approach in terms of performance is to execute the lookup in the main thread and then schedule the insert or update asynchronously. Below you can find a full test case that inserts 100K orders, then sends another batch of orders where half are updates and half are new orders. At the end it checks that everything is all right.
It looks pretty fast to me, but please let me know how long Hsqldb takes to do the same.

/*
 * Copyright 2023 Arcade Data Ltd
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package com.arcadedb.server;

import com.arcadedb.ContextConfiguration;
import com.arcadedb.GlobalConfiguration;
import com.arcadedb.database.Database;
import com.arcadedb.database.DatabaseFactory;
import com.arcadedb.database.MutableDocument;
import com.arcadedb.database.Record;
import com.arcadedb.index.IndexCursor;
import com.arcadedb.integration.misc.IntegrationUtils;
import com.arcadedb.schema.DocumentType;
import com.arcadedb.schema.Schema;
import com.arcadedb.schema.Type;
import com.arcadedb.serializer.json.JSONObject;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

import static com.arcadedb.server.BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS;

/**
 * From Issue https://github.com/ArcadeData/arcadedb/discussion/1129
 */
public class BatchInsertUpdateTest {
  static final String DATABASE_NAME = "BatchInsertUpdateTest";

  private class CandidateOrder {
    private final String[] values;

    private CandidateOrder(final String[] values) {
      this.values = values;
    }

    public String getProcessor() {
      return values[0];
    }

    public String getTriggerRid() {
      return values[1];
    }

    public String getStartTime() {
      return values[2];
    }

    public String getStopTime() {
      return values[3];
    }

    public String getStatus() {
      return values[4];
    }

    public void setStatus(String newStatus) {
      values[4] = newStatus;
    }
  }

  @Test
  public void testBatchAsyncInsertUpdate() {
    final ContextConfiguration serverConfiguration = new ContextConfiguration();
    final String rootPath = IntegrationUtils.setRootPath(serverConfiguration);

    try (DatabaseFactory databaseFactory = new DatabaseFactory(rootPath + "/databases/" + DATABASE_NAME)) {
      if (databaseFactory.exists())
        databaseFactory.open().drop();

      // Create the schema with a unique composite index used by the lookups.
      try (Database database = databaseFactory.create()) {
        DocumentType type = database.getSchema().getOrCreateDocumentType("Order");
        type.createProperty("processor", Type.STRING);
        type.createProperty("vstart", Type.STRING);
        type.createProperty("vstop", Type.STRING);
        type.createTypeIndex(Schema.INDEX_TYPE.LSM_TREE, true, "processor", "vstart", "vstop");
      }

      ArcadeDBServer arcadeDBServer = new ArcadeDBServer(serverConfiguration);
      arcadeDBServer.start();
      Database database = arcadeDBServer.getDatabase(DATABASE_NAME);
      try {
        int TOTAL = 100_000;

        // First batch: all new orders.
        final List<CandidateOrder> ordersToProcess = new ArrayList<>(TOTAL);
        for (int i = 0; i < TOTAL; i++)
          ordersToProcess.add(new CandidateOrder(new String[] { "" + i, "" + i, "" + i, "" + i, "created" }));

        insertOrdersAsync(database, ordersToProcess);

        // Second batch: half updates of existing orders, half new orders.
        for (int i = 0; i < TOTAL / 2; i++)
          ordersToProcess.get(i).setStatus("updated");
        for (int i = TOTAL / 2; i < TOTAL; i++) {
          int k = TOTAL + i;
          ordersToProcess.set(i, new CandidateOrder(new String[] { "" + k, "" + k, "" + k, "" + k, "created" }));
        }

        insertOrdersAsync(database, ordersToProcess);

        // Verify the final counts.
        int created = 0;
        int updated = 0;
        for (Iterator<Record> it = database.iterateType("Order", false); it.hasNext(); ) {
          final String status = it.next().asDocument().getString("status");
          if (status.equals("created"))
            ++created;
          else if (status.equals("updated"))
            ++updated;
        }

        Assertions.assertEquals(TOTAL, created);
        Assertions.assertEquals(TOTAL / 2, updated);
      } finally {
        arcadeDBServer.stop();
        databaseFactory.open().drop();
      }
    }
  }

  private JSONObject insertOrdersAsync(final Database database, List<CandidateOrder> orders) {
    final long begin = System.currentTimeMillis();

    JSONObject result = new JSONObject();
    CountDownLatch countDownLatch = new CountDownLatch(orders.size());
    AtomicLong counter = new AtomicLong();
    int[] firstOrderId = new int[1];

    database.async().onError(exception -> {
      exception.printStackTrace();
    });

    final AtomicLong autoIncrementOrderId = new AtomicLong();

    for (CandidateOrder order : orders) {
      // The lookup runs on the main thread; the write is scheduled on the async workers.
      IndexCursor indexCursor = database.lookupByKey("Order", new String[] { "processor", "vstart", "vstop" },
          new Object[] { order.getProcessor(), order.getStartTime(), order.getStopTime() });

      MutableDocument record;
      if (indexCursor.hasNext()) {
        record = indexCursor.next().getRecord().asDocument(true).modify();
        record.set("id", autoIncrementOrderId.incrementAndGet());
        record.set("processor", order.getProcessor());
        record.set("trigger", order.getTriggerRid());
        record.set("vstart", order.getStartTime());
        record.set("vstop", order.getStopTime());
        record.set("status", order.getStatus());

        database.async().updateRecord(record, newRecord -> {
          counter.incrementAndGet();
          countDownLatch.countDown();
        }, exception -> {
          exception.printStackTrace();
        });
      } else {
        record = database.newDocument("Order");
        record.set("id", autoIncrementOrderId.incrementAndGet());
        record.set("processor", order.getProcessor());
        record.set("trigger", order.getTriggerRid());
        record.set("vstart", order.getStartTime());
        record.set("vstop", order.getStopTime());
        record.set("status", order.getStatus());

        database.async().createRecord(record, newRecord -> {
          counter.incrementAndGet();
          countDownLatch.countDown();
        }, exception -> {
          exception.printStackTrace();
          autoIncrementOrderId.decrementAndGet();
        });
      }
    }

    // Wait for all pending async operations (and their batched commits) to finish.
    database.async().waitCompletion(10_000);

    result.put("totalRows", counter.get());
    System.out.println("insert orders result = " + result + " in " + (System.currentTimeMillis() - begin) + "ms");
    return result;
  }

  @BeforeAll
  public static void beginTests() {
    final ContextConfiguration serverConfiguration = new ContextConfiguration();
    final String rootPath = IntegrationUtils.setRootPath(serverConfiguration);
    GlobalConfiguration.SERVER_ROOT_PASSWORD.setValue(DEFAULT_PASSWORD_FOR_TESTS);
    //GlobalConfiguration.TYPE_DEFAULT_BUCKETS.setValue(1);
    try (DatabaseFactory databaseFactory = new DatabaseFactory(rootPath + "/databases/" + DATABASE_NAME)) {
      if (databaseFactory.exists())
        databaseFactory.open().drop();
    }
  }

  @AfterAll
  public static void endTests() {
    TestServerHelper.checkActiveDatabases();
    GlobalConfiguration.resetAll();
  }
}
-
Ok, I made some progress.
-
And finally I have a test case that reproduces the issue using the Console. Interestingly, if I execute the test on my laptop (8 cores), setting the number of buckets and the number of async worker threads to 6, the SELECT from the Console returns 600 records out of 3000; but if I execute the test on the customer's test server (24 cores), setting the number of buckets and the number of async worker threads to 8, the SELECT from the Console returns 100 records out of 3000. I don't know if it means anything.
Yes, without the call to waitCompletion(), the async threads haven't committed the pending operations. This is configurable with the setting arcadedb.asyncTxBatchSize, which is set to 10,240 by default, meaning it commits every ~10K operations. You can also change it to commit every time by putting this at the beginning:
GlobalConfiguration.ASYNC_TX_BATCH_SIZE.setValue(1);
but it would be much slower, of course.
For your use case, if I understand correctly, since you have these batches of updates, the best way is to do as in my test: one thread checks if the record exists, and then it schedules the create or update as an async task. When everything is sent, you just wait for the finish. At that point e…
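Condensed, the recommended pattern from the test above is the following sketch (KEY_NAMES, keysOf and documentFor are illustrative helpers, not API):

// Main thread: indexed lookup. Async workers: the actual writes.
for (final CandidateOrder order : orders) {
  final IndexCursor cursor = database.lookupByKey("Order", KEY_NAMES, keysOf(order));
  if (cursor.hasNext())
    database.async().updateRecord(documentFor(cursor, order),
        updated -> { /* count */ }, Throwable::printStackTrace);
  else
    database.async().createRecord(documentFor(order),
        created -> { /* count */ }, Throwable::printStackTrace);
}
// One call at the end flushes the batched async transactions.
database.async().waitCompletion(10_000);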