Skip to content

Commit

Permalink
Merge branch 'main' into DX-1279-context-cancel
Browse files Browse the repository at this point in the history
  • Loading branch information
CahidArda authored Nov 19, 2024
2 parents ab7f459 + 2937841 commit ce636b9
Show file tree
Hide file tree
Showing 2 changed files with 224 additions and 28 deletions.
162 changes: 146 additions & 16 deletions src/client/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { describe, test } from "bun:test";
import { describe, test, expect } from "bun:test";
import { MOCK_QSTASH_SERVER_URL, mockQStashServer, WORKFLOW_ENDPOINT } from "../test-utils";
import { Client } from ".";
import { getWorkflowRunId, nanoid } from "../utils";
Expand All @@ -7,21 +7,151 @@ describe("workflow client", () => {
const token = nanoid();
const client = new Client({ baseUrl: MOCK_QSTASH_SERVER_URL, token });

test("should send cancel", async () => {
const workflowRunId = `wfr-${nanoid()}`;
await mockQStashServer({
execute: async () => {
await client.cancel({ workflowRunId });
},
responseFields: {
status: 200,
body: "msgId",
},
receivesRequest: {
method: "DELETE",
url: `${MOCK_QSTASH_SERVER_URL}/v2/workflows/runs/${workflowRunId}?cancel=true`,
token,
},
describe("cancel - mocked", () => {
test("should cancel single workflow run id", async () => {
const ids = `wfr-${nanoid()}`;
await mockQStashServer({
execute: async () => {
await client.cancel({ ids });
},
responseFields: {
status: 200,
body: "msgId",
},
receivesRequest: {
method: "DELETE",
url: `${MOCK_QSTASH_SERVER_URL}/v2/workflows/runs`,
token,
body: { workflowRunIds: [ids] },
},
});
});

test("should cancel multiple workflow run ids", async () => {
const ids = [`wfr-${nanoid()}`, `wfr-${nanoid()}`];
await mockQStashServer({
execute: async () => {
await client.cancel({ ids });
},
responseFields: {
status: 200,
body: "msgId",
},
receivesRequest: {
method: "DELETE",
url: `${MOCK_QSTASH_SERVER_URL}/v2/workflows/runs`,
token,
body: { workflowRunIds: ids },
},
});
});

test("should cancel workflowUrl", async () => {
const urlStartingWith = "http://workflow-endpoint.com";
await mockQStashServer({
execute: async () => {
await client.cancel({ urlStartingWith });
},
responseFields: {
status: 200,
body: "msgId",
},
receivesRequest: {
method: "DELETE",
url: `${MOCK_QSTASH_SERVER_URL}/v2/workflows/runs`,
token,
body: { workflowUrl: urlStartingWith },
},
});
});

test("should cancel all", async () => {
await mockQStashServer({
execute: async () => {
await client.cancel({ all: true });
},
responseFields: {
status: 200,
body: "msgId",
},
receivesRequest: {
method: "DELETE",
url: `${MOCK_QSTASH_SERVER_URL}/v2/workflows/runs`,
token,
body: {},
},
});
});

test("should throw if no option", async () => {
const throws = () => client.cancel({});
expect(throws).toThrow("The `cancel` method cannot be called without any options.");
});
});

describe("cancel - live", () => {
const liveClient = new Client({
baseUrl: process.env.QSTASH_URL,
token: process.env.QSTASH_TOKEN!,
});

test("should cancel single workflow run id", async () => {
const { workflowRunId } = await liveClient.trigger({
url: "http://requestcatcher.com",
});

const cancel = await liveClient.cancel({
ids: workflowRunId,
});
expect(cancel).toEqual({ cancelled: 1 });

const throws = () => liveClient.cancel({ ids: workflowRunId });
expect(throws).toThrow(`{"error":"workflowRun ${workflowRunId} not found"}`);
});

test("should cancel multiple workflow run ids", async () => {
const { workflowRunId: workflowRunIdOne } = await liveClient.trigger({
url: "http://requestcatcher.com",
});
const { workflowRunId: workflowRunIdTwo } = await liveClient.trigger({
url: "http://requestcatcher.com",
});

const throws = async () =>
await liveClient.cancel({
ids: [workflowRunIdOne, workflowRunIdTwo, "non-existent"],
});

// if there is any workflow which doesn't exist, we throw
expect(throws).toThrow(`{"error":"workflowRun non-existent not found"}`);

// trying to cancel the workflows one by one gives error, as they were canceled above
const throwsFirst = async () => await liveClient.cancel({ ids: workflowRunIdOne });
expect(throwsFirst).toThrow(`{"error":"workflowRun ${workflowRunIdOne} not found"}`);

// trying to cancel the workflows one by one gives error, as they were canceled above
const throwsSecond = async () => await liveClient.cancel({ ids: workflowRunIdTwo });
expect(throwsSecond).toThrow(`{"error":"workflowRun ${workflowRunIdTwo} not found"}`);
});

test("should cancel workflowUrl", async () => {
await liveClient.trigger({
url: "http://requestcatcher.com/first",
});
await liveClient.trigger({
url: "http://requestcatcher.com/second",
});

const cancel = await liveClient.cancel({
urlStartingWith: "http://requestcatcher.com",
});

expect(cancel).toEqual({ cancelled: 2 });
});

test.skip("should cancel all", async () => {
// intentionally didn't write a test for cancel.all,
// because it may break apps running on the same QStash user.
});
});

Expand Down
90 changes: 78 additions & 12 deletions src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,84 @@ export class Client {
*
* Returns true if workflow is canceled succesfully. Otherwise, throws error.
*
* There are multiple ways you can cancel workflows:
* - pass one or more workflow run ids to cancel them
* - pass a workflow url to cancel all runs starting with this url
* - cancel all pending or active workflow runs
*
* ### Cancel a set of workflow runs
*
* ```ts
* import { Client } from "@upstash/workflow";
* // cancel a single workflow
* await client.cancel({ ids: "<WORKFLOW_RUN_ID>" })
*
* const client = new Client({ token: "<QSTASH_TOKEN>" })
* await client.cancel({ workflowRunId: "<WORKFLOW_RUN_ID>" })
* // cancel a set of workflow runs
* await client.cancel({ ids: [
* "<WORKFLOW_RUN_ID_1>",
* "<WORKFLOW_RUN_ID_2>",
* ]})
* ```
*
* ### Cancel workflows starting with a url
*
* If you have an endpoint called `https://your-endpoint.com` and you
* want to cancel all workflow runs on it, you can use `urlStartingWith`.
*
* Note that this will cancel workflows in all endpoints under
* `https://your-endpoint.com`.
*
* ```ts
* await client.cancel({ urlStartingWith: "https://your-endpoint.com" })
* ```
*
* ### Cancel *all* workflows
*
* To cancel all pending and currently running workflows, you can
* do it like this:
*
* ```ts
* await client.cancel({ all: true })
* ```
*
* @param workflowRunId run id of the workflow to delete
* @param ids run id of the workflow to delete
* @param urlStartingWith cancel workflows starting with this url. Will be ignored
* if `ids` parameter is set.
* @param all set to true in order to cancel all workflows. Will be ignored
* if `ids` or `urlStartingWith` parameters are set.
* @returns true if workflow is succesfully deleted. Otherwise throws QStashError
*/
public async cancel({ workflowRunId }: { workflowRunId: string }) {
return await makeCancelRequest(this.client.http, workflowRunId);
public async cancel({
ids,
urlStartingWith,
all,
}: {
ids?: string | string[];
urlStartingWith?: string;
all?: true;
}) {
let body: string;
if (ids) {
const runIdArray = typeof ids === "string" ? [ids] : ids;

body = JSON.stringify({ workflowRunIds: runIdArray });
} else if (urlStartingWith) {
body = JSON.stringify({ workflowUrl: urlStartingWith });
} else if (all) {
body = "{}";
} else {
throw new TypeError("The `cancel` method cannot be called without any options.");
}

const result = await this.client.http.request<{ cancelled: number }>({
path: ["v2", "workflows", "runs"],
method: "DELETE",
body,
headers: {
"Content-Type": "application/json",
},
});

return result;
}

/**
Expand Down Expand Up @@ -94,13 +160,13 @@ export class Client {
* Trigger new workflow run and returns the workflow run id
*
* ```ts
* const { workflowRunId } await client.trigger({
* const { workflowRunId } = await client.trigger({
* url: "https://workflow-endpoint.com",
* body: "hello there!", // optional body
* headers: { ... }, // optional headers
* workflowRunId: "my-workflow", // optional workflow run id
* retries: 3 // optional retries in the initial request
* })
* body: "hello there!", // Optional body
* headers: { ... }, // Optional headers
* workflowRunId: "my-workflow", // Optional workflow run ID
* retries: 3 // Optional retries for the initial request
* });
*
* console.log(workflowRunId)
* // wfr_my-workflow
Expand Down

0 comments on commit ce636b9

Please sign in to comment.