Skip to content

Commit

Permalink
Update native profit column in update-periods
Browse files Browse the repository at this point in the history
  • Loading branch information
IanPhilips committed Dec 6, 2024
1 parent a7d4350 commit aa1fa77
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 30 deletions.
71 changes: 50 additions & 21 deletions backend/shared/src/supabase/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,34 @@ export function bulkUpdateQuery<
ColumnValues extends Tables[T]['Update']
>(table: T, idFields: Column<T>[], values: ColumnValues[]) {
if (!values.length) return 'select 1 where false'
const columnNames = Object.keys(values[0])
const cs = new pgp.helpers.ColumnSet(columnNames, { table })

// Filter out idFields from the columns to update to avoid pg errors about generated ALWAYS columns
const updateColumns = Object.keys(values[0]).filter(
(col) => !idFields.includes(col as Column<T>)
)
const allColumns = [...idFields, ...updateColumns]
const cs = new pgp.helpers.ColumnSet(updateColumns, { table })

// Format values array to ensure correct column order
const formattedValues = values
.map(
(row) =>
`(${allColumns
.map((col) => {
const val = row[col as keyof ColumnValues]
return typeof val === 'string' ? `'${val}'` : val
})
.join(',')})`
)
.join(',')

const clause = idFields.map((f) => `v.${f} = t.${f}`).join(' and ')
const query = pgp.helpers.update(values, cs) + ` WHERE ${clause}`
const columnDefs = allColumns.map((c) => `"${c}"`).join(',')

const query =
`update ${table} as t set ${cs.assignColumns({ from: 'v' })} ` +
`from (values ${formattedValues}) as v(${columnDefs}) ` +
`WHERE ${clause}`
// Hack to properly cast values.
return query.replace(/::(\w*)'/g, "'::$1")
}
Expand Down Expand Up @@ -144,30 +168,35 @@ export function bulkUpsertQuery<
)
}

// Replacement for BulkWriter
export async function bulkUpdateData<T extends TableName>(
db: SupabaseDirectClient,
export function bulkUpdateDataQuery<T extends TableName>(
table: T,
// TODO: explicit id field
updates: (Partial<DataFor<T>> & { id: string | number })[]
) {
if (updates.length > 0) {
const values = updates
.map(
(update) =>
`(${
typeof update.id === 'string' ? `'${update.id}'` : update.id
}, '${JSON.stringify(update)}'::jsonb)`
)
.join(',\n')
if (updates.length === 0) return 'select 1 where false'

await db.none(
`update ${table} as c
set data = data || v.update
from (values ${values}) as v(id, update)
where c.id = v.id`
const values = updates
.map(
(update) =>
`(${
typeof update.id === 'string' ? `'${update.id}'` : update.id
}, '${JSON.stringify(update)}'::jsonb)`
)
}
.join(',\n')

return `update ${table} as c
set data = data || v.update
from (values ${values}) as v(id, update)
where c.id = v.id`
}

export async function bulkUpdateData<T extends TableName>(
db: SupabaseDirectClient,
table: T,
updates: (Partial<DataFor<T>> & { id: string | number })[]
) {
const query = bulkUpdateDataQuery(table, updates)
await db.none(query)
}
export function updateDataQuery<T extends TableName>(
table: T,
Expand Down
32 changes: 23 additions & 9 deletions backend/shared/src/update-user-metric-periods.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { filterDefined } from 'common/util/array'
import { hasSignificantDeepChanges } from 'common/util/object'
import { convertBet } from 'common/supabase/bets'
import { ContractMetric } from 'common/contract-metric'
import { bulkUpdateData } from './supabase/utils'
import { bulkUpdateDataQuery, bulkUpdateQuery } from './supabase/utils'
import { convertAnswer, convertContract } from 'common/supabase/contracts'

const CHUNK_SIZE = isProd() ? 400 : 10
Expand Down Expand Up @@ -145,7 +145,10 @@ export async function updateUserMetricPeriods(
(m) => m.userId
)

const contractMetricUpdates: Pick<ContractMetric, 'from' | 'id'>[] = []
const contractMetricUpdates: Pick<
ContractMetric,
'from' | 'id' | 'profit' | 'payout' | 'profitPercent'
>[] = []

log('Computing metric updates...')
for (const userId of activeUserIds) {
Expand Down Expand Up @@ -205,14 +208,25 @@ export async function updateUserMetricPeriods(

if (contractMetricUpdates.length > 0 && !skipUpdates) {
log('Writing updates')
await bulkUpdateData(pg, 'user_contract_metrics', contractMetricUpdates)
.catch((e) => log.error('Error upserting contract metrics', e))
const updateDataQuery = bulkUpdateDataQuery(
'user_contract_metrics',
contractMetricUpdates
)
const updateColumnsQuery = bulkUpdateQuery(
'user_contract_metrics',
['id'],
contractMetricUpdates.map((m) => ({
id: m.id,
profit: m.profit,
})) as any[]
)
await pg
.multi(`${updateDataQuery}; ${updateColumnsQuery};`)
.catch((e) => log.error('Error updating contract metrics', e))
.then(() =>
log(
'Finished updating ' +
contractMetricUpdates.length +
' user period metrics.'
)
log('Finished updating user period metrics.', {
totalUpdates: contractMetricUpdates.length,
})
)
}
}
Expand Down

0 comments on commit aa1fa77

Please sign in to comment.