diff --git a/frontend/csi/node_server.go b/frontend/csi/node_server.go index 1655fa1d3..e48a8d031 100644 --- a/frontend/csi/node_server.go +++ b/frontend/csi/node_server.go @@ -1729,31 +1729,47 @@ func (p *Plugin) stopReconcilingNodePublications(ctx context.Context) { if p.stopNodePublicationLoop != nil { close(p.stopNodePublicationLoop) } - Logc(ctx).Info("Stopped the node publication reconciliation service.") } // reconcileNodePublicationState cleans any stale published path for volumes on the node by rectifying the actual state // of publications (published paths on the node) against the desired state of publications from the CSI controller. // If all published paths are cleaned successfully and the node is cleanable, it updates the Trident node CR via // the CSI controller REST API. -// If a node is not in a cleanable state, it will not mark the node as clean / reconciled. +// If a node is not in a cleanable state, it will not mark the node as clean. func (p *Plugin) reconcileNodePublicationState(ctx context.Context) error { defer func() { // Reset the Timer only after the cleanup process is complete, regardless of if it fails or not. p.nodePublicationTimer.Reset(p.refreshTimerPeriod(ctx)) }() + // For force detach purposes, always get the node and check if it needs to be updated. node, err := p.restClient.GetNode(ctx, p.nodeName) if err != nil { Logc(ctx).WithError(err).Error("Failed to get node state from the CSI controller server.") return err } + // For now, only cleanup the node iff the node is not clean. if node.PublicationState == utils.NodeClean { Logc(ctx).Debug("Node is clean, nothing to do.") return nil } + if err := p.performNodeCleanup(ctx); err != nil { + Logc(ctx).WithError(err).Error("Failed to clean stale node publications.") + return err + } + + return p.updateNodePublicationState(ctx, node.PublicationState) +} + +// performNodeCleanup will discover the difference between the volume tracking information stored on the node, and the +// publication records stored in the controller's persistence. It will then force unstage any stale volume attachments +// and remove their relevant tracking files. This is only intended to be called after the node has registered with +// the controller. +func (p *Plugin) performNodeCleanup(ctx context.Context) error { + Logc(ctx).Debug("Performing node cleanup.") + // Discover the desired publication state. desiredPublicationState, err := p.discoverDesiredPublicationState(ctx) if err != nil { @@ -1766,32 +1782,24 @@ func (p *Plugin) reconcileNodePublicationState(ctx context.Context) error { return errors.WrapWithReconcileFailedError(err, "reconcile failed") } - // Discover and clean stale publications iff there are entries in the actual publication state. - if len(actualPublicationState) != 0 { - // Get the delta between the actual and desired states and attempt to clean up stale publications. - stalePublications := p.discoverStalePublications(ctx, actualPublicationState, desiredPublicationState) - err = p.cleanStalePublications(ctx, stalePublications) - if err != nil { - Logc(ctx).WithError(err).Error("Failed to clean node publication state.") + // Check for stale publication records. If any exist, clean them. + stalePublications := p.discoverStalePublications(ctx, actualPublicationState, desiredPublicationState) + if len(stalePublications) != 0 { + if err = p.cleanStalePublications(ctx, stalePublications); err != nil { return errors.WrapWithReconcileFailedError(err, "reconcile failed") } - } else { - Logc(ctx).Debug("No publication state found on this node.") } - // Nodes are only ready to move to a clean state if they are in a cleanable state. - return p.updateNodePublicationState(ctx, node.PublicationState) + return nil } // discoverDesiredPublicationState discovers the desired state of published volumes on the CSI controller and returns // a mapping of volumeID -> publications. func (p *Plugin) discoverDesiredPublicationState(ctx context.Context) (map[string]*utils.VolumePublicationExternal, error) { - Logc(ctx).Debug("Retrieving desired publication state.") - defer Logc(ctx).Debug("Retrieved desired publication state.") + Logc(ctx).Debug("Discovering desired publication state.") publications, err := p.restClient.ListVolumePublicationsForNode(ctx, p.nodeName) if err != nil { - Logc(ctx).Debug("Failed to get desired publication state.") return nil, fmt.Errorf("failed to get desired publication state") } @@ -1806,12 +1814,10 @@ func (p *Plugin) discoverDesiredPublicationState(ctx context.Context) (map[strin // discoverActualPublicationState discovers the actual state of published volumes on the node and returns // a mapping of volumeID -> tracking information. func (p *Plugin) discoverActualPublicationState(ctx context.Context) (map[string]*utils.VolumeTrackingInfo, error) { - Logc(ctx).Debug("Retrieving actual publication state.") - defer Logc(ctx).Debug("Retrieved actual publication state.") + Logc(ctx).Debug("Discovering actual publication state.") actualPublicationState, err := p.nodeHelper.ListVolumeTrackingInfo(ctx) if err != nil && !errors.IsNotFoundError(err) { - Logc(ctx).Debug("Failed to get actual publication state.") return nil, fmt.Errorf("failed to get actual publication state") } @@ -1826,9 +1832,8 @@ func (p *Plugin) discoverStalePublications( desiredPublicationState map[string]*utils.VolumePublicationExternal, ) map[string]*utils.VolumeTrackingInfo { Logc(ctx).Debug("Discovering stale volume publications.") - defer Logc(ctx).Debug("Discovered stale volume publications.") - // Track the deltas between actual and desired publication state. + // Track the delta between actual (node-side) and desired (controller-side) publication state. stalePublications := make(map[string]*utils.VolumeTrackingInfo, 0) // Reconcile the actual state of publications to the desired state of publications. @@ -1838,8 +1843,7 @@ func (p *Plugin) discoverStalePublications( // If we find the publication in the desired state, then we don't want to do anything. // Otherwise, remove the published paths and tracking info on the node. if _, ok := desiredPublicationState[volumeID]; !ok { - Logc(ctx).WithFields(fields).Debug("Volume found with no matching CSI controller publication record; " + - "unstaging the volume.") + Logc(ctx).WithFields(fields).Debug("Volume has no matching volume publication record.") stalePublications[volumeID] = trackingInfo } } @@ -1851,41 +1855,49 @@ func (p *Plugin) discoverStalePublications( // object in the CSI controller. It should never publish volumes to the node. func (p *Plugin) cleanStalePublications(ctx context.Context, stalePublications map[string]*utils.VolumeTrackingInfo) error { Logc(ctx).Debug("Cleaning stale node publication state.") - defer Logc(ctx).Debug("Cleaned stale node publication state.") // Clean stale volume publication state. var err error for volumeID, trackingInfo := range stalePublications { + var fields LogFields // If no published paths exist for a still staged volume, then it means CO / kubelet // died before it could finish CSI unpublish and unstage for this given volume. // These unpublish calls act as a best-effort to abide by and act within the CSI workflow. for targetPath := range trackingInfo.PublishedPaths { + fields = LogFields{ + "volumeID": volumeID, + "targetPath": targetPath, + } + // Both VolumeID and TargetPath are required for NodeUnpublishVolume. unpublishReq := &csi.NodeUnpublishVolumeRequest{ VolumeId: volumeID, TargetPath: targetPath, } - if _, err := p.NodeUnpublishVolume(ctx, unpublishReq); err != nil { - Logc(ctx).WithFields(LogFields{ - "volumeID": volumeID, - "targetPath": targetPath, - }).Debug("Failed to unpublish the volume.") - err = multierr.Combine(err, fmt.Errorf("failed to unpublish the volume; %v", err)) + if _, unpublishErr := p.NodeUnpublishVolume(ctx, unpublishReq); unpublishErr != nil { + Logc(ctx).WithFields(fields).WithError(unpublishErr).Debug("Failed to unpublish volume.") + err = multierr.Combine(unpublishErr, fmt.Errorf("failed to unpublish volume; %v", unpublishErr)) + } else { + Logc(ctx).WithFields(fields).Debug("Unpublished stale volume.") } } + fields = LogFields{ + "volumeID": volumeID, + "stagingTargetPath": trackingInfo.StagingTargetPath, + } + // Both VolumeID and StagingTargetPath are required for nodeUnstageVolume. unstageReq := &csi.NodeUnstageVolumeRequest{ VolumeId: volumeID, StagingTargetPath: trackingInfo.StagingTargetPath, } - if _, err := p.nodeUnstageVolume(ctx, unstageReq, true); err != nil { - Logc(ctx).WithFields(LogFields{ - "volumeID": volumeID, - "stagingTargetPath": trackingInfo.StagingTargetPath, - }).Debug("Failed to force unstage the volume.") - err = multierr.Combine(err, fmt.Errorf("failed to force unstage the volume; %v", err)) + if _, unstageErr := p.nodeUnstageVolume(ctx, unstageReq, true); unstageErr != nil { + Logc(ctx).WithFields(fields).WithError(unstageErr).Debug("Failed to force unstage volume.") + err = multierr.Combine(unstageErr, fmt.Errorf("failed to force unstage volume; %v", unstageErr)) + } else { + Logc(ctx).WithFields(fields).Debug("Force detached stale volume attachment.") } } diff --git a/frontend/csi/node_server_test.go b/frontend/csi/node_server_test.go index cf9eb26c9..a306b8922 100644 --- a/frontend/csi/node_server_test.go +++ b/frontend/csi/node_server_test.go @@ -767,6 +767,114 @@ func TestDiscoverStalePublications_DiscoversStalePublicationsCorrectly(t *testin assert.NotContains(t, stalePublications, volumeThree, fmt.Sprintf("expected %s to not exist in stale publications", volumeThree)) } +func TestPerformNodeCleanup_ShouldNotDiscoverAnyStalePublications(t *testing.T) { + ctx := context.Background() + nodeName := "bar" + volume := "pvc-85987a99-648d-4d84-95df-47d0256ca2ab" + desiredPublicationState := []*utils.VolumePublicationExternal{ + { + Name: utils.GenerateVolumePublishName(volume, nodeName), + NodeName: nodeName, + VolumeName: volume, + }, + } + actualPublicationState := map[string]*utils.VolumeTrackingInfo{ + volume: { + VolumePublishInfo: utils.VolumePublishInfo{}, + StagingTargetPath: "/var/lib/kubelet/plugins/kubernetes.io/csi/csi.trident.netapp.io/" + + "6b1f46a23d50f8d6a2e2f24c63c3b6e73f82e8b982bdb41da4eb1d0b49d787dd/globalmount", + PublishedPaths: map[string]struct{}{ + "/var/lib/kubelet/pods/b9f476af-47f4-42d8-8cfa-70d49394d9e3/volumes/kubernetes.io~csi/" + + volume + "/mount": {}, + }, + }, + } + + mockCtrl := gomock.NewController(t) + mockRestClient := mockControllerAPI.NewMockTridentController(mockCtrl) + mockNodeHelper := mockNodeHelpers.NewMockNodeHelper(mockCtrl) + mockRestClient.EXPECT().ListVolumePublicationsForNode(ctx, nodeName).Return(desiredPublicationState, nil) + mockNodeHelper.EXPECT().ListVolumeTrackingInfo(ctx).Return(actualPublicationState, nil) + + nodeServer := &Plugin{ + role: CSINode, + nodeName: nodeName, + restClient: mockRestClient, + nodeHelper: mockNodeHelper, + enableForceDetach: true, + } + err := nodeServer.performNodeCleanup(ctx) + assert.NoError(t, err, "expected no error") +} + +func TestPerformNodeCleanup_ShouldFailToDiscoverDesiredPublicationsFromControllerAPI(t *testing.T) { + ctx := context.Background() + nodeName := "bar" + volume := "pvc-85987a99-648d-4d84-95df-47d0256ca2ab" + desiredPublicationState := []*utils.VolumePublicationExternal{ + { + Name: utils.GenerateVolumePublishName(volume, nodeName), + NodeName: nodeName, + VolumeName: volume, + }, + } + + mockCtrl := gomock.NewController(t) + mockRestClient := mockControllerAPI.NewMockTridentController(mockCtrl) + mockRestClient.EXPECT().ListVolumePublicationsForNode( + ctx, nodeName, + ).Return(desiredPublicationState, errors.New("api error")) + + nodeServer := &Plugin{ + role: CSINode, + nodeName: nodeName, + restClient: mockRestClient, + enableForceDetach: true, + } + err := nodeServer.performNodeCleanup(ctx) + assert.Error(t, err, "expected an error") +} + +func TestPerformNodeCleanup_ShouldFailToDiscoverActualPublicationsFromHost(t *testing.T) { + ctx := context.Background() + nodeName := "bar" + volume := "pvc-85987a99-648d-4d84-95df-47d0256ca2ab" + desiredPublicationState := []*utils.VolumePublicationExternal{ + { + Name: utils.GenerateVolumePublishName(volume, nodeName), + NodeName: nodeName, + VolumeName: volume, + }, + } + actualPublicationState := map[string]*utils.VolumeTrackingInfo{ + volume: { + VolumePublishInfo: utils.VolumePublishInfo{}, + StagingTargetPath: "/var/lib/kubelet/plugins/kubernetes.io/csi/csi.trident.netapp.io/" + + "6b1f46a23d50f8d6a2e2f24c63c3b6e73f82e8b982bdb41da4eb1d0b49d787dd/globalmount", + PublishedPaths: map[string]struct{}{ + "/var/lib/kubelet/pods/b9f476af-47f4-42d8-8cfa-70d49394d9e3/volumes/kubernetes.io~csi/" + + volume + "/mount": {}, + }, + }, + } + + mockCtrl := gomock.NewController(t) + mockRestClient := mockControllerAPI.NewMockTridentController(mockCtrl) + mockNodeHelper := mockNodeHelpers.NewMockNodeHelper(mockCtrl) + mockRestClient.EXPECT().ListVolumePublicationsForNode(ctx, nodeName).Return(desiredPublicationState, nil) + mockNodeHelper.EXPECT().ListVolumeTrackingInfo(ctx).Return(actualPublicationState, errors.New("file I/O error")) + + nodeServer := &Plugin{ + role: CSINode, + nodeName: nodeName, + restClient: mockRestClient, + nodeHelper: mockNodeHelper, + enableForceDetach: true, + } + err := nodeServer.performNodeCleanup(ctx) + assert.Error(t, err, "expected an error") +} + func TestUpdateNodePublicationState_NodeNotCleanable(t *testing.T) { ctx := context.Background() nodeState := utils.NodeDirty diff --git a/frontend/csi/plugin.go b/frontend/csi/plugin.go index ebc72499a..8762b15b1 100644 --- a/frontend/csi/plugin.go +++ b/frontend/csi/plugin.go @@ -309,12 +309,20 @@ func (p *Plugin) Activate() error { if p.role == CSINode || p.role == CSIAllInOne { p.nodeRegisterWithController(ctx, 0) // Retry indefinitely + + // Cleanup any stale volume publication state immediately so self-healing works with current data. + if err := p.performNodeCleanup(ctx); err != nil { + Logc(ctx).WithError(err).Warn("Failed to clean node; self-healing features may be unreliable.") + } + // Populate the published sessions IFF iSCSI/NVMe self-healing is enabled. if p.iSCSISelfHealingInterval > 0 || p.nvmeSelfHealingInterval > 0 { p.populatePublishedSessions(ctx) } + p.startISCSISelfHealingThread(ctx) p.startNVMeSelfHealingThread(ctx) + if p.enableForceDetach { p.startReconcilingNodePublications(ctx) }