Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add [[pipelines]] binding in wrangler. #6677

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .changeset/angry-keys-dance.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
"wrangler": minor
---

Added new [[pipelines]] bindings. This creates a new binding that allows sending events to
the specified pipeline.

Example:

[[pipelines]]
binding = "MY_PIPELINE"
pipeline = "my-pipeline"
109 changes: 109 additions & 0 deletions packages/wrangler/src/__tests__/configuration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ describe("normalizeAndValidateConfig()", () => {
upload_source_maps: undefined,
placement: undefined,
tail_consumers: undefined,
pipelines: [],
});
expect(diagnostics.hasErrors()).toBe(false);
expect(diagnostics.hasWarnings()).toBe(false);
Expand Down Expand Up @@ -3181,6 +3182,114 @@ describe("normalizeAndValidateConfig()", () => {
});
});

describe("[pipelines]", () => {
it("should error if pipelines is an object", () => {
const { diagnostics } = normalizeAndValidateConfig(
// @ts-expect-error purposely using an invalid value
{ pipelines: {} },
undefined,
{ env: undefined }
);

expect(diagnostics.hasWarnings()).toBe(false);
expect(diagnostics.renderErrors()).toMatchInlineSnapshot(`
"Processing wrangler configuration:
- The field \\"pipelines\\" should be an array but got {}."
`);
});

it("should error if pipelines is a string", () => {
const { diagnostics } = normalizeAndValidateConfig(
// @ts-expect-error purposely using an invalid value
{ pipelines: "BAD" },
undefined,
{ env: undefined }
);

expect(diagnostics.hasWarnings()).toBe(false);
expect(diagnostics.renderErrors()).toMatchInlineSnapshot(`
"Processing wrangler configuration:
- The field \\"pipelines\\" should be an array but got \\"BAD\\"."
`);
});

it("should error if pipelines is a number", () => {
const { diagnostics } = normalizeAndValidateConfig(
// @ts-expect-error purposely using an invalid value
{ pipelines: 999 },
undefined,
{ env: undefined }
);

expect(diagnostics.hasWarnings()).toBe(false);
expect(diagnostics.renderErrors()).toMatchInlineSnapshot(`
"Processing wrangler configuration:
- The field \\"pipelines\\" should be an array but got 999."
`);
});

it("should error if pipelines is null", () => {
const { diagnostics } = normalizeAndValidateConfig(
// @ts-expect-error purposely using an invalid value
{ pipelines: null },
undefined,
{ env: undefined }
);

expect(diagnostics.hasWarnings()).toBe(false);
expect(diagnostics.renderErrors()).toMatchInlineSnapshot(`
"Processing wrangler configuration:
- The field \\"pipelines\\" should be an array but got null."
`);
});

it("should accept valid bindings", () => {
const { diagnostics } = normalizeAndValidateConfig(
{
pipelines: [
{
binding: "VALID",
pipeline: "343cd4f1d58c42fbb5bd082592fd7143",
},
],
} as unknown as RawConfig,
undefined,
{ env: undefined }
);

expect(diagnostics.hasErrors()).toBe(false);
});

it("should error if pipelines.bindings are not valid", () => {
const { diagnostics } = normalizeAndValidateConfig(
{
pipelines: [
{},
{
binding: "VALID",
pipeline: "343cd4f1d58c42fbb5bd082592fd7143",
},
{ binding: 2000, project: 2111 },
],
} as unknown as RawConfig,
undefined,
{ env: undefined }
);
expect(diagnostics.renderWarnings()).toMatchInlineSnapshot(`
"Processing wrangler configuration:
- Unexpected fields found in pipelines[2] field: \\"project\\""
`);
expect(diagnostics.hasWarnings()).toBe(true);
expect(diagnostics.renderErrors()).toMatchInlineSnapshot(`
"Processing wrangler configuration:
- \\"pipelines[0]\\" bindings should have a string \\"binding\\" field but got {}.
- \\"pipelines[0]\\" bindings must have a \\"pipeline\\" field but got {}.
- \\"pipelines[2]\\" bindings should have a string \\"binding\\" field but got {\\"binding\\":2000,\\"project\\":2111}.
- \\"pipelines[2]\\" bindings must have a \\"pipeline\\" field but got {\\"binding\\":2000,\\"project\\":2111}."
`);
});
});

describe("[unsafe.bindings]", () => {
it("should error if unsafe is an array", () => {
const { diagnostics } = normalizeAndValidateConfig(
Expand Down
41 changes: 41 additions & 0 deletions packages/wrangler/src/__tests__/deploy.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10466,6 +10466,47 @@ export default{
});
});

describe("pipelines", () => {
it("should upload pipelines bindings", async () => {
writeWranglerToml({
pipelines: [
{
binding: "MY_PIPELINE",
pipeline: "0123456789ABCDEF0123456789ABCDEF",
},
],
});
await fs.promises.writeFile("index.js", `export default {};`);
mockSubDomainRequest();
mockUploadWorkerRequest({
expectedBindings: [
{
type: "pipelines",
name: "MY_PIPELINE",
id: "0123456789ABCDEF0123456789ABCDEF",
},
],
});

await runWrangler("deploy index.js");
expect(std.out).toMatchInlineSnapshot(`
"Total Upload: xx KiB / gzip: xx KiB
Worker Startup Time: 100 ms
Your worker has access to the following bindings:
- Pipelines:
- MY_PIPELINE: 0123456789ABCDEF0123456789ABCDEF
Uploaded test-name (TIMINGS)
Published test-name (TIMINGS)
https://test-name.test-sub-domain.workers.dev
Current Deployment ID: Galaxy-Class
Current Version ID: Galaxy-Class


Note: Deployment ID has been renamed to Version ID. Deployment ID is present to maintain compatibility with the previous behavior of this command. This output will change in a future version of Wrangler. To learn more visit: https://developers.cloudflare.com/workers/configuration/versions-and-deployments"
`);
});
});

describe("--keep-vars", () => {
it("should send keepVars when keep-vars is passed in", async () => {
vi.stubEnv("CLOUDFLARE_API_TOKEN", "hunter2");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ function createWorkerBundleFormData(
text_blobs: undefined,
data_blobs: undefined,
dispatch_namespaces: undefined,
pipelines: undefined,
logfwdr: undefined,
unsafe: undefined,
experimental_assets: undefined,
Expand Down
2 changes: 2 additions & 0 deletions packages/wrangler/src/api/startDevWorker/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import type {
CfLogfwdrBinding,
CfModule,
CfMTlsCertificate,
CfPipeline,
CfQueue,
CfR2Bucket,
CfScriptFormat,
Expand Down Expand Up @@ -261,6 +262,7 @@ export type Binding =
| ({ type: "analytics_engine" } & Omit<CfAnalyticsEngineDataset, "binding">)
| ({ type: "dispatch_namespace" } & Omit<CfDispatchNamespace, "binding">)
| ({ type: "mtls_certificate" } & Omit<CfMTlsCertificate, "binding">)
| ({ type: "pipeline" } & Omit<CfPipeline, "binding">)
| ({ type: "logfwdr" } & Omit<CfLogfwdrBinding, "name">)
| { type: `unsafe_${string}` }
| { type: "assets" };
Expand Down
10 changes: 10 additions & 0 deletions packages/wrangler/src/api/startDevWorker/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,12 @@ export function convertCfWorkerInitBindingstoBindings(
output[info["binding"]] = { type: "assets" };
break;
}
case "pipelines": {
for (const { binding, ...x } of info) {
output[binding] = { type: "pipeline", ...x };
}
break;
}
default: {
assertNever(type);
}
Expand Down Expand Up @@ -282,6 +288,7 @@ export async function convertBindingsToCfWorkerInitBindings(
logfwdr: undefined,
unsafe: undefined,
experimental_assets: undefined,
pipelines: undefined
};

const fetchers: Record<string, ServiceFetch> = {};
Expand Down Expand Up @@ -354,6 +361,9 @@ export async function convertBindingsToCfWorkerInitBindings(
} else if (binding.type === "mtls_certificate") {
bindings.mtls_certificates ??= [];
bindings.mtls_certificates.push({ ...binding, binding: name });
} else if (binding.type === "pipeline") {
bindings.pipelines ??= [];
bindings.pipelines.push({ ...binding, binding: name });
} else if (binding.type === "logfwdr") {
bindings.logfwdr ??= { bindings: [] };
bindings.logfwdr.bindings.push({ ...binding, name: name });
Expand Down
1 change: 1 addition & 0 deletions packages/wrangler/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -402,4 +402,5 @@ export const defaultWranglerConfig: Config = {
},
mtls_certificates: [],
tail_consumers: undefined,
pipelines: [],
};
17 changes: 17 additions & 0 deletions packages/wrangler/src/config/environment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,23 @@ export interface EnvironmentNonInheritable {
/** Details about the outbound Worker which will handle outbound requests from your namespace */
outbound?: DispatchNamespaceOutbound;
}[];

/**
* Specifies list of Pipelines bound to this Worker environment
*
* NOTE: This field is not automatically inherited from the top level environment,
* and so must be specified in every named environment.
*
* @default `[]`
* @nonInheritable
*/
pipelines: {
/** The binding name used to refer to the bound service. */
binding: string;

/** Name of the Pipeline to bind */
pipeline: string;
}[];
}

/**
Expand Down
31 changes: 19 additions & 12 deletions packages/wrangler/src/config/index.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,25 @@
import fs from "node:fs";
import dotenv from "dotenv";
import { findUpSync } from "find-up";
import fs from "node:fs";
import type { CfWorkerInit } from "../deployment-bundle/worker";
import { FatalError, UserError } from "../errors";
import { getFlag } from "../experimental-flags";
import { logger } from "../logger";
import { EXIT_CODE_INVALID_PAGES_CONFIG } from "../pages/errors";
import { parseJSONC, parseTOML, readFileSync } from "../parse";
import { isPagesConfig, normalizeAndValidateConfig } from "./validation";
import { validatePagesConfig } from "./validation-pages";
import type { CfWorkerInit } from "../deployment-bundle/worker";
import type { CommonYargsOptions } from "../yargs-types";
import type { Config, OnlyCamelCase, RawConfig } from "./config";
import type { NormalizeAndValidateConfigArgs } from "./validation";
import { isPagesConfig, normalizeAndValidateConfig } from "./validation";
import { validatePagesConfig } from "./validation-pages";

export type {
Config,
RawConfig,
ConfigFields,
DevConfig,
RawDevConfig,
Config, ConfigFields,
DevConfig, RawConfig, RawDevConfig
} from "./config";
export type {
Environment,
RawEnvironment,
ConfigModuleRuleType,
ConfigModuleRuleType, Environment,
RawEnvironment
} from "./environment";

type ReadConfigCommandArgs = NormalizeAndValidateConfigArgs & {
Expand Down Expand Up @@ -232,6 +228,7 @@ export function printBindings(bindings: CfWorkerInit["bindings"]) {
wasm_modules,
dispatch_namespaces,
mtls_certificates,
pipelines,
} = bindings;

if (data_blobs !== undefined && Object.keys(data_blobs).length > 0) {
Expand Down Expand Up @@ -443,6 +440,16 @@ export function printBindings(bindings: CfWorkerInit["bindings"]) {
});
}

if (pipelines?.length) {
output.push({
type: "Pipelines",
entries: pipelines.map(({ binding, pipeline }) => ({
key: binding,
value: pipeline,
}))
})
}

if (version_metadata !== undefined) {
output.push({
type: "Worker Version Metadata",
Expand Down
Loading
Loading