Skip to content

Commit

Permalink
Support HTTP API server proxy.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Aug 28, 2024
1 parent da08e32 commit 0f9cfb4
Show file tree
Hide file tree
Showing 6 changed files with 273 additions and 144 deletions.
90 changes: 90 additions & 0 deletions proxy/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright (c) 2024 Winlin
//
// SPDX-License-Identifier: MIT
package main

import (
"context"
"net/http"
"srs-proxy/logger"
"strings"
"sync"
"time"
)

type httpAPI struct {
// The underlayer HTTP server.
server *http.Server
// The gracefully quit timeout, wait server to quit.
gracefulQuitTimeout time.Duration
// The wait group for all goroutines.
wg sync.WaitGroup
}

func NewHttpAPI(opts ...func(*httpAPI)) *httpAPI {
v := &httpAPI{}
for _, opt := range opts {
opt(v)
}
return v
}

func (v *httpAPI) Close() error {
ctx, cancel := context.WithTimeout(context.Background(), v.gracefulQuitTimeout)
defer cancel()
v.server.Shutdown(ctx)

v.wg.Wait()
return nil
}

func (v *httpAPI) Run(ctx context.Context) error {
// Parse address to listen.
addr := envHttpAPI()
if !strings.Contains(addr, ":") {
addr = ":" + addr
}

// Create server and handler.
mux := http.NewServeMux()
v.server = &http.Server{Addr: addr, Handler: mux}
logger.Df(ctx, "HTTP API server listen at %v", addr)

// Shutdown the server gracefully when quiting.
go func() {
ctxParent := ctx
<-ctxParent.Done()

ctx, cancel := context.WithTimeout(context.Background(), v.gracefulQuitTimeout)
defer cancel()

v.server.Shutdown(ctx)
}()

// The basic version handler, also can be used as health check API.
logger.Df(ctx, "Handle /api/v1/versions by %v", addr)
mux.HandleFunc("/api/v1/versions", func(w http.ResponseWriter, r *http.Request) {
apiResponse(ctx, w, r, map[string]string{
"signature": Signature(),
"version": Version(),
})
})

// Run HTTP API server.
v.wg.Add(1)
go func() {
defer v.wg.Done()

err := v.server.ListenAndServe()
if err != nil {
if ctx.Err() != context.Canceled {
// TODO: If HTTP API server closed unexpectedly, we should notice the main loop to quit.
logger.Wf(ctx, "HTTP API accept err %+v", err)
} else {
logger.Df(ctx, "HTTP API server done")
}
}
}()

return nil
}
31 changes: 27 additions & 4 deletions proxy/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os"
"srs-proxy/logger"
"strings"
"sync"
"time"
)

Expand All @@ -18,6 +19,8 @@ type httpServer struct {
server *http.Server
// The gracefully quit timeout, wait server to quit.
gracefulQuitTimeout time.Duration
// The wait group for all goroutines.
wg sync.WaitGroup
}

func NewHttpServer(opts ...func(*httpServer)) *httpServer {
Expand All @@ -29,10 +32,15 @@ func NewHttpServer(opts ...func(*httpServer)) *httpServer {
}

func (v *httpServer) Close() error {
return v.server.Close()
ctx, cancel := context.WithTimeout(context.Background(), v.gracefulQuitTimeout)
defer cancel()
v.server.Shutdown(ctx)

v.wg.Wait()
return nil
}

func (v *httpServer) ListenAndServe(ctx context.Context) error {
func (v *httpServer) Run(ctx context.Context) error {
// Parse address to listen.
addr := envHttpServer()
if !strings.Contains(addr, ":") {
Expand All @@ -42,7 +50,7 @@ func (v *httpServer) ListenAndServe(ctx context.Context) error {
// Create server and handler.
mux := http.NewServeMux()
v.server = &http.Server{Addr: addr, Handler: mux}
logger.Df(ctx, "HTTP stream server listen at %v", addr)
logger.Df(ctx, "HTTP Stream server listen at %v", addr)

// Shutdown the server gracefully when quiting.
go func() {
Expand Down Expand Up @@ -79,5 +87,20 @@ func (v *httpServer) ListenAndServe(ctx context.Context) error {
})

// Run HTTP server.
return v.server.ListenAndServe()
v.wg.Add(1)
go func() {
defer v.wg.Done()

err := v.server.ListenAndServe()
if err != nil {
if ctx.Err() != context.Canceled {
// TODO: If HTTP Stream server closed unexpectedly, we should notice the main loop to quit.
logger.Wf(ctx, "HTTP Stream accept err %+v", err)
} else {
logger.Df(ctx, "HTTP Stream server done")
}
}
}()

return nil
}
13 changes: 12 additions & 1 deletion proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,25 @@ func doMain(ctx context.Context) error {
return errors.Wrapf(err, "rtmp server")
}

// Start the HTTP API server.
httpAPI := NewHttpAPI(func(server *httpAPI) {
server.gracefulQuitTimeout = gracefulQuitTimeout
})
defer httpAPI.Close()
if err := httpAPI.Run(ctx); err != nil {
return errors.Wrapf(err, "http api server")
}

// Start the HTTP web server.
httpServer := NewHttpServer(func(server *httpServer) {
server.gracefulQuitTimeout = gracefulQuitTimeout
})
defer httpServer.Close()
if err := httpServer.ListenAndServe(ctx); err != nil {
if err := httpServer.Run(ctx); err != nil {
return errors.Wrapf(err, "http server")
}

// Wait for the main loop to quit.
<-ctx.Done()
return nil
}
9 changes: 7 additions & 2 deletions proxy/rtmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package main

import (
"context"
"io"
"math/rand"
"net"
"os"
Expand Down Expand Up @@ -72,7 +73,7 @@ func (v *rtmpServer) Run(ctx context.Context) error {
if err != nil {
if ctx.Err() != context.Canceled {
// TODO: If RTMP server closed unexpectedly, we should notice the main loop to quit.
logger.Wf(ctx, "accept rtmp err %+v", err)
logger.Wf(ctx, "RTMP server accept err %+v", err)
} else {
logger.Df(ctx, "RTMP server done")
}
Expand All @@ -82,7 +83,11 @@ func (v *rtmpServer) Run(ctx context.Context) error {
go func(ctx context.Context, conn *net.TCPConn) {
defer conn.Close()
if err := v.serve(ctx, conn); err != nil {
logger.Wf(ctx, "serve conn %v err %+v", conn.RemoteAddr(), err)
if errors.Cause(err) == io.EOF {
logger.Df(ctx, "RTMP client peer closed")
} else {
logger.Wf(ctx, "serve conn %v err %+v", conn.RemoteAddr(), err)
}
} else {
logger.Df(ctx, "RTMP client done")
}
Expand Down
Loading

0 comments on commit 0f9cfb4

Please sign in to comment.