Skip to content

Commit

Permalink
Reorder client methods.
Browse files Browse the repository at this point in the history
  • Loading branch information
bojanz committed Apr 23, 2024
1 parent 4144908 commit 3a56ec1
Showing 1 changed file with 32 additions and 32 deletions.
64 changes: 32 additions & 32 deletions nanoq.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,25 +145,22 @@ func NewClient(db *sqlx.DB) *Client {
return &Client{db: db}
}

// RunTransaction runs the given function in a transaction.
// CreateTask creates the given task.
//
// The transaction will be rolled back if the function returns an error.
// Otherwise, it will be committed.
func (c *Client) RunTransaction(ctx context.Context, fn func(tx *sqlx.Tx) error) error {
tx, err := c.db.BeginTxx(ctx, nil)
// Returns ErrDuplicateTask if a task with the same fingerprint already exists.
func (c *Client) CreateTask(ctx context.Context, tx *sqlx.Tx, t Task) error {
_, err := tx.NamedExecContext(ctx, `
INSERT INTO tasks
(id, fingerprint, type, payload, max_retries, timeout_seconds, created_at, scheduled_at)
VALUES
(:id, :fingerprint, :type, :payload, :max_retries, :timeout_seconds, :created_at, :scheduled_at)`, t)
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}

if err := fn(tx); err != nil {
tx.Rollback()
if isConflictError(err) {
return ErrDuplicateTask
}
return err
}

if err = tx.Commit(); err != nil {
return fmt.Errorf("commit tx: %w", err)
}

return nil
}

Expand All @@ -183,42 +180,45 @@ func (c *Client) ClaimTask(ctx context.Context, tx *sqlx.Tx) (Task, error) {
return t, nil
}

// CreateTask creates the given task.
//
// Returns ErrDuplicateTask if a task with the same fingerprint already exists.
func (c *Client) CreateTask(ctx context.Context, tx *sqlx.Tx, t Task) error {
_, err := tx.NamedExecContext(ctx, `
INSERT INTO tasks
(id, fingerprint, type, payload, max_retries, timeout_seconds, created_at, scheduled_at)
VALUES
(:id, :fingerprint, :type, :payload, :max_retries, :timeout_seconds, :created_at, :scheduled_at)`, t)
// UpdateTask updates the given task.
func (c *Client) UpdateTask(ctx context.Context, tx *sqlx.Tx, t Task) error {
_, err := tx.NamedExecContext(ctx, `UPDATE tasks SET retries = :retries, scheduled_at = :scheduled_at WHERE id = :id`, t)
if err != nil {
if isConflictError(err) {
return ErrDuplicateTask
}
return err
}

return nil
}

// UpdateTask updates the given task.
func (c *Client) UpdateTask(ctx context.Context, tx *sqlx.Tx, t Task) error {
_, err := tx.NamedExecContext(ctx, `UPDATE tasks SET retries = :retries, scheduled_at = :scheduled_at WHERE id = :id`, t)
// DeleteTask deletes the given task.
func (c *Client) DeleteTask(ctx context.Context, tx *sqlx.Tx, t Task) error {
_, err := tx.NamedExecContext(ctx, `DELETE FROM tasks WHERE id = :id`, t)
if err != nil {
return err
}

return nil
}

// DeleteTask deletes the given task.
func (c *Client) DeleteTask(ctx context.Context, tx *sqlx.Tx, t Task) error {
_, err := tx.NamedExecContext(ctx, `DELETE FROM tasks WHERE id = :id`, t)
// RunTransaction runs the given function in a transaction.
//
// The transaction will be rolled back if the function returns an error.
// Otherwise, it will be committed.
func (c *Client) RunTransaction(ctx context.Context, fn func(tx *sqlx.Tx) error) error {
tx, err := c.db.BeginTxx(ctx, nil)
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}

if err := fn(tx); err != nil {
tx.Rollback()
return err
}

if err = tx.Commit(); err != nil {
return fmt.Errorf("commit tx: %w", err)
}

return nil
}

Expand Down

0 comments on commit 3a56ec1

Please sign in to comment.