Skip to content

PostgreSQL

KillWolfVlad edited this page Nov 27, 2021 · 5 revisions
  1. Install pg:
yarn packages/app add pg && yarn packages/app add --dev @types/pg && yarn packages/app remove @nestjs/typeorm
  1. Apply this patch to add PostgreSQL support to your NestJS application:
From 4a52897c24edf362fca7b9faf2d78099ce02f276 Mon Sep 17 00:00:00 2001
From: KillWolfVlad <KillWolfVlad@users.noreply.github.com>
Date: Sat, 27 Nov 2021 10:47:07 +0500
Subject: [PATCH] feat: add pg

---
 packages/app/src/infrastructure/index.ts      |  1 +
 packages/app/src/infrastructure/pg/index.ts   | 17 +++++
 .../app/src/infrastructure/pg/pgModule.ts     | 70 +++++++++++--------
 .../app/src/infrastructure/pg/tokens/index.ts | 18 +++++
 .../pg/tokens/pgReadPoolToken.ts              | 17 +++++
 .../pg/tokens/pgWritePoolToken.ts             | 17 +++++
 6 files changed, 112 insertions(+), 28 deletions(-)
 create mode 100644 packages/app/src/infrastructure/pg/index.ts
 create mode 100644 packages/app/src/infrastructure/pg/tokens/index.ts
 create mode 100644 packages/app/src/infrastructure/pg/tokens/pgReadPoolToken.ts
 create mode 100644 packages/app/src/infrastructure/pg/tokens/pgWritePoolToken.ts

diff --git a/packages/app/src/infrastructure/index.ts b/packages/app/src/infrastructure/index.ts
index f9bb302..b231eca 100644
--- a/packages/app/src/infrastructure/index.ts
+++ b/packages/app/src/infrastructure/index.ts
@@ -17,3 +17,4 @@
 export * from "./api";
 export * from "./config";
 export * from "./packageJson";
+export * from "./pg";
diff --git a/packages/app/src/infrastructure/pg/index.ts b/packages/app/src/infrastructure/pg/index.ts
new file mode 100644
index 0000000..e784703
--- /dev/null
+++ b/packages/app/src/infrastructure/pg/index.ts
@@ -0,0 +1,17 @@
+/*
+ * Copyright 2021 Byndyusoft
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+export * from "./tokens";
diff --git a/packages/app/src/infrastructure/pg/pgModule.ts b/packages/app/src/infrastructure/pg/pgModule.ts
index be29f91..e83e8a0 100644
--- a/packages/app/src/infrastructure/pg/pgModule.ts
+++ b/packages/app/src/infrastructure/pg/pgModule.ts
@@ -14,38 +14,52 @@
  * limitations under the License.
  */
 
-import { Module } from "@nestjs/common";
-import { TypeOrmModule } from "@nestjs/typeorm";
+import { Global, Inject, Module, OnModuleDestroy } from "@nestjs/common";
+import { Pool } from "pg";
 
-import * as entities from "ᐸEntitiesᐳ";
+import { ConfigDto, PgConfigDto } from "../config";
 
-import { ConfigDto } from "../config";
+import { PgReadPoolToken, PgWritePoolToken } from "./tokens";
 
+function createPgPool(connectionString: string, pgConfig: PgConfigDto): Pool {
+  return new Pool({
+    connectionString,
+    connectionTimeoutMillis: pgConfig.connectionTimeout,
+    max: pgConfig.poolSize,
+  });
+}
+
+@Global()
 @Module({
-  imports: [
-    TypeOrmModule.forRootAsync({
+  providers: [
+    {
+      provide: PgReadPoolToken,
+      inject: [ConfigDto, PgWritePoolToken],
+      useFactory: (config: ConfigDto, pgWritePool: Pool) =>
+        config.pg.writeConnectionString === config.pg.readConnectionString
+          ? pgWritePool
+          : createPgPool(config.pg.readConnectionString, config.pg),
+    },
+    {
+      provide: PgWritePoolToken,
       inject: [ConfigDto],
-      useFactory: (config: ConfigDto) => ({
-        type: "postgres",
-        entities: Object.values(entities),
-        logging: "all",
-        logger: "debug",
-        extra: {
-          max: config.pg.poolSize,
-        },
-        replication: {
-          master: {
-            url: config.pg.writeConnectionString,
-          },
-          slaves: [
-            {
-              url: config.pg.readConnectionString,
-            },
-          ],
-        },
-        connectTimeoutMS: config.pg.connectionTimeout,
-      }),
-    }),
+      useFactory: (config: ConfigDto) =>
+        createPgPool(config.pg.writeConnectionString, config.pg),
+    },
   ],
+  exports: [PgReadPoolToken, PgWritePoolToken],
 })
-export class PgModule {}
+export class PgModule implements OnModuleDestroy {
+  public constructor(
+    @Inject(PgReadPoolToken) private readonly __pgReadPool: Pool,
+    @Inject(PgWritePoolToken) private readonly __pgWritePool: Pool,
+  ) {}
+
+  public async onModuleDestroy(): Promise<void> {
+    if (this.__pgReadPool !== this.__pgWritePool) {
+      await this.__pgReadPool.end();
+    }
+
+    await this.__pgWritePool.end();
+  }
+}
diff --git a/packages/app/src/infrastructure/pg/tokens/index.ts b/packages/app/src/infrastructure/pg/tokens/index.ts
new file mode 100644
index 0000000..abd9842
--- /dev/null
+++ b/packages/app/src/infrastructure/pg/tokens/index.ts
@@ -0,0 +1,18 @@
+/*
+ * Copyright 2021 Byndyusoft
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+export * from "./pgReadPoolToken";
+export * from "./pgWritePoolToken";
diff --git a/packages/app/src/infrastructure/pg/tokens/pgReadPoolToken.ts b/packages/app/src/infrastructure/pg/tokens/pgReadPoolToken.ts
new file mode 100644
index 0000000..65c0253
--- /dev/null
+++ b/packages/app/src/infrastructure/pg/tokens/pgReadPoolToken.ts
@@ -0,0 +1,17 @@
+/*
+ * Copyright 2021 Byndyusoft
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+export const PgReadPoolToken = Symbol("PgReadPoolToken");
diff --git a/packages/app/src/infrastructure/pg/tokens/pgWritePoolToken.ts b/packages/app/src/infrastructure/pg/tokens/pgWritePoolToken.ts
new file mode 100644
index 0000000..28d631d
--- /dev/null
+++ b/packages/app/src/infrastructure/pg/tokens/pgWritePoolToken.ts
@@ -0,0 +1,17 @@
+/*
+ * Copyright 2021 Byndyusoft
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+export const PgWritePoolToken = Symbol("PgWritePoolToken");
-- 
2.34.1

Optional apply this patch to add PostgreSQL support to example commands and queries:

From eeb06d4eca4f8005cae0874ce47a0259c21f7a39 Mon Sep 17 00:00:00 2001
From: KillWolfVlad <KillWolfVlad@users.noreply.github.com>
Date: Sat, 27 Nov 2021 12:50:57 +0500
Subject: [PATCH] feat: add pg support for commands and queries

---
 .../app/src/dataAccess/dataAccessModule.ts    |  4 -
 .../dataAccess/users/checkUserExistsQuery.ts  | 22 ++---
 .../src/dataAccess/users/createUserCommand.ts | 66 ++++++++-------
 .../src/dataAccess/users/deleteUserCommand.ts | 78 ++++++++++--------
 .../src/dataAccess/users/getUserByIdQuery.ts  | 25 ++++--
 .../src/dataAccess/users/listUsersQuery.ts    | 54 ++++++++-----
 .../src/dataAccess/users/updateUserCommand.ts | 80 +++++++++++--------
 7 files changed, 189 insertions(+), 140 deletions(-)

diff --git a/packages/app/src/dataAccess/dataAccessModule.ts b/packages/app/src/dataAccess/dataAccessModule.ts
index 8e17ce3..d5f496c 100644
--- a/packages/app/src/dataAccess/dataAccessModule.ts
+++ b/packages/app/src/dataAccess/dataAccessModule.ts
@@ -15,9 +15,6 @@
  */
 
 import { Global, Module } from "@nestjs/common";
-import { TypeOrmModule } from "@nestjs/typeorm";
-
-import * as entities from "ᐸEntitiesᐳ";
 
 import * as users from "./users";
 
@@ -25,7 +22,6 @@ const providers = [...Object.values(users)];
 
 @Global()
 @Module({
-  imports: [TypeOrmModule.forFeature(Object.values(entities))],
   providers,
   exports: providers,
 })
diff --git a/packages/app/src/dataAccess/users/checkUserExistsQuery.ts b/packages/app/src/dataAccess/users/checkUserExistsQuery.ts
index 15d19d7..45faf92 100644
--- a/packages/app/src/dataAccess/users/checkUserExistsQuery.ts
+++ b/packages/app/src/dataAccess/users/checkUserExistsQuery.ts
@@ -15,18 +15,17 @@
  */
 
 import { TracingService } from "@byndyusoft/nest-opentracing";
-import { Injectable } from "@nestjs/common";
-import { InjectRepository } from "@nestjs/typeorm";
-import { Repository } from "typeorm";
+import { Inject, Injectable } from "@nestjs/common";
+import { Pool } from "pg";
 
+import { PgReadPoolToken } from "~/src/infrastructure";
 import { ParamsWithUserIdDto } from "ᐸDtosᐳ";
 import { UserEntity } from "ᐸEntitiesᐳ";
 
 @Injectable()
 export class CheckUserExistsQuery {
   public constructor(
-    @InjectRepository(UserEntity)
-    private readonly __userRepository: Repository<UserEntity>,
+    @Inject(PgReadPoolToken) private readonly __pgReadPool: Pool,
     private readonly __tracingService: TracingService,
   ) {}
 
@@ -34,11 +33,16 @@ export class CheckUserExistsQuery {
     return this.__tracingService.traceAsyncFunction(
       nameof(CheckUserExistsQuery),
       async () => {
-        const user = await this.__userRepository.findOne(options.userId, {
-          select: ["userId"],
-        });
+        const { rows } = await this.__pgReadPool.query<UserEntity>(
+          `
+            SELECT id as "userId"
+            FROM users
+            WHERE id = $1
+              AND deleted_at IS NULL;`,
+          [options.userId],
+        );
 
-        return !!user;
+        return rows.length > 0;
       },
     );
   }
diff --git a/packages/app/src/dataAccess/users/createUserCommand.ts b/packages/app/src/dataAccess/users/createUserCommand.ts
index af973bd..b1a4ea5 100644
--- a/packages/app/src/dataAccess/users/createUserCommand.ts
+++ b/packages/app/src/dataAccess/users/createUserCommand.ts
@@ -15,60 +15,70 @@
  */
 
 import { TracingService } from "@byndyusoft/nest-opentracing";
-import { Injectable } from "@nestjs/common";
-import { keys } from "ts-transformer-keys";
-import { Connection, EntityManager } from "typeorm";
+import { Inject, Injectable } from "@nestjs/common";
+import { Pool, PoolClient } from "pg";
 
+import { PgWritePoolToken } from "~/src/infrastructure";
 import {
   UserEntityToUserDtoMapper,
   UserEntityToUserOutboxDtoMapper,
 } from "~/src/mappers";
 import { CreateUserDto, UserDto } from "ᐸDtosᐳ";
-import { UserEntity, UserOutboxEntity } from "ᐸEntitiesᐳ";
+import { UserEntity } from "ᐸEntitiesᐳ";
 
 @Injectable()
 export class CreateUserCommand {
   public constructor(
-    private readonly __connection: Connection,
+    @Inject(PgWritePoolToken) private readonly __pgWritePool: Pool,
     private readonly __tracingService: TracingService,
   ) {}
 
   private static async __execute(
-    entityManager: EntityManager,
+    client: PoolClient,
     options: CreateUserDto,
   ): Promise<UserDto> {
-    const userRepository = entityManager.getRepository(UserEntity);
-
-    const userOutboxRepository = entityManager.getRepository(UserOutboxEntity);
-
-    const insertResult = await userRepository
-      .createQueryBuilder()
-      .insert()
-      .values(options)
-      .returning(keys<UserEntity>())
-      .execute();
-
-    const insertedEntities = insertResult.generatedMaps as UserEntity[];
+    const { rows } = await client.query<UserEntity>(
+      `
+        INSERT INTO users(name, email, version)
+        VALUES ($1, $2, 1)
+        RETURNING id AS "userId", name AS "name", email AS "email", version AS "userVersion";`,
+      [options.name, options.email],
+    );
 
     const now = new Date();
 
-    await userOutboxRepository.insert(
-      UserEntityToUserOutboxDtoMapper.map(...insertedEntities).map((x) => ({
-        entity: x,
-        timestamp: now,
-      })),
+    await client.query(
+      `
+        INSERT INTO users_outbox(entity, timestamp)
+        VALUES ($1, $2);`,
+      [UserEntityToUserOutboxDtoMapper.map(...rows)[0], now],
     );
 
-    return UserEntityToUserDtoMapper.map(...insertedEntities)[0];
+    return UserEntityToUserDtoMapper.map(...rows)[0];
   }
 
   public execute(options: CreateUserDto): Promise<UserDto> {
     return this.__tracingService.traceAsyncFunction(
       nameof(CreateUserCommand),
-      () =>
-        this.__connection.transaction((entityManager) =>
-          CreateUserCommand.__execute(entityManager, options),
-        ),
+      async () => {
+        const client = await this.__pgWritePool.connect();
+
+        try {
+          await client.query("BEGIN");
+
+          const result = await CreateUserCommand.__execute(client, options);
+
+          await client.query("COMMIT");
+
+          return result;
+        } catch (error) {
+          await client.query("ROLLBACK");
+
+          throw error;
+        } finally {
+          client.release();
+        }
+      },
     );
   }
 }
diff --git a/packages/app/src/dataAccess/users/deleteUserCommand.ts b/packages/app/src/dataAccess/users/deleteUserCommand.ts
index 2e56b67..632f13b 100644
--- a/packages/app/src/dataAccess/users/deleteUserCommand.ts
+++ b/packages/app/src/dataAccess/users/deleteUserCommand.ts
@@ -17,70 +17,63 @@
 import { TracingService } from "@byndyusoft/nest-opentracing";
 import {
   ConflictException,
+  Inject,
   Injectable,
   InternalServerErrorException,
 } from "@nestjs/common";
-import _ from "lodash";
-import { keys } from "ts-transformer-keys";
-import { Connection, EntityManager } from "typeorm";
+import { Pool, PoolClient } from "pg";
 
+import { PgWritePoolToken } from "~/src/infrastructure";
 import {
   UserEntityToUserDtoMapper,
   UserEntityToUserOutboxDtoMapper,
 } from "~/src/mappers";
 import { ParamsWithUserIdDto, QueryWithUserVersionDto, UserDto } from "ᐸDtosᐳ";
-import { UserEntity, UserOutboxEntity } from "ᐸEntitiesᐳ";
+import { UserEntity } from "ᐸEntitiesᐳ";
 
 @Injectable()
 export class DeleteUserCommand {
   public constructor(
-    private readonly __connection: Connection,
+    @Inject(PgWritePoolToken) private readonly __pgWritePool: Pool,
     private readonly __tracingService: TracingService,
   ) {}
 
   private static async __execute(
-    entityManager: EntityManager,
+    client: PoolClient,
     options: ParamsWithUserIdDto & QueryWithUserVersionDto,
   ): Promise<UserDto> {
-    const userRepository = entityManager.getRepository(UserEntity);
-
-    const userOutboxRepository = entityManager.getRepository(UserOutboxEntity);
-
     const now = new Date();
 
-    const updateResult = await userRepository
-      .createQueryBuilder()
-      .update()
-      .set({ deletedAt: now })
-      .whereEntity({ userId: options.userId } as UserEntity)
-      .where({
-        userId: options.userId,
-        userVersion: options.userVersion,
-      })
-      .returning(keys<UserEntity>())
-      .execute();
+    const { rows } = await client.query<UserEntity>(
+      `
+        UPDATE users
+        SET deleted_at = $1,
+            version    = version + 1
+        WHERE id = $2
+          AND version = $3
+        RETURNING id AS "userId", name AS "name", email AS "email", version AS "userVersion", deleted_at AS "deletedAt";`,
+      [now, options.userId, options.userVersion],
+    );
 
-    if (updateResult.affected === 0) {
+    if (rows.length === 0) {
       throw new ConflictException(
         "version conflict when deleting user",
         "BYS_409",
       );
     }
 
-    if (!_.isNil(updateResult.affected) && updateResult.affected > 1) {
+    if (rows.length > 1) {
       throw new InternalServerErrorException("deleted more than 1 user");
     }
 
-    const updatedEntities = updateResult.generatedMaps as UserEntity[];
-
-    await userOutboxRepository.insert(
-      UserEntityToUserOutboxDtoMapper.map(...updatedEntities).map((x) => ({
-        entity: x,
-        timestamp: now,
-      })),
+    await client.query(
+      `
+        INSERT INTO users_outbox(entity, timestamp)
+        VALUES ($1, $2);`,
+      [UserEntityToUserOutboxDtoMapper.map(...rows)[0], now],
     );
 
-    return UserEntityToUserDtoMapper.map(...updatedEntities)[0];
+    return UserEntityToUserDtoMapper.map(...rows)[0];
   }
 
   public execute(
@@ -88,10 +81,25 @@ export class DeleteUserCommand {
   ): Promise<UserDto> {
     return this.__tracingService.traceAsyncFunction(
       nameof(DeleteUserCommand),
-      () =>
-        this.__connection.transaction((entityManager) =>
-          DeleteUserCommand.__execute(entityManager, options),
-        ),
+      async () => {
+        const client = await this.__pgWritePool.connect();
+
+        try {
+          await client.query("BEGIN");
+
+          const result = await DeleteUserCommand.__execute(client, options);
+
+          await client.query("COMMIT");
+
+          return result;
+        } catch (error) {
+          await client.query("ROLLBACK");
+
+          throw error;
+        } finally {
+          client.release();
+        }
+      },
     );
   }
 }
diff --git a/packages/app/src/dataAccess/users/getUserByIdQuery.ts b/packages/app/src/dataAccess/users/getUserByIdQuery.ts
index 95b7bd1..6aad18d 100644
--- a/packages/app/src/dataAccess/users/getUserByIdQuery.ts
+++ b/packages/app/src/dataAccess/users/getUserByIdQuery.ts
@@ -15,10 +15,10 @@
  */
 
 import { TracingService } from "@byndyusoft/nest-opentracing";
-import { Injectable } from "@nestjs/common";
-import { InjectRepository } from "@nestjs/typeorm";
-import { Repository } from "typeorm";
+import { Inject, Injectable } from "@nestjs/common";
+import { Pool } from "pg";
 
+import { PgReadPoolToken } from "~/src/infrastructure";
 import { UserEntityToUserDtoMapper } from "~/src/mappers";
 import { ParamsWithUserIdDto, UserDto } from "ᐸDtosᐳ";
 import { UserEntity } from "ᐸEntitiesᐳ";
@@ -26,8 +26,7 @@ import { UserEntity } from "ᐸEntitiesᐳ";
 @Injectable()
 export class GetUserByIdQuery {
   public constructor(
-    @InjectRepository(UserEntity)
-    private readonly __userRepository: Repository<UserEntity>,
+    @Inject(PgReadPoolToken) private readonly __pgReadPool: Pool,
     private readonly __tracingService: TracingService,
   ) {}
 
@@ -35,9 +34,21 @@ export class GetUserByIdQuery {
     return this.__tracingService.traceAsyncFunction(
       nameof(GetUserByIdQuery),
       async () => {
-        const user = await this.__userRepository.findOne(options.userId);
+        const { rows } = await this.__pgReadPool.query<UserEntity>(
+          `
+            SELECT id      AS "userId",
+                   name    AS "name",
+                   email   AS "email",
+                   version AS "userVersion"
+            FROM users
+            WHERE id = $1
+              AND deleted_at IS NULL;`,
+          [options.userId],
+        );
 
-        return user ? UserEntityToUserDtoMapper.map(user)[0] : null;
+        return rows.length > 0
+          ? UserEntityToUserDtoMapper.map(...rows)[0]
+          : null;
       },
     );
   }
diff --git a/packages/app/src/dataAccess/users/listUsersQuery.ts b/packages/app/src/dataAccess/users/listUsersQuery.ts
index 63088a8..6d35ff8 100644
--- a/packages/app/src/dataAccess/users/listUsersQuery.ts
+++ b/packages/app/src/dataAccess/users/listUsersQuery.ts
@@ -15,11 +15,10 @@
  */
 
 import { TracingService } from "@byndyusoft/nest-opentracing";
-import { Injectable } from "@nestjs/common";
-import { InjectRepository } from "@nestjs/typeorm";
-import _ from "lodash";
-import { In, Repository } from "typeorm";
+import { Inject, Injectable } from "@nestjs/common";
+import { Pool } from "pg";
 
+import { PgReadPoolToken } from "~/src/infrastructure";
 import { UserEntityToUserDtoMapper } from "~/src/mappers";
 import { ListUsersQueryDto, UserDto } from "ᐸDtosᐳ";
 import { UserEntity } from "ᐸEntitiesᐳ";
@@ -27,8 +26,7 @@ import { UserEntity } from "ᐸEntitiesᐳ";
 @Injectable()
 export class ListUsersQuery {
   public constructor(
-    @InjectRepository(UserEntity)
-    private readonly __userRepository: Repository<UserEntity>,
+    @Inject(PgReadPoolToken) private readonly __pgReadPool: Pool,
     private readonly __tracingService: TracingService,
   ) {}
 
@@ -36,23 +34,35 @@ export class ListUsersQuery {
     return this.__tracingService.traceAsyncFunction(
       nameof(ListUsersQuery),
       async () => {
-        const users = await this.__userRepository.find({
-          where: _.omitBy(
-            {
-              userId: options.userIds ? In(options.userIds) : undefined,
-              name: options.names ? In(options.names) : undefined,
-              email: options.emails ? In(options.emails) : undefined,
-            },
-            (value) => value === undefined,
-          ),
-          order: {
-            userId: "DESC",
-          },
-          skip: options.pageToken,
-          take: options.pageSize,
-        });
+        const { rows } = await this.__pgReadPool.query<UserEntity>(
+          `
+            SELECT id      AS "userId",
+                   name    AS "name",
+                   email   AS "email",
+                   version AS "userVersion"
+            FROM users
+            WHERE CASE
+                    WHEN array_length($1::BIGINT[], 1) > 0 THEN id = ANY ($1)
+                    ELSE TRUE END
+              AND CASE
+                    WHEN array_length($2::TEXT[], 1) > 0 THEN name = ANY ($2)
+                    ELSE TRUE END
+              AND CASE
+                    WHEN array_length($3::TEXT[], 1) > 0 THEN email = ANY ($3)
+                    ELSE TRUE END
+              AND deleted_at IS NULL
+            ORDER BY id DESC
+            LIMIT $4 OFFSET $5;`,
+          [
+            options.userIds,
+            options.names,
+            options.emails,
+            options.pageSize,
+            options.pageToken,
+          ],
+        );
 
-        return UserEntityToUserDtoMapper.map(...users);
+        return UserEntityToUserDtoMapper.map(...rows);
       },
     );
   }
diff --git a/packages/app/src/dataAccess/users/updateUserCommand.ts b/packages/app/src/dataAccess/users/updateUserCommand.ts
index ed65e28..8455202 100644
--- a/packages/app/src/dataAccess/users/updateUserCommand.ts
+++ b/packages/app/src/dataAccess/users/updateUserCommand.ts
@@ -17,13 +17,13 @@
 import { TracingService } from "@byndyusoft/nest-opentracing";
 import {
   ConflictException,
+  Inject,
   Injectable,
   InternalServerErrorException,
 } from "@nestjs/common";
-import _ from "lodash";
-import { keys } from "ts-transformer-keys";
-import { Connection, EntityManager } from "typeorm";
+import { Pool, PoolClient } from "pg";
 
+import { PgWritePoolToken } from "~/src/infrastructure";
 import {
   UserEntityToUserDtoMapper,
   UserEntityToUserOutboxDtoMapper,
@@ -34,58 +34,53 @@ import {
   UpdateUserDto,
   UserDto,
 } from "ᐸDtosᐳ";
-import { UserEntity, UserOutboxEntity } from "ᐸEntitiesᐳ";
+import { UserEntity } from "ᐸEntitiesᐳ";
 
 @Injectable()
 export class UpdateUserCommand {
   public constructor(
-    private readonly __connection: Connection,
+    @Inject(PgWritePoolToken) private readonly __pgWritePool: Pool,
     private readonly __tracingService: TracingService,
   ) {}
 
   private static async __execute(
-    entityManager: EntityManager,
+    client: PoolClient,
     options: ParamsWithUserIdDto & QueryWithUserVersionDto & UpdateUserDto,
   ): Promise<UserDto> {
-    const userRepository = entityManager.getRepository(UserEntity);
-
-    const userOutboxRepository = entityManager.getRepository(UserOutboxEntity);
-
-    const updateResult = await userRepository
-      .createQueryBuilder()
-      .update()
-      .set(_.pick(options, keys<UpdateUserDto>()))
-      .whereEntity({ userId: options.userId } as UserEntity)
-      .where({
-        userId: options.userId,
-        userVersion: options.userVersion,
-      })
-      .returning(keys<UserEntity>())
-      .execute();
+    const { rows } = await client.query<UserEntity>(
+      `
+        UPDATE users
+        SET name    = COALESCE($1, name),
+            email   = COALESCE($2, email),
+            version = version + 1
+        WHERE id = $3
+          AND version = $4
+        RETURNING id AS "userId", name AS "name", email AS "email", version AS "userVersion";
+      `,
+      [options.name, options.email, options.userId, options.userVersion],
+    );
 
-    if (updateResult.affected === 0) {
+    if (rows.length === 0) {
       throw new ConflictException(
         "version conflict when updating user",
         "BYS_409",
       );
     }
 
-    if (!_.isNil(updateResult.affected) && updateResult.affected > 1) {
+    if (rows.length > 1) {
       throw new InternalServerErrorException("updated more than 1 user");
     }
 
-    const updatedEntities = updateResult.generatedMaps as UserEntity[];
-
     const now = new Date();
 
-    await userOutboxRepository.insert(
-      UserEntityToUserOutboxDtoMapper.map(...updatedEntities).map((x) => ({
-        entity: x,
-        timestamp: now,
-      })),
+    await client.query(
+      `
+        INSERT INTO users_outbox(entity, timestamp)
+        VALUES ($1, $2);`,
+      [UserEntityToUserOutboxDtoMapper.map(...rows)[0], now],
     );
 
-    return UserEntityToUserDtoMapper.map(...updatedEntities)[0];
+    return UserEntityToUserDtoMapper.map(...rows)[0];
   }
 
   public execute(
@@ -93,10 +88,25 @@ export class UpdateUserCommand {
   ): Promise<UserDto> {
     return this.__tracingService.traceAsyncFunction(
       nameof(UpdateUserCommand),
-      () =>
-        this.__connection.transaction((entityManager) =>
-          UpdateUserCommand.__execute(entityManager, options),
-        ),
+      async () => {
+        const client = await this.__pgWritePool.connect();
+
+        try {
+          await client.query("BEGIN");
+
+          const result = await UpdateUserCommand.__execute(client, options);
+
+          await client.query("COMMIT");
+
+          return result;
+        } catch (error) {
+          await client.query("ROLLBACK");
+
+          throw error;
+        } finally {
+          client.release();
+        }
+      },
     );
   }
 }
-- 
2.34.1
Clone this wiki locally