Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use insert .. on conflict for bulk import operations #3301

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Prev Previous commit
Next Next commit
upsert cleanup
  • Loading branch information
elexisvenator committed Oct 2, 2024
commit c9371f5179e6050a8dbaa1ced5134d713d005b2b
35 changes: 21 additions & 14 deletions src/Marten/DocumentStore.cs
Original file line number Diff line number Diff line change
@@ -17,10 +17,8 @@
using Marten.Internal.Sessions;
using Marten.Services;
using Marten.Storage;
using Microsoft.CodeAnalysis.Options;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Weasel.Core.Migrations;
using Weasel.Postgresql.Connections;
using IsolationLevel = System.Data.IsolationLevel;

@@ -115,18 +113,18 @@ public ValueTask DisposeAsync()
public AdvancedOperations Advanced { get; }

/// <summary>
/// Bulk-inserts <paramref name="documents"/> for the default tenant by delegating to
/// <see cref="BulkInsertion.BulkInsert{T}"/>.
/// </summary>
/// <param name="documents">The documents to load.</param>
/// <param name="mode">How existing rows are treated; defaults to <see cref="BulkInsertMode.InsertsOnly"/>.</param>
/// <param name="batchSize">Batch size handed to the bulk loader.</param>
/// <param name="updateCondition">
/// Optional SQL fragment that BulkInsertion formats into the upsert statement. BulkInsertion rejects a
/// non-null value unless <paramref name="mode"/> is <see cref="BulkInsertMode.OverwriteExisting"/> and
/// <typeparamref name="T"/> is a concrete document type (not <see cref="object"/>).
/// </param>
public void BulkInsert<T>(IReadOnlyCollection<T> documents, BulkInsertMode mode = BulkInsertMode.InsertsOnly,
    int batchSize = 1000, string? updateCondition = null)
{
    var bulkInsertion = new BulkInsertion(Tenancy.Default, Options);

    // Forward updateCondition like every other overload does; previously it was accepted
    // here but dropped, so the caller's condition was silently ignored.
    bulkInsertion.BulkInsert(documents, mode, batchSize, updateCondition);
}

// Diff hunk: removed and added lines appear together (markers were stripped).
// Creates a BulkInsertion for the default tenant and delegates to
// BulkInsertion.BulkInsertEnlistTransaction. The added line forwards the new optional
// updateCondition argument; the removed line did not pass it.
public void BulkInsertEnlistTransaction<T>(IReadOnlyCollection<T> documents,
Transaction transaction, BulkInsertMode mode = BulkInsertMode.InsertsOnly,
int batchSize = 1000)
int batchSize = 1000, string? updateCondition = null)
{
var bulkInsertion = new BulkInsertion(Tenancy.Default, Options);
bulkInsertion.BulkInsertEnlistTransaction(documents, transaction, mode, batchSize);
bulkInsertion.BulkInsertEnlistTransaction(documents, transaction, mode, batchSize, updateCondition);
}

public void BulkInsertDocuments(IEnumerable<object> documents, BulkInsertMode mode = BulkInsertMode.InsertsOnly,
@@ -138,10 +136,11 @@ public void BulkInsertDocuments(IEnumerable<object> documents, BulkInsertMode mo

// Diff hunk: removed and added lines appear together (markers were stripped).
// Tenant-specific bulk insert: the tenant is resolved with
// Tenancy.GetTenant(Options.MaybeCorrectTenantId(tenantId)) and the work is delegated to
// BulkInsertion.BulkInsert. The added lines introduce the optional updateCondition
// parameter and forward it.
public void BulkInsert<T>(string tenantId, IReadOnlyCollection<T> documents,
BulkInsertMode mode = BulkInsertMode.InsertsOnly,
int batchSize = 1000)
int batchSize = 1000,
string? updateCondition = null)
{
var bulkInsertion = new BulkInsertion(Tenancy.GetTenant(Options.MaybeCorrectTenantId(tenantId)), Options);
bulkInsertion.BulkInsert(documents, mode, batchSize);
bulkInsertion.BulkInsert(documents, mode, batchSize, updateCondition);
}

public void BulkInsertDocuments(string tenantId, IEnumerable<object> documents,
@@ -154,27 +153,35 @@ public void BulkInsertDocuments(string tenantId, IEnumerable<object> documents,

// Diff hunk: removed and added lines appear together (markers were stripped).
// Asynchronous bulk insert for the default tenant; returns the task produced by
// BulkInsertion.BulkInsertAsync without awaiting it here.
// NOTE(review): updateCondition is inserted BEFORE cancellation, so an existing caller that
// passes the CancellationToken positionally as the fourth argument will no longer compile.
// Confirm this source-breaking change is intended, or append the new parameter instead.
public Task BulkInsertAsync<T>(IReadOnlyCollection<T> documents,
BulkInsertMode mode = BulkInsertMode.InsertsOnly,
int batchSize = 1000, CancellationToken cancellation = default)
int batchSize = 1000,
string? updateCondition = null,
CancellationToken cancellation = default)
{
var bulkInsertion = new BulkInsertion(Tenancy.Default, Options);
return bulkInsertion.BulkInsertAsync(documents, mode, batchSize, cancellation);
return bulkInsertion.BulkInsertAsync(documents, mode, batchSize, updateCondition, cancellation);
}

// Diff hunk: removed and added lines appear together (markers were stripped).
// Asynchronous, transaction-enlisting bulk insert for the default tenant; returns the task
// produced by BulkInsertion.BulkInsertEnlistTransactionAsync without awaiting it here.
// NOTE(review): updateCondition is inserted BEFORE cancellation, which breaks callers that
// pass the CancellationToken positionally. Confirm this is intended.
public Task BulkInsertEnlistTransactionAsync<T>(IReadOnlyCollection<T> documents,
Transaction transaction,
BulkInsertMode mode = BulkInsertMode.InsertsOnly,
int batchSize = 1000, CancellationToken cancellation = default)
int batchSize = 1000,
string? updateCondition = null,
CancellationToken cancellation = default)
{
var bulkInsertion = new BulkInsertion(Tenancy.Default, Options);
return bulkInsertion.BulkInsertEnlistTransactionAsync(documents, transaction, mode, batchSize, cancellation);
return bulkInsertion.BulkInsertEnlistTransactionAsync(documents, transaction, mode, batchSize, updateCondition, cancellation);
}

// Diff hunk: the first two lines are the removed signature, the following lines are the
// reformatted replacement (markers were stripped).
// Tenant-specific asynchronous bulk insert: awaits Tenancy.GetTenantAsync for the corrected
// tenant id, then awaits BulkInsertion.BulkInsertAsync. The added call forwards updateCondition.
// NOTE(review): updateCondition is inserted BEFORE cancellation, which breaks callers that
// pass the CancellationToken positionally. Confirm this is intended.
public async Task BulkInsertAsync<T>(string tenantId, IReadOnlyCollection<T> documents,
BulkInsertMode mode = BulkInsertMode.InsertsOnly, int batchSize = 1000,
public async Task BulkInsertAsync<T>(
string tenantId,
IReadOnlyCollection<T> documents,
BulkInsertMode mode = BulkInsertMode.InsertsOnly,
int batchSize = 1000,
string? updateCondition = null,
CancellationToken cancellation = default)
{
var bulkInsertion = new BulkInsertion(await Tenancy.GetTenantAsync(Options.MaybeCorrectTenantId(tenantId)).ConfigureAwait(false), Options);
await bulkInsertion.BulkInsertAsync(documents, mode, batchSize, cancellation).ConfigureAwait(false);
await bulkInsertion.BulkInsertAsync(documents, mode, batchSize, updateCondition, cancellation).ConfigureAwait(false);
}

public Task BulkInsertDocumentsAsync(IEnumerable<object> documents,
12 changes: 6 additions & 6 deletions src/Marten/IDocumentStore.cs
Original file line number Diff line number Diff line change
@@ -46,7 +46,7 @@ public interface IDocumentStore: IDisposable
/// <param name="mode"></param>
/// <param name="batchSize"></param>
/// <param name="updateCondition">
/// Optional SQL fragment that the bulk insertion formats into its upsert statement. A non-null value is
/// only accepted with <see cref="BulkInsertMode.OverwriteExisting"/> and a concrete document type
/// (not <c>object</c>). It is inserted as raw SQL text, so do not pass untrusted input.
/// </param>
void BulkInsert<T>(IReadOnlyCollection<T> documents, BulkInsertMode mode = BulkInsertMode.InsertsOnly,
int batchSize = 1000);
int batchSize = 1000, string? updateCondition = null);

/// <summary>
/// Uses Postgresql's COPY ... FROM STDIN BINARY feature to efficiently store
@@ -58,7 +58,7 @@ void BulkInsert<T>(IReadOnlyCollection<T> documents, BulkInsertMode mode = BulkI
/// <param name="mode"></param>
/// <param name="batchSize"></param>
/// <param name="updateCondition">
/// Optional SQL fragment that the bulk insertion formats into its upsert statement. A non-null value is
/// only accepted with <see cref="BulkInsertMode.OverwriteExisting"/> and a concrete document type
/// (not <c>object</c>). It is inserted as raw SQL text, so do not pass untrusted input.
/// </param>
void BulkInsertEnlistTransaction<T>(IReadOnlyCollection<T> documents, Transaction transaction,
BulkInsertMode mode = BulkInsertMode.InsertsOnly, int batchSize = 1000);
BulkInsertMode mode = BulkInsertMode.InsertsOnly, int batchSize = 1000, string? updateCondition = null);

/// <summary>
/// Uses Postgresql's COPY ... FROM STDIN BINARY feature to efficiently store
@@ -70,7 +70,7 @@ void BulkInsertEnlistTransaction<T>(IReadOnlyCollection<T> documents, Transactio
/// <param name="mode"></param>
/// <param name="batchSize"></param>
/// <param name="updateCondition">
/// Optional SQL fragment that the bulk insertion formats into its upsert statement. A non-null value is
/// only accepted with <see cref="BulkInsertMode.OverwriteExisting"/> and a concrete document type
/// (not <c>object</c>). It is inserted as raw SQL text, so do not pass untrusted input.
/// </param>
void BulkInsert<T>(string tenantId, IReadOnlyCollection<T> documents,
BulkInsertMode mode = BulkInsertMode.InsertsOnly, int batchSize = 1000);
BulkInsertMode mode = BulkInsertMode.InsertsOnly, int batchSize = 1000, string? updateCondition = null);

/// <summary>
/// Uses Postgresql's COPY ... FROM STDIN BINARY feature to efficiently store
@@ -81,7 +81,7 @@ void BulkInsert<T>(string tenantId, IReadOnlyCollection<T> documents,
/// <param name="mode"></param>
/// <param name="batchSize"></param>
/// <param name="updateCondition">
/// Optional SQL fragment that the bulk insertion formats into its upsert statement. A non-null value is
/// only accepted with <see cref="BulkInsertMode.OverwriteExisting"/> and a concrete document type
/// (not <c>object</c>). It is inserted as raw SQL text, so do not pass untrusted input.
/// </param>
Task BulkInsertAsync<T>(IReadOnlyCollection<T> documents, BulkInsertMode mode = BulkInsertMode.InsertsOnly,
int batchSize = 1000, CancellationToken cancellation = default);
int batchSize = 1000, string? updateCondition = null, CancellationToken cancellation = default);

/// <summary>
/// Uses Postgresql's COPY ... FROM STDIN BINARY feature to efficiently store
@@ -94,7 +94,7 @@ Task BulkInsertAsync<T>(IReadOnlyCollection<T> documents, BulkInsertMode mode =
/// <param name="batchSize"></param>
/// <param name="updateCondition">
/// Optional SQL fragment that the bulk insertion formats into its upsert statement. A non-null value is
/// only accepted with <see cref="BulkInsertMode.OverwriteExisting"/> and a concrete document type
/// (not <c>object</c>). It is inserted as raw SQL text, so do not pass untrusted input.
/// </param>
Task BulkInsertEnlistTransactionAsync<T>(IReadOnlyCollection<T> documents, Transaction transaction,
BulkInsertMode mode = BulkInsertMode.InsertsOnly, int batchSize = 1000,
CancellationToken cancellation = default);
string? updateCondition = null, CancellationToken cancellation = default);

/// <summary>
/// Uses Postgresql's COPY ... FROM STDIN BINARY feature to efficiently store
@@ -107,7 +107,7 @@ Task BulkInsertEnlistTransactionAsync<T>(IReadOnlyCollection<T> documents, Trans
/// <param name="batchSize"></param>
/// <param name="updateCondition">
/// Optional SQL fragment that the bulk insertion formats into its upsert statement. A non-null value is
/// only accepted with <see cref="BulkInsertMode.OverwriteExisting"/> and a concrete document type
/// (not <c>object</c>). It is inserted as raw SQL text, so do not pass untrusted input.
/// </param>
Task BulkInsertAsync<T>(string tenantId, IReadOnlyCollection<T> documents,
BulkInsertMode mode = BulkInsertMode.InsertsOnly, int batchSize = 1000,
CancellationToken cancellation = default);
string? updateCondition = null, CancellationToken cancellation = default);

/// <summary>
/// Open a new IDocumentSession with the supplied DocumentTracking.
77 changes: 42 additions & 35 deletions src/Marten/Storage/BulkInsertion.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http.Headers;
using System.Threading;
using System.Threading.Tasks;
using System.Transactions;
@@ -29,8 +28,9 @@ public void Dispose()
}

public void BulkInsert<T>(IReadOnlyCollection<T> documents, BulkInsertMode mode = BulkInsertMode.InsertsOnly,
int batchSize = 1000, string upsertCondition = null)
int batchSize = 1000, string updateCondition = null)
{
ValidateupdateCondition<T>(mode, updateCondition);
if (typeof(T) == typeof(object))
{
BulkInsertDocuments(documents.OfType<object>(), mode);
@@ -45,7 +45,7 @@ public void BulkInsert<T>(IReadOnlyCollection<T> documents, BulkInsertMode mode

try
{
bulkInsertDocuments(documents, batchSize, conn, mode, upsertCondition);
bulkInsertDocuments(documents, batchSize, conn, mode, updateCondition);

tx.Commit();
}
@@ -61,8 +61,9 @@ public void BulkInsertEnlistTransaction<T>(IReadOnlyCollection<T> documents,
Transaction transaction,
BulkInsertMode mode = BulkInsertMode.InsertsOnly,
int batchSize = 1000,
string upsertCondition = null)
string updateCondition = null)
{
ValidateupdateCondition<T>(mode, updateCondition);
if (typeof(T) == typeof(object))
{
BulkInsertDocumentsEnlistTransaction(documents.OfType<object>(), transaction, mode);
@@ -74,16 +75,17 @@ public void BulkInsertEnlistTransaction<T>(IReadOnlyCollection<T> documents,
using var conn = _tenant.Database.CreateConnection();
conn.Open();
conn.EnlistTransaction(transaction);
bulkInsertDocuments(documents, batchSize, conn, mode, upsertCondition);
bulkInsertDocuments(documents, batchSize, conn, mode, updateCondition);
}
}

public async Task BulkInsertAsync<T>(IReadOnlyCollection<T> documents, BulkInsertMode mode, int batchSize,
string upsertCondition = null, CancellationToken cancellation = default)
string updateCondition = null, CancellationToken cancellation = default)
{
ValidateupdateCondition<T>(mode, updateCondition);
if (typeof(T) == typeof(object))
{
await BulkInsertDocumentsAsync(documents.OfType<object>(), mode, batchSize, upsertCondition, cancellation)
await BulkInsertDocumentsAsync(documents.OfType<object>(), mode, batchSize, cancellation)
.ConfigureAwait(false);
}
else
@@ -96,7 +98,7 @@ await BulkInsertDocumentsAsync(documents.OfType<object>(), mode, batchSize, upse
var tx = await conn.BeginTransactionAsync(cancellation).ConfigureAwait(false);
try
{
await bulkInsertDocumentsAsync(documents, batchSize, conn, mode, upsertCondition, cancellation).ConfigureAwait(false);
await bulkInsertDocumentsAsync(documents, batchSize, conn, mode, updateCondition, cancellation).ConfigureAwait(false);

await tx.CommitAsync(cancellation).ConfigureAwait(false);
}
@@ -109,25 +111,26 @@ await BulkInsertDocumentsAsync(documents.OfType<object>(), mode, batchSize, upse
}

// Diff hunk: removed (upsertCondition) and added (updateCondition) lines appear together.
// Validates the update condition up front, then takes one of two paths:
//  - T is object: documents are handed to BulkInsertDocumentsEnlistTransactionAsync; the added
//    call no longer passes a condition (validation has already rejected a non-null one for object).
//  - otherwise: ensures storage for T exists, opens a connection, enlists it in the supplied
//    transaction and loads the documents with the update condition.
public async Task BulkInsertEnlistTransactionAsync<T>(IReadOnlyCollection<T> documents, Transaction transaction,
BulkInsertMode mode, int batchSize, string upsertCondition = null, CancellationToken cancellation = default)
BulkInsertMode mode, int batchSize, string updateCondition = null, CancellationToken cancellation = default)
{
// Throws ArgumentException for a non-null condition with T == object or a mode other than OverwriteExisting.
ValidateupdateCondition<T>(mode, updateCondition);
if (typeof(T) == typeof(object))
{
await BulkInsertDocumentsEnlistTransactionAsync(documents.OfType<object>(), transaction, mode, batchSize,
upsertCondition, cancellation).ConfigureAwait(false);
cancellation).ConfigureAwait(false);
}
else
{
await _tenant.Database.EnsureStorageExistsAsync(typeof(T), cancellation).ConfigureAwait(false);
await using var conn = _tenant.Database.CreateConnection();
await conn.OpenAsync(cancellation).ConfigureAwait(false);
// No local transaction is begun or committed here; the connection joins the caller's transaction.
conn.EnlistTransaction(transaction);
await bulkInsertDocumentsAsync(documents, batchSize, conn, mode, upsertCondition, cancellation).ConfigureAwait(false);
await bulkInsertDocumentsAsync(documents, batchSize, conn, mode, updateCondition, cancellation).ConfigureAwait(false);
}
}

public void BulkInsertDocuments(IEnumerable<object> documents, BulkInsertMode mode = BulkInsertMode.InsertsOnly,
int batchSize = 1000, string upsertCondition = null)
int batchSize = 1000, string updateCondition = null)
{
var groups = bulkInserters(documents);

@@ -153,7 +156,7 @@ public void BulkInsertDocumentsEnlistTransaction(IEnumerable<object> documents,
Transaction transaction,
BulkInsertMode mode = BulkInsertMode.InsertsOnly,
int batchSize = 1000,
string upsertCondition = null)
string updateCondition = null)
{
var groups = bulkInserters(documents);
var types = documentTypes(documents);
@@ -187,7 +190,7 @@ private static IBulkInserter[] bulkInserters(IEnumerable<object> documents)
}

public async Task BulkInsertDocumentsAsync(IEnumerable<object> documents, BulkInsertMode mode, int batchSize,
string upsertCondition = null, CancellationToken cancellation = default)
CancellationToken cancellation = default)
{
var groups = bulkInserters(documents);

@@ -199,7 +202,7 @@ public async Task BulkInsertDocumentsAsync(IEnumerable<object> documents, BulkIn
try
{
foreach (var group in groups)
await group.BulkInsertAsync(batchSize, conn, this, mode, upsertCondition, cancellation).ConfigureAwait(false);
await group.BulkInsertAsync(batchSize, conn, this, mode, cancellation).ConfigureAwait(false);

await tx.CommitAsync(cancellation).ConfigureAwait(false);
}
@@ -215,8 +218,7 @@ public async Task BulkInsertDocumentsEnlistTransactionAsync(
Transaction transaction,
BulkInsertMode mode,
int batchSize,
string upsertCondition = null,
CancellationToken cancellation = default
CancellationToken cancellation
)
{
var groups = bulkInserters(documents);
@@ -231,14 +233,12 @@ public async Task BulkInsertDocumentsEnlistTransactionAsync(
conn.EnlistTransaction(transaction);

foreach (var group in groups)
await group.BulkInsertAsync(batchSize, conn, this, mode, upsertCondition, cancellation).ConfigureAwait(false);
await group.BulkInsertAsync(batchSize, conn, this, mode, cancellation).ConfigureAwait(false);
}

private void bulkInsertDocuments<T>(IReadOnlyCollection<T> documents, int batchSize, NpgsqlConnection conn,
BulkInsertMode mode, string upsertCondition)
BulkInsertMode mode, string updateCondition)
{
ValidateUpsertCondition(mode, upsertCondition);

var provider = _tenant.Database.Providers.StorageFor<T>();
var loader = provider.BulkLoader!;

@@ -280,17 +280,15 @@ private void bulkInsertDocuments<T>(IReadOnlyCollection<T> documents, int batchS
}
else if (mode == BulkInsertMode.OverwriteExisting)
{
var upsert = string.Format(loader.UpsertFromTempTable(), upsertCondition ?? "1 = 1");
var upsert = string.Format(loader.UpsertFromTempTable(), updateCondition ?? "1 = 1");

conn.CreateCommand(upsert).ExecuteNonQuery();
}
}

private async Task bulkInsertDocumentsAsync<T>(IReadOnlyCollection<T> documents, int batchSize,
NpgsqlConnection conn, BulkInsertMode mode, string upsertCondition, CancellationToken cancellation)
NpgsqlConnection conn, BulkInsertMode mode, string updateCondition, CancellationToken cancellation)
{
ValidateUpsertCondition(mode, upsertCondition);

var provider = _tenant.Database.Providers.StorageFor<T>();
var loader = provider.BulkLoader!;

@@ -332,18 +330,28 @@ private async Task bulkInsertDocumentsAsync<T>(IReadOnlyCollection<T> documents,
}
else if (mode == BulkInsertMode.OverwriteExisting)
{
var upsert = string.Format(loader.UpsertFromTempTable(), upsertCondition ?? "1 = 1");
var upsert = string.Format(loader.UpsertFromTempTable(), updateCondition ?? "1 = 1");


await conn.CreateCommand(upsert).ExecuteNonQueryAsync(cancellation).ConfigureAwait(false);
}
}

private static void ValidateUpsertCondition(BulkInsertMode mode, string upsertCondition)
private static void ValidateupdateCondition<T>(BulkInsertMode mode, string updateCondition)
{
if (mode != BulkInsertMode.OverwriteExisting && !string.IsNullOrWhiteSpace(upsertCondition))
if (updateCondition is null)
{
return;
}

if (typeof(T) == typeof(object))
{
throw new ArgumentException($"An update condition can not be used on a collection of <object>, use a collection of a specific document type instead.", nameof(updateCondition));
}

if (mode != BulkInsertMode.OverwriteExisting)
{
throw new ArgumentException($"An upsert condition can only be provided when using {BulkInsertMode.OverwriteExisting}", nameof(upsertCondition));
throw new ArgumentException($"An update condition can only be provided when using {BulkInsertMode.OverwriteExisting}", nameof(updateCondition));
}
}

@@ -376,10 +384,10 @@ await loader.LoadIntoTempTableAsync(_tenant, Serializer, conn, documents, cancel

// Diff hunk: removed and added member signatures appear together (markers were stripped).
// Internal abstraction for bulk-inserting a group of documents through a parent BulkInsertion
// on a given connection. The added signatures drop the upsertCondition parameter from both members.
internal interface IBulkInserter
{
void BulkInsert(int batchSize, NpgsqlConnection connection, BulkInsertion parent, BulkInsertMode mode, string upsertCondition = null);
void BulkInsert(int batchSize, NpgsqlConnection connection, BulkInsertion parent, BulkInsertMode mode);

Task BulkInsertAsync(int batchSize, NpgsqlConnection conn, BulkInsertion bulkInsertion, BulkInsertMode mode,
string upsertCondition, CancellationToken cancellation);
CancellationToken cancellation);
}

internal class BulkInserter<T>: IBulkInserter
@@ -392,19 +400,18 @@ public BulkInserter(IEnumerable<object> documents)
}

// Diff hunk: removed and added lines appear together (markers were stripped).
// Ensures storage for T exists on the parent's tenant database, then loads this inserter's
// documents through parent.bulkInsertDocuments. The added call always passes null as the
// update condition, so this path never applies one.
public void BulkInsert(int batchSize, NpgsqlConnection connection, BulkInsertion parent,
BulkInsertMode mode, string upsertCondition = null)
BulkInsertMode mode)
{
parent._tenant.Database.EnsureStorageExists(typeof(T));
parent.bulkInsertDocuments(_documents, batchSize, connection, mode, upsertCondition);
parent.bulkInsertDocuments(_documents, batchSize, connection, mode, null);
}

// Diff hunk: removed and added lines appear together (markers were stripped).
// Asynchronous counterpart of BulkInsert: awaits storage creation for T, then awaits
// parent.bulkInsertDocumentsAsync. The added call always passes null as the update
// condition, so this path never applies one.
public async Task BulkInsertAsync(int batchSize, NpgsqlConnection conn, BulkInsertion parent,
BulkInsertMode mode,
string upsertCondition,
CancellationToken cancellation)
{
await parent._tenant.Database.EnsureStorageExistsAsync(typeof(T), cancellation).ConfigureAwait(false);
await parent.bulkInsertDocumentsAsync(_documents, batchSize, conn, mode, upsertCondition, cancellation)
await parent.bulkInsertDocumentsAsync(_documents, batchSize, conn, mode, null, cancellation)
.ConfigureAwait(false);
}
}
Loading