Skip to content

Commit

Permalink
Add initial ELF workload deployment specs
Browse files Browse the repository at this point in the history
  • Loading branch information
kthomas committed Feb 5, 2024
1 parent c88ad92 commit 8dbc67c
Show file tree
Hide file tree
Showing 2 changed files with 180 additions and 8 deletions.
162 changes: 154 additions & 8 deletions spec/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,19 @@ import (
"os/signal"
"path"
"path/filepath"
"strings"
"sync"
"syscall"
"time"

"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
"github.com/nats-io/nkeys"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

agentapi "github.com/synadia-io/nex/internal/agent-api"
controlapi "github.com/synadia-io/nex/internal/control-api"
"github.com/synadia-io/nex/internal/models"
nexnode "github.com/synadia-io/nex/internal/node"
)
Expand Down Expand Up @@ -190,7 +195,7 @@ var _ = Describe("nex node", func() {
It("should return an error", func(ctx SpecContext) {
err := nexnode.CmdUp(opts, nodeOpts, ctxx, cancel, log)
Expect(err).ToNot(BeNil())
Expect(err.Error()).To(ContainSubstring("failed to load node configuration file"))
Expect(err.Error()).To(ContainSubstring(fmt.Sprintf("open %s: no such file or directory", nodeOpts.ConfigFilepath)))
})
})

Expand Down Expand Up @@ -236,6 +241,9 @@ var _ = Describe("nex node", func() {

AfterEach(func() {
node.Stop()

node = nil
nodeProxy = nil
})

JustBeforeEach(func() {
Expand Down Expand Up @@ -281,15 +289,14 @@ var _ = Describe("nex node", func() {
Expect(nodeProxy.APIListener()).ToNot(BeNil())
})

// FIXME-- this needs to be updated
// It("should initialize a telemetry instance", func(ctx SpecContext) {
// Expect(nodeProxy.Telemetry()).ToNot(BeNil())
// })

Context("when node options enable otel", func() {
// TODO
It("should initialize a telemetry instance", func(ctx SpecContext) {
Expect(nodeProxy.Telemetry()).ToNot(BeNil())
})

// Context("when node options enable otel", func() {
// TODO
// })

Describe("node API listener subscriptions", func() {
It("should initialize a node API subscription for handling ping requests", func(ctx SpecContext) {
subsz, _ := _fixtures.natsServer.Subsz(&server.SubszOptions{
Expand Down Expand Up @@ -335,6 +342,10 @@ var _ = Describe("nex node", func() {
Describe("machine manager", func() {
var manager *nexnode.MachineManager

AfterEach(func() {
manager = nil
})

JustBeforeEach(func() {
nodeConfig.DefaultResourceDir = validResourceDir
_ = os.Mkdir(validResourceDir, 0755)
Expand Down Expand Up @@ -382,10 +393,145 @@ var _ = Describe("nex node", func() {
Expect(subsz.Subs[0].Msgs).To(Equal(int64(1)))
})
})

Describe("deploying an ELF binary workload", func() {
var deployRequest *controlapi.DeployRequest

Context("when the ELF binary is not statically-linked", func() {
var err error

AfterEach(func() {
os.Remove("./echoservice")
})

BeforeEach(func() {
cmd := exec.Command("go", "build", "../examples/echoservice")
_ = cmd.Start()
_ = cmd.Wait()
})

JustBeforeEach(func() {
deployRequest, err = newDeployRequest(*nodeID, "echoservice", "nex example echoservice", "./echoservice", map[string]string{"NATS_URL": "nats://127.0.0.1:4222"}, log)
Expect(err).To(BeNil())

rawDeployRequest, _ := json.MarshalIndent(deployRequest, "", " ")
fmt.Printf(string(rawDeployRequest))

Check failure on line 418 in spec/node_test.go

View workflow job for this annotation

GitHub Actions / lint

SA1006: printf-style function with dynamic format string and no further arguments should use print-style function instead (staticcheck)

nodeClient := controlapi.NewApiClientWithNamespace(_fixtures.natsConn, time.Millisecond*250, "default", log)
_, err = nodeClient.StartWorkload(deployRequest)
})

It("should fail to deploy the ELF workload", func(ctx SpecContext) {
Expect(err.Error()).To(ContainSubstring("elf binary contains at least one dynamically linked dependency"))
})
})
})
})
})
})
})
})
})
})

func cacheWorkloadArtifact(nc *nats.Conn, filename string) (string, string, string, error) {
js, err := nc.JetStream()
if err != nil {
panic(err)
}
var bucket nats.ObjectStore
bucket, err = js.ObjectStore("NEXCLIFILES")
if err != nil {
bucket, err = js.CreateObjectStore(&nats.ObjectStoreConfig{
Bucket: agentapi.WorkloadCacheBucket,
Description: "Ad hoc object storage for NEX CLI developer mode uploads",
})
if err != nil {
return "", "", "", err
}
}
bytes, err := os.ReadFile(filename)
if err != nil {
return "", "", "", err
}
key := filepath.Base(filename)
key = strings.ReplaceAll(key, ".", "")

_, err = bucket.PutBytes(key, bytes)
if err != nil {
return "", "", "", err
}

var workloadType string
switch strings.Replace(filepath.Ext(filename), ".", "", 1) {
case "js":
workloadType = agentapi.NexExecutionProviderV8
case "wasm":
workloadType = agentapi.NexExecutionProviderWasm
default:
workloadType = "elf"
}

return fmt.Sprintf("nats://%s/%s", "NEXCLIFILES", key), key, workloadType, nil
}

func resolveNodeTargetPublicXKey(nodeID string, log *slog.Logger) (*string, error) {
nodeClient := controlapi.NewApiClientWithNamespace(_fixtures.natsConn, time.Millisecond*250, "default", log)

nodes, err := nodeClient.ListNodes()
if err != nil {
return nil, err
}

if len(nodes) == 0 {
return nil, errors.New("no nodes discovered")
}

for _, candidate := range nodes {
if strings.EqualFold(candidate.NodeId, nodeID) {
info, err := nodeClient.NodeInfo(nodes[0].NodeId)
if err != nil {
return nil, fmt.Errorf("failed to get node info for potential execution target: %s", err)
}

return &info.PublicXKey, nil
}
}

return nil, fmt.Errorf("no node discovered which matched %s", nodeID)
}

// newDeployRequest() generates a new deploy request given the workload name, description, and file path
func newDeployRequest(nodeID, name, desc, path string, env map[string]string, log *slog.Logger) (*controlapi.DeployRequest, error) { // initializes new sender and issuer keypairs and returns a new deploy request
senderKey, _ := nkeys.CreateCurveKeys()
issuerKey, _ := nkeys.CreateAccount()

location, _, workloadType, err := cacheWorkloadArtifact(_fixtures.natsConn, path)
if err != nil {
return nil, err
}

targetPublicXKey, err := resolveNodeTargetPublicXKey(nodeID, log)
if err != nil {
return nil, err
}

opts := []controlapi.RequestOption{
controlapi.WorkloadName(name),
controlapi.WorkloadType(workloadType),
controlapi.WorkloadDescription(desc),
controlapi.Location(location),
// controlapi.Checksum(""),
controlapi.SenderXKey(senderKey),
controlapi.Issuer(issuerKey),
controlapi.TargetNode(nodeID),
controlapi.TargetPublicXKey(*targetPublicXKey),
}

for k, v := range env {
opts = append(opts, controlapi.EnvironmentValue(k, v))
}

return controlapi.NewDeployRequest(opts...)
}

26 changes: 26 additions & 0 deletions spec/spec_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,34 @@ func startNATS(storeDir string) (*server.Server, *nats.Conn, *int, error) {
return nil, nil, nil, fmt.Errorf("failed to connect to NATS server: %s", err)
}

js, err := nc.JetStream()
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to resolve jetstream context via NATS connection: %s", err)
}

_, err = js.CreateObjectStore(&nats.ObjectStoreConfig{
Bucket: "NEXCLIFILES",
Description: "Client-facing cache for deploying workloads",
Storage: nats.MemoryStorage,
})
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create jetstream object store: %s", err)
}

return ns, nc, &port, nil
}

// return a new NATS connection to the server running on `_fixtures.natsPort`
func newNATSConn() (*nats.Conn, error) {

Check failure on line 120 in spec/spec_suite_test.go

View workflow job for this annotation

GitHub Actions / lint

func `newNATSConn` is unused (unused)
nc, err := nats.Connect(_fixtures.natsServer.ClientURL())
if err != nil {
fmt.Printf("failed to connect to NATS server: %s", err)
return nil, err
}

return nc, err
}

func stopDaggerEngine() {
cli, err := client.NewClientWithOpts(client.FromEnv)
if err != nil {
Expand All @@ -124,3 +149,4 @@ func stopDaggerEngine() {
_ = cli.ContainerStop(ctx, c.ID, container.StopOptions{})
}
}

0 comments on commit 8dbc67c

Please sign in to comment.