Melinda/storage (#197)
* save progress

* save progress

* updates

* address comments: remove -isStorageAcc, rename itemName to assetName, remove -prefix

* address comments
melindawangmsft authored Sep 27, 2023
1 parent 7054e4f commit 1995999
Showing 10 changed files with 201 additions and 71 deletions.
2 changes: 1 addition & 1 deletion AnalysisOptionsBinder.cs
@@ -65,7 +65,7 @@ protected override AnalysisOptions GetBoundValue(BindingContext bindingContext)
bindingContext.ParseResult.GetValueForOption(_creationTimeStart),
bindingContext.ParseResult.GetValueForOption(_creationTimeEnd),
bindingContext.ParseResult.GetValueForOption(_filter),
bindingContext.ParseResult.GetValueForOption(_batchSize)
bindingContext.ParseResult.GetValueForOption(_batchSize)
);
}
}
9 changes: 4 additions & 5 deletions Program.cs
@@ -90,7 +90,7 @@ This command forcefully removes all assets in the given account.
});

// disable storage migrate option until ready
/*

var storageOptionsBinder = new StorageOptionsBinder();
var storageCommand = storageOptionsBinder.GetCommand("storage", @"Directly migrate the assets from the storage account.
Doesn't require Azure Media Services to be running.
@@ -100,11 +100,10 @@ amsmigrate storage -s <subscription id> -g <resource group> -n <source storage a
rootCommand.Add(storageCommand);
storageCommand.SetHandler(async context =>
{
var globalOptions = globalOptionsBinder.GetValue(context.BindingContext);
var storageOptions = storageOptionsBinder.GetValue(context.BindingContext);
await MigrateStorageAsync(globalOptions, storageOptions, context.GetCancellationToken());
var storageOptions = storageOptionsBinder.GetValue(context.BindingContext);
await MigrateStorageAsync(context, storageOptions, context.GetCancellationToken());
});
*/


// disable key migrate option until ready
/*
17 changes: 16 additions & 1 deletion StorageOptionsBinder.cs
@@ -98,6 +98,17 @@ This is specific to the cloud you are migrating to.
() => DefaultBatchSize,
description: @"Batch size for parallel processing.");

private readonly Option<bool> _encryptContent = new(
aliases: new[] { "-e", "--encrypt-content" },
() => false,
description: "Encrypt the content using CENC"
);

private readonly Option<Uri?> _keyVaultUri = new(
aliases: new[] { "--key-vault-uri" },
description: "The key vault to store encryption keys."
);

const int SegmentDurationInSeconds = 2;

public StorageOptionsBinder()
@@ -141,6 +152,8 @@ public Command GetCommand(string name, string description)
command.AddOption(_workingDirectory);
command.AddOption(_copyNonStreamable);
command.AddOption(_batchSize);
command.AddOption(_encryptContent);
command.AddOption(_keyVaultUri);
return command;
}

@@ -162,7 +175,9 @@ protected override StorageOptions GetBoundValue(BindingContext bindingContext)
bindingContext.ParseResult.GetValueForOption(_breakOutputLease),
bindingContext.ParseResult.GetValueForOption(_keepWorkingFolder),
SegmentDurationInSeconds,
bindingContext.ParseResult.GetValueForOption(_batchSize)
bindingContext.ParseResult.GetValueForOption(_batchSize),
bindingContext.ParseResult.GetValueForOption(_encryptContent),
bindingContext.ParseResult.GetValueForOption(_keyVaultUri)
);
}
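For reference, a hypothetical invocation exercising the two new options alongside the existing storage command might look like this (the key vault URI is a placeholder):

amsmigrate storage -s <subscription id> -g <resource group> -n <source storage account> -e --key-vault-uri <key vault uri>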

201 changes: 143 additions & 58 deletions ams/AssetAnalyzer.cs
@@ -1,13 +1,16 @@
using AMSMigrate.Contracts;
using AMSMigrate.Transform;
using Azure;
using Azure.Core;
using Azure.ResourceManager.Media;
using Azure.ResourceManager.Media.Models;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Microsoft.Extensions.Logging;
using Spectre.Console;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq.Expressions;
using System.Threading.Channels;

namespace AMSMigrate.Ams
@@ -30,29 +33,56 @@ public AssetAnalyzer(
_tracker = tracker;
}

private async Task<AnalysisResult> AnalyzeAsync(MediaAssetResource asset, BlobServiceClient storage, CancellationToken cancellationToken)
private async Task<AnalysisResult> AnalyzeAsync<T>(T item, BlobServiceClient storage, CancellationToken cancellationToken)
{
var result = new AnalysisResult(asset.Data.Name, MigrationStatus.NotMigrated);
_logger.LogDebug("Analyzing asset: {asset}, container: {container}", asset.Data.Name, asset.Data.Container);
string? assetName = null;
string? containerName = null;
BlobContainerClient? container = null;
AsyncPageable<MediaAssetStreamingLocator>? locators = null;
MediaAssetStorageEncryptionFormat? format = null;
bool isForAmsAccount = false;
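// Dispatch on the runtime type: a media-services asset supplies its own container,
// encryption format and streaming locators, while a bare blob container from a
// storage account is treated as unencrypted content.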
if (item is MediaAssetResource mediaAsset)
{
assetName = mediaAsset.Data.Name;
containerName = mediaAsset.Data.Container;
format = mediaAsset.Data.StorageEncryptionFormat;
container = storage.GetContainer(mediaAsset);
locators = mediaAsset.GetStreamingLocatorsAsync();
isForAmsAccount = true;
}
else if (item is BlobContainerItem bcItem)
{
assetName = storage.AccountName;
container = storage.GetBlobContainerClient(bcItem.Name);
containerName = container.Name;
format = MediaAssetStorageEncryptionFormat.None;
}
else
{
throw new ArgumentException("item type is not supported.");
}

var result = new AnalysisResult(assetName, MigrationStatus.NotMigrated);
_logger.LogDebug("Analyzing asset: {asset}, container: {container}", assetName, containerName);
try
{
var container = storage.GetContainer(asset);
if (!await container.ExistsAsync(cancellationToken))

if (isForAmsAccount && !await container.ExistsAsync(cancellationToken))
{
_logger.LogWarning("Container {name} missing for asset {asset}", container.Name, asset.Data.Name);
_logger.LogWarning("Container {name} missing for asset {asset}", container.Name, assetName);
result.Status = MigrationStatus.Failed;
return result;
}

// Get a list of LocatorIds if they exist.
var locators = asset.GetStreamingLocatorsAsync();

await foreach (var locator in locators)
if (locators != null)
{
if (locator.StreamingLocatorId != null && locator.StreamingLocatorId != Guid.Empty)
await foreach (var locator in locators)
{
result.LocatorIds.Add(locator.StreamingLocatorId.Value.ToString("D"));
}
if (locator.StreamingLocatorId != null && locator.StreamingLocatorId != Guid.Empty)
{
result.LocatorIds.Add(locator.StreamingLocatorId.Value.ToString("D"));
}
}
}

// The asset container exists, so check the metadata list first.
@@ -62,15 +92,15 @@ private async Task<AnalysisResult> AnalyzeAsync(MediaAssetResource asset, BlobSe
{
// Do further check only when the Status in Metadata is not Completed nor Failed.

if (asset.Data.StorageEncryptionFormat != MediaAssetStorageEncryptionFormat.None)
if (format != MediaAssetStorageEncryptionFormat.None)
{
_logger.LogWarning("Asset {name} is encrypted", asset.Data.Name);
_logger.LogWarning("Asset {name} is encrypted", assetName);

migrateResult.AssetType = AssetMigrationResult.AssetType_Encrypted;
}
else
{
var assetDetails = await container.GetDetailsAsync(_logger, cancellationToken, null, asset.Data.Name, false);
var assetDetails = await container.GetDetailsAsync(_logger, cancellationToken, null, assetName, false);

if (assetDetails.Manifest == null)
{
Expand All @@ -96,73 +126,128 @@ private async Task<AnalysisResult> AnalyzeAsync(MediaAssetResource asset, BlobSe
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to analyze asset {name}", asset.Data.Name);
_logger.LogError(ex, "Failed to analyze asset {name}", assetName);
result.Status = MigrationStatus.Failed;
}
_logger.LogDebug("Analyzed asset: {asset}, container: {container}, type: {type}, status: {status}", asset.Data.Name, asset.Data.Container, result.AssetType, result.Status);
_logger.LogDebug("Analyzed asset: {asset}, container: {container}, type: {type}, status: {status}", assetName, containerName, result.AssetType, result.Status);
return result;
}

public override async Task MigrateAsync(CancellationToken cancellationToken)
{
var watch = Stopwatch.StartNew();
_logger.LogInformation("Begin analysis of assets for account: {name}", _analysisOptions.AccountName);
var account = await GetMediaAccountAsync(_analysisOptions.AccountName, cancellationToken);
double totalAssets = await QueryMetricAsync(account.Id.ToString(), "AssetCount", cancellationToken);
_logger.LogInformation("The total asset count of the media account is {count}.", totalAssets);

var resourceFilter = GetAssetResourceFilter(_analysisOptions.ResourceFilter,
_analysisOptions.CreationTimeStart,
_analysisOptions.CreationTimeEnd);

_logger.LogInformation("Begin analysis of items for account: {name}", _analysisOptions.AccountName);
bool isStorageAcc = false;
MediaServicesAccountResource? account = null;
try
{
account = await GetMediaAccountAsync(_analysisOptions.AccountName, cancellationToken);
}
catch (RequestFailedException ex)
{
// A missing media account means the name refers to a plain storage account; rethrow anything else.
if (ex.ErrorCode == "ResourceNotFound")
{
isStorageAcc = true;
}
else
{
throw;
}
}
var reportGenerator = new ReportGenerator(_globalOptions.HtmlReportFile, _globalOptions.JsonReportFile, _logger);
reportGenerator.WriteHeader();

await _resourceProvider.SetStorageResourceGroupsAsync(account, cancellationToken);
var assets = account.GetMediaAssets()
.GetAllAsync(resourceFilter, cancellationToken: cancellationToken);
var statistics = new AssetStats();
var assetTypes = new ConcurrentDictionary<string, int>();
if (isStorageAcc)
{
var (storageClient, accountId) = await _resourceProvider.GetStorageAccount(_analysisOptions.AccountName, cancellationToken);

List<MediaAssetResource>? filteredList = null;
double totalItems = await GetStorageBlobMetricAsync(accountId, cancellationToken);
var containers = storageClient.GetBlobContainersAsync(
prefix: _analysisOptions.ResourceFilter, cancellationToken: cancellationToken);
_logger.LogInformation("The total contianers count of the storage account is {count}.", totalItems);
List<BlobContainerItem>? filteredList = null;

if (resourceFilter != null)
{
// When a filter is used, it usually yields a small list of assets,
// so the total asset count can be extracted in advance without much of a perf hit.
filteredList = await assets.ToListAsync(cancellationToken);
if (_analysisOptions.ResourceFilter != null)
{
filteredList = await containers.ToListAsync(cancellationToken);
totalItems = filteredList.Count;
}
_logger.LogInformation("The total containers to handle in this run is {count}.", totalItems);
var channel = Channel.CreateBounded<double>(1);
var progress = ShowProgressAsync("Analyzing Containers", "Assets", totalItems, channel.Reader, cancellationToken);
var writer = channel.Writer;
await MigrateInParallel(containers, filteredList, async (container, cancellationToken) =>
{
var result = await AnalyzeAsync(container, storageClient, cancellationToken);
var assetType = result.AssetType ?? "unknown";
assetTypes.AddOrUpdate(assetType, 1, (key, value) => value + 1);
reportGenerator?.WriteRecord(result);
statistics.Update(result);
await writer.WriteAsync(statistics.Total, cancellationToken);
},
_analysisOptions.BatchSize,
cancellationToken);

writer.Complete();
await progress;
_logger.LogDebug("Finished analysis of containers for account: {name}. Time taken {elapsed}", _analysisOptions.AccountName, watch.Elapsed);

totalAssets = filteredList.Count;
}
else if (account != null)
{
var resourceFilter = GetAssetResourceFilter(_analysisOptions.ResourceFilter,
_analysisOptions.CreationTimeStart,
_analysisOptions.CreationTimeEnd);

_logger.LogInformation("The total assets to handle in this run is {count}.", totalAssets);
double totalAssets = await QueryMetricAsync(account.Id.ToString(), "AssetCount", cancellationToken);
_logger.LogInformation("The total asset count of the media account is {count}.", totalAssets);

var channel = Channel.CreateBounded<double>(1);
var progress = ShowProgressAsync("Analyzing Assets", "Assets", totalAssets, channel.Reader, cancellationToken);
var writer = channel.Writer;
await MigrateInParallel(assets, filteredList, async (asset, cancellationToken) =>
{
var storage = await _resourceProvider.GetStorageAccountAsync(account, asset, cancellationToken);
var result = await AnalyzeAsync(asset, storage, cancellationToken);
var assetType = result.AssetType ?? "unknown";
assetTypes.AddOrUpdate(assetType, 1, (key, value) => Interlocked.Increment(ref value));
reportGenerator.WriteRecord(result);
statistics.Update(result);
await writer.WriteAsync(statistics.Total, cancellationToken);
},
_analysisOptions.BatchSize,
cancellationToken);

writer.Complete();
await progress;
_logger.LogDebug("Finished analysis of assets for account: {name}. Time taken {elapsed}", _analysisOptions.AccountName, watch.Elapsed);
await _resourceProvider.SetStorageResourceGroupsAsync(account, cancellationToken);
var assets = account.GetMediaAssets()
.GetAllAsync(resourceFilter, cancellationToken: cancellationToken);
statistics = new AssetStats();
assetTypes = new ConcurrentDictionary<string, int>();

List<MediaAssetResource>? filteredList = null;

if (resourceFilter != null)
{
// When a filter is used, it usually yields a small list of assets,
// so the total asset count can be extracted in advance without much of a perf hit.
filteredList = await assets.ToListAsync(cancellationToken);

totalAssets = filteredList.Count;
}

_logger.LogInformation("The total assets to handle in this run is {count}.", totalAssets);

var channel = Channel.CreateBounded<double>(1);
var progress = ShowProgressAsync("Analyzing Assets", "Assets", totalAssets, channel.Reader, cancellationToken);
var writer = channel.Writer;
await MigrateInParallel(assets, filteredList, async (asset, cancellationToken) =>
{
var storage = await _resourceProvider.GetStorageAccountAsync(account, asset, cancellationToken);
var result = await AnalyzeAsync(asset, storage, cancellationToken);
var assetType = result.AssetType ?? "unknown";
assetTypes.AddOrUpdate(assetType, 1, (key, value) => value + 1);
reportGenerator.WriteRecord(result);
statistics.Update(result);
await writer.WriteAsync(statistics.Total, cancellationToken);
},
_analysisOptions.BatchSize,
cancellationToken);

writer.Complete();
await progress;
_logger.LogDebug("Finished analysis of assets for account: {name}. Time taken {elapsed}", _analysisOptions.AccountName, watch.Elapsed);
}
WriteSummary(statistics, assetTypes);
WriteDetails(assetTypes);

reportGenerator.WriteTrailer();
reportGenerator.Dispose();

}


private void WriteSummary(AssetStats statistics, IDictionary<string, int> assetTypes)
{
var table = new Table()
3 changes: 2 additions & 1 deletion ams/AssetMigrator.cs
@@ -35,7 +35,8 @@ public AssetMigrator(
public override async Task MigrateAsync(CancellationToken cancellationToken)
{
var watch = Stopwatch.StartNew();
var account = await GetMediaAccountAsync(_options.AccountName, cancellationToken);
MediaServicesAccountResource account = await GetMediaAccountAsync(_options.AccountName, cancellationToken);
_logger.LogInformation("Begin migration of assets for account: {name}", account.Data.Name);
var totalAssets = await QueryMetricAsync(
account.Id.ToString(),
11 changes: 10 additions & 1 deletion ams/CleanupCommand.cs
@@ -28,7 +28,16 @@ public CleanupCommand(GlobalOptions globalOptions,

public override async Task MigrateAsync(CancellationToken cancellationToken)
{
var account = await GetMediaAccountAsync(_options.AccountName, cancellationToken);
MediaServicesAccountResource? account = null;
try
{
account = await GetMediaAccountAsync(_options.AccountName, cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "No valid media account was found.");
throw new Exception("No valid media account was found.", ex);
}
_logger.LogInformation("Begin cleaning up on account: {name}", account.Data.Name);

if (_options.IsCleanUpAccount)
Expand Down