diff --git a/packages/cli/lib/services/__snapshots__/model.service.unit.test.ts.snap b/packages/cli/lib/services/__snapshots__/model.service.unit.test.ts.snap index ca02182728e..f7630fa0e08 100644 --- a/packages/cli/lib/services/__snapshots__/model.service.unit.test.ts.snap +++ b/packages/cli/lib/services/__snapshots__/model.service.unit.test.ts.snap @@ -312,6 +312,11 @@ export interface NangoProps { runnerFlags: RunnerFlags; debug: boolean; startedAt: Date; + endUser: { + id: number; + endUserId: string | null; + orgId: string | null; + } | null; axios?: { request?: AxiosInterceptorManager; response?: { diff --git a/packages/cli/lib/services/dryrun.service.ts b/packages/cli/lib/services/dryrun.service.ts index 8f62c22fd0f..e6f57cbbd87 100644 --- a/packages/cli/lib/services/dryrun.service.ts +++ b/packages/cli/lib/services/dryrun.service.ts @@ -329,7 +329,8 @@ export class DryRunService { validateSyncRecords: this.validation, validateSyncMetadata: false }, - startedAt: new Date() + startedAt: new Date(), + endUser: null }; if (options.saveResponses) { nangoProps.rawSaveOutput = new Map(); diff --git a/packages/data-ingestion/lib/index.ts b/packages/data-ingestion/lib/index.ts index a39691e9235..c817dd96906 100644 --- a/packages/data-ingestion/lib/index.ts +++ b/packages/data-ingestion/lib/index.ts @@ -1,6 +1,6 @@ import { BigQuery } from '@google-cloud/bigquery'; import type { BigQuery as BigQueryType } from '@google-cloud/bigquery'; -import { getLogger, isCloud } from '@nangohq/utils'; +import { getLogger, flagHasBigQuery } from '@nangohq/utils'; const logger = getLogger('BigQueryClient'); @@ -21,8 +21,44 @@ interface RunScriptRow { runTimeInSeconds: number; createdAt: number; internalIntegrationId: number | null; + endUser: { id: number; endUserId: string | null; orgId: string | null } | null; } +const fields = [ + { name: 'executionType', type: 'STRING' }, + { name: 'internalConnectionId', type: 'INTEGER' }, + { name: 'connectionId', type: 'STRING' }, + { name: 'accountId', type: 'INTEGER' }, + { name: 'accountName', type: 'STRING' }, + { name: 'scriptName', type: 'STRING' }, + { name: 'scriptType', type: 'STRING' }, + { name: 'environmentId', type: 'INTEGER' }, + { name: 'environmentName', type: 'STRING' }, + { name: 'providerConfigKey', type: 'STRING' }, + { name: 'status', type: 'STRING' }, + { name: 'syncId', type: 'STRING' }, + { name: 'content', type: 'STRING' }, + { name: 'runTimeInSeconds', type: 'FLOAT' }, + { name: 'createdAt', type: 'INTEGER' }, + { name: 'internalIntegrationId', type: 'INTEGER' }, + { name: 'endUserId', type: 'INTEGER' }, + { name: 'endUserUserId', type: 'STRING' }, + { name: 'endUserOrgId', type: 'STRING' } +] as const; + +interface TypeMap { + STRING: string; + INTEGER: number; + FLOAT: number; +} + +type RecordType = { + [K in T[number] as K['name']]: K['type'] extends keyof TypeMap ? TypeMap[K['type']] | null | undefined : never; +}; + +// Use the utility type to infer the result +type Schema = RecordType; + class BigQueryClient { private client: BigQuery; private datasetName: string; @@ -44,11 +80,13 @@ class BigQueryClient { } private async initialize() { + if (!flagHasBigQuery) { + return; + } + try { - if (isCloud) { - await this.createDataSet(); - await this.createTable(); - } + await this.createDataSet(); + await this.createTable(); } catch (e) { logger.error('Error initializing', e); } @@ -67,38 +105,59 @@ class BigQueryClient { const [exists] = await table.exists(); if (!exists) { await table.create({ - schema: { - fields: [ - { name: 'executionType', type: 'STRING' }, - { name: 'internalConnectionId', type: 'INTEGER' }, - { name: 'connectionId', type: 'STRING' }, - { name: 'accountId', type: 'INTEGER' }, - { name: 'accountName', type: 'STRING' }, - { name: 'scriptName', type: 'STRING' }, - { name: 'scriptType', type: 'STRING' }, - { name: 'environmentId', type: 'INTEGER' }, - { name: 'environmentName', type: 'STRING' }, - { name: 'providerConfigKey', type: 'STRING' }, - { name: 'status', type: 'STRING' }, - { name: 'syncId', type: 'STRING' }, - { name: 'content', type: 'STRING' }, - { name: 'runTimeInSeconds', type: 'FLOAT' }, - { name: 'createdAt', type: 'INTEGER' }, - { name: 'internalIntegrationId', type: 'INTEGER' } - ] - } + schema: { fields } }); + } else { + // If the table exists, retrieve the current schema + const [metadata] = await table.getMetadata(); + const existingFields = metadata.schema.fields as { name: string }[]; + + // Add new fields that don't already exist in the schema + const existingFieldNames = existingFields.map((field) => field.name); + const newFields = fields.filter((field) => !existingFieldNames.includes(field.name)); + + if (newFields.length > 0) { + // Update the schema with the new fields + const updatedFields = [...existingFields, ...newFields]; + await table.setMetadata({ + schema: { fields: updatedFields } + }); + logger.info('Schema updated successfully with new fields:', newFields); + } } } public async insert(data: RunScriptRow, tableName?: string) { + if (!flagHasBigQuery) { + return; + } + const table = tableName || this.tableName; try { - if (isCloud) { - await this.client.dataset(this.datasetName).table(table).insert(data); - } - } catch (e) { - logger.error('Error inserting into BigQuery', e); + const insertData: Schema = { + executionType: data.executionType, + internalConnectionId: data.internalConnectionId, + connectionId: data.connectionId, + accountId: data.accountId, + accountName: data.accountName, + scriptName: data.scriptName, + scriptType: data.scriptType, + environmentId: data.environmentId, + environmentName: data.environmentName, + providerConfigKey: data.providerConfigKey, + status: data.status, + syncId: data.syncId, + content: data.content, + runTimeInSeconds: data.runTimeInSeconds, + createdAt: data.createdAt, + internalIntegrationId: data.internalIntegrationId, + endUserId: data.endUser?.id, + endUserOrgId: data.endUser?.orgId, + endUserUserId: data.endUser?.endUserId + }; + await this.client.dataset(this.datasetName).table(table).insert(insertData); + } catch (err) { + logger.error('Error inserting into BigQuery', err); } } } diff --git a/packages/jobs/lib/execution/action.ts b/packages/jobs/lib/execution/action.ts index ff01d6029ca..325807dca8f 100644 --- a/packages/jobs/lib/execution/action.ts +++ b/packages/jobs/lib/execution/action.ts @@ -11,6 +11,7 @@ import { errorManager, featureFlags, getApiUrl, + getEndUserByConnectionId, getSyncConfigRaw } from '@nangohq/shared'; import { logContextGetter } from '@nangohq/logs'; @@ -18,12 +19,15 @@ import type { DBEnvironment, DBTeam } from '@nangohq/types'; import { startScript } from './operations/start.js'; import { bigQueryClient, slackService } from '../clients.js'; import { getRunnerFlags } from '../utils/flags.js'; +import db from '@nangohq/database'; export async function startAction(task: TaskAction): Promise> { let account: DBTeam | undefined; let environment: DBEnvironment | undefined; let providerConfig: Config | undefined | null; let syncConfig: SyncConfig | null = null; + let endUser: NangoProps['endUser'] | null = null; + try { const accountAndEnv = await environmentService.getAccountAndEnvironment({ environmentId: task.connection.environment_id }); if (!accountAndEnv) { @@ -47,6 +51,11 @@ export async function startAction(task: TaskAction): Promise> { throw new Error(`Action config not found: ${task.id}`); } + const getEndUser = await getEndUserByConnectionId(db.knex, { connectionId: task.connection.id }); + if (getEndUser.isOk()) { + endUser = { id: getEndUser.value.id, endUserId: getEndUser.value.endUserId, orgId: getEndUser.value.organization?.organizationId || null }; + } + const logCtx = await logContextGetter.get({ id: String(task.activityLogId) }); await logCtx.info(`Starting action '${task.actionName}'`, { input: task.input, @@ -74,7 +83,8 @@ export async function startAction(task: TaskAction): Promise> { syncConfig: syncConfig, debug: false, runnerFlags: await getRunnerFlags(featureFlags), - startedAt: new Date() + startedAt: new Date(), + endUser }; metrics.increment(metrics.Types.ACTION_EXECUTION, 1, { accountId: account.id }); @@ -108,7 +118,8 @@ export async function startAction(task: TaskAction): Promise> { error, syncConfig, environment: { id: task.connection.environment_id, name: environment?.name || 'unknown' }, - ...(account?.id && account?.name ? { team: { id: account.id, name: account.name } } : {}) + ...(account?.id && account?.name ? { team: { id: account.id, name: account.name } } : {}), + endUser }); return Err(error); } @@ -145,7 +156,8 @@ export async function handleActionSuccess({ nangoProps }: { nangoProps: NangoPro content: `The action "${nangoProps.syncConfig.sync_name}" has been completed successfully.`, runTimeInSeconds: (new Date().getTime() - nangoProps.startedAt.getTime()) / 1000, createdAt: Date.now(), - internalIntegrationId: nangoProps.syncConfig.nango_config_id + internalIntegrationId: nangoProps.syncConfig.nango_config_id, + endUser: nangoProps.endUser }); } @@ -165,7 +177,8 @@ export async function handleActionError({ nangoProps, error }: { nangoProps: Nan error, environment: { id: nangoProps.environmentId, name: nangoProps.environmentName || 'unknown' }, syncConfig: nangoProps.syncConfig, - ...(nangoProps.team ? { team: { id: nangoProps.team.id, name: nangoProps.team.name } } : {}) + ...(nangoProps.team ? { team: { id: nangoProps.team.id, name: nangoProps.team.name } } : {}), + endUser: nangoProps.endUser }); } @@ -179,7 +192,8 @@ async function onFailure({ activityLogId, syncConfig, runTime, - error + error, + endUser }: { connection: NangoConnection; team?: { id: number; name: string }; @@ -191,6 +205,7 @@ async function onFailure({ syncConfig: SyncConfig | null; runTime: number; error: NangoError; + endUser: NangoProps['endUser']; }): Promise { if (team) { void bigQueryClient.insert({ @@ -209,7 +224,8 @@ async function onFailure({ content: error.message, runTimeInSeconds: runTime, createdAt: Date.now(), - internalIntegrationId: syncConfig?.nango_config_id || null + internalIntegrationId: syncConfig?.nango_config_id || null, + endUser }); } const logCtx = await logContextGetter.get({ id: activityLogId }); diff --git a/packages/jobs/lib/execution/onEvent.ts b/packages/jobs/lib/execution/onEvent.ts index 1b6183d59ca..8bd206b99ab 100644 --- a/packages/jobs/lib/execution/onEvent.ts +++ b/packages/jobs/lib/execution/onEvent.ts @@ -2,11 +2,12 @@ import { Err, metrics, Ok } from '@nangohq/utils'; import type { Result } from '@nangohq/utils'; import type { TaskOnEvent } from '@nangohq/nango-orchestrator'; import type { Config, SyncConfig, NangoConnection, NangoProps } from '@nangohq/shared'; -import { configService, environmentService, featureFlags, getApiUrl, NangoError } from '@nangohq/shared'; +import { configService, environmentService, featureFlags, getApiUrl, getEndUserByConnectionId, NangoError } from '@nangohq/shared'; import { logContextGetter } from '@nangohq/logs'; import type { DBEnvironment, DBTeam } from '@nangohq/types'; import { startScript } from './operations/start.js'; import { bigQueryClient } from '../clients.js'; +import db from '@nangohq/database'; import { getRunnerFlags } from '../utils/flags.js'; export async function startOnEvent(task: TaskOnEvent): Promise> { @@ -14,6 +15,8 @@ export async function startOnEvent(task: TaskOnEvent): Promise> { let environment: DBEnvironment | undefined; let providerConfig: Config | undefined | null; let syncConfig: SyncConfig | null = null; + let endUser: NangoProps['endUser'] | null = null; + try { const accountAndEnv = await environmentService.getAccountAndEnvironment({ environmentId: task.connection.environment_id }); if (!accountAndEnv) { @@ -27,6 +30,11 @@ export async function startOnEvent(task: TaskOnEvent): Promise> { throw new Error(`Provider config not found for connection: ${task.connection.connection_id}`); } + const getEndUser = await getEndUserByConnectionId(db.knex, { connectionId: task.connection.id }); + if (getEndUser.isOk()) { + endUser = { id: getEndUser.value.id, endUserId: getEndUser.value.endUserId, orgId: getEndUser.value.organization?.organizationId || null }; + } + const logCtx = await logContextGetter.get({ id: String(task.activityLogId) }); await logCtx.info(`Starting script '${task.onEventName}'`, { @@ -72,7 +80,8 @@ export async function startOnEvent(task: TaskOnEvent): Promise> { syncConfig: syncConfig, debug: false, runnerFlags: await getRunnerFlags(featureFlags), - startedAt: new Date() + startedAt: new Date(), + endUser }; metrics.increment(metrics.Types.ON_EVENT_SCRIPT_EXECUTION, 1, { accountId: account.id }); @@ -104,7 +113,8 @@ export async function startOnEvent(task: TaskOnEvent): Promise> { error, environment: { id: task.connection.environment_id, name: environment?.name || 'unknown' }, syncConfig, - ...(account?.id && account?.name ? { team: { id: account.id, name: account.name } } : {}) + ...(account?.id && account?.name ? { team: { id: account.id, name: account.name } } : {}), + endUser }); return Err(error); } @@ -128,7 +138,8 @@ export async function handleOnEventSuccess({ nangoProps }: { nangoProps: NangoPr content, runTimeInSeconds: (new Date().getTime() - nangoProps.startedAt.getTime()) / 1000, createdAt: Date.now(), - internalIntegrationId: nangoProps.syncConfig.nango_config_id + internalIntegrationId: nangoProps.syncConfig.nango_config_id, + endUser: nangoProps.endUser }); const logCtx = await logContextGetter.get({ id: String(nangoProps.activityLogId) }); await logCtx.success(); @@ -149,7 +160,8 @@ export async function handleOnEventError({ nangoProps, error }: { nangoProps: Na error, environment: { id: nangoProps.environmentId, name: nangoProps.environmentName || 'unknown' }, syncConfig: nangoProps.syncConfig, - ...(nangoProps.team ? { team: { id: nangoProps.team.id, name: nangoProps.team.name } } : {}) + ...(nangoProps.team ? { team: { id: nangoProps.team.id, name: nangoProps.team.name } } : {}), + endUser: nangoProps.endUser }); } @@ -162,7 +174,8 @@ async function onFailure({ activityLogId, syncConfig, runTime, - error + error, + endUser }: { connection: NangoConnection; team?: { id: number; name: string }; @@ -173,6 +186,7 @@ async function onFailure({ syncConfig: SyncConfig | null; runTime: number; error: NangoError; + endUser: NangoProps['endUser']; }): Promise { if (team) { void bigQueryClient.insert({ @@ -191,7 +205,8 @@ async function onFailure({ content: error.message, runTimeInSeconds: runTime, createdAt: Date.now(), - internalIntegrationId: syncConfig?.nango_config_id || null + internalIntegrationId: syncConfig?.nango_config_id || null, + endUser }); } const logCtx = await logContextGetter.get({ id: activityLogId }); diff --git a/packages/jobs/lib/execution/sync.ts b/packages/jobs/lib/execution/sync.ts index 43b1f74861a..c1df840c37c 100644 --- a/packages/jobs/lib/execution/sync.ts +++ b/packages/jobs/lib/execution/sync.ts @@ -21,6 +21,7 @@ import { createSyncJob, getSyncConfigRaw, getSyncJobByRunId, + getEndUserByConnectionId, featureFlags } from '@nangohq/shared'; import { Err, Ok, metrics } from '@nangohq/utils'; @@ -35,6 +36,7 @@ import { records } from '@nangohq/records'; import type { TaskSync, TaskSyncAbort } from '@nangohq/nango-orchestrator'; import { abortScript } from './operations/abort.js'; import { logger } from '../logger.js'; +import db from '@nangohq/database'; import { getRunnerFlags } from '../utils/flags.js'; export async function startSync(task: TaskSync, startScriptFn = startScript): Promise> { @@ -46,6 +48,8 @@ export async function startSync(task: TaskSync, startScriptFn = startScript): Pr let syncType: SyncType = SyncType.FULL; let providerConfig: Config | null = null; let syncConfig: SyncConfig | null = null; + let endUser: NangoProps['endUser'] | null = null; + try { lastSyncDate = await getLastSyncDate(task.syncId); providerConfig = await configService.getProviderConfig(task.connection.provider_config_key, task.connection.environment_id); @@ -71,6 +75,11 @@ export async function startSync(task: TaskSync, startScriptFn = startScript): Pr team = accountAndEnv.account; environment = accountAndEnv.environment; + const getEndUser = await getEndUserByConnectionId(db.knex, { connectionId: task.connection.id }); + if (getEndUser.isOk()) { + endUser = { id: getEndUser.value.id, endUserId: getEndUser.value.endUserId, orgId: getEndUser.value.organization?.organizationId || null }; + } + syncType = syncConfig.sync_type?.toLowerCase() === SyncType.INCREMENTAL.toLowerCase() && lastSyncDate ? SyncType.INCREMENTAL : SyncType.FULL; logCtx = await logContextGetter.create( @@ -133,7 +142,8 @@ export async function startSync(task: TaskSync, startScriptFn = startScript): Pr debug: task.debug || false, runnerFlags: await getRunnerFlags(featureFlags), startedAt: new Date(), - ...(lastSyncDate ? { lastSyncDate } : {}) + ...(lastSyncDate ? { lastSyncDate } : {}), + endUser }; if (task.debug) { @@ -168,7 +178,8 @@ export async function startSync(task: TaskSync, startScriptFn = startScript): Pr syncConfig, runTime: 0, models: syncConfig?.models || [], - error + error, + endUser }); return Err(error); } @@ -245,7 +256,8 @@ export async function handleSyncSuccess({ nangoProps }: { nangoProps: NangoProps models: [model], runTime, syncConfig: nangoProps.syncConfig, - error: new NangoError('sync_job_update_failure', { syncJobId: nangoProps.syncJobId, model }) + error: new NangoError('sync_job_update_failure', { syncJobId: nangoProps.syncJobId, model }), + endUser: nangoProps.endUser }); return; } @@ -381,7 +393,8 @@ export async function handleSyncSuccess({ nangoProps }: { nangoProps: NangoProps content: `The sync "${nangoProps.syncConfig.sync_name}" has been completed successfully.`, runTimeInSeconds: runTime, createdAt: Date.now(), - internalIntegrationId: nangoProps.syncConfig.nango_config_id + internalIntegrationId: nangoProps.syncConfig.nango_config_id, + endUser: nangoProps.endUser }); metrics.duration(metrics.Types.SYNC_TRACK_RUNTIME, Date.now() - nangoProps.startedAt.getTime()); @@ -410,7 +423,8 @@ export async function handleSyncSuccess({ nangoProps }: { nangoProps: NangoProps runTime: (new Date().getTime() - nangoProps.startedAt.getTime()) / 1000, failureSource: ErrorSourceEnum.CUSTOMER, isCancel: false, - error: new NangoError('sync_script_failure', { error: err instanceof Error ? err.message : err }) + error: new NangoError('sync_script_failure', { error: err instanceof Error ? err.message : err }), + endUser: nangoProps.endUser }); } } @@ -444,7 +458,8 @@ export async function handleSyncError({ nangoProps, error }: { nangoProps: Nango runTime: (new Date().getTime() - nangoProps.startedAt.getTime()) / 1000, failureSource: ErrorSourceEnum.CUSTOMER, isCancel: false, - error + error, + endUser: nangoProps.endUser }); } @@ -465,6 +480,7 @@ export async function abortSync(task: TaskSyncAbort): Promise> { if (!syncJob) { throw new Error(`Sync job not found for syncId: ${task.syncId}`); } + const providerConfig = await configService.getProviderConfig(task.connection.provider_config_key, task.connection.environment_id); if (providerConfig === null) { throw new Error(`Provider config not found for connection: ${task.connection}. TaskId: ${task.id}`); @@ -481,6 +497,8 @@ export async function abortSync(task: TaskSyncAbort): Promise> { throw new Error(`Sync config not found. TaskId: ${task.id}`); } + const getEndUser = await getEndUserByConnectionId(db.knex, { connectionId: task.connection.id }); + const isCancel = task.abortedTask.state === 'CANCELLED'; await onFailure({ connection: { @@ -503,7 +521,10 @@ export async function abortSync(task: TaskSyncAbort): Promise> { failureSource: ErrorSourceEnum.CUSTOMER, syncConfig, runTime: 0, - error: new NangoError('sync_script_failure', task.reason) + error: new NangoError('sync_script_failure', task.reason), + endUser: getEndUser.isOk() + ? { id: getEndUser.value.id, endUserId: getEndUser.value.endUserId, orgId: getEndUser.value.organization?.organizationId || null } + : null }); const setSuccess = await orchestratorClient.succeed({ taskId: task.id, output: {} }); if (setSuccess.isErr()) { @@ -537,7 +558,8 @@ async function onFailure({ runTime, isCancel, failureSource, - error + error, + endUser }: { team?: DBTeam | undefined; environment?: DBEnvironment | undefined; @@ -556,6 +578,7 @@ async function onFailure({ syncConfig: SyncConfig | null; failureSource?: ErrorSourceEnum; error: NangoError; + endUser: NangoProps['endUser']; }): Promise { if (team && environment) { void bigQueryClient.insert({ @@ -574,7 +597,8 @@ async function onFailure({ content: error.message, runTimeInSeconds: runTime, createdAt: Date.now(), - internalIntegrationId: syncConfig?.nango_config_id || null + internalIntegrationId: syncConfig?.nango_config_id || null, + endUser }); } diff --git a/packages/jobs/lib/execution/webhook.ts b/packages/jobs/lib/execution/webhook.ts index 7ceb78a12b2..92fba9c3c3c 100644 --- a/packages/jobs/lib/execution/webhook.ts +++ b/packages/jobs/lib/execution/webhook.ts @@ -13,6 +13,7 @@ import { externalWebhookService, featureFlags, getApiUrl, + getEndUserByConnectionId, getSyncByIdAndName, getSyncConfigRaw, updateSyncJobStatus @@ -22,6 +23,7 @@ import { logContextGetter } from '@nangohq/logs'; import type { DBEnvironment, DBTeam } from '@nangohq/types'; import { startScript } from './operations/start.js'; import { sendSync as sendSyncWebhook } from '@nangohq/webhooks'; +import db from '@nangohq/database'; import { getRunnerFlags } from '../utils/flags.js'; export async function startWebhook(task: TaskWebhook): Promise> { @@ -31,6 +33,7 @@ export async function startWebhook(task: TaskWebhook): Promise> { let sync: Sync | undefined | null; let syncJob: Pick | null = null; let syncConfig: SyncConfig | null = null; + let endUser: NangoProps['endUser'] | null = null; try { const accountAndEnv = await environmentService.getAccountAndEnvironment({ environmentId: task.connection.environment_id }); @@ -60,6 +63,11 @@ export async function startWebhook(task: TaskWebhook): Promise> { throw new Error(`Webhook config not found: ${task.id}`); } + const getEndUser = await getEndUserByConnectionId(db.knex, { connectionId: task.connection.id }); + if (getEndUser.isOk()) { + endUser = { id: getEndUser.value.id, endUserId: getEndUser.value.endUserId, orgId: getEndUser.value.organization?.organizationId || null }; + } + const logCtx = await logContextGetter.get({ id: String(task.activityLogId) }); await logCtx.info(`Starting webhook '${task.webhookName}'`, { @@ -104,7 +112,8 @@ export async function startWebhook(task: TaskWebhook): Promise> { syncJobId: syncJob.id, debug: false, runnerFlags: await getRunnerFlags(featureFlags), - startedAt: new Date() + startedAt: new Date(), + endUser }; metrics.increment(metrics.Types.WEBHOOK_EXECUTION, 1, { accountId: team.id }); @@ -140,7 +149,8 @@ export async function startWebhook(task: TaskWebhook): Promise> { models: syncConfig?.models || [], runTime: 0, error, - syncConfig + syncConfig, + endUser }); return Err(error); } @@ -164,7 +174,8 @@ export async function handleWebhookSuccess({ nangoProps }: { nangoProps: NangoPr content, runTimeInSeconds: (new Date().getTime() - nangoProps.startedAt.getTime()) / 1000, createdAt: Date.now(), - internalIntegrationId: nangoProps.syncConfig.nango_config_id + internalIntegrationId: nangoProps.syncConfig.nango_config_id, + endUser: nangoProps.endUser }); const syncJob = await updateSyncJobStatus(nangoProps.syncJobId!, SyncStatus.SUCCESS); @@ -246,7 +257,8 @@ export async function handleWebhookError({ nangoProps, error }: { nangoProps: Na models: nangoProps.syncConfig.models, runTime: (new Date().getTime() - nangoProps.startedAt.getTime()) / 1000, error, - syncConfig: nangoProps.syncConfig + syncConfig: nangoProps.syncConfig, + endUser: nangoProps.endUser }); } @@ -262,7 +274,8 @@ async function onFailure({ models, activityLogId, runTime, - error + error, + endUser }: { connection: NangoConnection; team: DBTeam | undefined; @@ -276,6 +289,7 @@ async function onFailure({ activityLogId: string; runTime: number; error: NangoError; + endUser: NangoProps['endUser']; }): Promise { if (team && environment) { void bigQueryClient.insert({ @@ -294,7 +308,8 @@ async function onFailure({ content: error.message, runTimeInSeconds: runTime, createdAt: Date.now(), - internalIntegrationId: syncConfig?.nango_config_id || null + internalIntegrationId: syncConfig?.nango_config_id || null, + endUser }); } diff --git a/packages/jobs/lib/routes/tasks/putTask.ts b/packages/jobs/lib/routes/tasks/putTask.ts index 59c0102bc80..ba12e21c2aa 100644 --- a/packages/jobs/lib/routes/tasks/putTask.ts +++ b/packages/jobs/lib/routes/tasks/putTask.ts @@ -70,6 +70,7 @@ const nangoPropsSchema = z secretKey: z.string().min(1), debug: z.boolean(), startedAt: z.coerce.date(), + endUser: z.object({ id: z.number(), endUserId: z.string().nullable(), orgId: z.string().nullable() }).nullable(), runnerFlags: z .object({ validateActionInput: z.boolean().default(false), diff --git a/packages/runner/lib/client.unit.test.ts b/packages/runner/lib/client.unit.test.ts index be465c86ae7..622cf3371fa 100644 --- a/packages/runner/lib/client.unit.test.ts +++ b/packages/runner/lib/client.unit.test.ts @@ -31,7 +31,8 @@ describe('Runner client', () => { debug: false, startedAt: new Date(), runnerFlags: {} as any, - stubbedMetadata: {} + stubbedMetadata: {}, + endUser: null }; beforeAll(() => { diff --git a/packages/runner/lib/exec.unit.test.ts b/packages/runner/lib/exec.unit.test.ts index f22805b6a7c..8c566bc3116 100644 --- a/packages/runner/lib/exec.unit.test.ts +++ b/packages/runner/lib/exec.unit.test.ts @@ -27,7 +27,8 @@ function getNangoProps(): NangoProps { debug: false, startedAt: new Date(), runnerFlags: {} as any, - stubbedMetadata: {} + stubbedMetadata: {}, + endUser: null }; } diff --git a/packages/server/lib/controllers/apiAuth.controller.ts b/packages/server/lib/controllers/apiAuth.controller.ts index 47694dcc478..8d35a4a4ddc 100644 --- a/packages/server/lib/controllers/apiAuth.controller.ts +++ b/packages/server/lib/controllers/apiAuth.controller.ts @@ -9,7 +9,8 @@ import { getConnectionConfig, ErrorSourceEnum, LogActionEnum, - getProvider + getProvider, + linkConnection } from '@nangohq/shared'; import type { LogContext } from '@nangohq/logs'; import { defaultOperationExpiration, flushLogsBuffer, logContextGetter } from '@nangohq/logs'; @@ -20,7 +21,6 @@ import { connectionCreationFailed as connectionCreationFailedHook, connectionTest as connectionTestHook } from '../hooks/hooks.js'; -import { linkConnection } from '../services/endUser.service.js'; import db from '@nangohq/database'; import { hmacCheck } from '../utils/hmac.js'; import { isIntegrationAllowed } from '../utils/auth.js'; diff --git a/packages/server/lib/controllers/appAuth.controller.ts b/packages/server/lib/controllers/appAuth.controller.ts index 90ae1b10758..9c0312da66b 100644 --- a/packages/server/lib/controllers/appAuth.controller.ts +++ b/packages/server/lib/controllers/appAuth.controller.ts @@ -10,7 +10,8 @@ import { LogActionEnum, telemetry, LogTypes, - getProvider + getProvider, + linkConnection } from '@nangohq/shared'; import { missesInterpolationParam } from '../utils/utils.js'; import * as WSErrBuilder from '../utils/web-socket-error.js'; @@ -19,7 +20,6 @@ import publisher from '../clients/publisher.client.js'; import { logContextGetter } from '@nangohq/logs'; import { stringifyError } from '@nangohq/utils'; import { connectionCreated as connectionCreatedHook, connectionCreationFailed as connectionCreationFailedHook } from '../hooks/hooks.js'; -import { linkConnection } from '../services/endUser.service.js'; import db from '@nangohq/database'; import type { ConnectSessionAndEndUser } from '../services/connectSession.service.js'; import { getConnectSession } from '../services/connectSession.service.js'; diff --git a/packages/server/lib/controllers/appStoreAuth.controller.ts b/packages/server/lib/controllers/appStoreAuth.controller.ts index dd3da094aa6..8f82f85b290 100644 --- a/packages/server/lib/controllers/appStoreAuth.controller.ts +++ b/packages/server/lib/controllers/appStoreAuth.controller.ts @@ -1,12 +1,21 @@ import type { Request, Response, NextFunction } from 'express'; import type { AuthCredentials } from '@nangohq/shared'; -import { errorManager, analytics, AnalyticsTypes, configService, connectionService, ErrorSourceEnum, LogActionEnum, getProvider } from '@nangohq/shared'; +import { + errorManager, + analytics, + AnalyticsTypes, + configService, + connectionService, + ErrorSourceEnum, + LogActionEnum, + getProvider, + linkConnection +} from '@nangohq/shared'; import type { LogContext } from '@nangohq/logs'; import { defaultOperationExpiration, logContextGetter } from '@nangohq/logs'; import { stringifyError } from '@nangohq/utils'; import type { RequestLocals } from '../utils/express.js'; import { connectionCreated as connectionCreatedHook, connectionCreationFailed as connectionCreationFailedHook } from '../hooks/hooks.js'; -import { linkConnection } from '../services/endUser.service.js'; import db from '@nangohq/database'; import { hmacCheck } from '../utils/hmac.js'; import { isIntegrationAllowed } from '../utils/auth.js'; diff --git a/packages/server/lib/controllers/auth/postBill.ts b/packages/server/lib/controllers/auth/postBill.ts index ea5678741d0..1eacb75cc5d 100644 --- a/packages/server/lib/controllers/auth/postBill.ts +++ b/packages/server/lib/controllers/auth/postBill.ts @@ -11,7 +11,8 @@ import { errorManager, ErrorSourceEnum, LogActionEnum, - getProvider + getProvider, + linkConnection } from '@nangohq/shared'; import type { PostPublicBillAuthorization } from '@nangohq/types'; import type { LogContext } from '@nangohq/logs'; @@ -19,7 +20,6 @@ import { defaultOperationExpiration, logContextGetter } from '@nangohq/logs'; import { hmacCheck } from '../../utils/hmac.js'; import { connectionCreated as connectionCreatedHook, connectionCreationFailed as connectionCreationFailedHook } from '../../hooks/hooks.js'; import { connectionCredential, connectionIdSchema, providerConfigKeySchema } from '../../helpers/validation.js'; -import { linkConnection } from '../../services/endUser.service.js'; import db from '@nangohq/database'; import { isIntegrationAllowed } from '../../utils/auth.js'; diff --git a/packages/server/lib/controllers/auth/postJwt.ts b/packages/server/lib/controllers/auth/postJwt.ts index 08992fc1eab..f619e3e58ff 100644 --- a/packages/server/lib/controllers/auth/postJwt.ts +++ b/packages/server/lib/controllers/auth/postJwt.ts @@ -11,7 +11,8 @@ import { errorManager, ErrorSourceEnum, LogActionEnum, - getProvider + getProvider, + linkConnection } from '@nangohq/shared'; import type { MessageRowInsert, PostPublicJwtAuthorization, ProviderJwt } from '@nangohq/types'; import type { LogContext } from '@nangohq/logs'; @@ -23,7 +24,6 @@ import { connectionTest as connectionTestHook } from '../../hooks/hooks.js'; import { connectionCredential, connectionIdSchema, providerConfigKeySchema } from '../../helpers/validation.js'; -import { linkConnection } from '../../services/endUser.service.js'; import db from '@nangohq/database'; import { isIntegrationAllowed } from '../../utils/auth.js'; diff --git a/packages/server/lib/controllers/auth/postSignature.ts b/packages/server/lib/controllers/auth/postSignature.ts index 3658ebe45cf..1cd0ce5c66e 100644 --- a/packages/server/lib/controllers/auth/postSignature.ts +++ b/packages/server/lib/controllers/auth/postSignature.ts @@ -11,7 +11,8 @@ import { errorManager, ErrorSourceEnum, LogActionEnum, - getProvider + getProvider, + linkConnection } from '@nangohq/shared'; import type { MessageRowInsert, PostPublicSignatureAuthorization, ProviderSignature } from '@nangohq/types'; import type { LogContext } from '@nangohq/logs'; @@ -23,7 +24,6 @@ import { connectionTest as connectionTestHook } from '../../hooks/hooks.js'; import { connectionCredential, connectionIdSchema, providerConfigKeySchema } from '../../helpers/validation.js'; -import { linkConnection } from '../../services/endUser.service.js'; import db from '@nangohq/database'; import { isIntegrationAllowed } from '../../utils/auth.js'; diff --git a/packages/server/lib/controllers/auth/postTableau.ts b/packages/server/lib/controllers/auth/postTableau.ts index 7938a524843..bdbdfafe21d 100644 --- a/packages/server/lib/controllers/auth/postTableau.ts +++ b/packages/server/lib/controllers/auth/postTableau.ts @@ -11,7 +11,8 @@ import { errorManager, ErrorSourceEnum, LogActionEnum, - getProvider + getProvider, + linkConnection } from '@nangohq/shared'; import type { PostPublicTableauAuthorization } from '@nangohq/types'; import type { LogContext } from '@nangohq/logs'; @@ -19,7 +20,6 @@ import { defaultOperationExpiration, logContextGetter } from '@nangohq/logs'; import { hmacCheck } from '../../utils/hmac.js'; import { connectionCreated as connectionCreatedHook, connectionCreationFailed as connectionCreationFailedHook } from '../../hooks/hooks.js'; import { connectionCredential, connectionIdSchema, providerConfigKeySchema } from '../../helpers/validation.js'; -import { linkConnection } from '../../services/endUser.service.js'; import db from '@nangohq/database'; import { isIntegrationAllowed } from '../../utils/auth.js'; diff --git a/packages/server/lib/controllers/auth/postTba.ts b/packages/server/lib/controllers/auth/postTba.ts index 30ad4beb9f5..875af8b9f2e 100644 --- a/packages/server/lib/controllers/auth/postTba.ts +++ b/packages/server/lib/controllers/auth/postTba.ts @@ -10,7 +10,8 @@ import { getProvider, errorManager, ErrorSourceEnum, - LogActionEnum + LogActionEnum, + linkConnection } from '@nangohq/shared'; import type { TbaCredentials, PostPublicTbaAuthorization, MessageRowInsert } from '@nangohq/types'; import type { LogContext } from '@nangohq/logs'; @@ -22,7 +23,6 @@ import { connectionCreationFailed as connectionCreationFailedHook } from '../../hooks/hooks.js'; import { connectionCredential, connectionIdSchema, providerConfigKeySchema } from '../../helpers/validation.js'; -import { linkConnection } from '../../services/endUser.service.js'; import db from '@nangohq/database'; import { isIntegrationAllowed } from '../../utils/auth.js'; diff --git a/packages/server/lib/controllers/auth/postTwoStep.ts b/packages/server/lib/controllers/auth/postTwoStep.ts index 1537fa758de..2b54cf8111d 100644 --- a/packages/server/lib/controllers/auth/postTwoStep.ts +++ b/packages/server/lib/controllers/auth/postTwoStep.ts @@ -11,7 +11,8 @@ import { errorManager, ErrorSourceEnum, LogActionEnum, - getProvider + getProvider, + linkConnection } from '@nangohq/shared'; import type { PostPublicTwoStepAuthorization, ProviderTwoStep } from '@nangohq/types'; import type { LogContext } from '@nangohq/logs'; @@ -19,7 +20,6 @@ import { defaultOperationExpiration, logContextGetter } from '@nangohq/logs'; import { hmacCheck } from '../../utils/hmac.js'; import { connectionCreated as connectionCreatedHook, connectionCreationFailed as connectionCreationFailedHook } from '../../hooks/hooks.js'; import { connectionIdSchema, providerConfigKeySchema, connectionCredential } from '../../helpers/validation.js'; -import { linkConnection } from '../../services/endUser.service.js'; import db from '@nangohq/database'; import { isIntegrationAllowed } from '../../utils/auth.js'; diff --git a/packages/server/lib/controllers/auth/postUnauthenticated.ts b/packages/server/lib/controllers/auth/postUnauthenticated.ts index a02ed6bc133..ad6fe9d6b47 100644 --- a/packages/server/lib/controllers/auth/postUnauthenticated.ts +++ b/packages/server/lib/controllers/auth/postUnauthenticated.ts @@ -4,12 +4,11 @@ import { requireEmptyBody, stringifyError, zodErrorToHTTP } from '@nangohq/utils import { connectionCredential, connectionIdSchema, providerConfigKeySchema } from '../../helpers/validation.js'; import type { PostPublicUnauthenticatedAuthorization } from '@nangohq/types'; -import { AnalyticsTypes, analytics, configService, connectionService, errorManager, getProvider } from '@nangohq/shared'; +import { AnalyticsTypes, analytics, configService, connectionService, errorManager, getProvider, linkConnection } from '@nangohq/shared'; import { logContextGetter } from '@nangohq/logs'; import type { LogContext } from '@nangohq/logs'; import { hmacCheck } from '../../utils/hmac.js'; import { connectionCreated, connectionCreationFailed } from '../../hooks/hooks.js'; -import { linkConnection } from '../../services/endUser.service.js'; import db from '@nangohq/database'; import { isIntegrationAllowed } from '../../utils/auth.js'; diff --git a/packages/server/lib/controllers/connect/getSession.ts b/packages/server/lib/controllers/connect/getSession.ts index f39df00bb69..9f9d420f083 100644 --- a/packages/server/lib/controllers/connect/getSession.ts +++ b/packages/server/lib/controllers/connect/getSession.ts @@ -1,7 +1,7 @@ import type { GetConnectSession } from '@nangohq/types'; import db from '@nangohq/database'; import { asyncWrapper } from '../../utils/asyncWrapper.js'; -import * as endUserService from '../../services/endUser.service.js'; +import * as endUserService from '@nangohq/shared'; import { requireEmptyQuery, requireEmptyBody, zodErrorToHTTP } from '@nangohq/utils'; export const getConnectSession = asyncWrapper(async (req, res) => { diff --git a/packages/server/lib/controllers/connect/postSessions.integration.test.ts b/packages/server/lib/controllers/connect/postSessions.integration.test.ts index 7bb45a27845..85fb875f54b 100644 --- a/packages/server/lib/controllers/connect/postSessions.integration.test.ts +++ b/packages/server/lib/controllers/connect/postSessions.integration.test.ts @@ -3,7 +3,7 @@ import { runServer, shouldBeProtected, isError, isSuccess } from '../../utils/te import { seeders } from '@nangohq/shared'; import db from '@nangohq/database'; import type { DBEnvironment } from '@nangohq/types'; -import * as endUserService from '../../services/endUser.service.js'; +import * as endUserService from '@nangohq/shared'; let api: Awaited>; diff --git a/packages/server/lib/controllers/connect/postSessions.ts b/packages/server/lib/controllers/connect/postSessions.ts index 83c7840ebca..f173b93f945 100644 --- a/packages/server/lib/controllers/connect/postSessions.ts +++ b/packages/server/lib/controllers/connect/postSessions.ts @@ -3,7 +3,7 @@ import { z } from 'zod'; import db from '@nangohq/database'; import { asyncWrapper } from '../../utils/asyncWrapper.js'; import * as keystore from '@nangohq/keystore'; -import * as endUserService from '../../services/endUser.service.js'; +import * as endUserService from '@nangohq/shared'; import * as connectSessionService from '../../services/connectSession.service.js'; import { requireEmptyQuery, zodErrorToHTTP } from '@nangohq/utils'; diff --git a/packages/server/lib/controllers/oauth.controller.ts b/packages/server/lib/controllers/oauth.controller.ts index b7749807826..eb4104d020d 100644 --- a/packages/server/lib/controllers/oauth.controller.ts +++ b/packages/server/lib/controllers/oauth.controller.ts @@ -37,7 +37,8 @@ import { hmacService, ErrorSourceEnum, interpolateObjectValues, - getProvider + getProvider, + linkConnection } from '@nangohq/shared'; import publisher from '../clients/publisher.client.js'; import * as WSErrBuilder from '../utils/web-socket-error.js'; @@ -47,7 +48,6 @@ import { defaultOperationExpiration, logContextGetter } from '@nangohq/logs'; import { errorToObject, stringifyError } from '@nangohq/utils'; import type { RequestLocals } from '../utils/express.js'; import { connectionCreated as connectionCreatedHook, connectionCreationFailed as connectionCreationFailedHook } from '../hooks/hooks.js'; -import { linkConnection } from '../services/endUser.service.js'; import db from '@nangohq/database'; import type { ConnectSessionAndEndUser } from '../services/connectSession.service.js'; import { getConnectSession } from '../services/connectSession.service.js'; diff --git a/packages/server/lib/services/connectSession.service.ts b/packages/server/lib/services/connectSession.service.ts index 504d650cfc5..30d594c7f01 100644 --- a/packages/server/lib/services/connectSession.service.ts +++ b/packages/server/lib/services/connectSession.service.ts @@ -3,7 +3,7 @@ import * as keystore from '@nangohq/keystore'; import type { ConnectSession, DBEndUser, EndUser } from '@nangohq/types'; import { Err, Ok } from '@nangohq/utils'; import type { Result } from '@nangohq/utils'; -import { EndUserMapper } from './endUser.service.js'; +import { EndUserMapper } from '@nangohq/shared'; const CONNECT_SESSIONS_TABLE = 'connect_sessions'; diff --git a/packages/shared/lib/index.ts b/packages/shared/lib/index.ts index eeb4526572f..f8b73de8a60 100644 --- a/packages/shared/lib/index.ts +++ b/packages/shared/lib/index.ts @@ -27,6 +27,7 @@ export * from './services/sync/job.service.js'; export * from './services/sync/config/config.service.js'; export * from './services/sync/config/endpoint.service.js'; export * from './services/sync/config/deploy.service.js'; +export * from './services/endUser.service.js'; export * from './services/onboarding.service.js'; export * from './services/invitations.js'; export * from './services/providers.js'; diff --git a/packages/shared/lib/sdk/sync.integration.test.ts b/packages/shared/lib/sdk/sync.integration.test.ts index ac2fc6cea61..666ec77728b 100644 --- a/packages/shared/lib/sdk/sync.integration.test.ts +++ b/packages/shared/lib/sdk/sync.integration.test.ts @@ -57,7 +57,8 @@ describe('Connection service integration tests', () => { syncConfig: {} as SyncConfig, debug: false, runnerFlags: {} as any, - startedAt: new Date() + startedAt: new Date(), + endUser: null }; const nango = new NangoAction(nangoProps); diff --git a/packages/shared/lib/sdk/sync.ts b/packages/shared/lib/sdk/sync.ts index 23209ce9e93..914c1fe4500 100644 --- a/packages/shared/lib/sdk/sync.ts +++ b/packages/shared/lib/sdk/sync.ts @@ -384,6 +384,7 @@ export interface NangoProps { runnerFlags: RunnerFlags; debug: boolean; startedAt: Date; + endUser: { id: number; endUserId: string | null; orgId: string | null } | null; axios?: { request?: AxiosInterceptorManager; diff --git a/packages/shared/lib/sdk/sync.unit.test.ts b/packages/shared/lib/sdk/sync.unit.test.ts index 56d9ace0d8c..e5ffa9863f1 100644 --- a/packages/shared/lib/sdk/sync.unit.test.ts +++ b/packages/shared/lib/sdk/sync.unit.test.ts @@ -35,7 +35,8 @@ const nangoProps: NangoProps = { nangoConnectionId: 1, debug: false, runnerFlags: {} as any, - startedAt: new Date() + startedAt: new Date(), + endUser: null }; describe('cache', () => { diff --git a/packages/server/lib/services/endUser.service.ts b/packages/shared/lib/services/endUser.service.ts similarity index 91% rename from packages/server/lib/services/endUser.service.ts rename to packages/shared/lib/services/endUser.service.ts index 97304812f7b..299d77c5d63 100644 --- a/packages/server/lib/services/endUser.service.ts +++ b/packages/shared/lib/services/endUser.service.ts @@ -158,3 +158,16 @@ export async function linkConnection(db: Knex, { endUserId, connection }: { endU export async function unlinkConnection(db: Knex, { connection }: { connection: Pick }) { await db('_nango_connections').where({ id: connection.id! }).update({ end_user_id: null }); } + +export async function getEndUserByConnectionId(db: Knex, props: { connectionId: number }): Promise> { + const endUser = await db(END_USERS_TABLE) + .select(`${END_USERS_TABLE}.*`) + .join('_nango_connections', '_nango_connections.end_user_id', `${END_USERS_TABLE}.id`) + .where('_nango_connections.id', '=', props.connectionId) + .first(); + if (!endUser) { + return Err(new EndUserError({ code: 'not_found', message: `End user not found`, payload: props })); + } + + return Ok(EndUserMapper.from(endUser)); +} diff --git a/packages/utils/lib/environment/detection.ts b/packages/utils/lib/environment/detection.ts index 759074298bc..c6ba7fe3e8a 100644 --- a/packages/utils/lib/environment/detection.ts +++ b/packages/utils/lib/environment/detection.ts @@ -24,3 +24,4 @@ export const flagHasAuth = process.env['FLAG_AUTH_ENABLED'] !== 'false'; export const flagHasManagedAuth = process.env['FLAG_MANAGED_AUTH_ENABLED'] === 'true' && Boolean(process.env['WORKOS_API_KEY'] && process.env['WORKOS_CLIENT_ID']); export const flagHasAPIRateLimit = process.env['FLAG_API_RATE_LIMIT_ENABLED'] !== 'false'; +export const flagHasBigQuery = process.env['FLAG_BIG_QUERY_EXPORT_ENABLED'] !== 'false'; diff --git a/packages/utils/lib/environment/parse.ts b/packages/utils/lib/environment/parse.ts index c898985d9e5..b54cbafa9e9 100644 --- a/packages/utils/lib/environment/parse.ts +++ b/packages/utils/lib/environment/parse.ts @@ -62,6 +62,10 @@ export const ENVS = z.object({ AWS_BUCKET_NAME: z.string().optional(), AWS_ACCESS_KEY_ID: z.string().optional(), + // BQ + GOOGLE_APPLICATION_CREDENTIALS: z.string().optional(), + FLAG_BIG_QUERY_EXPORT_ENABLED: bool, + // Datadog DD_ENV: z.string().optional(), DD_SITE: z.string().optional(),