Skip to content

Commit

Permalink
[APIT-892] Flink's Private Networking for ESKU (#2850)
Browse files Browse the repository at this point in the history
  • Loading branch information
sgagniere authored Jul 9, 2024
1 parent 17aefb5 commit 0f3cd7c
Show file tree
Hide file tree
Showing 20 changed files with 199 additions and 32 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ require (
github.com/confluentinc/ccloud-sdk-go-v2/cmk v0.10.0
github.com/confluentinc/ccloud-sdk-go-v2/connect v0.7.0
github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin v0.0.5
github.com/confluentinc/ccloud-sdk-go-v2/flink v0.8.0
github.com/confluentinc/ccloud-sdk-go-v2/flink v0.9.0
github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.10.0
github.com/confluentinc/ccloud-sdk-go-v2/iam v0.11.0
github.com/confluentinc/ccloud-sdk-go-v2/identity-provider v0.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ github.com/confluentinc/ccloud-sdk-go-v2/connect v0.7.0 h1:ISrVOX9qJ2Sxiu/fGBqqH
github.com/confluentinc/ccloud-sdk-go-v2/connect v0.7.0/go.mod h1:zHG/3DzsnoHC81B1AY9K/8bMX3mxbIp5/nHHdypa//w=
github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin v0.0.5 h1:dY7/VtRgRhB0wr/e+MxXDNchaCNvU54gm8ysKzpQFZ4=
github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin v0.0.5/go.mod h1:AaF39Acy3LFnHSHExaUtqNmbs7kL5/AL54CXX61+Il8=
github.com/confluentinc/ccloud-sdk-go-v2/flink v0.8.0 h1:H88FsS/dovseu4lhUnQ7hIfhy2Jy7/9VzFbypm0Dtl0=
github.com/confluentinc/ccloud-sdk-go-v2/flink v0.8.0/go.mod h1:GPj4sfR85OyiFQUMNEq1DtPOjYVAuE222Z6Mcapwa48=
github.com/confluentinc/ccloud-sdk-go-v2/flink v0.9.0 h1:QqtIFEB5E3CIyGMJd7NQBEtc/k3K11PX7f4Fj7sPFdo=
github.com/confluentinc/ccloud-sdk-go-v2/flink v0.9.0/go.mod h1:GPj4sfR85OyiFQUMNEq1DtPOjYVAuE222Z6Mcapwa48=
github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.10.0 h1:KQVHoct3guckqY2dbzmNouJRwtUfESAKCOjSfbNOuKU=
github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.10.0/go.mod h1:cJ6erfVlWSyz6L+2dR46cF2+s5I2r+pTNrPm2fNbcqU=
github.com/confluentinc/ccloud-sdk-go-v2/iam v0.11.0 h1:ZUAow4L6De1FwYoiwvEodm4lvxc+46wNW+IEAb7K9VU=
Expand Down
1 change: 1 addition & 0 deletions internal/flink/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func New(cfg *config.Config, prerunner pcmd.PreRunner) *cobra.Command {
cmd.AddCommand(c.newArtifactCommand())
}
cmd.AddCommand(c.newComputePoolCommand())
cmd.AddCommand(c.newConnectivityTypeCommand())
cmd.AddCommand(c.newRegionCommand())
cmd.AddCommand(c.newShellCommand(prerunner))
cmd.AddCommand(c.newStatementCommand())
Expand Down
18 changes: 18 additions & 0 deletions internal/flink/command_connectivity_type.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package flink

import (
"github.com/spf13/cobra"
)

var fields = []string{"private", "public"}

func (c *command) newConnectivityTypeCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "connectivity-type",
Short: "Manage Flink connectivity type.",
}

cmd.AddCommand(c.newUseCommand())

return cmd
}
30 changes: 30 additions & 0 deletions internal/flink/command_connectivity_type_use.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package flink

import (
"github.com/spf13/cobra"

"github.com/confluentinc/cli/v3/pkg/errors"
"github.com/confluentinc/cli/v3/pkg/output"
"github.com/confluentinc/cli/v3/pkg/resource"
)

func (c *command) newUseCommand() *cobra.Command {
return &cobra.Command{
Use: "use <region-access>",
Short: "Select a Flink connectivity type.",
Long: "Select a Flink connectivity type for the current environment as \"public\" or \"private\". If unspecified, the CLI will default to the connectivity type that was set at the organization level.",
Args: cobra.MatchAll(cobra.ExactArgs(1), cobra.OnlyValidArgs),
ValidArgs: fields,
RunE: c.ConnectivityTypeUse,
}
}
func (c *command) ConnectivityTypeUse(_ *cobra.Command, args []string) error {
if err := c.Context.SetCurrentFlinkAccessType(args[0]); err != nil {
return err
}
if err := c.Config.Save(); err != nil {
return err
}
output.Printf(c.Config.EnableColor, errors.UsingResourceMsg, resource.FlinkConnectivityType, args[0])
return nil
}
53 changes: 47 additions & 6 deletions pkg/cmd/authenticated_cli_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,15 @@ func (c *AuthenticatedCLICommand) GetFlinkGatewayClient(computePoolOnly bool) (*

if computePoolOnly {
if computePoolId := c.Context.GetCurrentFlinkComputePool(); computePoolId != "" {
url, err = c.getGatewayUrlForComputePool(computePoolId)
url, err = c.getGatewayUrlForComputePool(c.Context.GetCurrentFlinkAccessType(), computePoolId)
if err != nil {
return nil, err
}
} else {
return nil, errors.NewErrorWithSuggestions("no compute pool selected", "Select a compute pool with `confluent flink compute-pool use` or `--compute-pool`.")
}
} else if c.Context.GetCurrentFlinkRegion() != "" && c.Context.GetCurrentFlinkCloudProvider() != "" {
url, err = c.getGatewayUrlForRegion(c.Context.GetCurrentFlinkCloudProvider(), c.Context.GetCurrentFlinkRegion())
url, err = c.getGatewayUrlForRegion(c.Context.GetCurrentFlinkAccessType(), c.Context.GetCurrentFlinkCloudProvider(), c.Context.GetCurrentFlinkRegion())
if err != nil {
return nil, err
}
Expand All @@ -89,7 +89,7 @@ func (c *AuthenticatedCLICommand) GetFlinkGatewayClient(computePoolOnly bool) (*
return c.flinkGatewayClient, nil
}

func (c *AuthenticatedCLICommand) getGatewayUrlForComputePool(id string) (string, error) {
func (c *AuthenticatedCLICommand) getGatewayUrlForComputePool(access, id string) (string, error) {
if c.Config.IsTest {
return testserver.TestFlinkGatewayUrl.String(), nil
}
Expand All @@ -104,19 +104,60 @@ func (c *AuthenticatedCLICommand) getGatewayUrlForComputePool(id string) (string
return "", err
}

return fmt.Sprintf("https://flink.%s.%s.%s", computePool.Spec.GetRegion(), strings.ToLower(computePool.Spec.GetCloud()), u.Host), nil
environmentId, err := c.Context.EnvironmentId()
if err != nil {
return "", err
}

privateURL := fmt.Sprintf("https://flink.%s.%s.private.%s", computePool.Spec.GetRegion(), strings.ToLower(computePool.Spec.GetCloud()), u.Host)
publicURL := fmt.Sprintf("https://flink.%s.%s.%s", computePool.Spec.GetRegion(), strings.ToLower(computePool.Spec.GetCloud()), u.Host)

if access == "private" {
return privateURL, nil
} else if access == "public" {
return publicURL, nil
} else {
list, err := c.V2Client.ListPrivateLinkAttachments(environmentId, nil, []string{"AWS"}, nil, []string{"READY"})
if err != nil {
return "", err
}
if len(list) > 0 {
return privateURL, nil
} else {
return publicURL, nil
}
}
}

func (c *AuthenticatedCLICommand) getGatewayUrlForRegion(provider, region string) (string, error) {
func (c *AuthenticatedCLICommand) getGatewayUrlForRegion(accessType, provider, region string) (string, error) {
regions, err := c.V2Client.ListFlinkRegions(provider)
if err != nil {
return "", err
}

environmentId, err := c.Context.EnvironmentId()
if err != nil {
return "", err
}

var hostUrl string
for _, flinkRegion := range regions {
if flinkRegion.GetRegionName() == region {
hostUrl = flinkRegion.GetHttpEndpoint()
if accessType == "public" {
hostUrl = flinkRegion.GetHttpEndpoint()
} else if accessType == "private" {
hostUrl = flinkRegion.GetPrivateHttpEndpoint()
} else {
list, err := c.V2Client.ListPrivateLinkAttachments(environmentId, nil, []string{"AWS"}, nil, []string{"READY"})
if err != nil {
return "", err
}
if len(list) > 0 {
hostUrl = flinkRegion.GetPrivateHttpEndpoint()
} else {
hostUrl = flinkRegion.GetHttpEndpoint()
}
}
break
}
}
Expand Down
33 changes: 26 additions & 7 deletions pkg/config/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type Context struct {
Config *Config `json:"-"`
}

var noEnvError = "no environment found"

func newContext(name string, platform *Platform, credential *Credential, kafkaClusters map[string]*KafkaClusterConfig, kafka string, state *ContextState, config *Config, organizationId, environmentId string) (*Context, error) {
ctx := &Context{
Name: name,
Expand Down Expand Up @@ -191,7 +193,7 @@ func (c *Context) EnvironmentId() (string, error) {
return id, nil
}

return "", errors.NewErrorWithSuggestions("no environment found", "This issue may occur if this user has no valid role bindings. Contact an Organization Admin to create a role binding for this user.")
return "", errors.NewErrorWithSuggestions(noEnvError, "This issue may occur if this user has no valid role bindings. Contact an Organization Admin to create a role binding for this user.")
}

func (c *Context) GetCurrentEnvironment() string {
Expand Down Expand Up @@ -246,13 +248,23 @@ func (c *Context) GetCurrentFlinkComputePool() string {
func (c *Context) SetCurrentFlinkComputePool(id string) error {
ctx := c.GetCurrentEnvironmentContext()
if ctx == nil {
return fmt.Errorf("no environment found")
return fmt.Errorf(noEnvError)
}

ctx.CurrentFlinkComputePool = id
return nil
}

func (c *Context) SetCurrentFlinkAccessType(name string) error {
ctx := c.GetCurrentEnvironmentContext()
if ctx == nil {
return fmt.Errorf(noEnvError)
}

ctx.CurrentFlinkAccessType = name
return nil
}

func (c *Context) GetCurrentFlinkCloudProvider() string {
if ctx := c.GetCurrentEnvironmentContext(); ctx != nil {
return ctx.CurrentFlinkCloudProvider
Expand All @@ -263,7 +275,7 @@ func (c *Context) GetCurrentFlinkCloudProvider() string {
func (c *Context) SetCurrentFlinkCloudProvider(cloud string) error {
ctx := c.GetCurrentEnvironmentContext()
if ctx == nil {
return fmt.Errorf("no environment found")
return fmt.Errorf(noEnvError)
}

ctx.CurrentFlinkCloudProvider = cloud
Expand All @@ -279,7 +291,7 @@ func (c *Context) GetCurrentFlinkRegion() string {
func (c *Context) SetCurrentFlinkRegion(id string) error {
ctx := c.GetCurrentEnvironmentContext()
if ctx == nil {
return fmt.Errorf("no environment found")
return fmt.Errorf(noEnvError)
}

ctx.CurrentFlinkRegion = id
Expand All @@ -296,7 +308,7 @@ func (c *Context) GetCurrentFlinkCatalog() string {
func (c *Context) SetCurrentFlinkCatalog(id string) error {
ctx := c.GetCurrentEnvironmentContext()
if ctx == nil {
return fmt.Errorf("no environment found")
return fmt.Errorf(noEnvError)
}

ctx.CurrentFlinkCatalog = id
Expand All @@ -313,13 +325,20 @@ func (c *Context) GetCurrentFlinkDatabase() string {
func (c *Context) SetCurrentFlinkDatabase(id string) error {
ctx := c.GetCurrentEnvironmentContext()
if ctx == nil {
return fmt.Errorf("no environment found")
return fmt.Errorf(noEnvError)
}

ctx.CurrentFlinkDatabase = id
return nil
}

func (c *Context) GetCurrentFlinkAccessType() string {
if ctx := c.GetCurrentEnvironmentContext(); ctx != nil {
return ctx.CurrentFlinkAccessType
}
return ""
}

func (c *Context) GetCurrentServiceAccount() string {
if ctx := c.GetCurrentEnvironmentContext(); ctx != nil {
return ctx.CurrentServiceAccount
Expand All @@ -330,7 +349,7 @@ func (c *Context) GetCurrentServiceAccount() string {
func (c *Context) SetCurrentServiceAccount(id string) error {
ctx := c.GetCurrentEnvironmentContext()
if ctx == nil {
return fmt.Errorf("no environment found")
return fmt.Errorf(noEnvError)
}

ctx.CurrentServiceAccount = id
Expand Down
1 change: 1 addition & 0 deletions pkg/config/environment_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ type EnvironmentContext struct {
CurrentFlinkDatabase string `json:"current_flink_database,omitempty"`
CurrentFlinkRegion string `json:"current_flink_region,omitempty"`
CurrentServiceAccount string `json:"current_service_account,omitempty"`
CurrentFlinkAccessType string `json:"current_flink_access_type,omitempty"`
}
1 change: 1 addition & 0 deletions pkg/resource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const (
DnsRecord = "DNS record"
Environment = "environment"
Flink = "flink"
FlinkConnectivityType = "Flink connectivity type"
FlinkComputePool = "Flink compute pool"
FlinkRegion = "Flink region"
FlinkStatement = "Flink SQL statement"
Expand Down
14 changes: 14 additions & 0 deletions test/fixtures/output/flink/connectivity-type/help.golden
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
Manage Flink connectivity type.

Usage:
confluent flink connectivity-type [command]

Available Commands:
use Select a Flink connectivity type.

Global Flags:
-h, --help Show help for this command.
--unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets.
-v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace).

Use "confluent flink connectivity-type [command] --help" for more information about a command.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Error: Get "http://127.0.0.1:1028/sql/v1/organizations/abc-123/environments/env-596/statements?page_size=100&spec.compute_pool_id=lfcp-123456": GET http://127.0.0.1:1028/sql/v1/organizations/abc-123/environments/env-596/statements?page_size=100&spec.compute_pool_id=lfcp-123456 giving up after 1 attempt(s): Get "http://127.0.0.1:1028/sql/v1/organizations/abc-123/environments/env-596/statements?page_size=100&spec.compute_pool_id=lfcp-123456": dial tcp 127.0.0.1:1028: connectex: No connection could be made because the target machine actively refused it.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Error: Get "http://127.0.0.1:1028/sql/v1/organizations/abc-123/environments/env-596/statements?page_size=100&spec.compute_pool_id=lfcp-123456": GET http://127.0.0.1:1028/sql/v1/organizations/abc-123/environments/env-596/statements?page_size=100&spec.compute_pool_id=lfcp-123456 giving up after 1 attempt(s): Get "http://127.0.0.1:1028/sql/v1/organizations/abc-123/environments/env-596/statements?page_size=100&spec.compute_pool_id=lfcp-123456": dial tcp 127.0.0.1:1028: connect: connection refused
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Creation Date | Name | Statement | Compute Pool | Status | Status Detail
--------------------------------+----------------------+--------------------+--------------+-----------+-----------------------------
2022-01-01 00:00:00 +0000 UTC | 22222222-2222-2222-2 | CREATE TABLE test; | lfcp-123456 | COMPLETED | SQL statement is completed
2023-01-01 00:00:00 +0000 UTC | 11111111-1111-1111-1 | CREATE TABLE test; | lfcp-123456 | COMPLETED | SQL statement is completed
9 changes: 9 additions & 0 deletions test/fixtures/output/flink/connectivity-type/use-help.golden
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Select a Flink connectivity type for the current environment as "public" or "private". If unspecified, the CLI will default to the connectivity type that was set at the organization level.

Usage:
confluent flink connectivity-type use <region-access> [flags]

Global Flags:
-h, --help Show help for this command.
--unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets.
-v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace).
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Using Flink connectivity type "private".
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Using Flink connectivity type "public".
11 changes: 6 additions & 5 deletions test/fixtures/output/flink/help.golden
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ Usage:
confluent flink [command]

Available Commands:
artifact Manage Flink UDF artifacts.
compute-pool Manage Flink compute pools.
region List Flink regions.
shell Start Flink interactive SQL client.
statement Manage Flink SQL statements.
artifact Manage Flink UDF artifacts.
compute-pool Manage Flink compute pools.
connectivity-type Manage Flink connectivity type.
region List Flink regions.
shell Start Flink interactive SQL client.
statement Manage Flink SQL statements.

Global Flags:
-h, --help Show help for this command.
Expand Down
22 changes: 22 additions & 0 deletions test/flink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os/exec"
"path/filepath"
"regexp"
"runtime"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -64,6 +65,27 @@ func (s *CLITestSuite) TestFlinkComputePool() {
}
}

func (s *CLITestSuite) TestFlinkConnectivityType() {
listPrivateFixture := "flink/connectivity-type/list-private.golden"
if runtime.GOOS == "windows" { // Error message is different on Windows
listPrivateFixture = "flink/connectivity-type/list-private-windows.golden"
}

tests := []CLITest{
{args: "flink connectivity-type use public", fixture: "flink/connectivity-type/use-public.golden"},
{args: "flink statement list --cloud aws --region eu-west-1", fixture: "flink/connectivity-type/list-public.golden"},
{args: "flink connectivity-type use private", fixture: "flink/connectivity-type/use-private.golden"},
// Checking that the private endpoint is getting hit. The error here tells us that we are not using the public URL anymore.
{args: "flink statement list --cloud aws --region eu-west-1", fixture: listPrivateFixture, exitCode: 1},
}

for _, test := range tests {
test.login = "cloud"
test.workflow = true
s.runIntegrationTest(test)
}
}

func (s *CLITestSuite) TestFlinkComputePoolDelete() {
tests := []CLITest{
{args: "flink compute-pool delete lfcp-123456 --force", fixture: "flink/compute-pool/delete.golden"},
Expand Down
13 changes: 7 additions & 6 deletions test/test-server/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ import (

var (
// TestCloudUrl is used to hardcode a specific port (1024) so tests can identify CCloud URLs
TestCloudUrl = url.URL{Scheme: "http", Host: "127.0.0.1:1024"}
TestV2CloudUrl = url.URL{Scheme: "http", Host: "127.0.0.1:2048"}
TestHubUrl = url.URL{Scheme: "http", Host: "127.0.0.1:4096"}
TestKafkaRestProxyUrl = url.URL{Scheme: "http", Host: "127.0.0.1:1025"}
TestFlinkGatewayUrl = url.URL{Scheme: "http", Host: "127.0.0.1:1026"}
TestSchemaRegistryUrl = url.URL{Scheme: "http", Host: "127.0.0.1:1027"}
TestCloudUrl = url.URL{Scheme: "http", Host: "127.0.0.1:1024"}
TestV2CloudUrl = url.URL{Scheme: "http", Host: "127.0.0.1:2048"}
TestHubUrl = url.URL{Scheme: "http", Host: "127.0.0.1:4096"}
TestKafkaRestProxyUrl = url.URL{Scheme: "http", Host: "127.0.0.1:1025"}
TestFlinkGatewayUrl = url.URL{Scheme: "http", Host: "127.0.0.1:1026"}
TestFlinkGatewayUrlPrivate = url.URL{Scheme: "http", Host: "127.0.0.1:1028"}
TestSchemaRegistryUrl = url.URL{Scheme: "http", Host: "127.0.0.1:1027"}
)

// TestBackend consists of the servers for necessary mocked backend services
Expand Down
Loading

0 comments on commit 0f3cd7c

Please sign in to comment.