rules of steps
elithrar committed Oct 19, 2024
1 parent 3f6db1d commit 6d2439f
Showing 3 changed files with 246 additions and 7 deletions.
181 changes: 176 additions & 5 deletions src/content/docs/workflows/build/rules-of-steps.mdx
@@ -3,22 +3,193 @@ title: Rules of Steps
pcx_content_type: concept
sidebar:
order: 10

---

A Workflow step is a self-contained, individually retriable component of a Workflow. Steps may emit (optional) state that allows a Workflow to persist and continue from that step, even if a Workflow fails due to a network or infrastructure issue. A Workflow is composed of one or more steps.

This is a small guidebook on how to build more resilient and correct Workflows.

### Ensure API/Binding calls are idempotent

Because a step might be retried multiple times, your steps should (ideally) be idempotent. For context, idempotency is a logical property where the operation (in this case, a step)
can be applied multiple times without changing the result beyond the initial application.

As an example, assume you have a Workflow that charges your customers, and you really don't want to charge them twice by accident. Before charging them, you should
check whether they were already charged:

```ts
export class MyWorkflow extends Workflow<Env, Params> {
  async run(events: WorkflowEvent[], step: WorkflowStep) {
    const customer_id = 123456;
    // ✅ Good: Non-idempotent API/Binding calls are always done **after** checking if the operation is
    // still needed.
    await step.do(
      `charge ${customer_id} for its monthly subscription`,
      async () => {
        // API call to check if the customer was already charged
        const subscription = await fetch(
          `https://payment.processor/subscriptions/${customer_id}`,
        ).then((res) => res.json());

        // Return early if the customer was already charged. This can happen if the destination service dies
        // in the middle of the request but still commits it, or if the Workflows Engine restarts.
        if (subscription.charged) {
          return;
        }

        // Non-idempotent call: this operation can fail and be retried even though the payment
        // processor committed it, which means that, on retry, it would charge the customer again if the
        // check above were not in place.
        return await fetch(
          `https://payment.processor/subscriptions/${customer_id}`,
          {
            method: "POST",
            body: JSON.stringify({ amount: 10.0 }),
          },
        );
      },
    );
  }
}
```

:::note

Guaranteeing idempotency might be optional in your specific use case and implementation, but we recommend that you always try to guarantee it.

:::
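
To make the property concrete, here is a minimal sketch of the check-then-charge pattern run against an in-memory stand-in. `FakePaymentProcessor` and `chargeOnce` are hypothetical names used only for illustration; they are not part of Workflows or of any real payment API.

```ts
// Hypothetical in-memory stand-in for a payment processor; not a real API.
class FakePaymentProcessor {
  private charged = new Set<number>();
  chargeCount = 0;

  isCharged(customerId: number): boolean {
    return this.charged.has(customerId);
  }

  charge(customerId: number): void {
    this.charged.add(customerId);
    this.chargeCount += 1;
  }
}

// Check-then-charge: calling this repeatedly for the same customer
// results in at most one charge.
function chargeOnce(processor: FakePaymentProcessor, customerId: number): void {
  if (processor.isCharged(customerId)) {
    return; // already charged on an earlier attempt
  }
  processor.charge(customerId);
}

const processor = new FakePaymentProcessor();

// Simulate the step body running three times because of retries.
for (let attempt = 0; attempt < 3; attempt++) {
  chargeOnce(processor, 123456);
}

console.log(processor.chargeCount); // 1
```

The sketch is single-threaded, so the check and the charge cannot interleave with another attempt. Against a real service, the check would need to be atomic on the service side for the same guarantee to hold.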

### Make your steps granular

TODO - step is a transaction, should be self-contained logic. If you have multiple API calls, separate them into their own steps.
Steps should be as self-contained as possible. This makes your own logic more durable in case of failures in third-party APIs, network errors, and so on.
You can also think of a step as a transaction, or a unit of work.

### Ensure API calls are idempotent
- ✅ Minimize the number of API/binding calls per step (unless you need multiple calls to prove idempotency).

TODO
```ts
export class MyWorkflow extends Workflow<Env, Params> {
  async run(events: WorkflowEvent[], step: WorkflowStep) {
    // ✅ Good: Unrelated API/Binding calls are self-contained, so that if one of them fails,
    // it can be retried individually. This has an extra advantage: you can control retry or
    // timeout policies for each granular step - you might not want to overload http.cat if
    // it is down.
    const httpCat = await step.do("get cutest cat from KV", async () => {
      // Bindings are read from `this.env` inside the Workflow class
      return await this.env.KV.get("cutest-http-cat");
    });

    const image = await step.do("fetch cat image from http.cat", async () => {
      return await fetch(`https://http.cat/${httpCat}`);
    });
  }
}
```

Otherwise, your Workflow might not be as durable as you expect, and it may run into undefined behaviour. You can avoid this by following these rules:

- 🔴 Do not encapsulate your entire logic in one single step.
- 🔴 Do not call separate services in the same step (unless you need it to prove idempotency).
- 🔴 Do not make too many service calls in the same step (unless you need it to prove idempotency).
- 🔴 Do not do too much CPU-intensive work inside of a single step - sometimes the engine might have to restart, and it will start over from that step.

```ts
export class MyWorkflow extends Workflow<Env, Params> {
  async run(events: WorkflowEvent[], step: WorkflowStep) {
    // 🔴 Bad: you're calling two separate services from within the same step. This might cause
    // some extra calls to the first service in case the second one fails, and in some cases, makes
    // the step non-idempotent altogether
    const image = await step.do("get cutest cat from KV", async () => {
      // Bindings are read from `this.env` inside the Workflow class
      const httpCat = await this.env.KV.get("cutest-http-cat");
      return fetch(`https://http.cat/${httpCat}`);
    });
  }
}
```
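
The cost of the 🔴 example above can be made visible with a toy simulation. The sketch below is not the Workflows engine: `ToyStepRunner`, `retry`, `lookup` and `download` are hypothetical helpers, and the runner simply assumes that a completed step's result is cached by name while a failed step is run again from the start.

```ts
// Toy, synchronous stand-in for a step runner; NOT the Workflows engine.
// A step's callback runs only if that step has not completed before.
class ToyStepRunner {
  private completed = new Map<string, unknown>();

  do<T>(name: string, fn: () => T): T {
    if (this.completed.has(name)) {
      return this.completed.get(name) as T;
    }
    const result = fn(); // throws on failure, so nothing is cached
    this.completed.set(name, result);
    return result;
  }
}

// Hypothetical services: the lookup always works, the download fails once.
let lookupCalls = 0;
let downloadCalls = 0;
const lookup = (): string => {
  lookupCalls += 1;
  return "404";
};
const download = (code: string): string => {
  downloadCalls += 1;
  if (downloadCalls === 1) throw new Error("transient failure");
  return `image for ${code}`;
};

// Very naive retry policy: call `attempt` until it succeeds or the limit is hit.
function retry<T>(attempt: () => T, limit = 5): T {
  let lastError: unknown;
  for (let i = 0; i < limit; i++) {
    try {
      return attempt();
    } catch (err) {
      lastError = err;
    }
  }
  throw lastError;
}

// Combined: one step calls both services, so a failed download repeats the lookup.
const combined = new ToyStepRunner();
retry(() => combined.do("lookup and download", () => download(lookup())));
const combinedLookupCalls = lookupCalls;

// Granular: each service has its own step, so only the download is repeated.
lookupCalls = 0;
downloadCalls = 0;
const granular = new ToyStepRunner();
retry(() => {
  const code = granular.do("lookup", lookup);
  return granular.do("download", () => download(code));
});
const granularLookupCalls = lookupCalls;

console.log({ combinedLookupCalls, granularLookupCalls });
// { combinedLookupCalls: 2, granularLookupCalls: 1 }
```

In the combined version, the first service is called once per attempt of the whole step. In the granular version, it is called once in total.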

### Don't rely on state outside of a step

TODO
Sometimes, our Engine will hibernate and lose all in-memory state. This happens when the Engine detects that there is no pending work and can hibernate
until it needs to wake up (because of a sleep, retry or event). This means that you can't do something like this:

```ts
function getRandomInt(min: number, max: number) {
  const minCeiled = Math.ceil(min);
  const maxFloored = Math.floor(max);
  return Math.floor(Math.random() * (maxFloored - minCeiled) + minCeiled); // The maximum is exclusive and the minimum is inclusive
}

export class MyWorkflow extends Workflow<Env, Params> {
  async run(events: WorkflowEvent[], step: WorkflowStep) {
    // 🔴 Bad: `imageList` will not be persisted across engine lifetimes. This means that after hibernation,
    // `imageList` will be empty again, even though the following two steps have already run.
    const imageList: string[] = [];

    await step.do("get first cutest cat from KV", async () => {
      // Bindings are read from `this.env` inside the Workflow class
      const httpCat = await this.env.KV.get("cutest-http-cat-1");

      imageList.push(httpCat);
    });

    await step.do("get second cutest cat from KV", async () => {
      const httpCat = await this.env.KV.get("cutest-http-cat-2");

      imageList.push(httpCat);
    });

    // A long sleep can (and probably will) hibernate the engine, which means that the first engine lifetime ends here
    await step.sleep("💤💤💤💤", "3 hours");

    // When this runs, it will be on the second engine lifetime - which means `imageList` will be empty.
    await step.do(
      "choose a random cat from the list and download it",
      async () => {
        const randomCat = imageList.at(getRandomInt(0, imageList.length));
        // this will fail since `randomCat` is undefined because `imageList` is empty
        return await fetch(`https://http.cat/${randomCat}`);
      },
    );
  }
}
```

Instead, you should build top-level state composed exclusively of `step.do` return values:

```ts
function getRandomInt(min: number, max: number) {
  const minCeiled = Math.ceil(min);
  const maxFloored = Math.floor(max);
  return Math.floor(Math.random() * (maxFloored - minCeiled) + minCeiled); // The maximum is exclusive and the minimum is inclusive
}

export class MyWorkflow extends Workflow<Env, Params> {
  async run(events: WorkflowEvent[], step: WorkflowStep) {
    // ✅ Good: imageList state is composed exclusively of step returns - this means that in the event of
    // multiple engine lifetimes, imageList will be built accordingly
    const imageList: string[] = await Promise.all([
      step.do("get first cutest cat from KV", async () => {
        // Bindings are read from `this.env` inside the Workflow class
        return await this.env.KV.get("cutest-http-cat-1");
      }),

      step.do("get second cutest cat from KV", async () => {
        return await this.env.KV.get("cutest-http-cat-2");
      }),
    ]);

    // A long sleep can (and probably will) hibernate the engine, which means that the first engine lifetime ends here
    await step.sleep("💤💤💤💤", "3 hours");

    // When this runs, it will be on the second engine lifetime - but this time, imageList will contain
    // the two cutest cats
    await step.do(
      "choose a random cat from the list and download it",
      async () => {
        const randomCat = imageList.at(getRandomInt(0, imageList.length));
        // this will eventually succeed since `randomCat` is defined
        return await fetch(`https://http.cat/${randomCat}`);
      },
    );
  }
}
```
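
The difference between the two versions can be reproduced with a toy replay model. This is a sketch under a stated assumption, not the real engine: it assumes, as the examples above imply, that a step completed in an earlier lifetime returns its persisted result instead of running its callback again. `ReplaySteps`, `Store` and `secondLifetime` are hypothetical names.

```ts
// Toy model of replay after hibernation; NOT the Workflows engine.
// Step results persist in `store`; everything else inside a run function
// is rebuilt from scratch on every lifetime.
type Store = Map<string, string>;

class ReplaySteps {
  private store: Store;

  constructor(store: Store) {
    this.store = store;
  }

  do(name: string, fn: () => string): string {
    const cached = this.store.get(name);
    if (cached !== undefined) {
      return cached; // completed in an earlier lifetime: callback is skipped
    }
    const result = fn();
    this.store.set(name, result);
    return result;
  }
}

// 🔴 State is mutated inside callbacks, which are skipped on replay.
function runWithSideEffects(step: ReplaySteps): string[] {
  const list: string[] = [];
  step.do("first", () => {
    list.push("200");
    return "200";
  });
  step.do("second", () => {
    list.push("404");
    return "404";
  });
  return list;
}

// ✅ State is built only from step return values.
function runWithReturns(step: ReplaySteps): string[] {
  return [step.do("first", () => "200"), step.do("second", () => "404")];
}

// Run a workflow function twice against the same persisted store,
// i.e. simulate two engine lifetimes, and return the second result.
function secondLifetime(run: (step: ReplaySteps) => string[]): string[] {
  const store: Store = new Map();
  run(new ReplaySteps(store)); // first lifetime
  return run(new ReplaySteps(store)); // replay after "hibernation"
}

const lost = secondLifetime(runWithSideEffects);
const rebuilt = secondLifetime(runWithReturns);

console.log(lost); // []
console.log(rebuilt); // [ '200', '404' ]
```

In the second lifetime, the side-effect version ends up with an empty list because neither callback runs again, while the return-value version rebuilds the same list from the persisted results.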

### Set sensible retry parameters

62 changes: 62 additions & 0 deletions src/content/docs/workflows/get-started/cli-quick-start.mdx
@@ -113,13 +113,75 @@ TODO - basic trigger steps
### CLI

```sh
# Trigger a Workflow from the CLI, and pass (optional) parameters as an event to the Workflow.
npx wrangler@latest workflows trigger workflows-tutorial --params='{"hello":"world"}'
```

Refer to the [events and parameters documentation](/workflows/build/events-and-parameters/) to understand how events are passed to Workflows.

### Worker binding

TODO - trigger from a Worker

```toml title="wrangler.toml"
[[workflows]]
# name of your workflow
name = "workflows-starter"
# binding name env.MY_WORKFLOW
binding = "MY_WORKFLOW"
# this is the class that extends the Workflow class in src/index.ts
class_name = "MyWorkflow"
# script_name is required during the beta.
# Must match the "name" of your Worker at the top of wrangler.toml
script_name = "workflows-starter"
```

You can then invoke the methods on this binding directly from your Worker script. The `Workflow` type has methods for:

* `create(newId, params)` - create (trigger) a new instance of the Workflow, with optional parameters.
* `get(id)` - retrieve a Workflow instance by ID.
* `status()` - get the current status of a unique Workflow instance.

For example, the following Worker will fetch the status of an existing Workflow instance by ID (if supplied), else it will create a new Workflow instance and return its ID:

```ts title="src/index.ts"
// Import the Workflow definition
import { WorkflowEntrypoint, WorkflowEvent, WorkflowStep } from "cloudflare:workers";

interface Env {
  // Matches the binding definition in your wrangler.toml
  MY_WORKFLOW: Workflow;
}

export default {
  async fetch(req: Request, env: Env) {
    // Read the (optional) instance ID from the query string
    const instanceId = new URL(req.url).searchParams.get("instanceId");

    // If an ?instanceId=<id> query parameter is provided, fetch the status
    // of an existing Workflow by its ID.
    if (instanceId) {
      const instance = await env.MY_WORKFLOW.get(instanceId);
      return Response.json({
        status: await instance.status(),
      });
    }

    // Else, create a new instance of our Workflow, passing in any (optional) params
    // and return the ID.
    const newId = crypto.randomUUID();
    const instance = await env.MY_WORKFLOW.create(newId, {});
    return Response.json({
      id: instance.id,
      details: await instance.status(),
    });
  },
};
```

Refer to the [triggering Workflows] documentation for how to trigger a Workflow from other Workers handler functions.

## 4. Managing Workflows

10 changes: 8 additions & 2 deletions src/content/docs/workflows/get-started/guide.mdx
@@ -164,7 +164,7 @@ binding = "MY_WORKFLOW"
# this is the class that extends the Workflow class in src/index.ts
class_name = "MyWorkflow"
# script_name is required during the beta.
# Must match the "name" of your Worker at the top of wrangler.toml
# Must match the "name" of your Worker at the top of wrangler.toml
script_name = "workflows-starter"
```

@@ -228,9 +228,15 @@ In a production application, you might choose to put authentication in front of

### Review your Workflow code

:::note

This is the full contents of the `src/index.ts` file pulled down when you used the `cloudflare/workflows-starter` template at the beginning of this guide.

:::

Before you deploy, you can review the full Workflows code and the `fetch` handler that will allow you to trigger your Workflow over HTTP:

```ts
```ts title="src/index.ts"
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';

type Env = {
