Add explicit prompt/completion logging/attributes, skip tracing for synchronous components (#240)

1. Add explicit (conversational) prompt/completion logging and span attributes, including token counts (see the sketch below)
2. Skip OpenTelemetry tracing for non-root synchronous components, which drastically simplifies the traces
3. Remove the universal `ai.jsx.result.tokenCount` span attribute, which doesn't have the opportunity to handle conversational messages correctly
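For illustration, a minimal sketch of the new logging path from a model component's perspective. The component name and import paths are hypothetical, not part of this commit; the `renderToConversation` signature matches the conversation.tsx change below:

```tsx
// Sketch only — `MyChatModel` and the import paths are illustrative assumptions.
import * as AI from 'ai-jsx';
import { renderToConversation } from 'ai-jsx/core/conversation';

async function* MyChatModel(
  { children }: { children: AI.Node },
  { render, logger }: AI.ComponentContext
) {
  // Renders the children to conversation messages, logs them, and sets the
  // `ai.jsx.prompt` span attribute on the active span.
  const messages = await renderToConversation(children, render, logger, 'prompt');
  yield `model response for ${messages.length} messages`;
}
```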
petersalas authored Aug 22, 2023
1 parent 096847f commit 219aebe
Showing 12 changed files with 237 additions and 81 deletions.
2 changes: 1 addition & 1 deletion packages/ai-jsx/package.json
@@ -4,7 +4,7 @@
"repository": "fixie-ai/ai-jsx",
"bugs": "https://github.com/fixie-ai/ai-jsx/issues",
"homepage": "https://ai-jsx.com",
"version": "0.9.1",
"version": "0.9.2",
"volta": {
"extends": "../../package.json"
},
22 changes: 19 additions & 3 deletions packages/ai-jsx/src/core/conversation.tsx
@@ -204,6 +204,8 @@ function toConversationMessages(partialRendering: AI.PartiallyRendered[]): Conve
export async function renderToConversation(
conversation: AI.Node,
render: AI.ComponentContext['render'],
+  logger?: AI.ComponentContext['logger'],
+  logType?: 'prompt' | 'completion',
cost?: (message: ConversationMessage, render: AI.ComponentContext['render']) => Promise<number>,
budget?: number
) {
@@ -215,7 +217,21 @@
) : (
conversation
);
-  return toConversationMessages(await render(conversationToUse, { stop: isConversationalComponent }));
+  const messages = toConversationMessages(await render(conversationToUse, { stop: isConversationalComponent }));
+
+  if (logger && logType) {
+    const loggableMessages = await Promise.all(
+      messages.map(async (m) => ({
+        element: debug(m.element, true),
+        ...(cost && { cost: await cost(m, render) }),
+      }))
+    );
+
+    logger.setAttribute(`ai.jsx.${logType}`, JSON.stringify(loggableMessages));
+    logger.info({ [logType]: { messages: loggableMessages } }, `Got ${logType} conversation`);
+  }
+
+  return messages;
}

/**
@@ -258,14 +274,14 @@ export async function* Converse(
reply: (messages: ConversationMessage[], fullConversation: ConversationMessage[]) => AI.Renderable;
children: AI.Node;
},
-  { render, memo }: AI.ComponentContext
+  { render, memo, logger }: AI.ComponentContext
): AI.RenderableStream {
yield AI.AppendOnlyStream;

const fullConversation = [] as ConversationMessage[];
let next = memo(children);
while (true) {
-    const newMessages = await renderToConversation(next, render);
+    const newMessages = await renderToConversation(next, render, logger);
if (newMessages.length === 0) {
break;
}
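To make the `cost`/`budget` parameters above concrete, here is a hedged sketch of a cost callback used from inside a component; the per-character pricing is invented for illustration (the OpenAI integration below passes its real `tokenCountForConversationMessage`):

```tsx
// Sketch: a toy cost function — one unit per rendered character. Messages are
// subject to conversation shrinking when the cumulative cost exceeds `budget`.
const characterCost = async (
  message: ConversationMessage,
  render: AI.ComponentContext['render']
) => (await render(message.element)).length;

// Inside a component with `children`, `render`, and `logger` in scope:
const messages = await renderToConversation(children, render, logger, 'prompt', characterCost, 4000);
```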
37 changes: 26 additions & 11 deletions packages/ai-jsx/src/core/log.ts
@@ -6,6 +6,7 @@
import _ from 'lodash';
import pino from 'pino';
import { Element } from './node.js';
+import opentelemetry from '@opentelemetry/api';
import { logs, type Logger as OpenTelemetryLoggerInterface } from '@opentelemetry/api-logs';
import { getEnvVar } from '../lib/util.js';

@@ -14,7 +15,9 @@ export type LogLevel = 'fatal' | 'error' | 'warn' | 'info' | 'debug' | 'trace';
/**
* A Logger represents an object that can log messages.
*/
-export type Logger = Record<LogLevel, (obj: object | string, msg?: string) => void>;
+export type Logger = Record<LogLevel, (obj: object | string, msg?: string) => void> & {
+  setAttribute: (key: string, value: string) => void;
+};

/**
* An abstract class that represents an implementation of {@link Logger}.
@@ -64,6 +67,11 @@ export abstract class LogImplementation
`Rendering element ${elementTag} failed with exception: ${exception}`
);
}

+  /**
+   * Sets an attribute to be associated with the rendering of a particular element.
+   */
+  setAttribute(_element: Element<any>, _renderId: string, _key: string, _value: string) {}
}

/**
Expand Down Expand Up @@ -129,6 +137,14 @@ export class OpenTelemetryLogger extends LogImplementation {
},
});
}

+  setAttribute(element: Element<any>, renderId: string, key: string, value: string): void {
+    // Set the attribute on the current span.
+    const span = opentelemetry.trace.getActiveSpan();
+    if (span) {
+      span.setAttribute(key, value);
+    }
+  }
}

/**
@@ -139,18 +155,16 @@ export class CombinedLogger extends LogImplementation {
super();
}

-  log(
-    level: LogLevel,
-    element: Element<any>,
-    renderId: string,
-    metadataOrMessage: string | object,
-    message?: string | undefined
-  ): void {
-    this.loggers.forEach((l) => l.log(level, element, renderId, metadataOrMessage, message));
+  log(...args: Parameters<LogImplementation['log']>): void {
+    this.loggers.forEach((l) => l.log(...args));
  }

-  logException(element: Element<object>, renderId: string, exception: unknown): void {
-    this.loggers.forEach((l) => l.logException(element, renderId, exception));
+  logException(...args: Parameters<LogImplementation['logException']>): void {
+    this.loggers.forEach((l) => l.logException(...args));
+  }
+
+  setAttribute(...args: Parameters<LogImplementation['setAttribute']>): void {
+    this.loggers.forEach((l) => l.setAttribute(...args));
  }
}

@@ -170,4 +184,5 @@ export class BoundLogger implements Logger {
info = (obj: object | string, msg?: string) => this.impl.log('info', this.element, this.renderId, obj, msg);
debug = (obj: object | string, msg?: string) => this.impl.log('debug', this.element, this.renderId, obj, msg);
trace = (obj: object | string, msg?: string) => this.impl.log('trace', this.element, this.renderId, obj, msg);
+  setAttribute = (key: string, value: string) => this.impl.setAttribute(this.element, this.renderId, key, value);
}
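To illustrate the expanded `LogImplementation` surface, a sketch of a custom implementation that collects attribute writes into a map. The class name is invented, and the `log` signature is assumed from the `BoundLogger` call sites above:

```tsx
// Sketch: a hypothetical LogImplementation exercising the new setAttribute hook.
class InMemoryLogger extends LogImplementation {
  readonly attributes = new Map<string, string>();

  log(level: LogLevel, element: Element<any>, renderId: string, obj: object | string, msg?: string) {
    console.log(`[${level}] <${element.tag.name}> ${renderId}:`, obj, msg ?? '');
  }

  // Called through BoundLogger#setAttribute, e.g. by renderToConversation's
  // `logger.setAttribute('ai.jsx.prompt', ...)` above.
  setAttribute(_element: Element<any>, _renderId: string, key: string, value: string) {
    this.attributes.set(key, value);
  }
}
```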
6 changes: 5 additions & 1 deletion packages/ai-jsx/src/core/memoize.tsx
@@ -1,6 +1,7 @@
import { Renderable, RenderContext, AppendOnlyStream, RenderableStream } from './render.js';
import { Node, getReferencedNode, isIndirectNode, makeIndirectNode, isElement } from './node.js';
import { Logger } from './log.js';
+import { bindAsyncGeneratorToActiveContext } from './opentelemetry.js';

let lastMemoizedId = 0;
/** @hidden */
@@ -71,7 +72,10 @@ export function partialMemo(renderable: Renderable, existingId?: number): Node |

// It's an async iterable (which might be mutable). We set up some machinery to buffer the
// results so that we can create memoized iterators as necessary.
-  const generator = renderable[Symbol.asyncIterator]();
+  const unboundGenerator = renderable[Symbol.asyncIterator]();
+
+  // N.B. Async context doesn't get bound to the generator, so we need to do that manually.
+  const generator = bindAsyncGeneratorToActiveContext(unboundGenerator);
const sink: (Renderable | typeof AppendOnlyStream)[] = [];
let finalResult: Renderable | typeof AppendOnlyStream = null;
let completed = false;
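Why the memoize change matters: an async generator's body resumes in whatever OpenTelemetry context is active when `next()` is called, not the one active when the generator was created, so the memoized buffer could otherwise attribute work to the wrong span. A simplified sketch of what `bindAsyncGeneratorToActiveContext` (exported from opentelemetry.ts in this commit) does:

```tsx
import { context } from '@opentelemetry/api';

// Simplified sketch: capture the context active at wrap time and re-enter it
// on every pull, so spans started inside the generator nest correctly.
function bindToActiveContext<T = unknown, TReturn = any, TNext = unknown>(
  generator: AsyncGenerator<T, TReturn, TNext>
): AsyncGenerator<T, TReturn, TNext> {
  const activeContext = context.active();
  const result: AsyncGenerator<T, TReturn, TNext> = {
    next: (...args) => context.with(activeContext, () => generator.next(...args)),
    return: (value) => context.with(activeContext, () => generator.return(value)),
    throw: (e) => context.with(activeContext, () => generator.throw(e)),
    [Symbol.asyncIterator]() {
      return result;
    },
  };
  return result;
}
```

This also explains the render.ts change below: a `RenderableStream` now exposes an `AsyncGenerator` (which has `return`/`throw`) rather than a bare `AsyncIterator`, so it can be wrapped this way.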
62 changes: 35 additions & 27 deletions packages/ai-jsx/src/core/opentelemetry.ts
@@ -3,10 +3,9 @@ import { PartiallyRendered, StreamRenderer } from './render.js';
import { Element, isElement } from './node.js';
import { memoizedIdSymbol } from './memoize.js';
import { debug } from './debug.js';
-import { getEncoding } from 'js-tiktoken';
import _ from 'lodash';
import { getEnvVar } from '../lib/util.js';

-function bindAsyncGeneratorToActiveContext<T = unknown, TReturn = any, TNext = unknown>(
+export function bindAsyncGeneratorToActiveContext<T = unknown, TReturn = any, TNext = unknown>(
generator: AsyncGenerator<T, TReturn, TNext>
): AsyncGenerator<T, TReturn, TNext> {
const activeContext = context.active();
@@ -31,24 +30,37 @@ interface MemoizedIdAndOwner {
isOwner: boolean;
}

+// Indicates whether we're currently tracing _any_ component so that we can skip
+// tracing non-root synchronous components.
+const isTracingComponent = createContextKey('Whether the current async context is tracing a component');

// Indicates whether the current async context is responsible for tracing the memoized subtree.
const activeMemoizedIdStorage = createContextKey('The active MemoizedIdAndOwner');

function spanAttributesForElement(element: Element<any>): opentelemetry.Attributes {
return { 'ai.jsx.tag': element.tag.name, 'ai.jsx.tree': debug(element, true) };
}

-const getEncoder = _.once(() => getEncoding('cl100k_base'));

export function openTelemetryStreamRenderer(streamRenderer: StreamRenderer): StreamRenderer {
+  const traceEveryComponent = Boolean(getEnvVar('AIJSX_OPENTELEMETRY_TRACE_ALL', false));
const tracer = opentelemetry.trace.getTracer('ai.jsx');

-  return (renderContext, renderable, shouldStop, appendOnly) => {
+  const wrappedRender: StreamRenderer = (renderContext, renderable, shouldStop, appendOnly) => {
if (!isElement(renderable)) {
return streamRenderer(renderContext, renderable, shouldStop, appendOnly);
}

-    const activeContext = context.active();
+    let activeContext = context.active();
+    if (!traceEveryComponent && renderable.tag.constructor === Function && activeContext.getValue(isTracingComponent)) {
+      // Don't trace synchronous functions, though note that this also means that non-async
+      // functions that simply return promises won't be traced.
+      return streamRenderer(renderContext, renderable, shouldStop, appendOnly);
+    }
+
+    if (!activeContext.getValue(isTracingComponent)) {
+      activeContext = activeContext.setValue(isTracingComponent, true);
+    }

let startup = (_span: opentelemetry.Span) => {};
let cleanup = (_span: opentelemetry.Span) => {};
const renderInSpan = (span: opentelemetry.Span) => {
@@ -66,9 +78,6 @@ export function openTelemetryStreamRenderer(streamRenderer: StreamRenderer): Str
const resultIsPartial = result.find(isElement);
// Record the rendered value.
span.setAttribute('ai.jsx.result', resultIsPartial ? debug(result, true) : result.join(''));
-          if (!resultIsPartial) {
-            span.setAttribute('ai.jsx.result.tokenCount', getEncoder().encode(result.join('')).length);
-          }
}
cleanup(span);
span.end();
@@ -91,13 +100,11 @@ export function openTelemetryStreamRenderer(streamRenderer: StreamRenderer): Str
// Nothing is currently tracing this memoized subtree.
startup = (span) => memoizedIdsToActiveSpanContexts.set(memoizedId, span.spanContext());
cleanup = () => memoizedIdsToActiveSpanContexts.delete(memoizedId);
-      const newContext = activeContext.setValue(activeMemoizedIdStorage, { id: memoizedId, isOwner: true });
-      return context.with(newContext, () =>
-        tracer.startActiveSpan(
-          `<${renderable.tag.name}>`,
-          { attributes: spanAttributesForElement(renderable) },
-          renderInSpan
-        )
-      );
+      return tracer.startActiveSpan(
+        `<${renderable.tag.name}>`,
+        { attributes: spanAttributesForElement(renderable) },
+        activeContext.setValue(activeMemoizedIdStorage, { id: memoizedId, isOwner: true }),
+        renderInSpan
+      );
}

@@ -107,16 +114,14 @@ export function openTelemetryStreamRenderer(streamRenderer: StreamRenderer): Str
if (activeMemoizedIdAndOwner === undefined || activeMemoizedIdAndOwner.id !== memoizedId) {
// We're entering a memoized subtree that's already being actively rendered. Create a
// span that shows that we're blocked on the memoized subtree.
-      const newContext = context.active().setValue(activeMemoizedIdStorage, { id: memoizedId, isOwner: false });
-      return context.with(newContext, () =>
-        tracer.startActiveSpan(
-          `<Memoized.${renderable.tag.name}>`,
-          {
-            attributes: { 'ai.jsx.memoized': true, ...spanAttributesForElement(renderable) },
-            links: [{ context: activeSpanContext }],
-          },
-          renderInSpan
-        )
-      );
+      return tracer.startActiveSpan(
+        `<Memoized.${renderable.tag.name}>`,
+        {
+          attributes: { 'ai.jsx.memoized': true, ...spanAttributesForElement(renderable) },
+          links: [{ context: activeSpanContext }],
+        },
+        activeContext.setValue(activeMemoizedIdStorage, { id: memoizedId, isOwner: false }),
+        renderInSpan
+      );
}

@@ -131,7 +136,10 @@ export function openTelemetryStreamRenderer(streamRenderer: StreamRenderer): Str
return tracer.startActiveSpan(
`<${renderable.tag.name}>`,
{ attributes: spanAttributesForElement(renderable) },
+      activeContext,
renderInSpan
);
};

+  return wrappedRender;
}
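To make the skip condition concrete: `renderable.tag.constructor === Function` matches plain (synchronous) function components but not async functions or async generators, whose constructors are `AsyncFunction` and `AsyncGeneratorFunction`. So once tracing has started, components like the hypothetical ones below behave differently, and `AIJSX_OPENTELEMETRY_TRACE_ALL` restores a span for everything:

```tsx
// Hypothetical components — which ones get spans after this change:

function Uppercase({ children }: { children: AI.Node }) {
  return children; // tag.constructor === Function → span skipped when non-root
}

function Delegating({ children }: { children: AI.Node }) {
  return Promise.resolve(children); // still a plain Function → also skipped, per the diff comment
}

async function* Streaming() {
  yield 'partial';
  yield 'partial result'; // AsyncGeneratorFunction → traced as before
}

// To trace every component regardless, set the env var before starting:
// AIJSX_OPENTELEMETRY_TRACE_ALL=1
```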
2 changes: 1 addition & 1 deletion packages/ai-jsx/src/core/render.ts
@@ -43,7 +43,7 @@ export const AppendOnlyStream = Symbol('AI.appendOnlyStream');
* A RenderableStream represents an async iterable that yields {@link Renderable}s.
*/
export interface RenderableStream {
-  [Symbol.asyncIterator]: () => AsyncIterator<
+  [Symbol.asyncIterator]: () => AsyncGenerator<
Renderable | typeof AppendOnlyStream,
Renderable | typeof AppendOnlyStream
>;
12 changes: 7 additions & 5 deletions packages/ai-jsx/src/lib/anthropic.tsx
@@ -101,7 +101,7 @@ export async function* AnthropicChatModel(
const messages = await Promise.all(
// TODO: Support token budget/conversation shrinking
(
-      await renderToConversation(props.children, render)
+      await renderToConversation(props.children, render, logger, 'prompt')
)
.flatMap<Exclude<ConversationMessage, { type: 'system' }>>((message) => {
if (message.type === 'system') {
@@ -195,12 +195,14 @@ export async function* AnthropicChatModel(
logger.debug({ completion: accumulatedContent }, 'Anthropic completion finished');
return AI.AppendOnlyStream;
};
-  const assistantStream = memo(
-    <Stream {...debugRepresentation(() => `${accumulatedContent}${complete ? '' : '▮'}`)} />
+  const assistantMessage = memo(
+    <AssistantMessage>
+      {<Stream {...debugRepresentation(() => `${accumulatedContent}${complete ? '' : '▮'}`)} />}
+    </AssistantMessage>
  );
-  yield <AssistantMessage>{assistantStream}</AssistantMessage>;
+  yield assistantMessage;

  // Flush the stream to ensure that this element completes rendering only after the stream has completed.
-  await render(assistantStream);
+  await renderToConversation(assistantMessage, render, logger, 'completion');
return AI.AppendOnlyStream;
}
27 changes: 17 additions & 10 deletions packages/ai-jsx/src/lib/openai.tsx
@@ -33,7 +33,7 @@ import { Node } from '../index.js';
import { ChatOrCompletionModelOrBoth } from './model.js';
import { getEnvVar, patchedUntruncateJson } from './util.js';
import { CreateChatCompletionRequest } from 'openai';
-import { debug, debugRepresentation } from '../core/debug.js';
+import { debugRepresentation } from '../core/debug.js';
import { getEncoding } from 'js-tiktoken';
import _ from 'lodash';

@@ -406,10 +406,12 @@ export async function* OpenAIChatModel(
const conversationMessages = await renderToConversation(
props.children,
render,
+    logger,
+    'prompt',
tokenCountForConversationMessage,
promptTokenLimit
);
-  logger.debug({ messages: conversationMessages.map((m) => debug(m.element, true)) }, 'Got input conversation');

const messages: ChatCompletionRequestMessage[] = await Promise.all(
conversationMessages.map(async (message) => {
switch (message.type) {
@@ -510,6 +512,7 @@

let isAssistant = false;
let delta = await advance();
+    const outputMessages = [] as AI.Node[];
while (delta !== null) {
if (delta.role === 'assistant') {
isAssistant = true;
@@ -536,16 +539,19 @@

return AI.AppendOnlyStream;
};
-      const assistantStream = memo(
-        <Stream {...debugRepresentation(() => `${accumulatedContent}${complete ? '' : '▮'}`)} />
+      const assistantMessage = memo(
+        <AssistantMessage>
+          <Stream {...debugRepresentation(() => `${accumulatedContent}${complete ? '' : '▮'}`)} />
+        </AssistantMessage>
      );
-      yield <AssistantMessage>{assistantStream}</AssistantMessage>;
+      yield assistantMessage;

-      // Ensure the assistantStream is flushed by rendering it.
-      await render(assistantStream);
+      // Ensure the assistant stream is flushed by rendering it.
+      await render(assistantMessage);
+      outputMessages.push(assistantMessage);
}

-    // TS doesn't realize that the assistantStream closure can make `delta` be `null`.
+    // TS doesn't realize that the Stream closure can make `delta` be `null`.
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (delta?.function_call) {
// Memoize the stream to ensure it renders only once.
@@ -578,6 +584,7 @@

// Ensure the functionCallStream is flushed by rendering it.
await render(functionCallStream);
+      outputMessages.push(functionCallStream);
}

// TS doesn't realize that the functionCallStream closure can make `delta` be `null`.
Expand All @@ -587,8 +594,8 @@ export async function* OpenAIChatModel(
}
}

-  logger.debug('Finished createChatCompletion');
-
+  // Render the completion conversation to log it.
+  await renderToConversation(outputMessages, render, logger, 'completion', tokenCountForConversationMessage);
return AI.AppendOnlyStream;
}
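Both model integrations now share the same shape; in outline (a condensed sketch of the diff above, not verbatim source):

```tsx
// Condensed sketch of the new completion flow in the chat model components:
const assistantMessage = memo(
  <AssistantMessage>
    <Stream /> {/* appends deltas as they arrive */}
  </AssistantMessage>
);
yield assistantMessage;         // stream to the caller immediately
await render(assistantMessage); // block until the stream has completed
outputMessages.push(assistantMessage);

// Once all deltas are consumed, log the completion messages (with token
// counts) and set the `ai.jsx.completion` span attribute:
await renderToConversation(outputMessages, render, logger, 'completion', tokenCountForConversationMessage);
```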


3 comments on commit 219aebe


@vercel vercel bot commented on 219aebe Aug 22, 2023


Successfully deployed to the following URLs:

ai-jsx-docs – ./packages/docs

ai-jsx-docs-fixie-ai.vercel.app
ai-jsx-docs-git-main-fixie-ai.vercel.app
ai-jsx-docs.vercel.app
docs.ai-jsx.com


@vercel vercel bot commented on 219aebe Aug 22, 2023


Successfully deployed to the following URLs:

ai-jsx-tutorial-nextjs – ./packages/tutorial-nextjs

ai-jsx-tutorial-nextjs.vercel.app
ai-jsx-tutorial-nextjs-fixie-ai.vercel.app
ai-jsx-tutorial-nextjs-git-main-fixie-ai.vercel.app


@vercel vercel bot commented on 219aebe Aug 22, 2023


Successfully deployed to the following URLs:

ai-jsx-nextjs-demo – ./packages/nextjs-demo

ai-jsx-nextjs-demo.vercel.app
ai-jsx-nextjs-demo-fixie-ai.vercel.app
ai-jsx-nextjs-demo-git-main-fixie-ai.vercel.app
