Skip to content

Commit

Permalink
pubsub/azuresb: new auth method to support Service principal/kubelet …
Browse files Browse the repository at this point in the history
…identity/Workload identity auth methods (google#3360)
  • Loading branch information
tsolodov authored and ybourgery committed Jan 25, 2024
1 parent b0bf66e commit 47db02a
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 24 deletions.
63 changes: 50 additions & 13 deletions pubsub/azuresb/azuresb.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
// For pubsub.OpenTopic and pubsub.OpenSubscription, azuresb registers
// for the scheme "azuresb".
// The default URL opener will use a Service Bus Connection String based on
// the environment variable "SERVICEBUS_CONNECTION_STRING".
// AZURE_SERVICEBUS_HOSTNAME or SERVICEBUS_CONNECTION_STRING environment variables. SERVICEBUS_CONNECTION_STRING takes precedence.
// To customize the URL opener, or for more details on the URL format,
// see URLOpener.
// See https://gocloud.dev/concepts/urls/ for background information.
Expand Down Expand Up @@ -65,6 +65,7 @@ import (
"time"

common "github.com/Azure/azure-amqp-common-go/v3"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
servicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"github.com/Azure/go-amqp"
"gocloud.dev/gcerrors"
Expand Down Expand Up @@ -99,7 +100,8 @@ func init() {
}

// defaultURLOpener creates an URLOpener with ConnectionString initialized from
// the environment variable SERVICEBUS_CONNECTION_STRING.
// AZURE_SERVICEBUS_HOSTNAME or SERVICEBUS_CONNECTION_STRING environment variables.
// SERVICEBUS_CONNECTION_STRING takes precedence.
type defaultOpener struct {
init sync.Once
opener *URLOpener
Expand All @@ -109,11 +111,12 @@ type defaultOpener struct {
func (o *defaultOpener) defaultOpener() (*URLOpener, error) {
o.init.Do(func() {
cs := os.Getenv("SERVICEBUS_CONNECTION_STRING")
if cs == "" {
o.err = errors.New("SERVICEBUS_CONNECTION_STRING environment variable not set")
sbHostname := os.Getenv("AZURE_SERVICEBUS_HOSTNAME")
if cs == "" && sbHostname == "" {
o.err = errors.New("SERVICEBUS_CONNECTION_STRING or AZURE_SERVICEBUS_HOSTNAME environment variables not set")
return
}
o.opener = &URLOpener{ConnectionString: cs}
o.opener = &URLOpener{ConnectionString: cs, ServiceBusHostname: sbHostname}
})
return o.opener, o.err
}
Expand Down Expand Up @@ -147,10 +150,14 @@ const Scheme = "azuresb"
//
// No other query parameters are supported.
type URLOpener struct {
// ConnectionString is the Service Bus connection string (required).
// ConnectionString is the Service Bus connection string (required if ServiceBusHostname is not defined).
// https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues
ConnectionString string

// Azure ServiceBus hostname
// https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-go-how-to-use-queues?tabs=bash
ServiceBusHostname string

// ClientOptions are options when creating the Client.
ServiceBusClientOptions *servicebus.ClientOptions

Expand All @@ -165,14 +172,29 @@ type URLOpener struct {
}

func (o *URLOpener) sbClient(kind string, u *url.URL) (*servicebus.Client, error) {
if o.ConnectionString == "" {
return nil, fmt.Errorf("open %s %v: ConnectionString is required", kind, u)
if o.ConnectionString == "" && o.ServiceBusHostname == "" {
return nil, fmt.Errorf("open %s %v: ConnectionString or ServiceBusHostname is required", kind, u)
}
client, err := NewClientFromConnectionString(o.ConnectionString, o.ServiceBusClientOptions)
if err != nil {
return nil, fmt.Errorf("open %s %v: invalid connection string %q: %v", kind, u, o.ConnectionString, err)

// auth using shared key (old method)
// ConnectionString approach takes presendence
if o.ConnectionString != "" {
client, err := NewClientFromConnectionString(o.ConnectionString, o.ServiceBusClientOptions)
if err != nil {
return nil, fmt.Errorf("open %s %v: invalid connection string %q: %v", kind, u, o.ConnectionString, err)
}
return client, nil
}
return client, nil

// auth using Azure AAD Workload Identity/AAD Pod Identities/AKS Kubelet Identity/Service Principal
if o.ServiceBusHostname != "" {
client, err := NewClientFromServiceBusHostname(o.ServiceBusHostname, o.ServiceBusClientOptions)
if err != nil {
return nil, fmt.Errorf("open %s %v: invalid service bus hostname %q: %v", kind, u, o.ServiceBusHostname, err)
}
return client, nil
}
return nil, fmt.Errorf("open %s: please set ServiceBusHostname or ConnectionString", kind)
}

// OpenTopicURL opens a pubsub.Topic based on u.
Expand Down Expand Up @@ -234,12 +256,27 @@ type TopicOptions struct {
BatcherOptions batcher.Options
}

// NewClientFromConnectionString returns a *servicebus.Client from a Service Bus connection string.
// NewClientFromConnectionString returns a *servicebus.Client from a Service Bus connection string.(using shared key for auth)
// https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues
func NewClientFromConnectionString(connectionString string, opts *servicebus.ClientOptions) (*servicebus.Client, error) {
return servicebus.NewClientFromConnectionString(connectionString, opts)
}

// NewClientFromConnectionString returns a *servicebus.Client from a Service Bus connection string.(using shared key for auth)
// for example you can use workload identity autorization.
// https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-go-how-to-use-queues?tabs=bash
func NewClientFromServiceBusHostname(serviceBusHostname string, opts *servicebus.ClientOptions) (*servicebus.Client, error) {
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
return nil, err
}
client, err := servicebus.NewClient(serviceBusHostname, cred, opts)
if err != nil {
return nil, err
}
return client, nil
}

// NewSender returns a *servicebus.Sender associated with a Service Bus Client.
func NewSender(sbClient *servicebus.Client, topicName string, opts *servicebus.NewSenderOptions) (*servicebus.Sender, error) {
return sbClient.NewSender(topicName, opts)
Expand Down
51 changes: 40 additions & 11 deletions pubsub/azuresb/azuresb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"gocloud.dev/pubsub/driver"
"gocloud.dev/pubsub/drivertest"

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
servicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin"
)
Expand All @@ -34,6 +35,7 @@ var (
// See docs below on how to provision an Azure Service Bus Namespace and obtaining the connection string.
// https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues
connString = os.Getenv("SERVICEBUS_CONNECTION_STRING")
sbHostname = os.Getenv("AZURE_SERVICEBUS_HOSTNAME")
)

const (
Expand All @@ -56,8 +58,8 @@ type harness struct {
}

func newHarness(ctx context.Context, t *testing.T) (drivertest.Harness, error) {
if connString == "" {
return nil, fmt.Errorf("azuresb: test harness requires environment variable SERVICEBUS_CONNECTION_STRING to run")
if connString == "" && sbHostname == "" {
return nil, fmt.Errorf("azuresb: test harness requires environment variable SERVICEBUS_CONNECTION_STRING or AZURE_SERVICEBUS_HOSTNAME to run")
}
adminClient, err := admin.NewClientFromConnectionString(connString, nil)
if err != nil {
Expand Down Expand Up @@ -310,23 +312,50 @@ func deleteSubscription(ctx context.Context, topicName string, subscriptionName
return nil
}

// to run test using Azure Entra credentials:
// 1. grant access to ${AZURE_CLIENT_ID} to Service Bus namespace
// 2. run test:
// AZURE_CLIENT_SECRET='secret' \
// AZURE_CLIENT_ID=client_id_uud \
// AZURE_TENANT_ID=tenant_id_uuid \
// AZURE_SERVICEBUS_HOSTNAME=hostname go test -benchmem -run=^$ -bench ^BenchmarkAzureServiceBusPubSub$ gocloud.dev/pubsub/azuresb
func BenchmarkAzureServiceBusPubSub(b *testing.B) {
const (
benchmarkTopicName = "benchmark-topic"
benchmarkSubscriptionName = "benchmark-subscription"
)
ctx := context.Background()

if connString == "" {
b.Fatal("azuresb: benchmark requires environment variable SERVICEBUS_CONNECTION_STRING to run")
}
adminClient, err := admin.NewClientFromConnectionString(connString, nil)
if err != nil {
b.Fatal(err)
var adminClient *admin.Client
var sbClient *servicebus.Client
var err error

if connString == "" && sbHostname == "" {
b.Fatal("azuresb: benchmark requires environment variable SERVICEBUS_CONNECTION_STRING or AZURE_SERVICEBUS_HOSTNAME to run")
}
sbClient, err := NewClientFromConnectionString(connString, nil)
if err != nil {
b.Fatal(err)

if connString != "" {
adminClient, err = admin.NewClientFromConnectionString(connString, nil)
if err != nil {
b.Fatal(err)
}
sbClient, err = NewClientFromConnectionString(connString, nil)
if err != nil {
b.Fatal(err)
}
} else if sbHostname != "" {
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
b.Fatal(err)
}
adminClient, err = admin.NewClient(sbHostname, cred, nil)
if err != nil {
b.Fatal(err)
}
sbClient, err = NewClientFromServiceBusHostname(sbHostname, nil)
if err != nil {
b.Fatal(err)
}
}

// Make topic.
Expand Down

0 comments on commit 47db02a

Please sign in to comment.