diff --git a/config/custom-environment-variables.json b/config/custom-environment-variables.json index a76feebc231..fa7f5477029 100644 --- a/config/custom-environment-variables.json +++ b/config/custom-environment-variables.json @@ -1,11 +1,16 @@ { "port": "PORT", "env": "OC_ENV", + "services": { + "server": "ENABLE_SERVICE_SERVER", + "searchSync": "ENABLE_SERVICE_SEARCH_SYNC" + }, "mailpit": { "client": "MAILPIT_CLIENT" }, "elasticSearch": { - "url": "ELASTICSEARCH_URL" + "url": "ELASTICSEARCH_URL", + "maxSyncDelay": "ELASTICSEARCH_MAX_SYNC_DELAY" }, "database": { "url": "PG_URL", diff --git a/config/default.json b/config/default.json index 2e720647bd2..6437a40bbdb 100644 --- a/config/default.json +++ b/config/default.json @@ -1,5 +1,9 @@ { "port": "3060", + "services": { + "server": true, + "searchSync": true + }, "mailpit": { "client": false }, @@ -16,6 +20,9 @@ }, "readOnly": false }, + "elasticSearch": { + "maxSyncDelay": 5000 + }, "maintenancedb": { "url": "postgres://127.0.0.1:5432/postgres" }, diff --git a/config/e2e.json b/config/e2e.json index e3e4c31f317..f6870e5425d 100644 --- a/config/e2e.json +++ b/config/e2e.json @@ -2,6 +2,9 @@ "database": { "url": "postgres://opencollective@127.0.0.1:5432/opencollective_e2e" }, + "services": { + "searchSync": false + }, "mailpit": { "client": true }, diff --git a/config/test.json b/config/test.json index 60ec13e2174..a17a1ecd112 100644 --- a/config/test.json +++ b/config/test.json @@ -1,5 +1,9 @@ { "port": "3061", + "services": { + "server": true, + "searchSync": false + }, "database": { "url": "postgres://opencollective@127.0.0.1:5432/opencollective_test" }, diff --git a/package-lock.json b/package-lock.json index f2d480fc517..aeeaa17af7b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -95,6 +95,7 @@ "pg": "8.13.1", "pg-connection-string": "2.7.0", "pg-format": "1.0.4", + "pg-listen": "1.7.0", "plaid": "29.0.0", "prepend-http": "3.0.1", "redis": "4.6.6", @@ -22938,6 +22939,20 @@ "node": ">=4.0.0" } }, + "node_modules/pg-listen": { + "version": "1.7.0", + "resolved": "https://registry.npmjs.org/pg-listen/-/pg-listen-1.7.0.tgz", + "integrity": "sha512-MKDwKLm4ryhy7iq1yw1K1MvUzBdTkaT16HZToddX9QaT8XSdt3Kins5mYH6DLECGFzFWG09VdXvWOIYogjXrsg==", + "license": "MIT", + "dependencies": { + "debug": "^4.1.1", + "pg-format": "^1.0.4", + "typed-emitter": "^0.1.0" + }, + "peerDependencies": { + "pg": "7.x || 8.x" + } + }, "node_modules/pg-pool": { "version": "3.7.0", "resolved": "https://registry.npmjs.org/pg-pool/-/pg-pool-3.7.0.tgz", @@ -25953,6 +25968,12 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/typed-emitter": { + "version": "0.1.0", + "resolved": "https://registry.npmjs.org/typed-emitter/-/typed-emitter-0.1.0.tgz", + "integrity": "sha512-Tfay0l6gJMP5rkil8CzGbLthukn+9BN/VXWcABVFPjOoelJ+koW8BuPZYk+h/L+lEeIp1fSzVRiWRPIjKVjPdg==", + "license": "MIT" + }, "node_modules/typedarray": { "version": "0.0.6", "resolved": "https://registry.npmjs.org/typedarray/-/typedarray-0.0.6.tgz", diff --git a/package.json b/package.json index 28c0fa86947..35a33beb330 100644 --- a/package.json +++ b/package.json @@ -116,6 +116,7 @@ "pg": "8.13.1", "pg-connection-string": "2.7.0", "pg-format": "1.0.4", + "pg-listen": "1.7.0", "plaid": "29.0.0", "prepend-http": "3.0.1", "redis": "4.6.6", diff --git a/server/index.js b/server/index.js index 0277b08d228..d5ba55e31a6 100644 --- a/server/index.js +++ b/server/index.js @@ -7,14 +7,16 @@ import config from 'config'; import express from 'express'; import throng from 'throng'; +import { startElasticSearchPostgresSync } from './lib/elastic-search/sync-postgres'; import expressLib from './lib/express'; import logger from './lib/logger'; import { updateCachedFidoMetadata } from './lib/two-factor-authentication/fido-metadata'; +import { parseToBoolean } from './lib/utils'; import routes from './routes'; const workers = process.env.WEB_CONCURRENCY || 1; -async function start(i) { +async function startExpressServer(workerId) { const expressApp = express(); await updateCachedFidoMetadata(); @@ -35,7 +37,7 @@ async function start(i) { host, server.address().port, config.env, - i, + workerId, ); }); @@ -45,15 +47,49 @@ async function start(i) { return expressApp; } -let app; +// Start the express server +let appPromise; +if (parseToBoolean(config.services.server)) { + if (['production', 'staging'].includes(config.env) && workers > 1) { + throng({ worker: startExpressServer, count: workers }); // TODO: Thong is not compatible with the shutdown logic below + } else { + appPromise = startExpressServer(1); + } +} -if (['production', 'staging'].includes(config.env) && workers > 1) { - throng({ worker: start, count: workers }); -} else { - app = start(1); +// Start the search sync job +if (parseToBoolean(config.services.searchSync)) { + startElasticSearchPostgresSync(); } +// let isShuttingDown = false; +// const gracefullyShutdown = async signal => { +// if (!isShuttingDown) { +// logger.info(`Received ${signal}. Shutting down.`); +// isShuttingDown = true; + +// if (appPromise) { +// await appPromise.then(app => { +// if (app.__server__) { +// logger.info('Closing express server'); +// app.__server__.close(); +// } +// }); +// } + +// if (parseToBoolean(config.services.searchSync)) { +// await stopElasticSearchPostgresSync(); +// } + +// process.exit(); +// } +// }; + +// process.on('exit', () => gracefullyShutdown('exit')); +// process.on('SIGINT', () => gracefullyShutdown('SIGINT')); +// process.on('SIGTERM', () => gracefullyShutdown('SIGTERM')); + // This is used by tests -export default async function () { - return app ? app : start(1); +export default async function startServerForTest() { + return appPromise ?? startExpressServer(1); } diff --git a/server/lib/elastic-search/adapters/ElasticSearchCollectivesAdapter.ts b/server/lib/elastic-search/adapters/ElasticSearchCollectivesAdapter.ts index ae4e2f4ce98..8ba63fa2df3 100644 --- a/server/lib/elastic-search/adapters/ElasticSearchCollectivesAdapter.ts +++ b/server/lib/elastic-search/adapters/ElasticSearchCollectivesAdapter.ts @@ -2,10 +2,9 @@ import models, { Op } from '../../../models'; import { stripHTMLOrEmpty } from '../../sanitize-html'; import { ElasticSearchIndexName } from '../constants'; -import { ElasticSearchModelAdapter } from './ElasticSearchModelAdapter'; +import { ElasticSearchModelAdapter, FindEntriesToIndexOptions } from './ElasticSearchModelAdapter'; export class ElasticSearchCollectivesAdapter implements ElasticSearchModelAdapter { - public readonly model = models.Collective; public readonly index = ElasticSearchIndexName.COLLECTIVES; public readonly mappings = { properties: { @@ -30,15 +29,11 @@ export class ElasticSearchCollectivesAdapter implements ElasticSearchModelAdapte }, } as const; - public async findEntriesToIndex( - options: { - offset?: number; - limit?: number; - fromDate?: Date; - maxId?: number; - ids?: number[]; - } = {}, - ): Promise>> { + public getModel() { + return models.Collective; + } + + public async findEntriesToIndex(options: FindEntriesToIndexOptions = {}) { return models.Collective.findAll({ attributes: Object.keys(this.mappings.properties), order: [['id', 'DESC']], @@ -49,6 +44,15 @@ export class ElasticSearchCollectivesAdapter implements ElasticSearchModelAdapte ...(options.fromDate ? { updatedAt: options.fromDate } : null), ...(options.maxId ? { id: { [Op.lte]: options.maxId } } : null), ...(options.ids?.length ? { id: { [Op.in]: options.ids } } : null), + ...(options.relatedToCollectiveIds?.length + ? { + [Op.or]: [ + { id: options.relatedToCollectiveIds }, + { HostCollectiveId: options.relatedToCollectiveIds }, + { ParentCollectiveId: options.relatedToCollectiveIds }, + ], + } + : null), }, }); } @@ -72,7 +76,7 @@ export class ElasticSearchCollectivesAdapter implements ElasticSearchModelAdapte isActive: instance.isActive, isHostAccount: instance.isHostAccount, deactivatedAt: instance.deactivatedAt, - HostCollectiveId: instance.HostCollectiveId, + HostCollectiveId: !instance.isActive ? null : instance.HostCollectiveId, ParentCollectiveId: instance.ParentCollectiveId, }; } diff --git a/server/lib/elastic-search/adapters/ElasticSearchCommentsAdapter.ts b/server/lib/elastic-search/adapters/ElasticSearchCommentsAdapter.ts index 7848270d7d0..f12dfcc7847 100644 --- a/server/lib/elastic-search/adapters/ElasticSearchCommentsAdapter.ts +++ b/server/lib/elastic-search/adapters/ElasticSearchCommentsAdapter.ts @@ -7,10 +7,9 @@ import { CommentType } from '../../../models/Comment'; import { stripHTMLOrEmpty } from '../../sanitize-html'; import { ElasticSearchIndexName } from '../constants'; -import { ElasticSearchModelAdapter } from './ElasticSearchModelAdapter'; +import { ElasticSearchModelAdapter, FindEntriesToIndexOptions } from './ElasticSearchModelAdapter'; export class ElasticSearchCommentsAdapter implements ElasticSearchModelAdapter { - public readonly model = models.Comment; public readonly index = ElasticSearchIndexName.COMMENTS; public readonly mappings = { properties: { @@ -29,15 +28,11 @@ export class ElasticSearchCommentsAdapter implements ElasticSearchModelAdapter { }, } as const; - public findEntriesToIndex( - options: { - offset?: number; - limit?: number; - fromDate?: Date; - maxId?: number; - ids?: number[]; - } = {}, - ) { + public getModel() { + return models.Comment; + } + + public findEntriesToIndex(options: FindEntriesToIndexOptions = {}) { return models.Comment.findAll({ attributes: omit(Object.keys(this.mappings.properties), ['HostCollectiveId', 'ParentCollectiveId']), order: [['id', 'DESC']], @@ -47,12 +42,20 @@ export class ElasticSearchCommentsAdapter implements ElasticSearchModelAdapter { ...(options.fromDate ? { updatedAt: options.fromDate } : null), ...(options.maxId ? { id: { [Op.lte]: options.maxId } } : null), ...(options.ids?.length ? { id: { [Op.in]: options.ids } } : null), + ...(options.relatedToCollectiveIds?.length + ? { + [Op.or]: [ + { CollectiveId: options.relatedToCollectiveIds }, + { FromCollectiveId: options.relatedToCollectiveIds }, + ], + } + : null), }, include: [ { association: 'collective', required: true, - attributes: ['HostCollectiveId', 'ParentCollectiveId'], + attributes: ['isActive', 'HostCollectiveId', 'ParentCollectiveId'], }, { association: 'expense', @@ -84,7 +87,7 @@ export class ElasticSearchCommentsAdapter implements ElasticSearchModelAdapter { HostCollectiveId: instance.expense?.HostCollectiveId ?? instance.hostApplication?.HostCollectiveId ?? - instance.collective.HostCollectiveId, + (!instance.collective.isActive ? null : instance.collective.HostCollectiveId), }; } diff --git a/server/lib/elastic-search/adapters/ElasticSearchExpensesAdapter.ts b/server/lib/elastic-search/adapters/ElasticSearchExpensesAdapter.ts index da46b533ebf..bc94a7017c3 100644 --- a/server/lib/elastic-search/adapters/ElasticSearchExpensesAdapter.ts +++ b/server/lib/elastic-search/adapters/ElasticSearchExpensesAdapter.ts @@ -5,10 +5,9 @@ import models from '../../../models'; import { stripHTMLOrEmpty } from '../../sanitize-html'; import { ElasticSearchIndexName } from '../constants'; -import { ElasticSearchModelAdapter } from './ElasticSearchModelAdapter'; +import { ElasticSearchModelAdapter, FindEntriesToIndexOptions } from './ElasticSearchModelAdapter'; export class ElasticSearchExpensesAdapter implements ElasticSearchModelAdapter { - public readonly model = models.Expense; public readonly index = ElasticSearchIndexName.EXPENSES; public readonly mappings = { properties: { @@ -34,15 +33,11 @@ export class ElasticSearchExpensesAdapter implements ElasticSearchModelAdapter { }, } as const; - public findEntriesToIndex( - options: { - offset?: number; - limit?: number; - fromDate?: Date; - maxId?: number; - ids?: number[]; - } = {}, - ) { + public getModel() { + return models.Expense; + } + + public findEntriesToIndex(options: FindEntriesToIndexOptions = {}) { return models.Expense.findAll({ attributes: omit(Object.keys(this.mappings.properties), ['ParentCollectiveId', 'items']), order: [['id', 'DESC']], @@ -51,13 +46,27 @@ export class ElasticSearchExpensesAdapter implements ElasticSearchModelAdapter { where: { ...(options.fromDate ? { updatedAt: options.fromDate } : null), ...(options.maxId ? { id: { [Op.lte]: options.maxId } } : null), - ...(options.ids?.length ? { id: { [Op.in]: options.ids } } : null), + ...(options.ids?.length ? { id: options.ids } : null), + ...(options.relatedToCollectiveIds?.length + ? { + [Op.or]: [ + { CollectiveId: options.relatedToCollectiveIds }, + { FromCollectiveId: options.relatedToCollectiveIds }, + { HostCollectiveId: options.relatedToCollectiveIds }, + ], + } + : null), }, include: [ { association: 'collective', required: true, - attributes: ['HostCollectiveId', 'ParentCollectiveId'], + attributes: ['isActive', 'HostCollectiveId', 'ParentCollectiveId'], + }, + { + association: 'items', + required: true, + attributes: ['description'], }, { association: 'items', @@ -87,8 +96,9 @@ export class ElasticSearchExpensesAdapter implements ElasticSearchModelAdapter { ParentCollectiveId: instance.collective.ParentCollectiveId, FromCollectiveId: instance.FromCollectiveId, UserId: instance.UserId, - HostCollectiveId: instance.HostCollectiveId || instance.collective.HostCollectiveId, - items: instance.items.map(item => item.description).join(' '), + items: instance.items.map(item => item.description).join(', '), + HostCollectiveId: + instance.HostCollectiveId || (!instance.collective.isActive ? null : instance.collective.HostCollectiveId), }; } diff --git a/server/lib/elastic-search/adapters/ElasticSearchHostApplicationsAdapter.ts b/server/lib/elastic-search/adapters/ElasticSearchHostApplicationsAdapter.ts index a6430de094d..e400a062da4 100644 --- a/server/lib/elastic-search/adapters/ElasticSearchHostApplicationsAdapter.ts +++ b/server/lib/elastic-search/adapters/ElasticSearchHostApplicationsAdapter.ts @@ -4,10 +4,9 @@ import { Op } from 'sequelize'; import models from '../../../models'; import { ElasticSearchIndexName } from '../constants'; -import { ElasticSearchModelAdapter } from './ElasticSearchModelAdapter'; +import { ElasticSearchModelAdapter, FindEntriesToIndexOptions } from './ElasticSearchModelAdapter'; export class ElasticSearchHostApplicationsAdapter implements ElasticSearchModelAdapter { - public readonly model = models.HostApplication; public readonly index = ElasticSearchIndexName.HOST_APPLICATIONS; public readonly mappings = { properties: { @@ -24,15 +23,11 @@ export class ElasticSearchHostApplicationsAdapter implements ElasticSearchModelA }, } as const; - public findEntriesToIndex( - options: { - offset?: number; - limit?: number; - fromDate?: Date; - maxId?: number; - ids?: number[]; - } = {}, - ) { + public getModel() { + return models.HostApplication; + } + + public findEntriesToIndex(options: FindEntriesToIndexOptions = {}) { return models.HostApplication.findAll({ attributes: omit(Object.keys(this.mappings.properties), ['ParentCollectiveId']), order: [['id', 'DESC']], @@ -41,7 +36,15 @@ export class ElasticSearchHostApplicationsAdapter implements ElasticSearchModelA where: { ...(options.fromDate ? { updatedAt: options.fromDate } : null), ...(options.maxId ? { id: { [Op.lte]: options.maxId } } : null), - ...(options.ids?.length ? { id: { [Op.in]: options.ids } } : null), + ...(options.ids?.length ? { id: options.ids } : null), + ...(options.relatedToCollectiveIds?.length + ? { + [Op.or]: [ + { CollectiveId: options.relatedToCollectiveIds }, + { HostCollectiveId: options.relatedToCollectiveIds }, + ], + } + : null), }, include: [ { diff --git a/server/lib/elastic-search/adapters/ElasticSearchModelAdapter.ts b/server/lib/elastic-search/adapters/ElasticSearchModelAdapter.ts index 8f27874de5d..fa5f672d774 100644 --- a/server/lib/elastic-search/adapters/ElasticSearchModelAdapter.ts +++ b/server/lib/elastic-search/adapters/ElasticSearchModelAdapter.ts @@ -12,20 +12,24 @@ type ElasticSearchModelPermissions = { fields?: Record; }; +export type FindEntriesToIndexOptions = { + offset?: number; + limit?: number; + fromDate?: Date; + maxId?: number; + ids?: number[]; + relatedToCollectiveIds?: number[]; +}; + export interface ElasticSearchModelAdapter { - readonly model: ModelStatic; readonly index: ElasticSearchIndexName; readonly mappings: MappingTypeMapping; readonly settings?: IndicesIndexSettings; + getModel(): ModelStatic; + /** Returns the attributes that `mapModelInstanceToDocument` needs to build the document */ - findEntriesToIndex(options?: { - offset?: number; - limit?: number; - fromDate?: Date; - maxId?: number; - ids?: number[]; - }): Promise>>; + findEntriesToIndex(options?: FindEntriesToIndexOptions): Promise>>; /** Maps a model instance to an ElasticSearch document */ mapModelInstanceToDocument( diff --git a/server/lib/elastic-search/adapters/ElasticSearchOrdersAdapter.ts b/server/lib/elastic-search/adapters/ElasticSearchOrdersAdapter.ts index 8eb250bf547..a5b7e7a4278 100644 --- a/server/lib/elastic-search/adapters/ElasticSearchOrdersAdapter.ts +++ b/server/lib/elastic-search/adapters/ElasticSearchOrdersAdapter.ts @@ -1,13 +1,12 @@ import { omit } from 'lodash'; import { Op } from 'sequelize'; -import models from '../../../models'; +import models, { Subscription } from '../../../models'; import { ElasticSearchIndexName } from '../constants'; -import { ElasticSearchModelAdapter } from './ElasticSearchModelAdapter'; +import { ElasticSearchModelAdapter, FindEntriesToIndexOptions } from './ElasticSearchModelAdapter'; export class ElasticSearchOrdersAdapter implements ElasticSearchModelAdapter { - public readonly model = models.Order; public readonly index = ElasticSearchIndexName.ORDERS; public readonly mappings = { properties: { @@ -18,6 +17,7 @@ export class ElasticSearchOrdersAdapter implements ElasticSearchModelAdapter { // Relationships CollectiveId: { type: 'keyword' }, FromCollectiveId: { type: 'keyword' }, + CreatedByUserId: { type: 'keyword' }, // Special fields HostCollectiveId: { type: 'keyword' }, ParentCollectiveId: { type: 'keyword' }, @@ -25,15 +25,11 @@ export class ElasticSearchOrdersAdapter implements ElasticSearchModelAdapter { }, } as const; - public findEntriesToIndex( - options: { - offset?: number; - limit?: number; - fromDate?: Date; - maxId?: number; - ids?: number[]; - } = {}, - ) { + public getModel() { + return models.Order; + } + + public findEntriesToIndex(options: FindEntriesToIndexOptions = {}) { return models.Order.findAll({ attributes: omit(Object.keys(this.mappings.properties), ['HostCollectiveId', 'ParentCollectiveId']), order: [['id', 'DESC']], @@ -42,16 +38,24 @@ export class ElasticSearchOrdersAdapter implements ElasticSearchModelAdapter { where: { ...(options.fromDate ? { updatedAt: options.fromDate } : null), ...(options.maxId ? { id: { [Op.lte]: options.maxId } } : null), - ...(options.ids?.length ? { id: { [Op.in]: options.ids } } : null), + ...(options.ids?.length ? { id: options.ids } : null), + ...(options.relatedToCollectiveIds?.length + ? { + [Op.or]: [ + { CollectiveId: options.relatedToCollectiveIds }, + { FromCollectiveId: options.relatedToCollectiveIds }, + ], + } + : null), }, include: [ { association: 'collective', required: true, - attributes: ['HostCollectiveId', 'ParentCollectiveId'], + attributes: ['isActive', 'HostCollectiveId', 'ParentCollectiveId'], }, { - model: models.Subscription, + model: Subscription, required: false, attributes: ['paypalSubscriptionId'], }, @@ -69,8 +73,9 @@ export class ElasticSearchOrdersAdapter implements ElasticSearchModelAdapter { description: instance.description, CollectiveId: instance.CollectiveId, FromCollectiveId: instance.FromCollectiveId, - HostCollectiveId: instance.collective.HostCollectiveId, + HostCollectiveId: !instance.collective.isActive ? null : instance.collective.HostCollectiveId, ParentCollectiveId: instance.collective.ParentCollectiveId, + CreatedByUserId: instance.CreatedByUserId, paypalSubscriptionId: instance.Subscription?.paypalSubscriptionId, }; } @@ -83,6 +88,9 @@ export class ElasticSearchOrdersAdapter implements ElasticSearchModelAdapter { paypalSubscriptionId: { terms: { HostCollectiveId: adminOfAccountIds }, }, + CreatedByUserId: { + terms: { HostCollectiveId: adminOfAccountIds }, + }, }, }; /* eslint-enable camelcase */ diff --git a/server/lib/elastic-search/adapters/ElasticSearchTiersAdapter.ts b/server/lib/elastic-search/adapters/ElasticSearchTiersAdapter.ts index d96509c5b26..652c0629021 100644 --- a/server/lib/elastic-search/adapters/ElasticSearchTiersAdapter.ts +++ b/server/lib/elastic-search/adapters/ElasticSearchTiersAdapter.ts @@ -5,10 +5,9 @@ import models from '../../../models'; import { stripHTMLOrEmpty } from '../../sanitize-html'; import { ElasticSearchIndexName } from '../constants'; -import { ElasticSearchModelAdapter } from './ElasticSearchModelAdapter'; +import { ElasticSearchModelAdapter, FindEntriesToIndexOptions } from './ElasticSearchModelAdapter'; export class ElasticSearchTiersAdapter implements ElasticSearchModelAdapter { - public readonly model = models.Tier; public readonly index = ElasticSearchIndexName.TIERS; public readonly mappings = { properties: { @@ -28,15 +27,11 @@ export class ElasticSearchTiersAdapter implements ElasticSearchModelAdapter { }, } as const; - public findEntriesToIndex( - options: { - offset?: number; - limit?: number; - fromDate?: Date; - maxId?: number; - ids?: number[]; - } = {}, - ) { + public getModel() { + return models.Tier; + } + + public findEntriesToIndex(options: FindEntriesToIndexOptions = {}) { return models.Tier.findAll({ attributes: omit(Object.keys(this.mappings.properties), ['HostCollectiveId', 'ParentCollectiveId']), order: [['id', 'DESC']], @@ -45,13 +40,14 @@ export class ElasticSearchTiersAdapter implements ElasticSearchModelAdapter { where: { ...(options.fromDate ? { updatedAt: options.fromDate } : null), ...(options.maxId ? { id: { [Op.lte]: options.maxId } } : null), - ...(options.ids?.length ? { id: { [Op.in]: options.ids } } : null), + ...(options.ids?.length ? { id: options.ids } : null), + ...(options.relatedToCollectiveIds?.length ? { CollectiveId: options.relatedToCollectiveIds } : null), }, include: [ { association: 'Collective', required: true, - attributes: ['HostCollectiveId', 'ParentCollectiveId'], + attributes: ['isActive', 'HostCollectiveId', 'ParentCollectiveId'], }, ], }); @@ -70,7 +66,7 @@ export class ElasticSearchTiersAdapter implements ElasticSearchModelAdapter { description: instance.description, longDescription: stripHTMLOrEmpty(instance.longDescription), CollectiveId: instance.CollectiveId, - HostCollectiveId: instance.Collective.HostCollectiveId, + HostCollectiveId: !instance.Collective.isActive ? null : instance.Collective.HostCollectiveId, ParentCollectiveId: instance.Collective.ParentCollectiveId, }; } diff --git a/server/lib/elastic-search/adapters/ElasticSearchTransactionsAdapter.ts b/server/lib/elastic-search/adapters/ElasticSearchTransactionsAdapter.ts index 1c8fef34f5f..355c45332d4 100644 --- a/server/lib/elastic-search/adapters/ElasticSearchTransactionsAdapter.ts +++ b/server/lib/elastic-search/adapters/ElasticSearchTransactionsAdapter.ts @@ -4,10 +4,9 @@ import { Op } from 'sequelize'; import models from '../../../models'; import { ElasticSearchIndexName } from '../constants'; -import { ElasticSearchModelAdapter } from './ElasticSearchModelAdapter'; +import { ElasticSearchModelAdapter, FindEntriesToIndexOptions } from './ElasticSearchModelAdapter'; export class ElasticSearchTransactionsAdapter implements ElasticSearchModelAdapter { - public readonly model = models.Transaction; public readonly index = ElasticSearchIndexName.TRANSACTIONS; public readonly mappings = { properties: { @@ -28,15 +27,11 @@ export class ElasticSearchTransactionsAdapter implements ElasticSearchModelAdapt }, } as const; - public findEntriesToIndex( - options: { - offset?: number; - limit?: number; - fromDate?: Date; - maxId?: number; - ids?: number[]; - } = {}, - ) { + public getModel() { + return models.Transaction; + } + + public findEntriesToIndex(options: FindEntriesToIndexOptions = {}) { return models.Transaction.findAll({ attributes: omit(Object.keys(this.mappings.properties), ['merchantId']), order: [['id', 'DESC']], @@ -45,7 +40,16 @@ export class ElasticSearchTransactionsAdapter implements ElasticSearchModelAdapt where: { ...(options.fromDate ? { updatedAt: options.fromDate } : null), ...(options.maxId ? { id: { [Op.lte]: options.maxId } } : null), - ...(options.ids?.length ? { id: { [Op.in]: options.ids } } : null), + ...(options.ids?.length ? { id: options.ids } : null), + ...(options.relatedToCollectiveIds?.length + ? { + [Op.or]: [ + { CollectiveId: options.relatedToCollectiveIds }, + { FromCollectiveId: options.relatedToCollectiveIds }, + { HostCollectiveId: options.relatedToCollectiveIds }, + ], + } + : null), }, }); } diff --git a/server/lib/elastic-search/adapters/ElasticSearchUpdatesAdapter.ts b/server/lib/elastic-search/adapters/ElasticSearchUpdatesAdapter.ts index 3d56ef81b57..5e7085c525d 100644 --- a/server/lib/elastic-search/adapters/ElasticSearchUpdatesAdapter.ts +++ b/server/lib/elastic-search/adapters/ElasticSearchUpdatesAdapter.ts @@ -5,10 +5,9 @@ import models from '../../../models'; import { stripHTMLOrEmpty } from '../../sanitize-html'; import { ElasticSearchIndexName } from '../constants'; -import { ElasticSearchModelAdapter } from './ElasticSearchModelAdapter'; +import { ElasticSearchModelAdapter, FindEntriesToIndexOptions } from './ElasticSearchModelAdapter'; export class ElasticSearchUpdatesAdapter implements ElasticSearchModelAdapter { - public readonly model = models.Update; public readonly index = ElasticSearchIndexName.UPDATES; public readonly mappings = { properties: { @@ -29,15 +28,11 @@ export class ElasticSearchUpdatesAdapter implements ElasticSearchModelAdapter { }, } as const; - public findEntriesToIndex( - options: { - offset?: number; - limit?: number; - fromDate?: Date; - maxId?: number; - ids?: number[]; - } = {}, - ) { + public getModel() { + return models.Update; + } + + public findEntriesToIndex(options: FindEntriesToIndexOptions = {}) { return models.Update.findAll({ attributes: omit(Object.keys(this.mappings.properties), ['HostCollectiveId', 'ParentCollectiveId']), order: [['id', 'DESC']], @@ -47,12 +42,20 @@ export class ElasticSearchUpdatesAdapter implements ElasticSearchModelAdapter { ...(options.fromDate ? { updatedAt: options.fromDate } : null), ...(options.maxId ? { id: { [Op.lte]: options.maxId } } : null), ...(options.ids?.length ? { id: { [Op.in]: options.ids } } : null), + ...(options.relatedToCollectiveIds?.length + ? { + [Op.or]: [ + { CollectiveId: options.relatedToCollectiveIds }, + { FromCollectiveId: options.relatedToCollectiveIds }, + ], + } + : null), }, include: [ { association: 'collective', required: true, - attributes: ['HostCollectiveId', 'ParentCollectiveId'], + attributes: ['isActive', 'HostCollectiveId', 'ParentCollectiveId'], }, ], }); @@ -72,7 +75,7 @@ export class ElasticSearchUpdatesAdapter implements ElasticSearchModelAdapter { CollectiveId: instance.CollectiveId, FromCollectiveId: instance.FromCollectiveId, CreatedByUserId: instance.CreatedByUserId, - HostCollectiveId: instance.collective.HostCollectiveId, + HostCollectiveId: !instance.collective.isActive ? null : instance.collective.HostCollectiveId, ParentCollectiveId: instance.collective.ParentCollectiveId, }; } diff --git a/server/lib/elastic-search/adapters/index.ts b/server/lib/elastic-search/adapters/index.ts index 2db4bfa7494..ec42f45cb06 100644 --- a/server/lib/elastic-search/adapters/index.ts +++ b/server/lib/elastic-search/adapters/index.ts @@ -20,3 +20,19 @@ export const ElasticSearchModelsAdapters: Record; + +export const getAdapterFromTableName = (table: string): ElasticSearchModelAdapter | undefined => { + if (!AdaptersFromTableNames) { + AdaptersFromTableNames = Object.values(ElasticSearchModelsAdapters).reduce( + (acc, adapter) => { + acc[adapter.getModel().tableName] = adapter; + return acc; + }, + {} as Record, + ); + } + + return AdaptersFromTableNames[table]; +}; diff --git a/server/lib/elastic-search/batch-processor.ts b/server/lib/elastic-search/batch-processor.ts new file mode 100644 index 00000000000..1d014934304 --- /dev/null +++ b/server/lib/elastic-search/batch-processor.ts @@ -0,0 +1,279 @@ +import { Client } from '@elastic/elasticsearch'; +import { BulkOperationContainer, DeleteByQueryRequest } from '@elastic/elasticsearch/lib/api/types'; +import config from 'config'; +import debugLib from 'debug'; +import { groupBy, keyBy } from 'lodash'; + +import logger from '../logger'; + +import { ElasticSearchModelsAdapters, getAdapterFromTableName } from './adapters'; +import { getElasticSearchClient } from './client'; +import { ElasticSearchIndexName } from './constants'; +import { ElasticSearchRequest, ElasticSearchRequestType, isFullAccountReIndexRequest } from './types'; + +const debug = debugLib('elasticsearch-batch-processor'); + +/** + * This class processes ElasticSearch requests in batches, to reduce the number of requests sent to + * the server. + */ +export class ElasticSearchBatchProcessor { + private static instance: ElasticSearchBatchProcessor; + private client: Client; + private queue: ElasticSearchRequest[] = []; + private maxBatchSize: number = 1_000; + private maxWaitTimeInSeconds: number = config.elasticSearch.maxSyncDelay; + private timeoutHandle: NodeJS.Timeout | null = null; + private isStarted: boolean = false; + private isProcessing: boolean = false; + private processBatchPromise: Promise | null = null; + + static getInstance(): ElasticSearchBatchProcessor { + if (!ElasticSearchBatchProcessor.instance) { + ElasticSearchBatchProcessor.instance = new ElasticSearchBatchProcessor(); + } + + return ElasticSearchBatchProcessor.instance; + } + + start() { + this.isStarted = true; + } + + async flushAndClose() { + debug('Flushing and closing Elastic Search Batch Processor'); + this.isStarted = false; + return this.callProcessBatch(); + } + + async addToQueue(request: ElasticSearchRequest) { + if (!this.isStarted) { + return; + } + + debug('New request:', request.type, request['table'] || '', request.payload); + this.queue.push(request); + + if (this.queue.length >= this.maxBatchSize || isFullAccountReIndexRequest(request)) { + this.callProcessBatch(); + } else if (!this.timeoutHandle) { + this.scheduleCallProcessBatch(); + } + } + + // ---- Private methods ---- + private constructor() { + this.client = getElasticSearchClient({ throwIfUnavailable: true }); + } + + private scheduleCallProcessBatch(wait = this.maxWaitTimeInSeconds) { + if (!this.timeoutHandle) { + this.timeoutHandle = setTimeout(() => this.callProcessBatch(), wait); + } + } + + /** + * A wrapper around `processBatch` that either calls it immediately or return a promise that resolves + * once the batch is fully processed or after a timeout. + */ + private async callProcessBatch(): Promise { + // Scenario 1: we are already processing a batch. + if (this.processBatchPromise) { + debug('callProcessBatch: waiting on existing batch processing'); + await this.processBatchPromise; + } + // Scenario 2: there is a pending batch processing. We cancel the timeout and run the batch immediately. + else if (this.timeoutHandle) { + debug('callProcessBatch: running batch early'); + clearTimeout(this.timeoutHandle); + this.timeoutHandle = null; + this.processBatchPromise = this._processBatch(); + await this.processBatchPromise; + } + // Scenario 3: there is no pending batch processing and no timeout, but there are requests in the queue. + else if (this.queue.length) { + debug('callProcessBatch: running batch now'); + this.processBatchPromise = this._processBatch(); + await this.processBatchPromise; + } + // Scenario 4: there is no pending batch processing, no timeout and no requests in the queue. We're done. + else { + debug('callProcessBatch: all done'); + return; + } + + await this.callProcessBatch(); + } + + private async _processBatch() { + // Clear the timeout + if (this.timeoutHandle) { + clearTimeout(this.timeoutHandle); + this.timeoutHandle = null; + } + + // Skip if no messages + if (this.queue.length === 0) { + debug('No messages to process'); + return; + } + + // Skip if already processing + if (this.isProcessing) { + return; + } + + // Immediately move up to maxBatchSize items from the queue to the processing queue + this.isProcessing = true; + const processingQueue = this.queue.splice(0, this.maxBatchSize); + debug('Processing batch of', processingQueue.length, 'requests'); + + try { + // Prepare bulk indexing body + const { operations, deleteQuery } = await this.convertRequestsToBulkOperations(processingQueue); + + if (deleteQuery) { + debug('Running delete query for', deleteQuery.query.bool.should[0].bool.must[1].terms._id); + const deleteQueryResult = await this.client.deleteByQuery(deleteQuery); + debug('Delete query took', deleteQueryResult.took, 'ms'); + } + + if (operations.length > 0) { + const bulkResponse = await this.client.bulk({ operations }); + debug('Synchronized', bulkResponse.items.length, 'items in', bulkResponse.took, 'ms'); + + // Handle any indexing errors + if (bulkResponse.errors) { + // TODO better logs + logger.error( + 'Bulk indexing errors:', + bulkResponse.items.filter(item => item.index.status >= 400), + ); + } + } + } catch (error) { + console.error('Batch processing failed', error); + // TODO: implement retry or dead-letter queue logic + } + + // End of processing + this.isProcessing = false; + + // If the queue is ready to be processed again, do it + if (this.queue.length && !this.timeoutHandle) { + const wait = this.queue.length >= this.maxBatchSize ? 0 : this.maxWaitTimeInSeconds; + this.scheduleCallProcessBatch(wait); + } + } + + private async convertRequestsToBulkOperations( + requests: ElasticSearchRequest[], + ): Promise<{ operations: BulkOperationContainer[]; deleteQuery: DeleteByQueryRequest }> { + const { accountsToReIndex, requestsGroupedByTableName } = this.preprocessRequests(requests); + const operations: BulkOperationContainer[] = []; + let deleteQuery: DeleteByQueryRequest | null = null; + + // Start with FULL_ACCOUNT_RE_INDEX requests + if (accountsToReIndex.length > 0) { + deleteQuery = this.getAccountsReIndexDeleteQuery(accountsToReIndex); + for (const adapter of Object.values(ElasticSearchModelsAdapters)) { + const entriesToIndex = await adapter.findEntriesToIndex({ relatedToCollectiveIds: accountsToReIndex }); + for (const entry of entriesToIndex) { + operations.push( + { index: { _index: adapter.index, _id: entry['id'].toString() } }, + adapter.mapModelInstanceToDocument(entry), + ); + } + } + } + + // Then process the rest + for (const [table, requests] of Object.entries(requestsGroupedByTableName)) { + const adapter = getAdapterFromTableName(table); + if (!adapter) { + logger.error(`No ElasticSearch adapter found for table ${table}`); + continue; + } + + // Preload all updated entries + const updateRequests = requests.filter(request => request.type === ElasticSearchRequestType.UPDATE); + const updateRequestsIds = updateRequests.map(request => request.payload.id); + const entriesToIndex = await adapter.findEntriesToIndex({ ids: updateRequestsIds }); + const groupedEntriesToIndex = keyBy(entriesToIndex, 'id'); + + // Iterate over requests and create bulk indexing operations + for (const request of requests) { + if (request.type === ElasticSearchRequestType.UPDATE) { + const entry = groupedEntriesToIndex[request.payload.id]; + if (!entry) { + operations.push({ delete: { _index: adapter.index, _id: request.payload.id.toString() } }); + } else { + operations.push( + { index: { _index: adapter.index, _id: request.payload.id.toString() } }, + adapter.mapModelInstanceToDocument(entry), + ); + } + } else if (request.type === ElasticSearchRequestType.DELETE) { + operations.push({ delete: { _index: adapter.index, _id: request.payload.id.toString() } }); + } + } + } + + return { operations, deleteQuery }; + } + + private getAccountsReIndexDeleteQuery(accountIds: Array): DeleteByQueryRequest { + if (!accountIds.length) { + return null; + } + + const allIndexes = Object.values(ElasticSearchModelsAdapters).map(adapter => adapter.index); + return { + index: allIndexes.join(','), + wait_for_completion: true, // eslint-disable-line camelcase + query: { + bool: { + should: [ + // Delete all collectives + { + bool: { + must: [{ term: { _index: ElasticSearchIndexName.COLLECTIVES } }, { terms: { _id: accountIds } }], + }, + }, + // Delete all relationships + { bool: { must: [{ terms: { HostCollectiveId: accountIds } }] } }, + { bool: { must: [{ terms: { ParentCollectiveId: accountIds } }] } }, + { bool: { must: [{ terms: { FromCollectiveId: accountIds } }] } }, + { bool: { must: [{ terms: { CollectiveId: accountIds } }] } }, + ], + }, + }, + }; + } + + /** + * Deduplicates requests, returning only the latest request for each entity, unless it's a + * FULL_ACCOUNT_RE_INDEX request - which always takes maximum priority - then groups them by table. + */ + private preprocessRequests(requests: ElasticSearchRequest[]): { + accountsToReIndex: Array; + requestsGroupedByTableName: Record; + } { + const accountsToReIndex = new Set(); + const otherRequests: Record = {}; + + for (const request of requests) { + if (isFullAccountReIndexRequest(request)) { + accountsToReIndex.add(request.payload.id); + delete otherRequests[request.payload.id]; // FULL_ACCOUNT_RE_INDEX requests take priority + } else if (request.table !== 'Collectives' || !accountsToReIndex.has(request.payload.id)) { + otherRequests[request.payload.id] = request; + } + } + + return { + accountsToReIndex: Array.from(accountsToReIndex), + requestsGroupedByTableName: groupBy(Object.values(otherRequests), request => request['table']), + }; + } +} diff --git a/server/lib/elastic-search/sync-postgres.ts b/server/lib/elastic-search/sync-postgres.ts new file mode 100644 index 00000000000..9c1ddf16f5d --- /dev/null +++ b/server/lib/elastic-search/sync-postgres.ts @@ -0,0 +1,154 @@ +import createSubscriber from 'pg-listen'; + +import { getDBUrl } from '../db'; +import logger from '../logger'; +import { runWithTimeout } from '../promises'; +import { HandlerType, reportErrorToSentry } from '../sentry'; +import sequelize from '../sequelize'; + +import { ElasticSearchModelsAdapters } from './adapters'; +import { ElasticSearchBatchProcessor } from './batch-processor'; +import { ElasticSearchRequestType } from './types'; + +const CHANNEL_NAME = 'elasticsearch-requests'; + +const setupPostgresTriggers = async () => { + await sequelize.query(` + -- Create a trigger function to send notifications on table changes + CREATE OR REPLACE FUNCTION notify_elasticsearch_on_change() + RETURNS TRIGGER AS $$ + DECLARE + notification JSON; + BEGIN + -- Determine the type of operation + IF (TG_OP = 'INSERT') THEN + notification = json_build_object('type', 'UPDATE', 'table', TG_TABLE_NAME, 'payload', json_build_object('id', NEW.id)); + ELSIF (TG_OP = 'UPDATE') THEN + IF (OLD."deletedAt" IS NULL AND NEW."deletedAt" IS NOT NULL) THEN + notification = json_build_object('type', 'DELETE', 'table', TG_TABLE_NAME, 'payload', json_build_object('id', NEW.id)); + ELSIF (OLD."deletedAt" IS NOT NULL AND NEW."deletedAt" IS NOT NULL) THEN + RETURN NULL; -- Do not notify on updates of deleted rows + ELSE + notification = json_build_object('type', 'UPDATE', 'table', TG_TABLE_NAME, 'payload', json_build_object('id', NEW.id)); + END IF; + ELSIF (TG_OP = 'DELETE') THEN + notification = json_build_object('type', 'DELETE', 'table', TG_TABLE_NAME, 'payload', json_build_object('id', OLD.id)); + END IF; + + -- Publish the notification to the Elastic Search requests channel + PERFORM pg_notify('${CHANNEL_NAME}', notification::text); + + RETURN NULL; + END; + $$ LANGUAGE plpgsql; + + ${Object.values(ElasticSearchModelsAdapters) + .map( + adapter => ` + -- Create the trigger for INSERT operations + CREATE OR REPLACE TRIGGER ${adapter.getModel().tableName}_insert_trigger + AFTER INSERT ON "${adapter.getModel().tableName}" + FOR EACH ROW + EXECUTE FUNCTION notify_elasticsearch_on_change(); + + -- Create the trigger for UPDATE operations + CREATE OR REPLACE TRIGGER ${adapter.getModel().tableName}_update_trigger + AFTER UPDATE ON "${adapter.getModel().tableName}" + FOR EACH ROW + EXECUTE FUNCTION notify_elasticsearch_on_change(); + + -- Create the trigger for DELETE operations + CREATE OR REPLACE TRIGGER ${adapter.getModel().tableName}_delete_trigger + AFTER DELETE ON "${adapter.getModel().tableName}" + FOR EACH ROW + EXECUTE FUNCTION notify_elasticsearch_on_change(); + `, + ) + .join('\n')} + `); +}; + +const removePostgresTriggers = async () => { + await sequelize.query(` + ${Object.values(ElasticSearchModelsAdapters) + .map( + adapter => ` + DROP TRIGGER IF EXISTS ${adapter.getModel().tableName}_insert_trigger ON "${adapter.getModel().tableName}"; + DROP TRIGGER IF EXISTS ${adapter.getModel().tableName}_update_trigger ON "${adapter.getModel().tableName}"; + DROP TRIGGER IF EXISTS ${adapter.getModel().tableName}_delete_trigger ON "${adapter.getModel().tableName}"; + DROP TRIGGER IF EXISTS ${adapter.getModel().tableName}_truncate_trigger ON "${adapter.getModel().tableName}"; + `, + ) + .join('\n')} + + DROP FUNCTION IF EXISTS notify_elasticsearch_on_change(); + `); +}; + +// Some shared variables +let shutdownPromise: Promise | null = null; +let subscriber: ReturnType; + +export const startElasticSearchPostgresSync = async () => { + const elasticSearchBatchProcessor = ElasticSearchBatchProcessor.getInstance(); + elasticSearchBatchProcessor.start(); + + // Setup DB message queue + subscriber = createSubscriber({ connectionString: getDBUrl('database') }); + subscriber.notifications.on(CHANNEL_NAME, async event => { + try { + // TODO: Check message format + await elasticSearchBatchProcessor.addToQueue(event); + // await handleElasticSearchRequest(event); + } catch (error) { + // TODO: maybe error handling in the batch processor? + reportErrorToSentry(error, { handler: HandlerType.ELASTICSEARCH_SYNC_JOB }); + } + }); + + subscriber.events.on('error', error => { + reportErrorToSentry(error, { handler: HandlerType.ELASTICSEARCH_SYNC_JOB }); + }); + + await subscriber.connect(); + await subscriber.listenTo(CHANNEL_NAME); + + // Setup postgres triggers + await setupPostgresTriggers(); + + logger.info('ElasticSearch <-> Postgres sync job started'); +}; + +export const stopElasticSearchPostgresSync = async () => { + if (!shutdownPromise) { + logger.info('Shutting down ElasticSearch <-> Postgres sync job'); + if (subscriber) { + subscriber.close(); + } + + shutdownPromise = runWithTimeout( + (async () => { + await removePostgresTriggers(); + const elasticSearchBatchProcessor = ElasticSearchBatchProcessor.getInstance(); + await elasticSearchBatchProcessor.flushAndClose(); + logger.info('ElasticSearch <-> Postgres sync job shutdown complete'); + })(), + 30_000, + 'Elastic Search <-> Postgres sync job took too long to shutdown, forcing exit', + ); + } + + return shutdownPromise; +}; + +/** + * Re-indexes all entries across all indexes related to this `collectiveId`, either through `CollectiveId`, + * `HostCollectiveId`, `FromCollectiveId`...etc. + */ +export const elasticSearchFullAccountReIndex = async (collectiveId: number): Promise => { + const elasticSearchBatchProcessor = ElasticSearchBatchProcessor.getInstance(); + await elasticSearchBatchProcessor.addToQueue({ + type: ElasticSearchRequestType.FULL_ACCOUNT_RE_INDEX, + payload: { id: collectiveId }, + }); +}; diff --git a/server/lib/elastic-search/sync.ts b/server/lib/elastic-search/sync.ts index 7cd5f07ef32..95a6d7cc6d2 100644 --- a/server/lib/elastic-search/sync.ts +++ b/server/lib/elastic-search/sync.ts @@ -31,7 +31,7 @@ async function removeDeletedEntries(indexName: ElasticSearchIndexName, fromDate: let offset = 0; let deletedEntries = []; do { - deletedEntries = await adapter.model.findAll({ + deletedEntries = await adapter.getModel().findAll({ attributes: ['id'], where: { deletedAt: { [Op.gt]: fromDate } }, raw: true, @@ -83,7 +83,7 @@ export async function restoreUndeletedEntries(indexName: ElasticSearchIndexName, /* eslint-enable camelcase */ // Search for entries that are not marked as deleted in the database - const undeletedEntries = (await adapter.model.findAll({ + const undeletedEntries = (await adapter.getModel().findAll({ attributes: ['id'], where: { id: { [Op.not]: allIds } }, raw: true, diff --git a/server/lib/elastic-search/types.ts b/server/lib/elastic-search/types.ts new file mode 100644 index 00000000000..d2315e8a261 --- /dev/null +++ b/server/lib/elastic-search/types.ts @@ -0,0 +1,27 @@ +export enum ElasticSearchRequestType { + FULL_ACCOUNT_RE_INDEX = 'FULL_ACCOUNT_RE_INDEX', + UPDATE = 'UPDATE', + DELETE = 'DELETE', +} + +type ElasticSearchRequestBase = { + type: ElasticSearchRequestType; +}; + +export type ElasticSearchRequest = ElasticSearchRequestBase & + ( + | { + type: ElasticSearchRequestType.FULL_ACCOUNT_RE_INDEX; + payload: { id: number }; + } + | { + type: ElasticSearchRequestType.UPDATE | ElasticSearchRequestType.DELETE; + table: string; + payload: { id: number }; + } + ); + +export const isFullAccountReIndexRequest = ( + request: ElasticSearchRequest, +): request is ElasticSearchRequest & { type: ElasticSearchRequestType.FULL_ACCOUNT_RE_INDEX } => + request?.type === ElasticSearchRequestType.FULL_ACCOUNT_RE_INDEX; diff --git a/server/lib/promises.ts b/server/lib/promises.ts new file mode 100644 index 00000000000..c0a37594ae8 --- /dev/null +++ b/server/lib/promises.ts @@ -0,0 +1,27 @@ +class PromiseTimeoutError extends Error { + constructor(message: string) { + super(message); + this.name = 'PromiseTimeoutError'; + } +} + +export const runWithTimeout = async ( + promise: Promise, + timeoutInMs = 10000, + message = `Promise did not resolve within ${timeoutInMs}ms`, +): Promise => { + let timeout: NodeJS.Timeout; + + const result = await Promise.race([ + promise, + new Promise((_, reject) => { + timeout = setTimeout(() => reject(new PromiseTimeoutError(message)), timeoutInMs); + }), + ]); + + if (timeout) { + clearTimeout(timeout); + } + + return result as T; +}; diff --git a/server/lib/sentry.ts b/server/lib/sentry.ts index d05df8e3b34..e577a94e4ae 100644 --- a/server/lib/sentry.ts +++ b/server/lib/sentry.ts @@ -96,6 +96,7 @@ export enum HandlerType { CRON = 'CRON', FALLBACK = 'FALLBACK', WEBHOOK = 'WEBHOOK', + ELASTICSEARCH_SYNC_JOB = 'ELASTICSEARCH_SYNC_JOB', } export type CaptureErrorParams = { diff --git a/server/models/Collective.ts b/server/models/Collective.ts index 1ae09bb23ae..1042ebe713b 100644 --- a/server/models/Collective.ts +++ b/server/models/Collective.ts @@ -85,6 +85,7 @@ import { } from '../lib/collectivelib'; import { invalidateContributorsCache } from '../lib/contributors'; import { getFxRate } from '../lib/currency'; +import { elasticSearchFullAccountReIndex } from '../lib/elastic-search/sync-postgres'; import emailLib from '../lib/email'; import { formatAddress } from '../lib/format-address'; import { getGithubHandleFromUrl, getGithubUrlFromHandle } from '../lib/github'; @@ -2609,12 +2610,18 @@ class Collective extends Model< } } - return this.addHost(newHostCollective, remoteUser, { + await this.addHost(newHostCollective, remoteUser, { message, applicationData, shouldAutomaticallyApprove, }); } + + // Update caches + purgeCacheForCollective(this.slug); + elasticSearchFullAccountReIndex(this.id); + + return this; }; // edit the list of members and admins of this collective (create/update/remove)