Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Melinda/storage #197

Merged
merged 6 commits into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion AnalysisOptionsBinder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ protected override AnalysisOptions GetBoundValue(BindingContext bindingContext)
bindingContext.ParseResult.GetValueForOption(_creationTimeStart),
bindingContext.ParseResult.GetValueForOption(_creationTimeEnd),
bindingContext.ParseResult.GetValueForOption(_filter),
bindingContext.ParseResult.GetValueForOption(_batchSize)
bindingContext.ParseResult.GetValueForOption(_batchSize)
);
}
}
Expand Down
9 changes: 4 additions & 5 deletions Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ This command forcefully removes all assets in the given account.
});

// disable storage migrate option until ready
/*

var storageOptionsBinder = new StorageOptionsBinder();
var storageCommand = storageOptionsBinder.GetCommand("storage", @"Directly migrate the assets from the storage account.
Doesn't require the Azure media services to be running.
Expand All @@ -100,11 +100,10 @@ amsmigrate storage -s <subscription id> -g <resource group> -n <source storage a
rootCommand.Add(storageCommand);
storageCommand.SetHandler(async context =>
{
var globalOptions = globalOptionsBinder.GetValue(context.BindingContext);
var storageOptions = storageOptionsBinder.GetValue(context.BindingContext);
await MigrateStorageAsync(globalOptions, storageOptions, context.GetCancellationToken());
var storageOptions = storageOptionsBinder.GetValue(context.BindingContext);
await MigrateStorageAsync(context, storageOptions, context.GetCancellationToken());
});
*/


// disable key migrate option until ready
/*
Expand Down
17 changes: 16 additions & 1 deletion StorageOptionsBinder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,17 @@ This is specific to the cloud you are migrating to.
() => DefaultBatchSize,
description: @"Batch size for parallel processing.");

private readonly Option<bool> _encryptContent = new(
aliases: new[] { "-e", "--encrypt-content" },
() => false,
description: "Encrypt the content using CENC"
);

private readonly Option<Uri?> _keyVaultUri = new(
aliases: new[] { "--key-vault-uri" },
description: "The key vault to store encryption keys."
);

const int SegmentDurationInSeconds = 2;

public StorageOptionsBinder()
Expand Down Expand Up @@ -141,6 +152,8 @@ public Command GetCommand(string name, string description)
command.AddOption(_workingDirectory);
command.AddOption(_copyNonStreamable);
command.AddOption(_batchSize);
command.AddOption(_encryptContent);
command.AddOption(_keyVaultUri);
return command;
}

Expand All @@ -162,7 +175,9 @@ protected override StorageOptions GetBoundValue(BindingContext bindingContext)
bindingContext.ParseResult.GetValueForOption(_breakOutputLease),
bindingContext.ParseResult.GetValueForOption(_keepWorkingFolder),
SegmentDurationInSeconds,
bindingContext.ParseResult.GetValueForOption(_batchSize)
bindingContext.ParseResult.GetValueForOption(_batchSize),
bindingContext.ParseResult.GetValueForOption(_encryptContent),
bindingContext.ParseResult.GetValueForOption(_keyVaultUri)
);
}

Expand Down
201 changes: 143 additions & 58 deletions ams/AssetAnalyzer.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
using AMSMigrate.Contracts;
using AMSMigrate.Transform;
using Azure;
using Azure.Core;
using Azure.ResourceManager.Media;
using Azure.ResourceManager.Media.Models;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Microsoft.Extensions.Logging;
using Spectre.Console;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq.Expressions;
using System.Threading.Channels;

namespace AMSMigrate.Ams
Expand All @@ -30,29 +33,56 @@ public AssetAnalyzer(
_tracker = tracker;
}

private async Task<AnalysisResult> AnalyzeAsync(MediaAssetResource asset, BlobServiceClient storage, CancellationToken cancellationToken)
private async Task<AnalysisResult> AnalyzeAsync<T>(T item, BlobServiceClient storage, CancellationToken cancellationToken)
melindawangmsft marked this conversation as resolved.
Show resolved Hide resolved
{
var result = new AnalysisResult(asset.Data.Name, MigrationStatus.NotMigrated);
_logger.LogDebug("Analyzing asset: {asset}, container: {container}", asset.Data.Name, asset.Data.Container);
string? assetName = null;
string? containerName = null;
BlobContainerClient? container = null;
AsyncPageable<MediaAssetStreamingLocator>? locators = null;
MediaAssetStorageEncryptionFormat? format = null;
melindawangmsft marked this conversation as resolved.
Show resolved Hide resolved
bool isForAmsAccount = false;
if (item is MediaAssetResource mediaAsset)
{
assetName = mediaAsset.Data.Name;
containerName = mediaAsset.Data.Container;
format = mediaAsset.Data.StorageEncryptionFormat;
container = storage.GetContainer(mediaAsset);
locators = mediaAsset.GetStreamingLocatorsAsync();
isForAmsAccount = true;
}
else if (item is BlobContainerItem bcItem)
{
assetName = storage.AccountName;
container = storage.GetBlobContainerClient(bcItem.Name);
containerName = container.Name;
format = MediaAssetStorageEncryptionFormat.None;
}
else
{
throw new ArgumentException("item type is not supported.");
}

var result = new AnalysisResult(assetName, MigrationStatus.NotMigrated);
_logger.LogDebug("Analyzing asset: {asset}, container: {container}", assetName, containerName);
try
{
var container = storage.GetContainer(asset);
if (!await container.ExistsAsync(cancellationToken))

if (isForAmsAccount && !await container.ExistsAsync(cancellationToken))
{
_logger.LogWarning("Container {name} missing for asset {asset}", container.Name, asset.Data.Name);
_logger.LogWarning("Container {name} missing for asset {asset}", container.Name, assetName);
result.Status = MigrationStatus.Failed;
return result;
}

// Get a list of LocatorIds if they exist.
var locators = asset.GetStreamingLocatorsAsync();

await foreach (var locator in locators)
if (locators != null)
{
if (locator.StreamingLocatorId != null && locator.StreamingLocatorId != Guid.Empty)
await foreach (var locator in locators!)
{
result.LocatorIds.Add(locator.StreamingLocatorId.Value.ToString("D"));
}
if (locator.StreamingLocatorId != null && locator.StreamingLocatorId != Guid.Empty)
{
result.LocatorIds.Add(locator.StreamingLocatorId.Value.ToString("D"));
}
}
}

// The asset container exists, try to check the metadata list first.
Expand All @@ -62,15 +92,15 @@ private async Task<AnalysisResult> AnalyzeAsync(MediaAssetResource asset, BlobSe
{
// Do further check only when the Status in Metadata is not Completed nor Failed.

if (asset.Data.StorageEncryptionFormat != MediaAssetStorageEncryptionFormat.None)
if (format != MediaAssetStorageEncryptionFormat.None)
{
_logger.LogWarning("Asset {name} is encrypted", asset.Data.Name);
_logger.LogWarning("Asset {name} is encrypted", assetName);

migrateResult.AssetType = AssetMigrationResult.AssetType_Encrypted;
}
else
{
var assetDetails = await container.GetDetailsAsync(_logger, cancellationToken, null, asset.Data.Name, false);
var assetDetails = await container.GetDetailsAsync(_logger, cancellationToken, null, assetName, false);

if (assetDetails.Manifest == null)
{
Expand All @@ -96,73 +126,128 @@ private async Task<AnalysisResult> AnalyzeAsync(MediaAssetResource asset, BlobSe
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to analyze asset {name}", asset.Data.Name);
_logger.LogError(ex, "Failed to analyze asset {name}", assetName);
result.Status = MigrationStatus.Failed;
}
_logger.LogDebug("Analyzed asset: {asset}, container: {container}, type: {type}, status: {status}", asset.Data.Name, asset.Data.Container, result.AssetType, result.Status);
_logger.LogDebug("Analyzed asset: {asset}, container: {container}, type: {type}, status: {status}", assetName, containerName, result.AssetType, result.Status);
return result;
}

public override async Task MigrateAsync(CancellationToken cancellationToken)
{
var watch = Stopwatch.StartNew();
_logger.LogInformation("Begin analysis of assets for account: {name}", _analysisOptions.AccountName);
var account = await GetMediaAccountAsync(_analysisOptions.AccountName, cancellationToken);
double totalAssets = await QueryMetricAsync(account.Id.ToString(), "AssetCount", cancellationToken);
_logger.LogInformation("The total asset count of the media account is {count}.", totalAssets);

var resourceFilter = GetAssetResourceFilter(_analysisOptions.ResourceFilter,
_analysisOptions.CreationTimeStart,
_analysisOptions.CreationTimeEnd);

_logger.LogInformation("Begin analysis of items for account: {name}", _analysisOptions.AccountName);
bool isSrorageAcc = false;
MediaServicesAccountResource? account = null;
try
{
account = await GetMediaAccountAsync(_analysisOptions.AccountName, cancellationToken);
}
catch (RequestFailedException ex)
{
if (ex.ErrorCode != null && ex.ErrorCode.Equals("ResourceNotFound"))
{
isSrorageAcc = true;
}
}
var reportGenerator = new ReportGenerator(_globalOptions.HtmlReportFile, _globalOptions.JsonReportFile, _logger);
reportGenerator.WriteHeader();

await _resourceProvider.SetStorageResourceGroupsAsync(account, cancellationToken);
var assets = account.GetMediaAssets()
.GetAllAsync(resourceFilter, cancellationToken: cancellationToken);
var statistics = new AssetStats();
var assetTypes = new ConcurrentDictionary<string, int>();
if (isSrorageAcc)
{
var (storageClient, accountId) = await _resourceProvider.GetStorageAccount(_analysisOptions.AccountName, cancellationToken);

List<MediaAssetResource>? filteredList = null;
double totalItems = await GetStorageBlobMetricAsync(accountId, cancellationToken);
var containers = storageClient.GetBlobContainersAsync(
prefix: _analysisOptions.ResourceFilter, cancellationToken: cancellationToken);
_logger.LogInformation("The total contianers count of the storage account is {count}.", totalItems);
List<BlobContainerItem>? filteredList = null;

if (resourceFilter != null)
{
// When a filter is used, it usually include a small list of assets,
// The total count of asset can be extracted in advance without much perf hit.
filteredList = await assets.ToListAsync(cancellationToken);
if (_analysisOptions.ResourceFilter != null)
{
filteredList = await containers.ToListAsync();
totalItems = filteredList.Count;
}
_logger.LogInformation("The total containers to handle in this run is {count}.", totalItems);
var channel = Channel.CreateBounded<double>(1);
var progress = ShowProgressAsync("Analyzing Containers", "Assets", totalItems, channel.Reader, cancellationToken);
var writer = channel.Writer;
await MigrateInParallel(containers, filteredList, async (container, cancellationToken) =>
{
// var storage = await _resourceProvider.GetStorageAccountAsync(account, asset, cancellationToken);
var result = await AnalyzeAsync(container, storageClient, cancellationToken);
var assetType = result.AssetType ?? "unknown";
assetTypes.AddOrUpdate(assetType, 1, (key, value) => Interlocked.Increment(ref value));
reportGenerator?.WriteRecord(result);
melindawangmsft marked this conversation as resolved.
Show resolved Hide resolved
statistics.Update(result);
await writer.WriteAsync(statistics.Total, cancellationToken);
},
_analysisOptions.BatchSize,
cancellationToken);

writer.Complete();
await progress;
_logger.LogDebug("Finished analysis of containers for account: {name}. Time taken {elapsed}", _analysisOptions.AccountName, watch.Elapsed);

totalAssets = filteredList.Count;
}
else if (account != null)
{
var resourceFilter = GetAssetResourceFilter(_analysisOptions.ResourceFilter,
_analysisOptions.CreationTimeStart,
_analysisOptions.CreationTimeEnd);

_logger.LogInformation("The total assets to handle in this run is {count}.", totalAssets);
double totalAssets = await QueryMetricAsync(account.Id.ToString(), "AssetCount", cancellationToken);
_logger.LogInformation("The total asset count of the media account is {count}.", totalAssets);

var channel = Channel.CreateBounded<double>(1);
var progress = ShowProgressAsync("Analyzing Assets", "Assets", totalAssets, channel.Reader, cancellationToken);
var writer = channel.Writer;
await MigrateInParallel(assets, filteredList, async (asset, cancellationToken) =>
{
var storage = await _resourceProvider.GetStorageAccountAsync(account, asset, cancellationToken);
var result = await AnalyzeAsync(asset, storage, cancellationToken);
var assetType = result.AssetType ?? "unknown";
assetTypes.AddOrUpdate(assetType, 1, (key, value) => Interlocked.Increment(ref value));
reportGenerator.WriteRecord(result);
statistics.Update(result);
await writer.WriteAsync(statistics.Total, cancellationToken);
},
_analysisOptions.BatchSize,
cancellationToken);

writer.Complete();
await progress;
_logger.LogDebug("Finished analysis of assets for account: {name}. Time taken {elapsed}", _analysisOptions.AccountName, watch.Elapsed);
await _resourceProvider.SetStorageResourceGroupsAsync(account, cancellationToken);
var assets = account.GetMediaAssets()
.GetAllAsync(resourceFilter, cancellationToken: cancellationToken);
statistics = new AssetStats();
assetTypes = new ConcurrentDictionary<string, int>();

List<MediaAssetResource>? filteredList = null;

if (resourceFilter != null)
{
// When a filter is used, it usually include a small list of assets,
// The total count of asset can be extracted in advance without much perf hit.
filteredList = await assets.ToListAsync(cancellationToken);

totalAssets = filteredList.Count;
}

_logger.LogInformation("The total assets to handle in this run is {count}.", totalAssets);

var channel = Channel.CreateBounded<double>(1);
var progress = ShowProgressAsync("Analyzing Assets", "Assets", totalAssets, channel.Reader, cancellationToken);
var writer = channel.Writer;
await MigrateInParallel(assets, filteredList, async (asset, cancellationToken) =>
{
var storage = await _resourceProvider.GetStorageAccountAsync(account, asset, cancellationToken);
var result = await AnalyzeAsync(asset, storage, cancellationToken);
var assetType = result.AssetType ?? "unknown";
assetTypes.AddOrUpdate(assetType, 1, (key, value) => Interlocked.Increment(ref value));
reportGenerator.WriteRecord(result);
statistics.Update(result);
await writer.WriteAsync(statistics.Total, cancellationToken);
},
_analysisOptions.BatchSize,
cancellationToken);

writer.Complete();
await progress;
_logger.LogDebug("Finished analysis of assets for account: {name}. Time taken {elapsed}", _analysisOptions.AccountName, watch.Elapsed);
}
WriteSummary(statistics, assetTypes);
WriteDetails(assetTypes);

reportGenerator.WriteTrailer();
reportGenerator.Dispose();

}


private void WriteSummary(AssetStats statistics, IDictionary<string, int> assetTypes)
{
var table = new Table()
Expand Down
3 changes: 2 additions & 1 deletion ams/AssetMigrator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public AssetMigrator(
public override async Task MigrateAsync(CancellationToken cancellationToken)
{
var watch = Stopwatch.StartNew();
var account = await GetMediaAccountAsync(_options.AccountName, cancellationToken);
MediaServicesAccountResource? account = null;
melindawangmsft marked this conversation as resolved.
Show resolved Hide resolved
account = await GetMediaAccountAsync(_options.AccountName, cancellationToken);
_logger.LogInformation("Begin migration of assets for account: {name}", account.Data.Name);
var totalAssets = await QueryMetricAsync(
account.Id.ToString(),
Expand Down
11 changes: 10 additions & 1 deletion ams/CleanupCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,16 @@ public CleanupCommand(GlobalOptions globalOptions,

public override async Task MigrateAsync(CancellationToken cancellationToken)
{
var account = await GetMediaAccountAsync(_options.AccountName, cancellationToken);
MediaServicesAccountResource? account = null;
try
{
account = await GetMediaAccountAsync(_options.AccountName, cancellationToken);
}
catch (Exception)
{
_logger.LogError("No valid media account was found.");
throw new Exception("No valid media account was found.");
}
_logger.LogInformation("Begin cleaning up on account: {name}", account.Data.Name);

if (_options.IsCleanUpAccount)
Expand Down
Loading
Loading