Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk/backend): add data_source parameter to PVC creation #11439

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
* **internal:** Adding proto field to support regional quota ([c8f08ba](https://github.com/kubeflow/pipelines/commit/c8f08ba49f92f53269d71425666c0bc3a687615d))
* **kubernetes_platform:** Add empty dir mount ([\#10892](https://github.com/kubeflow/pipelines/issues/10892)) ([10aaf43](https://github.com/kubeflow/pipelines/commit/10aaf431367e974bf6c73306acf6a7fd40e36942))
* **kubernetes_platform:** Update kubernetes_platform go package to include EnabledSharedMemory ([\#10703](https://github.com/kubeflow/pipelines/issues/10703)) ([7c63599](https://github.com/kubeflow/pipelines/commit/7c6359984314472bf801ea1ba8b0e8c5d9e2be2c))

* **kubernetes_platform:** Update kubernetes_platform go package to support DataSource on PersistentVolumeClaim creation ([\#11420](https://github.com/kubeflow/pipelines/issues/11420))

### Bug Fixes

Expand Down
33 changes: 33 additions & 0 deletions backend/src/v2/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1789,6 +1789,15 @@ func createPVC(
},
}

// Optional input: data_source
if pvcDataSourceInput, ok := inputs.ParameterValues["data_source"]; ok {
dataSource, err := buildPVCDataSource(pvcDataSourceInput)
if err != nil {
return "", createdExecution, pb.Execution_FAILED, fmt.Errorf("failed to build data source: %w", err)
}
pvc.Spec.DataSource = dataSource
}

// Create the PVC in the cluster
createdPVC, err := k8sClient.CoreV1().PersistentVolumeClaims(opts.Namespace).Create(context.Background(), pvc, metav1.CreateOptions{})
if err != nil {
Expand All @@ -1811,6 +1820,30 @@ func createPVC(
return createdPVC.ObjectMeta.Name, createdExecution, pb.Execution_COMPLETE, nil
}

// buildPVCDataSource converts the protobuf Value supplied as the "data_source"
// input into the Kubernetes TypedLocalObjectReference that is assigned to the
// PVC's Spec.DataSource.
//
// The input must be a struct value keyed by the JSON field names of
// TypedLocalObjectReference ("apiGroup", "kind", "name"). encoding/json
// ignores keys it does not recognise (for example "api_group"), so a
// misspelled key leaves the corresponding field unset instead of failing here.
//
// It returns an error when the input is nil, when it is not a struct (this
// includes an explicit null value), or when the JSON round trip fails. On
// error the returned reference is always nil. Validation of required fields is
// deferred to the Kubernetes API during PVC creation.
func buildPVCDataSource(pvcDataSourceInput *structpb.Value) (*k8score.TypedLocalObjectReference, error) {
	if pvcDataSourceInput == nil {
		return nil, fmt.Errorf("data_source is nil")
	}

	// A null Value marshals to the JSON literal `null`, which json.Unmarshal
	// treats as a no-op for struct targets. Without this check the function
	// would hand back an empty reference together with a nil error.
	if pvcDataSourceInput.GetStructValue() == nil {
		return nil, fmt.Errorf("data_source must be a struct, got %v", pvcDataSourceInput.String())
	}

	// Round-trip through JSON so the struct keys are mapped onto the
	// reference's fields by their JSON tags.
	dataSourceStructByte, err := pvcDataSourceInput.MarshalJSON()
	if err != nil {
		return nil, fmt.Errorf("failed to marshal data_source %v: %w", pvcDataSourceInput.String(), err)
	}

	var dataSource k8score.TypedLocalObjectReference
	if err := json.Unmarshal(dataSourceStructByte, &dataSource); err != nil {
		return nil, fmt.Errorf("failed to unmarshal data_source: %v. %w", string(dataSourceStructByte), err)
	}

	// No field validation is done here: if a required field is missing, the
	// Kubernetes API returns an error during PVC creation.
	return &dataSource, nil
}

func deletePVC(
ctx context.Context,
k8sClient kubernetes.Interface,
Expand Down
55 changes: 55 additions & 0 deletions backend/src/v2/driver/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ import (
"encoding/json"
"testing"

"google.golang.org/protobuf/types/known/structpb"
k8sres "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
"github.com/kubeflow/pipelines/backend/src/common/util"
"github.com/kubeflow/pipelines/backend/src/v2/metadata"
"github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform"
"github.com/spf13/viper"
Expand Down Expand Up @@ -1621,3 +1623,56 @@ func Test_extendPodSpecPatch_GenericEphemeralVolume(t *testing.T) {
})
}
}

// TestBuildPVCWithDataSource verifies that buildPVCDataSource maps a struct
// input onto a TypedLocalObjectReference and returns an error (with a nil
// result) for inputs it cannot decode.
func TestBuildPVCWithDataSource(t *testing.T) {
	tests := []struct {
		name        string
		input       *structpb.Value
		expected    *k8score.TypedLocalObjectReference
		expectError bool
	}{
		{
			name: "Valid data source",
			input: structpb.NewStructValue(
				&structpb.Struct{
					Fields: map[string]*structpb.Value{
						"apiGroup": structpb.NewStringValue("snapshot.storage.k8s.io"),
						"kind":     structpb.NewStringValue("VolumeSnapshot"),
						"name":     structpb.NewStringValue("snapshot-name"),
					},
				},
			),
			expected: &k8score.TypedLocalObjectReference{
				APIGroup: util.StringPointer("snapshot.storage.k8s.io"),
				Kind:     "VolumeSnapshot",
				Name:     "snapshot-name",
			},
			expectError: false,
		},
		{
			name:        "Nil input",
			input:       nil,
			expected:    nil,
			expectError: true,
		},
		{
			// A JSON string cannot be decoded into a struct, so this must fail.
			name:        "Non-struct input",
			input:       structpb.NewStringValue("snapshot-name"),
			expected:    nil,
			expectError: true,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			result, err := buildPVCDataSource(tt.input)

			if tt.expectError {
				assert.Error(t, err, "Expected an error but got none")
				assert.Nil(t, result, "Expected result to be nil on error")
				return
			}

			assert.NoError(t, err, "Unexpected error: %v", err)
			// Stop on a nil result so the field comparisons below report a
			// test failure instead of panicking on a nil dereference.
			if !assert.NotNil(t, result, "Expected result to be non-nil") {
				return
			}
			assert.Equal(t, tt.expected.Name, result.Name, "Mismatched Name field")
			assert.Equal(t, tt.expected.Kind, result.Kind, "Mismatched Kind field")
			assert.Equal(t, tt.expected.APIGroup, result.APIGroup, "Mismatched APIGroup field")
		})
	}
}
1 change: 1 addition & 0 deletions components/google-cloud/RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* Bump image for Structured Data pipelines.
* Add strategy to v1 GCPC custom job components/utils
* Apply latest GCPC image vulnerability resolutions (base OS and software updates)
* Support PVC creation using the data_source parameter.

## Release 2.17.0
* Fix Gemini batch prediction support to `v1.model_evaluation.autosxs_pipeline` after output schema change.
Expand Down
2 changes: 2 additions & 0 deletions kubernetes_platform/python/kfp/kubernetes/volume.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def CreatePVC(
storage_class_name: Optional[str] = '',
volume_name: Optional[str] = None,
annotations: Optional[Dict[str, str]] = None,
data_source: Optional[Dict[str, str]] = None,
):
"""Create a PersistentVolumeClaim, which can be used by downstream tasks.
See `PersistentVolume <https://kubernetes.io/docs/concepts/storage/persistent-volumes/#persistent-volumes>`_ and `PersistentVolumeClaim <https://kubernetes.io/docs/concepts/storage/persistent-volumes/#persistentvolumeclaims>`_ documentation for more information about
Expand All @@ -54,6 +55,7 @@ def CreatePVC(
provisioned PersistentVolumeClaim. Used for statically
specified PV only. Corresponds to `PersistentVolumeClaim.spec.volumeName <https://kubernetes.io/docs/reference/kubernetes-api/config-and-storage-resources/persistent-volume-claim-v1/#PersistentVolumeClaimSpec>`_.
annotations: Annotations for the PVC's metadata. Corresponds to `PersistentVolumeClaim.metadata.annotations <https://kubernetes.io/docs/reference/kubernetes-api/config-and-storage-resources/persistent-volume-claim-v1/#PersistentVolumeClaim>`_.
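data_source: Data source to use for the PVC's creation, given as a dict keyed by the Kubernetes field names ``apiGroup``, ``kind`` and ``name`` (for example ``{'apiGroup': 'snapshot.storage.k8s.io', 'kind': 'VolumeSnapshot', 'name': 'my-snap'}``). Corresponds to `PersistentVolumeClaim.spec.dataSource <https://kubernetes.io/docs/reference/kubernetes-api/config-and-storage-resources/persistent-volume-claim-v1/#beta-level>`_.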

Returns:
``name: str`` \n\t\t\tName of the generated PVC.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ def my_pipeline():
access_modes=['ReadWriteOnce'],
size='5Mi',
storage_class_name='standard',
data_source={
'api_group': 'snapshot.storage.k8s.io',
'kind': 'VolumeSnapshot',
'name': 'my-snap',
},
)

task1 = producer()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ components:
<https://kubernetes.io/docs/reference/kubernetes-api/config-and-storage-resources/persistent-volume-claim-v1/#PersistentVolumeClaim>`_.
isOptional: true
parameterType: STRUCT
data_source:
description: Data source to use for the PVC's creation. Corresponds to `PersistentVolumeClaim.spec.dataSource
<https://kubernetes.io/docs/reference/kubernetes-api/config-and-storage-resources/persistent-volume-claim-v1/#beta-level>`_.
isOptional: true
parameterType: STRUCT
pvc_name:
description: 'Name of the PVC. Corresponds to `PersistentVolumeClaim.metadata.name
<https://kubernetes.io/docs/reference/kubernetes-api/config-and-storage-resources/persistent-volume-claim-v1/#PersistentVolumeClaim>`_.
Expand Down Expand Up @@ -96,7 +101,7 @@ deploymentSpec:
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.4.0'\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.10.1'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
Expand Down Expand Up @@ -132,7 +137,7 @@ deploymentSpec:
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.4.0'\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.10.1'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
Expand Down Expand Up @@ -177,6 +182,12 @@ root:
runtimeValue:
constant:
- ReadWriteOnce
data_source:
runtimeValue:
constant:
api_group: snapshot.storage.k8s.io
kind: VolumeSnapshot
name: my-snap
pvc_name_suffix:
runtimeValue:
constant: -my-pvc
Expand Down Expand Up @@ -214,7 +225,7 @@ root:
taskInfo:
name: producer
schemaVersion: 2.1.0
sdkVersion: kfp-2.4.0
sdkVersion: kfp-2.10.1
---
platforms:
kubernetes:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ components:
<https://kubernetes.io/docs/reference/kubernetes-api/config-and-storage-resources/persistent-volume-claim-v1/#PersistentVolumeClaim>`_.
isOptional: true
parameterType: STRUCT
data_source:
description: Data source to use for the PVC's creation. Corresponds to `PersistentVolumeClaim.spec.dataSource
<https://kubernetes.io/docs/reference/kubernetes-api/config-and-storage-resources/persistent-volume-claim-v1/#beta-level>`_.
isOptional: true
parameterType: STRUCT
pvc_name:
description: 'Name of the PVC. Corresponds to `PersistentVolumeClaim.metadata.name
<https://kubernetes.io/docs/reference/kubernetes-api/config-and-storage-resources/persistent-volume-claim-v1/#PersistentVolumeClaim>`_.
Expand Down Expand Up @@ -86,7 +91,7 @@ deploymentSpec:
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.4.0'\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.10.1'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
Expand Down Expand Up @@ -161,7 +166,7 @@ root:
taskInfo:
name: deletepvc
schemaVersion: 2.1.0
sdkVersion: kfp-2.4.0
sdkVersion: kfp-2.10.1
---
platforms:
kubernetes:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ components:
<https://kubernetes.io/docs/reference/kubernetes-api/config-and-storage-resources/persistent-volume-claim-v1/#PersistentVolumeClaim>`_.
isOptional: true
parameterType: STRUCT
data_source:
description: Data source to use for the PVC's creation. Corresponds to `PersistentVolumeClaim.spec.dataSource
<https://kubernetes.io/docs/reference/kubernetes-api/config-and-storage-resources/persistent-volume-claim-v1/#beta-level>`_.
isOptional: true
parameterType: STRUCT
pvc_name:
description: 'Name of the PVC. Corresponds to `PersistentVolumeClaim.metadata.name
<https://kubernetes.io/docs/reference/kubernetes-api/config-and-storage-resources/persistent-volume-claim-v1/#PersistentVolumeClaim>`_.
Expand Down Expand Up @@ -92,7 +97,7 @@ deploymentSpec:
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.4.0'\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.10.1'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
Expand Down Expand Up @@ -126,7 +131,7 @@ deploymentSpec:
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.4.0'\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.10.1'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
Expand Down Expand Up @@ -205,7 +210,7 @@ root:
taskInfo:
name: get-pvc-name
schemaVersion: 2.1.0
sdkVersion: kfp-2.4.0
sdkVersion: kfp-2.10.1
---
platforms:
kubernetes:
Expand Down
Loading