From 6086d94ed9ec3f007539bb773633b4a57362a86f Mon Sep 17 00:00:00 2001 From: Panagiotis Garefalakis Date: Thu, 5 Sep 2024 10:32:57 -0700 Subject: [PATCH] [FRT-521] Integrate Confluent CLI with new Flink Artifact API (#2868) --- go.mod | 1 + go.sum | 2 + internal/flink/command.go | 5 +- internal/flink/command_artifact.go | 28 ++- internal/flink/command_artifact_create.go | 73 +++++--- internal/flink/command_artifact_delete.go | 40 +++-- internal/flink/command_artifact_describe.go | 19 +- internal/flink/command_artifact_list.go | 49 ++++-- pkg/ccloudv2/client.go | 3 + pkg/ccloudv2/flink_artifact.go | 87 +++++++++ pkg/resource/resource.go | 1 + .../output/flink/artifact/create-help.golden | 5 +- .../flink/artifact/create-python.golden | 16 +- .../output/flink/artifact/create.golden | 16 +- .../output/flink/artifact/delete-help.golden | 7 + .../flink/artifact/delete-prompt.golden | 2 +- .../output/flink/artifact/delete.golden | 2 +- .../flink/artifact/describe-help.golden | 4 +- .../output/flink/artifact/describe.golden | 16 +- .../output/flink/artifact/list-help.golden | 7 + .../output/flink/artifact/list.golden | 7 +- test/flink_test.go | 14 +- test/test-server/ccloudv2_router.go | 3 + test/test-server/flink_artifact_handler.go | 165 ++++++++++++++++++ 24 files changed, 484 insertions(+), 88 deletions(-) create mode 100644 pkg/ccloudv2/flink_artifact.go create mode 100644 test/test-server/flink_artifact_handler.go diff --git a/go.mod b/go.mod index 2a7db7cb99..0e68132ecd 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,7 @@ require ( github.com/confluentinc/ccloud-sdk-go-v2/cmk v0.10.0 github.com/confluentinc/ccloud-sdk-go-v2/connect v0.7.0 github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin v0.0.5 + github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact v0.1.0 github.com/confluentinc/ccloud-sdk-go-v2/flink v0.9.0 github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.10.0 github.com/confluentinc/ccloud-sdk-go-v2/iam v0.11.0 diff --git a/go.sum b/go.sum index 4c5059e11c..2edf0e4cbf 100644 --- a/go.sum +++ b/go.sum @@ -152,6 +152,8 @@ github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin v0.0.5 h1:dY7/VtR github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin v0.0.5/go.mod h1:AaF39Acy3LFnHSHExaUtqNmbs7kL5/AL54CXX61+Il8= github.com/confluentinc/ccloud-sdk-go-v2/flink v0.9.0 h1:QqtIFEB5E3CIyGMJd7NQBEtc/k3K11PX7f4Fj7sPFdo= github.com/confluentinc/ccloud-sdk-go-v2/flink v0.9.0/go.mod h1:GPj4sfR85OyiFQUMNEq1DtPOjYVAuE222Z6Mcapwa48= +github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact v0.1.0 h1:2QuFhvrfU4AdxyfWWPFY0fqEg8p8wmKFfC6N+35pxHg= +github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact v0.1.0/go.mod h1:cl7LEL6bFgiXQ+8sEZvo3BrYZxDOvGkx4jV7eX1ssN4= github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.10.0 h1:KQVHoct3guckqY2dbzmNouJRwtUfESAKCOjSfbNOuKU= github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.10.0/go.mod h1:cJ6erfVlWSyz6L+2dR46cF2+s5I2r+pTNrPm2fNbcqU= github.com/confluentinc/ccloud-sdk-go-v2/iam v0.11.0 h1:ZUAow4L6De1FwYoiwvEodm4lvxc+46wNW+IEAb7K9VU= diff --git a/internal/flink/command.go b/internal/flink/command.go index 9c4f8ebd30..598531f4da 100644 --- a/internal/flink/command.go +++ b/internal/flink/command.go @@ -7,7 +7,6 @@ import ( pcmd "github.com/confluentinc/cli/v3/pkg/cmd" "github.com/confluentinc/cli/v3/pkg/config" - "github.com/confluentinc/cli/v3/pkg/featureflags" ) type command struct { @@ -23,9 +22,7 @@ func New(cfg *config.Config, prerunner pcmd.PreRunner) *cobra.Command { c := &command{pcmd.NewAuthenticatedCLICommand(cmd, prerunner)} - if cfg.IsTest || featureflags.Manager.BoolVariation("cli.flink_artifact.early_access", cfg.Context(), config.CliLaunchDarklyClient, true, false) { - cmd.AddCommand(c.newArtifactCommand()) - } + cmd.AddCommand(c.newArtifactCommand()) cmd.AddCommand(c.newComputePoolCommand()) cmd.AddCommand(c.newConnectivityTypeCommand()) cmd.AddCommand(c.newRegionCommand()) diff --git a/internal/flink/command_artifact.go b/internal/flink/command_artifact.go index b0fff9bfab..5b81373715 100644 --- a/internal/flink/command_artifact.go +++ b/internal/flink/command_artifact.go @@ -3,17 +3,21 @@ package flink import ( "github.com/spf13/cobra" - connectcustompluginv1 "github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin/v1" + flinkartifactv1 "github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact/v1" pcmd "github.com/confluentinc/cli/v3/pkg/cmd" "github.com/confluentinc/cli/v3/pkg/output" ) type flinkArtifactOut struct { - Id string `human:"ID" serialized:"id" ` - Name string `human:"Name" serialized:"name" ` - Version string `human:"Version" serialized:"version" ` - ContentFormat string `human:"Content Format" serialized:"content_format" ` + Id string `human:"ID" serialized:"id"` + Name string `human:"Name" serialized:"name"` + Version string `human:"Version" serialized:"version"` + Class string `human:"Class" serialized:"class"` + Cloud string `human:"Cloud" serialized:"cloud"` + Region string `human:"Region" serialized:"region"` + Environment string `human:"Environment" serialized:"environment"` + ContentFormat string `human:"Content Format" serialized:"content_format"` } func (c *command) newArtifactCommand() *cobra.Command { @@ -31,12 +35,22 @@ func (c *command) newArtifactCommand() *cobra.Command { return cmd } -func printTable(cmd *cobra.Command, plugin connectcustompluginv1.ConnectV1CustomConnectorPlugin) error { +func printTable(cmd *cobra.Command, plugin flinkartifactv1.ArtifactV1FlinkArtifact) error { table := output.NewTable(cmd) + + var pluginVersion = "" + if len(plugin.GetVersions()) > 0 { + pluginVersion = (*plugin.Versions)[0].GetVersion() + } + table.Add(&flinkArtifactOut{ Name: plugin.GetDisplayName(), Id: plugin.GetId(), - Version: plugin.GetConnectorClass(), + Version: pluginVersion, + Class: plugin.GetClass(), + Cloud: plugin.GetCloud(), + Region: plugin.GetRegion(), + Environment: plugin.GetEnvironment(), ContentFormat: plugin.GetContentFormat(), }) diff --git a/internal/flink/command_artifact_create.go b/internal/flink/command_artifact_create.go index 74ec8b0cb0..aaeb0c608b 100644 --- a/internal/flink/command_artifact_create.go +++ b/internal/flink/command_artifact_create.go @@ -8,7 +8,7 @@ import ( "github.com/spf13/cobra" - connectcustompluginv1 "github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin/v1" + flinkartifactv1 "github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact/v1" pcmd "github.com/confluentinc/cli/v3/pkg/cmd" "github.com/confluentinc/cli/v3/pkg/examples" @@ -21,11 +21,15 @@ var ( allowedFileExtensions = []string{"jar", "zip"} ) -type pluginCreateOut struct { +type artifactCreateOut struct { Id string `human:"ID" serialized:"id"` Name string `human:"Name" serialized:"name"` Version string `human:"Version" serialized:"version"` + Class string `human:"Class" serialized:"class"` ContentFormat string `human:"Content Format" serialized:"content_format"` + Cloud string `human:"Cloud" serialized:"cloud"` + Region string `human:"Region" serialized:"region"` + Environment string `human:"Environment" serialized:"environment"` ErrorTrace string `human:"Error Trace,omitempty" serialized:"error_trace,omitempty"` } @@ -38,18 +42,23 @@ func (c *command) newCreateCommand() *cobra.Command { Example: examples.BuildExampleString( examples.Example{ Text: `Create Flink artifact "my-flink-artifact".`, - Code: "confluent flink artifact create my-flink-artifact --artifact-file plugin.jar", + Code: "confluent flink artifact create my-flink-artifact --artifact-file plugin.jar --cloud aws --region us-west-2 --environment env-123456", }, ), } cmd.Flags().String("artifact-file", "", "Flink artifact JAR file or ZIP file.") + pcmd.AddCloudFlag(cmd) + pcmd.AddRegionFlagFlink(cmd, c.AuthenticatedCLICommand) + pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) cmd.Flags().String("runtime-language", "java", fmt.Sprintf("Specify the Flink artifact runtime language as %s.", utils.ArrayToCommaDelimitedString(allowedRuntimeLanguages, "or"))) cmd.Flags().String("description", "", "Description of Flink artifact.") pcmd.AddContextFlag(cmd, c.CLICommand) pcmd.AddOutputFlag(cmd) cobra.CheckErr(cmd.MarkFlagRequired("artifact-file")) + cobra.CheckErr(cmd.MarkFlagRequired("cloud")) + cobra.CheckErr(cmd.MarkFlagRequired("region")) cobra.CheckErr(cmd.MarkFlagFilename("artifact-file", "zip", "jar")) return cmd @@ -57,11 +66,19 @@ func (c *command) newCreateCommand() *cobra.Command { func (c *command) createArtifact(cmd *cobra.Command, args []string) error { displayName := args[0] - description, err := cmd.Flags().GetString("description") + artifactFile, err := cmd.Flags().GetString("artifact-file") if err != nil { return err } - artifactFile, err := cmd.Flags().GetString("artifact-file") + cloud, err := cmd.Flags().GetString("cloud") + if err != nil { + return err + } + region, err := cmd.Flags().GetString("region") + if err != nil { + return err + } + environment, err := c.Context.EnvironmentId() if err != nil { return err } @@ -69,17 +86,23 @@ func (c *command) createArtifact(cmd *cobra.Command, args []string) error { if err != nil { return err } + description, err := cmd.Flags().GetString("description") + if err != nil { + return err + } extension := strings.TrimPrefix(filepath.Ext(artifactFile), ".") if !slices.Contains(allowedFileExtensions, strings.ToLower(extension)) { return fmt.Errorf("only extensions allowed for `--artifact-file` are %s", utils.ArrayToCommaDelimitedString(allowedFileExtensions, "and")) } - request := connectcustompluginv1.ConnectV1PresignedUrlRequest{ - ContentFormat: connectcustompluginv1.PtrString(extension), + request := flinkartifactv1.ArtifactV1PresignedUrlRequest{ + ContentFormat: flinkartifactv1.PtrString(extension), + Cloud: flinkartifactv1.PtrString(cloud), + Region: flinkartifactv1.PtrString(region), } - resp, err := c.V2Client.GetPresignedUrl(request) + resp, err := c.V2Client.GetFlinkPresignedUrl(request) if err != nil { return err } @@ -88,29 +111,39 @@ func (c *command) createArtifact(cmd *cobra.Command, args []string) error { return err } - createArtifactRequest := connectcustompluginv1.ConnectV1CustomConnectorPlugin{ - DisplayName: connectcustompluginv1.PtrString(displayName), - Description: connectcustompluginv1.PtrString(description), - ConnectorType: connectcustompluginv1.PtrString("flink_udf"), - UploadSource: &connectcustompluginv1.ConnectV1CustomConnectorPluginUploadSourceOneOf{ - ConnectV1UploadSourcePresignedUrl: &connectcustompluginv1.ConnectV1UploadSourcePresignedUrl{ - Location: "PRESIGNED_URL_LOCATION", - UploadId: resp.GetUploadId(), + createArtifactRequest := flinkartifactv1.InlineObject{ + DisplayName: displayName, + Cloud: cloud, + Region: region, + Description: flinkartifactv1.PtrString(description), + UploadSource: flinkartifactv1.InlineObjectUploadSourceOneOf{ + ArtifactV1UploadSourcePresignedUrl: &flinkartifactv1.ArtifactV1UploadSourcePresignedUrl{ + Location: flinkartifactv1.PtrString("PRESIGNED_URL_LOCATION"), + UploadId: flinkartifactv1.PtrString(resp.GetUploadId()), }, }, - RuntimeLanguage: connectcustompluginv1.PtrString(runtimeLanguage), + RuntimeLanguage: flinkartifactv1.PtrString(runtimeLanguage), } - plugin, err := c.V2Client.CreateCustomPlugin(createArtifactRequest) + plugin, err := c.V2Client.CreateFlinkArtifact(createArtifactRequest) if err != nil { return err } + var pluginVersion = "" + if len(plugin.GetVersions()) > 0 { + pluginVersion = (*plugin.Versions)[0].GetVersion() + } + table := output.NewTable(cmd) - table.Add(&pluginCreateOut{ + table.Add(&artifactCreateOut{ Name: plugin.GetDisplayName(), Id: plugin.GetId(), - Version: plugin.GetConnectorClass(), + Version: pluginVersion, + Class: plugin.GetClass(), + Cloud: cloud, + Region: region, + Environment: environment, ContentFormat: plugin.GetContentFormat(), }) return table.Print() diff --git a/internal/flink/command_artifact_delete.go b/internal/flink/command_artifact_delete.go index 1d33929c42..e0753760de 100644 --- a/internal/flink/command_artifact_delete.go +++ b/internal/flink/command_artifact_delete.go @@ -5,6 +5,7 @@ import ( pcmd "github.com/confluentinc/cli/v3/pkg/cmd" "github.com/confluentinc/cli/v3/pkg/deletion" + "github.com/confluentinc/cli/v3/pkg/examples" "github.com/confluentinc/cli/v3/pkg/resource" ) @@ -14,47 +15,66 @@ func (c *command) newDeleteCommand() *cobra.Command { Short: "Delete one or more Flink UDF artifacts.", Args: cobra.MinimumNArgs(1), RunE: c.delete, + Example: examples.BuildExampleString( + examples.Example{ + Text: "Delete Flink UDF artifact.", + Code: "confluent flink artifact delete --cloud aws --region us-west-2 cfa-123456", + }, + ), } + pcmd.AddCloudFlag(cmd) + pcmd.AddRegionFlagFlink(cmd, c.AuthenticatedCLICommand) pcmd.AddForceFlag(cmd) pcmd.AddContextFlag(cmd, c.CLICommand) + cobra.CheckErr(cmd.MarkFlagRequired("cloud")) + cobra.CheckErr(cmd.MarkFlagRequired("region")) + return cmd } func (c *command) delete(cmd *cobra.Command, args []string) error { - pluginIdToName, err := c.mapPluginIdToName() + cloud, err := cmd.Flags().GetString("cloud") + if err != nil { + return err + } + region, err := cmd.Flags().GetString("region") if err != nil { return err } existenceFunc := func(id string) bool { - _, ok := pluginIdToName[id] + artifactIdToName, err := c.mapArtifactIdToName(cloud, region) + if err != nil { + return false + } + _, ok := artifactIdToName[id] return ok } - if err := deletion.ValidateAndConfirm(cmd, args, existenceFunc, resource.CustomConnectorPlugin); err != nil { + if err := deletion.ValidateAndConfirm(cmd, args, existenceFunc, resource.FlinkArtifact); err != nil { return err } deleteFunc := func(id string) error { - return c.V2Client.DeleteCustomPlugin(id) + return c.V2Client.DeleteFlinkArtifact(cloud, region, id) } - _, err = deletion.Delete(args, deleteFunc, resource.CustomConnectorPlugin) + _, err = deletion.Delete(args, deleteFunc, resource.FlinkArtifact) return err } -func (c *command) mapPluginIdToName() (map[string]string, error) { - plugins, err := c.V2Client.ListCustomPlugins("") +func (c *command) mapArtifactIdToName(cloud string, region string) (map[string]string, error) { + plugins, err := c.V2Client.ListFlinkArtifacts(cloud, region, "") if err != nil { return nil, err } - pluginIdToName := make(map[string]string) + artifactIdToName := make(map[string]string) for _, plugin := range plugins { - pluginIdToName[plugin.GetId()] = plugin.GetDisplayName() + artifactIdToName[plugin.GetId()] = plugin.GetDisplayName() } - return pluginIdToName, nil + return artifactIdToName, nil } diff --git a/internal/flink/command_artifact_describe.go b/internal/flink/command_artifact_describe.go index ce74d9c485..c026296790 100644 --- a/internal/flink/command_artifact_describe.go +++ b/internal/flink/command_artifact_describe.go @@ -16,19 +16,34 @@ func (c *command) newDescribeCommand() *cobra.Command { Example: examples.BuildExampleString( examples.Example{ Text: "Describe Flink UDF artifact.", - Code: "confluent flink artifact describe ccp-123456", + Code: "confluent flink artifact describe --cloud aws --region us-west-2 cfa-123456", }, ), } + pcmd.AddCloudFlag(cmd) + pcmd.AddRegionFlagFlink(cmd, c.AuthenticatedCLICommand) pcmd.AddContextFlag(cmd, c.CLICommand) pcmd.AddOutputFlag(cmd) + cobra.CheckErr(cmd.MarkFlagRequired("cloud")) + cobra.CheckErr(cmd.MarkFlagRequired("region")) + return cmd } func (c *command) describe(cmd *cobra.Command, args []string) error { - plugin, err := c.V2Client.DescribeCustomPlugin(args[0]) + cloud, err := cmd.Flags().GetString("cloud") + if err != nil { + return err + } + + region, err := cmd.Flags().GetString("region") + if err != nil { + return err + } + + plugin, err := c.V2Client.DescribeFlinkArtifact(cloud, region, args[0]) if err != nil { return err } diff --git a/internal/flink/command_artifact_list.go b/internal/flink/command_artifact_list.go index 89cac6e99a..002a63dd58 100644 --- a/internal/flink/command_artifact_list.go +++ b/internal/flink/command_artifact_list.go @@ -1,17 +1,19 @@ package flink import ( - "strings" - "github.com/spf13/cobra" pcmd "github.com/confluentinc/cli/v3/pkg/cmd" + "github.com/confluentinc/cli/v3/pkg/examples" "github.com/confluentinc/cli/v3/pkg/output" ) -type customPluginOutList struct { - Id string `human:"ID" serialized:"id"` - Name string `human:"Name" serialized:"name"` +type artifactOutList struct { + Id string `human:"ID" serialized:"id"` + Name string `human:"Name" serialized:"name"` + Cloud string `human:"Cloud" serialized:"cloud"` + Region string `human:"Region" serialized:"region"` + Environment string `human:"Environment" serialized:"environment"` } func (c *command) newListCommand() *cobra.Command { @@ -20,28 +22,49 @@ func (c *command) newListCommand() *cobra.Command { Short: "List Flink UDF artifacts.", Args: cobra.NoArgs, RunE: c.list, + Example: examples.BuildExampleString( + examples.Example{ + Text: "List Flink UDF artifacts.", + Code: "confluent flink artifact list --cloud aws --region us-west-2", + }, + ), } + pcmd.AddCloudFlag(cmd) + pcmd.AddRegionFlagFlink(cmd, c.AuthenticatedCLICommand) pcmd.AddContextFlag(cmd, c.CLICommand) pcmd.AddOutputFlag(cmd) + cobra.CheckErr(cmd.MarkFlagRequired("cloud")) + cobra.CheckErr(cmd.MarkFlagRequired("region")) + return cmd } func (c *command) list(cmd *cobra.Command, _ []string) error { - plugins, err := c.V2Client.ListCustomPlugins("") + cloud, err := cmd.Flags().GetString("cloud") + if err != nil { + return err + } + region, err := cmd.Flags().GetString("region") + if err != nil { + return err + } + + artifacts, err := c.V2Client.ListFlinkArtifacts(cloud, region, "") if err != nil { return err } list := output.NewList(cmd) - for _, plugin := range plugins { - if strings.HasPrefix(plugin.GetConnectorType(), "flink") { - list.Add(&customPluginOutList{ - Name: plugin.GetDisplayName(), - Id: plugin.GetId(), - }) - } + for _, artifact := range artifacts { + list.Add(&artifactOutList{ + Name: artifact.GetDisplayName(), + Id: artifact.GetId(), + Cloud: artifact.GetCloud(), + Region: artifact.GetRegion(), + Environment: artifact.GetEnvironment(), + }) } return list.Print() } diff --git a/pkg/ccloudv2/client.go b/pkg/ccloudv2/client.go index 7d42d59c28..bb22c87fe4 100644 --- a/pkg/ccloudv2/client.go +++ b/pkg/ccloudv2/client.go @@ -11,6 +11,7 @@ import ( cmkv2 "github.com/confluentinc/ccloud-sdk-go-v2/cmk/v2" connectcustompluginv1 "github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin/v1" connectv1 "github.com/confluentinc/ccloud-sdk-go-v2/connect/v1" + flinkartifactv1 "github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact/v1" flinkv2 "github.com/confluentinc/ccloud-sdk-go-v2/flink/v2" iamv2 "github.com/confluentinc/ccloud-sdk-go-v2/iam/v2" identityproviderv2 "github.com/confluentinc/ccloud-sdk-go-v2/identity-provider/v2" @@ -46,6 +47,7 @@ type Client struct { CmkClient *cmkv2.APIClient ConnectClient *connectv1.APIClient ConnectCustomPluginClient *connectcustompluginv1.APIClient + FlinkArtifactClient *flinkartifactv1.APIClient FlinkClient *flinkv2.APIClient IamClient *iamv2.APIClient IdentityProviderClient *identityproviderv2.APIClient @@ -87,6 +89,7 @@ func NewClient(cfg *config.Config, unsafeTrace bool) *Client { CmkClient: newCmkClient(httpClient, url, userAgent, unsafeTrace), ConnectClient: newConnectClient(httpClient, url, userAgent, unsafeTrace), ConnectCustomPluginClient: newConnectCustomPluginClient(httpClient, url, userAgent, unsafeTrace), + FlinkArtifactClient: newFlinkArtifactClient(httpClient, url, userAgent, unsafeTrace), FlinkClient: newFlinkClient(httpClient, url, userAgent, unsafeTrace), IamClient: newIamClient(httpClient, url, userAgent, unsafeTrace), IdentityProviderClient: newIdentityProviderClient(httpClient, url, userAgent, unsafeTrace), diff --git a/pkg/ccloudv2/flink_artifact.go b/pkg/ccloudv2/flink_artifact.go new file mode 100644 index 0000000000..061628113b --- /dev/null +++ b/pkg/ccloudv2/flink_artifact.go @@ -0,0 +1,87 @@ +package ccloudv2 + +import ( + "context" + "net/http" + + flinkartifactv1 "github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact/v1" + + "github.com/confluentinc/cli/v3/pkg/errors" +) + +func newFlinkArtifactClient(httpClient *http.Client, url, userAgent string, unsafeTrace bool) *flinkartifactv1.APIClient { + cfg := flinkartifactv1.NewConfiguration() + cfg.Debug = unsafeTrace + cfg.HTTPClient = httpClient + cfg.Servers = flinkartifactv1.ServerConfigurations{{URL: url}} + cfg.UserAgent = userAgent + + return flinkartifactv1.NewAPIClient(cfg) +} + +func (c *Client) flinkArtifactApiContext() context.Context { + return context.WithValue(context.Background(), flinkartifactv1.ContextAccessToken, c.cfg.Context().GetAuthToken()) +} + +func (c *Client) GetFlinkPresignedUrl(request flinkartifactv1.ArtifactV1PresignedUrlRequest) (flinkartifactv1.ArtifactV1PresignedUrl, error) { + resp, httpResp, err := c.FlinkArtifactClient.PresignedUrlsArtifactV1Api.PresignedUploadUrlArtifactV1PresignedUrl(c.flinkArtifactApiContext()).ArtifactV1PresignedUrlRequest(request).Execute() + return resp, errors.CatchCCloudV2Error(err, httpResp) +} + +func (c *Client) CreateFlinkArtifact(createFlinkArtifactRequest flinkartifactv1.InlineObject) (flinkartifactv1.ArtifactV1FlinkArtifact, error) { + resp, httpResp, err := c.FlinkArtifactClient.FlinkArtifactsArtifactV1Api.CreateArtifactV1FlinkArtifact(c.flinkArtifactApiContext()). + Cloud(createFlinkArtifactRequest.Cloud).Region(createFlinkArtifactRequest.Region).InlineObject(createFlinkArtifactRequest).Execute() + return resp, errors.CatchCCloudV2Error(err, httpResp) +} + +func (c *Client) ListFlinkArtifacts(cloud string, region string, env string) ([]flinkartifactv1.ArtifactV1FlinkArtifact, error) { + var list []flinkartifactv1.ArtifactV1FlinkArtifact + + done := false + pageToken := "" + for !done { + page, httpResp, err := c.executeListArtifacts(pageToken, cloud, region, env) + if err != nil { + return nil, errors.CatchCCloudV2Error(err, httpResp) + } + list = append(list, page.GetData()...) + + pageToken, done, err = extractNextPageToken(page.GetMetadata().Next) + if err != nil { + return nil, err + } + } + return list, nil +} + +func (c *Client) DescribeFlinkArtifact(cloud string, region string, id string) (flinkartifactv1.ArtifactV1FlinkArtifact, error) { + resp, httpResp, err := c.FlinkArtifactClient.FlinkArtifactsArtifactV1Api.GetArtifactV1FlinkArtifact(c.flinkArtifactApiContext(), id).Cloud(cloud).Region(region).Execute() + return resp, errors.CatchCCloudV2Error(err, httpResp) +} + +func (c *Client) DeleteFlinkArtifact(cloud string, region string, id string) error { + httpResp, err := c.FlinkArtifactClient.FlinkArtifactsArtifactV1Api.DeleteArtifactV1FlinkArtifact(c.flinkArtifactApiContext(), id).Cloud(cloud).Region(region).Execute() + return errors.CatchCCloudV2Error(err, httpResp) +} + +func (c *Client) UpdateFlinkArtifact(id string, updateRequest flinkartifactv1.ArtifactV1FlinkArtifactUpdate) (flinkartifactv1.ArtifactV1FlinkArtifact, error) { + resp, httpResp, err := c.FlinkArtifactClient.FlinkArtifactsArtifactV1Api.UpdateArtifactV1FlinkArtifact(c.flinkArtifactApiContext(), id).ArtifactV1FlinkArtifactUpdate(updateRequest).Execute() + return resp, errors.CatchCCloudV2Error(err, httpResp) +} + +func (c *Client) executeListArtifacts(pageToken, cloud string, region string, env string) (flinkartifactv1.ArtifactV1FlinkArtifactList, *http.Response, error) { + req := c.FlinkArtifactClient.FlinkArtifactsArtifactV1Api.ListArtifactV1FlinkArtifacts(c.flinkArtifactApiContext()).PageSize(ccloudV2ListPageSize) + if pageToken != "" { + req = req.PageToken(pageToken) + } + if cloud != "" { + req = req.Cloud(cloud) + } + if region != "" { + req = req.Region(region) + } + if env != "" { + req = req.Environment(env) + } + return req.Execute() +} diff --git a/pkg/resource/resource.go b/pkg/resource/resource.go index 3fe00ae005..d17bf874cf 100644 --- a/pkg/resource/resource.go +++ b/pkg/resource/resource.go @@ -32,6 +32,7 @@ const ( DnsRecord = "DNS record" Environment = "environment" Flink = "flink" + FlinkArtifact = "Flink artifact" FlinkConnectivityType = "Flink connectivity type" FlinkComputePool = "Flink compute pool" FlinkRegion = "Flink region" diff --git a/test/fixtures/output/flink/artifact/create-help.golden b/test/fixtures/output/flink/artifact/create-help.golden index 1be41bf4fe..c62336b596 100644 --- a/test/fixtures/output/flink/artifact/create-help.golden +++ b/test/fixtures/output/flink/artifact/create-help.golden @@ -6,10 +6,13 @@ Usage: Examples: Create Flink artifact "my-flink-artifact". - $ confluent flink artifact create my-flink-artifact --artifact-file plugin.jar + $ confluent flink artifact create my-flink-artifact --artifact-file plugin.jar --cloud aws --region us-west-2 --environment env-123456 Flags: --artifact-file string REQUIRED: Flink artifact JAR file or ZIP file. + --cloud string REQUIRED: Specify the cloud provider as "aws", "azure", or "gcp". + --region string REQUIRED: Cloud region for Flink (use "confluent flink region list" to see all). + --environment string Environment ID. --runtime-language string Specify the Flink artifact runtime language as "python" or "java". (default "java") --description string Description of Flink artifact. --context string CLI context name. diff --git a/test/fixtures/output/flink/artifact/create-python.golden b/test/fixtures/output/flink/artifact/create-python.golden index 078a874b6b..3aa148e0db 100644 --- a/test/fixtures/output/flink/artifact/create-python.golden +++ b/test/fixtures/output/flink/artifact/create-python.golden @@ -1,6 +1,10 @@ -+----------------+-------------------------+ -| ID | ccp-789012 | -| Name | my-custom-python-plugin | -| Version | ver-789012 | -| Content Format | ZIP | -+----------------+-------------------------+ ++----------------+----------------------------------+ +| ID | cfa-789012 | +| Name | my-flink-python-artifact | +| Version | 1.0.0 | +| Class | io.confluent.flink.example2.test | +| Content Format | ZIP | +| Cloud | aws | +| Region | us-west-2 | +| Environment | env-789012 | ++----------------+----------------------------------+ diff --git a/test/fixtures/output/flink/artifact/create.golden b/test/fixtures/output/flink/artifact/create.golden index 28df453f3f..20bd277fff 100644 --- a/test/fixtures/output/flink/artifact/create.golden +++ b/test/fixtures/output/flink/artifact/create.golden @@ -1,6 +1,10 @@ -+----------------+------------------+ -| ID | ccp-123456 | -| Name | my-custom-plugin | -| Version | ver-123456 | -| Content Format | JAR | -+----------------+------------------+ ++----------------+----------------------------------+ +| ID | cfa-123456 | +| Name | my-flink-artifact | +| Version | 1.0.0 | +| Class | io.confluent.flink.example1.test | +| Content Format | JAR | +| Cloud | aws | +| Region | us-west-2 | +| Environment | env-123456 | ++----------------+----------------------------------+ diff --git a/test/fixtures/output/flink/artifact/delete-help.golden b/test/fixtures/output/flink/artifact/delete-help.golden index acf4d65a6a..77577906f2 100644 --- a/test/fixtures/output/flink/artifact/delete-help.golden +++ b/test/fixtures/output/flink/artifact/delete-help.golden @@ -3,7 +3,14 @@ Delete one or more Flink UDF artifacts. Usage: confluent flink artifact delete [id-2] ... [id-n] [flags] +Examples: +Delete Flink UDF artifact. + + $ confluent flink artifact delete --cloud aws --region us-west-2 cfa-123456 + Flags: + --cloud string REQUIRED: Specify the cloud provider as "aws", "azure", or "gcp". + --region string REQUIRED: Cloud region for Flink (use "confluent flink region list" to see all). --force Skip the deletion confirmation prompt. --context string CLI context name. diff --git a/test/fixtures/output/flink/artifact/delete-prompt.golden b/test/fixtures/output/flink/artifact/delete-prompt.golden index 33431f84c2..8a57c40fa9 100644 --- a/test/fixtures/output/flink/artifact/delete-prompt.golden +++ b/test/fixtures/output/flink/artifact/delete-prompt.golden @@ -1 +1 @@ -Are you sure you want to delete custom connector plugin "ccp-123456"? (y/n): Deleted custom connector plugin "ccp-123456". +Are you sure you want to delete Flink artifact "cfa-123456"? (y/n): Deleted Flink artifact "cfa-123456". diff --git a/test/fixtures/output/flink/artifact/delete.golden b/test/fixtures/output/flink/artifact/delete.golden index ee5aa43296..a3fc4fb902 100644 --- a/test/fixtures/output/flink/artifact/delete.golden +++ b/test/fixtures/output/flink/artifact/delete.golden @@ -1 +1 @@ -Deleted custom connector plugin "ccp-123456". +Deleted Flink artifact "cfa-123456". diff --git a/test/fixtures/output/flink/artifact/describe-help.golden b/test/fixtures/output/flink/artifact/describe-help.golden index 7bb9c140b9..60bf34382f 100644 --- a/test/fixtures/output/flink/artifact/describe-help.golden +++ b/test/fixtures/output/flink/artifact/describe-help.golden @@ -6,9 +6,11 @@ Usage: Examples: Describe Flink UDF artifact. - $ confluent flink artifact describe ccp-123456 + $ confluent flink artifact describe --cloud aws --region us-west-2 cfa-123456 Flags: + --cloud string REQUIRED: Specify the cloud provider as "aws", "azure", or "gcp". + --region string REQUIRED: Cloud region for Flink (use "confluent flink region list" to see all). --context string CLI context name. -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") diff --git a/test/fixtures/output/flink/artifact/describe.golden b/test/fixtures/output/flink/artifact/describe.golden index 13c89ceb5c..432641f19b 100644 --- a/test/fixtures/output/flink/artifact/describe.golden +++ b/test/fixtures/output/flink/artifact/describe.golden @@ -1,6 +1,10 @@ -+----------------+---------------+ -| ID | ccp-789013 | -| Name | CliPluginTest | -| Version | ver-123456 | -| Content Format | JAR | -+----------------+---------------+ ++----------------+----------------------------------+ +| ID | cfa-789013 | +| Name | CliArtifactTest | +| Version | 1.0.0 | +| Class | io.confluent.flink.example3.test | +| Cloud | AWS | +| Region | us-west-2 | +| Environment | env-789013 | +| Content Format | JAR | ++----------------+----------------------------------+ diff --git a/test/fixtures/output/flink/artifact/list-help.golden b/test/fixtures/output/flink/artifact/list-help.golden index 67a086bf55..ba4b83b240 100644 --- a/test/fixtures/output/flink/artifact/list-help.golden +++ b/test/fixtures/output/flink/artifact/list-help.golden @@ -3,7 +3,14 @@ List Flink UDF artifacts. Usage: confluent flink artifact list [flags] +Examples: +List Flink UDF artifacts. + + $ confluent flink artifact list --cloud aws --region us-west-2 + Flags: + --cloud string REQUIRED: Specify the cloud provider as "aws", "azure", or "gcp". + --region string REQUIRED: Cloud region for Flink (use "confluent flink region list" to see all). --context string CLI context name. -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") diff --git a/test/fixtures/output/flink/artifact/list.golden b/test/fixtures/output/flink/artifact/list.golden index 41935b53b0..4701a3b9ac 100644 --- a/test/fixtures/output/flink/artifact/list.golden +++ b/test/fixtures/output/flink/artifact/list.golden @@ -1,3 +1,4 @@ - ID | Name --------------+----------------- - ccp-789013 | CliPluginTest3 + ID | Name | Cloud | Region | Environment +-------------+--------------------------+-------+-----------+-------------- + cfa-123456 | my-flink-artifact | AWS | us-west-2 | env-123456 + cfa-789012 | my-flink-python-artifact | AWS | us-east-1 | env-789012 diff --git a/test/flink_test.go b/test/flink_test.go index 5ebad6a998..48c24f5878 100644 --- a/test/flink_test.go +++ b/test/flink_test.go @@ -35,13 +35,13 @@ type flinkShellTest struct { func (s *CLITestSuite) TestFlinkArtifact() { tests := []CLITest{ - {args: "flink artifact create my-flink-artifact --artifact-file test/fixtures/input/flink/java-udf-examples-3.0.jar", fixture: "flink/artifact/create.golden"}, - {args: "flink artifact create my-flink-artifact --artifact-file test/fixtures/input/flink/java-udf-examples-3.0.jar --description cliPluginTest", fixture: "flink/artifact/create.golden"}, - {args: "flink artifact create my-flink-artifact --artifact-file test/fixtures/input/flink/python-udf-examples.zip --description cliPluginTest --runtime-language python", fixture: "flink/artifact/create-python.golden"}, - {args: "flink artifact describe ccp-789013", fixture: "flink/artifact/describe.golden"}, - {args: "flink artifact list", fixture: "flink/artifact/list.golden"}, - {args: "flink artifact delete ccp-123456 --force", fixture: "flink/artifact/delete.golden"}, - {args: "flink artifact delete ccp-123456", input: "y\n", fixture: "flink/artifact/delete-prompt.golden"}, + {args: "flink artifact create my-flink-artifact --artifact-file test/fixtures/input/flink/java-udf-examples-3.0.jar --cloud aws --region us-west-2 --environment env-123456", fixture: "flink/artifact/create.golden"}, + {args: "flink artifact create my-flink-artifact --artifact-file test/fixtures/input/flink/java-udf-examples-3.0.jar --cloud aws --region us-west-2 --environment env-123456 --description CliArtifactTest", fixture: "flink/artifact/create.golden"}, + {args: "flink artifact create my-flink-artifact --artifact-file test/fixtures/input/flink/python-udf-examples.zip --cloud aws --region us-west-2 --environment env-789012 --description CliArtifactTest --runtime-language python", fixture: "flink/artifact/create-python.golden"}, + {args: "flink artifact describe --cloud aws --region us-west-2 cfa-789013", fixture: "flink/artifact/describe.golden"}, + {args: "flink artifact list --cloud aws --region us-west-2", fixture: "flink/artifact/list.golden"}, + {args: "flink artifact delete --cloud aws --region us-west-2 --force cfa-123456", fixture: "flink/artifact/delete.golden"}, + {args: "flink artifact delete --cloud aws --region us-west-2 cfa-123456", input: "y\n", fixture: "flink/artifact/delete-prompt.golden"}, } for _, test := range tests { diff --git a/test/test-server/ccloudv2_router.go b/test/test-server/ccloudv2_router.go index ac7c87add9..ef5dcc3a4e 100644 --- a/test/test-server/ccloudv2_router.go +++ b/test/test-server/ccloudv2_router.go @@ -11,6 +11,9 @@ type CloudV2Router struct { } var ccloudV2Routes = []route{ + {"/artifact/v1/flink-artifacts", handleFlinkArtifactPlugins}, + {"/artifact/v1/flink-artifacts/{id}", handleFlinkArtifactPluginsId}, + {"/artifact/v1/presigned-upload-url", handleFlinkArtifactUploadUrl}, {"/byok/v1/keys", handleByokKeys}, {"/byok/v1/keys/{id}", handleByokKey}, {"/billing/v1/costs", handleBillingCosts}, diff --git a/test/test-server/flink_artifact_handler.go b/test/test-server/flink_artifact_handler.go new file mode 100644 index 0000000000..fcf5aafd4c --- /dev/null +++ b/test/test-server/flink_artifact_handler.go @@ -0,0 +1,165 @@ +package testserver + +import ( + "encoding/json" + "fmt" + "net/http" + "strings" + "testing" + + "github.com/gorilla/mux" + "github.com/stretchr/testify/require" + + flinkartifactv1 "github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact/v1" +) + +var artifactVersions = &[]flinkartifactv1.ArtifactV1FlinkArtifactVersion{ + { + Version: "1.0.0", + ReleaseNotes: flinkartifactv1.PtrString("Initial release"), + IsBeta: flinkartifactv1.PtrBool(false), + UploadSource: flinkartifactv1.ArtifactV1FlinkArtifactVersionUploadSourceOneOf{ + ArtifactV1UploadSourcePresignedUrl: &flinkartifactv1.ArtifactV1UploadSourcePresignedUrl{ + Location: flinkartifactv1.PtrString("PRESIGNED_URL_LOCATION"), + UploadId: flinkartifactv1.PtrString("u-123"), + }, + }, + }, +} + +// Handler for: "/artifact/v1/flink-artifacts" +func handleFlinkArtifactPlugins(t *testing.T) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case http.MethodPost: + var decodeRespone flinkartifactv1.ArtifactV1FlinkArtifact + require.NoError(t, json.NewDecoder(r.Body).Decode(&decodeRespone)) + var plugin flinkartifactv1.ArtifactV1FlinkArtifact + switch strings.ToLower(decodeRespone.GetRuntimeLanguage()) { + case "java", "": + plugin = flinkartifactv1.ArtifactV1FlinkArtifact{ + Id: flinkartifactv1.PtrString("cfa-123456"), + Cloud: flinkartifactv1.PtrString("AWS"), + Region: flinkartifactv1.PtrString("us-west-2"), + Environment: flinkartifactv1.PtrString("env-123456"), + DisplayName: flinkartifactv1.PtrString("my-flink-artifact"), + Class: flinkartifactv1.PtrString("io.confluent.flink.example1.test"), + ContentFormat: flinkartifactv1.PtrString("JAR"), + Versions: artifactVersions, + } + case "python": + plugin = flinkartifactv1.ArtifactV1FlinkArtifact{ + Id: flinkartifactv1.PtrString("cfa-789012"), + Cloud: flinkartifactv1.PtrString("AWS"), + Region: flinkartifactv1.PtrString("us-east-1"), + Environment: flinkartifactv1.PtrString("env-789012"), + DisplayName: flinkartifactv1.PtrString("my-flink-python-artifact"), + Class: flinkartifactv1.PtrString("io.confluent.flink.example2.test"), + ContentFormat: flinkartifactv1.PtrString("ZIP"), + Versions: artifactVersions, + } + } + err := json.NewEncoder(w).Encode(plugin) + require.NoError(t, err) + case http.MethodGet: + plugin1 := flinkartifactv1.ArtifactV1FlinkArtifact{ + Id: flinkartifactv1.PtrString("cfa-123456"), + Cloud: flinkartifactv1.PtrString("AWS"), + Region: flinkartifactv1.PtrString("us-west-2"), + Environment: flinkartifactv1.PtrString("env-123456"), + DisplayName: flinkartifactv1.PtrString("my-flink-artifact"), + Class: flinkartifactv1.PtrString("io.confluent.flink.example1.test"), + ContentFormat: flinkartifactv1.PtrString("JAR"), + Versions: artifactVersions, + } + plugin2 := flinkartifactv1.ArtifactV1FlinkArtifact{ + Id: flinkartifactv1.PtrString("cfa-789012"), + Cloud: flinkartifactv1.PtrString("AWS"), + Region: flinkartifactv1.PtrString("us-east-1"), + Environment: flinkartifactv1.PtrString("env-789012"), + DisplayName: flinkartifactv1.PtrString("my-flink-python-artifact"), + Class: flinkartifactv1.PtrString("io.confluent.flink.example2.test"), + ContentFormat: flinkartifactv1.PtrString("ZIP"), + Versions: artifactVersions, + } + err := json.NewEncoder(w).Encode(flinkartifactv1.ArtifactV1FlinkArtifactList{Data: []flinkartifactv1.ArtifactV1FlinkArtifact{plugin1, plugin2}}) + require.NoError(t, err) + } + } +} + +// Handler for: "/artifact/v1/flink-artifacts/{id}" +func handleFlinkArtifactPluginsId(t *testing.T) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case http.MethodGet: + vars := mux.Vars(r) + id := vars["id"] + var plugin flinkartifactv1.ArtifactV1FlinkArtifact + if id == "cfa-123456" { + plugin = flinkartifactv1.ArtifactV1FlinkArtifact{ + Id: flinkartifactv1.PtrString("cfa-123456"), + Cloud: flinkartifactv1.PtrString("AWS"), + Region: flinkartifactv1.PtrString("us-west-2"), + Environment: flinkartifactv1.PtrString("env-123456"), + DisplayName: flinkartifactv1.PtrString("my-flink-artifact"), + Class: flinkartifactv1.PtrString("io.confluent.flink.example1.test"), + ContentFormat: flinkartifactv1.PtrString("JAR"), + Versions: artifactVersions, + } + } else if id == "cfa-789012" { + plugin = flinkartifactv1.ArtifactV1FlinkArtifact{ + Id: flinkartifactv1.PtrString("cfa-789012"), + Cloud: flinkartifactv1.PtrString("AWS"), + Region: flinkartifactv1.PtrString("us-east-1"), + Environment: flinkartifactv1.PtrString("env-789012"), + DisplayName: flinkartifactv1.PtrString("my-flink-python-artifact"), + Description: flinkartifactv1.PtrString("Flink custom artifact"), + Class: flinkartifactv1.PtrString("io.confluent.flink.example2.test"), + ContentFormat: flinkartifactv1.PtrString("ZIP"), + Versions: artifactVersions, + } + } else { + plugin = flinkartifactv1.ArtifactV1FlinkArtifact{ + Id: flinkartifactv1.PtrString("cfa-789013"), + Cloud: flinkartifactv1.PtrString("AWS"), + Region: flinkartifactv1.PtrString("us-west-2"), + Environment: flinkartifactv1.PtrString("env-789013"), + DisplayName: flinkartifactv1.PtrString("CliArtifactTest"), + Class: flinkartifactv1.PtrString("io.confluent.flink.example3.test"), + ContentFormat: flinkartifactv1.PtrString("JAR"), + Versions: artifactVersions, + } + } + err := json.NewEncoder(w).Encode(plugin) + require.NoError(t, err) + case http.MethodPatch: + plugin := flinkartifactv1.ArtifactV1FlinkArtifact{ + Id: flinkartifactv1.PtrString("cfa-123456"), + DisplayName: flinkartifactv1.PtrString("CliArtifactTestUpdate"), + } + err := json.NewEncoder(w).Encode(plugin) + require.NoError(t, err) + case http.MethodDelete: + err := json.NewEncoder(w).Encode(flinkartifactv1.ArtifactV1FlinkArtifact{}) + require.NoError(t, err) + } + } +} + +// Handler for: "/artifact/v1/presigned-upload-url" +func handleFlinkArtifactUploadUrl(t *testing.T) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodPost { + uploadUrl := flinkartifactv1.ArtifactV1PresignedUrl{ + ContentFormat: flinkartifactv1.PtrString("ZIP"), + Cloud: flinkartifactv1.PtrString("AWS"), + Region: flinkartifactv1.PtrString("us-west-2"), + UploadId: flinkartifactv1.PtrString("e53bb2e8-8de3-49fa-9fb1-4e3fd9a16b66"), + UploadUrl: flinkartifactv1.PtrString(fmt.Sprintf("%s/connect/v1/dummy-presigned-url", TestV2CloudUrl.String())), + } + err := json.NewEncoder(w).Encode(uploadUrl) + require.NoError(t, err) + } + } +}