Skip to content

Commit

Permalink
feat(routing/http/server): add routing timeout (#720)
Browse files Browse the repository at this point in the history
* feat: add routing timeouts to delegated server

* chore: update changelog

* docs: CHANGELOG.md

---------

Co-authored-by: Daniel N <2color@users.noreply.github.com>
Co-authored-by: Marcin Rataj <lidel@lidel.org>
  • Loading branch information
3 people authored Nov 19, 2024
1 parent 1bf1488 commit e38f236
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ The following emojis are used to highlight certain changes:
### Added

- `routing/http/server`: added Prometheus instrumentation to http delegated routing endpoints.
- `routing/http/server`: added configurable routing timeout (`DefaultRoutingTimeout` being 30s) to prevent indefinite hangs during content/peer routing. Set custom duration via `WithRoutingTimeout`.

### Changed

Expand Down
30 changes: 26 additions & 4 deletions routing/http/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const (

DefaultRecordsLimit = 20
DefaultStreamingRecordsLimit = 0
DefaultRoutingTimeout = 30 * time.Second
)

var logger = logging.Logger("routing/http/server")
Expand Down Expand Up @@ -132,11 +133,18 @@ func WithPrometheusRegistry(reg prometheus.Registerer) Option {
}
}

func WithRoutingTimeout(timeout time.Duration) Option {
return func(s *server) {
s.routingTimeout = timeout
}
}

func Handler(svc ContentRouter, opts ...Option) http.Handler {
server := &server{
svc: svc,
recordsLimit: DefaultRecordsLimit,
streamingRecordsLimit: DefaultStreamingRecordsLimit,
routingTimeout: DefaultRoutingTimeout,
}

for _, opt := range opts {
Expand Down Expand Up @@ -174,6 +182,7 @@ type server struct {
recordsLimit int
streamingRecordsLimit int
promRegistry prometheus.Registerer
routingTimeout time.Duration
}

func (s *server) detectResponseType(r *http.Request) (string, error) {
Expand Down Expand Up @@ -246,7 +255,10 @@ func (s *server) findProviders(w http.ResponseWriter, httpReq *http.Request) {
recordsLimit = s.recordsLimit
}

provIter, err := s.svc.FindProviders(httpReq.Context(), cid, recordsLimit)
ctx, cancel := context.WithTimeout(httpReq.Context(), s.routingTimeout)
defer cancel()

provIter, err := s.svc.FindProviders(ctx, cid, recordsLimit)
if err != nil {
if errors.Is(err, routing.ErrNotFound) {
// handlerFunc takes care of setting the 404 and necessary headers
Expand Down Expand Up @@ -335,7 +347,11 @@ func (s *server) findPeers(w http.ResponseWriter, r *http.Request) {
recordsLimit = s.recordsLimit
}

provIter, err := s.svc.FindPeers(r.Context(), pid, recordsLimit)
// Add timeout to the routing operation
ctx, cancel := context.WithTimeout(r.Context(), s.routingTimeout)
defer cancel()

provIter, err := s.svc.FindPeers(ctx, pid, recordsLimit)
if err != nil {
if errors.Is(err, routing.ErrNotFound) {
// handlerFunc takes care of setting the 404 and necessary headers
Expand Down Expand Up @@ -466,7 +482,10 @@ func (s *server) GetIPNS(w http.ResponseWriter, r *http.Request) {
return
}

record, err := s.svc.GetIPNS(r.Context(), name)
ctx, cancel := context.WithTimeout(r.Context(), s.routingTimeout)
defer cancel()

record, err := s.svc.GetIPNS(ctx, name)
if err != nil {
if errors.Is(err, routing.ErrNotFound) {
writeErr(w, "GetIPNS", http.StatusNotFound, fmt.Errorf("delegate error: %w", err))
Expand Down Expand Up @@ -550,7 +569,10 @@ func (s *server) PutIPNS(w http.ResponseWriter, r *http.Request) {
return
}

err = s.svc.PutIPNS(r.Context(), name, record)
ctx, cancel := context.WithTimeout(r.Context(), s.routingTimeout)
defer cancel()

err = s.svc.PutIPNS(ctx, name, record)
if err != nil {
writeErr(w, "PutIPNS", http.StatusInternalServerError, fmt.Errorf("delegate error: %w", err))
return
Expand Down

0 comments on commit e38f236

Please sign in to comment.