Skip to content

Commit

Permalink
Only create pipes when pipes are enabled.
Browse files Browse the repository at this point in the history
Remove duplication between BlobStream and BlobSource.
Refactor the pipe code. Fixes #151
  • Loading branch information
duggaraju committed Aug 24, 2023
1 parent 345f9ab commit e1a1e7e
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 199 deletions.
17 changes: 17 additions & 0 deletions Properties/launchSettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"profiles": {
"AMSMigrate": {
"commandName": "Project",
"commandLineArgs": "assets -s b2c12710-ecdf-4532-9ba6-7e74a219b5f2 --break-output-lease --keep-working-folder --enable-live-asset -g pohhsuTest -n pohhsumediaservice -f \"name eq 'asset-audio-leading-audio-discontinuity'\" --skip-migrated false -t \"$web/bug/${AssetName}\" -o amsencodermsitest"
},
"Docker": {
"commandName": "Docker",
"commandLineArgs": "analyze -s b2c12710-ecdf-4532-9ba6-7e74a219b5f2 -g provenance -n provenanceuswc"
},
"WSL": {
"commandName": "WSL2",
"commandLineArgs": "AMSMigrate.dll assets -s b2c12710-ecdf-4532-9ba6-7e74a219b5f2 --break-output-lease --keep-working-folder --enable-live-asset -g pohhsuTest -n pohhsumediaservice -f \"name eq 'asset-audio-leading-audio-discontinuity'\" --skip-migrated false -t \"$web/bug/${AssetName}\" -o amsencodermsitest",
"distributionName": ""
}
}
}
91 changes: 0 additions & 91 deletions pipes/BlobPipe.cs

This file was deleted.

81 changes: 42 additions & 39 deletions pipes/BlobSource.cs
Original file line number Diff line number Diff line change
@@ -1,77 +1,80 @@
using AMSMigrate.Decryption;

using AMSMigrate.Decryption;
using Azure.ResourceManager.Media.Models;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
using FFMpegCore.Pipes;
using Microsoft.Extensions.Logging;

namespace AMSMigrate.Pipes
{
internal class BlobSource : IPipeSource
sealed class BlobSource : IPipeSource
{
private static readonly IReadOnlyDictionary<string, string> ExtensionToFormatMap = new Dictionary<string, string>
{
{ ".ts", "mpegts" },
{ ".vtt", "webvtt" }
};

private readonly BlockBlobClient _blobClient;
private readonly StorageEncryptedAssetDecryptionInfo? _decryptInfo;
private readonly ILogger _logger;
private readonly BlockBlobClient _blob;
private readonly StorageEncryptedAssetDecryptionInfo? _decryptInfo;

public BlobSource(BlockBlobClient blobClient, StorageEncryptedAssetDecryptionInfo? decryptInfo, ILogger logger)
public BlobSource(BlobContainerClient container, string blobName, StorageEncryptedAssetDecryptionInfo? decryptInfo, ILogger logger) :
this(container.GetBlockBlobClient(blobName), decryptInfo, logger)
{
_blobClient = blobClient;
_decryptInfo = decryptInfo;
_logger = logger;
}

public string GetStreamArguments()
public BlobSource(BlockBlobClient blob, StorageEncryptedAssetDecryptionInfo? decryptInfo, ILogger logger)
{
var extension = Path.GetExtension(_blobClient.Name);
if (!ExtensionToFormatMap.TryGetValue(extension, out var format))
{
format = "mp4"; // fallback to mp4.
}
return $"-f {format}";
_blob = blob;
_logger = logger;
_decryptInfo = decryptInfo;
}

public async Task WriteAsync(Stream outputStream, CancellationToken cancellationToken)
public async Task DownloadAsync(Stream stream, CancellationToken cancellationToken)
{
_logger.LogDebug("Begin downloading track: {name}", _blobClient.Name);
var options = new BlobDownloadOptions
{
ProgressHandler = new Progress<long>(
progress =>
{
_logger.LogTrace("Download progress of blob {blob} is {progress}", _blobClient.Name, progress);
})
};
using BlobDownloadStreamingResult result = await _blobClient.DownloadStreamingAsync(options, cancellationToken);
try
_logger.LogDebug("Begin downloading {name}", _blob.Name);

using var aesTransform = AssetDecryptor.GetAesCtrTransform(_decryptInfo, _blob.Name, false);

if (aesTransform != null)
{
await result.Content.CopyToAsync(outputStream, cancellationToken);
_logger.LogDebug("Finished downloading track: {name}", _blobClient.Name);
await AssetDecryptor.DecryptTo(aesTransform, _blob, stream, cancellationToken);
}
catch (Exception ex)
else
{
_logger.LogError(ex, "Failed to download {blob}", _blobClient.Name);
throw;
await _blob.DownloadToAsync(stream, cancellationToken: cancellationToken);
}

_logger.LogDebug("Finished download of {name}", _blob.Name);
}

public async Task DownloadAsync(string filePath, CancellationToken cancellationToken)
{
using var aesTransform = AssetDecryptor.GetAesCtrTransform(_decryptInfo, _blobClient.Name, false);
using var stream = File.OpenWrite(filePath);
await DownloadAsync(stream, cancellationToken);
}

if (aesTransform != null)
{
await AssetDecryptor.DecryptTo(aesTransform, _blobClient, filePath, cancellationToken);
}
else
public async Task UploadAsync(Stream stream, CancellationToken cancellationToken)
{
BlobContentInfo info = await _blob.UploadAsync(stream, cancellationToken: cancellationToken);
}

public string GetStreamArguments()
{
var extension = Path.GetExtension(_blob.Name);
if (!ExtensionToFormatMap.TryGetValue(extension, out var format))
{
await _blobClient.DownloadToAsync(filePath, cancellationToken);
format = "mp4"; // fallback to mp4.
}
return $"-f {format}";
}

public async Task WriteAsync(Stream outputStream, CancellationToken cancellationToken)
{
await DownloadAsync(outputStream, cancellationToken);
}
}
}
40 changes: 5 additions & 35 deletions pipes/MultiFileStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
using Azure.ResourceManager.Media.Models;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Specialized;
using FFMpegCore.Pipes;
using Microsoft.Extensions.Logging;
using System.IO.Pipes;
using System.Text;

namespace AMSMigrate.Pipes
{
// A stream of media track that is spread across multiple files.
public class MultiFileStream
public class MultiFileStream : IPipeSource
{
private readonly BlobContainerClient _container;
private readonly ILogger _logger;
Expand Down Expand Up @@ -254,42 +254,12 @@ private async Task DownloadClearBlobContent(BlockBlobClient sourceBlob, Stream o

generatedStream.CopyTo(outputStream);
}
}

//Writes a multi file track to a pipe stream.
internal class MultiFilePipe : Pipe
{
private readonly MultiFileStream _multiFileStream;

public MultiFilePipe(
string filePath,
MultiFileStream multiFileStream)
: base(filePath, PipeDirection.Out)
{
_multiFileStream = multiFileStream;
}

public override async Task RunAsync(CancellationToken cancellationToken)
{
await RunAsync(async (stream, token) =>
{
await _multiFileStream.DownloadAsync(stream, token);
}, cancellationToken);
}

public override string GetStreamArguments()
{
return "-f mp4";
}
public string GetStreamArguments() => string.Empty;

public override async Task WriteAsync(Stream stream, CancellationToken cancellationToken)
{
await _multiFileStream.DownloadAsync(stream, cancellationToken);
}
public async Task DownloadAsync(string path, CancellationToken cancellationToken)
public async Task WriteAsync(Stream outputStream, CancellationToken cancellationToken)
{
using var file = File.OpenWrite(path);
await _multiFileStream.DownloadAsync(file, cancellationToken);
await DownloadAsync(outputStream, cancellationToken);
}
}
}
43 changes: 13 additions & 30 deletions pipes/Pipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,10 @@

namespace AMSMigrate.Pipes
{
public interface IPipe: IPipeSource, IPipeSink
{
string PipePath { get; }

Task RunAsync(CancellationToken cancellationToken);
}

/// <summary>
/// A class to abstract platform specific pipe.
/// </summary>
abstract class Pipe : IDisposable, IPipe
abstract class Pipe : IDisposable
{
protected readonly NamedPipeServerStream? _server;
protected readonly PipeDirection _direction;
Expand Down Expand Up @@ -44,9 +37,9 @@ public Pipe(string filePath, PipeDirection direction = PipeDirection.In)
}
}

private void CreatePipe(string filePath)
private static void CreatePipe(string filePath)
{
var startInfo = new ProcessStartInfo("mkfifo", PipePath)
var startInfo = new ProcessStartInfo("mkfifo", filePath)
{
RedirectStandardError = false,
RedirectStandardOutput = false,
Expand Down Expand Up @@ -102,31 +95,21 @@ public void Dispose()
}

public abstract Task RunAsync(CancellationToken cancellationToken);
}

public virtual string GetStreamArguments()
{
return "-seekable 0";
}

public virtual async Task WriteAsync(Stream outputStream, CancellationToken cancellationToken)
{
await RunAsync(async (stream, cancellationToken) =>
{
await stream.CopyToAsync(outputStream, cancellationToken);
}, cancellationToken);
}

public async Task ReadAsync(Stream inputStream, CancellationToken cancellationToken)
class SourcePipe : Pipe
{
private readonly IPipeSource _pipeSource;
public SourcePipe(string filePath, IPipeSource source) : base(filePath)
{
await RunAsync(async (stream, cancellationToken) =>
{
await inputStream.CopyToAsync(stream, cancellationToken);
}, cancellationToken);
_pipeSource = source;
}

public string GetFormat()
public override async Task RunAsync(CancellationToken cancellationToken)
{
return string.Empty;
await RunAsync(
async (stream, token) => await _pipeSource.WriteAsync(stream, token),
cancellationToken);
}
}
}
Loading

0 comments on commit e1a1e7e

Please sign in to comment.