diff --git a/CHANGELOG.md b/CHANGELOG.md index 2ad9eaebd03..0560a37383e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,7 +40,7 @@ * **internal:** Adding proto field to support regional quota ([c8f08ba](https://github.com/kubeflow/pipelines/commit/c8f08ba49f92f53269d71425666c0bc3a687615d)) * **kubernetes_platform:** Add empty dir mount ([\#10892](https://github.com/kubeflow/pipelines/issues/10892)) ([10aaf43](https://github.com/kubeflow/pipelines/commit/10aaf431367e974bf6c73306acf6a7fd40e36942)) * **kubernetes_platform:** Update kubernetes_platform go package to include EnabledSharedMemory ([\#10703](https://github.com/kubeflow/pipelines/issues/10703)) ([7c63599](https://github.com/kubeflow/pipelines/commit/7c6359984314472bf801ea1ba8b0e8c5d9e2be2c)) - +* **kubernetes_platform:** Update kubernetes_platform go package to include node DataSource on PersistentVolumeClaim's creation ([\#11420](https://github.com/kubeflow/pipelines/issues/11420) ### Bug Fixes diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index 68899f14a5b..f3b9c761aaa 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -1789,6 +1789,15 @@ func createPVC( }, } + // Optional input: data_source + if pvcDataSourceInput, ok := inputs.ParameterValues["data_source"]; ok { + dataSource, err := buildPVCDataSource(pvcDataSourceInput) + if err != nil { + return "", createdExecution, pb.Execution_FAILED, fmt.Errorf("failed to build data source: %w", err) + } + pvc.Spec.DataSource = dataSource + } + // Create the PVC in the cluster createdPVC, err := k8sClient.CoreV1().PersistentVolumeClaims(opts.Namespace).Create(context.Background(), pvc, metav1.CreateOptions{}) if err != nil { @@ -1811,6 +1820,30 @@ func createPVC( return createdPVC.ObjectMeta.Name, createdExecution, pb.Execution_COMPLETE, nil } +// buildPVCDataSource converts a protobuf Value representing a PVC data source +// into a Kubernetes TypedLocalObjectReference. If the input is nil or if JSON +// marshaling/unmarshaling fails, it returns an error. Field validation is +// deferred to the Kubernetes API during PVC creation. +func buildPVCDataSource(pvcDataSourceInput *structpb.Value) (*k8score.TypedLocalObjectReference, error) { + if pvcDataSourceInput == nil { + return nil, fmt.Errorf("data_source is nil") + } + + var dataSource k8score.TypedLocalObjectReference + dataSourceStructByte, err := pvcDataSourceInput.MarshalJSON() + if err != nil { + return nil, fmt.Errorf("failed to marshal data_source %v: %w", pvcDataSourceInput.String(), err) + } + + if err := json.Unmarshal(dataSourceStructByte, &dataSource); err != nil { + return nil, fmt.Errorf("failed to unmarshal data_source: %v. %w", string(dataSourceStructByte), err) + } + + // Don't do any validation here, if there is a missing required fields, the k8s API will return + // an error during the PVC creation. + return &dataSource, nil +} + func deletePVC( ctx context.Context, k8sClient kubernetes.Interface, diff --git a/backend/src/v2/driver/driver_test.go b/backend/src/v2/driver/driver_test.go index 130291f3cac..410656e2773 100644 --- a/backend/src/v2/driver/driver_test.go +++ b/backend/src/v2/driver/driver_test.go @@ -17,10 +17,12 @@ import ( "encoding/json" "testing" + "google.golang.org/protobuf/types/known/structpb" k8sres "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec" + "github.com/kubeflow/pipelines/backend/src/common/util" "github.com/kubeflow/pipelines/backend/src/v2/metadata" "github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform" "github.com/spf13/viper" @@ -1621,3 +1623,56 @@ func Test_extendPodSpecPatch_GenericEphemeralVolume(t *testing.T) { }) } } + +func TestBuildPVCWithDataSource(t *testing.T) { + tests := []struct { + name string + input *structpb.Value + expected *k8score.TypedLocalObjectReference + expectError bool + }{ + { + name: "Valid data source", + input: structpb.NewStructValue( + &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "apiGroup": structpb.NewStringValue("snapshot.storage.k8s.io"), + "kind": structpb.NewStringValue("VolumeSnapshot"), + "name": structpb.NewStringValue("snapshot-name"), + }, + }, + ), + expected: &k8score.TypedLocalObjectReference{ + APIGroup: util.StringPointer("snapshot.storage.k8s.io"), + Kind: "VolumeSnapshot", + Name: "snapshot-name", + }, + expectError: false, + }, + { + name: "Nil input", + input: nil, + expected: nil, + expectError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Call the function + result, err := buildPVCDataSource(tt.input) + + // Assert + if tt.expectError { + assert.Error(t, err, "Expected an error but got none") + assert.Nil(t, result, "Expected result to be nil on error") + } else { + assert.NoError(t, err, "Unexpected error: %v", err) + assert.NotNil(t, result, "Expected result to be non-nil") + assert.Equal(t, tt.expected.Name, result.Name, "Mismatched Name field") + assert.Equal(t, tt.expected.Kind, result.Kind, "Mismatched Kind field") + assert.Equal(t, tt.expected.APIGroup, result.APIGroup, "Mismatched APIGroup field") + } + }) + } +} diff --git a/components/google-cloud/RELEASE.md b/components/google-cloud/RELEASE.md index 9657a8b4691..f5f6ebc4ed8 100644 --- a/components/google-cloud/RELEASE.md +++ b/components/google-cloud/RELEASE.md @@ -11,6 +11,7 @@ * Bump image for Structured Data pipelines. * Add strategy to v1 GCPC custom job components/utils * Apply latest GCPC image vulnerability resolutions (base OS and software updates) +* Support PVC creation using the data_source parameter. ## Release 2.17.0 * Fix Gemini batch prediction support to `v1.model_evaluation.autosxs_pipeline` after output schema change. diff --git a/kubernetes_platform/python/kfp/kubernetes/volume.py b/kubernetes_platform/python/kfp/kubernetes/volume.py index 4535d45bcd9..2abe571e70e 100644 --- a/kubernetes_platform/python/kfp/kubernetes/volume.py +++ b/kubernetes_platform/python/kfp/kubernetes/volume.py @@ -32,6 +32,7 @@ def CreatePVC( storage_class_name: Optional[str] = '', volume_name: Optional[str] = None, annotations: Optional[Dict[str, str]] = None, + data_source: Optional[Dict[str, str]] = None, ): """Create a PersistentVolumeClaim, which can be used by downstream tasks. See `PersistentVolume `_ and `PersistentVolumeClaim `_ documentation for more information about @@ -54,6 +55,7 @@ def CreatePVC( provisioned PersistentVolumeClaim. Used for statically specified PV only. Corresponds to `PersistentVolumeClaim.spec.volumeName `_. annotations: Annotations for the PVC's metadata. Corresponds to `PersistentVolumeClaim.metadata.annotations `_. + data_source: Data source to use for the PVC's creation. Corresponds to `PersistentVolumeClaim.spec.dataSource `_. Returns: ``name: str`` \n\t\t\tName of the generated PVC. diff --git a/kubernetes_platform/python/test/snapshot/data/create_mount_delete_dynamic_pvc.py b/kubernetes_platform/python/test/snapshot/data/create_mount_delete_dynamic_pvc.py index a94a4aa1f3c..048ff42b0dd 100644 --- a/kubernetes_platform/python/test/snapshot/data/create_mount_delete_dynamic_pvc.py +++ b/kubernetes_platform/python/test/snapshot/data/create_mount_delete_dynamic_pvc.py @@ -41,6 +41,11 @@ def my_pipeline(): access_modes=['ReadWriteOnce'], size='5Mi', storage_class_name='standard', + data_source={ + 'api_group': 'snapshot.storage.k8s.io', + 'kind': 'VolumeSnapshot', + 'name': 'my-snap', + }, ) task1 = producer() diff --git a/kubernetes_platform/python/test/snapshot/data/create_mount_delete_dynamic_pvc.yaml b/kubernetes_platform/python/test/snapshot/data/create_mount_delete_dynamic_pvc.yaml index 124430a41d3..42f0dd516b7 100644 --- a/kubernetes_platform/python/test/snapshot/data/create_mount_delete_dynamic_pvc.yaml +++ b/kubernetes_platform/python/test/snapshot/data/create_mount_delete_dynamic_pvc.yaml @@ -25,6 +25,11 @@ components: `_. isOptional: true parameterType: STRUCT + data_source: + description: Data source to use for the PVC's creation. Corresponds to `PersistentVolumeClaim.spec.dataSource + `_. + isOptional: true + parameterType: STRUCT pvc_name: description: 'Name of the PVC. Corresponds to `PersistentVolumeClaim.metadata.name `_. @@ -96,7 +101,7 @@ deploymentSpec: - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.4.0'\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.10.1'\ \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ $0\" \"$@\"\n" - sh @@ -132,7 +137,7 @@ deploymentSpec: - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.4.0'\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.10.1'\ \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ $0\" \"$@\"\n" - sh @@ -177,6 +182,12 @@ root: runtimeValue: constant: - ReadWriteOnce + data_source: + runtimeValue: + constant: + api_group: snapshot.storage.k8s.io + kind: VolumeSnapshot + name: my-snap pvc_name_suffix: runtimeValue: constant: -my-pvc @@ -214,7 +225,7 @@ root: taskInfo: name: producer schemaVersion: 2.1.0 -sdkVersion: kfp-2.4.0 +sdkVersion: kfp-2.10.1 --- platforms: kubernetes: diff --git a/kubernetes_platform/python/test/snapshot/data/create_mount_delete_existing_pvc.yaml b/kubernetes_platform/python/test/snapshot/data/create_mount_delete_existing_pvc.yaml index d42aadc2c9a..6ec57e8782d 100644 --- a/kubernetes_platform/python/test/snapshot/data/create_mount_delete_existing_pvc.yaml +++ b/kubernetes_platform/python/test/snapshot/data/create_mount_delete_existing_pvc.yaml @@ -21,6 +21,11 @@ components: `_. isOptional: true parameterType: STRUCT + data_source: + description: Data source to use for the PVC's creation. Corresponds to `PersistentVolumeClaim.spec.dataSource + `_. + isOptional: true + parameterType: STRUCT pvc_name: description: 'Name of the PVC. Corresponds to `PersistentVolumeClaim.metadata.name `_. @@ -86,7 +91,7 @@ deploymentSpec: - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.4.0'\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.10.1'\ \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ $0\" \"$@\"\n" - sh @@ -161,7 +166,7 @@ root: taskInfo: name: deletepvc schemaVersion: 2.1.0 -sdkVersion: kfp-2.4.0 +sdkVersion: kfp-2.10.1 --- platforms: kubernetes: diff --git a/kubernetes_platform/python/test/snapshot/data/create_mount_delete_existing_pvc_from_task_output.yaml b/kubernetes_platform/python/test/snapshot/data/create_mount_delete_existing_pvc_from_task_output.yaml index 1348100c461..6d14fa15cef 100644 --- a/kubernetes_platform/python/test/snapshot/data/create_mount_delete_existing_pvc_from_task_output.yaml +++ b/kubernetes_platform/python/test/snapshot/data/create_mount_delete_existing_pvc_from_task_output.yaml @@ -21,6 +21,11 @@ components: `_. isOptional: true parameterType: STRUCT + data_source: + description: Data source to use for the PVC's creation. Corresponds to `PersistentVolumeClaim.spec.dataSource + `_. + isOptional: true + parameterType: STRUCT pvc_name: description: 'Name of the PVC. Corresponds to `PersistentVolumeClaim.metadata.name `_. @@ -92,7 +97,7 @@ deploymentSpec: - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.4.0'\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.10.1'\ \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ $0\" \"$@\"\n" - sh @@ -126,7 +131,7 @@ deploymentSpec: - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.4.0'\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.10.1'\ \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ $0\" \"$@\"\n" - sh @@ -205,7 +210,7 @@ root: taskInfo: name: get-pvc-name schemaVersion: 2.1.0 -sdkVersion: kfp-2.4.0 +sdkVersion: kfp-2.10.1 --- platforms: kubernetes: