Skip to content

Commit

Permalink
feat(onlink): Manage couchbase collections on link
Browse files Browse the repository at this point in the history
Signed-off-by: Ritesh Rai <ritesh.rai@aexp.com>
  • Loading branch information
ritesh089 authored and brooksmtownsend committed Jul 5, 2024
1 parent adb5e2b commit 0661807
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 41 deletions.
31 changes: 31 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
version: '3.7'

services:
couchbase:
image: couchbase:latest
ports:
- 8091:8091
- 8092:8092
- 8093:8093
- 8094:8094
- 11210:11210
- 11207:11207
- 11211:11211
environment:
- COUCHBASE_ADMINISTRATOR_USERNAME=Administrator
- COUCHBASE_ADMINISTRATOR_PASSWORD=password
- COUCHBASE_BUCKET=test
- COUCHBASE_CONNECTION_STRING=localhost
volumes:
- couchbase-data:/opt/couchbase/var

couchbase-init:
image: couchbase:latest
depends_on:
- couchbase
volumes:
- ./init-couchbase.sh:/init-couchbase.sh
entrypoint: "/init-couchbase.sh"

volumes:
couchbase-data:
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.22.3

require (
github.com/couchbase/gocb/v2 v2.8.1
github.com/wasmCloud/provider-sdk-go v0.0.0-20240603160107-53fab4f0d660
github.com/wasmCloud/provider-sdk-go v0.0.0-20240626135506-00b1fb3dec9e
github.com/wrpc/wrpc/go v0.0.0-20240619071643-b830439e40d6
)

Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/wasmCloud/provider-sdk-go v0.0.0-20240603160107-53fab4f0d660 h1:W47MTq7z2MKpydilhyNqhuIfiGB4oo2pLF21M7CCWyY=
github.com/wasmCloud/provider-sdk-go v0.0.0-20240603160107-53fab4f0d660/go.mod h1:DZRTpRkX3YluIs+FKcx157XbQa1q4rFRDpxbamZZBc0=
github.com/wasmCloud/provider-sdk-go v0.0.0-20240626135506-00b1fb3dec9e h1:UIK1VDU1hwvGTd4S9pokw/IoY0ChIhYldWB44JnWuD8=
github.com/wasmCloud/provider-sdk-go v0.0.0-20240626135506-00b1fb3dec9e/go.mod h1:DZRTpRkX3YluIs+FKcx157XbQa1q4rFRDpxbamZZBc0=
github.com/wrpc/wrpc/go v0.0.0-20240619071643-b830439e40d6 h1:P2X0Tsw8o45TmBZbST5vRguBjK80e7QJ9TX/fh/8l2U=
github.com/wrpc/wrpc/go v0.0.0-20240619071643-b830439e40d6/go.mod h1:xbr40Iv8kVVnQHnHzmdbtaxOvyRZLb7y3c90vrGGcxo=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
25 changes: 25 additions & 0 deletions init-couchbase.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#!/bin/bash

# Wait for Couchbase to be up and running
until curl -s http://couchbase:8091/pools > /dev/null; do
echo "Waiting for Couchbase to be available..."
sleep 1
done

# Initialize the cluster with the specified username and password
if /opt/couchbase/bin/couchbase-cli server-list -c couchbase:8091 -u Administrator -p password > /dev/null; then
echo "Cluster already initialized"
else
echo "Initializing cluster..."
/opt/couchbase/bin/couchbase-cli cluster-init -c couchbase:8091 --cluster-username Administrator --cluster-password password --cluster-ramsize 512 --services data,index,query,fts
fi

sleep 5
# Create the bucket
if /opt/couchbase/bin/couchbase-cli bucket-list -c couchbase:8091 -u Administrator -p password | grep test > /dev/null; then
echo "Bucket already created"
exit 0
else
echo "Creating bucket..."
/opt/couchbase/bin/couchbase-cli bucket-create -c couchbase:8091 --username Administrator --password password --bucket test --bucket-type couchbase --bucket-ramsize 256 --enable-flush 1
fi
41 changes: 9 additions & 32 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"os"
"os/signal"
"syscall"
"time"

server "github.com/couchbase-examples/wasmcloud-provider-couchbase/bindings"
"github.com/couchbase/gocb/v2"
Expand All @@ -22,7 +21,10 @@ func main() {

func run() error {
// Initialize the provider with callbacks to track linked components
providerHandler := Handler{}
providerHandler := Handler{
linkedFrom: make(map[string]map[string]string),
clusterConnections: make(map[string]*gocb.Collection),
}
p, err := provider.New(
provider.TargetLinkPut(func(link provider.InterfaceLinkDefinition) error {
return handleNewTargetLink(&providerHandler, link)
Expand All @@ -44,31 +46,6 @@ func run() error {
// Store the provider for use in the handlers
providerHandler.WasmcloudProvider = p

// couchbase setup start
bucketName := p.HostData().Config["bucketName"]
connectionString := p.HostData().Config["connectionString"]
username := p.HostData().Config["username"]
password := p.HostData().Config["password"]

cluster, err := gocb.Connect("couchbase://"+connectionString, gocb.ClusterOptions{
Authenticator: gocb.PasswordAuthenticator{
Username: username,
Password: password,
},
})
if err != nil {
return err
}

bucket := cluster.Bucket(bucketName)
if err = bucket.WaitUntilReady(5*time.Second, nil); err != nil {
return err
}

col := bucket.DefaultCollection()
providerHandler.collection = col
// couchbase setup end

// Setup two channels to await RPC and control interface operations
providerCh := make(chan error, 1)
signalCh := make(chan os.Signal, 1)
Expand Down Expand Up @@ -98,19 +75,19 @@ func run() error {
p.Shutdown()
stopFunc()
}

return nil
}

func handleNewTargetLink(handler *Handler, link provider.InterfaceLinkDefinition) error {
// handler.Logger.Info("Handling new target link", "link", link)
// handler.linkedFrom[link.Target] = link.TargetConfig
handler.Logger.Info("Handling new target link", "link", link)
handler.linkedFrom[link.SourceID] = link.TargetConfig
handler.updateCouchbaseCluster(handler, link.SourceID, link.TargetConfig)
return nil
}

func handleDelTargetLink(handler *Handler, link provider.InterfaceLinkDefinition) error {
// handler.Logger.Info("Handling del target link", "link", link)
// delete(handler.linkedFrom, link.Target)
handler.Logger.Info("Handling del target link", "link", link)
delete(handler.linkedFrom, link.Target)
return nil
}

Expand Down
67 changes: 60 additions & 7 deletions provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package main

import (
"context"
"errors"
"time"

"github.com/couchbase/gocb/v2"
sdk "github.com/wasmCloud/provider-sdk-go"
wrpc "github.com/wrpc/wrpc/go"
wrpcnats "github.com/wrpc/wrpc/go/nats"

// Generated bindings
"github.com/couchbase-examples/wasmcloud-provider-couchbase/bindings/exports/wrpc/keyvalue/atomics"
Expand All @@ -22,17 +25,19 @@ var (
type Handler struct {
// The provider instance
*sdk.WasmcloudProvider
// The couchbase collection
collection *gocb.Collection
// All components linked to this provider and their config.
linkedFrom map[string]map[string]string

// map that stores couchbase cluster connections
clusterConnections map[string]*gocb.Collection
}

// Implementation of wasi:keyvalue/store

func (h *Handler) Get(ctx context.Context, bucket string, key string) (*wrpc.Result[[]uint8, store.Error], error) {
h.Logger.Debug("received request to get value", "key", key)
res, err := h.collection.Get(key, &gocb.GetOptions{Transcoder: gocb.NewRawJSONTranscoder()})
collection, err := h.getCollectionFromContext(ctx)
res, err := collection.Get(key, &gocb.GetOptions{Transcoder: gocb.NewRawJSONTranscoder()})
if err != nil {
h.Logger.Error("unable to get value in store", "key", key, "error", err)
return wrpc.Err[[]uint8](*errNoSuchStore), err
Expand All @@ -47,9 +52,25 @@ func (h *Handler) Get(ctx context.Context, bucket string, key string) (*wrpc.Res
return wrpc.Ok[store.Error](response), nil
}

func (h *Handler) getCollectionFromContext(ctx context.Context) (*gocb.Collection, error) {
header, ok := wrpcnats.HeaderFromContext(ctx)
if !ok {
h.Logger.Warn("Received request from unknown origin")
return nil, errors.New("Error fetching header from wrpc context")
}
// Only allow requests from a linked component
sourceId := header.Get("source-id")
if h.linkedFrom[sourceId] == nil {
h.Logger.Warn("Received request from unlinked source", "sourceId", sourceId)
return nil, errors.New("Received request from unlinked source")
}
return h.clusterConnections[sourceId], nil
}

func (h *Handler) Set(ctx context.Context, bucket string, key string, value []uint8) (*wrpc.Result[struct{}, store.Error], error) {
h.Logger.Debug("received request to set value", "key", key)
_, err := h.collection.Upsert(key, &value, &gocb.UpsertOptions{Transcoder: gocb.NewRawJSONTranscoder()})
collection, err := h.getCollectionFromContext(ctx)
_, err = collection.Upsert(key, &value, &gocb.UpsertOptions{Transcoder: gocb.NewRawJSONTranscoder()})
if err != nil {
h.Logger.Error("unable to store value", "key", key, "error", err)
return wrpc.Err[struct{}](*errInvalidDataType), err
Expand All @@ -59,7 +80,8 @@ func (h *Handler) Set(ctx context.Context, bucket string, key string, value []ui

func (h *Handler) Delete(ctx context.Context, bucket string, key string) (*wrpc.Result[struct{}, store.Error], error) {
h.Logger.Debug("received request to delete value", "key", key)
_, err := h.collection.Remove(key, nil)
collection, err := h.getCollectionFromContext(ctx)
_, err = collection.Remove(key, nil)
if err != nil {
h.Logger.Error("unable to remove value", "key", key, "error", err)
return wrpc.Err[struct{}](*errNoSuchStore), err
Expand All @@ -69,7 +91,8 @@ func (h *Handler) Delete(ctx context.Context, bucket string, key string) (*wrpc.

func (h *Handler) Exists(ctx context.Context, bucket string, key string) (*wrpc.Result[bool, store.Error], error) {
h.Logger.Debug("received request to check value existence", "key", key)
res, err := h.collection.Exists(key, nil)
collection, err := h.getCollectionFromContext(ctx)
res, err := collection.Exists(key, nil)
if err != nil {
h.Logger.Error("unable to check existence of value", "key", key, "error", err)
return wrpc.Err[bool](*errNoSuchStore), err
Expand All @@ -85,11 +108,41 @@ func (h *Handler) ListKeys(ctx context.Context, bucket string, cursor *uint64) (
// Implementation of wasi:keyvalue/atomics
func (h *Handler) Increment(ctx context.Context, bucket string, key string, delta uint64) (*wrpc.Result[uint64, atomics.Error], error) {
h.Logger.Debug("received request to increment key by delta", "key", key, "delta", delta)
res, err := h.collection.Binary().Increment(key, &gocb.IncrementOptions{Initial: int64(delta), Delta: delta})
collection, err := h.getCollectionFromContext(ctx)
res, err := collection.Binary().Increment(key, &gocb.IncrementOptions{Initial: int64(delta), Delta: delta})
if err != nil {
h.Logger.Error("unable to increment value at key", "key", key, "error", err)
return wrpc.Err[uint64](*errInvalidDataType), err
}

return wrpc.Ok[atomics.Error](res.Content()), nil
}

func (h *Handler) updateCouchbaseCluster(handler *Handler, sourceId string, config map[string]string) {
// Connect to the cluster
cluster, err := gocb.Connect(config["connectionString"], gocb.ClusterOptions{
Username: config["username"],
Password: config["password"],
})
if err != nil {
handler.Logger.Error("unable to connect to couchbase cluster", "error", err)
return
}
var collection *gocb.Collection
bucketName := config["bucketName"]
scopeName := config["scopeName"]
collectionName := config["collectionName"]
if collectionName != "" && scopeName != "" {
collection = cluster.Bucket(bucketName).Scope(scopeName).Collection(collectionName)
} else {
collection = cluster.Bucket(bucketName).DefaultCollection()
}

bucket := cluster.Bucket(bucketName)
if err = bucket.WaitUntilReady(5*time.Second, nil); err != nil {
handler.Logger.Error("unable to connect to couchbase bucket", "error", err)
}

// Store the connection
handler.clusterConnections[sourceId] = collection
}
9 changes: 8 additions & 1 deletion wadm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,14 @@ spec:
namespace: wasi
package: keyvalue
interfaces: [atomics, store]

target_config:
- name: provider-config
properties:
username: 'Administrator'
password: 'password'
bucketName: 'test'
connectionString: 'localhost'
scopeName: 'test'
- name: couchbase
type: capability
properties:
Expand Down

0 comments on commit 0661807

Please sign in to comment.