Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Melinda/storage #197

Merged
merged 6 commits into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion AnalysisOptionsBinder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ protected override AnalysisOptions GetBoundValue(BindingContext bindingContext)
bindingContext.ParseResult.GetValueForOption(_creationTimeStart),
bindingContext.ParseResult.GetValueForOption(_creationTimeEnd),
bindingContext.ParseResult.GetValueForOption(_filter),
bindingContext.ParseResult.GetValueForOption(_batchSize)
bindingContext.ParseResult.GetValueForOption(_batchSize)
);
}
}
Expand Down
9 changes: 4 additions & 5 deletions Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ This command forcefully removes all assets in the given account.
});

// disable storage migrate option until ready
/*

var storageOptionsBinder = new StorageOptionsBinder();
var storageCommand = storageOptionsBinder.GetCommand("storage", @"Directly migrate the assets from the storage account.
Doesn't require the Azure media services to be running.
Expand All @@ -100,11 +100,10 @@ amsmigrate storage -s <subscription id> -g <resource group> -n <source storage a
rootCommand.Add(storageCommand);
storageCommand.SetHandler(async context =>
{
var globalOptions = globalOptionsBinder.GetValue(context.BindingContext);
var storageOptions = storageOptionsBinder.GetValue(context.BindingContext);
await MigrateStorageAsync(globalOptions, storageOptions, context.GetCancellationToken());
var storageOptions = storageOptionsBinder.GetValue(context.BindingContext);
await MigrateStorageAsync(context, storageOptions, context.GetCancellationToken());
});
*/


// disable key migrate option until ready
/*
Expand Down
17 changes: 16 additions & 1 deletion StorageOptionsBinder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,17 @@ This is specific to the cloud you are migrating to.
() => DefaultBatchSize,
description: @"Batch size for parallel processing.");

private readonly Option<bool> _encryptContent = new(
aliases: new[] { "-e", "--encrypt-content" },
() => false,
description: "Encrypt the content using CENC"
);

private readonly Option<Uri?> _keyVaultUri = new(
aliases: new[] { "--key-vault-uri" },
description: "The key vault to store encryption keys."
);

const int SegmentDurationInSeconds = 2;

public StorageOptionsBinder()
Expand Down Expand Up @@ -141,6 +152,8 @@ public Command GetCommand(string name, string description)
command.AddOption(_workingDirectory);
command.AddOption(_copyNonStreamable);
command.AddOption(_batchSize);
command.AddOption(_encryptContent);
command.AddOption(_keyVaultUri);
return command;
}

Expand All @@ -162,7 +175,9 @@ protected override StorageOptions GetBoundValue(BindingContext bindingContext)
bindingContext.ParseResult.GetValueForOption(_breakOutputLease),
bindingContext.ParseResult.GetValueForOption(_keepWorkingFolder),
SegmentDurationInSeconds,
bindingContext.ParseResult.GetValueForOption(_batchSize)
bindingContext.ParseResult.GetValueForOption(_batchSize),
bindingContext.ParseResult.GetValueForOption(_encryptContent),
bindingContext.ParseResult.GetValueForOption(_keyVaultUri)
);
}

Expand Down
201 changes: 147 additions & 54 deletions ams/AssetAnalyzer.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
using AMSMigrate.Contracts;
using AMSMigrate.Transform;
using Azure;
using Azure.Core;
using Azure.ResourceManager.Media;
using Azure.ResourceManager.Media.Models;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Microsoft.Extensions.Logging;
using Spectre.Console;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq.Expressions;
using System.Threading.Channels;

namespace AMSMigrate.Ams
Expand All @@ -30,29 +33,55 @@ public AssetAnalyzer(
_tracker = tracker;
}

private async Task<AnalysisResult> AnalyzeAsync(MediaAssetResource asset, BlobServiceClient storage, CancellationToken cancellationToken)
private async Task<AnalysisResult> AnalyzeAsync<T>(T item, BlobServiceClient storage, CancellationToken cancellationToken)
melindawangmsft marked this conversation as resolved.
Show resolved Hide resolved
{
var result = new AnalysisResult(asset.Data.Name, MigrationStatus.NotMigrated);
_logger.LogDebug("Analyzing asset: {asset}, container: {container}", asset.Data.Name, asset.Data.Container);
string? assetName = null;
string? containerName = null;
BlobContainerClient? container = null;
AsyncPageable<MediaAssetStreamingLocator>? locators = null;
MediaAssetStorageEncryptionFormat? format = null;
melindawangmsft marked this conversation as resolved.
Show resolved Hide resolved
if (item is MediaAssetResource mediaAsset)
{
assetName = mediaAsset.Data.Name;
containerName = mediaAsset.Data.Container;
format = mediaAsset.Data.StorageEncryptionFormat;
container = storage.GetContainer(mediaAsset);
locators = mediaAsset.GetStreamingLocatorsAsync();
}
else if (item is BlobContainerItem bcItem)
{
assetName = storage.AccountName;
container = storage.GetBlobContainerClient(bcItem.Name);
containerName = container.Name;
format = MediaAssetStorageEncryptionFormat.None;//todo StorageEncryptionFormat;
}
if (assetName == null || container == null)
{
_logger.LogDebug("Analyzing item or container is null.");
throw new ArgumentException();
}
melindawangmsft marked this conversation as resolved.
Show resolved Hide resolved

var result = new AnalysisResult(assetName, MigrationStatus.NotMigrated);
_logger.LogDebug("Analyzing asset: {asset}, container: {container}", assetName, containerName);
try
{
var container = storage.GetContainer(asset);

if (!await container.ExistsAsync(cancellationToken))
melindawangmsft marked this conversation as resolved.
Show resolved Hide resolved
melindawangmsft marked this conversation as resolved.
Show resolved Hide resolved
{
_logger.LogWarning("Container {name} missing for asset {asset}", container.Name, asset.Data.Name);
_logger.LogWarning("Container {name} missing for asset {asset}", container.Name, assetName);
result.Status = MigrationStatus.Failed;
return result;
}

// Get a list of LocatorIds if they exist.
var locators = asset.GetStreamingLocatorsAsync();

await foreach (var locator in locators)
if (locators != null)
{
if (locator.StreamingLocatorId != null && locator.StreamingLocatorId != Guid.Empty)
await foreach (var locator in locators!)
{
result.LocatorIds.Add(locator.StreamingLocatorId.Value.ToString("D"));
}
if (locator.StreamingLocatorId != null && locator.StreamingLocatorId != Guid.Empty)
{
result.LocatorIds.Add(locator.StreamingLocatorId.Value.ToString("D"));
}
}
}

// The asset container exists, try to check the metadata list first.
Expand All @@ -62,15 +91,15 @@ private async Task<AnalysisResult> AnalyzeAsync(MediaAssetResource asset, BlobSe
{
// Do further check only when the Status in Metadata is not Completed nor Failed.

if (asset.Data.StorageEncryptionFormat != MediaAssetStorageEncryptionFormat.None)
if (format != MediaAssetStorageEncryptionFormat.None)
{
_logger.LogWarning("Asset {name} is encrypted", asset.Data.Name);
_logger.LogWarning("Asset {name} is encrypted", assetName);

migrateResult.AssetType = AssetMigrationResult.AssetType_Encrypted;
}
else
{
var assetDetails = await container.GetDetailsAsync(_logger, cancellationToken, null, asset.Data.Name, false);
var assetDetails = await container.GetDetailsAsync(_logger, cancellationToken, null, assetName, false);

if (assetDetails.Manifest == null)
{
Expand All @@ -96,73 +125,137 @@ private async Task<AnalysisResult> AnalyzeAsync(MediaAssetResource asset, BlobSe
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to analyze asset {name}", asset.Data.Name);
_logger.LogError(ex, "Failed to analyze asset {name}", assetName);
result.Status = MigrationStatus.Failed;
}
_logger.LogDebug("Analyzed asset: {asset}, container: {container}, type: {type}, status: {status}", asset.Data.Name, asset.Data.Container, result.AssetType, result.Status);
_logger.LogDebug("Analyzed asset: {asset}, container: {container}, type: {type}, status: {status}", assetName, containerName, result.AssetType, result.Status);
return result;
}

public override async Task MigrateAsync(CancellationToken cancellationToken)
{
var watch = Stopwatch.StartNew();
_logger.LogInformation("Begin analysis of assets for account: {name}", _analysisOptions.AccountName);
var account = await GetMediaAccountAsync(_analysisOptions.AccountName, cancellationToken);
double totalAssets = await QueryMetricAsync(account.Id.ToString(), "AssetCount", cancellationToken);
_logger.LogInformation("The total asset count of the media account is {count}.", totalAssets);
_logger.LogInformation("Begin analysis of items for account: {name}", _analysisOptions.AccountName);

var reportGenerator = new ReportGenerator(_globalOptions.HtmlReportFile, _globalOptions.JsonReportFile, _logger);
reportGenerator.WriteHeader();

var resourceFilter = GetAssetResourceFilter(_analysisOptions.ResourceFilter,
melindawangmsft marked this conversation as resolved.
Show resolved Hide resolved
_analysisOptions.CreationTimeStart,
_analysisOptions.CreationTimeEnd);

var reportGenerator = new ReportGenerator(_globalOptions.HtmlReportFile, _globalOptions.JsonReportFile, _logger);
reportGenerator.WriteHeader();

await _resourceProvider.SetStorageResourceGroupsAsync(account, cancellationToken);
var assets = account.GetMediaAssets()
.GetAllAsync(resourceFilter, cancellationToken: cancellationToken);
var statistics = new AssetStats();
var assetTypes = new ConcurrentDictionary<string, int>();
bool isSrorageAcc = false;
try
{
var account = await GetMediaAccountAsync(_analysisOptions.AccountName, cancellationToken);
double totalAssets = await QueryMetricAsync(account.Id.ToString(), "AssetCount", cancellationToken);
_logger.LogInformation("The total asset count of the media account is {count}.", totalAssets);

List<MediaAssetResource>? filteredList = null;
await _resourceProvider.SetStorageResourceGroupsAsync(account, cancellationToken);
var assets = account.GetMediaAssets()
.GetAllAsync(resourceFilter, cancellationToken: cancellationToken);
statistics = new AssetStats();
assetTypes = new ConcurrentDictionary<string, int>();

if (resourceFilter != null)
{
// When a filter is used, it usually include a small list of assets,
// The total count of asset can be extracted in advance without much perf hit.
filteredList = await assets.ToListAsync(cancellationToken);
List<MediaAssetResource>? filteredList = null;

totalAssets = filteredList.Count;
}
if (resourceFilter != null)
{
// When a filter is used, it usually include a small list of assets,
// The total count of asset can be extracted in advance without much perf hit.
filteredList = await assets.ToListAsync(cancellationToken);

totalAssets = filteredList.Count;
}

_logger.LogInformation("The total assets to handle in this run is {count}.", totalAssets);
_logger.LogInformation("The total assets to handle in this run is {count}.", totalAssets);

var channel = Channel.CreateBounded<double>(1);
var progress = ShowProgressAsync("Analyzing Assets", "Assets", totalAssets, channel.Reader, cancellationToken);
var writer = channel.Writer;
await MigrateInParallel(assets, filteredList, async (asset, cancellationToken) =>
var channel = Channel.CreateBounded<double>(1);
var progress = ShowProgressAsync("Analyzing Assets", "Assets", totalAssets, channel.Reader, cancellationToken);
var writer = channel.Writer;
await MigrateInParallel(assets, filteredList, async (asset, cancellationToken) =>
{
var storage = await _resourceProvider.GetStorageAccountAsync(account, asset, cancellationToken);
var result = await AnalyzeAsync(asset, storage, cancellationToken);
var assetType = result.AssetType ?? "unknown";
assetTypes.AddOrUpdate(assetType, 1, (key, value) => Interlocked.Increment(ref value));
reportGenerator.WriteRecord(result);
statistics.Update(result);
await writer.WriteAsync(statistics.Total, cancellationToken);
},
_analysisOptions.BatchSize,
cancellationToken);

writer.Complete();
await progress;
_logger.LogDebug("Finished analysis of assets for account: {name}. Time taken {elapsed}", _analysisOptions.AccountName, watch.Elapsed);
}
catch (Exception ex)
{
if (ex.Message.Contains("was not found"))//ex.Data is SomeExceptionType someException && someException.ErrorCode == "ResourceNotFound")
melindawangmsft marked this conversation as resolved.
Show resolved Hide resolved
{
isSrorageAcc = true;
}
else
{
_logger.LogError("Asset analysis failed with the following error: {0}", ex.Message);
}
}

if (isSrorageAcc)
{
var storage = await _resourceProvider.GetStorageAccountAsync(account, asset, cancellationToken);
var result = await AnalyzeAsync(asset, storage, cancellationToken);
var assetType = result.AssetType ?? "unknown";
assetTypes.AddOrUpdate(assetType, 1, (key, value) => Interlocked.Increment(ref value));
reportGenerator.WriteRecord(result);
statistics.Update(result);
await writer.WriteAsync(statistics.Total, cancellationToken);
},
_analysisOptions.BatchSize,
cancellationToken);

writer.Complete();
await progress;
_logger.LogDebug("Finished analysis of assets for account: {name}. Time taken {elapsed}", _analysisOptions.AccountName, watch.Elapsed);
try{
var (storageClient, accountId) = await _resourceProvider.GetStorageAccount(_analysisOptions.AccountName, cancellationToken);

double totalItems = await GetStorageBlobMetricAsync(accountId, cancellationToken);
var containers = storageClient.GetBlobContainersAsync(
prefix: _analysisOptions.ResourceFilter, cancellationToken: cancellationToken);
_logger.LogInformation("The total contianers count of the storage account is {count}.", totalItems);
List<BlobContainerItem>? filteredList = null;

if (_analysisOptions.ResourceFilter != null)
{
filteredList = await containers.ToListAsync();
totalItems = filteredList.Count;
}
_logger.LogInformation("The total containers to handle in this run is {count}.", totalItems);
var channel = Channel.CreateBounded<double>(1);
var progress = ShowProgressAsync("Analyzing Containers", "Assets", totalItems, channel.Reader, cancellationToken);
var writer = channel.Writer;
await MigrateInParallel(containers, filteredList, async (container, cancellationToken) =>
{
// var storage = await _resourceProvider.GetStorageAccountAsync(account, asset, cancellationToken);
var result = await AnalyzeAsync(container, storageClient, cancellationToken);
var assetType = result.AssetType ?? "unknown";
assetTypes.AddOrUpdate(assetType, 1, (key, value) => Interlocked.Increment(ref value));
reportGenerator?.WriteRecord(result);
statistics.Update(result);
await writer.WriteAsync(statistics.Total, cancellationToken);
},
_analysisOptions.BatchSize,
cancellationToken);

writer.Complete();
await progress;
_logger.LogDebug("Finished analysis of containers for account: {name}. Time taken {elapsed}", _analysisOptions.AccountName, watch.Elapsed);
}

catch (Exception ex)
{
_logger.LogError("Asset analysis failed with the following error: {0}", ex.Message);
}
}
WriteSummary(statistics, assetTypes);
WriteDetails(assetTypes);

reportGenerator.WriteTrailer();
reportGenerator.Dispose();

}


private void WriteSummary(AssetStats statistics, IDictionary<string, int> assetTypes)
{
var table = new Table()
Expand Down
11 changes: 10 additions & 1 deletion ams/AssetMigrator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,16 @@ public AssetMigrator(
public override async Task MigrateAsync(CancellationToken cancellationToken)
{
var watch = Stopwatch.StartNew();
var account = await GetMediaAccountAsync(_options.AccountName, cancellationToken);
MediaServicesAccountResource? account = null;
melindawangmsft marked this conversation as resolved.
Show resolved Hide resolved
try
{
account = await GetMediaAccountAsync(_options.AccountName, cancellationToken);
}
catch (Exception)
{
_logger.LogError("No valid media account was found.");
throw new Exception("No valid media account was found.");
}
melindawangmsft marked this conversation as resolved.
Show resolved Hide resolved
_logger.LogInformation("Begin migration of assets for account: {name}", account.Data.Name);
var totalAssets = await QueryMetricAsync(
account.Id.ToString(),
Expand Down
Loading
Loading