Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pubsub/azuresb: new auth method to support Service principal/kubelet identity/Workload identity auth methods #3360

Merged
merged 4 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 50 additions & 13 deletions pubsub/azuresb/azuresb.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
// For pubsub.OpenTopic and pubsub.OpenSubscription, azuresb registers
// for the scheme "azuresb".
// The default URL opener will use a Service Bus Connection String based on
// the environment variable "SERVICEBUS_CONNECTION_STRING".
// AZURE_SERVICEBUS_HOSTNAME or SERVICEBUS_CONNECTION_STRING environment variables. SERVICEBUS_CONNECTION_STRING takes precedence.
// To customize the URL opener, or for more details on the URL format,
// see URLOpener.
// See https://gocloud.dev/concepts/urls/ for background information.
Expand Down Expand Up @@ -65,6 +65,7 @@
"time"

common "github.com/Azure/azure-amqp-common-go/v3"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
servicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"github.com/Azure/go-amqp"
"gocloud.dev/gcerrors"
Expand Down Expand Up @@ -99,7 +100,8 @@
}

// defaultURLOpener creates an URLOpener with ConnectionString initialized from
// the environment variable SERVICEBUS_CONNECTION_STRING.
// AZURE_SERVICEBUS_HOSTNAME or SERVICEBUS_CONNECTION_STRING environment variables.
// SERVICEBUS_CONNECTION_STRING takes precedence.
type defaultOpener struct {
init sync.Once
opener *URLOpener
Expand All @@ -109,11 +111,12 @@
func (o *defaultOpener) defaultOpener() (*URLOpener, error) {
o.init.Do(func() {
cs := os.Getenv("SERVICEBUS_CONNECTION_STRING")
if cs == "" {
o.err = errors.New("SERVICEBUS_CONNECTION_STRING environment variable not set")
sbHostname := os.Getenv("AZURE_SERVICEBUS_HOSTNAME")
if cs == "" && sbHostname == "" {
o.err = errors.New("SERVICEBUS_CONNECTION_STRING or AZURE_SERVICEBUS_HOSTNAME environment variables not set")

Check warning on line 116 in pubsub/azuresb/azuresb.go

View check run for this annotation

Codecov / codecov/patch

pubsub/azuresb/azuresb.go#L116

Added line #L116 was not covered by tests
return
}
o.opener = &URLOpener{ConnectionString: cs}
o.opener = &URLOpener{ConnectionString: cs, ServiceBusHostname: sbHostname}
})
return o.opener, o.err
}
Expand Down Expand Up @@ -147,10 +150,14 @@
//
// No other query parameters are supported.
type URLOpener struct {
// ConnectionString is the Service Bus connection string (required).
// ConnectionString is the Service Bus connection string (required if ServiceBusHostname is not defined).
// https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues
ConnectionString string

// Azure ServiceBus hostname
// https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-go-how-to-use-queues?tabs=bash
ServiceBusHostname string

// ClientOptions are options when creating the Client.
ServiceBusClientOptions *servicebus.ClientOptions

Expand All @@ -165,14 +172,29 @@
}

func (o *URLOpener) sbClient(kind string, u *url.URL) (*servicebus.Client, error) {
if o.ConnectionString == "" {
return nil, fmt.Errorf("open %s %v: ConnectionString is required", kind, u)
if o.ConnectionString == "" && o.ServiceBusHostname == "" {
return nil, fmt.Errorf("open %s %v: ConnectionString or ServiceBusHostname is required", kind, u)

Check warning on line 176 in pubsub/azuresb/azuresb.go

View check run for this annotation

Codecov / codecov/patch

pubsub/azuresb/azuresb.go#L176

Added line #L176 was not covered by tests
}
client, err := NewClientFromConnectionString(o.ConnectionString, o.ServiceBusClientOptions)
if err != nil {
return nil, fmt.Errorf("open %s %v: invalid connection string %q: %v", kind, u, o.ConnectionString, err)

// auth using shared key (old method)
// ConnectionString approach takes presendence
if o.ConnectionString != "" {
client, err := NewClientFromConnectionString(o.ConnectionString, o.ServiceBusClientOptions)
if err != nil {
return nil, fmt.Errorf("open %s %v: invalid connection string %q: %v", kind, u, o.ConnectionString, err)

Check warning on line 184 in pubsub/azuresb/azuresb.go

View check run for this annotation

Codecov / codecov/patch

pubsub/azuresb/azuresb.go#L184

Added line #L184 was not covered by tests
}
return client, nil
}
return client, nil

// auth using Azure AAD Workload Identity/AAD Pod Identities/AKS Kubelet Identity/Service Principal
if o.ServiceBusHostname != "" {
client, err := NewClientFromServiceBusHostname(o.ServiceBusHostname, o.ServiceBusClientOptions)
if err != nil {
return nil, fmt.Errorf("open %s %v: invalid service bus hostname %q: %v", kind, u, o.ServiceBusHostname, err)

Check warning on line 193 in pubsub/azuresb/azuresb.go

View check run for this annotation

Codecov / codecov/patch

pubsub/azuresb/azuresb.go#L190-L193

Added lines #L190 - L193 were not covered by tests
}
return client, nil

Check warning on line 195 in pubsub/azuresb/azuresb.go

View check run for this annotation

Codecov / codecov/patch

pubsub/azuresb/azuresb.go#L195

Added line #L195 was not covered by tests
}
return nil, fmt.Errorf("open %s: please set ServiceBusHostname or ConnectionString", kind)

Check warning on line 197 in pubsub/azuresb/azuresb.go

View check run for this annotation

Codecov / codecov/patch

pubsub/azuresb/azuresb.go#L197

Added line #L197 was not covered by tests
}

// OpenTopicURL opens a pubsub.Topic based on u.
Expand Down Expand Up @@ -234,12 +256,27 @@
BatcherOptions batcher.Options
}

// NewClientFromConnectionString returns a *servicebus.Client from a Service Bus connection string.
// NewClientFromConnectionString returns a *servicebus.Client from a Service Bus connection string.(using shared key for auth)
// https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues
func NewClientFromConnectionString(connectionString string, opts *servicebus.ClientOptions) (*servicebus.Client, error) {
return servicebus.NewClientFromConnectionString(connectionString, opts)
}

// NewClientFromConnectionString returns a *servicebus.Client from a Service Bus connection string.(using shared key for auth)
// for example you can use workload identity autorization.
// https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-go-how-to-use-queues?tabs=bash
func NewClientFromServiceBusHostname(serviceBusHostname string, opts *servicebus.ClientOptions) (*servicebus.Client, error) {
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
return nil, err

Check warning on line 271 in pubsub/azuresb/azuresb.go

View check run for this annotation

Codecov / codecov/patch

pubsub/azuresb/azuresb.go#L268-L271

Added lines #L268 - L271 were not covered by tests
}
client, err := servicebus.NewClient(serviceBusHostname, cred, opts)
if err != nil {
return nil, err

Check warning on line 275 in pubsub/azuresb/azuresb.go

View check run for this annotation

Codecov / codecov/patch

pubsub/azuresb/azuresb.go#L273-L275

Added lines #L273 - L275 were not covered by tests
}
return client, nil

Check warning on line 277 in pubsub/azuresb/azuresb.go

View check run for this annotation

Codecov / codecov/patch

pubsub/azuresb/azuresb.go#L277

Added line #L277 was not covered by tests
}

// NewSender returns a *servicebus.Sender associated with a Service Bus Client.
func NewSender(sbClient *servicebus.Client, topicName string, opts *servicebus.NewSenderOptions) (*servicebus.Sender, error) {
return sbClient.NewSender(topicName, opts)
Expand Down
51 changes: 40 additions & 11 deletions pubsub/azuresb/azuresb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"gocloud.dev/pubsub/driver"
"gocloud.dev/pubsub/drivertest"

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
servicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin"
)
Expand All @@ -34,6 +35,7 @@ var (
// See docs below on how to provision an Azure Service Bus Namespace and obtaining the connection string.
// https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues
connString = os.Getenv("SERVICEBUS_CONNECTION_STRING")
sbHostname = os.Getenv("AZURE_SERVICEBUS_HOSTNAME")
)

const (
Expand All @@ -56,8 +58,8 @@ type harness struct {
}

func newHarness(ctx context.Context, t *testing.T) (drivertest.Harness, error) {
if connString == "" {
return nil, fmt.Errorf("azuresb: test harness requires environment variable SERVICEBUS_CONNECTION_STRING to run")
if connString == "" && sbHostname == "" {
return nil, fmt.Errorf("azuresb: test harness requires environment variable SERVICEBUS_CONNECTION_STRING or AZURE_SERVICEBUS_HOSTNAME to run")
}
adminClient, err := admin.NewClientFromConnectionString(connString, nil)
if err != nil {
Expand Down Expand Up @@ -310,23 +312,50 @@ func deleteSubscription(ctx context.Context, topicName string, subscriptionName
return nil
}

// to run test using Azure Entra credentials:
// 1. grant access to ${AZURE_CLIENT_ID} to Service Bus namespace
// 2. run test:
// AZURE_CLIENT_SECRET='secret' \
// AZURE_CLIENT_ID=client_id_uud \
// AZURE_TENANT_ID=tenant_id_uuid \
// AZURE_SERVICEBUS_HOSTNAME=hostname go test -benchmem -run=^$ -bench ^BenchmarkAzureServiceBusPubSub$ gocloud.dev/pubsub/azuresb
func BenchmarkAzureServiceBusPubSub(b *testing.B) {
const (
benchmarkTopicName = "benchmark-topic"
benchmarkSubscriptionName = "benchmark-subscription"
)
ctx := context.Background()

if connString == "" {
b.Fatal("azuresb: benchmark requires environment variable SERVICEBUS_CONNECTION_STRING to run")
}
adminClient, err := admin.NewClientFromConnectionString(connString, nil)
if err != nil {
b.Fatal(err)
var adminClient *admin.Client
var sbClient *servicebus.Client
var err error

if connString == "" && sbHostname == "" {
b.Fatal("azuresb: benchmark requires environment variable SERVICEBUS_CONNECTION_STRING or AZURE_SERVICEBUS_HOSTNAME to run")
}
sbClient, err := NewClientFromConnectionString(connString, nil)
if err != nil {
b.Fatal(err)

if connString != "" {
adminClient, err = admin.NewClientFromConnectionString(connString, nil)
if err != nil {
b.Fatal(err)
}
sbClient, err = NewClientFromConnectionString(connString, nil)
if err != nil {
b.Fatal(err)
}
} else if sbHostname != "" {
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
b.Fatal(err)
}
adminClient, err = admin.NewClient(sbHostname, cred, nil)
if err != nil {
b.Fatal(err)
}
sbClient, err = NewClientFromServiceBusHostname(sbHostname, nil)
if err != nil {
b.Fatal(err)
}
}

// Make topic.
Expand Down
Loading