diff --git a/backend/shared/src/supabase/utils.ts b/backend/shared/src/supabase/utils.ts index 00c99b349c..f49d7184b1 100644 --- a/backend/shared/src/supabase/utils.ts +++ b/backend/shared/src/supabase/utils.ts @@ -83,10 +83,34 @@ export function bulkUpdateQuery< ColumnValues extends Tables[T]['Update'] >(table: T, idFields: Column[], values: ColumnValues[]) { if (!values.length) return 'select 1 where false' - const columnNames = Object.keys(values[0]) - const cs = new pgp.helpers.ColumnSet(columnNames, { table }) + + // Filter out idFields from the columns to update to avoid pg errors about generated ALWAYS columns + const updateColumns = Object.keys(values[0]).filter( + (col) => !idFields.includes(col as Column) + ) + const allColumns = [...idFields, ...updateColumns] + const cs = new pgp.helpers.ColumnSet(updateColumns, { table }) + + // Format values array to ensure correct column order + const formattedValues = values + .map( + (row) => + `(${allColumns + .map((col) => { + const val = row[col as keyof ColumnValues] + return typeof val === 'string' ? `'${val}'` : val + }) + .join(',')})` + ) + .join(',') + const clause = idFields.map((f) => `v.${f} = t.${f}`).join(' and ') - const query = pgp.helpers.update(values, cs) + ` WHERE ${clause}` + const columnDefs = allColumns.map((c) => `"${c}"`).join(',') + + const query = + `update ${table} as t set ${cs.assignColumns({ from: 'v' })} ` + + `from (values ${formattedValues}) as v(${columnDefs}) ` + + `WHERE ${clause}` // Hack to properly cast values. return query.replace(/::(\w*)'/g, "'::$1") } @@ -144,30 +168,35 @@ export function bulkUpsertQuery< ) } -// Replacement for BulkWriter -export async function bulkUpdateData( - db: SupabaseDirectClient, +export function bulkUpdateDataQuery( table: T, // TODO: explicit id field updates: (Partial> & { id: string | number })[] ) { - if (updates.length > 0) { - const values = updates - .map( - (update) => - `(${ - typeof update.id === 'string' ? `'${update.id}'` : update.id - }, '${JSON.stringify(update)}'::jsonb)` - ) - .join(',\n') + if (updates.length === 0) return 'select 1 where false' - await db.none( - `update ${table} as c - set data = data || v.update - from (values ${values}) as v(id, update) - where c.id = v.id` + const values = updates + .map( + (update) => + `(${ + typeof update.id === 'string' ? `'${update.id}'` : update.id + }, '${JSON.stringify(update)}'::jsonb)` ) - } + .join(',\n') + + return `update ${table} as c + set data = data || v.update + from (values ${values}) as v(id, update) + where c.id = v.id` +} + +export async function bulkUpdateData( + db: SupabaseDirectClient, + table: T, + updates: (Partial> & { id: string | number })[] +) { + const query = bulkUpdateDataQuery(table, updates) + await db.none(query) } export function updateDataQuery( table: T, diff --git a/backend/shared/src/update-user-metric-periods.ts b/backend/shared/src/update-user-metric-periods.ts index 7c302bd1a9..2880d3d55a 100644 --- a/backend/shared/src/update-user-metric-periods.ts +++ b/backend/shared/src/update-user-metric-periods.ts @@ -14,7 +14,7 @@ import { filterDefined } from 'common/util/array' import { hasSignificantDeepChanges } from 'common/util/object' import { convertBet } from 'common/supabase/bets' import { ContractMetric } from 'common/contract-metric' -import { bulkUpdateData } from './supabase/utils' +import { bulkUpdateDataQuery, bulkUpdateQuery } from './supabase/utils' import { convertAnswer, convertContract } from 'common/supabase/contracts' const CHUNK_SIZE = isProd() ? 400 : 10 @@ -145,7 +145,10 @@ export async function updateUserMetricPeriods( (m) => m.userId ) - const contractMetricUpdates: Pick[] = [] + const contractMetricUpdates: Pick< + ContractMetric, + 'from' | 'id' | 'profit' | 'payout' | 'profitPercent' + >[] = [] log('Computing metric updates...') for (const userId of activeUserIds) { @@ -205,14 +208,25 @@ export async function updateUserMetricPeriods( if (contractMetricUpdates.length > 0 && !skipUpdates) { log('Writing updates') - await bulkUpdateData(pg, 'user_contract_metrics', contractMetricUpdates) - .catch((e) => log.error('Error upserting contract metrics', e)) + const updateDataQuery = bulkUpdateDataQuery( + 'user_contract_metrics', + contractMetricUpdates + ) + const updateColumnsQuery = bulkUpdateQuery( + 'user_contract_metrics', + ['id'], + contractMetricUpdates.map((m) => ({ + id: m.id, + profit: m.profit, + })) as any[] + ) + await pg + .multi(`${updateDataQuery}; ${updateColumnsQuery};`) + .catch((e) => log.error('Error updating contract metrics', e)) .then(() => - log( - 'Finished updating ' + - contractMetricUpdates.length + - ' user period metrics.' - ) + log('Finished updating user period metrics.', { + totalUpdates: contractMetricUpdates.length, + }) ) } }