Skip to content

Commit

Permalink
feat: update flows interfaces
Browse files Browse the repository at this point in the history
Signed-off-by: Tomas Dvorak <toomas2d@gmail.com>
  • Loading branch information
Tomas2D committed Dec 19, 2024
1 parent 358ffb2 commit d955f7d
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 56 deletions.
1 change: 1 addition & 0 deletions eslint.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ export default tseslint.config(
],
},
],
"@typescript-eslint/no-non-null-asserted-optional-chain": "off",
"@typescript-eslint/no-explicit-any": "off",
"@typescript-eslint/no-unused-vars": "off",
"@typescript-eslint/no-empty-function": "off",
Expand Down
2 changes: 1 addition & 1 deletion examples/flows/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { WikipediaTool } from "bee-agent-framework/tools/search/wikipedia";
import { OpenMeteoTool } from "bee-agent-framework/tools/weather/openMeteo";
import { ReadOnlyMemory } from "bee-agent-framework/memory/base";
import { UnconstrainedMemory } from "bee-agent-framework/memory/unconstrainedMemory";
import { Flow } from "bee-agent-framework/flows";
import { Flow } from "bee-agent-framework/experimental/flows";
import { createConsoleReader } from "examples/helpers/io.js";

const schema = z.object({
Expand Down
4 changes: 2 additions & 2 deletions examples/flows/contentCreator.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import "dotenv/config.js";
import { Flow } from "bee-agent-framework/flows";
import "dotenv/config";
import { Flow } from "bee-agent-framework/experimental/flows";
import { z } from "zod";
import { BeeAgent } from "bee-agent-framework/agents/bee/agent";
import { UnconstrainedMemory } from "bee-agent-framework/memory/unconstrainedMemory";
Expand Down
37 changes: 37 additions & 0 deletions examples/flows/nesting.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { Flow } from "bee-agent-framework/experimental/flows";
import { z } from "zod";

const schema = z.object({
counter: z.number().default(0),
});

const addFlow = new Flow({ schema }).addStep("run", async (state) => ({
next: Math.random() > 0.5 ? Flow.SELF : Flow.END,
update: { counter: state.counter + 1 },
}));

const subtractFlow = new Flow({
schema,
}).addStep("run", async (state) => ({
update: { counter: state.counter - 1 },
next: Math.random() > 0.5 ? Flow.SELF : Flow.END,
}));

const flow = new Flow({
schema: z.object({
threshold: z.number().min(0).max(1),
counter: z.number().default(0),
}),
})
.addStep("start", (state) => ({
next: Math.random() > state.threshold ? "addFlow" : "subtractFlow",
}))
.addStep("addFlow", addFlow.asStep({ next: Flow.END }))
.addStep("subtractFlow", subtractFlow);

const response = await flow.run({ threshold: 0.5 }).observe((emitter) => {
emitter.on("start", (data, event) =>
console.log(`-> step ${data.step}`, event.trace?.parentRunId ? "(nested flow)" : ""),
);
});
console.info(`Counter:`, response.result);
34 changes: 6 additions & 28 deletions examples/flows/simple.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,43 +5,21 @@ const schema = z.object({
hops: z.number().default(0),
});

const sumFlow = new Flow({ schema })
const flow = new Flow({ schema })
.addStep("a", async (state) => ({})) // does nothing

Check failure on line 9 in examples/flows/simple.ts

View workflow job for this annotation

GitHub Actions / Lint & Build & Test

Parameter 'state' implicitly has an 'any' type.
.addStep("b", async (state) => ({

Check failure on line 10 in examples/flows/simple.ts

View workflow job for this annotation

GitHub Actions / Lint & Build & Test

Parameter 'state' implicitly has an 'any' type.
// adds one and moves to b
update: { hops: state.hops + 1 },
}))
.addStep("c", async () => ({
.addStep("c", async (state) => ({

Check failure on line 14 in examples/flows/simple.ts

View workflow job for this annotation

GitHub Actions / Lint & Build & Test

Parameter 'state' implicitly has an 'any' type.
update: { hops: state.hops + 1 },
next: Math.random() > 0.5 ? "b" : Flow.END,
}));

const multipleFlow = new Flow({
schema: schema.extend({ multiplier: z.number().int().min(1) }),
})
.addStep("a", async (state) => ({
// adds one and moves to b
update: { hops: state.hops * state.multiplier },
}))
.addStep("b", async () => ({
next: Math.random() > 0.5 ? "a" : Flow.END,
}));

const flow = new Flow({ schema })
.addStep("start", () => ({
next: Math.random() > 0.5 ? "sum" : "multiple",
}))
.addStep(
"multiple",
multipleFlow.asStep({ next: Flow.END, input: ({ hops }) => ({ hops, multiplier: 2 }) }),
)
.addStep("sum", sumFlow);

const response = await flow.run({ hops: 0 }).observe((emitter) => {

Check failure on line 19 in examples/flows/simple.ts

View workflow job for this annotation

GitHub Actions / Lint & Build & Test

Parameter 'emitter' implicitly has an 'any' type.
emitter.on("start", (data, event) =>
console.log(`-> step ${data.step}`, event.path, event.trace),
);
//emitter.on("error", (data) => console.log(`-> error ${data.step}`));
//emitter.on("success", (data) => console.log(`-> finish ${data.step}`));
emitter.on("start", (data) => console.log(`-> start ${data.step}`));

Check failure on line 20 in examples/flows/simple.ts

View workflow job for this annotation

GitHub Actions / Lint & Build & Test

Parameter 'data' implicitly has an 'any' type.
emitter.on("error", (data) => console.log(`-> error ${data.step}`));

Check failure on line 21 in examples/flows/simple.ts

View workflow job for this annotation

GitHub Actions / Lint & Build & Test

Parameter 'data' implicitly has an 'any' type.
emitter.on("success", (data) => console.log(`-> finish ${data.step}`));

Check failure on line 22 in examples/flows/simple.ts

View workflow job for this annotation

GitHub Actions / Lint & Build & Test

Parameter 'data' implicitly has an 'any' type.
});

console.log(`Hops: ${response.result.hops}`);
Expand Down
125 changes: 100 additions & 25 deletions src/flows.ts → src/experimental/flows.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/**
* Copyright 2024 IBM Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { z, ZodSchema } from "zod";
import { Serializable } from "@/internals/serializable.js";
import { Callback, Emitter } from "@/emitter/emitter.js";
Expand All @@ -8,7 +24,7 @@ import { FrameworkError, ValueError } from "@/errors.js";

export interface FlowStepResponse<T extends ZodSchema, K extends string> {
update?: Partial<z.output<T>>;
next?: FlowNextStep<K>;
next?: FlowStepName<K>;
}

export interface FlowRun<T extends ZodSchema, T2 extends ZodSchema, K extends string> {
Expand Down Expand Up @@ -63,7 +79,14 @@ interface FlowInput<TS extends ZodSchema, TS2 extends ZodSchema = TS> {
outputSchema?: TS2;
}

type FlowNextStep<K extends string> = K | typeof Flow.END;
type RunStep<K extends string> = K | typeof Flow.END;
type ReservedStep =
| typeof Flow.START
| typeof Flow.NEXT
| typeof Flow.PREV
| typeof Flow.END
| typeof Flow.SELF;
export type FlowStepName<K extends string> = K | ReservedStep;

export class FlowError<
T extends ZodSchema,
Expand All @@ -84,11 +107,16 @@ export class Flow<
TOutput extends ZodSchema = TInput,
TKeys extends string = string,
> extends Serializable {
public static readonly START = "__start__";
public static readonly SELF = "__self__";
public static readonly PREV = "__prev__";
public static readonly NEXT = "__next__";
public static readonly END = "__end__";

public readonly emitter: Emitter<FlowEvents<TInput, TOutput, TKeys>>;

protected readonly steps = new Map<string, FlowStepDef<TInput, TKeys>>();
protected startStep: TKeys | null = null;
protected startStep: TKeys | undefined = undefined;

constructor(protected readonly input: FlowInput<TInput, TOutput>) {
super();
Expand All @@ -98,30 +126,55 @@ export class Flow<
});
}

getSteps() {
return Array.from(this.steps.keys()) as TKeys[];
}

get name() {
return this.input.name ?? "";
}

get schemas() {
return pick(this.input, ["schema", "outputSchema"]);
}

addStep<L extends string>(
name: L,
step: FlowStepHandler<TInput, TKeys> | Flow<TInput, TInput, TKeys>,
handler: FlowStepHandler<TInput, TKeys>,
): Flow<TInput, TOutput, L | TKeys>;
addStep<L extends string, TFlow extends AnyFlow>(
name: L,
flow: Flow.pipeable<this, TFlow>,
): Flow<TInput, TOutput, L | TKeys>;
addStep<L extends string>(
name: L,
step: FlowStepHandler<TInput, TKeys> | AnyFlow,
): Flow<TInput, TOutput, L | TKeys> {
return this._addStep(name, step);
}

addStrictStep<L extends string, TI2 extends ZodSchema>(
name: L,
schema: TI2,
step: FlowStepHandler<TI2, TKeys> | Flow<TInput, TInput, TKeys>,
handler: FlowStepHandler<TI2, TKeys>,
): Flow<TInput, TOutput, L | TKeys>;
addStrictStep<L extends string, TI2 extends ZodSchema, TFlow extends AnyFlow>(
name: L,
schema: TI2,
flow: Flow.pipeable<Flow<TI2, TOutput, TKeys>, TFlow>,
): Flow<TInput, TOutput, L | TKeys>;
addStrictStep<L extends string, TI2 extends ZodSchema>(
name: L,
schema: TI2,
step: FlowStepHandler<TI2, TKeys> | AnyFlow,
): Flow<TInput, TOutput, L | TKeys> {
return this._addStep(name, schema, step);
}

protected _addStep<TI2 extends ZodSchema = TInput, L extends string = TKeys>(
name: L,
schemaOrStep: TI2 | FlowStepHandler<TInput, TKeys> | Flow<TInput, TInput, TKeys>,
stepOrEmpty?: FlowStepHandler<TI2, TKeys> | Flow<TInput, TInput, TKeys>,
next?: FlowNextStep<TKeys>,
schemaOrStep: TI2 | FlowStepHandler<TInput, TKeys> | AnyFlow,
stepOrEmpty?: FlowStepHandler<TI2, TKeys> | AnyFlow,
): Flow<TInput, TOutput, L | TKeys> {
if (!name.trim()) {
throw new ValueError(`Step name cannot be empty!`);
Expand All @@ -138,7 +191,7 @@ export class Flow<

this.steps.set(name, {
schema,
handler: stepOrFlow instanceof Flow ? stepOrFlow.asStep({ next }) : stepOrFlow,
handler: stepOrFlow instanceof Flow ? stepOrFlow.asStep({}) : stepOrFlow,
} as FlowStepDef<TInput, TKeys>);

return this as unknown as Flow<TInput, TOutput, L | TKeys>;
Expand All @@ -165,35 +218,48 @@ export class Flow<
abort: (reason) => runContext.abort(reason),
};

let stepName = options?.start || this.startStep || this.findNextStep();
while (stepName !== Flow.END) {
const step = this.steps.get(stepName);
let next: RunStep<TKeys> =
this.findStep(options?.start || this.startStep || this.getSteps()[0]).current ?? Flow.END;

while (next && next !== Flow.END) {
const step = this.steps.get(next);
if (!step) {
throw new FlowError(`Step '${stepName}' was not found.`, { run });
throw new FlowError(`Step '${next}' was not found.`, { run });
}
run.steps.push({ name: stepName, state: run.state });
await runContext.emitter.emit("start", { run, step: stepName });
run.steps.push({ name: next, state: run.state });
await runContext.emitter.emit("start", { run, step: next });
try {
const stepInput = await step.schema.parseAsync(run.state).catch((err: Error) => {
throw new FlowError(
`Step '${stepName}' cannot be executed because the provided input doesn't adhere to the step's schema.`,
`Step '${next}' cannot be executed because the provided input doesn't adhere to the step's schema.`,
{ run: shallowCopy(run), errors: [err] },
);
});
const response = await step.handler(stepInput, handlers);
await runContext.emitter.emit("success", {
run: shallowCopy(run),
response,
step: stepName as TKeys,
step: next,
});
if (response.update) {
run.state = { ...run.state, ...response.update };
}
stepName = response.next || this.findNextStep(stepName);

if (response.next === Flow.START) {
next = run.steps.at(0)?.name!;
} else if (response.next === Flow.PREV) {
next = run.steps.at(-2)?.name!;
} else if (response.next === Flow.NEXT) {
next = this.findStep(next).next;
} else if (response.next === Flow.SELF) {
next = run.steps.at(-1)?.name!;
} else {
next = response.next || Flow.END;
}
} catch (error) {
await runContext.emitter.emit("error", {
run: shallowCopy(run),
step: stepName as TKeys,
step: next as TKeys,
error,
});
throw error;
Expand All @@ -208,7 +274,7 @@ export class Flow<

delStep<L extends TKeys>(name: L): Flow<TInput, TOutput, Exclude<TKeys, L>> {
if (this.startStep === name) {
this.startStep = null;
this.startStep = undefined;
}
this.steps.delete(name);
return this as unknown as Flow<TInput, TOutput, Exclude<TKeys, L>>;
Expand All @@ -222,7 +288,7 @@ export class Flow<
input?: (input: z.output<TInput2>) => z.output<TInput> | z.input<TInput>;
output?: (output: z.output<TOutput>) => z.output<TOutput2> | z.input<TOutput2>;
start?: TKeys;
next?: FlowNextStep<TKeys2>;
next?: FlowStepName<TKeys2>;
}): FlowStepHandler<TInput2, TKeys | TKeys2> {
return async (input, ctx) => {
const mappedInput = overrides?.input ? overrides.input(input) : input;
Expand All @@ -236,10 +302,14 @@ export class Flow<
};
}

protected findNextStep(start: TKeys | null = null): FlowNextStep<TKeys> {
const keys = Array.from(this.steps.keys()) as TKeys[];
const curIndex = start ? keys.indexOf(start) : -1;
return keys[curIndex + 1] ?? Flow.END;
protected findStep(current: TKeys) {
const steps = this.getSteps();
const index = steps.indexOf(current);
return {
prev: steps[index - 1],
current: steps[index],
next: steps[index + 1],
};
}

createSnapshot() {
Expand All @@ -260,7 +330,12 @@ export class Flow<

// eslint-disable-next-line @typescript-eslint/no-namespace
export namespace Flow {
export type pipeable<T extends AnyFlow, T2 extends AnyFlow> =
Flow.state<T> extends Flow.input<T2> ? T2 : never;
export type input<T> = T extends Flow<infer A, any, any> ? z.input<A> : never;
export type state<T> = T extends Flow<infer A, any, any> ? z.output<A> : never;
export type output<T> = T extends Flow<any, infer A, any> ? z.output<A> : never;
export type run<T> = T extends Flow<infer A, infer B, infer C> ? FlowRun<A, B, C> : never;
}

export type AnyFlow = Flow<any, any, any>;

0 comments on commit d955f7d

Please sign in to comment.