Skip to content

Commit

Permalink
[FRT-521] Integrate Confluent CLI with new Flink Artifact API (#2868)
Browse files Browse the repository at this point in the history
  • Loading branch information
pgaref authored Sep 5, 2024
1 parent d35288c commit 6086d94
Show file tree
Hide file tree
Showing 24 changed files with 484 additions and 88 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ require (
github.com/confluentinc/ccloud-sdk-go-v2/cmk v0.10.0
github.com/confluentinc/ccloud-sdk-go-v2/connect v0.7.0
github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin v0.0.5
github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact v0.1.0
github.com/confluentinc/ccloud-sdk-go-v2/flink v0.9.0
github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.10.0
github.com/confluentinc/ccloud-sdk-go-v2/iam v0.11.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin v0.0.5 h1:dY7/VtR
github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin v0.0.5/go.mod h1:AaF39Acy3LFnHSHExaUtqNmbs7kL5/AL54CXX61+Il8=
github.com/confluentinc/ccloud-sdk-go-v2/flink v0.9.0 h1:QqtIFEB5E3CIyGMJd7NQBEtc/k3K11PX7f4Fj7sPFdo=
github.com/confluentinc/ccloud-sdk-go-v2/flink v0.9.0/go.mod h1:GPj4sfR85OyiFQUMNEq1DtPOjYVAuE222Z6Mcapwa48=
github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact v0.1.0 h1:2QuFhvrfU4AdxyfWWPFY0fqEg8p8wmKFfC6N+35pxHg=
github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact v0.1.0/go.mod h1:cl7LEL6bFgiXQ+8sEZvo3BrYZxDOvGkx4jV7eX1ssN4=
github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.10.0 h1:KQVHoct3guckqY2dbzmNouJRwtUfESAKCOjSfbNOuKU=
github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.10.0/go.mod h1:cJ6erfVlWSyz6L+2dR46cF2+s5I2r+pTNrPm2fNbcqU=
github.com/confluentinc/ccloud-sdk-go-v2/iam v0.11.0 h1:ZUAow4L6De1FwYoiwvEodm4lvxc+46wNW+IEAb7K9VU=
Expand Down
5 changes: 1 addition & 4 deletions internal/flink/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

pcmd "github.com/confluentinc/cli/v3/pkg/cmd"
"github.com/confluentinc/cli/v3/pkg/config"
"github.com/confluentinc/cli/v3/pkg/featureflags"
)

type command struct {
Expand All @@ -23,9 +22,7 @@ func New(cfg *config.Config, prerunner pcmd.PreRunner) *cobra.Command {

c := &command{pcmd.NewAuthenticatedCLICommand(cmd, prerunner)}

if cfg.IsTest || featureflags.Manager.BoolVariation("cli.flink_artifact.early_access", cfg.Context(), config.CliLaunchDarklyClient, true, false) {
cmd.AddCommand(c.newArtifactCommand())
}
cmd.AddCommand(c.newArtifactCommand())
cmd.AddCommand(c.newComputePoolCommand())
cmd.AddCommand(c.newConnectivityTypeCommand())
cmd.AddCommand(c.newRegionCommand())
Expand Down
28 changes: 21 additions & 7 deletions internal/flink/command_artifact.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@ package flink
import (
"github.com/spf13/cobra"

connectcustompluginv1 "github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin/v1"
flinkartifactv1 "github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact/v1"

pcmd "github.com/confluentinc/cli/v3/pkg/cmd"
"github.com/confluentinc/cli/v3/pkg/output"
)

type flinkArtifactOut struct {
Id string `human:"ID" serialized:"id" `
Name string `human:"Name" serialized:"name" `
Version string `human:"Version" serialized:"version" `
ContentFormat string `human:"Content Format" serialized:"content_format" `
Id string `human:"ID" serialized:"id"`
Name string `human:"Name" serialized:"name"`
Version string `human:"Version" serialized:"version"`
Class string `human:"Class" serialized:"class"`
Cloud string `human:"Cloud" serialized:"cloud"`
Region string `human:"Region" serialized:"region"`
Environment string `human:"Environment" serialized:"environment"`
ContentFormat string `human:"Content Format" serialized:"content_format"`
}

func (c *command) newArtifactCommand() *cobra.Command {
Expand All @@ -31,12 +35,22 @@ func (c *command) newArtifactCommand() *cobra.Command {
return cmd
}

func printTable(cmd *cobra.Command, plugin connectcustompluginv1.ConnectV1CustomConnectorPlugin) error {
func printTable(cmd *cobra.Command, plugin flinkartifactv1.ArtifactV1FlinkArtifact) error {
table := output.NewTable(cmd)

var pluginVersion = ""
if len(plugin.GetVersions()) > 0 {
pluginVersion = (*plugin.Versions)[0].GetVersion()
}

table.Add(&flinkArtifactOut{
Name: plugin.GetDisplayName(),
Id: plugin.GetId(),
Version: plugin.GetConnectorClass(),
Version: pluginVersion,
Class: plugin.GetClass(),
Cloud: plugin.GetCloud(),
Region: plugin.GetRegion(),
Environment: plugin.GetEnvironment(),
ContentFormat: plugin.GetContentFormat(),
})

Expand Down
73 changes: 53 additions & 20 deletions internal/flink/command_artifact_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

"github.com/spf13/cobra"

connectcustompluginv1 "github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin/v1"
flinkartifactv1 "github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact/v1"

pcmd "github.com/confluentinc/cli/v3/pkg/cmd"
"github.com/confluentinc/cli/v3/pkg/examples"
Expand All @@ -21,11 +21,15 @@ var (
allowedFileExtensions = []string{"jar", "zip"}
)

type pluginCreateOut struct {
type artifactCreateOut struct {
Id string `human:"ID" serialized:"id"`
Name string `human:"Name" serialized:"name"`
Version string `human:"Version" serialized:"version"`
Class string `human:"Class" serialized:"class"`
ContentFormat string `human:"Content Format" serialized:"content_format"`
Cloud string `human:"Cloud" serialized:"cloud"`
Region string `human:"Region" serialized:"region"`
Environment string `human:"Environment" serialized:"environment"`
ErrorTrace string `human:"Error Trace,omitempty" serialized:"error_trace,omitempty"`
}

Expand All @@ -38,48 +42,67 @@ func (c *command) newCreateCommand() *cobra.Command {
Example: examples.BuildExampleString(
examples.Example{
Text: `Create Flink artifact "my-flink-artifact".`,
Code: "confluent flink artifact create my-flink-artifact --artifact-file plugin.jar",
Code: "confluent flink artifact create my-flink-artifact --artifact-file plugin.jar --cloud aws --region us-west-2 --environment env-123456",
},
),
}

cmd.Flags().String("artifact-file", "", "Flink artifact JAR file or ZIP file.")
pcmd.AddCloudFlag(cmd)
pcmd.AddRegionFlagFlink(cmd, c.AuthenticatedCLICommand)
pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)
cmd.Flags().String("runtime-language", "java", fmt.Sprintf("Specify the Flink artifact runtime language as %s.", utils.ArrayToCommaDelimitedString(allowedRuntimeLanguages, "or")))
cmd.Flags().String("description", "", "Description of Flink artifact.")
pcmd.AddContextFlag(cmd, c.CLICommand)
pcmd.AddOutputFlag(cmd)

cobra.CheckErr(cmd.MarkFlagRequired("artifact-file"))
cobra.CheckErr(cmd.MarkFlagRequired("cloud"))
cobra.CheckErr(cmd.MarkFlagRequired("region"))
cobra.CheckErr(cmd.MarkFlagFilename("artifact-file", "zip", "jar"))

return cmd
}

func (c *command) createArtifact(cmd *cobra.Command, args []string) error {
displayName := args[0]
description, err := cmd.Flags().GetString("description")
artifactFile, err := cmd.Flags().GetString("artifact-file")
if err != nil {
return err
}
artifactFile, err := cmd.Flags().GetString("artifact-file")
cloud, err := cmd.Flags().GetString("cloud")
if err != nil {
return err
}
region, err := cmd.Flags().GetString("region")
if err != nil {
return err
}
environment, err := c.Context.EnvironmentId()
if err != nil {
return err
}
runtimeLanguage, err := cmd.Flags().GetString("runtime-language")
if err != nil {
return err
}
description, err := cmd.Flags().GetString("description")
if err != nil {
return err
}

extension := strings.TrimPrefix(filepath.Ext(artifactFile), ".")
if !slices.Contains(allowedFileExtensions, strings.ToLower(extension)) {
return fmt.Errorf("only extensions allowed for `--artifact-file` are %s", utils.ArrayToCommaDelimitedString(allowedFileExtensions, "and"))
}

request := connectcustompluginv1.ConnectV1PresignedUrlRequest{
ContentFormat: connectcustompluginv1.PtrString(extension),
request := flinkartifactv1.ArtifactV1PresignedUrlRequest{
ContentFormat: flinkartifactv1.PtrString(extension),
Cloud: flinkartifactv1.PtrString(cloud),
Region: flinkartifactv1.PtrString(region),
}

resp, err := c.V2Client.GetPresignedUrl(request)
resp, err := c.V2Client.GetFlinkPresignedUrl(request)
if err != nil {
return err
}
Expand All @@ -88,29 +111,39 @@ func (c *command) createArtifact(cmd *cobra.Command, args []string) error {
return err
}

createArtifactRequest := connectcustompluginv1.ConnectV1CustomConnectorPlugin{
DisplayName: connectcustompluginv1.PtrString(displayName),
Description: connectcustompluginv1.PtrString(description),
ConnectorType: connectcustompluginv1.PtrString("flink_udf"),
UploadSource: &connectcustompluginv1.ConnectV1CustomConnectorPluginUploadSourceOneOf{
ConnectV1UploadSourcePresignedUrl: &connectcustompluginv1.ConnectV1UploadSourcePresignedUrl{
Location: "PRESIGNED_URL_LOCATION",
UploadId: resp.GetUploadId(),
createArtifactRequest := flinkartifactv1.InlineObject{
DisplayName: displayName,
Cloud: cloud,
Region: region,
Description: flinkartifactv1.PtrString(description),
UploadSource: flinkartifactv1.InlineObjectUploadSourceOneOf{
ArtifactV1UploadSourcePresignedUrl: &flinkartifactv1.ArtifactV1UploadSourcePresignedUrl{
Location: flinkartifactv1.PtrString("PRESIGNED_URL_LOCATION"),
UploadId: flinkartifactv1.PtrString(resp.GetUploadId()),
},
},
RuntimeLanguage: connectcustompluginv1.PtrString(runtimeLanguage),
RuntimeLanguage: flinkartifactv1.PtrString(runtimeLanguage),
}

plugin, err := c.V2Client.CreateCustomPlugin(createArtifactRequest)
plugin, err := c.V2Client.CreateFlinkArtifact(createArtifactRequest)
if err != nil {
return err
}

var pluginVersion = ""
if len(plugin.GetVersions()) > 0 {
pluginVersion = (*plugin.Versions)[0].GetVersion()
}

table := output.NewTable(cmd)
table.Add(&pluginCreateOut{
table.Add(&artifactCreateOut{
Name: plugin.GetDisplayName(),
Id: plugin.GetId(),
Version: plugin.GetConnectorClass(),
Version: pluginVersion,
Class: plugin.GetClass(),
Cloud: cloud,
Region: region,
Environment: environment,
ContentFormat: plugin.GetContentFormat(),
})
return table.Print()
Expand Down
40 changes: 30 additions & 10 deletions internal/flink/command_artifact_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

pcmd "github.com/confluentinc/cli/v3/pkg/cmd"
"github.com/confluentinc/cli/v3/pkg/deletion"
"github.com/confluentinc/cli/v3/pkg/examples"
"github.com/confluentinc/cli/v3/pkg/resource"
)

Expand All @@ -14,47 +15,66 @@ func (c *command) newDeleteCommand() *cobra.Command {
Short: "Delete one or more Flink UDF artifacts.",
Args: cobra.MinimumNArgs(1),
RunE: c.delete,
Example: examples.BuildExampleString(
examples.Example{
Text: "Delete Flink UDF artifact.",
Code: "confluent flink artifact delete --cloud aws --region us-west-2 cfa-123456",
},
),
}

pcmd.AddCloudFlag(cmd)
pcmd.AddRegionFlagFlink(cmd, c.AuthenticatedCLICommand)
pcmd.AddForceFlag(cmd)
pcmd.AddContextFlag(cmd, c.CLICommand)

cobra.CheckErr(cmd.MarkFlagRequired("cloud"))
cobra.CheckErr(cmd.MarkFlagRequired("region"))

return cmd
}

func (c *command) delete(cmd *cobra.Command, args []string) error {
pluginIdToName, err := c.mapPluginIdToName()
cloud, err := cmd.Flags().GetString("cloud")
if err != nil {
return err
}
region, err := cmd.Flags().GetString("region")
if err != nil {
return err
}

existenceFunc := func(id string) bool {
_, ok := pluginIdToName[id]
artifactIdToName, err := c.mapArtifactIdToName(cloud, region)
if err != nil {
return false
}
_, ok := artifactIdToName[id]
return ok
}

if err := deletion.ValidateAndConfirm(cmd, args, existenceFunc, resource.CustomConnectorPlugin); err != nil {
if err := deletion.ValidateAndConfirm(cmd, args, existenceFunc, resource.FlinkArtifact); err != nil {
return err
}

deleteFunc := func(id string) error {
return c.V2Client.DeleteCustomPlugin(id)
return c.V2Client.DeleteFlinkArtifact(cloud, region, id)
}

_, err = deletion.Delete(args, deleteFunc, resource.CustomConnectorPlugin)
_, err = deletion.Delete(args, deleteFunc, resource.FlinkArtifact)
return err
}

func (c *command) mapPluginIdToName() (map[string]string, error) {
plugins, err := c.V2Client.ListCustomPlugins("")
func (c *command) mapArtifactIdToName(cloud string, region string) (map[string]string, error) {
plugins, err := c.V2Client.ListFlinkArtifacts(cloud, region, "")
if err != nil {
return nil, err
}

pluginIdToName := make(map[string]string)
artifactIdToName := make(map[string]string)
for _, plugin := range plugins {
pluginIdToName[plugin.GetId()] = plugin.GetDisplayName()
artifactIdToName[plugin.GetId()] = plugin.GetDisplayName()
}

return pluginIdToName, nil
return artifactIdToName, nil
}
19 changes: 17 additions & 2 deletions internal/flink/command_artifact_describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,34 @@ func (c *command) newDescribeCommand() *cobra.Command {
Example: examples.BuildExampleString(
examples.Example{
Text: "Describe Flink UDF artifact.",
Code: "confluent flink artifact describe ccp-123456",
Code: "confluent flink artifact describe --cloud aws --region us-west-2 cfa-123456",
},
),
}

pcmd.AddCloudFlag(cmd)
pcmd.AddRegionFlagFlink(cmd, c.AuthenticatedCLICommand)
pcmd.AddContextFlag(cmd, c.CLICommand)
pcmd.AddOutputFlag(cmd)

cobra.CheckErr(cmd.MarkFlagRequired("cloud"))
cobra.CheckErr(cmd.MarkFlagRequired("region"))

return cmd
}

func (c *command) describe(cmd *cobra.Command, args []string) error {
plugin, err := c.V2Client.DescribeCustomPlugin(args[0])
cloud, err := cmd.Flags().GetString("cloud")
if err != nil {
return err
}

region, err := cmd.Flags().GetString("region")
if err != nil {
return err
}

plugin, err := c.V2Client.DescribeFlinkArtifact(cloud, region, args[0])
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 6086d94

Please sign in to comment.