Skip to content

Commit

Permalink
TRD-637 backfill task management (#83)
Browse files Browse the repository at this point in the history
  • Loading branch information
linhnt3400 authored Oct 17, 2024
1 parent 7627ccf commit 7436908
Show file tree
Hide file tree
Showing 15 changed files with 1,472 additions and 60 deletions.
183 changes: 181 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,185 @@
## This is service to indexing tradelogs from event
# Tradelogs

### Re-generate mock file
This is service to indexing tradelogs from event

# Tradelogs V2

## Backfill Server API Documentation

This server serve the endpoints to manage backfill task: list, create, cancel, restart tasks
### 1. **Backfill Task Creation**

- **URL**: `/backfill`
- **Method**: `POST`
- **Description**: Creates a new backfill task.
- **Request Body**:
- `from_block` (uint64, required): The starting block number.
- `to_block` (uint64, required): The ending block number.
- `exchange` (string, required): The exchange name.
- **Response**:
- **200 OK**: On success.
```json
{
"success": true,
"id": "<task_id>",
"message": "<message>"
}
```
- **400 Bad Request**: If there is a validation error (e.g., missing fields, invalid exchange).
```json
{
"success": false,
"error": "<error_message>"
}
```
- **500 Internal Server Error**: If there is an error during task creation.
```json
{
"success": false,
"error": "<error_message>"
}
```

### 2. **Get All Backfill Tasks**

- **URL**: `/backfill`
- **Method**: `GET`
- **Description**: Retrieves all backfill tasks.
- **Response**:
- **200 OK**: On success. The task with id -1 is the service's default backfill flow.
```json
{
"success": true,
"tasks": [
{
"id": -1,
"exchange": "",
"from_block": 20926953,
"to_block": 20962657,
"processed_block": 20962657,
"created_at": "0001-01-01T00:00:00Z",
"updated_at": "0001-01-01T00:00:00Z",
"status": "processing"
},
{
"id": 1,
"exchange": "zerox",
"from_block": 20962657,
"to_block": 20962658,
"processed_block": 20962657,
"created_at": "2024-10-14T09:07:01.059135Z",
"updated_at": "2024-10-14T17:18:32.814065Z",
"status": "done"
}
]
}
```
- **500 Internal Server Error**: If there is an error retrieving the tasks.
```json
{
"success": false,
"error": "<error_message>"
}
```

### 3. **Get Backfill Task By ID**

- **URL**: `/backfill/:id`
- **Method**: `GET`
- **Description**: Retrieves a specific backfill task by its ID.
- **URL Parameters**:
- `id` (int, required): The task ID.
- **Response**:
- **200 OK**: On success.
```json
{
"success": true,
"task": {
"id": 1,
"exchange": "zerox",
"from_block": 20962657,
"to_block": 20962658,
"processed_block": 20962657,
"created_at": "2024-10-14T09:07:01.059135Z",
"updated_at": "2024-10-14T17:18:32.814065Z",
"status": "done"
}
}
```
- **400 Bad Request**: If the task ID is invalid.
```json
{
"success": false,
"error": "invalid task id: <id>"
}
```
- **500 Internal Server Error**: If there is an error retrieving the task.
```json
{
"success": false,
"error": "<error_message>"
}
```

### 4. **Cancel Backfill Task**

- **URL**: `/backfill/cancel/:id`
- **Method**: `GET`
- **Description**: Cancels a specific backfill task by its ID.
- **URL Parameters**:
- `id` (int, required): The task ID.
- **Response**:
- **200 OK**: On success.
```json
{
"success": true
}
```
- **400 Bad Request**: If the task ID is invalid.
```json
{
"success": false,
"error": "invalid task id: <error_message>"
}
```
- **500 Internal Server Error**: If there is an error canceling the task.
```json
{
"success": false,
"error": "<error_message>"
}
```

### 5. **Restart Backfill Task**

- **URL**: `/backfill/restart/:id`
- **Method**: `GET`
- **Description**: Restarts a specific backfill task by its ID.
- **URL Parameters**:
- `id` (int, required): The task ID.
- **Response**:
- **200 OK**: On success.
```json
{
"success": true
}
```
- **400 Bad Request**: If the task ID is invalid.
```json
{
"success": false,
"error": "invalid task id: <error_message>"
}
```
- **500 Internal Server Error**: If there is an error restarting the task.
```json
{
"success": false,
"error": "<error_message>"
}
```

## Re-generate mock file

First, you need to install `mockery`

Expand Down
9 changes: 8 additions & 1 deletion v2/cmd/backfill/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"

"github.com/KyberNetwork/tradelogs/v2/internal/server"
"github.com/KyberNetwork/tradelogs/v2/internal/service"
"github.com/KyberNetwork/tradelogs/v2/internal/worker"
libapp "github.com/KyberNetwork/tradelogs/v2/pkg/app"
"github.com/KyberNetwork/tradelogs/v2/pkg/handler"
Expand Down Expand Up @@ -53,6 +54,7 @@ func run(c *cli.Context) error {
l.Infow("Starting backfill service")

db, err := initDB(c)
l.Infow("init db successfully")
if err != nil {
return fmt.Errorf("cannot init DB: %w", err)
}
Expand Down Expand Up @@ -123,7 +125,12 @@ func run(c *cli.Context) error {
}
}()

s := server.NewBackfill(l, c.String(libapp.HTTPBackfillServerFlag.Name), w)
srv, err := service.NewBackfillService(backfillStorage, l, w)
if err != nil {
return fmt.Errorf("cannot create backfill service: %w", err)
}

s := server.NewBackfill(l, c.String(libapp.HTTPBackfillServerFlag.Name), srv)

return s.Run()
}
Expand Down
14 changes: 14 additions & 0 deletions v2/cmd/migrations/00003_add_backfill_task.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
CREATE TYPE backfill_status AS ENUM ('processing', 'failed', 'done', 'canceled');

create table backfill_task
(
id serial primary key,
exchange text not null,
from_block bigint not null,
to_block bigint not null,
processed_block bigint default 0 not null,
created_at timestamptz default now() not null,
updated_at timestamptz default now() not null,
status backfill_status default 'processing'::backfill_status not null
);

3 changes: 2 additions & 1 deletion v2/cmd/parse_log/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func run(c *cli.Context) error {
l.Infow("Starting log parser service")

db, err := initDB(c)
l.Infow("init db successfully")
if err != nil {
return fmt.Errorf("cannot init DB: %w", err)
}
Expand Down Expand Up @@ -151,7 +152,7 @@ func initDB(c *cli.Context) (*sqlx.DB, error) {
return db, nil
}

func getMostRecentBlock(l *zap.SugaredLogger, s state.Storage, rpcClient *rpcnode.Client) (uint64, error) {
func getMostRecentBlock(l *zap.SugaredLogger, s state.Storage, rpcClient rpcnode.IClient) (uint64, error) {
var blockNumber uint64
block, err := s.GetState(state.ProcessedBlockKey)
if err == nil {
Expand Down
94 changes: 84 additions & 10 deletions v2/internal/server/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package server
import (
"fmt"
"net/http"
"strconv"

"github.com/KyberNetwork/tradelogs/v2/internal/worker"
"github.com/KyberNetwork/tradelogs/v2/internal/service"
"github.com/gin-contrib/pprof"
"github.com/gin-gonic/gin"
"github.com/rs/xid"
Expand All @@ -16,7 +17,7 @@ type BackfillServer struct {
l *zap.SugaredLogger
r *gin.Engine
bindAddr string
worker *worker.BackFiller
service *service.Backfill
}

type Query struct {
Expand All @@ -25,15 +26,15 @@ type Query struct {
Exchange string `json:"exchange" binding:"required"`
}

func NewBackfill(l *zap.SugaredLogger, bindAddr string, w *worker.BackFiller) *BackfillServer {
func NewBackfill(l *zap.SugaredLogger, bindAddr string, s *service.Backfill) *BackfillServer {
engine := gin.New()
engine.Use(gin.Recovery())

server := &BackfillServer{
l: l,
r: engine,
bindAddr: bindAddr,
worker: w,
service: s,
}

gin.SetMode(gin.ReleaseMode)
Expand All @@ -53,6 +54,10 @@ func (s *BackfillServer) Run() error {
func (s *BackfillServer) register() {
pprof.Register(s.r, "/debug")
s.r.POST("/backfill", s.backfill)
s.r.GET("/backfill", s.getAllTask)
s.r.GET("/backfill/:id", s.getTask)
s.r.GET("/backfill/cancel/:id", s.cancelTask)
s.r.GET("/backfill/restart/:id", s.restartTask)
}

func responseErr(c *gin.Context, err error) {
Expand All @@ -62,9 +67,10 @@ func responseErr(c *gin.Context, err error) {
})
}

func responseOK(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{
"success": true,
func internalServerError(c *gin.Context, err error) {
c.JSON(http.StatusInternalServerError, gin.H{
"success": false,
"error": err.Error(),
})
}

Expand All @@ -83,12 +89,80 @@ func (s *BackfillServer) backfill(c *gin.Context) {
l := s.l.With("reqID", xid.New().String())
l.Infow("receive backfill params", "params", params)

err := s.worker.BackfillByExchange(params.FromBlock, params.ToBlock, params.Exchange)
id, message, err := s.service.NewBackfillTask(params.FromBlock, params.ToBlock, params.Exchange)
if err != nil {
l.Errorw("error when backfill", "error", err)
responseErr(c, err)
internalServerError(c, err)
return
}

c.JSON(http.StatusOK, gin.H{
"success": true,
"id": id,
"message": message,
})
}

func (s *BackfillServer) getAllTask(c *gin.Context) {
tasks, err := s.service.ListTask()
if err != nil {
internalServerError(c, err)
return
}
c.JSON(http.StatusOK, gin.H{
"success": true,
"tasks": tasks,
})
}

func (s *BackfillServer) getTask(c *gin.Context) {
id := c.Param("id")
taskID, err := strconv.ParseInt(id, 10, 32)
if err != nil || len(id) == 0 {
responseErr(c, fmt.Errorf("invalid task id: %s", id))
return
}
task, err := s.service.GetTask(int(taskID))
if err != nil {
internalServerError(c, err)
return
}
c.JSON(http.StatusOK, gin.H{
"success": true,
"task": task,
})
}

func (s *BackfillServer) cancelTask(c *gin.Context) {
id := c.Param("id")
taskID, err := strconv.ParseInt(id, 10, 32)
if err != nil {
responseErr(c, fmt.Errorf("invalid task id: %w", err))
return
}
err = s.service.CancelBackfillTask(int(taskID))
if err != nil {
internalServerError(c, err)
return
}
c.JSON(http.StatusOK, gin.H{
"success": true,
})
}

responseOK(c)
func (s *BackfillServer) restartTask(c *gin.Context) {
id := c.Param("id")
taskID, err := strconv.ParseInt(id, 10, 32)
if err != nil {
responseErr(c, fmt.Errorf("invalid task id: %w", err))
return
}
err = s.service.RestartBackfillTask(int(taskID))
if err != nil {
internalServerError(c, err)
return
}
c.JSON(http.StatusOK, gin.H{
"success": true,
})
}
Loading

0 comments on commit 7436908

Please sign in to comment.