Skip to content

Commit

Permalink
Added TCP transport support
Browse files Browse the repository at this point in the history
  • Loading branch information
0xef53 committed Apr 1, 2021
1 parent 2ed16c2 commit 45b07ed
Show file tree
Hide file tree
Showing 19 changed files with 620 additions and 444 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
.DS_Store
cert/*.crt
cert/*.key
debug/
bin/
packages/
Expand Down
77 changes: 77 additions & 0 deletions cert/cert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package cert

import (
"crypto/tls"
"crypto/x509"
"fmt"
"os"
"path/filepath"
)

type Store interface {
ReadFile(string) ([]byte, error)
}

type Dir string

func (d Dir) ReadFile(name string) ([]byte, error) {
return os.ReadFile(filepath.Join(string(d), name))
}

func NewClientConfig(store Store) (*tls.Config, error) {
return newConfig(store, "client")
}

func NewServerConfig(store Store) (*tls.Config, error) {
cfg, err := newConfig(store, "agent")
if err != nil {
return nil, err
}

cfg.ClientAuth = tls.RequireAndVerifyClientCert
cfg.ClientCAs = cfg.RootCAs

return cfg, nil
}

func newConfig(store Store, name string) (*tls.Config, error) {
if store == nil {
return nil, fmt.Errorf("no certificate store defined")
}

certPool := x509.NewCertPool()

ca, err := store.ReadFile("CA.crt")
if err != nil {
return nil, fmt.Errorf("unable to read CA certificate: %s", err)
}

// Append the client certificates from the CA
if ok := certPool.AppendCertsFromPEM(ca); !ok {
return nil, fmt.Errorf("failed to append client certificates from CA.crt")
}

crt, err := store.ReadFile(name + ".crt")
if err != nil {
return nil, fmt.Errorf("unable to load crt file: %s", err)
}

key, err := store.ReadFile(name + ".key")
if err != nil {
return nil, fmt.Errorf("unable to load key file: %s", err)
}

cert, err := tls.X509KeyPair(crt, key)
if err != nil {
return nil, err
}

return &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: certPool,
MinVersion: tls.VersionTLS12,
PreferServerCipherSuites: true,
ClientSessionCache: tls.NewLRUClientSessionCache(0),
NextProtos: []string{"h2"},
}, nil
}
6 changes: 6 additions & 0 deletions cert/embed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package cert

import "embed"

//go:embed CA.crt agent.crt agent.key client.crt client.key
var EmbedStore embed.FS
198 changes: 167 additions & 31 deletions cmd/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,67 +2,203 @@ package main

import (
"context"
"crypto/tls"
"fmt"
"net"
"os"
"os/signal"
"syscall"
"time"

"github.com/0xef53/phoenix-guest-agent/cert"
"github.com/0xef53/phoenix-guest-agent/pkg/devconn"
pb "github.com/0xef53/phoenix-guest-agent/protobufs/agent"

grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
grpc "google.golang.org/grpc"

"github.com/mdlayher/vsock"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
)

const AgentVersion = "1.0.3"
const DefaultSerialPort = "/dev/virtio-ports/org.guest-agent.0"

type Agent struct {
WithoutSSH bool
LegacyMode bool
SerialPort string
legacyMode bool
serialPort string

withoutSSH bool
withoutTCP bool

crtStore cert.Store
}

func (a Agent) Serve() error {
ctx, cancel := context.WithCancel(context.Background())
func (a Agent) ListenAndServe(ctx context.Context) error {
grpcSrv := grpc.NewServer(
grpc_middleware.WithUnaryServerChain(
grpc_ctxtags.UnaryServerInterceptor(grpc_ctxtags.WithFieldExtractor(grpc_ctxtags.TagBasedRequestFieldExtractor("log"))),
grpc.UnaryServerInterceptor(unaryLogRequestInterceptor),
grpc.UnaryServerInterceptor(unaryPreHandlerInterceptor),
),
grpc_middleware.WithStreamServerChain(
grpc.StreamServerInterceptor(streamPreHandlerInterceptor),
),
)

go func() {
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGTERM, syscall.SIGINT)
s := <-sigc
poller := StatPoller{}

log.WithFields(log.Fields{"signal": s}).Info("Graceful shutdown initiated ...")
// The cancel function is needed here to be able
// to shutdown the agent from the GRPC request
ctx, cancel := context.WithCancel(ctx)

cancel()
}()
// Register main GRPC handler
pb.RegisterAgentServiceServer(grpcSrv, &AgentServiceServer{stat: poller.Stat, cancel: cancel})

//
// Listeners
//

// Which listener should we use ?
listener, err := func() (net.Listener, error) {
if _, err := os.Stat("/dev/vsock"); os.IsNotExist(err) || a.LegacyMode {
// legacy mode via virtio serial port
log.Debug("Using legacy mode via virtio serial port")
return devconn.ListenDevice(a.SerialPort)
var listeners []net.Listener

if _, err := os.Stat("/dev/vsock"); os.IsNotExist(err) || a.legacyMode {
log.Debug("Using legacy mode via virtio serial port")

if l, err := devconn.ListenDevice(a.serialPort); err == nil {
listeners = append(listeners, l)
} else {
return err
}

if !a.withoutTCP {
if ll, err := a.linkLocalListeners(); err == nil {
listeners = append(listeners, ll...)
} else {
log.Debugf("Non-fatal error: unable to use IPv6 link-local addresses: %s", err)
}
}
} else {
log.Debug("Using Linux VM sockets (AF_VSOCK) as a transport")
return vsock.Listen(8383)
}()
if err != nil {
return err

if l, err := vsock.Listen(8383); err == nil {
listeners = append(listeners, l)
} else {
return err
}
}

// Run stat poller and insecure GRPC server
//
// Run servers
//

group, ctx := errgroup.WithContext(ctx)

poller := StatPoller{}
group.Go(func() error {
return poller.Run(ctx, 30*time.Second)
})

group.Go(func() error {
return runGRPCServer(ctx, listener, func(srv *grpc.Server) {
pb.RegisterAgentServiceServer(srv, &AgentServiceServer{stat: poller.Stat, cancel: cancel})
idleConnsClosed := make(chan struct{})

// Graceful shutdown
go func() {
<-ctx.Done()
grpcSrv.GracefulStop()
close(idleConnsClosed)
}()

for _, l := range listeners {
listener := l
laddr := listener.Addr().String()

group.Go(func() error {
log.WithField("addr", laddr).Info("Starting GRPC server")

if err := grpcSrv.Serve(listener); err != nil {
// Error starting or closing listener
return err
}

log.WithField("addr", laddr).Info("GRPC server stopped")

return nil
})
})
}

<-idleConnsClosed

return group.Wait()
}

func (a *Agent) linkLocalListeners() ([]net.Listener, error) {
tlsConfig, err := cert.NewServerConfig(a.crtStore)
if err != nil {
return nil, err
}

ifaces, err := net.Interfaces()
if err != nil {
return nil, err
}

var listeners []net.Listener

for _, netif := range ifaces {
addrs, err := netif.Addrs()
if err != nil {
return nil, err
}

for _, addr := range addrs {
if ipnet, ok := addr.(*net.IPNet); ok && ipnet.IP.To4() == nil {
if ipnet.IP.IsLinkLocalUnicast() {
l, err := tls.Listen("tcp", "["+ipnet.IP.String()+"%"+netif.Name+"]:8383", tlsConfig)
if err != nil {
return nil, err
}
listeners = append(listeners, l)
break
}
}
}
}

return listeners, nil
}

func unaryLogRequestInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
tags := grpc_ctxtags.Extract(ctx)

log.WithFields(tags.Values()).Debug("GRPC Request: ", info.FullMethod)

return handler(ctx, req)
}

func unaryPreHandlerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
switch s := info.Server.(type) {
case *AgentServiceServer:
if s.IsLocked() {
switch info.FullMethod {
case "/agent.AgentService/UnfreezeFileSystems", "/agent.AgentService/GetAgentInfo", "/agent.AgentService/GetGuestInfo":
default:
return nil, fmt.Errorf("All filesystems are frozen. Unable to execute: %s", info.FullMethod)
}
}
}

return handler(ctx, req)
}

func streamPreHandlerInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
switch s := srv.(type) {
case *AgentServiceServer:
if s.IsLocked() {
switch info.FullMethod {
case "/agent.AgentService/DownloadFile":
default:
return fmt.Errorf("All filesystems are frozen. Unable to execute: %s", info.FullMethod)
}
}
}

return handler(srv, stream)
}
Loading

0 comments on commit 45b07ed

Please sign in to comment.