Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix log streamer and cleanup code #202

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/garm-cli/cmd/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ var logCmd = &cobra.Command{
slog.With(slog.Any("error", err)).Error("reading log message")
return
}
fmt.Print(util.SanitizeLogEntry(string(message)))
fmt.Println(util.SanitizeLogEntry(string(message)))
}
}()

Expand Down
9 changes: 4 additions & 5 deletions cmd/garm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ func maybeInitController(db common.Store) error {
return nil
}

func setupLogging(ctx context.Context, cfg *config.Config, hub *websocket.Hub) {
logCfg := cfg.GetLoggingConfig()
func setupLogging(ctx context.Context, logCfg config.Logging, hub *websocket.Hub) {
logWriter, err := util.GetLoggingWriter(logCfg.LogFile)
if err != nil {
log.Fatalf("fetching log writer: %+v", err)
Expand Down Expand Up @@ -157,16 +156,16 @@ func main() {
log.Fatalf("Fetching config: %+v", err)
}

logCfg := cfg.GetLoggingConfig()
var hub *websocket.Hub
if cfg.Default.EnableLogStreamer != nil && *cfg.Default.EnableLogStreamer {
if logCfg.EnableLogStreamer != nil && *logCfg.EnableLogStreamer {
hub = websocket.NewHub(ctx)
if err := hub.Start(); err != nil {
log.Fatal(err)
}
defer hub.Stop() //nolint
}

setupLogging(ctx, cfg, hub)
setupLogging(ctx, logCfg, hub)

db, err := database.NewDatabase(ctx, cfg.Database)
if err != nil {
Expand Down
51 changes: 42 additions & 9 deletions websocket/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package websocket
import (
"context"
"fmt"
"sync"
"time"
)

Expand Down Expand Up @@ -33,40 +34,63 @@ type Hub struct {

// Unregister requests from clients.
unregister chan *Client

mux sync.Mutex
once sync.Once
}

func (h *Hub) run() {
defer func() {
close(h.closed)
}()
for {
select {
case <-h.quit:
close(h.closed)
return
case <-h.ctx.Done():
close(h.closed)
return
case client := <-h.register:
if client != nil {
h.mux.Lock()
h.clients[client.id] = client
h.mux.Unlock()
}
case client := <-h.unregister:
if client != nil {
h.mux.Lock()
if _, ok := h.clients[client.id]; ok {
delete(h.clients, client.id)
client.conn.Close()
close(client.send)
delete(h.clients, client.id)
}
h.mux.Unlock()
}
case message := <-h.broadcast:
staleClients := []string{}
for id, client := range h.clients {
if client == nil {
staleClients = append(staleClients, id)
continue
}

select {
case client.send <- message:
case <-time.After(5 * time.Second):
close(client.send)
delete(h.clients, id)
staleClients = append(staleClients, id)
}
}
if len(staleClients) > 0 {
h.mux.Lock()
for _, id := range staleClients {
if client, ok := h.clients[id]; ok {
if client != nil {
client.conn.Close()
close(client.send)
}
delete(h.clients, id)
}
}
h.mux.Unlock()
}
}
}
Expand All @@ -78,22 +102,31 @@ func (h *Hub) Register(client *Client) error {
}

func (h *Hub) Write(msg []byte) (int, error) {
tmp := make([]byte, len(msg))
copy(tmp, msg)

select {
case <-time.After(5 * time.Second):
return 0, fmt.Errorf("timed out sending message to client")
case h.broadcast <- msg:

case h.broadcast <- tmp:
}
return len(msg), nil
return len(tmp), nil
}

func (h *Hub) Start() error {
go h.run()
return nil
}

func (h *Hub) Close() error {
h.once.Do(func() {
close(h.quit)
})
return nil
}

func (h *Hub) Stop() error {
close(h.quit)
h.Close()
select {
case <-h.closed:
return nil
Expand Down