diff --git a/README.md b/README.md index a3743174..ee7f9e99 100644 --- a/README.md +++ b/README.md @@ -41,3 +41,182 @@ In addition, an OCI Image is provided `docker run eu.gcr.io/gardener-project/com Commands of the cli are hierarchically defined in [pkg/commands](./pkg/commands). When a new command should be added, make sure to comply with the Google developer documentation style guide for CLI commands (https://developers.google.com/style/code-syntax#required-items). + +### Transport +The `transport` subcommand copies [Open Component Model (OCM)](https://github.com/gardener/component-spec) based applications between OCI registries or CTF artifacts. The command supports copy-by-reference and copy-by-value. Copy-by-reference only copies the component descriptors. Copy-by-value additionally copies all resources to new locations. + +The basic flow of the `transport` subcommand is the following: +``` +- load component descriptor and all component references +- for componentDescriptor : componentDescriptors + - if copy-by-value + - for every resource : componentDescriptor.Resources + - download resource content from source location + - upload resource content to target location + - set resource.Access to target location + - upload patched component descriptor +``` + +If copy-by-value is enabled, it is possible to modify resources during the copy process, see [here](#modifying-resources). + +#### Configuration +The configuration for downloading and uploading resources is passed via the cli option `--transport-cfg`. The option must point to a valid config file. The following snippet shows a basic config file for downloading and uploading resources of types `localOciBlob` and `ociImage`. It defines 1 downloader and 1 uploader for each resource type and matches these definitions to the actual resources via the `filters` attribute.
+ +```yaml +meta: + version: v1 + +downloaders: +- name: 'oci-artifact-downloader' + type: 'OciArtifactDownloader' + filters: + - type: 'ResourceTypeFilter' + spec: + includeResourceTypes: + - 'ociImage' +- name: 'local-oci-blob-downloader' + type: 'LocalOciBlobDownloader' + filters: + - type: 'AccessTypeFilter' + spec: + includeAccessTypes: + - 'localOciBlob' + +uploaders: +- name: 'oci-artifact-uploader' + type: 'OciArtifactUploader' + spec: + baseUrl: 'eu.gcr.io/target/images' + keepSourceRepo: false + filters: + - type: 'ResourceTypeFilter' + spec: + includeResourceTypes: + - 'ociImage' +- name: 'local-oci-blob-uploader' + type: 'LocalOciBlobUploader' + filters: + - type: 'AccessTypeFilter' + spec: + includeAccessTypes: + - 'localOciBlob' +``` + +Every resource that gets processed is matched against the filter definitions of each downloader and uploader. In order to work correctly, every resource must match against exactly 1 downloader and 1..n uploaders from the config file. There are built-in downloaders and uploaders, and also the possibility to extend the framework via external downloaders and uploaders, see [here](#extensions). + +##### Modifying resources +In addition to downloading and re-uploading resources, resources can be modified during the process. The units that perform the modifications are called *processors* and are defined in the config file. As with downloaders and uploaders, there are built-in processors, and also the possibility to extend the framework via external processors, see [here](#extensions). + +The following snippet shows a config file for removing files from OCI artifact layers. It defines 2 processors of type `OciImageFilter` which remove the *bin* and *etc* directories from all layers of an OCI artifact. These processors are matched against actual resources via `processingRules`. Every resource that gets processed is matched against the filter definitions of each processing rule.
If all filters of a rule match, then the resource will get processed by the processors defined in the rule. It is possible that a resource matches 0..n processing rules. + +```yaml +meta: + version: v1 + +downloaders: + ... + +uploaders: + ... + +processors: +- name: 'remove-bin-dir' + type: 'OciImageFilter' + spec: + removePatterns: + - 'bin/*' +- name: 'remove-etc-dir' + type: 'OciImageFilter' + spec: + removePatterns: + - 'etc/*' + +processingRules: +- name: 'remove-bin-dir-from-test-component-images' + processors: + - name: 'remove-bin-dir' + type: 'processor' + filters: + - type: 'ComponentNameFilter' + spec: + includeComponentNames: + - 'github.com/test/test-component' + - type: 'ResourceTypeFilter' + spec: + includeResourceTypes: + - 'ociImage' +- name: 'remove-etc-dir-from-all-images' + processors: + - name: 'remove-etc-dir' + type: 'processor' + filters: + - type: 'ResourceTypeFilter' + spec: + includeResourceTypes: + - 'ociImage' +``` + +##### Repository Context Override +It is possible to load component descriptors from different OCI registries via the cli option `--repo-ctx-override-cfg`. It must point to a config file in which the source repository contexts of component descriptors can be explicitly mapped. When downloading the source component descriptors, the program first looks into the repository context override file and tries to match a component descriptor against each override definition. If no override matches, the default repository context from the cli option `--from` is used. The following snippet shows a configuration where the component `github.com/test/test-component` is loaded from the repository context with base URL `eu.gcr.io/override`.
+ +```yaml +meta: + version: v1 +overrides: +- repositoryContext: + baseUrl: 'eu.gcr.io/override' + componentNameMapping: urlPath + type: ociRegistry + componentNameFilterSpec: + includeComponentNames: + - 'github.com/test/test-component' +``` + +#### Extensions +A pipeline for processing a single resource must consist of 1 downloader, 0..n processors, and 1..n uploaders. For each resource that gets processed, a unique chain of downloader, processors, and uploaders is created based on the definitions in the config file. The elements of a chain are then called sequentially. + +![resource processing](images/transport.png) + +The downloader is the first element in a chain. It receives a TAR archive which contains 2 files. The first file is `component-descriptor.yaml`, which contains the component descriptor in YAML format. The second file is `resource.yaml`, which contains the YAML of the resource for which the chain is called. The downloader must then download the actual resource content based on this information. Once the content is downloaded, the downloader builds a new TAR archive for passing the information to subsequent processors. This TAR archive contains the `component-descriptor.yaml` and `resource.yaml`, and additionally the serialized resource content as a single blob file. + +The subsequent processors can open the TAR archive, deserialize the resource content, modify either the `resource.yaml` or the actual resource content, and again serialize the content for subsequent processors. Beware that the serialization format of all processors in a chain must match. + +At the end of a processing pipeline are 1..n uploaders. They publish the resource content to a sink (OCI registry, CTF archive, S3 bucket, ...). As with the preceding steps, uploaders must also build a new TAR archive and pass the information to subsequent uploaders.
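The TAR contract described above can be sketched in Go. This is a minimal illustration under stated assumptions, not the CLI's actual implementation: the helper names and the `blob` entry name are hypothetical, while `component-descriptor.yaml` and `resource.yaml` are the file names named in the text. The diff's own `pkg/testutils/tar.go` builds archives the same way for tests.

```go
package main

import (
	"archive/tar"
	"bytes"
	"fmt"
	"io"
)

// buildPipelineArchive writes the files that travel between chain elements
// (component descriptor, resource manifest and, after the downloader ran,
// the serialized resource content) into a single TAR stream.
func buildPipelineArchive(files map[string][]byte) *bytes.Buffer {
	buf := &bytes.Buffer{}
	tw := tar.NewWriter(buf)
	for name, content := range files {
		hdr := &tar.Header{Name: name, Mode: 0o600, Size: int64(len(content))}
		if err := tw.WriteHeader(hdr); err != nil {
			panic(err)
		}
		if _, err := tw.Write(content); err != nil {
			panic(err)
		}
	}
	if err := tw.Close(); err != nil {
		panic(err)
	}
	return buf
}

// readPipelineArchive is the counterpart a processor or uploader would run
// first: read every file of the incoming TAR stream back into memory.
func readPipelineArchive(r io.Reader) map[string][]byte {
	files := map[string][]byte{}
	tr := tar.NewReader(r)
	for {
		hdr, err := tr.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			panic(err)
		}
		content, err := io.ReadAll(tr)
		if err != nil {
			panic(err)
		}
		files[hdr.Name] = content
	}
	return files
}

func main() {
	// "blob" as entry name for the resource content is an assumption.
	archive := buildPipelineArchive(map[string][]byte{
		"component-descriptor.yaml": []byte("component:\n  name: github.com/test/test-component\n"),
		"resource.yaml":             []byte("name: my-resource\n"),
		"blob":                      []byte("serialized resource content"),
	})
	files := readPipelineArchive(archive)
	fmt.Println(len(files)) // number of files that round-tripped
}
```

Each chain element consumes such a stream, rewrites the entries it is responsible for, and emits a fresh archive of the same shape for the next element.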
Beware that uploaders must update the `access` attribute in the `resource.yaml` to point to the new resource location. + +After all uploaders have run, the orchestrating program collects the modified `resource.yaml` files from the TAR archives and replaces the original resources in the component descriptor. The component descriptor then contains the updated resources. As a final step, the modified component descriptor is uploaded. + +Every stage of resource processing (downloaders, processors, uploaders) is extensible. An extension is a static binary that reads a TAR archive as input (either via stdin or Unix Domain Sockets), performs its modifications, and then outputs a TAR archive with the modified content via stdout or Unix Domain Sockets. To describe extensions in the config file, use `type: 'Executable'` in the downloader/processor/uploader definition. The following snippet shows the definition of an extension processor. + +```yaml +processors: +- name: 'test-extension' + type: 'Executable' + spec: + bin: '/path/to/binary' + args: + - '--test-arg' + - '42' + env: + TEST_ENV: '42' +``` + +When creating extensions, some details must be considered. + +##### General points: +- One can choose whether the extension binary should support communication via stdin/stdout, Unix Domain Sockets, or both. + +- Modifications of the component-descriptor.yaml file in a TAR archive are ignored when re-uploading component descriptors and should therefore not be performed. + +- The extension must read the input stream in a non-blocking fashion (e.g. by directly writing it to a temp file). If the stream is directly consumed as a TAR file, the writing side might block for large resources, as the TAR archive must first be completely written before it can be opened on the reading side. + +- Beware that the serialization formats of the resource content must match between the downloader, processors, and uploaders of a resource processing chain.
The serialization format is therefore an implicit part of the processor interface. + +##### Points when using Unix Domain Sockets: +- For Unix Domain Sockets, the calling program will set an environment variable which indicates the URL under which the Unix Domain Socket server of the extension binary should start. The calling program will try to connect to the extension via this URL. + +- The extension program must stop the Unix Domain Socket server and exit once it receives SIGTERM from the calling program. + +- The extension program should remove the socket file from the file system once the program finishes. + +##### Points when using stdin/stdout: +- When using stdin/stdout for reading and writing processor TAR archives, beware that no other output like log data is allowed to be written to stdout. This data would otherwise interfere with the program data. diff --git a/docs/reference/component-cli.md b/docs/reference/component-cli.md index 3fed7f11..8c1294b7 100644 --- a/docs/reference/component-cli.md +++ b/docs/reference/component-cli.md @@ -21,5 +21,6 @@ component cli * [component-cli ctf](component-cli_ctf.md) - * [component-cli image-vector](component-cli_image-vector.md) - command to add resource from a image vector and retrieve from a component descriptor * [component-cli oci](component-cli_oci.md) - +* [component-cli transport](component-cli_transport.md) - * [component-cli version](component-cli_version.md) - displays the version diff --git a/docs/reference/component-cli_transport.md b/docs/reference/component-cli_transport.md new file mode 100644 index 00000000..5907b32b --- /dev/null +++ b/docs/reference/component-cli_transport.md @@ -0,0 +1,39 @@ +## component-cli transport + + + +``` +component-cli transport [flags] +``` + +### Options + +``` + --allow-plain-http allows the fallback to http if the oci registry does not support https + --cc-config string path to the local concourse config file + --dry-run only download component descriptors and perform
matching of resources against transport config file. no component descriptors are uploaded, no resources are down/uploaded + --from string source repository base url + -h, --help help for transport + --insecure-skip-tls-verify If true, the server's certificate will not be checked for validity. This will make your HTTPS connections insecure + --processor-timeout duration execution timeout for each individual processor (default 30s) + --registry-config string path to the dockerconfig.json with the oci registry authentication information + --repo-ctx-override-cfg string path to the repository context override config file + --to string target repository where the components are copied to + --transport-cfg string path to the transport config file +``` + +### Options inherited from parent commands + +``` + --cli logger runs as cli logger. enables cli logging + --dev enable development logging which result in console encoding, enabled stacktrace and enabled caller + --disable-caller disable the caller of logs (default true) + --disable-stacktrace disable the stacktrace of error logs (default true) + --disable-timestamp disable timestamp output (default true) + -v, --verbosity int number for the log level verbosity (default 1) +``` + +### SEE ALSO + +* [component-cli](component-cli.md) - component cli + diff --git a/hack/generate-docs/main.go b/hack/generate-docs/main.go index a8fa3462..e6877a46 100644 --- a/hack/generate-docs/main.go +++ b/hack/generate-docs/main.go @@ -30,7 +30,7 @@ func main() { cmd := app.NewComponentsCliCommand(context.TODO()) cmd.DisableAutoGenTag = true check(doc.GenMarkdownTree(cmd, outputDir)) - fmt.Printf("Successfully written docs to %s", outputDir) + fmt.Printf("Successfully written docs to %s\n", outputDir) } func printHelp() { diff --git a/images/transport.png b/images/transport.png new file mode 100644 index 00000000..0dbe84ae Binary files /dev/null and b/images/transport.png differ diff --git a/ociclient/mock/client_mock.go 
b/ociclient/mock/client_mock.go index 08616ed5..9f11dc62 100644 --- a/ociclient/mock/client_mock.go +++ b/ociclient/mock/client_mock.go @@ -83,6 +83,25 @@ func (mr *MockClientMockRecorder) GetOCIArtifact(arg0, arg1 interface{}) *gomock return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOCIArtifact", reflect.TypeOf((*MockClient)(nil).GetOCIArtifact), arg0, arg1) } +// PushBlob mocks base method. +func (m *MockClient) PushBlob(arg0 context.Context, arg1 string, arg2 v1.Descriptor, arg3 ...ociclient.PushOption) error { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1, arg2} + for _, a := range arg3 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "PushBlob", varargs...) + ret0, _ := ret[0].(error) + return ret0 +} + +// PushBlob indicates an expected call of PushBlob. +func (mr *MockClientMockRecorder) PushBlob(arg0, arg1, arg2 interface{}, arg3 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1, arg2}, arg3...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PushBlob", reflect.TypeOf((*MockClient)(nil).PushBlob), varargs...) +} + // PushManifest mocks base method. 
func (m *MockClient) PushManifest(arg0 context.Context, arg1 string, arg2 *v1.Manifest, arg3 ...ociclient.PushOption) error { m.ctrl.T.Helper() diff --git a/pkg/commands/componentarchive/remote/copy.go b/pkg/commands/componentarchive/remote/copy.go index ce13ba9d..6078187a 100644 --- a/pkg/commands/componentarchive/remote/copy.go +++ b/pkg/commands/componentarchive/remote/copy.go @@ -9,7 +9,6 @@ import ( "errors" "fmt" "io" - "net/url" "os" "path" "strings" @@ -25,8 +24,6 @@ import ( "github.com/spf13/cobra" "github.com/spf13/pflag" - "github.com/gardener/component-cli/ociclient/oci" - "github.com/gardener/component-cli/ociclient" "github.com/gardener/component-cli/ociclient/cache" @@ -293,7 +290,7 @@ func (c *Copier) Copy(ctx context.Context, name, version string) error { } // mangle the target artifact name to keep the original image ref somehow readable. - target, err := targetOCIArtifactRef(c.TargetArtifactRepository, ociRegistryAcc.ImageReference, c.KeepSourceRepository) + target, err := utils.TargetOCIArtifactRef(c.TargetArtifactRepository, ociRegistryAcc.ImageReference, c.KeepSourceRepository) if err != nil { return fmt.Errorf("unable to create target oci artifact reference for resource %s: %w", res.Name, err) } @@ -333,7 +330,7 @@ func (c *Copier) Copy(ctx context.Context, name, version string) error { } src := path.Join(c.SourceArtifactRepository, relOCIRegistryAcc.Reference) - target, err := targetOCIArtifactRef(c.TargetArtifactRepository, src, c.KeepSourceRepository) + target, err := utils.TargetOCIArtifactRef(c.TargetArtifactRepository, src, c.KeepSourceRepository) if err != nil { return fmt.Errorf("unable to create target oci artifact reference for resource %s: %w", res.Name, err) } @@ -403,28 +400,3 @@ func (c *Copier) Copy(ctx context.Context, name, version string) error { return nil } - -func targetOCIArtifactRef(targetRepo, ref string, keepOrigHost bool) (string, error) { - if !strings.Contains(targetRepo, "://") { - // add dummy protocol to 
correctly parse the url - targetRepo = "http://" + targetRepo - } - t, err := url.Parse(targetRepo) - if err != nil { - return "", err - } - parsedRef, err := oci.ParseRef(ref) - if err != nil { - return "", err - } - - if !keepOrigHost { - parsedRef.Host = t.Host - parsedRef.Repository = path.Join(t.Path, parsedRef.Repository) - return parsedRef.String(), nil - } - replacedRef := strings.NewReplacer(".", "_", ":", "_").Replace(parsedRef.Name()) - parsedRef.Repository = path.Join(t.Path, replacedRef) - parsedRef.Host = t.Host - return parsedRef.String(), nil -} diff --git a/pkg/commands/componentarchive/remote/remote.go b/pkg/commands/componentarchive/remote/remote.go index 89738ecf..7e80f4b8 100644 --- a/pkg/commands/componentarchive/remote/remote.go +++ b/pkg/commands/componentarchive/remote/remote.go @@ -20,6 +20,7 @@ func NewRemoteCommand(ctx context.Context) *cobra.Command { cmd.AddCommand(NewPushCommand(ctx)) cmd.AddCommand(NewGetCommand(ctx)) cmd.AddCommand(NewCopyCommand(ctx)) + cmd.AddCommand(NewTransportCommand(ctx)) return cmd } diff --git a/pkg/commands/componentarchive/remote/transport.go b/pkg/commands/componentarchive/remote/transport.go new file mode 100644 index 00000000..9e7626aa --- /dev/null +++ b/pkg/commands/componentarchive/remote/transport.go @@ -0,0 +1,357 @@ +// SPDX-FileCopyrightText: 2021 SAP SE or an SAP affiliate company and Gardener contributors. 
+// +// SPDX-License-Identifier: Apache-2.0 +package remote + +import ( + "context" + "errors" + "fmt" + "os" + "path/filepath" + "sync" + "time" + + cdv2 "github.com/gardener/component-spec/bindings-go/apis/v2" + "github.com/gardener/component-spec/bindings-go/ctf" + cdoci "github.com/gardener/component-spec/bindings-go/oci" + "github.com/go-logr/logr" + "github.com/mandelsoft/vfs/pkg/osfs" + "github.com/mandelsoft/vfs/pkg/vfs" + "github.com/spf13/cobra" + "github.com/spf13/pflag" + + "github.com/gardener/component-cli/ociclient" + "github.com/gardener/component-cli/ociclient/cache" + ociopts "github.com/gardener/component-cli/ociclient/options" + "github.com/gardener/component-cli/pkg/commands/constants" + "github.com/gardener/component-cli/pkg/logger" + "github.com/gardener/component-cli/pkg/transport" + "github.com/gardener/component-cli/pkg/transport/config" + "github.com/gardener/component-cli/pkg/utils" +) + +type Options struct { + SourceRepository string + TargetRepository string + + // ComponentName is the unique name of the component in the registry. + ComponentName string + // Version is the component Version in the oci registry. + Version string + + // TransportCfgPath is the path to the transport config file + TransportCfgPath string + // RepoCtxOverrideCfgPath is the path to the repository context override config file + RepoCtxOverrideCfgPath string + + GenerateSignature bool + SignatureName string + PrivateKeyPath string + + DryRun bool + ProcessorTimeout time.Duration + + // OCIOptions contains all oci client related options. + OCIOptions ociopts.Options +} + +// NewTransportCommand creates a new transport command. 
+func NewTransportCommand(ctx context.Context) *cobra.Command { + opts := &Options{} + cmd := &cobra.Command{ + Use: "transport", + Run: func(cmd *cobra.Command, args []string) { + if err := opts.Complete(args); err != nil { + fmt.Println(err.Error()) + os.Exit(1) + } + + if err := opts.Run(ctx, logger.Log, osfs.New()); err != nil { + fmt.Println(err.Error()) + os.Exit(1) + } + }, + } + opts.AddFlags(cmd.Flags()) + return cmd +} + +func (o *Options) AddFlags(fs *pflag.FlagSet) { + fs.StringVar(&o.SourceRepository, "from", "", "source repository base url") + fs.StringVar(&o.TargetRepository, "to", "", "target repository where the components are copied to") + fs.StringVar(&o.TransportCfgPath, "transport-cfg", "", "path to the transport config file") + fs.StringVar(&o.RepoCtxOverrideCfgPath, "repo-ctx-override-cfg", "", "path to the repository context override config file") + fs.BoolVar(&o.DryRun, "dry-run", false, "only download component descriptors and perform matching of resources against transport config file. 
no component descriptors are uploaded, no resources are down/uploaded") + fs.DurationVar(&o.ProcessorTimeout, "processor-timeout", 30*time.Second, "execution timeout for each individual processor") + o.OCIOptions.AddFlags(fs) +} + +func (o *Options) Complete(args []string) error { + if len(args) != 2 { + return errors.New("a component name and version must be provided as arguments") + } + o.ComponentName = args[0] + o.Version = args[1] + + cliHomeDir, err := constants.CliHomeDir() + if err != nil { + return err + } + o.OCIOptions.CacheDir = filepath.Join(cliHomeDir, "components") + if err := os.MkdirAll(o.OCIOptions.CacheDir, os.ModePerm); err != nil { + return fmt.Errorf("unable to create cache directory %s: %w", o.OCIOptions.CacheDir, err) + } + + if len(o.ComponentName) == 0 { + return errors.New("a component name must be defined") + } + if len(o.Version) == 0 { + return errors.New("a component's Version must be defined") + } + + if len(o.SourceRepository) == 0 { + return errors.New("a source repository must be defined") + } + if len(o.TargetRepository) == 0 { + return errors.New("a target repository must be defined") + } + + if len(o.TransportCfgPath) == 0 { + return errors.New("a path to a transport config file must be defined") + } + + if o.GenerateSignature { + if o.SignatureName == "" { + return errors.New("a signature name must be defined") + } + if o.PrivateKeyPath == "" { + return errors.New("a path to a private key file must be defined") + } + } + + return nil +} + +func (o *Options) Run(ctx context.Context, log logr.Logger, fs vfs.FileSystem) error { + if o.DryRun { + log.Info("dry-run: no component descriptors are uploaded, no resources are down/uploaded") + } + + ociClient, _, err := o.OCIOptions.Build(log, fs) + if err != nil { + return fmt.Errorf("unable to build oci client: %s", err.Error()) + } + + ociCache, err := cache.NewCache(log, cache.WithBasePath(o.OCIOptions.CacheDir)) + if err != nil { + return fmt.Errorf("unable to build cache: %w", err) + } + + if err := cache.InjectCacheInto(ociClient, ociCache); err != nil { + return fmt.Errorf("unable to
inject cache into oci client: %w", err) + } + + var repoCtxOverride *utils.RepositoryContextOverride + if o.RepoCtxOverrideCfgPath != "" { + repoCtxOverride, err = utils.ParseRepositoryContextOverrideConfig(o.RepoCtxOverrideCfgPath) + if err != nil { + return fmt.Errorf("unable to parse repository context override config file: %w", err) + } + } + + transportCfg, err := config.ParseTransportConfig(o.TransportCfgPath) + if err != nil { + return fmt.Errorf("unable to parse transport config file: %w", err) + } + + sourceCtx := cdv2.NewOCIRegistryRepository(o.SourceRepository, "") + targetCtx := cdv2.NewOCIRegistryRepository(o.TargetRepository, "") + + cds, err := ResolveRecursive(ctx, ociClient, *sourceCtx, o.ComponentName, o.Version, repoCtxOverride, log) + if err != nil { + return fmt.Errorf("unable to resolve component descriptors: %w", err) + } + + factory, err := transport.NewProcessingJobFactory(*transportCfg, ociClient, ociCache, *targetCtx, log, o.ProcessorTimeout) + if err != nil { + return fmt.Errorf("unable to create processing job factory: %w", err) + } + + if o.DryRun { + for _, cd := range cds { + componentLog := log.WithValues("component-name", cd.Name, "component-version", cd.Version) + for _, res := range cd.Resources { + resourceLog := componentLog.WithValues("resource-name", res.Name, "resource-version", res.Version) + job, err := factory.Create(*cd, res) + if err != nil { + resourceLog.Error(err, "unable to create processing job") + return err + } + resourceLog.Info("matched resource", "matching", job.GetMatching()) + } + } + return nil + } + + wg := sync.WaitGroup{} + cdLookup := map[string]*cdv2.ComponentDescriptor{} + errs := []error{} + errsMux := sync.Mutex{} + + for _, cd := range cds { + cd := cd + componentLog := log.WithValues("component-name", cd.Name, "component-version", cd.Version) + + key := fmt.Sprintf("%s:%s", cd.Name, cd.Version) + if _, ok := cdLookup[key]; ok { + err := errors.New("component descriptor already exists in map") + 
componentLog.Error(err, "unable to add component descriptor to map") + return err + } + cdLookup[key] = cd + + wg.Add(1) + go func() { + defer wg.Done() + processedResources, err := processResources(ctx, cd, *targetCtx, componentLog, factory) + if err != nil { + errsMux.Lock() + errs = append(errs, err) + errsMux.Unlock() + return + } + + cd.Resources = processedResources + }() + } + + wg.Wait() + + if len(errs) > 0 { + return fmt.Errorf("%d errors occurred during resource processing", len(errs)) + } + + for _, cd := range cdLookup { + cd := cd + componentLog := log.WithValues("component-name", cd.Name, "component-version", cd.Version) + + wg.Add(1) + go func() { + defer wg.Done() + manifest, err := cdoci.NewManifestBuilder(ociCache, ctf.NewComponentArchive(cd, nil)).Build(ctx) + if err != nil { + componentLog.Error(err, "unable to build oci artifact for component archive") + errsMux.Lock() + errs = append(errs, err) + errsMux.Unlock() + return + } + + ociRef, err := cdoci.OCIRef(*targetCtx, cd.Name, cd.Version) + if err != nil { + componentLog.Error(err, "unable to build component descriptor oci reference") + errsMux.Lock() + errs = append(errs, err) + errsMux.Unlock() + return + } + + if err := ociClient.PushManifest(ctx, ociRef, manifest); err != nil { + componentLog.Error(err, "unable to push component descriptor manifest") + errsMux.Lock() + errs = append(errs, err) + errsMux.Unlock() + return + } + }() + } + + wg.Wait() + + if len(errs) > 0 { + return fmt.Errorf("%d errors occurred during component descriptor uploading", len(errs)) + } + + return nil +} + +func processResources( + ctx context.Context, + cd *cdv2.ComponentDescriptor, + targetCtx cdv2.OCIRegistryRepository, + log logr.Logger, + processingJobFactory *transport.ProcessingJobFactory, +) ([]cdv2.Resource, error) { + wg := sync.WaitGroup{} + errs := []error{} + errsMux := sync.Mutex{} + processedResources := []cdv2.Resource{} + resMux := sync.Mutex{} + + for _, resource := range cd.Resources { + resource := resource + resourceLog := log.WithValues("resource-name", resource.Name, "resource-version", resource.Version) + + wg.Add(1) + go func() { + defer wg.Done() + + job, err :=
processingJobFactory.Create(*cd, resource) + if err != nil { + errsMux.Lock() + errs = append(errs, fmt.Errorf("unable to create processing job: %w", err)) + errsMux.Unlock() + return + } + + resourceLog.V(5).Info("matched resource", "matching", job.GetMatching()) + + if err = job.Process(ctx); err != nil { + errsMux.Lock() + errs = append(errs, fmt.Errorf("unable to process resource: %w", err)) + errsMux.Unlock() + return + } + + resMux.Lock() + processedResources = append(processedResources, *job.ProcessedResource) + resMux.Unlock() + }() + } + + wg.Wait() + + if len(errs) > 0 { + return nil, errors.New("unable to process resources") + } + + return processedResources, nil +} + +func ResolveRecursive( + ctx context.Context, + client ociclient.Client, + defaultRepo cdv2.OCIRegistryRepository, + componentName, + componentVersion string, + repoCtxOverrideCfg *utils.RepositoryContextOverride, + log logr.Logger, +) ([]*cdv2.ComponentDescriptor, error) { + componentLog := log.WithValues("component-name", componentName, "component-version", componentVersion) + + repoCtx := defaultRepo + if repoCtxOverrideCfg != nil { + repoCtx = *repoCtxOverrideCfg.GetRepositoryContext(componentName, defaultRepo) + componentLog.V(7).Info("repository context after override", "repository-context", repoCtx) + } + + cdresolver := cdoci.NewResolver(client) + cd, err := cdresolver.Resolve(ctx, &repoCtx, componentName, componentVersion) + if err != nil { + componentLog.Error(err, "unable to fetch component descriptor") + return nil, err + } + + cds := []*cdv2.ComponentDescriptor{ + cd, + } + for _, ref := range cd.ComponentReferences { + cdDeps, err := ResolveRecursive(ctx, client, defaultRepo, ref.ComponentName, ref.Version, repoCtxOverrideCfg, log) + if err != nil { + componentLog.Error(err, "unable to resolve ref", "ref", ref) + return nil, err + } + cds = append(cds, cdDeps...) 
+ } + + return cds, nil +} diff --git a/pkg/commands/oci/oci.go b/pkg/commands/oci/oci.go index 2894dbb1..f61c0811 100644 --- a/pkg/commands/oci/oci.go +++ b/pkg/commands/oci/oci.go @@ -10,7 +10,7 @@ import ( "github.com/spf13/cobra" ) -// NewOCICommand creates a new ctf command. +// NewOCICommand creates a new oci command. func NewOCICommand(ctx context.Context) *cobra.Command { cmd := &cobra.Command{ Use: "oci", diff --git a/pkg/testutils/tar.go b/pkg/testutils/tar.go new file mode 100644 index 00000000..dbcc4a4b --- /dev/null +++ b/pkg/testutils/tar.go @@ -0,0 +1,68 @@ +// SPDX-FileCopyrightText: 2021 SAP SE or an SAP affiliate company and Gardener contributors. +// +// SPDX-License-Identifier: Apache-2.0 +package testutils + +import ( + "archive/tar" + "bytes" + "fmt" + "io" + "time" + + . "github.com/onsi/gomega" +) + +// CreateTARArchive creates a TAR archive which contains a defined set of files +func CreateTARArchive(files map[string][]byte) *bytes.Buffer { + buf := bytes.NewBuffer([]byte{}) + tw := tar.NewWriter(buf) + defer tw.Close() + + for filename, content := range files { + h := tar.Header{ + Name: filename, + Size: int64(len(content)), + Mode: 0600, + ModTime: time.Now(), + } + + Expect(tw.WriteHeader(&h)).To(Succeed()) + _, err := tw.Write(content) + Expect(err).ToNot(HaveOccurred()) + } + + return buf +} + +// CheckTARArchive checks that a TAR archive contains a defined set of files +func CheckTARArchive(archiveReader io.Reader, expectedFiles map[string][]byte) { + tr := tar.NewReader(archiveReader) + + expectedFilesCopy := map[string][]byte{} + for key, value := range expectedFiles { + expectedFilesCopy[key] = value + } + + for { + header, err := tr.Next() + if err != nil { + if err == io.EOF { + break + } + Expect(err).ToNot(HaveOccurred()) + } + + actualContentBuf := bytes.NewBuffer([]byte{}) + _, err = io.Copy(actualContentBuf, tr) + Expect(err).ToNot(HaveOccurred()) + + expectedContent, ok := expectedFilesCopy[header.Name] + 
Expect(ok).To(BeTrue(), fmt.Sprintf("file \"%s\" is not included in expected files", header.Name)) + Expect(actualContentBuf.Bytes()).To(Equal(expectedContent)) + + delete(expectedFilesCopy, header.Name) + } + + Expect(expectedFilesCopy).To(BeEmpty(), fmt.Sprintf("unable to find all expected files in TAR archive. missing files = %+v", expectedFilesCopy)) +} diff --git a/pkg/transport/process/downloaders/downloader_factory.go b/pkg/transport/process/downloaders/downloader_factory.go index a6c68a44..bea3b7ad 100644 --- a/pkg/transport/process/downloaders/downloader_factory.go +++ b/pkg/transport/process/downloaders/downloader_factory.go @@ -7,6 +7,8 @@ import ( "encoding/json" "fmt" + "github.com/go-logr/logr" + "github.com/gardener/component-cli/ociclient" "github.com/gardener/component-cli/ociclient/cache" "github.com/gardener/component-cli/pkg/transport/process" @@ -26,10 +28,11 @@ const ( // - Add Go file to downloader package which contains the source code of the new downloader // - Add string constant for new downloader type -> will be used in DownloaderFactory.Create() // - Add source code for creating new downloader to DownloaderFactory.Create() method -func NewDownloaderFactory(client ociclient.Client, ocicache cache.Cache) *DownloaderFactory { +func NewDownloaderFactory(client ociclient.Client, ocicache cache.Cache, log logr.Logger) *DownloaderFactory { return &DownloaderFactory{ client: client, cache: ocicache, + log: log, } } @@ -37,6 +40,7 @@ func NewDownloaderFactory(client ociclient.Client, ocicache cache.Cache) *Downlo type DownloaderFactory struct { client ociclient.Client cache cache.Cache + log logr.Logger } // Create creates a new downloader defined by a type and a spec @@ -47,7 +51,7 @@ func (f *DownloaderFactory) Create(downloaderType string, spec *json.RawMessage) case OCIArtifactDownloaderType: return NewOCIArtifactDownloader(f.client, f.cache) case extensions.ExecutableType: - return extensions.CreateExecutable(spec) + return 
extensions.CreateExecutable(spec, f.log) default: return nil, fmt.Errorf("unknown downloader type %s", downloaderType) } diff --git a/pkg/transport/process/extensions/extensions_suite_test.go b/pkg/transport/process/extensions/extensions_suite_test.go index 0d904f4c..ea6bf7e6 100644 --- a/pkg/transport/process/extensions/extensions_suite_test.go +++ b/pkg/transport/process/extensions/extensions_suite_test.go @@ -15,6 +15,7 @@ import ( "time" cdv2 "github.com/gardener/component-spec/bindings-go/apis/v2" + "github.com/go-logr/logr" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -76,14 +77,14 @@ var _ = Describe("transport extensions", func() { Context("unix domain socket executable", func() { It("should create processor successfully if env is nil", func() { args := []string{} - _, err := extensions.NewUnixDomainSocketExecutable(exampleProcessorBinaryPath, args, nil) + _, err := extensions.NewUnixDomainSocketExecutable(exampleProcessorBinaryPath, args, nil, logr.Discard()) Expect(err).ToNot(HaveOccurred()) }) It("should modify the processed resource correctly", func() { args := []string{} env := map[string]string{} - processor, err := extensions.NewUnixDomainSocketExecutable(exampleProcessorBinaryPath, args, env) + processor, err := extensions.NewUnixDomainSocketExecutable(exampleProcessorBinaryPath, args, env, logr.Discard()) Expect(err).ToNot(HaveOccurred()) runExampleResourceTest(processor) @@ -94,7 +95,7 @@ var _ = Describe("transport extensions", func() { env := map[string]string{ extensions.ProcessorServerAddressEnv: "/tmp/my-processor.sock", } - _, err := extensions.NewUnixDomainSocketExecutable(exampleProcessorBinaryPath, args, env) + _, err := extensions.NewUnixDomainSocketExecutable(exampleProcessorBinaryPath, args, env, logr.Discard()) Expect(err).To(MatchError(fmt.Sprintf("the env variable %s is not allowed to be set manually", extensions.ProcessorServerAddressEnv))) }) @@ -103,7 +104,7 @@ var _ = Describe("transport extensions", func() { env := 
map[string]string{ sleepTimeEnv: sleepTime.String(), } - processor, err := extensions.NewUnixDomainSocketExecutable(sleepProcessorBinaryPath, args, env) + processor, err := extensions.NewUnixDomainSocketExecutable(sleepProcessorBinaryPath, args, env, logr.Discard()) Expect(err).ToNot(HaveOccurred()) runTimeoutTest(processor) diff --git a/pkg/transport/process/extensions/unix_domain_socket_executable.go b/pkg/transport/process/extensions/unix_domain_socket_executable.go index 9e6f3864..35f664af 100644 --- a/pkg/transport/process/extensions/unix_domain_socket_executable.go +++ b/pkg/transport/process/extensions/unix_domain_socket_executable.go @@ -13,6 +13,8 @@ import ( "syscall" "time" + "github.com/go-logr/logr" + "github.com/gardener/component-cli/pkg/transport/process" "github.com/gardener/component-cli/pkg/utils" ) @@ -26,11 +28,12 @@ type unixDomainSocketExecutable struct { args []string env []string addr string + log logr.Logger } // NewUnixDomainSocketExecutable returns a resource processor extension which runs an executable in the // background when calling Process(). It communicates with this processor via Unix Domain Sockets. 
-func NewUnixDomainSocketExecutable(bin string, args []string, env map[string]string) (process.ResourceStreamProcessor, error) { +func NewUnixDomainSocketExecutable(bin string, args []string, env map[string]string, log logr.Logger) (process.ResourceStreamProcessor, error) { if _, ok := env[ProcessorServerAddressEnv]; ok { return nil, fmt.Errorf("the env variable %s is not allowed to be set manually", ProcessorServerAddressEnv) } @@ -52,6 +55,7 @@ func NewUnixDomainSocketExecutable(bin string, args []string, env map[string]str args: args, env: parsedEnv, addr: addr, + log: log, } return &e, nil @@ -66,6 +70,16 @@ func (e *unixDomainSocketExecutable) Process(ctx context.Context, r io.Reader, w if err := cmd.Start(); err != nil { return fmt.Errorf("unable to start processor: %w", err) } + defer func() { + // remove socket file if server hasn't already cleaned up + if _, err := os.Stat(e.addr); err == nil { + if err := os.Remove(e.addr); err != nil { + e.log.Error(err, "unable to remove "+e.addr) + } + } else if !os.IsNotExist(err) { + e.log.Error(err, "unable to get file stats for "+e.addr) + } + }() conn, err := tryConnect(e.addr) if err != nil { @@ -94,15 +108,6 @@ func (e *unixDomainSocketExecutable) Process(ctx context.Context, r io.Reader, w return fmt.Errorf("unable to wait for processor: %w", err) } - // remove socket file if server hasn't already cleaned up - if _, err := os.Stat(e.addr); err == nil { - if err := os.Remove(e.addr); err != nil { - return fmt.Errorf("unable to remove %s: %w", e.addr, err) - } - } else if !os.IsNotExist(err) { - return fmt.Errorf("unable to get file stats for %s: %w", e.addr, err) - } - return nil } diff --git a/pkg/transport/process/extensions/utils.go b/pkg/transport/process/extensions/utils.go index 4a21b1ea..e131de43 100644 --- a/pkg/transport/process/extensions/utils.go +++ b/pkg/transport/process/extensions/utils.go @@ -9,6 +9,8 @@ import ( "sigs.k8s.io/yaml" + "github.com/go-logr/logr" + 
"github.com/gardener/component-cli/pkg/transport/process" ) @@ -18,7 +20,7 @@ const ( ) // CreateExecutable creates a new executable defined by a spec -func CreateExecutable(rawSpec *json.RawMessage) (process.ResourceStreamProcessor, error) { +func CreateExecutable(rawSpec *json.RawMessage, log logr.Logger) (process.ResourceStreamProcessor, error) { type executableSpec struct { Bin string Args []string @@ -30,5 +32,5 @@ func CreateExecutable(rawSpec *json.RawMessage) (process.ResourceStreamProcessor return nil, fmt.Errorf("unable to parse spec: %w", err) } - return NewUnixDomainSocketExecutable(spec.Bin, spec.Args, spec.Env) + return NewUnixDomainSocketExecutable(spec.Bin, spec.Args, spec.Env, log) } diff --git a/pkg/transport/process/pipeline.go b/pkg/transport/process/pipeline.go deleted file mode 100644 index 98caafba..00000000 --- a/pkg/transport/process/pipeline.go +++ /dev/null @@ -1,92 +0,0 @@ -// SPDX-FileCopyrightText: 2021 SAP SE or an SAP affiliate company and Gardener contributors. 
-// -// SPDX-License-Identifier: Apache-2.0 -package process - -import ( - "context" - "io" - "os" - "time" - - "fmt" - "io/ioutil" - - cdv2 "github.com/gardener/component-spec/bindings-go/apis/v2" - - "github.com/gardener/component-cli/pkg/transport/process/utils" -) - -const processorTimeout = 30 * time.Second - -type resourceProcessingPipelineImpl struct { - processors []ResourceStreamProcessor -} - -func (p *resourceProcessingPipelineImpl) Process(ctx context.Context, cd cdv2.ComponentDescriptor, res cdv2.Resource) (*cdv2.ComponentDescriptor, cdv2.Resource, error) { - infile, err := ioutil.TempFile("", "") - if err != nil { - return nil, cdv2.Resource{}, fmt.Errorf("unable to create temporary infile: %w", err) - } - - if err := utils.WriteProcessorMessage(cd, res, nil, infile); err != nil { - return nil, cdv2.Resource{}, fmt.Errorf("unable to write: %w", err) - } - - for _, proc := range p.processors { - outfile, err := p.runProcessor(ctx, infile, proc) - if err != nil { - return nil, cdv2.Resource{}, err - } - - infile = outfile - } - defer infile.Close() - - if _, err := infile.Seek(0, io.SeekStart); err != nil { - return nil, cdv2.Resource{}, fmt.Errorf("unable to seek to beginning of input file: %w", err) - } - - processedCD, processedRes, blobreader, err := utils.ReadProcessorMessage(infile) - if err != nil { - return nil, cdv2.Resource{}, fmt.Errorf("unable to read output data: %w", err) - } - if blobreader != nil { - defer blobreader.Close() - } - - return processedCD, processedRes, nil -} - -func (p *resourceProcessingPipelineImpl) runProcessor(ctx context.Context, infile *os.File, proc ResourceStreamProcessor) (*os.File, error) { - defer infile.Close() - - if _, err := infile.Seek(0, io.SeekStart); err != nil { - return nil, fmt.Errorf("unable to seek to beginning of input file: %w", err) - } - - outfile, err := ioutil.TempFile("", "") - if err != nil { - return nil, fmt.Errorf("unable to create temporary outfile: %w", err) - } - - inreader := infile - 
outwriter := outfile - - ctx, cancelfunc := context.WithTimeout(ctx, processorTimeout) - defer cancelfunc() - - if err := proc.Process(ctx, inreader, outwriter); err != nil { - return nil, fmt.Errorf("unable to process resource: %w", err) - } - - return outfile, nil -} - -// NewResourceProcessingPipeline returns a new ResourceProcessingPipeline -func NewResourceProcessingPipeline(processors ...ResourceStreamProcessor) ResourceProcessingPipeline { - p := resourceProcessingPipelineImpl{ - processors: processors, - } - return &p -} diff --git a/pkg/transport/process/pipeline_test.go b/pkg/transport/process/pipeline_test.go deleted file mode 100644 index baaff70a..00000000 --- a/pkg/transport/process/pipeline_test.go +++ /dev/null @@ -1,63 +0,0 @@ -// SPDX-FileCopyrightText: 2021 SAP SE or an SAP affiliate company and Gardener contributors. -// -// SPDX-License-Identifier: Apache-2.0 -package process_test - -import ( - "context" - "encoding/json" - - cdv2 "github.com/gardener/component-spec/bindings-go/apis/v2" - . "github.com/onsi/ginkgo" - . 
"github.com/onsi/gomega" - - "github.com/gardener/component-cli/pkg/transport/process" - "github.com/gardener/component-cli/pkg/transport/process/processors" -) - -var _ = Describe("pipeline", func() { - - Context("Process", func() { - - It("should correctly process resource", func() { - res := cdv2.Resource{ - IdentityObjectMeta: cdv2.IdentityObjectMeta{ - Name: "my-res", - Version: "v0.1.0", - Type: "ociImage", - }, - } - - l1 := cdv2.Label{ - Name: "processor-0", - Value: json.RawMessage(`"true"`), - } - l2 := cdv2.Label{ - Name: "processor-1", - Value: json.RawMessage(`"true"`), - } - expectedRes := res - expectedRes.Labels = append(expectedRes.Labels, l1) - expectedRes.Labels = append(expectedRes.Labels, l2) - - cd := cdv2.ComponentDescriptor{ - ComponentSpec: cdv2.ComponentSpec{ - Resources: []cdv2.Resource{ - res, - }, - }, - } - - p1 := processors.NewResourceLabeler(l1) - p2 := processors.NewResourceLabeler(l2) - pipeline := process.NewResourceProcessingPipeline(p1, p2) - - actualCD, actualRes, err := pipeline.Process(context.TODO(), cd, res) - Expect(err).ToNot(HaveOccurred()) - - Expect(*actualCD).To(Equal(cd)) - Expect(actualRes).To(Equal(expectedRes)) - }) - - }) -}) diff --git a/pkg/transport/process/processors/oci_artifact_filter.go b/pkg/transport/process/processors/oci_artifact_filter.go new file mode 100644 index 00000000..5f2da990 --- /dev/null +++ b/pkg/transport/process/processors/oci_artifact_filter.go @@ -0,0 +1,248 @@ +// SPDX-FileCopyrightText: 2021 SAP SE or an SAP affiliate company and Gardener contributors. 
+// +// SPDX-License-Identifier: Apache-2.0 +package processors + +import ( + "bytes" + "compress/gzip" + "context" + "crypto/sha256" + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + + "github.com/containerd/containerd/images" + "github.com/opencontainers/go-digest" + ocispecv1 "github.com/opencontainers/image-spec/specs-go/v1" + + "github.com/gardener/component-cli/ociclient/cache" + "github.com/gardener/component-cli/ociclient/oci" + "github.com/gardener/component-cli/pkg/transport/process" + processutils "github.com/gardener/component-cli/pkg/transport/process/utils" + "github.com/gardener/component-cli/pkg/utils" +) + +type ociArtifactFilter struct { + cache cache.Cache + removePatterns []string +} + +func (f *ociArtifactFilter) Process(ctx context.Context, r io.Reader, w io.Writer) error { + cd, res, blobreader, err := processutils.ReadProcessorMessage(r) + if err != nil { + return fmt.Errorf("unable to read archive: %w", err) + } + if blobreader == nil { + return errors.New("resource blob must not be nil") + } + defer blobreader.Close() + + ociArtifact, err := processutils.DeserializeOCIArtifact(blobreader, f.cache) + if err != nil { + return fmt.Errorf("unable to deserialize oci artifact: %w", err) + } + + if ociArtifact.IsIndex() { + filteredIndex, err := f.filterImageIndex(*ociArtifact.GetIndex()) + if err != nil { + return fmt.Errorf("unable to filter image index: %w", err) + } + if err := ociArtifact.SetIndex(filteredIndex); err != nil { + return fmt.Errorf("unable to set index: %w", err) + } + } else { + filteredImg, err := f.filterImage(*ociArtifact.GetManifest()) + if err != nil { + return fmt.Errorf("unable to filter image: %w", err) + } + if err := ociArtifact.SetManifest(filteredImg); err != nil { + return fmt.Errorf("unable to set manifest: %w", err) + } + } + + blobReader, err := processutils.SerializeOCIArtifact(*ociArtifact, f.cache) + if err != nil { + return fmt.Errorf("unable to serialize oci artifact: %w", err) + } + + if err = 
processutils.WriteProcessorMessage(*cd, res, blobReader, w); err != nil { + return fmt.Errorf("unable to write archive: %w", err) + } + + return nil +} + +func (f *ociArtifactFilter) filterImageIndex(inputIndex oci.Index) (*oci.Index, error) { + filteredImgs := []*oci.Manifest{} + for _, m := range inputIndex.Manifests { + filteredManifest, err := f.filterImage(*m) + if err != nil { + return nil, fmt.Errorf("unable to filter image %+v: %w", m, err) + } + + manifestBytes, err := json.Marshal(filteredManifest.Data) + if err != nil { + return nil, fmt.Errorf("unable to marshal manifest: %w", err) + } + + if err := f.cache.Add(filteredManifest.Descriptor, io.NopCloser(bytes.NewReader(manifestBytes))); err != nil { + return nil, fmt.Errorf("unable to add filtered manifest to cache: %w", err) + } + + filteredImgs = append(filteredImgs, filteredManifest) + } + + filteredIndex := oci.Index{ + Manifests: filteredImgs, + Annotations: inputIndex.Annotations, + } + + return &filteredIndex, nil +} + +func (f *ociArtifactFilter) filterImage(manifest oci.Manifest) (*oci.Manifest, error) { + // diffIDs := []digest.Digest{} + // unfilteredToFilteredDigestMappings := map[digest.Digest]digest.Digest{} + filteredLayers := []ocispecv1.Descriptor{} + + for _, layer := range manifest.Data.Layers { + layerBlobReader, err := f.cache.Get(layer) + if err != nil { + return nil, err + } + + tmpfile, err := ioutil.TempFile("", "") + if err != nil { + return nil, fmt.Errorf("unable to create tempfile: %w", err) + } + defer tmpfile.Close() + var layerBlobWriter io.WriteCloser = tmpfile + + isGzipCompressedLayer := layer.MediaType == ocispecv1.MediaTypeImageLayerGzip || layer.MediaType == images.MediaTypeDockerSchema2LayerGzip + if isGzipCompressedLayer { + // TODO: detect correct compression and apply to reader and writer + layerBlobReader, err = gzip.NewReader(layerBlobReader) + if err != nil { + return nil, fmt.Errorf("unable to create gzip reader for layer: %w", err) + } + gzipw := 
gzip.NewWriter(layerBlobWriter) + defer gzipw.Close() + layerBlobWriter = gzipw + } + + uncompressedHasher := sha256.New() + mw := io.MultiWriter(layerBlobWriter, uncompressedHasher) + + if err = utils.FilterTARArchive(layerBlobReader, mw, f.removePatterns); err != nil { + return nil, fmt.Errorf("unable to filter layer blob: %w", err) + } + + if isGzipCompressedLayer { + // close gzip writer (flushes any unwritten data and writes gzip footer) + if err := layerBlobWriter.Close(); err != nil { + return nil, fmt.Errorf("unable to close layer writer: %w", err) + } + } + + if _, err := tmpfile.Seek(0, io.SeekStart); err != nil { + return nil, fmt.Errorf("unable to reset input file: %w", err) + } + + filteredDigest, err := digest.FromReader(tmpfile) + if err != nil { + return nil, fmt.Errorf("unable to calculate digest for layer %+v: %w", layer, err) + } + + // unfilteredToFilteredDigestMappings[layer.Digest] = filteredDigest + // diffIDs = append(diffIDs, digest.NewDigestFromEncoded(digest.SHA256, hex.EncodeToString(uncompressedHasher.Sum(nil)))) + + fstat, err := tmpfile.Stat() + if err != nil { + return nil, fmt.Errorf("unable to get file stat: %w", err) + } + + desc := ocispecv1.Descriptor{ + MediaType: layer.MediaType, + Digest: filteredDigest, + Size: fstat.Size(), + URLs: layer.URLs, + Platform: layer.Platform, + Annotations: layer.Annotations, + } + filteredLayers = append(filteredLayers, desc) + + if _, err := tmpfile.Seek(0, io.SeekStart); err != nil { + return nil, fmt.Errorf("unable to reset input file: %w", err) + } + if err := f.cache.Add(desc, tmpfile); err != nil { + return nil, fmt.Errorf("unable to add filtered layer blob to cache: %w", err) + } + } + + manifest.Data.Layers = filteredLayers + + cfgBlob, err := f.cache.Get(manifest.Data.Config) + if err != nil { + return nil, fmt.Errorf("unable to get config blob from cache: %w", err) + } + + cfgData, err := io.ReadAll(cfgBlob) + if err != nil { + return nil, fmt.Errorf("unable to read config blob: %w", 
err) + } + + // TODO: check which modifications on config should be performed + // var config map[string]*json.RawMessage + // if err := json.Unmarshal(data, &config); err != nil { + // return nil, fmt.Errorf("unable to unmarshal config: %w", err) + // } + // rootfs := ocispecv1.RootFS{ + // Type: "layers", + // DiffIDs: diffIDs, + // } + // rootfsRaw, err := utils.RawJSON(rootfs) + // if err != nil { + // return nil, fmt.Errorf("unable to convert rootfs to JSON: %w", err) + // } + // config["rootfs"] = rootfsRaw + // marshaledConfig, err := json.Marshal(cfgData) + // if err != nil { + // return nil, fmt.Errorf("unable to marshal config: %w", err) + // } + // configDesc := ocispecv1.Descriptor{ + // MediaType: ocispecv1.MediaTypeImageConfig, + // Digest: digest.FromBytes(marshaledConfig), + // Size: int64(len(marshaledConfig)), + // } + // manifest.Data.Config = configDesc + + if err := f.cache.Add(manifest.Data.Config, io.NopCloser(bytes.NewReader(cfgData))); err != nil { + return nil, fmt.Errorf("unable to add filtered layer blob to cache: %w", err) + } + + manifestBytes, err := json.Marshal(manifest.Data) + if err != nil { + return nil, fmt.Errorf("unable to marshal manifest: %w", err) + } + + manifest.Descriptor.Size = int64(len(manifestBytes)) + manifest.Descriptor.Digest = digest.FromBytes(manifestBytes) + + return &manifest, nil +} + +// NewOCIArtifactFilter returns a processor that filters files from oci artifact layers +func NewOCIArtifactFilter(cache cache.Cache, removePatterns []string) (process.ResourceStreamProcessor, error) { + if cache == nil { + return nil, errors.New("cache must not be nil") + } + + obj := ociArtifactFilter{ + cache: cache, + removePatterns: removePatterns, + } + return &obj, nil +} diff --git a/pkg/transport/process/processors/oci_artifact_filter_test.go b/pkg/transport/process/processors/oci_artifact_filter_test.go new file mode 100644 index 00000000..5abb5fea --- /dev/null +++ 
b/pkg/transport/process/processors/oci_artifact_filter_test.go @@ -0,0 +1,274 @@ +// SPDX-FileCopyrightText: 2021 SAP SE or an SAP affiliate company and Gardener contributors. +// +// SPDX-License-Identifier: Apache-2.0 +package processors_test + +import ( + "bytes" + "context" + + cdv2 "github.com/gardener/component-spec/bindings-go/apis/v2" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + ocispecv1 "github.com/opencontainers/image-spec/specs-go/v1" + + "github.com/gardener/component-cli/ociclient/cache" + "github.com/gardener/component-cli/ociclient/oci" + "github.com/gardener/component-cli/pkg/testutils" + "github.com/gardener/component-cli/pkg/transport/process/processors" + processutils "github.com/gardener/component-cli/pkg/transport/process/utils" +) + +var _ = Describe("ociArtifactFilter", func() { + + Context("Process", func() { + + It("should filter files from oci image", func() { + expectedRes := cdv2.Resource{ + IdentityObjectMeta: cdv2.IdentityObjectMeta{ + Name: "my-res", + Version: "v0.1.0", + Type: "ociImage", + }, + } + expectedCd := cdv2.ComponentDescriptor{ + ComponentSpec: cdv2.ComponentSpec{ + Resources: []cdv2.Resource{ + expectedRes, + }, + }, + } + + removePatterns := []string{ + "filter-this/*", + } + + l1Files := map[string][]byte{ + "test": []byte("test-content"), + "filter-this/file1": []byte("file1-content"), + "filter-this/file2": []byte("file2-content"), + } + + // TODO: add gzipped layer + layers := [][]byte{ + testutils.CreateTARArchive(l1Files).Bytes(), + } + + expectedL1Files := map[string][]byte{ + "test": []byte("test-content"), + } + + expectedLayers := [][]byte{ + testutils.CreateTARArchive(expectedL1Files).Bytes(), + } + + configData := []byte("{}") + + expectedManifestData, expectedManifestDesc := testutils.CreateManifest(configData, expectedLayers, nil) + em := oci.Manifest{ + Descriptor: expectedManifestDesc, + Data: expectedManifestData, + } + expectedOciArtifact, err := oci.NewManifestArtifact(&em) + 
Expect(err).ToNot(HaveOccurred()) + + ociCache := cache.NewInMemoryCache() + + manifestData, manifestDesc := testutils.CreateManifest(configData, layers, ociCache) + m := oci.Manifest{ + Descriptor: manifestDesc, + Data: manifestData, + } + + ociArtifact, err := oci.NewManifestArtifact(&m) + Expect(err).ToNot(HaveOccurred()) + + r1, err := processutils.SerializeOCIArtifact(*ociArtifact, ociCache) + Expect(err).ToNot(HaveOccurred()) + defer r1.Close() + + inBuf := bytes.NewBuffer([]byte{}) + Expect(processutils.WriteProcessorMessage(expectedCd, expectedRes, r1, inBuf)).To(Succeed()) + + outbuf := bytes.NewBuffer([]byte{}) + proc, err := processors.NewOCIArtifactFilter(ociCache, removePatterns) + Expect(err).ToNot(HaveOccurred()) + Expect(proc.Process(context.TODO(), inBuf, outbuf)).To(Succeed()) + + actualCD, actualRes, actualResBlobReader, err := processutils.ReadProcessorMessage(outbuf) + Expect(err).ToNot(HaveOccurred()) + + Expect(*actualCD).To(Equal(expectedCd)) + Expect(actualRes).To(Equal(expectedRes)) + + deserializeCache := cache.NewInMemoryCache() + actualOciArtifact, err := processutils.DeserializeOCIArtifact(actualResBlobReader, deserializeCache) + Expect(err).ToNot(HaveOccurred()) + Expect(actualOciArtifact).To(Equal(expectedOciArtifact)) + + r, err := deserializeCache.Get(actualOciArtifact.GetManifest().Data.Layers[0]) + Expect(err).ToNot(HaveOccurred()) + testutils.CheckTARArchive(r, expectedL1Files) + }) + + It("should filter files from all images of an oci image index", func() { + expectedRes := cdv2.Resource{ + IdentityObjectMeta: cdv2.IdentityObjectMeta{ + Name: "my-res", + Version: "v0.1.0", + Type: "ociImage", + }, + } + expectedCd := cdv2.ComponentDescriptor{ + ComponentSpec: cdv2.ComponentSpec{ + Resources: []cdv2.Resource{ + expectedRes, + }, + }, + } + + removePatterns := []string{ + "filter-this/*", + } + + l1Files := map[string][]byte{ + "test": []byte("test-content"), + "filter-this/file1": []byte("file1-content"), + "filter-this/file2": 
[]byte("file2-content"), + } + + // TODO: add gzipped layer + layers := [][]byte{ + testutils.CreateTARArchive(l1Files).Bytes(), + } + + expectedL1Files := map[string][]byte{ + "test": []byte("test-content"), + } + + expectedLayers := [][]byte{ + testutils.CreateTARArchive(expectedL1Files).Bytes(), + } + + configData := []byte("{}") + + expectedManifestData, expectedManifestDesc := testutils.CreateManifest(configData, expectedLayers, nil) + ei := oci.Index{ + Manifests: []*oci.Manifest{ + { + Descriptor: ocispecv1.Descriptor{ + MediaType: expectedManifestDesc.MediaType, + Digest: expectedManifestDesc.Digest, + Size: expectedManifestDesc.Size, + Platform: &ocispecv1.Platform{ + Architecture: "amd64", + OS: "linux", + }, + }, + Data: expectedManifestData, + }, + { + Descriptor: ocispecv1.Descriptor{ + MediaType: expectedManifestDesc.MediaType, + Digest: expectedManifestDesc.Digest, + Size: expectedManifestDesc.Size, + Platform: &ocispecv1.Platform{ + Architecture: "amd64", + OS: "windows", + }, + }, + Data: expectedManifestData, + }, + }, + Annotations: map[string]string{ + "test": "test", + }, + } + expectedOciArtifact, err := oci.NewIndexArtifact(&ei) + Expect(err).ToNot(HaveOccurred()) + + ociCache := cache.NewInMemoryCache() + + manifestData, manifestDesc := testutils.CreateManifest(configData, layers, ociCache) + + index := oci.Index{ + Manifests: []*oci.Manifest{ + { + Descriptor: ocispecv1.Descriptor{ + MediaType: manifestDesc.MediaType, + Digest: manifestDesc.Digest, + Size: manifestDesc.Size, + Platform: &ocispecv1.Platform{ + Architecture: "amd64", + OS: "linux", + }, + }, + Data: manifestData, + }, + { + Descriptor: ocispecv1.Descriptor{ + MediaType: manifestDesc.MediaType, + Digest: manifestDesc.Digest, + Size: manifestDesc.Size, + Platform: &ocispecv1.Platform{ + Architecture: "amd64", + OS: "windows", + }, + }, + Data: manifestData, + }, + }, + Annotations: map[string]string{ + "test": "test", + }, + } + + ociArtifact, err := 
oci.NewIndexArtifact(&index) + Expect(err).ToNot(HaveOccurred()) + + r1, err := processutils.SerializeOCIArtifact(*ociArtifact, ociCache) + Expect(err).ToNot(HaveOccurred()) + defer r1.Close() + + inBuf := bytes.NewBuffer([]byte{}) + Expect(processutils.WriteProcessorMessage(expectedCd, expectedRes, r1, inBuf)).To(Succeed()) + + outbuf := bytes.NewBuffer([]byte{}) + proc, err := processors.NewOCIArtifactFilter(ociCache, removePatterns) + Expect(err).ToNot(HaveOccurred()) + Expect(proc.Process(context.TODO(), inBuf, outbuf)).To(Succeed()) + + actualCD, actualRes, actualResBlobReader, err := processutils.ReadProcessorMessage(outbuf) + Expect(err).ToNot(HaveOccurred()) + + Expect(*actualCD).To(Equal(expectedCd)) + Expect(actualRes).To(Equal(expectedRes)) + + deserializeCache := cache.NewInMemoryCache() + actualOciArtifact, err := processutils.DeserializeOCIArtifact(actualResBlobReader, deserializeCache) + Expect(err).ToNot(HaveOccurred()) + Expect(actualOciArtifact).To(Equal(expectedOciArtifact)) + + firstMan := actualOciArtifact.GetIndex().Manifests[0] + fr, err := deserializeCache.Get(firstMan.Data.Layers[0]) + Expect(err).ToNot(HaveOccurred()) + testutils.CheckTARArchive(fr, expectedL1Files) + + secondMan := actualOciArtifact.GetIndex().Manifests[1] + sr, err := deserializeCache.Get(secondMan.Data.Layers[0]) + Expect(err).ToNot(HaveOccurred()) + testutils.CheckTARArchive(sr, expectedL1Files) + }) + + It("should return error if cache is nil", func() { + _, err := processors.NewOCIArtifactFilter(nil, []string{}) + Expect(err).To(MatchError("cache must not be nil")) + }) + + It("should return error if resource blob reader is nil", func() { + proc, err := processors.NewOCIArtifactFilter(cache.NewInMemoryCache(), []string{}) + Expect(err).ToNot(HaveOccurred()) + inBuf := bytes.NewBuffer([]byte{}) + Expect(processutils.WriteProcessorMessage(cdv2.ComponentDescriptor{}, cdv2.Resource{}, nil, inBuf)).To(Succeed()) + outBuf := bytes.NewBuffer([]byte{}) + Expect(proc.Process(context.TODO(), inBuf, outBuf)).To(MatchError("resource blob must not be nil")) + }) + + }) +}) diff --git a/pkg/transport/process/processors/processor_factory.go b/pkg/transport/process/processors/processor_factory.go new file mode 100644 index 00000000..16361741 --- /dev/null +++ 
b/pkg/transport/process/processors/processor_factory.go @@ -0,0 +1,79 @@ +// SPDX-FileCopyrightText: 2021 SAP SE or an SAP affiliate company and Gardener contributors. +// +// SPDX-License-Identifier: Apache-2.0 +package processors + +import ( + "encoding/json" + "fmt" + + cdv2 "github.com/gardener/component-spec/bindings-go/apis/v2" + "github.com/go-logr/logr" + "sigs.k8s.io/yaml" + + "github.com/gardener/component-cli/ociclient/cache" + "github.com/gardener/component-cli/pkg/transport/process" + "github.com/gardener/component-cli/pkg/transport/process/extensions" +) + +const ( + // ResourceLabelerProcessorType defines the type of a resource labeler + ResourceLabelerProcessorType = "ResourceLabeler" + + // OCIArtifactFilterProcessorType defines the type of an oci artifact filter + OCIArtifactFilterProcessorType = "OciArtifactFilter" +) + +// NewProcessorFactory creates a new processor factory +func NewProcessorFactory(ociCache cache.Cache, log logr.Logger) *ProcessorFactory { + return &ProcessorFactory{ + cache: ociCache, + log: log, + } +} + +// ProcessorFactory defines a helper struct for creating processors +type ProcessorFactory struct { + cache cache.Cache + log logr.Logger +} + +// Create creates a new processor defined by a type and a spec +func (f *ProcessorFactory) Create(processorType string, spec *json.RawMessage) (process.ResourceStreamProcessor, error) { + switch processorType { + case ResourceLabelerProcessorType: + return f.createResourceLabeler(spec) + case OCIArtifactFilterProcessorType: + return f.createOCIArtifactFilter(spec) + case extensions.ExecutableType: + return extensions.CreateExecutable(spec, f.log) + default: + return nil, fmt.Errorf("unknown processor type %s", processorType) + } +} + +func (f *ProcessorFactory) createResourceLabeler(rawSpec *json.RawMessage) (process.ResourceStreamProcessor, error) { + type processorSpec struct { + Labels cdv2.Labels `json:"labels"` + } + + var spec processorSpec + if err := yaml.Unmarshal(*rawSpec, 
&spec); err != nil { + return nil, fmt.Errorf("unable to parse spec: %w", err) + } + + return NewResourceLabeler(spec.Labels...), nil +} + +func (f *ProcessorFactory) createOCIArtifactFilter(rawSpec *json.RawMessage) (process.ResourceStreamProcessor, error) { + type processorSpec struct { + RemovePatterns []string `json:"removePatterns"` + } + + var spec processorSpec + if err := yaml.Unmarshal(*rawSpec, &spec); err != nil { + return nil, fmt.Errorf("unable to parse spec: %w", err) + } + + return NewOCIArtifactFilter(f.cache, spec.RemovePatterns) +} diff --git a/pkg/transport/process/processors/resource_labeler_test.go b/pkg/transport/process/processors/resource_labeler_test.go index 263d7a39..232f315b 100644 --- a/pkg/transport/process/processors/resource_labeler_test.go +++ b/pkg/transport/process/processors/resource_labeler_test.go @@ -45,7 +45,7 @@ var _ = Describe("resourceLabeler", func() { expectedRes.Labels = append(expectedRes.Labels, l1) expectedRes.Labels = append(expectedRes.Labels, l2) - cd := cdv2.ComponentDescriptor{ + expectedCd := cdv2.ComponentDescriptor{ ComponentSpec: cdv2.ComponentSpec{ Resources: []cdv2.Resource{ res, @@ -54,17 +54,16 @@ var _ = Describe("resourceLabeler", func() { } inBuf := bytes.NewBuffer([]byte{}) - Expect(utils.WriteProcessorMessage(cd, res, bytes.NewReader(resBytes), inBuf)).To(Succeed()) + Expect(utils.WriteProcessorMessage(expectedCd, res, bytes.NewReader(resBytes), inBuf)).To(Succeed()) outbuf := bytes.NewBuffer([]byte{}) - - p1 := processors.NewResourceLabeler(l1, l2) - Expect(p1.Process(context.TODO(), inBuf, outbuf)).To(Succeed()) + proc := processors.NewResourceLabeler(l1, l2) + Expect(proc.Process(context.TODO(), inBuf, outbuf)).To(Succeed()) actualCD, actualRes, actualResBlobReader, err := utils.ReadProcessorMessage(outbuf) Expect(err).ToNot(HaveOccurred()) - Expect(*actualCD).To(Equal(cd)) + Expect(*actualCD).To(Equal(expectedCd)) Expect(actualRes).To(Equal(expectedRes)) actualResBlobBuf := 
bytes.NewBuffer([]byte{}) diff --git a/pkg/transport/process/types.go b/pkg/transport/process/types.go index d8b69eb3..cbfd49b8 100644 --- a/pkg/transport/process/types.go +++ b/pkg/transport/process/types.go @@ -6,20 +6,8 @@ package process import ( "context" "io" - - cdv2 "github.com/gardener/component-spec/bindings-go/apis/v2" ) -// ResourceProcessingPipeline describes a chain of multiple processors for processing a resource. -// Each processor receives its input from the preceding processor and writes the output for the -// subsequent processor. To work correctly, a pipeline must consist of 1 downloader, 0..n processors, -// and 1..n uploaders. -type ResourceProcessingPipeline interface { - // Process executes all processors for a resource. - // Returns the component descriptor and resource of the last processor. - Process(context.Context, cdv2.ComponentDescriptor, cdv2.Resource) (*cdv2.ComponentDescriptor, cdv2.Resource, error) -} - // ResourceStreamProcessor describes an individual processor for processing a resource. // A processor can upload, modify, or download a resource. 
 type ResourceStreamProcessor interface {
diff --git a/pkg/transport/process/uploaders/uploader_factory.go b/pkg/transport/process/uploaders/uploader_factory.go
index f7f96d62..90fa2838 100644
--- a/pkg/transport/process/uploaders/uploader_factory.go
+++ b/pkg/transport/process/uploaders/uploader_factory.go
@@ -8,6 +8,7 @@ import (
 	"fmt"
 
 	cdv2 "github.com/gardener/component-spec/bindings-go/apis/v2"
+	"github.com/go-logr/logr"
 	"sigs.k8s.io/yaml"
 
 	"github.com/gardener/component-cli/ociclient"
@@ -29,11 +30,12 @@ const (
 // - Add Go file to uploaders package which contains the source code of the new uploader
 // - Add string constant for new uploader type -> will be used in UploaderFactory.Create()
 // - Add source code for creating new uploader to UploaderFactory.Create() method
-func NewUploaderFactory(client ociclient.Client, ocicache cache.Cache, targetCtx cdv2.OCIRegistryRepository) *UploaderFactory {
+func NewUploaderFactory(client ociclient.Client, ocicache cache.Cache, targetCtx cdv2.OCIRegistryRepository, log logr.Logger) *UploaderFactory {
 	return &UploaderFactory{
 		client:    client,
 		cache:     ocicache,
 		targetCtx: targetCtx,
+		log:       log,
 	}
 }
 
@@ -42,6 +44,7 @@ type UploaderFactory struct {
 	client    ociclient.Client
 	cache     cache.Cache
 	targetCtx cdv2.OCIRegistryRepository
+	log       logr.Logger
 }
 
 // Create creates a new uploader defined by a type and a spec
@@ -52,7 +55,7 @@ func (f *UploaderFactory) Create(uploaderType string, spec *json.RawMessage) (pr
 	case OCIArtifactUploaderType:
 		return f.createOCIArtifactUploader(spec)
 	case extensions.ExecutableType:
-		return extensions.CreateExecutable(spec)
+		return extensions.CreateExecutable(spec, f.log)
 	default:
 		return nil, fmt.Errorf("unknown uploader type %s", uploaderType)
 	}
@@ -65,8 +68,7 @@ func (f *UploaderFactory) createOCIArtifactUploader(rawSpec *json.RawMessage) (p
 	}
 
 	var spec uploaderSpec
-	err := yaml.Unmarshal(*rawSpec, &spec)
-	if err != nil {
+	if err := yaml.Unmarshal(*rawSpec, &spec); err != nil {
 		return nil, fmt.Errorf("unable to parse spec: %w", err)
 	}
diff --git a/pkg/transport/process/utils/oci_artifact_serialization.go b/pkg/transport/process/utils/oci_artifact_serialization.go
index 18e10f13..3ea6e707 100644
--- a/pkg/transport/process/utils/oci_artifact_serialization.go
+++ b/pkg/transport/process/utils/oci_artifact_serialization.go
@@ -276,3 +276,45 @@ func DeserializeOCIArtifact(reader io.Reader, cache cache.Cache) (*oci.Artifact,
 
 	return ociArtifact, nil
 }
+
+// GetManifestOrIndexFromSerializedOCIArtifact returns the manifest or the image index
+// contained in a serialized oci artifact
+func GetManifestOrIndexFromSerializedOCIArtifact(reader io.Reader) (*ocispecv1.Manifest, *ocispecv1.Index, error) {
+	if reader == nil {
+		return nil, nil, errors.New("reader must not be nil")
+	}
+
+	tr := tar.NewReader(reader)
+	buf := bytes.NewBuffer([]byte{})
+
+	manifest := &ocispecv1.Manifest{}
+	index := &ocispecv1.Index{}
+
+	for {
+		header, err := tr.Next()
+		if err != nil {
+			if err == io.EOF {
+				break
+			}
+			return nil, nil, fmt.Errorf("unable to read tar header: %w", err)
+		}
+
+		if header.Name == ManifestFile {
+			buf.Reset()
+			if _, err := io.Copy(buf, tr); err != nil {
+				return nil, nil, fmt.Errorf("unable to copy %s to buffer: %w", ManifestFile, err)
+			}
+			if err := json.Unmarshal(buf.Bytes(), &manifest); err != nil {
+				return nil, nil, fmt.Errorf("unable to unmarshal manifest: %w", err)
+			}
+		} else if header.Name == IndexFile {
+			buf.Reset()
+			if _, err := io.Copy(buf, tr); err != nil {
+				return nil, nil, fmt.Errorf("unable to copy %s to buffer: %w", IndexFile, err)
+			}
+			if err := json.Unmarshal(buf.Bytes(), &index); err != nil {
+				return nil, nil, fmt.Errorf("unable to unmarshal image index: %w", err)
+			}
+		} else {
+			continue
+		}
+	}
+
+	return manifest, index, nil
+}
diff --git a/pkg/transport/processing_job.go b/pkg/transport/processing_job.go
new file mode 100644
index 00000000..cf530be2
--- /dev/null
+++ b/pkg/transport/processing_job.go
@@ -0,0 +1,195 @@
+// SPDX-FileCopyrightText: 2021 SAP SE or an SAP affiliate company and Gardener contributors.
+//
+// SPDX-License-Identifier: Apache-2.0
+package transport
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"os"
+	"time"
+
+	cdv2 "github.com/gardener/component-spec/bindings-go/apis/v2"
+	"github.com/go-logr/logr"
+
+	"github.com/gardener/component-cli/pkg/transport/process"
+	"github.com/gardener/component-cli/pkg/transport/process/utils"
+)
+
+func NewProcessingJob(
+	cd cdv2.ComponentDescriptor,
+	res cdv2.Resource,
+	downloaders []NamedResourceStreamProcessor,
+	processors []NamedResourceStreamProcessor,
+	uploaders []NamedResourceStreamProcessor,
+	log logr.Logger,
+	processorTimeout time.Duration,
+) (*ProcessingJob, error) {
+	if len(downloaders) != 1 {
+		return nil, fmt.Errorf("a processing job must have exactly 1 downloader, found %d", len(downloaders))
+	}
+
+	if len(uploaders) < 1 {
+		return nil, fmt.Errorf("a processing job must have at least 1 uploader, found %d", len(uploaders))
+	}
+
+	if log == nil {
+		return nil, errors.New("log must not be nil")
+	}
+
+	j := ProcessingJob{
+		ComponentDescriptor: &cd,
+		Resource:            &res,
+		Downloaders:         downloaders,
+		Processors:          processors,
+		Uploaders:           uploaders,
+		Log:                 log,
+		ProcessorTimeout:    processorTimeout,
+	}
+	return &j, nil
+}
+
+// ProcessingJob defines a type which contains all data for processing a single resource
+type ProcessingJob struct {
+	ComponentDescriptor    *cdv2.ComponentDescriptor
+	Resource               *cdv2.Resource
+	Downloaders            []NamedResourceStreamProcessor
+	Processors             []NamedResourceStreamProcessor
+	Uploaders              []NamedResourceStreamProcessor
+	ProcessedResource      *cdv2.Resource
+	MatchedProcessingRules []string
+	Log                    logr.Logger
+	ProcessorTimeout       time.Duration
+}
+
+type NamedResourceStreamProcessor struct {
+	Processor process.ResourceStreamProcessor
+	Name      string
+}
+
+func (j *ProcessingJob) GetProcessedResource() *cdv2.Resource {
+	return j.ProcessedResource
+}
+
+func (j *ProcessingJob) GetMatching() map[string][]string {
+	matching := map[string][]string{
+		"processingRules": j.MatchedProcessingRules,
+	}
+
+	for _, d := range j.Downloaders {
+		matching["downloaders"] = append(matching["downloaders"], d.Name)
+	}
+
+	for _, u := range j.Uploaders {
+		matching["uploaders"] = append(matching["uploaders"], u.Name)
+	}
+
+	return matching
+}
+
+func (j *ProcessingJob) Validate() error {
+	if j.ComponentDescriptor == nil {
+		return errors.New("component descriptor must not be nil")
+	}
+
+	if j.Resource == nil {
+		return errors.New("resource must not be nil")
+	}
+
+	if len(j.Downloaders) != 1 {
+		return fmt.Errorf("a processing job must have exactly 1 downloader, found %d", len(j.Downloaders))
+	}
+
+	if len(j.Uploaders) < 1 {
+		return fmt.Errorf("a processing job must have at least 1 uploader, found %d", len(j.Uploaders))
+	}
+
+	return nil
+}
+
+// Process runs the processing job, by calling downloader, processors, and uploaders sequentially
+// for the defined component descriptor and resource. Each processor receives its input from the
+// preceding processor and writes the output for the subsequent processor. To work correctly,
+// a processing job must consist of 1 downloader, 0..n processors, and 1..n uploaders.
+func (j *ProcessingJob) Process(ctx context.Context) error {
+	if err := j.Validate(); err != nil {
+		j.Log.Error(err, "invalid processing job")
+		return err
+	}
+
+	inputFile, err := ioutil.TempFile("", "")
+	if err != nil {
+		j.Log.Error(err, "unable to create temporary input file")
+		return err
+	}
+
+	if err := utils.WriteProcessorMessage(*j.ComponentDescriptor, *j.Resource, nil, inputFile); err != nil {
+		j.Log.Error(err, "unable to write processor message")
+		return err
+	}
+
+	processors := []NamedResourceStreamProcessor{}
+	processors = append(processors, j.Downloaders...)
+	processors = append(processors, j.Processors...)
+	processors = append(processors, j.Uploaders...)
+
+	for _, proc := range processors {
+		procLog := j.Log.WithValues("processor-name", proc.Name)
+		outputFile, err := j.runProcessor(ctx, inputFile, proc, procLog)
+		if err != nil {
+			procLog.Error(err, "unable to run processor")
+			return err
+		}
+
+		// set the output file of the current processor as the input file for the next processor
+		// if the current processor isn't last in the chain -> close file in runProcessor() in next loop iteration
+		// if the current processor is last in the chain -> explicitly close file after loop
+		inputFile = outputFile
+	}
+	defer inputFile.Close()
+
+	if _, err := inputFile.Seek(0, io.SeekStart); err != nil {
+		j.Log.Error(err, "unable to seek to beginning of file")
+		return err
+	}
+
+	_, processedRes, blobreader, err := utils.ReadProcessorMessage(inputFile)
+	if err != nil {
+		j.Log.Error(err, "unable to read processor message")
+		return err
+	}
+	if blobreader != nil {
+		defer blobreader.Close()
+	}
+
+	j.ProcessedResource = &processedRes
+
+	return nil
+}
+
+func (p *ProcessingJob) runProcessor(ctx context.Context, infile *os.File, proc NamedResourceStreamProcessor, log logr.Logger) (*os.File, error) {
+	defer infile.Close()
+
+	if _, err := infile.Seek(0, io.SeekStart); err != nil {
+		return nil, fmt.Errorf("unable to seek to beginning of input file: %w", err)
+	}
+
+	outfile, err := ioutil.TempFile("", "")
+	if err != nil {
+		return nil, fmt.Errorf("unable to create temporary output file: %w", err)
+	}
+
+	ctx, cancelfunc := context.WithTimeout(ctx, p.ProcessorTimeout)
+	defer cancelfunc()
+
+	log.V(7).Info("starting processor")
+	if err := proc.Processor.Process(ctx, infile, outfile); err != nil {
+		return nil, fmt.Errorf("processor returned with error: %w", err)
+	}
+	log.V(7).Info("processor finished successfully")
+
+	return outfile, nil
+}
diff --git a/pkg/transport/processing_job_factory.go b/pkg/transport/processing_job_factory.go
new file mode 100644
index 00000000..dd684053
--- /dev/null
+++ b/pkg/transport/processing_job_factory.go
@@ -0,0 +1,99 @@
+// SPDX-FileCopyrightText: 2021 SAP SE or an SAP affiliate company and Gardener contributors.
+//
+// SPDX-License-Identifier: Apache-2.0
+package transport
+
+import (
+	"fmt"
+	"time"
+
+	cdv2 "github.com/gardener/component-spec/bindings-go/apis/v2"
+	"github.com/go-logr/logr"
+
+	"github.com/gardener/component-cli/ociclient"
+	"github.com/gardener/component-cli/ociclient/cache"
+	"github.com/gardener/component-cli/pkg/transport/config"
+	"github.com/gardener/component-cli/pkg/transport/process/downloaders"
+	"github.com/gardener/component-cli/pkg/transport/process/processors"
+	"github.com/gardener/component-cli/pkg/transport/process/uploaders"
+)
+
+// NewProcessingJobFactory creates a new processing job factory
+func NewProcessingJobFactory(transportCfg config.ParsedTransportConfig, ociClient ociclient.Client, ocicache cache.Cache, targetCtx cdv2.OCIRegistryRepository, log logr.Logger, processorTimeout time.Duration) (*ProcessingJobFactory, error) {
+	df := downloaders.NewDownloaderFactory(ociClient, ocicache, log)
+	pf := processors.NewProcessorFactory(ocicache, log)
+	uf := uploaders.NewUploaderFactory(ociClient, ocicache, targetCtx, log)
+
+	f := ProcessingJobFactory{
+		parsedConfig:      &transportCfg,
+		downloaderFactory: df,
+		processorFactory:  pf,
+		uploaderFactory:   uf,
+		log:               log,
+		processorTimeout:  processorTimeout,
+	}
+
+	return &f, nil
+}
+
+// ProcessingJobFactory defines a helper struct for creating processing jobs
+type ProcessingJobFactory struct {
+	parsedConfig      *config.ParsedTransportConfig
+	uploaderFactory   *uploaders.UploaderFactory
+	downloaderFactory *downloaders.DownloaderFactory
+	processorFactory  *processors.ProcessorFactory
+	log               logr.Logger
+	processorTimeout  time.Duration
+}
+
+// Create creates a new processing job for a resource
+func (c *ProcessingJobFactory) Create(cd cdv2.ComponentDescriptor, res cdv2.Resource) (*ProcessingJob, error) {
+	downloaderDefs := c.parsedConfig.MatchDownloaders(cd, res)
+	downloaders := []NamedResourceStreamProcessor{}
+	for _, dd := range downloaderDefs {
+		p, err := c.downloaderFactory.Create(dd.Type, dd.Spec)
+		if err != nil {
+			return nil, fmt.Errorf("unable to create downloader: %w", err)
+		}
+		downloaders = append(downloaders, NamedResourceStreamProcessor{
+			Name:      dd.Name,
+			Processor: p,
+		})
+	}
+
+	processingRuleDefs := c.parsedConfig.MatchProcessingRules(cd, res)
+	processors := []NamedResourceStreamProcessor{}
+	for _, rd := range processingRuleDefs {
+		for _, pd := range rd.Processors {
+			p, err := c.processorFactory.Create(pd.Type, pd.Spec)
+			if err != nil {
+				return nil, fmt.Errorf("unable to create processor: %w", err)
+			}
+			processors = append(processors, NamedResourceStreamProcessor{
+				Name:      pd.Name,
+				Processor: p,
+			})
+		}
+	}
+
+	uploaderDefs := c.parsedConfig.MatchUploaders(cd, res)
+	uploaders := []NamedResourceStreamProcessor{}
+	for _, ud := range uploaderDefs {
+		p, err := c.uploaderFactory.Create(ud.Type, ud.Spec)
+		if err != nil {
+			return nil, fmt.Errorf("unable to create uploader: %w", err)
+		}
+		uploaders = append(uploaders, NamedResourceStreamProcessor{
+			Name:      ud.Name,
+			Processor: p,
+		})
+	}
+
+	jobLog := c.log.WithValues("component-name", cd.Name, "component-version", cd.Version, "resource-name", res.Name, "resource-version", res.Version)
+	job, err := NewProcessingJob(cd, res, downloaders, processors, uploaders, jobLog, c.processorTimeout)
+	if err != nil {
+		return nil, fmt.Errorf("unable to create processing job: %w", err)
+	}
+
+	return job, nil
+}
diff --git a/pkg/transport/processing_job_factory_test.go b/pkg/transport/processing_job_factory_test.go
new file mode 100644
index 00000000..38866c61
--- /dev/null
+++ b/pkg/transport/processing_job_factory_test.go
@@ -0,0 +1,92 @@
+// SPDX-FileCopyrightText: 2021 SAP SE or an SAP affiliate company and Gardener contributors.
+//
+// SPDX-License-Identifier: Apache-2.0
+package transport_test
+
+import (
+	cdv2 "github.com/gardener/component-spec/bindings-go/apis/v2"
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+)
+
+var _ = Describe("processing job", func() {
+
+	Context("processing job factory", func() {
+
+		It("should create processing job for an oci image resource", func() {
+			cd := cdv2.ComponentDescriptor{
+				ComponentSpec: cdv2.ComponentSpec{
+					ObjectMeta: cdv2.ObjectMeta{
+						Name:    "github.com/my-component",
+						Version: "0.1.0",
+					},
+				},
+			}
+			acc, err := cdv2.NewUnstructured(cdv2.NewOCIRegistryAccess("test.com"))
+			Expect(err).ToNot(HaveOccurred())
+			res := cdv2.Resource{
+				Access: &acc,
+				IdentityObjectMeta: cdv2.IdentityObjectMeta{
+					Name:    "my-res",
+					Version: "0.1.0",
+					Type:    cdv2.OCIImageType,
+				},
+			}
+
+			job, err := factory.Create(cd, res)
+			Expect(err).ToNot(HaveOccurred())
+
+			Expect(*job.ComponentDescriptor).To(Equal(cd))
+			Expect(*job.Resource).To(Equal(res))
+
+			Expect(len(job.Downloaders)).To(Equal(1))
+			Expect(job.Downloaders[0].Name).To(Equal("oci-artifact-dl"))
+
+			Expect(len(job.Processors)).To(Equal(3))
+			Expect(job.Processors[0].Name).To(Equal("my-oci-filter"))
+			Expect(job.Processors[1].Name).To(Equal("my-labeler"))
+			Expect(job.Processors[2].Name).To(Equal("my-extension"))
+
+			Expect(len(job.Uploaders)).To(Equal(1))
+			Expect(job.Uploaders[0].Name).To(Equal("oci-artifact-ul"))
+		})
+
+		It("should create processing job for a local oci blob resource", func() {
+			cd := cdv2.ComponentDescriptor{
+				ComponentSpec: cdv2.ComponentSpec{
+					ObjectMeta: cdv2.ObjectMeta{
+						Name:    "github.com/my-component",
+						Version: "0.1.0",
+					},
+				},
+			}
+			acc, err := cdv2.NewUnstructured(cdv2.NewLocalOCIBlobAccess("sha256:123"))
+			Expect(err).ToNot(HaveOccurred())
+			res := cdv2.Resource{
+				Access: &acc,
+				IdentityObjectMeta: cdv2.IdentityObjectMeta{
+					Name:    "my-res",
+					Version: "0.1.0",
+					Type:    "helm",
+				},
+			}
+
+			job, err := factory.Create(cd, res)
+			Expect(err).ToNot(HaveOccurred())
+
+			Expect(*job.ComponentDescriptor).To(Equal(cd))
+			Expect(*job.Resource).To(Equal(res))
+
+			Expect(len(job.Downloaders)).To(Equal(1))
+			Expect(job.Downloaders[0].Name).To(Equal("local-oci-blob-dl"))
+
+			Expect(len(job.Processors)).To(Equal(1))
+			Expect(job.Processors[0].Name).To(Equal("my-labeler"))
+
+			Expect(len(job.Uploaders)).To(Equal(1))
+			Expect(job.Uploaders[0].Name).To(Equal("local-oci-blob-ul"))
+		})
+
+	})
+
+})
diff --git a/pkg/transport/processing_job_test.go b/pkg/transport/processing_job_test.go
new file mode 100644
index 00000000..583e2405
--- /dev/null
+++ b/pkg/transport/processing_job_test.go
@@ -0,0 +1,89 @@
+// SPDX-FileCopyrightText: 2021 SAP SE or an SAP affiliate company and Gardener contributors.
+//
+// SPDX-License-Identifier: Apache-2.0
+package transport_test
+
+import (
+	"context"
+	"encoding/json"
+	"time"
+
+	cdv2 "github.com/gardener/component-spec/bindings-go/apis/v2"
+	"github.com/go-logr/logr"
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+
+	"github.com/gardener/component-cli/pkg/transport"
+	"github.com/gardener/component-cli/pkg/transport/process/processors"
+)
+
+var _ = Describe("processing job", func() {
+
+	Context("processing job", func() {
+
+		It("should correctly process resource", func() {
+			res := cdv2.Resource{
+				IdentityObjectMeta: cdv2.IdentityObjectMeta{
+					Name:    "my-res",
+					Version: "v0.1.0",
+					Type:    "ociImage",
+				},
+			}
+
+			l1 := cdv2.Label{
+				Name:  "processor-0",
+				Value: json.RawMessage(`"true"`),
+			}
+			l2 := cdv2.Label{
+				Name:  "processor-1",
+				Value: json.RawMessage(`"true"`),
+			}
+			l3 := cdv2.Label{
+				Name:  "processor-2",
+				Value: json.RawMessage(`"true"`),
+			}
+			expectedRes := res
+			expectedRes.Labels = append(expectedRes.Labels, l1)
+			expectedRes.Labels = append(expectedRes.Labels, l2)
+			expectedRes.Labels = append(expectedRes.Labels, l3)
+
+			cd := cdv2.ComponentDescriptor{
+				ComponentSpec: cdv2.ComponentSpec{
+					Resources: []cdv2.Resource{
+						res,
+					},
+				},
+			}
+
+			p1 := transport.NamedResourceStreamProcessor{
+				Name:      "p1",
+				Processor: processors.NewResourceLabeler(l1),
+			}
+			p2 := transport.NamedResourceStreamProcessor{
+				Name:      "p2",
+				Processor: processors.NewResourceLabeler(l2),
+			}
+			p3 := transport.NamedResourceStreamProcessor{
+				Name:      "p3",
+				Processor: processors.NewResourceLabeler(l3),
+			}
+
+			pj, err := transport.NewProcessingJob(
+				cd,
+				res,
+				[]transport.NamedResourceStreamProcessor{p1},
+				[]transport.NamedResourceStreamProcessor{p2},
+				[]transport.NamedResourceStreamProcessor{p3},
+				logr.Discard(),
+				10*time.Second,
+			)
+			Expect(err).ToNot(HaveOccurred())
+
+			err = pj.Process(context.TODO())
+			Expect(err).ToNot(HaveOccurred())
+			Expect(*pj.GetProcessedResource()).To(Equal(expectedRes))
+		})
+
+	})
+
+})
diff --git a/pkg/transport/testdata/transport.cfg b/pkg/transport/testdata/transport.cfg
new file mode 100644
index 00000000..73fd7edb
--- /dev/null
+++ b/pkg/transport/testdata/transport.cfg
@@ -0,0 +1,91 @@
+meta:
+  version: v1
+
+processors:
+- name: 'my-oci-filter'
+  type: 'OciArtifactFilter'
+  spec:
+    removePatterns:
+    - 'bin/*'
+- name: 'my-labeler'
+  type: 'ResourceLabeler'
+  spec:
+    labels:
+    - name: 'label-name'
+      value: 'label-value'
+- name: 'my-extension'
+  type: 'Executable'
+  spec:
+    bin: '/path/to/processor'
+    args:
+    - '-arg1=test'
+    env:
+      key1: val1
+
+downloaders:
+- name: 'oci-artifact-dl'
+  type: 'OciArtifactDownloader'
+  filters:
+  - type: 'ResourceTypeFilter'
+    spec:
+      includeResourceTypes:
+      - 'ociImage'
+- name: 'local-oci-blob-dl'
+  type: 'LocalOciBlobDownloader'
+  filters:
+  - type: 'AccessTypeFilter'
+    spec:
+      includeAccessTypes:
+      - 'localOciBlob'
+
+uploaders:
+- name: 'oci-artifact-ul'
+  type: 'OciArtifactUploader'
+  spec:
+    baseUrl: 'my-target-registry.com/test'
+    keepSourceRepo: false
+  filters:
+  - type: 'ResourceTypeFilter'
+    spec:
+      includeResourceTypes:
+      - 'ociImage'
+- name: 'local-oci-blob-ul'
+  type: 'LocalOciBlobUploader'
+  filters:
+  - type: 'AccessTypeFilter'
+    spec:
+      includeAccessTypes:
+      - 'localOciBlob'
+
+processingRules:
+- name: 'generic-image-filtering'
+  processors:
+  - name: 'my-oci-filter'
+    type: 'processor'
+  filters:
+  - type: 'ResourceTypeFilter'
+    spec:
+      includeResourceTypes:
+      - 'ociImage'
+- name: 'my-component-labeling'
+  processors:
+  - name: 'my-labeler'
+    type: 'processor'
+  filters:
+  - type: 'ComponentNameFilter'
+    spec:
+      includeComponentNames:
+      - 'github.com/my-component'
+- name: 'my-component-special-image-processing'
+  processors:
+  - name: 'my-extension'
+    type: 'processor'
+  filters:
+  - type: 'ComponentNameFilter'
+    spec:
+      includeComponentNames:
+      - 'github.com/my-component'
+  - type: 'ResourceTypeFilter'
+    spec:
+      includeResourceTypes:
+      - 'ociImage'
\ No newline at end of file
diff --git a/pkg/transport/transport_suite_test.go b/pkg/transport/transport_suite_test.go
new file mode 100644
index 00000000..6385bd78
--- /dev/null
+++ b/pkg/transport/transport_suite_test.go
@@ -0,0 +1,41 @@
+// SPDX-FileCopyrightText: 2021 SAP SE or an SAP affiliate company and Gardener contributors.
+//
+// SPDX-License-Identifier: Apache-2.0
+package transport_test
+
+import (
+	"testing"
+	"time"
+
+	cdv2 "github.com/gardener/component-spec/bindings-go/apis/v2"
+	"github.com/go-logr/logr"
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+
+	"github.com/gardener/component-cli/ociclient"
+	"github.com/gardener/component-cli/ociclient/cache"
+	"github.com/gardener/component-cli/pkg/transport"
+	"github.com/gardener/component-cli/pkg/transport/config"
+)
+
+func TestConfig(t *testing.T) {
+	RegisterFailHandler(Fail)
+	RunSpecs(t, "Transport Test Suite")
+}
+
+var (
+	factory *transport.ProcessingJobFactory
+)
+
+var _ = BeforeSuite(func() {
+	transportCfg, err := config.ParseTransportConfig("./testdata/transport.cfg")
+	Expect(err).ToNot(HaveOccurred())
+
+	client, err := ociclient.NewClient(logr.Discard())
+	Expect(err).ToNot(HaveOccurred())
+	ocicache := cache.NewInMemoryCache()
+	targetCtx := cdv2.NewOCIRegistryRepository("my-target-registry.com/test", "")
+
+	factory, err = transport.NewProcessingJobFactory(*transportCfg, client, ocicache, *targetCtx, logr.Discard(), 30*time.Second)
+	Expect(err).ToNot(HaveOccurred())
+}, 5)
diff --git a/pkg/utils/repo_ctx_override.go b/pkg/utils/repo_ctx_override.go
new file mode 100644
index 00000000..d1d4cb0f
--- /dev/null
+++ b/pkg/utils/repo_ctx_override.go
@@ -0,0 +1,84 @@
+// SPDX-FileCopyrightText: 2021 SAP SE or an SAP affiliate company and Gardener contributors.
+//
+// SPDX-License-Identifier: Apache-2.0
+package utils
+
+import (
+	"fmt"
+	"os"
+
+	cdv2 "github.com/gardener/component-spec/bindings-go/apis/v2"
+	"sigs.k8s.io/yaml"
+
+	"github.com/gardener/component-cli/pkg/transport/filters"
+)
+
+// RepositoryContextOverride contains a list of overrides which replace the default
+// repository context for all matching component descriptors
+type RepositoryContextOverride struct {
+	Overrides []Override
+}
+
+// Override maps a component name filter to a repository context
+type Override struct {
+	Filter            filters.Filter
+	RepositoryContext *cdv2.OCIRegistryRepository
+}
+
+// ParseRepositoryContextOverrideConfig parses a repository context override config file
+func ParseRepositoryContextOverrideConfig(configPath string) (*RepositoryContextOverride, error) {
+	type meta struct {
+		Version string `json:"version"`
+	}
+
+	type override struct {
+		ComponentNameFilterSpec *filters.ComponentNameFilterSpec `json:"componentNameFilterSpec"`
+		RepositoryContext       *cdv2.OCIRegistryRepository      `json:"repositoryContext"`
+	}
+
+	type repositoryContextOverride struct {
+		Meta      meta       `json:"meta"`
+		Overrides []override `json:"overrides"`
+	}
+
+	repoCtxOverrideCfgYaml, err := os.ReadFile(configPath)
+	if err != nil {
+		return nil, fmt.Errorf("unable to read config file: %w", err)
+	}
+
+	var cfg repositoryContextOverride
+	if err := yaml.Unmarshal(repoCtxOverrideCfgYaml, &cfg); err != nil {
+		return nil, fmt.Errorf("unable to parse config file: %w", err)
+	}
+
+	parsedCfg := RepositoryContextOverride{
+		Overrides: []Override{},
+	}
+
+	for _, o := range cfg.Overrides {
+		f, err := filters.NewComponentNameFilter(*o.ComponentNameFilterSpec)
+		if err != nil {
+			return nil, fmt.Errorf("unable to create component name filter: %w", err)
+		}
+		po := Override{
+			Filter:            f,
+			RepositoryContext: o.RepositoryContext,
+		}
+		parsedCfg.Overrides = append(parsedCfg.Overrides, po)
+	}
+
+	return &parsedCfg, nil
+}
+
+// GetRepositoryContext returns the repository context of the last matching override,
+// or defaultRepoCtx if no override matches the component name
+func (c *RepositoryContextOverride) GetRepositoryContext(componentName string, defaultRepoCtx cdv2.OCIRegistryRepository) *cdv2.OCIRegistryRepository {
+	ctx := defaultRepoCtx
+	for _, o := range c.Overrides {
+		dummyCd := cdv2.ComponentDescriptor{
+			ComponentSpec: cdv2.ComponentSpec{
+				ObjectMeta: cdv2.ObjectMeta{
+					Name: componentName,
+				},
+			},
+		}
+		if o.Filter.Matches(dummyCd, cdv2.Resource{}) {
+			ctx = *o.RepositoryContext
+		}
+	}
+	return &ctx
+}
diff --git a/pkg/utils/repo_ctx_override_test.go b/pkg/utils/repo_ctx_override_test.go
new file mode 100644
index 00000000..a07400f1
--- /dev/null
+++ b/pkg/utils/repo_ctx_override_test.go
@@ -0,0 +1,32 @@
+// SPDX-FileCopyrightText: 2021 SAP SE or an SAP affiliate company and Gardener contributors.
+//
+// SPDX-License-Identifier: Apache-2.0
+package utils_test
+
+import (
+	cdv2 "github.com/gardener/component-spec/bindings-go/apis/v2"
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+)
+
+var _ = Describe("repository context override", func() {
+
+	Context("repository context override", func() {
+
+		It("should return overridden repository context if component name matches", func() {
+			componentName := "github.com/gardener/component-cli"
+			expectedRepoCtx := cdv2.NewOCIRegistryRepository("example-oci-registry.com/override", "")
+			actualRepoCtx := repoCtxOverride.GetRepositoryContext(componentName, *defaultRepoCtx)
+			Expect(actualRepoCtx).To(Equal(expectedRepoCtx))
+		})
+
+		It("should return default repository context if component name doesn't match", func() {
+			componentName := "github.com/gardener/not-component-cli"
+			expectedRepoCtx := cdv2.NewOCIRegistryRepository("example-oci-registry.com/base", "")
+			actualRepoCtx := repoCtxOverride.GetRepositoryContext(componentName, *defaultRepoCtx)
+			Expect(actualRepoCtx).To(Equal(expectedRepoCtx))
+		})
+
+	})
+
+})
diff --git a/pkg/utils/testdata/repo-ctx-override.cfg b/pkg/utils/testdata/repo-ctx-override.cfg
new file mode 100644
index 00000000..464f5f34
--- /dev/null
+++ b/pkg/utils/testdata/repo-ctx-override.cfg
@@ -0,0 +1,8 @@
+overrides:
+- repositoryContext:
+    baseUrl: 'example-oci-registry.com/override'
+    componentNameMapping: urlPath
+    type: ociRegistry
+  componentNameFilterSpec:
+    includeComponentNames:
+    - 'github.com/gardener/component-cli'
\ No newline at end of file
diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go
index cf08e168..be5488c5 100644
--- a/pkg/utils/utils.go
+++ b/pkg/utils/utils.go
@@ -178,6 +178,52 @@ func BytesString(bytes uint64, accuracy int) string {
 	return fmt.Sprintf("%s %s", stringValue, unit)
 }
 
+// FilterTARArchive copies the TAR archive from inputReader to outputWriter,
+// dropping every file whose name matches one of removePatterns
+func FilterTARArchive(inputReader io.Reader, outputWriter io.Writer, removePatterns []string) error {
+	if inputReader == nil {
+		return errors.New("inputReader must not be nil")
+	}
+
+	if outputWriter == nil {
+		return errors.New("outputWriter must not be nil")
+	}
+
+	tr := tar.NewReader(inputReader)
+	tw := tar.NewWriter(outputWriter)
+	defer tw.Close()
+
+NEXT_FILE:
+	for {
+		header, err := tr.Next()
+		if err != nil {
+			if err == io.EOF {
+				break
+			}
+			return fmt.Errorf("unable to read header: %w", err)
+		}
+
+		for _, removePattern := range removePatterns {
+			removeFile, err := filepath.Match(removePattern, header.Name)
+			if err != nil {
+				return fmt.Errorf("unable to match filename against pattern: %w", err)
+			}
+
+			if removeFile {
+				continue NEXT_FILE
+			}
+		}
+
+		if err := tw.WriteHeader(header); err != nil {
+			return fmt.Errorf("unable to write header: %w", err)
+		}
+
+		if _, err = io.Copy(tw, tr); err != nil {
+			return fmt.Errorf("unable to write file: %w", err)
+		}
+	}
+
+	return nil
+}
+
 // WriteFileToTARArchive writes a new file with name=filename and content=inputReader to outputWriter
 func WriteFileToTARArchive(filename string, inputReader io.Reader, outputWriter *tar.Writer) error {
 	if filename == "" {
diff --git a/pkg/utils/utils_suite_test.go b/pkg/utils/utils_suite_test.go
index 03afdd59..8a9d8cab 100644
--- a/pkg/utils/utils_suite_test.go
+++ b/pkg/utils/utils_suite_test.go
@@ -6,11 +6,26 @@ package utils_test
 import (
 	"testing"
 
+	cdv2 "github.com/gardener/component-spec/bindings-go/apis/v2"
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
+
+	"github.com/gardener/component-cli/pkg/utils"
 )
 
 func TestConfig(t *testing.T) {
 	RegisterFailHandler(Fail)
 	RunSpecs(t, "Utils Test Suite")
 }
+
+var (
+	defaultRepoCtx  *cdv2.OCIRegistryRepository
+	repoCtxOverride *utils.RepositoryContextOverride
+)
+
+var _ = BeforeSuite(func() {
+	defaultRepoCtx = cdv2.NewOCIRegistryRepository("example-oci-registry.com/base", "")
+	var err error
+	repoCtxOverride, err = utils.ParseRepositoryContextOverrideConfig("./testdata/repo-ctx-override.cfg")
+	Expect(err).ToNot(HaveOccurred())
+}, 5)
diff --git a/pkg/utils/utils_test.go b/pkg/utils/utils_test.go
index ef0dd993..eb8d0129 100644
--- a/pkg/utils/utils_test.go
+++ b/pkg/utils/utils_test.go
@@ -11,6 +11,7 @@ import (
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
 
+	"github.com/gardener/component-cli/pkg/testutils"
 	"github.com/gardener/component-cli/pkg/utils"
 )
 
@@ -83,4 +84,41 @@ var _ = Describe("utils", func() {
 
 	})
 
+	Context("FilterTARArchive", func() {
+
+		It("should filter archive", func() {
+			removePatterns := []string{
+				"second/*",
+			}
+
+			inputFiles := map[string][]byte{
+				"first/testfile":    []byte("some-content"),
+				"second/testfile":   []byte("more-content"),
+				"second/testfile-2": []byte("other-content"),
+			}
+
+			expectedFiles := map[string][]byte{
+				"first/testfile": []byte("some-content"),
+			}
+
+			archive := testutils.CreateTARArchive(inputFiles)
+
+			outBuf := bytes.NewBuffer([]byte{})
+			Expect(utils.FilterTARArchive(archive, outBuf, removePatterns)).To(Succeed())
+
+			testutils.CheckTARArchive(outBuf, expectedFiles)
+		})
+
+		It("should return error if inputReader is nil", func() {
+			outWriter := bytes.NewBuffer([]byte{})
+			Expect(utils.FilterTARArchive(nil, outWriter, []string{})).To(MatchError("inputReader must not be nil"))
+		})
+
+		It("should return error if outputWriter is nil", func() {
+			inputReader := bytes.NewReader([]byte{})
+			Expect(utils.FilterTARArchive(inputReader, nil, []string{})).To(MatchError("outputWriter must not be nil"))
+		})
+
+	})
+
+})