rebase changes
snazy committed Sep 5, 2024
1 parent 1e1d6d1 commit 396530d
Showing 2 changed files with 87 additions and 12 deletions.
MongoDB2Constants.java
@@ -34,6 +34,7 @@ final class MongoDB2Constants {
static final String COL_OBJ_TYPE = "y";
static final String COL_OBJ_VERS = "V";
static final String COL_OBJ_VALUE = "v";
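// Timestamp in microseconds of the last store/re-reference of an object, written via config.currentTimeMicros() in MongoDB2Persist.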
static final String COL_OBJ_REFERENCED = "z";

static final String ID_REPO_PATH = ID_PROPERTY_NAME + "." + COL_REPO;

MongoDB2Persist.java
@@ -29,6 +29,7 @@
import static org.projectnessie.versioned.storage.common.persist.ObjTypes.objTypeByName;
import static org.projectnessie.versioned.storage.common.persist.Reference.reference;
import static org.projectnessie.versioned.storage.mongodb2.MongoDB2Constants.COL_OBJ_ID;
import static org.projectnessie.versioned.storage.mongodb2.MongoDB2Constants.COL_OBJ_REFERENCED;
import static org.projectnessie.versioned.storage.mongodb2.MongoDB2Constants.COL_OBJ_TYPE;
import static org.projectnessie.versioned.storage.mongodb2.MongoDB2Constants.COL_OBJ_VALUE;
import static org.projectnessie.versioned.storage.mongodb2.MongoDB2Constants.COL_OBJ_VERS;
@@ -61,11 +62,13 @@
import com.mongodb.bulk.BulkWriteError;
import com.mongodb.bulk.BulkWriteInsert;
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.bulk.BulkWriteUpsert;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.ReplaceOneModel;
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.Updates;
import com.mongodb.client.model.WriteModel;
import com.mongodb.client.result.DeleteResult;
@@ -437,11 +440,17 @@ private <T extends Obj> void fetchObjsPage(
@Override
public boolean storeObj(@Nonnull Obj obj, boolean ignoreSoftSizeRestrictions)
throws ObjTooLargeException {
Document doc = objToDoc(obj, ignoreSoftSizeRestrictions);
long referenced = config.currentTimeMicros();
Document doc = objToDoc(obj, referenced, ignoreSoftSizeRestrictions);
try {
backend.objs().insertOne(doc);
} catch (MongoWriteException e) {
if (e.getError().getCategory() == DUPLICATE_KEY) {
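// The object already exists: only refresh its "referenced" timestamp.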
backend
.objs()
.updateOne(
eq(ID_PROPERTY_NAME, idObjDoc(obj.id())),
Updates.set(COL_OBJ_REFERENCED, referenced));
return false;
}
throw handleMongoWriteException(e);
@@ -455,22 +464,80 @@ public boolean storeObj(@Nonnull Obj obj, boolean ignoreSoftSizeRestrictions)
@Nonnull
@Override
public boolean[] storeObjs(@Nonnull Obj[] objs) throws ObjTooLargeException {
boolean[] r = new boolean[objs.length];

storeObjsWrite(objs, r);

List<ObjId> updateReferenced = new ArrayList<>();
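// Objects that were not inserted above already exist; collect their IDs so their "referenced" timestamp can be refreshed.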
for (int i = 0; i < r.length; i++) {
if (!r[i]) {
Obj obj = objs[i];
if (obj != null) {
updateReferenced.add(obj.id());
}
}
}

if (!updateReferenced.isEmpty()) {
storeObjsUpdateReferenced(objs, updateReferenced);
}

return r;
}

private void storeObjsUpdateReferenced(Obj[] objs, List<ObjId> updateReferenced) {
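// Bulk-update the "referenced" timestamp of the objects that already existed during storeObjsWrite().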
long referenced = config.currentTimeMicros();
List<UpdateOneModel<Document>> docs =
updateReferenced.stream()
.map(
id ->
new UpdateOneModel<Document>(
eq(ID_PROPERTY_NAME, idObjDoc(id)),
Updates.set(COL_OBJ_REFERENCED, referenced)))
.collect(toList());
List<WriteModel<Document>> updates = new ArrayList<>(docs);
while (!updates.isEmpty()) {
try {
backend.objs().bulkWrite(updates);
break;
} catch (MongoBulkWriteException e) {
// Handle "insert of already existing objects".
//
// MongoDB returns a BulkWriteResult of what _would_ have succeeded. Use that information
// to retry the bulk write to make progress.
List<BulkWriteError> errs = e.getWriteErrors();
for (BulkWriteError err : errs) {
throw handleMongoWriteError(e, err);
}
BulkWriteResult res = e.getWriteResult();
updates.clear();
res.getUpserts().stream()
.map(MongoDB2Persist::objIdFromBulkWriteUpsert)
.mapToInt(id -> objIdIndex(objs, id))
.mapToObj(docs::get)
.forEach(updates::add);
} catch (RuntimeException e) {
throw unhandledException(e);
}
}
}

private void storeObjsWrite(Obj[] objs, boolean[] r) throws ObjTooLargeException {
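// Attempt to bulk-insert all objects; entries in r[] are set to true for successfully inserted objects and remain false for objects that already exist.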
List<WriteModel<Document>> docs = new ArrayList<>(objs.length);
long referenced = config.currentTimeMicros();
for (Obj obj : objs) {
if (obj != null) {
docs.add(new InsertOneModel<>(objToDoc(obj, false)));
docs.add(new InsertOneModel<>(objToDoc(obj, referenced, false)));
}
}

boolean[] r = new boolean[objs.length];

List<WriteModel<Document>> inserts = new ArrayList<>(docs);
while (!inserts.isEmpty()) {
try {
BulkWriteResult res = backend.objs().bulkWrite(inserts);
for (BulkWriteInsert insert : res.getInserts()) {
ObjId id = objIdFromBulkWriteInsert(insert);
r[objIdIndex(objs, id)] = id != null;
r[objIdIndex(objs, id)] = true;
}
break;
} catch (MongoBulkWriteException e) {
@@ -495,7 +562,6 @@ public boolean[] storeObjs(@Nonnull Obj[] objs) throws ObjTooLargeException {
throw unhandledException(e);
}
}
return r;
}

private static ObjId objIdFromDoc(Document doc) {
@@ -515,6 +581,10 @@ private static ObjId objIdFromBulkWriteInsert(BulkWriteInsert insert) {
return ObjId.objIdFromByteArray(insert.getId().asDocument().getBinary(COL_OBJ_ID).getData());
}

private static ObjId objIdFromBulkWriteUpsert(BulkWriteUpsert upsert) {
return ObjId.objIdFromByteArray(upsert.getId().asDocument().getBinary(COL_OBJ_ID).getData());
}

@Override
public void deleteObj(@Nonnull ObjId id) {
try {
@@ -545,7 +615,8 @@ public void upsertObj(@Nonnull Obj obj) throws ObjTooLargeException {

ReplaceOptions options = upsertOptions();

Document doc = objToDoc(obj, false);
long referenced = config.currentTimeMicros();
Document doc = objToDoc(obj, referenced, false);
UpdateResult result;
try {
result = backend.objs().replaceOne(eq(ID_PROPERTY_NAME, idObjDoc(id)), doc, options);
@@ -569,13 +640,14 @@ private static ReplaceOptions upsertOptions() {
public void upsertObjs(@Nonnull Obj[] objs) throws ObjTooLargeException {
ReplaceOptions options = upsertOptions();

long referenced = config.currentTimeMicros();
List<WriteModel<Document>> docs = new ArrayList<>(objs.length);
for (Obj obj : objs) {
if (obj != null) {
ObjId id = obj.id();
docs.add(
new ReplaceOneModel<>(
eq(ID_PROPERTY_NAME, idObjDoc(id)), objToDoc(obj, false), options));
eq(ID_PROPERTY_NAME, idObjDoc(id)), objToDoc(obj, referenced, false), options));
}
}

@@ -623,7 +695,8 @@ public boolean updateConditional(@Nonnull UpdateableObj expected, @Nonnull Updat
checkArgument(expected.type().equals(newValue.type()));
checkArgument(!expected.versionToken().equals(newValue.versionToken()));

Document doc = objToDoc(newValue, false);
long referenced = config.currentTimeMicros();
Document doc = objToDoc(newValue, referenced, false);

List<Bson> updates =
doc.entrySet().stream()
@@ -675,23 +748,24 @@ private <T extends Obj> T docToObj(
}
Binary bin = doc.get(COL_OBJ_VALUE, Binary.class);
String versionToken = doc.getString(COL_OBJ_VERS);
Obj obj = deserializeObj(id, bin.getData(), versionToken);
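// Documents without the "referenced" field (written before this column was introduced) fall back to 0.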
Long referenced = doc.getLong(COL_OBJ_REFERENCED);
Obj obj = deserializeObj(id, referenced != null ? referenced : 0L, bin.getData(), versionToken);
@SuppressWarnings("unchecked")
T r = (T) obj;
return r;
}

private Document objToDoc(@Nonnull Obj obj, boolean ignoreSoftSizeRestrictions)
private Document objToDoc(@Nonnull Obj obj, long referenced, boolean ignoreSoftSizeRestrictions)
throws ObjTooLargeException {
ObjId id = obj.id();
checkArgument(id != null, "Obj to store must have a non-null ID");

ObjType type = obj.type();

Document doc = new Document();
Document inner = new Document();
doc.put(ID_PROPERTY_NAME, idObjDoc(id));
doc.put(COL_OBJ_TYPE, type.shortName());
doc.put(COL_OBJ_REFERENCED, referenced);
UpdateableObj.extractVersionToken(obj).ifPresent(token -> doc.put(COL_OBJ_VERS, token));
int incrementalIndexSizeLimit =
ignoreSoftSizeRestrictions ? Integer.MAX_VALUE : effectiveIncrementalIndexSizeLimit();