Skip to content

Commit

Permalink
feat(sdk/backend): add data_source parameter to PVC creation
Browse files Browse the repository at this point in the history
The kpf.kubernetes SDK now supports creating PVC from a data source.
This feature would enable users to create PVCs with pre-populated data,
aligning with Kubernetes capabilities for cloning or restoring PVCs
from existing volumes or snapshots.

This is how it can be done:

```python
pvc = kfp.Kubernetes.CreatePVC(
    pvc_name_suffix="-foo",
    access_modes=["ReadWriteOnce"],
    size="100Gi",
    data_source={"api_group": "snapshot.storage.k8s.io",
        "kind": "VolumeSnapshot",
        "name": "my-snap",
    },
)
```

Resolves: #11420
Signed-off-by: Sébastien Han <seb@redhat.com>
  • Loading branch information
leseb committed Dec 4, 2024
1 parent b4799df commit 11dd7a0
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 4 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
* **internal:** Adding proto field to support regional quota ([c8f08ba](https://github.com/kubeflow/pipelines/commit/c8f08ba49f92f53269d71425666c0bc3a687615d))
* **kubernetes_platform:** Add empty dir mount ([\#10892](https://github.com/kubeflow/pipelines/issues/10892)) ([10aaf43](https://github.com/kubeflow/pipelines/commit/10aaf431367e974bf6c73306acf6a7fd40e36942))
* **kubernetes_platform:** Update kubernetes_platform go package to include EnabledSharedMemory ([\#10703](https://github.com/kubeflow/pipelines/issues/10703)) ([7c63599](https://github.com/kubeflow/pipelines/commit/7c6359984314472bf801ea1ba8b0e8c5d9e2be2c))

* **kubernetes_platform:** Update kubernetes_platform go package to include node DataSource on PersistentVolumeClaim's creation ([\#11420](https://github.com/kubeflow/pipelines/issues/11420)

### Bug Fixes

Expand Down
33 changes: 33 additions & 0 deletions backend/src/v2/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1789,6 +1789,15 @@ func createPVC(
},
}

// Optional input: data_source
if pvcDataSourceInput, ok := inputs.ParameterValues["data_source"]; ok {
dataSource, err := buildPVCDataSource(pvcDataSourceInput)
if err != nil {
return "", createdExecution, pb.Execution_FAILED, fmt.Errorf("failed to build data source: %w", err)
}
pvc.Spec.DataSource = dataSource
}

// Create the PVC in the cluster
createdPVC, err := k8sClient.CoreV1().PersistentVolumeClaims(opts.Namespace).Create(context.Background(), pvc, metav1.CreateOptions{})
if err != nil {
Expand All @@ -1811,6 +1820,30 @@ func createPVC(
return createdPVC.ObjectMeta.Name, createdExecution, pb.Execution_COMPLETE, nil
}

// buildPVCDataSource converts a protobuf Value representing a PVC data source
// into a Kubernetes TypedLocalObjectReference. If the input is nil or if JSON
// marshaling/unmarshaling fails, it returns an error. Field validation is
// deferred to the Kubernetes API during PVC creation.
func buildPVCDataSource(pvcDataSourceInput *structpb.Value) (*k8score.TypedLocalObjectReference, error) {
if pvcDataSourceInput == nil {
return nil, fmt.Errorf("data_source is nil")
}

var dataSource k8score.TypedLocalObjectReference
dataSourceStructByte, err := pvcDataSourceInput.MarshalJSON()
if err != nil {
return nil, fmt.Errorf("failed to marshal data_source %v: %w", pvcDataSourceInput.String(), err)
}

if err := json.Unmarshal(dataSourceStructByte, &dataSource); err != nil {
return nil, fmt.Errorf("failed to unmarshal data_source: %v. %w", string(dataSourceStructByte), err)
}

// Don't do any validation here, if there is a missing required fields, the k8s API will return
// an error during the PVC creation.
return &dataSource, nil
}

func deletePVC(
ctx context.Context,
k8sClient kubernetes.Interface,
Expand Down
55 changes: 55 additions & 0 deletions backend/src/v2/driver/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ import (
"encoding/json"
"testing"

"google.golang.org/protobuf/types/known/structpb"
k8sres "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
"github.com/kubeflow/pipelines/backend/src/common/util"
"github.com/kubeflow/pipelines/backend/src/v2/metadata"
"github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform"
"github.com/spf13/viper"
Expand Down Expand Up @@ -1621,3 +1623,56 @@ func Test_extendPodSpecPatch_GenericEphemeralVolume(t *testing.T) {
})
}
}

func TestBuildPVCWithDataSource(t *testing.T) {
tests := []struct {
name string
input *structpb.Value
expected *k8score.TypedLocalObjectReference
expectError bool
}{
{
name: "Valid data source",
input: structpb.NewStructValue(
&structpb.Struct{
Fields: map[string]*structpb.Value{
"apiGroup": structpb.NewStringValue("snapshot.storage.k8s.io"),
"kind": structpb.NewStringValue("VolumeSnapshot"),
"name": structpb.NewStringValue("snapshot-name"),
},
},
),
expected: &k8score.TypedLocalObjectReference{
APIGroup: util.StringPointer("snapshot.storage.k8s.io"),
Kind: "VolumeSnapshot",
Name: "snapshot-name",
},
expectError: false,
},
{
name: "Nil input",
input: nil,
expected: nil,
expectError: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Call the function
result, err := buildPVCDataSource(tt.input)

// Assert
if tt.expectError {
assert.Error(t, err, "Expected an error but got none")
assert.Nil(t, result, "Expected result to be nil on error")
} else {
assert.NoError(t, err, "Unexpected error: %v", err)
assert.NotNil(t, result, "Expected result to be non-nil")
assert.Equal(t, tt.expected.Name, result.Name, "Mismatched Name field")
assert.Equal(t, tt.expected.Kind, result.Kind, "Mismatched Kind field")
assert.Equal(t, tt.expected.APIGroup, result.APIGroup, "Mismatched APIGroup field")
}
})
}
}
1 change: 1 addition & 0 deletions components/google-cloud/RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* Bump image for Structured Data pipelines.
* Add strategy to v1 GCPC custom job components/utils
* Apply latest GCPC image vulnerability resolutions (base OS and software updates)
* Support PVC creation using the data_source parameter.

## Release 2.17.0
* Fix Gemini batch prediction support to `v1.model_evaluation.autosxs_pipeline` after output schema change.
Expand Down
2 changes: 2 additions & 0 deletions kubernetes_platform/python/kfp/kubernetes/volume.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def CreatePVC(
storage_class_name: Optional[str] = '',
volume_name: Optional[str] = None,
annotations: Optional[Dict[str, str]] = None,
data_source: Optional[Dict[str, str]] = None,
):
"""Create a PersistentVolumeClaim, which can be used by downstream tasks.
See `PersistentVolume <https://kubernetes.io/docs/concepts/storage/persistent-volumes/#persistent-volumes>`_ and `PersistentVolumeClaim <https://kubernetes.io/docs/concepts/storage/persistent-volumes/#persistentvolumeclaims>`_ documentation for more information about
Expand All @@ -54,6 +55,7 @@ def CreatePVC(
provisioned PersistentVolumeClaim. Used for statically
specified PV only. Corresponds to `PersistentVolumeClaim.spec.volumeName <https://kubernetes.io/docs/reference/kubernetes-api/config-and-storage-resources/persistent-volume-claim-v1/#PersistentVolumeClaimSpec>`_.
annotations: Annotations for the PVC's metadata. Corresponds to `PersistentVolumeClaim.metadata.annotations <https://kubernetes.io/docs/reference/kubernetes-api/config-and-storage-resources/persistent-volume-claim-v1/#PersistentVolumeClaim>`_.
data_source: Data source to use for the PVC's creation. Corresponds to `PersistentVolumeClaim.spec.dataSource <https://kubernetes.io/docs/reference/kubernetes-api/config-and-storage-resources/persistent-volume-claim-v1/#beta-level>`_.
Returns:
``name: str`` \n\t\t\tName of the generated PVC.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ def my_pipeline():
access_modes=['ReadWriteOnce'],
size='5Mi',
storage_class_name='standard',
data_source={
'api_group': 'snapshot.storage.k8s.io',
'kind': 'VolumeSnapshot',
'name': 'my-snap',
},
)

task1 = producer()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ components:
<https://kubernetes.io/docs/reference/kubernetes-api/config-and-storage-resources/persistent-volume-claim-v1/#PersistentVolumeClaim>`_.
isOptional: true
parameterType: STRUCT
data_source:
description: Data source to use for the PVC's creation. Corresponds to `PersistentVolumeClaim.spec.dataSource
<https://kubernetes.io/docs/reference/kubernetes-api/config-and-storage-resources/persistent-volume-claim-v1/#beta-level>`_.
isOptional: true
parameterType: STRUCT
pvc_name:
description: 'Name of the PVC. Corresponds to `PersistentVolumeClaim.metadata.name
<https://kubernetes.io/docs/reference/kubernetes-api/config-and-storage-resources/persistent-volume-claim-v1/#PersistentVolumeClaim>`_.
Expand Down Expand Up @@ -96,7 +101,7 @@ deploymentSpec:
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.4.0'\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.10.1'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
Expand Down Expand Up @@ -132,7 +137,7 @@ deploymentSpec:
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.4.0'\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.10.1'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
Expand Down Expand Up @@ -177,6 +182,12 @@ root:
runtimeValue:
constant:
- ReadWriteOnce
data_source:
runtimeValue:
constant:
api_group: snapshot.storage.k8s.io
kind: VolumeSnapshot
name: my-snap
pvc_name_suffix:
runtimeValue:
constant: -my-pvc
Expand Down Expand Up @@ -214,7 +225,7 @@ root:
taskInfo:
name: producer
schemaVersion: 2.1.0
sdkVersion: kfp-2.4.0
sdkVersion: kfp-2.10.1
---
platforms:
kubernetes:
Expand Down

0 comments on commit 11dd7a0

Please sign in to comment.