Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only create pipes when pipes are enabled. #154

Merged
merged 1 commit into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions Properties/launchSettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"profiles": {
"AMSMigrate": {
"commandName": "Project",
"commandLineArgs": "assets -s b2c12710-ecdf-4532-9ba6-7e74a219b5f2 --break-output-lease --keep-working-folder --enable-live-asset -g pohhsuTest -n pohhsumediaservice -f \"name eq 'asset-audio-leading-audio-discontinuity'\" --skip-migrated false -t \"$web/bug/${AssetName}\" -o amsencodermsitest"
},
"Docker": {
"commandName": "Docker",
"commandLineArgs": "analyze -s b2c12710-ecdf-4532-9ba6-7e74a219b5f2 -g provenance -n provenanceuswc"
},
"WSL": {
"commandName": "WSL2",
"commandLineArgs": "AMSMigrate.dll assets -s b2c12710-ecdf-4532-9ba6-7e74a219b5f2 --break-output-lease --keep-working-folder --enable-live-asset -g pohhsuTest -n pohhsumediaservice -f \"name eq 'asset-audio-leading-audio-discontinuity'\" --skip-migrated false -t \"$web/bug/${AssetName}\" -o amsencodermsitest",
"distributionName": ""
}
}
}
91 changes: 0 additions & 91 deletions pipes/BlobPipe.cs

This file was deleted.

81 changes: 42 additions & 39 deletions pipes/BlobSource.cs
Original file line number Diff line number Diff line change
@@ -1,77 +1,80 @@
using AMSMigrate.Decryption;

using AMSMigrate.Decryption;
using Azure.ResourceManager.Media.Models;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
using FFMpegCore.Pipes;
using Microsoft.Extensions.Logging;

namespace AMSMigrate.Pipes
{
internal class BlobSource : IPipeSource
sealed class BlobSource : IPipeSource
{
private static readonly IReadOnlyDictionary<string, string> ExtensionToFormatMap = new Dictionary<string, string>
{
{ ".ts", "mpegts" },
{ ".vtt", "webvtt" }
};

private readonly BlockBlobClient _blobClient;
private readonly StorageEncryptedAssetDecryptionInfo? _decryptInfo;
private readonly ILogger _logger;
private readonly BlockBlobClient _blob;
private readonly StorageEncryptedAssetDecryptionInfo? _decryptInfo;

public BlobSource(BlockBlobClient blobClient, StorageEncryptedAssetDecryptionInfo? decryptInfo, ILogger logger)
public BlobSource(BlobContainerClient container, string blobName, StorageEncryptedAssetDecryptionInfo? decryptInfo, ILogger logger) :
this(container.GetBlockBlobClient(blobName), decryptInfo, logger)
{
_blobClient = blobClient;
_decryptInfo = decryptInfo;
_logger = logger;
}

public string GetStreamArguments()
public BlobSource(BlockBlobClient blob, StorageEncryptedAssetDecryptionInfo? decryptInfo, ILogger logger)
{
var extension = Path.GetExtension(_blobClient.Name);
if (!ExtensionToFormatMap.TryGetValue(extension, out var format))
{
format = "mp4"; // fallback to mp4.
}
return $"-f {format}";
_blob = blob;
_logger = logger;
_decryptInfo = decryptInfo;
}

public async Task WriteAsync(Stream outputStream, CancellationToken cancellationToken)
public async Task DownloadAsync(Stream stream, CancellationToken cancellationToken)
{
_logger.LogDebug("Begin downloading track: {name}", _blobClient.Name);
var options = new BlobDownloadOptions
{
ProgressHandler = new Progress<long>(
progress =>
{
_logger.LogTrace("Download progress of blob {blob} is {progress}", _blobClient.Name, progress);
})
};
using BlobDownloadStreamingResult result = await _blobClient.DownloadStreamingAsync(options, cancellationToken);
try
_logger.LogDebug("Begin downloading {name}", _blob.Name);

using var aesTransform = AssetDecryptor.GetAesCtrTransform(_decryptInfo, _blob.Name, false);

if (aesTransform != null)
{
await result.Content.CopyToAsync(outputStream, cancellationToken);
_logger.LogDebug("Finished downloading track: {name}", _blobClient.Name);
await AssetDecryptor.DecryptTo(aesTransform, _blob, stream, cancellationToken);
}
catch (Exception ex)
else
{
_logger.LogError(ex, "Failed to download {blob}", _blobClient.Name);
throw;
await _blob.DownloadToAsync(stream, cancellationToken: cancellationToken);
}

_logger.LogDebug("Finished download of {name}", _blob.Name);
}

public async Task DownloadAsync(string filePath, CancellationToken cancellationToken)
{
using var aesTransform = AssetDecryptor.GetAesCtrTransform(_decryptInfo, _blobClient.Name, false);
using var stream = File.OpenWrite(filePath);
await DownloadAsync(stream, cancellationToken);
}

if (aesTransform != null)
{
await AssetDecryptor.DecryptTo(aesTransform, _blobClient, filePath, cancellationToken);
}
else
public async Task UploadAsync(Stream stream, CancellationToken cancellationToken)
{
BlobContentInfo info = await _blob.UploadAsync(stream, cancellationToken: cancellationToken);
}

public string GetStreamArguments()
{
var extension = Path.GetExtension(_blob.Name);
if (!ExtensionToFormatMap.TryGetValue(extension, out var format))
{
await _blobClient.DownloadToAsync(filePath, cancellationToken);
format = "mp4"; // fallback to mp4.
}
return $"-f {format}";
}

public async Task WriteAsync(Stream outputStream, CancellationToken cancellationToken)
{
await DownloadAsync(outputStream, cancellationToken);
}
}
}
40 changes: 5 additions & 35 deletions pipes/MultiFileStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
using Azure.ResourceManager.Media.Models;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Specialized;
using FFMpegCore.Pipes;
using Microsoft.Extensions.Logging;
using System.IO.Pipes;
using System.Text;

namespace AMSMigrate.Pipes
{
// A stream of media track that is spread across multiple files.
public class MultiFileStream
public class MultiFileStream : IPipeSource
{
private readonly BlobContainerClient _container;
private readonly ILogger _logger;
Expand Down Expand Up @@ -254,42 +254,12 @@ private async Task DownloadClearBlobContent(BlockBlobClient sourceBlob, Stream o

generatedStream.CopyTo(outputStream);
}
}

//Writes a multi file track to a pipe stream.
internal class MultiFilePipe : Pipe
{
private readonly MultiFileStream _multiFileStream;

public MultiFilePipe(
string filePath,
MultiFileStream multiFileStream)
: base(filePath, PipeDirection.Out)
{
_multiFileStream = multiFileStream;
}

public override async Task RunAsync(CancellationToken cancellationToken)
{
await RunAsync(async (stream, token) =>
{
await _multiFileStream.DownloadAsync(stream, token);
}, cancellationToken);
}

public override string GetStreamArguments()
{
return "-f mp4";
}
public string GetStreamArguments() => string.Empty;

public override async Task WriteAsync(Stream stream, CancellationToken cancellationToken)
{
await _multiFileStream.DownloadAsync(stream, cancellationToken);
}
public async Task DownloadAsync(string path, CancellationToken cancellationToken)
public async Task WriteAsync(Stream outputStream, CancellationToken cancellationToken)
{
using var file = File.OpenWrite(path);
await _multiFileStream.DownloadAsync(file, cancellationToken);
await DownloadAsync(outputStream, cancellationToken);
}
}
}
43 changes: 13 additions & 30 deletions pipes/Pipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,10 @@

namespace AMSMigrate.Pipes
{
public interface IPipe: IPipeSource, IPipeSink
{
string PipePath { get; }

Task RunAsync(CancellationToken cancellationToken);
}

/// <summary>
/// A class to abstract platform specific pipe.
/// </summary>
abstract class Pipe : IDisposable, IPipe
abstract class Pipe : IDisposable
{
protected readonly NamedPipeServerStream? _server;
protected readonly PipeDirection _direction;
Expand Down Expand Up @@ -44,9 +37,9 @@ public Pipe(string filePath, PipeDirection direction = PipeDirection.In)
}
}

private void CreatePipe(string filePath)
private static void CreatePipe(string filePath)
{
var startInfo = new ProcessStartInfo("mkfifo", PipePath)
var startInfo = new ProcessStartInfo("mkfifo", filePath)
{
RedirectStandardError = false,
RedirectStandardOutput = false,
Expand Down Expand Up @@ -102,31 +95,21 @@ public void Dispose()
}

public abstract Task RunAsync(CancellationToken cancellationToken);
}

public virtual string GetStreamArguments()
{
return "-seekable 0";
}

public virtual async Task WriteAsync(Stream outputStream, CancellationToken cancellationToken)
{
await RunAsync(async (stream, cancellationToken) =>
{
await stream.CopyToAsync(outputStream, cancellationToken);
}, cancellationToken);
}

public async Task ReadAsync(Stream inputStream, CancellationToken cancellationToken)
class SourcePipe : Pipe
{
private readonly IPipeSource _pipeSource;
public SourcePipe(string filePath, IPipeSource source) : base(filePath)
{
await RunAsync(async (stream, cancellationToken) =>
{
await inputStream.CopyToAsync(stream, cancellationToken);
}, cancellationToken);
_pipeSource = source;
}

public string GetFormat()
public override async Task RunAsync(CancellationToken cancellationToken)
{
return string.Empty;
await RunAsync(
async (stream, token) => await _pipeSource.WriteAsync(stream, token),
cancellationToken);
}
}
}
Loading
Loading