Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pubsub/azuresb: new auth method to support Service principal/kubelet identity/Workload identity auth methods #3360

Merged
merged 4 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 50 additions & 13 deletions pubsub/azuresb/azuresb.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
// For pubsub.OpenTopic and pubsub.OpenSubscription, azuresb registers
// for the scheme "azuresb".
// The default URL opener will use a Service Bus Connection String based on
// the environment variable "SERVICEBUS_CONNECTION_STRING".
// AZURE_SERVICEBUS_HOSTNAME or SERVICEBUS_CONNECTION_STRING environment variables. SERVICEBUS_CONNECTION_STRING takes presendence.
tsolodov marked this conversation as resolved.
Show resolved Hide resolved
// To customize the URL opener, or for more details on the URL format,
// see URLOpener.
// See https://gocloud.dev/concepts/urls/ for background information.
Expand Down Expand Up @@ -65,6 +65,7 @@ import (
"time"

common "github.com/Azure/azure-amqp-common-go/v3"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
servicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"github.com/Azure/go-amqp"
"gocloud.dev/gcerrors"
Expand Down Expand Up @@ -99,7 +100,8 @@ func init() {
}

// defaultURLOpener creates an URLOpener with ConnectionString initialized from
// the environment variable SERVICEBUS_CONNECTION_STRING.
// AZURE_SERVICEBUS_HOSTNAME or SERVICEBUS_CONNECTION_STRING environment variables.
// SERVICEBUS_CONNECTION_STRING takes presendence.
type defaultOpener struct {
init sync.Once
opener *URLOpener
Expand All @@ -109,11 +111,12 @@ type defaultOpener struct {
func (o *defaultOpener) defaultOpener() (*URLOpener, error) {
o.init.Do(func() {
cs := os.Getenv("SERVICEBUS_CONNECTION_STRING")
if cs == "" {
o.err = errors.New("SERVICEBUS_CONNECTION_STRING environment variable not set")
sbHostname := os.Getenv("AZURE_SERVICEBUS_HOSTNAME")
if cs == "" && sbHostname == "" {
o.err = errors.New("SERVICEBUS_CONNECTION_STRING or AZURE_SERVICEBUS_HOSTNAME environment variables not set")
return
}
o.opener = &URLOpener{ConnectionString: cs}
o.opener = &URLOpener{ConnectionString: cs, ServiceBusHostname: sbHostname}
})
return o.opener, o.err
}
Expand Down Expand Up @@ -147,10 +150,14 @@ const Scheme = "azuresb"
//
// No other query parameters are supported.
type URLOpener struct {
// ConnectionString is the Service Bus connection string (required).
// ConnectionString is the Service Bus connection string (required if ServiceBusHostname is not defined).
// https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues
ConnectionString string

// Azure ServiceBus hostname
// https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-go-how-to-use-queues?tabs=bash
ServiceBusHostname string

// ClientOptions are options when creating the Client.
ServiceBusClientOptions *servicebus.ClientOptions

Expand All @@ -165,14 +172,29 @@ type URLOpener struct {
}

func (o *URLOpener) sbClient(kind string, u *url.URL) (*servicebus.Client, error) {
if o.ConnectionString == "" {
return nil, fmt.Errorf("open %s %v: ConnectionString is required", kind, u)
if o.ConnectionString == "" && o.ServiceBusHostname == "" {
return nil, fmt.Errorf("open %s %v: ConnectionString or ServiceBusHostname is required", kind, u)
}
client, err := NewClientFromConnectionString(o.ConnectionString, o.ServiceBusClientOptions)
if err != nil {
return nil, fmt.Errorf("open %s %v: invalid connection string %q: %v", kind, u, o.ConnectionString, err)

// auth using shared key (old method)
// ConnectionString approach takes presendence
if o.ConnectionString != "" {
client, err := NewClientFromConnectionString(o.ConnectionString, o.ServiceBusClientOptions)
if err != nil {
return nil, fmt.Errorf("open %s %v: invalid connection string %q: %v", kind, u, o.ConnectionString, err)
}
return client, nil
}
return client, nil

// auth using Azure AAD Workload Identity/AAD Pod Identities/AKS Kubelet Identity/Service Principal
if o.ServiceBusHostname != "" {
client, err := NewClientFromServiceBusHostname(o.ServiceBusHostname, o.ServiceBusClientOptions)
if err != nil {
return nil, fmt.Errorf("open %s %v: invalid service bus hostname %q: %v", kind, u, o.ServiceBusHostname, err)
}
return client, nil
}
return nil, fmt.Errorf("open %s: please set ServiceBusHostname or ConnectionString", kind)
}

// OpenTopicURL opens a pubsub.Topic based on u.
Expand Down Expand Up @@ -234,12 +256,27 @@ type TopicOptions struct {
BatcherOptions batcher.Options
}

// NewClientFromConnectionString returns a *servicebus.Client from a Service Bus connection string.
// NewClientFromConnectionString returns a *servicebus.Client from a Service Bus connection string.(using shared key for auth)
// https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues
func NewClientFromConnectionString(connectionString string, opts *servicebus.ClientOptions) (*servicebus.Client, error) {
return servicebus.NewClientFromConnectionString(connectionString, opts)
}

// NewClientFromConnectionString returns a *servicebus.Client from a Service Bus connection string.(using shared key for auth)
// for example you can use workload identity autorization.
// https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-go-how-to-use-queues?tabs=bash
func NewClientFromServiceBusHostname(serviceBusHostname string, opts *servicebus.ClientOptions) (*servicebus.Client, error) {
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
return nil, err
}
client, err := servicebus.NewClient(serviceBusHostname, cred, opts)
if err != nil {
return nil, err
}
return client, nil
}

// NewSender returns a *servicebus.Sender associated with a Service Bus Client.
func NewSender(sbClient *servicebus.Client, topicName string, opts *servicebus.NewSenderOptions) (*servicebus.Sender, error) {
return sbClient.NewSender(topicName, opts)
Expand Down
51 changes: 40 additions & 11 deletions pubsub/azuresb/azuresb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"gocloud.dev/pubsub/driver"
"gocloud.dev/pubsub/drivertest"

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
servicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin"
)
Expand All @@ -34,6 +35,7 @@ var (
// See docs below on how to provision an Azure Service Bus Namespace and obtaining the connection string.
// https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues
connString = os.Getenv("SERVICEBUS_CONNECTION_STRING")
sbHostname = os.Getenv("AZURE_SERVICEBUS_HOSTNAME")
)

const (
Expand All @@ -56,8 +58,8 @@ type harness struct {
}

func newHarness(ctx context.Context, t *testing.T) (drivertest.Harness, error) {
if connString == "" {
return nil, fmt.Errorf("azuresb: test harness requires environment variable SERVICEBUS_CONNECTION_STRING to run")
if connString == "" && sbHostname == "" {
return nil, fmt.Errorf("azuresb: test harness requires environment variable SERVICEBUS_CONNECTION_STRING or AZURE_SERVICEBUS_HOSTNAME to run")
}
adminClient, err := admin.NewClientFromConnectionString(connString, nil)
if err != nil {
Expand Down Expand Up @@ -310,23 +312,50 @@ func deleteSubscription(ctx context.Context, topicName string, subscriptionName
return nil
}

// to run test using Azure Entra credentials:
// 1. grant access to ${AZURE_CLIENT_ID} to Service Bus namespace
// 2. run test:
// AZURE_CLIENT_SECRET='secret' \
// AZURE_CLIENT_ID=client_id_uud \
// AZURE_TENANT_ID=tenant_id_uuid \
// AZURE_SERVICEBUS_HOSTNAME=hostname go test -benchmem -run=^$ -bench ^BenchmarkAzureServiceBusPubSub$ gocloud.dev/pubsub/azuresb
func BenchmarkAzureServiceBusPubSub(b *testing.B) {
const (
benchmarkTopicName = "benchmark-topic"
benchmarkSubscriptionName = "benchmark-subscription"
)
ctx := context.Background()

if connString == "" {
b.Fatal("azuresb: benchmark requires environment variable SERVICEBUS_CONNECTION_STRING to run")
}
adminClient, err := admin.NewClientFromConnectionString(connString, nil)
if err != nil {
b.Fatal(err)
var adminClient *admin.Client
var sbClient *servicebus.Client
var err error

if connString == "" && sbHostname == "" {
b.Fatal("azuresb: benchmark requires environment variable SERVICEBUS_CONNECTION_STRING or AZURE_SERVICEBUS_HOSTNAME to run")
}
sbClient, err := NewClientFromConnectionString(connString, nil)
if err != nil {
b.Fatal(err)

if connString != "" {
adminClient, err = admin.NewClientFromConnectionString(connString, nil)
if err != nil {
b.Fatal(err)
}
sbClient, err = NewClientFromConnectionString(connString, nil)
if err != nil {
b.Fatal(err)
}
} else if sbHostname != "" {
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
b.Fatal(err)
}
adminClient, err = admin.NewClient(sbHostname, cred, nil)
if err != nil {
b.Fatal(err)
}
sbClient, err = NewClientFromServiceBusHostname(sbHostname, nil)
if err != nil {
b.Fatal(err)
}
}

// Make topic.
Expand Down