Skip to content

Commit

Permalink
Merge pull request #18 from VirtoCommerce/feat/VCPS-import-completed
Browse files Browse the repository at this point in the history
Execute dataImporter.OnImportCompletedAsync on exceptions
  • Loading branch information
vladimir-buravlev authored Jan 4, 2024
2 parents e56c961 + 6ab2649 commit f52b451
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 34 deletions.
3 changes: 2 additions & 1 deletion src/VirtoCommerce.ImportModule.Core/ModuleConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public static class General
{
Name = "Import.RemainingEstimator",
GroupName = "Import",
ValueType = SettingValueType.ShortText
ValueType = SettingValueType.ShortText,
DefaultValue = "DefaultRemainingEstimator",
};

public static IEnumerable<SettingDescriptor> AllSettings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using VirtoCommerce.ImportModule.Core.Common;
using VirtoCommerce.ImportModule.Core.Models;
using VirtoCommerce.ImportModule.Core.Services;
using VirtoCommerce.Platform.Core.Exceptions;
using VirtoCommerce.Platform.Core.Settings;

namespace VirtoCommerce.ImportModule.Data.Services
Expand Down Expand Up @@ -34,26 +35,27 @@ public DataImportProcessManager(

public async Task ImportAsync(ImportProfile importProfile, Func<ImportProgressInfo, Task> progressCallback, CancellationToken token)
{
var maxErrorsCountThreshold = await _settingsManager.GetValueAsync(Core.ModuleConstants.Settings.General.MaxErrorsCountThreshold.Name, 50);
var maxErrorsCountThreshold = await _settingsManager.GetValueAsync<int>(Core.ModuleConstants.Settings.General.MaxErrorsCountThreshold);

// Create importer
var dataImporter = _dataImporterFactory.Create(importProfile.DataImporterType);

// Create remaining estimator
var remainingEstimatorType = await _settingsManager.GetValueAsync(Core.ModuleConstants.Settings.General.RemainingEstimator.Name, nameof(DefaultRemainingEstimator));
var remainingEstimatorType = await _settingsManager.GetValueAsync<string>(Core.ModuleConstants.Settings.General.RemainingEstimator);
var importRemainingEstimator = _importRemainingEstimatorFactory.Create(remainingEstimatorType);

// Create reporter
var defaultImportReporterType = await _settingsManager.GetValueAsync(Core.ModuleConstants.Settings.General.DefaultImportReporter.Name, nameof(DefaultDataReporter));
var defaultImportReporterType = await _settingsManager.GetValueAsync<string>(Core.ModuleConstants.Settings.General.DefaultImportReporter);
var importReporterType = !string.IsNullOrEmpty(importProfile.ImportReporterType) ? importProfile.ImportReporterType : defaultImportReporterType;
using var importReporter = _importReporterFactory.Create(importReporterType);
importReporter.SetContext(importProfile);

// Import progress
var importProgress = new ImportProgressInfo
{
Description = "Import has been started"
Description = "Import has been started",
};

var fixedSizeErrorsQueue = new FixedSizeQueue<ErrorInfo>(50);
// Import errors
var errorsCount = 0;
Expand All @@ -75,7 +77,7 @@ void ErrorCallback(ErrorInfo info)
var context = new ImportContext(importProfile)
{
ProgressInfo = importProgress,
ErrorCallback = ErrorCallback
ErrorCallback = ErrorCallback,
};

importRemainingEstimator.Start(context);
Expand All @@ -98,40 +100,55 @@ void ErrorCallback(ErrorInfo info)

await progressCallback(importProgress);

do
try
{
token.ThrowIfCancellationRequested();
do
{
token.ThrowIfCancellationRequested();

// Read items
var items = await reader.ReadNextPageAsync(context);
// Read items
var items = await reader.ReadNextPageAsync(context);

token.ThrowIfCancellationRequested();
token.ThrowIfCancellationRequested();

// Write items
await writer.WriteAsync(items, context);
// Write items
await writer.WriteAsync(items, context);

// Update processed count
importProgress.ProcessedCount += items.Length;
// Update processed count
importProgress.ProcessedCount += items.Length;

// Update remaining estimation
importRemainingEstimator.Update(context);
importRemainingEstimator.Estimate(context);
// Update remaining estimation
importRemainingEstimator.Update(context);
importRemainingEstimator.Estimate(context);

await progressCallback(importProgress);
await progressCallback(importProgress);

} while (reader.HasMoreResults && errorsCount < maxErrorsCountThreshold);
} while (reader.HasMoreResults && errorsCount < maxErrorsCountThreshold);
}
catch (Exception ex)
{
context.ErrorCallback?.Invoke(new ErrorInfo
{
ErrorLine = context.ProgressInfo?.ProcessedCount,
ErrorMessage = ex.ExpandExceptionMessage(),
});
throw;
}
finally
{
var errorReportResult = await importReporter.SaveErrorsAsync(fixedSizeErrorsQueue.GetTopValues().ToList());

var errorReportResult = await importReporter.SaveErrorsAsync(fixedSizeErrorsQueue.GetTopValues().ToList());
importRemainingEstimator.Stop(context);

importRemainingEstimator.Stop(context);
// Import finished
importProgress.Description = "Import has been finished";
importProgress.Finished = DateTime.UtcNow;
importProgress.ReportUrl = errorReportResult;

// Import finished
importProgress.Description = "Import has been finished";
importProgress.Finished = DateTime.UtcNow;
importProgress.ReportUrl = errorReportResult;
await dataImporter.OnImportCompletedAsync(context);
await dataImporter.OnImportCompletedAsync(context);

await progressCallback(importProgress);
await progressCallback(importProgress);
}
}
}
}
13 changes: 7 additions & 6 deletions src/VirtoCommerce.ImportModule.Data/Services/ImportRunService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public ImportPushNotification RunImportBackgroundJob(ImportProfile importProfile
var pushNotification = new ImportPushNotification(_userNameResolver.GetCurrentUserName())
{
ProfileId = importProfile.Id,
ProfileName = importProfile.Name
ProfileName = importProfile.Name,
};

return RunImportBackgroundJob(importProfile, pushNotification);
Expand All @@ -91,16 +91,16 @@ public async Task<ImportPushNotification> RunImportAsync(ImportProfile importPro
var pushNotification = new ImportPushNotification(_userNameResolver.GetCurrentUserName())
{
ProfileId = importProfile.Id,
ProfileName = importProfile.Name
ProfileName = importProfile.Name,
};

return await RunImportAsync(importProfile, pushNotification, cancellationToken);
}

public async Task<ImportPushNotification> RunImportAsync(ImportProfile importProfile, ImportPushNotification pushNotification, CancellationToken cancellationToken)
{

var importRunHistory = ExType<ImportRunHistory>.New().CreateNew(importProfile, pushNotification);

async Task ProgressInfoCallback(ImportProgressInfo progressInfo)
{
pushNotification.Title = progressInfo.Description;
Expand All @@ -115,12 +115,13 @@ async Task ProgressInfoCallback(ImportProgressInfo progressInfo)
pushNotification.Errors = progressInfo.Errors;
pushNotification.ReportUrl = progressInfo.ReportUrl;

_pushNotificationManager.Send(pushNotification);
await _pushNotificationManager.SendAsync(pushNotification);

importRunHistory.UpdateProgress(pushNotification);
//Uncomment when needed
//await _importRunHistoryCrudService.SaveChangesAsync(new[] { importRunHistory });
}

try
{

Expand Down Expand Up @@ -153,7 +154,7 @@ async Task ProgressInfoCallback(ImportProgressInfo progressInfo)
var emailNotification = await _notificationSearchService.GetNotificationAsync<ImportCompletedEmailNotification>();
emailNotification.To = user.Email;
emailNotification.ImportRunHistory = importRunHistory;
if (user.MemberId != null)
if (!string.IsNullOrEmpty(user.MemberId))
{
emailNotification.Member = await _memberService.GetByIdAsync(user.MemberId);
}
Expand All @@ -174,7 +175,7 @@ public async Task<ImportDataPreview> PreviewAsync(ImportProfile importProfile)
var validationResult = await importer.ValidateAsync(context);
try
{
using var reader = importer.OpenReader(context);
using var reader = await importer.OpenReaderAsync(context);

var records = new List<object>();

Expand Down

0 comments on commit f52b451

Please sign in to comment.