Skip to content

Commit

Permalink
feat: let users pass sync/async functions for resolving option values…
Browse files Browse the repository at this point in the history
… for headers/params (#2184)

Fix #2181

This PR adds support for function-based options in the TypeScript
client's params and headers. Functions can be either synchronous or
asynchronous and are resolved in parallel when needed.

```typescript
const stream = new ShapeStream({
  url: 'http://localhost:3000/v1/shape',
  params: {
    table: 'items',
    userId: () => getCurrentUserId(),
    filter: async () => await getUserPreferences()
  },
  headers: {
    'Authorization': async () => `Bearer ${await getAccessToken()}`
  }
})
```

## Common Use Cases
- Authentication tokens that need to be refreshed
- User-specific parameters that may change
- Dynamic filtering based on current state
- Multi-tenant applications where context determines the request

## Design Decision
We chose to implement this using direct function values rather than a
middleware/interceptor pattern (as seen in libraries like Axios or ky)
for several reasons:

- Simplicity: Direct function values in config objects are more
intuitive than middleware chains. There's a clear 1:1 relationship
between the option and its value resolution.
- Granularity: Each param/header can be individually dynamic. No need to
set up middleware just to make a single value dynamic.
- Performance: All async functions are resolved in parallel by default,
which isn't always the case with sequential middleware chains.
- Focused API Surface: As we're not a general-purpose HTTP client, we
don't need the complexity of a full middleware system. This approach
keeps our API surface smaller and more focused on Electric's specific
needs.
  • Loading branch information
KyleAMathews authored Dec 19, 2024
1 parent b9d31ea commit dd5aeab
Show file tree
Hide file tree
Showing 7 changed files with 284 additions and 57 deletions.
26 changes: 26 additions & 0 deletions .changeset/wicked-papayas-tease.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
---
"@electric-sql/client": patch
"@electric-sql/docs": patch
---

This PR adds support for function-based options in the TypeScript client's params and headers. Functions can be either synchronous or asynchronous and are resolved in parallel when needed.

```typescript
const stream = new ShapeStream({
url: 'http://localhost:3000/v1/shape',
params: {
table: 'items',
userId: () => getCurrentUserId(),
filter: async () => await getUserPreferences()
},
headers: {
'Authorization': async () => `Bearer ${await getAccessToken()}`
}
})
```

## Common Use Cases
- Authentication tokens that need to be refreshed
- User-specific parameters that may change
- Dynamic filtering based on current state
- Multi-tenant applications where context determines the request
2 changes: 1 addition & 1 deletion packages/typescript-client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Real-time Postgres sync for modern apps.

Electric provides an [HTTP interface](https://electric-sql.com/docs/api/http) to Postgres to enable a massive number of clients to query and get real-time updates to subsets of the database, called [Shapes](https://electric-sql.com//docs/guides/shapes). In this way, Electric turns Postgres into a real-time database.

The TypeScript client helps ease reading Shapes from the HTTP API in the browser and other JavaScript environments, such as edge functions and server-side Node/Bun/Deno applications. It supports both fine-grained and coarse-grained reactivity patterns — you can subscribe to see every row that changes, or you can just subscribe to get the whole shape whenever it changes.
The TypeScript client helps ease reading Shapes from the HTTP API in the browser and other JavaScript environments, such as edge functions and server-side Node/Bun/Deno applications. It supports both fine-grained and coarse-grained reactivity patterns — you can subscribe to see every row that changes, or you can just subscribe to get the whole shape whenever it changes. The client also supports dynamic options through function-based params and headers, making it easy to handle auth tokens, user context, and other runtime values.

## Install

Expand Down
152 changes: 109 additions & 43 deletions packages/typescript-client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import {
REPLICA_PARAM,
} from './constants'

const RESERVED_PARAMS = new Set([
const RESERVED_PARAMS: Set<ReservedParamKeys> = new Set([
LIVE_CACHE_BUSTER_QUERY_PARAM,
SHAPE_HANDLE_QUERY_PARAM,
LIVE_QUERY_PARAM,
Expand All @@ -50,7 +50,7 @@ type Replica = `full` | `default`
/**
* PostgreSQL-specific shape parameters that can be provided externally
*/
type PostgresParams = {
export interface PostgresParams {
/** The root table for the shape. Not required if you set the table in your proxy. */
table?: string

Expand All @@ -76,22 +76,33 @@ type PostgresParams = {
replica?: Replica
}

type ParamValue =
| string
| string[]
| (() => string | string[] | Promise<string | string[]>)

/**
* External params type - what users provide.
* Excludes reserved parameters to prevent dynamic variations that could cause stream shape changes.
*/
export type ExternalParamsRecord = {
[K in string as K extends ReservedParamKeys ? never : K]:
| ParamValue
| undefined
} & Partial<PostgresParams>

type ReservedParamKeys =
| typeof COLUMNS_QUERY_PARAM
| typeof LIVE_CACHE_BUSTER_QUERY_PARAM
| typeof SHAPE_HANDLE_QUERY_PARAM
| typeof LIVE_QUERY_PARAM
| typeof OFFSET_QUERY_PARAM
| typeof TABLE_QUERY_PARAM
| typeof WHERE_QUERY_PARAM
| typeof REPLICA_PARAM

/**
* External params type - what users provide.
* Includes documented PostgreSQL params and allows string or string[] values for any additional params.
* External headers type - what users provide.
* Allows string or function values for any header.
*/
type ExternalParamsRecord = Partial<PostgresParams> & {
[K in string as K extends ReservedParamKeys ? never : K]: string | string[]
export type ExternalHeadersRecord = {
[key: string]: string | (() => string | Promise<string>)
}

/**
Expand All @@ -103,19 +114,59 @@ type InternalParamsRecord = {
}

/**
* Helper function to convert external params to internal format
* Helper function to resolve a function or value to its final value
*/
function toInternalParams(params: ExternalParamsRecord): InternalParamsRecord {
const result: InternalParamsRecord = {}
for (const [key, value] of Object.entries(params)) {
result[key] = Array.isArray(value) ? value.join(`,`) : value
export async function resolveValue<T>(
value: T | (() => T | Promise<T>)
): Promise<T> {
if (typeof value === `function`) {
return (value as () => T | Promise<T>)()
}
return result
return value
}

/**
* Helper function to convert external params to internal format
*/
async function toInternalParams(
params: ExternalParamsRecord
): Promise<InternalParamsRecord> {
const entries = Object.entries(params)
const resolvedEntries = await Promise.all(
entries.map(async ([key, value]) => {
if (value === undefined) return [key, undefined]
const resolvedValue = await resolveValue(value)
return [
key,
Array.isArray(resolvedValue) ? resolvedValue.join(`,`) : resolvedValue,
]
})
)

return Object.fromEntries(
resolvedEntries.filter(([_, value]) => value !== undefined)
)
}

/**
* Helper function to resolve headers
*/
async function resolveHeaders(
headers?: ExternalHeadersRecord
): Promise<Record<string, string>> {
if (!headers) return {}

const entries = Object.entries(headers)
const resolvedEntries = await Promise.all(
entries.map(async ([key, value]) => [key, await resolveValue(value)])
)

return Object.fromEntries(resolvedEntries)
}

type RetryOpts = {
params?: ExternalParamsRecord
headers?: Record<string, string>
headers?: ExternalHeadersRecord
}

type ShapeStreamErrorHandler = (
Expand Down Expand Up @@ -150,12 +201,18 @@ export interface ShapeStreamOptions<T = never> {

/**
* HTTP headers to attach to requests made by the client.
* Can be used for adding authentication headers.
* Values can be strings or functions (sync or async) that return strings.
* Function values are resolved in parallel when needed, making this useful
* for authentication tokens or other dynamic headers.
*/
headers?: Record<string, string>
headers?: ExternalHeadersRecord

/**
* Additional request parameters to attach to the URL.
* Values can be strings, string arrays, or functions (sync or async) that return these types.
* Function values are resolved in parallel when needed, making this useful
* for user-specific parameters or dynamic filters.
*
* These will be merged with Electric's standard parameters.
* Note: You cannot use Electric's reserved parameter names
* (offset, handle, live, cursor).
Expand Down Expand Up @@ -320,22 +377,23 @@ export class ShapeStream<T extends Row<unknown> = Row>
) {
const { url, signal } = this.options

const fetchUrl = new URL(url)
// Resolve headers and params in parallel
const [requestHeaders, params] = await Promise.all([
resolveHeaders(this.options.headers),
this.options.params
? toInternalParams(this.options.params)
: undefined,
])

// Validate params after resolution
if (params) {
validateParams(params)
}

// Add any custom parameters first
if (this.options.params) {
// Check for reserved parameter names
const reservedParams = Object.keys(this.options.params).filter(
(key) => RESERVED_PARAMS.has(key)
)
if (reservedParams.length > 0) {
throw new Error(
`Cannot use reserved Electric parameter names in custom params: ${reservedParams.join(`, `)}`
)
}
const fetchUrl = new URL(url)

// Add PostgreSQL-specific parameters from params
const params = toInternalParams(this.options.params)
// Add PostgreSQL-specific parameters
if (params) {
if (params.table)
fetchUrl.searchParams.set(TABLE_QUERY_PARAM, params.table)
if (params.where)
Expand Down Expand Up @@ -383,7 +441,7 @@ export class ShapeStream<T extends Row<unknown> = Row>
try {
response = await this.#fetchClient(fetchUrl.toString(), {
signal,
headers: this.options.headers,
headers: requestHeaders,
})
this.#connected = true
} catch (e) {
Expand Down Expand Up @@ -550,6 +608,21 @@ export class ShapeStream<T extends Row<unknown> = Row>
}
}

/**
* Validates that no reserved parameter names are used in the provided params object
* @throws {ReservedParamError} if any reserved parameter names are found
*/
function validateParams(params: Record<string, unknown> | undefined): void {
if (!params) return

const reservedParams = Object.keys(params).filter((key) =>
RESERVED_PARAMS.has(key as ReservedParamKeys)
)
if (reservedParams.length > 0) {
throw new ReservedParamError(reservedParams)
}
}

function validateOptions<T>(options: Partial<ShapeStreamOptions<T>>): void {
if (!options.url) {
throw new MissingShapeUrlError()
Expand All @@ -566,14 +639,7 @@ function validateOptions<T>(options: Partial<ShapeStreamOptions<T>>): void {
throw new MissingShapeHandleError()
}

// Check for reserved parameter names
if (options.params) {
const reservedParams = Object.keys(options.params).filter((key) =>
RESERVED_PARAMS.has(key)
)
if (reservedParams.length > 0) {
throw new ReservedParamError(reservedParams)
}
}
validateParams(options.params)

return
}
43 changes: 43 additions & 0 deletions packages/typescript-client/test/client.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,15 @@ import {
Message,
isChangeMessage,
ShapeData,
ExternalParamsRecord,
} from '../src'
import {
COLUMNS_QUERY_PARAM,
LIVE_CACHE_BUSTER_QUERY_PARAM,
SHAPE_HANDLE_QUERY_PARAM,
LIVE_QUERY_PARAM,
OFFSET_QUERY_PARAM,
} from '../src/constants'

type CustomRow = {
foo: number
Expand Down Expand Up @@ -51,6 +59,41 @@ describe(`client`, () => {
}
})
})

describe(`params validation`, () => {
it(`should allow valid params`, () => {
const validParams: ExternalParamsRecord = {
// PostgreSQL params
table: `users`,
columns: [`id`, `name`],
where: `id > 0`,
replica: `full`,

// Custom params
customParam: `value`,
customArrayParam: [`value1`, `value2`],
customFunctionParam: () => `value`,
customAsyncFunctionParam: async () => [`value1`, `value2`],
}
expectTypeOf(validParams).toEqualTypeOf<ExternalParamsRecord>()
})

it(`should not allow reserved params`, () => {
// Test that reserved parameters are not allowed in ExternalParamsRecord
type WithReservedParam1 = { [COLUMNS_QUERY_PARAM]: string[] }
type WithReservedParam2 = { [LIVE_CACHE_BUSTER_QUERY_PARAM]: string }
type WithReservedParam3 = { [SHAPE_HANDLE_QUERY_PARAM]: string }
type WithReservedParam4 = { [LIVE_QUERY_PARAM]: string }
type WithReservedParam5 = { [OFFSET_QUERY_PARAM]: string }

// These should all not be equal to ExternalParamsRecord (not assignable)
expectTypeOf<WithReservedParam1>().not.toEqualTypeOf<ExternalParamsRecord>()
expectTypeOf<WithReservedParam2>().not.toEqualTypeOf<ExternalParamsRecord>()
expectTypeOf<WithReservedParam3>().not.toEqualTypeOf<ExternalParamsRecord>()
expectTypeOf<WithReservedParam4>().not.toEqualTypeOf<ExternalParamsRecord>()
expectTypeOf<WithReservedParam5>().not.toEqualTypeOf<ExternalParamsRecord>()
})
})
})

describe(`Shape`, () => {
Expand Down
49 changes: 48 additions & 1 deletion packages/typescript-client/test/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { testWithIssuesTable as it } from './support/test-context'
import { ShapeStream, Shape, FetchError } from '../src'
import { Message, Row, ChangeMessage } from '../src/types'
import { MissingHeadersError } from '../src/error'
import { resolveValue } from '../src'

const BASE_URL = inject(`baseUrl`)

Expand Down Expand Up @@ -188,7 +189,6 @@ describe(`Shape`, () => {
fetchClient: fetchWrapper,
})
const shape = new Shape(shapeStream)

let dataUpdateCount = 0
await new Promise<void>((resolve, reject) => {
setTimeout(() => reject(`Timed out waiting for data changes`), 1000)
Expand Down Expand Up @@ -671,6 +671,53 @@ describe(`Shape`, () => {
await clearIssuesShape(shapeStream.shapeHandle)
}
})

it(`should support function-based params and headers`, async ({
issuesTableUrl,
}) => {
const mockParamFn = vi.fn().mockReturnValue(`test-value`)
const mockAsyncParamFn = vi.fn().mockResolvedValue(`test-value`)
const mockHeaderFn = vi.fn().mockReturnValue(`test-value`)
const mockAsyncHeaderFn = vi.fn().mockResolvedValue(`test-value`)

// Test with synchronous functions
const shapeStream1 = new ShapeStream({
url: `${BASE_URL}/v1/shape`,
params: {
table: issuesTableUrl,
customParam: mockParamFn,
},
headers: {
'X-Custom-Header': mockHeaderFn,
},
})
const shape1 = new Shape(shapeStream1)
await shape1.value

expect(mockParamFn).toHaveBeenCalled()
expect(mockHeaderFn).toHaveBeenCalled()

// Test with async functions
const shapeStream2 = new ShapeStream({
url: `${BASE_URL}/v1/shape`,
params: {
table: issuesTableUrl,
customParam: mockAsyncParamFn,
},
headers: {
'X-Custom-Header': mockAsyncHeaderFn,
},
})
const shape2 = new Shape(shapeStream2)
await shape2.value

expect(mockAsyncParamFn).toHaveBeenCalled()
expect(mockAsyncHeaderFn).toHaveBeenCalled()

// Verify the resolved values
expect(await resolveValue(mockParamFn())).toBe(`test-value`)
expect(await resolveValue(mockAsyncParamFn())).toBe(`test-value`)
})
})

function waitForFetch(stream: ShapeStream): Promise<void> {
Expand Down
Loading

0 comments on commit dd5aeab

Please sign in to comment.