Skip to content

Commit

Permalink
Merge pull request #12 from fluentci-io/feat/flu-12-fluentci-runner
Browse files Browse the repository at this point in the history
feat: add fluentci `agent` subcommand
  • Loading branch information
tsirysndr authored Jan 8, 2024
2 parents 33d3063 + 53b1431 commit 942a6d0
Show file tree
Hide file tree
Showing 7 changed files with 361 additions and 93 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ fluentci # Run the pipeline
fluentci --help

Usage: fluentci [pipeline] [jobs...]
Version: 0.9.1
Version: 0.10.0

Description:

Expand Down Expand Up @@ -99,6 +99,9 @@ Commands:
docs, man [pipeline] - Show documentation for a pipeline
doctor - Check if FluentCI CLI is installed correctly
env - Show environment variables (read from .fluentci/.env file)
login - Login to FluentCI
publish - Publish a pipeline to FluentCI Registry
agent - Start FluentCI Runner Agent
```
## 📚 Documentation
Expand Down
55 changes: 54 additions & 1 deletion deno.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions deps.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export {
TextWriter,
ZipReader,
ZipWriter,
Uint8ArrayReader,
} from "https://deno.land/x/zipjs@v2.7.32/index.js";
import introspect from "https://cdn.jsdelivr.net/gh/fluentci-io/daggerverse@main/deno-sdk/sdk/src/mod/introspect.ts";
export { introspect };
Expand All @@ -36,3 +37,11 @@ import * as R from "https://deno.land/x/ramda@v0.27.2/mod.ts";
export { R };
import * as _ from "https://cdn.skypack.dev/lodash";
export { _ };
import Logger from "https://deno.land/x/logger@v1.1.3/logger.ts";
export { Logger };
export { generateName } from "https://deno.land/x/docker_names@v1.1.0/mod.ts";
import dayjs from "npm:dayjs";
export { dayjs };
import { Buffer } from "npm:buffer";
export { Buffer };
export { mergeReadableStreams } from "https://deno.land/std@0.211.0/streams/merge_readable_streams.ts";
7 changes: 6 additions & 1 deletion main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ import doctor from "./src/cmd/doctor.ts";
import showEnvs from "./src/cmd/env.ts";
import login from "./src/cmd/login.ts";
import publish from "./src/cmd/publish.ts";
import startAgent from "./src/cmd/agent.ts";
import { brightGreen } from "./deps.ts";

export async function main() {
await new Command()
.name("fluentci")
.version("0.9.1")
.version("0.10.0")
.description(
`
.
Expand Down Expand Up @@ -198,6 +199,10 @@ export async function main() {
.action(async function () {
await publish();
})
.command("agent", "Start FluentCI Runner Agent")
.action(async function () {
await startAgent();
})
.parse(Deno.args);
}

Expand Down
172 changes: 172 additions & 0 deletions src/cmd/agent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
import {
Logger,
brightGreen,
dir,
generateName,
ZipReader,
BlobReader,
} from "../../deps.ts";
import {
FLUENTCI_WS_URL,
RUNNER_URL,
FLUENTCI_EVENTS_URL,
BUILD_DIR,
} from "../consts.ts";

async function startAgent() {
console.log(`
.
______ __ _________
/ __/ /_ _____ ___ / /_/ ___/ _/
/ _// / // / -_) _ \\/ __/ /___/ /
/_/ /_/\\_,_/\\__/_//_/\\__/\\___/___/
https://fluentci.io
`);
const logger = new Logger();
logger.info("Starting FluentCI Agent ...");

let id = `${generateName()}_${Math.floor(Math.random() * 1000)}`;
try {
const data = await Deno.readFile(`${dir("home")}/.fluentci/agent-id`);
id = new TextDecoder().decode(data);
} catch (_) {
await Deno.mkdir(`${dir("home")}/.fluentci`, { recursive: true });
await Deno.writeTextFile(`${dir("home")}/.fluentci/agent-id`, id);
}

const websocket = new WebSocket(`${FLUENTCI_WS_URL}?agent_id=${id}`);
websocket.onopen = function () {
logger.info(`Connected to FluentCI server as ${brightGreen(id)}`);
logger.info("FluentCI Agent started successfully ✅");
logger.info("Waiting for jobs ...");
logger.info("Press Ctrl+C to exit");
};
websocket.addEventListener("message", async function (event) {
try {
logger.info(`Message from server ${event.data}`);
const data = JSON.parse(event.data);
const { action, src, query, buildId } = JSON.parse(data.event);
const { jobs, pipeline } = JSON.parse(query);

if (action === "build") {
const project_id = src.split("/")[0];
const sha256 = src.split("/")[1].replace(".zip", "");

Deno.mkdirSync(`${dir("home")}/.fluentci/builds`, { recursive: true });

if (exists(`${dir("home")}/.fluentci/builds/${project_id}/${sha256}`)) {
logger.info(
`${brightGreen(src)} already exists, skipping download ...`
);
await spawnFluentCI(
logger,
project_id,
sha256,
pipeline,
jobs,
buildId
);
return;
}

logger.info(`Downloading ${brightGreen(src)} ...`);

const response = fetch(
`${RUNNER_URL}?project_id=${project_id}&sha256=${sha256}`
);
const blob = await response.then((res) => res.blob());
logger.info(`Extracting ${brightGreen(src)} ...`);
await extractZipBlob(blob, project_id, sha256);
await spawnFluentCI(
logger,
project_id,
sha256,
pipeline,
jobs,
buildId
);
}
} catch (e) {
logger.error(`Failed to parse message from server ${event.data}`);
logger.error(e.message);
}
});
}

const exists = (path: string) => {
try {
Deno.statSync(path);
return true;
} catch (_) {
return false;
}
};

const extractZipBlob = async (
blob: Blob,
project_id: string,
sha256: string
) => {
const zipReader = new ZipReader(new BlobReader(blob));
for (const entry of await zipReader.getEntries()) {
const [_, ...path] = entry.filename.split("/").reverse();
const stream = new TransformStream();
const arrayBuffer = new Response(stream.readable).arrayBuffer();
await entry.getData(stream.writable);
const data = await arrayBuffer;
await Deno.mkdir(
`${BUILD_DIR}/${project_id}/${sha256}/${path.reverse().join("/")}`,
{
recursive: true,
}
);
await Deno.writeFile(
`${BUILD_DIR}/${project_id}/${sha256}/${entry.filename}`,
new Uint8Array(data)
);
}
};

const spawnFluentCI = async (
logger: Logger,
project_id: string,
sha256: string,
pipeline: string,
jobs: [string, ...Array<string>],
clientId: string
) => {
const command = new Deno.Command("dagger", {
args: ["--progress", "plain", "run", "fluentci", "run", pipeline, ...jobs],
cwd: `${dir("home")}/.fluentci/builds/${project_id}/${sha256}`,
stdout: "piped",
stderr: "piped",
env: {
_EXPERIMENTAL_DAGGER_CLOUD_URL: `https://events.fluentci.io?id=${
"build-" + clientId
}&project_id=${project_id}`,
DAGGER_CLOUD_TOKEN: Deno.env.get("DAGGER_TOKEN") || "123",
},
});
const process = command.spawn();
const writable = new WritableStream({
write: (chunk) => {
const text = new TextDecoder().decode(chunk);
logger.info(text);
fetch(`${FLUENTCI_EVENTS_URL}?client_id=${clientId}`, {
method: "POST",
body: text,
}).catch((e) => logger.error(e.message));
},
});

await process.stderr?.pipeTo(writable);
const { code } = await process.status;

fetch(`${FLUENTCI_EVENTS_URL}?client_id=${clientId}`, {
method: "POST",
body: `fluentci_exit=${code}`,
}).catch((e) => logger.error(e.message));
};

export default startAgent;
Loading

0 comments on commit 942a6d0

Please sign in to comment.