diff --git a/bulkerlib/implementations/sql/clickhouse.go b/bulkerlib/implementations/sql/clickhouse.go index 4c5381a..bab68b4 100644 --- a/bulkerlib/implementations/sql/clickhouse.go +++ b/bulkerlib/implementations/sql/clickhouse.go @@ -301,14 +301,19 @@ func (ch *ClickHouse) OpenTx(ctx context.Context) (*TxSQLAdapter, error) { origConfig := *ch.config configCopy := origConfig configCopy.Parameters = utils.MapCopy(ch.config.Parameters) - configCopy.Parameters["session_id"] = uuid.New() + sessionId := uuid.New() + configCopy.Parameters["session_id"] = sessionId utils.MapPutIfAbsent(configCopy.Parameters, "session_timeout", "3600") // create db pool just for one session because 'session_id' config parameter defines session sessionDb, err := ch.dbConnectFunction(&configCopy) if err != nil { return nil, fmt.Errorf("failed to open session: %v", err) } - db = sessionDb + c, err := sessionDb.Conn(ctx) + if err != nil { + return nil, fmt.Errorf("failed to open connection: %v", err) + } + db = NewConWithDB(sessionDb, c, sessionId) } else { var err error db, err = ch.dataSource.Conn(ctx) @@ -1136,8 +1141,11 @@ func (ch *ClickHouse) Ping(_ context.Context) error { func chPing(db *sql.DB, httpMode bool) error { if httpMode { //keep select 1 and don't use Ping() because chproxy doesn't support /ping endpoint. - _, err := db.Exec("SELECT 1") - return err + //_, err := db.Exec("SELECT 1") + //return err + + //not sure Ping is necessary in httpMode + return nil } else { return db.Ping() } diff --git a/bulkerlib/implementations/sql/tx_wrapper.go b/bulkerlib/implementations/sql/tx_wrapper.go index 7de6752..8715134 100644 --- a/bulkerlib/implementations/sql/tx_wrapper.go +++ b/bulkerlib/implementations/sql/tx_wrapper.go @@ -7,6 +7,7 @@ import ( "github.com/jitsucom/bulker/jitsubase/errorj" "github.com/jitsucom/bulker/jitsubase/logging" "io" + "sync" ) // TxWrapper is sql transaction wrapper. Used for handling and log errors with db type (postgres, mySQL, redshift or snowflake) @@ -179,32 +180,83 @@ func (t *TxWrapper) Rollback() error { } type ConWithDB struct { - db *sql.DB - //con *sql.Conn + sessionId string + db *sql.DB + con *sql.Conn } -func NewConWithDB(db *sql.DB) *ConWithDB { - return &ConWithDB{db: db} +var activeSession sync.Map + +func NewConWithDB(db *sql.DB, con *sql.Conn, sessionId string) *ConWithDB { + return &ConWithDB{db: db, con: con, sessionId: sessionId} } -func (c *ConWithDB) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) { - return c.db.ExecContext(ctx, query, args...) +func (c *ConWithDB) ExecContext(ctx context.Context, query string, args ...any) (res sql.Result, err error) { + q, ok := activeSession.LoadOrStore(c.sessionId, query) + if !ok { + defer func() { + del := activeSession.CompareAndDelete(c.sessionId, query) + if !del { + logging.Errorf("session %s was REUSED CONCURRENTLY", c.sessionId) + } + }() + res, err = c.con.ExecContext(ctx, query, args...) + } else { + err = fmt.Errorf("session %s is CONCURRENTLY USED by: %s", c.sessionId, q.(string)) + } + return } -func (c *ConWithDB) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) { - return c.db.QueryContext(ctx, query, args...) +func (c *ConWithDB) QueryContext(ctx context.Context, query string, args ...any) (res *sql.Rows, err error) { + q, ok := activeSession.LoadOrStore(c.sessionId, query) + if !ok { + defer func() { + del := activeSession.CompareAndDelete(c.sessionId, query) + if !del { + logging.Fatalf("session %s was REUSED CONCURRENTLY", c.sessionId) + } + }() + res, err = c.con.QueryContext(ctx, query, args...) + } else { + err = fmt.Errorf("session %s is CONCURRENTLY USED by: %s", c.sessionId, q.(string)) + } + return } -func (c *ConWithDB) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row { - return c.db.QueryRowContext(ctx, query, args...) +func (c *ConWithDB) QueryRowContext(ctx context.Context, query string, args ...any) (row *sql.Row) { + q, ok := activeSession.LoadOrStore(c.sessionId, query) + if !ok { + defer func() { + del := activeSession.CompareAndDelete(c.sessionId, query) + if !del { + logging.Fatalf("session %s was REUSED CONCURRENTLY", c.sessionId) + } + }() + row = c.con.QueryRowContext(ctx, query, args...) + } else { + logging.Fatalf("session %s is CONCURRENTLY USED by: %s", c.sessionId, q.(string)) + } + return } -func (c *ConWithDB) PrepareContext(ctx context.Context, query string) (*sql.Stmt, error) { - return c.db.PrepareContext(ctx, query) +func (c *ConWithDB) PrepareContext(ctx context.Context, query string) (res *sql.Stmt, err error) { + q, ok := activeSession.LoadOrStore(c.sessionId, query) + if !ok { + defer func() { + del := activeSession.CompareAndDelete(c.sessionId, query) + if !del { + logging.Fatalf("session %s was REUSED CONCURRENTLY", c.sessionId) + } + }() + res, err = c.con.PrepareContext(ctx, query) + } else { + err = fmt.Errorf("session %s is CONCURRENTLY USED by: %s", c.sessionId, q.(string)) + } + return } func (c *ConWithDB) Close() error { - //_ = c.con.Close() + _ = c.con.Close() if c.db != nil { return c.db.Close() }