Skip to content

Commit

Permalink
Finalized S3 switch-over for containers and HTTP.
Browse files Browse the repository at this point in the history
Git is tricky and will be tested with AWS MountPoint instead.
  • Loading branch information
linus-berg committed Sep 24, 2023
1 parent d3f29bc commit 5ba8d12
Show file tree
Hide file tree
Showing 14 changed files with 103 additions and 116 deletions.
24 changes: 6 additions & 18 deletions ACM.Container/Collector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,20 @@
namespace ACM.Container;

public class Collector : ICollector {
private readonly FileSystem fs_;
private readonly SkopeoClient skopeo_;

public Collector(FileSystem fs, SkopeoClient skopeo) {
fs_ = fs;
public Collector(SkopeoClient skopeo) {
skopeo_ = skopeo;
}

public async Task Consume(ConsumeContext<ArtifactCollectRequest> context) {
ArtifactCollectRequest request = context.Message;
string wd = fs_.GetModuleDir(context.Message.module);

SkopeoManifest? manifest = await skopeo_.ImageExists(request.location, wd);
bool collect = false;

/* If manifest does not exist on disk */
if (manifest == null) {
collect = true;
} else {
/* Verify all layers are present */
collect = !manifest.VerifyLayers();
}

/* Collect if missing manifest or layers */
if (collect) {
await skopeo_.CopyToOci(request.location, wd);

SkopeoManifest? manifest = await skopeo_.ImageExists(request.location);
if (manifest != null) {
return;
}
await skopeo_.CopyToRegistry(request.location);
}
}
5 changes: 2 additions & 3 deletions ACM.Container/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,13 @@
using APC.Skopeo;

ModuleRegistration registration = new(ModuleType.ACM, typeof(Collector));
registration.AddEndpoint("docker", 1);
registration.AddEndpoint("oci", 1);
registration.AddEndpoint("docker", 5);
registration.AddEndpoint("oci", 5);

IHost host = Host.CreateDefaultBuilder(args)
.ConfigureServices(services => {
services.AddTelemetry(registration);
services.AddSingleton<SkopeoClient>();
services.AddSingleton<FileSystem>();
services.Register(registration);
services.AddHostedService<Worker>();
})
Expand Down
3 changes: 2 additions & 1 deletion ACM.Container/Properties/launchSettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
"commandName": "Project",
"dotnetRunMessages": true,
"environmentVariables": {
"DOTNET_ENVIRONMENT": "Development"
"DOTNET_ENVIRONMENT": "Development",
"ACM_CONTAINER_REGISTRY": "localhost:6000"
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion ACM.Http/Collector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public async Task Consume(ConsumeContext<ArtifactCollectRequest> context) {
RemoteFile rf = new(location, fs_);
if (await rf.Get(fp)) {
if (delta_) {
fs_.CreateDailyLink(module, location);
await fs_.CreateDeltaLink(module, location);
}
}
}
Expand Down
7 changes: 6 additions & 1 deletion ACM.Http/Properties/launchSettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
"commandName": "Project",
"dotnetRunMessages": true,
"environmentVariables": {
"DOTNET_ENVIRONMENT": "Development"
"DOTNET_ENVIRONMENT": "Development",
"ACM_S3_ACCESS_KEY": "minio-apc",
"ACM_S3_SECRET_KEY": "minio-apc",
"ACM_S3_REGION": "",
"ACM_S3_ENDPOINT": "localhost:9000",
"ACM_S3_BUCKET": "apc"
}
}
}
Expand Down
38 changes: 22 additions & 16 deletions ACM.Http/RemoteFile.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
namespace ACM.Http;

public class RemoteFile {
private const int BUFFER_SIZE_ = 8192;
private static readonly HttpClient CLIENT_ = new();
private readonly string url_;
private readonly FileSystem fs_;
Expand All @@ -13,42 +12,49 @@ public RemoteFile(string url, FileSystem fs) {
fs_ = fs;
}

public async Task<bool> Get(string filepath) {
public async Task<bool> Get(string path) {
HttpResponseMessage response =
await CLIENT_.GetAsync(url_, HttpCompletionOption.ResponseHeadersRead);
if (!response.IsSuccessStatusCode ||
response.Content.Headers.ContentLength == null) {
return false;
}

long size = (long)response.Content.Headers.ContentLength;
await using Stream s = await response.Content.ReadAsStreamAsync();
long remote_size = (long)response.Content.Headers.ContentLength;
await using Stream remote_stream = await response.Content.ReadAsStreamAsync();
bool result;
try {
await ProcessStream(s, filepath);
result = await ProcessStream(path, remote_stream);
} catch (Exception e) {
s.Close();
await ClearFile(filepath);
remote_stream.Close();
await ClearFile(path);
throw;
}

/* If downloaded file size matches remote, its complete */
if (await fs_.GetFileSize(filepath) == size) {
return true;
if (result) {
/* If downloaded file size matches remote, its complete */
long size = await fs_.GetFileSize(path);
if (size == remote_size) {
return true;
}
} else {
await ClearFile(path);
throw new HttpRequestException($"{url_} failed to collect.");
}

await ClearFile(filepath);
return false;
}

private async Task
ProcessStream(Stream s, string filepath) {

private async Task<bool>
ProcessStream(string path, Stream remote_stream) {
bool result;
try {
await fs_.PutFile(filepath, s);
result = await fs_.PutFile(path, remote_stream);
} catch (Exception e) {
await ClearFile(filepath);
await ClearFile(path);
throw;
}
return result;
}

private async Task<bool> ClearFile(string file) {
Expand Down
19 changes: 11 additions & 8 deletions ACM.Kernel/FileSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,28 @@ public async Task<bool> PutFile(string path, Stream stream) {
}


private string GetDailyDeposit(string module) {
string daily_deposit = Path.Join(BASE_DIR_, "Daily", module);
private string GetDeltaDeposit(string module) {
string daily_deposit = Path.Join("delta", module);
return Path.Join(daily_deposit, DateTime.UtcNow.ToString("yyyy_MM_dd"));
}

public void CreateDailyLink(string module, string uri_str) {
public async Task<bool> CreateDeltaLink(string module, string uri_str) {
Uri uri = new(uri_str);
string location = GetDiskLocation(uri);
string daily_deposit = GetDailyDeposit(module);
string daily_deposit = GetDeltaDeposit(module);
string link = Path.Join(daily_deposit, location);
Directory.CreateDirectory(Path.GetDirectoryName(link));
File.CreateSymbolicLink(link, GetArtifactPath(module, uri_str));
string target = GetArtifactPath(module, uri_str);
return await CreateS3Link(link, target);
}

private async Task<bool> CreateS3Link(string link, string target) {
return await storage_backend_.SaveFileAsync(link, target);
}

public string GetArtifactPath(string module, string uri_str) {
Uri uri = new(uri_str);
string location = GetDiskLocation(uri);
string path = GetModulePath(module, location);
return path;
return GetModulePath(module, location);
}

public async Task<long> GetFileSize(string filepath) {
Expand Down
5 changes: 4 additions & 1 deletion APC.Kernel/ApcVariable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,8 @@ public enum ApcVariable {
ACM_S3_SECRET_KEY,
ACM_S3_REGION,
ACM_S3_ENDPOINT,
ACM_S3_BUCKET
ACM_S3_BUCKET,

// Registry proxy for S3
ACM_CONTAINER_REGISTRY
}
21 changes: 0 additions & 21 deletions APC.Skopeo/Models/OciDir.cs

This file was deleted.

52 changes: 25 additions & 27 deletions APC.Skopeo/SkopeoClient.cs
Original file line number Diff line number Diff line change
@@ -1,36 +1,40 @@
using System.Text;
using APC.Kernel;
using APC.Kernel.Extensions;
using APC.Skopeo.Models;
using CliWrap;

namespace APC.Skopeo;

public class SkopeoClient {
public async Task CopyToOci(string input, string oci_dir) {
Image image = new(input);
OciDir oci = new(oci_dir);
Directory.CreateDirectory(oci.Repositories);
Directory.CreateDirectory(Path.Join(oci.Repositories, image.Destination));
public async Task<bool> CopyToRegistry(string remote_image) {
Image image = new(remote_image);
string registry =
Configuration.GetApcVar(ApcVariable.ACM_CONTAINER_REGISTRY);

string internal_image = $"docker://{registry}/{image.Repository}";
StringBuilder std_out = new StringBuilder();
StringBuilder std_err = new StringBuilder();
Command cmd = Cli.Wrap("skopeo")
.WithWorkingDirectory(oci.Repositories)
.WithArguments(args => {
args.Add("copy");
args.Add("--quiet");
args.Add("--dest-shared-blob-dir");
args.Add(oci.Shared);
args.Add("--dest-tls-verify=false");
args.Add(image.Uri);
args.Add(
$"oci:{image.Repository}");
});
StringBuilder sb = new();
Console.WriteLine($"Pulling {image.Uri} -> oci:{image.Repository}");
args.Add(internal_image);
})
.WithStandardOutputPipe(
PipeTarget.ToStringBuilder(std_out))
.WithStandardErrorPipe(
PipeTarget.ToStringBuilder(std_err));
Console.WriteLine($"Pulling {image.Uri} -> {internal_image}");
try {
CommandResult result =
await (cmd | PipeTarget.ToStringBuilder(sb)).ExecuteAsync();
await cmd.ExecuteAsync();
} catch (Exception e) {
Console.WriteLine(e);
Console.WriteLine(std_err);
throw;
}
return true;
}


Expand All @@ -50,26 +54,20 @@ public async Task CopyToOci(string input, string oci_dir) {
return tags;
}

public async Task<SkopeoManifest?> ImageExists(string input, string oci_dir) {
public async Task<SkopeoManifest?> ImageExists(string input) {
Image image = new(input);
OciDir oci = new(oci_dir);
Directory.CreateDirectory(oci.Repositories);
if (!Directory.Exists(Path.Join(oci.Repositories, image.Destination))) {
return null;
}
string registry =
Configuration.GetApcVar(ApcVariable.ACM_CONTAINER_REGISTRY);
Command cmd = Cli.Wrap("skopeo")
.WithWorkingDirectory(oci.Repositories)
.WithArguments(args => {
args.Add("inspect");
args.Add("--shared-blob-dir");
args.Add(oci.Shared);
args.Add("--tls-verify=false");
args.Add(
$"oci:{image.Repository}");
$"docker://{registry}/{image.Repository}");
});
SkopeoManifest manifest;
try {
manifest = await cmd.ExecuteWithResult<SkopeoManifest>();
manifest.WorkingDirectory = oci_dir;
} catch (Exception e) {
return null;
}
Expand Down
21 changes: 3 additions & 18 deletions APC.Tester/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,11 @@
using Foundatio;
using Foundatio.Storage;
using IJetbrains = APM.Jetbrains.IDE.IJetbrains;
MinioFileStorageConnectionStringBuilder connection =
new MinioFileStorageConnectionStringBuilder();
connection.Region = "bleh";
connection.AccessKey = "minio-apc";
connection.SecretKey = "minio-apc";
connection.EndPoint = "localhost:9000";
connection.Bucket = "APC";
//Artifact p = await pypi.ProcessArtifact(artifact);

MinioFileStorageOptions minio_options = new MinioFileStorageOptions() {
AutoCreateBucket = true,
ConnectionString = connection.ToString()
};
IFileStorage storage = new MinioFileStorage(minio_options);
FileSystem fs = new FileSystem(storage);

SkopeoClient client = new SkopeoClient();

string url = "http://localhost:9001/static/js/main.379b5c5e.js";
RemoteFile file = new RemoteFile(url, fs);
string output = fs.GetArtifactPath("npm", url);

await file.Get(output);
//Artifact p = await pypi.ProcessArtifact(artifact);
await client.CopyToRegistry("docker://docker.io/registry:2");
Console.WriteLine("---");
3 changes: 2 additions & 1 deletion APC.Tester/Properties/launchSettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
"APC.Tester": {
"commandName": "Project",
"environmentVariables": {
"APC_MONGO_STR": "mongodb://root:example@localhost:27017"
"APC_MONGO_STR": "mongodb://root:example@localhost:27017",
"ACM_CONTAINER_REGISTRY": "localhost:6000"
}
}
}
Expand Down
13 changes: 13 additions & 0 deletions Compose/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@ version: "3.8"
services:
# Infrastructure
# required to run container syncs
apc-prometheus:
image: prom/prometheus:latest
command:
- --config.file=/etc/prometheus.yaml
- --web.enable-remote-write-receiver
- --enable-feature=exemplar-storage
ports:
- "9090:9090"
volumes:
- "./prometheus.yml:/etc/prometheus.yaml"
apc-minio:
image: minio/minio
restart: always
Expand All @@ -13,6 +23,9 @@ services:
environment:
MINIO_ROOT_USER: minio-dev
MINIO_ROOT_PASSWORD: minio-dev
MINIO_PROMETHEUS_URL: http://apc-prometheus:9090
MINIO_PROMETHEUS_JOB_ID: minio-job
MINIO_PROMETHEUS_AUTH_TYPE: public
command: server --console-address ":9001" /data

apc-internal-registry:
Expand Down
Loading

0 comments on commit 5ba8d12

Please sign in to comment.