Skip to content

Commit

Permalink
Clean any orphaned mounts and tracking files at Trident node start-up
Browse files Browse the repository at this point in the history
This commit performs a force detach for any orphaned mounts and removes stale tracking files before the Trident CSI Node Server, or any self-healing thread activates.
  • Loading branch information
jwebster7 authored Feb 24, 2024
1 parent 4d87533 commit 370f5e4
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 36 deletions.
84 changes: 48 additions & 36 deletions frontend/csi/node_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1729,31 +1729,47 @@ func (p *Plugin) stopReconcilingNodePublications(ctx context.Context) {
if p.stopNodePublicationLoop != nil {
close(p.stopNodePublicationLoop)
}
Logc(ctx).Info("Stopped the node publication reconciliation service.")
}

// reconcileNodePublicationState cleans any stale published path for volumes on the node by rectifying the actual state
// of publications (published paths on the node) against the desired state of publications from the CSI controller.
// If all published paths are cleaned successfully and the node is cleanable, it updates the Trident node CR via
// the CSI controller REST API.
// If a node is not in a cleanable state, it will not mark the node as clean / reconciled.
// If a node is not in a cleanable state, it will not mark the node as clean.
func (p *Plugin) reconcileNodePublicationState(ctx context.Context) error {
defer func() {
// Reset the Timer only after the cleanup process is complete, regardless of if it fails or not.
p.nodePublicationTimer.Reset(p.refreshTimerPeriod(ctx))
}()

// For force detach purposes, always get the node and check if it needs to be updated.
node, err := p.restClient.GetNode(ctx, p.nodeName)
if err != nil {
Logc(ctx).WithError(err).Error("Failed to get node state from the CSI controller server.")
return err
}

// For now, only cleanup the node iff the node is not clean.
if node.PublicationState == utils.NodeClean {
Logc(ctx).Debug("Node is clean, nothing to do.")
return nil
}

if err := p.performNodeCleanup(ctx); err != nil {
Logc(ctx).WithError(err).Error("Failed to clean stale node publications.")
return err
}

return p.updateNodePublicationState(ctx, node.PublicationState)
}

// performNodeCleanup will discover the difference between the volume tracking information stored on the node, and the
// publication records stored in the controller's persistence. It will then force unstage any stale volume attachments
// and remove their relevant tracking files. This is only intended to be called after the node has registered with
// the controller.
func (p *Plugin) performNodeCleanup(ctx context.Context) error {
Logc(ctx).Debug("Performing node cleanup.")

// Discover the desired publication state.
desiredPublicationState, err := p.discoverDesiredPublicationState(ctx)
if err != nil {
Expand All @@ -1766,32 +1782,24 @@ func (p *Plugin) reconcileNodePublicationState(ctx context.Context) error {
return errors.WrapWithReconcileFailedError(err, "reconcile failed")
}

// Discover and clean stale publications iff there are entries in the actual publication state.
if len(actualPublicationState) != 0 {
// Get the delta between the actual and desired states and attempt to clean up stale publications.
stalePublications := p.discoverStalePublications(ctx, actualPublicationState, desiredPublicationState)
err = p.cleanStalePublications(ctx, stalePublications)
if err != nil {
Logc(ctx).WithError(err).Error("Failed to clean node publication state.")
// Check for stale publication records. If any exist, clean them.
stalePublications := p.discoverStalePublications(ctx, actualPublicationState, desiredPublicationState)
if len(stalePublications) != 0 {
if err = p.cleanStalePublications(ctx, stalePublications); err != nil {
return errors.WrapWithReconcileFailedError(err, "reconcile failed")
}
} else {
Logc(ctx).Debug("No publication state found on this node.")
}

// Nodes are only ready to move to a clean state if they are in a cleanable state.
return p.updateNodePublicationState(ctx, node.PublicationState)
return nil
}

// discoverDesiredPublicationState discovers the desired state of published volumes on the CSI controller and returns
// a mapping of volumeID -> publications.
func (p *Plugin) discoverDesiredPublicationState(ctx context.Context) (map[string]*utils.VolumePublicationExternal, error) {
Logc(ctx).Debug("Retrieving desired publication state.")
defer Logc(ctx).Debug("Retrieved desired publication state.")
Logc(ctx).Debug("Discovering desired publication state.")

publications, err := p.restClient.ListVolumePublicationsForNode(ctx, p.nodeName)
if err != nil {
Logc(ctx).Debug("Failed to get desired publication state.")
return nil, fmt.Errorf("failed to get desired publication state")
}

Expand All @@ -1806,12 +1814,10 @@ func (p *Plugin) discoverDesiredPublicationState(ctx context.Context) (map[strin
// discoverActualPublicationState discovers the actual state of published volumes on the node and returns
// a mapping of volumeID -> tracking information.
func (p *Plugin) discoverActualPublicationState(ctx context.Context) (map[string]*utils.VolumeTrackingInfo, error) {
Logc(ctx).Debug("Retrieving actual publication state.")
defer Logc(ctx).Debug("Retrieved actual publication state.")
Logc(ctx).Debug("Discovering actual publication state.")

actualPublicationState, err := p.nodeHelper.ListVolumeTrackingInfo(ctx)
if err != nil && !errors.IsNotFoundError(err) {
Logc(ctx).Debug("Failed to get actual publication state.")
return nil, fmt.Errorf("failed to get actual publication state")
}

Expand All @@ -1826,9 +1832,8 @@ func (p *Plugin) discoverStalePublications(
desiredPublicationState map[string]*utils.VolumePublicationExternal,
) map[string]*utils.VolumeTrackingInfo {
Logc(ctx).Debug("Discovering stale volume publications.")
defer Logc(ctx).Debug("Discovered stale volume publications.")

// Track the deltas between actual and desired publication state.
// Track the delta between actual (node-side) and desired (controller-side) publication state.
stalePublications := make(map[string]*utils.VolumeTrackingInfo, 0)

// Reconcile the actual state of publications to the desired state of publications.
Expand All @@ -1838,8 +1843,7 @@ func (p *Plugin) discoverStalePublications(
// If we find the publication in the desired state, then we don't want to do anything.
// Otherwise, remove the published paths and tracking info on the node.
if _, ok := desiredPublicationState[volumeID]; !ok {
Logc(ctx).WithFields(fields).Debug("Volume found with no matching CSI controller publication record; " +
"unstaging the volume.")
Logc(ctx).WithFields(fields).Debug("Volume has no matching volume publication record.")
stalePublications[volumeID] = trackingInfo
}
}
Expand All @@ -1851,41 +1855,49 @@ func (p *Plugin) discoverStalePublications(
// object in the CSI controller. It should never publish volumes to the node.
func (p *Plugin) cleanStalePublications(ctx context.Context, stalePublications map[string]*utils.VolumeTrackingInfo) error {
Logc(ctx).Debug("Cleaning stale node publication state.")
defer Logc(ctx).Debug("Cleaned stale node publication state.")

// Clean stale volume publication state.
var err error
for volumeID, trackingInfo := range stalePublications {
var fields LogFields

// If no published paths exist for a still staged volume, then it means CO / kubelet
// died before it could finish CSI unpublish and unstage for this given volume.
// These unpublish calls act as a best-effort to abide by and act within the CSI workflow.
for targetPath := range trackingInfo.PublishedPaths {
fields = LogFields{
"volumeID": volumeID,
"targetPath": targetPath,
}

// Both VolumeID and TargetPath are required for NodeUnpublishVolume.
unpublishReq := &csi.NodeUnpublishVolumeRequest{
VolumeId: volumeID,
TargetPath: targetPath,
}
if _, err := p.NodeUnpublishVolume(ctx, unpublishReq); err != nil {
Logc(ctx).WithFields(LogFields{
"volumeID": volumeID,
"targetPath": targetPath,
}).Debug("Failed to unpublish the volume.")
err = multierr.Combine(err, fmt.Errorf("failed to unpublish the volume; %v", err))
if _, unpublishErr := p.NodeUnpublishVolume(ctx, unpublishReq); unpublishErr != nil {
Logc(ctx).WithFields(fields).WithError(unpublishErr).Debug("Failed to unpublish volume.")
err = multierr.Combine(unpublishErr, fmt.Errorf("failed to unpublish volume; %v", unpublishErr))
} else {
Logc(ctx).WithFields(fields).Debug("Unpublished stale volume.")
}
}

fields = LogFields{
"volumeID": volumeID,
"stagingTargetPath": trackingInfo.StagingTargetPath,
}

// Both VolumeID and StagingTargetPath are required for nodeUnstageVolume.
unstageReq := &csi.NodeUnstageVolumeRequest{
VolumeId: volumeID,
StagingTargetPath: trackingInfo.StagingTargetPath,
}
if _, err := p.nodeUnstageVolume(ctx, unstageReq, true); err != nil {
Logc(ctx).WithFields(LogFields{
"volumeID": volumeID,
"stagingTargetPath": trackingInfo.StagingTargetPath,
}).Debug("Failed to force unstage the volume.")
err = multierr.Combine(err, fmt.Errorf("failed to force unstage the volume; %v", err))
if _, unstageErr := p.nodeUnstageVolume(ctx, unstageReq, true); unstageErr != nil {
Logc(ctx).WithFields(fields).WithError(unstageErr).Debug("Failed to force unstage volume.")
err = multierr.Combine(unstageErr, fmt.Errorf("failed to force unstage volume; %v", unstageErr))
} else {
Logc(ctx).WithFields(fields).Debug("Force detached stale volume attachment.")
}
}

Expand Down
108 changes: 108 additions & 0 deletions frontend/csi/node_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,6 +767,114 @@ func TestDiscoverStalePublications_DiscoversStalePublicationsCorrectly(t *testin
assert.NotContains(t, stalePublications, volumeThree, fmt.Sprintf("expected %s to not exist in stale publications", volumeThree))
}

func TestPerformNodeCleanup_ShouldNotDiscoverAnyStalePublications(t *testing.T) {
ctx := context.Background()
nodeName := "bar"
volume := "pvc-85987a99-648d-4d84-95df-47d0256ca2ab"
desiredPublicationState := []*utils.VolumePublicationExternal{
{
Name: utils.GenerateVolumePublishName(volume, nodeName),
NodeName: nodeName,
VolumeName: volume,
},
}
actualPublicationState := map[string]*utils.VolumeTrackingInfo{
volume: {
VolumePublishInfo: utils.VolumePublishInfo{},
StagingTargetPath: "/var/lib/kubelet/plugins/kubernetes.io/csi/csi.trident.netapp.io/" +
"6b1f46a23d50f8d6a2e2f24c63c3b6e73f82e8b982bdb41da4eb1d0b49d787dd/globalmount",
PublishedPaths: map[string]struct{}{
"/var/lib/kubelet/pods/b9f476af-47f4-42d8-8cfa-70d49394d9e3/volumes/kubernetes.io~csi/" +
volume + "/mount": {},
},
},
}

mockCtrl := gomock.NewController(t)
mockRestClient := mockControllerAPI.NewMockTridentController(mockCtrl)
mockNodeHelper := mockNodeHelpers.NewMockNodeHelper(mockCtrl)
mockRestClient.EXPECT().ListVolumePublicationsForNode(ctx, nodeName).Return(desiredPublicationState, nil)
mockNodeHelper.EXPECT().ListVolumeTrackingInfo(ctx).Return(actualPublicationState, nil)

nodeServer := &Plugin{
role: CSINode,
nodeName: nodeName,
restClient: mockRestClient,
nodeHelper: mockNodeHelper,
enableForceDetach: true,
}
err := nodeServer.performNodeCleanup(ctx)
assert.NoError(t, err, "expected no error")
}

func TestPerformNodeCleanup_ShouldFailToDiscoverDesiredPublicationsFromControllerAPI(t *testing.T) {
ctx := context.Background()
nodeName := "bar"
volume := "pvc-85987a99-648d-4d84-95df-47d0256ca2ab"
desiredPublicationState := []*utils.VolumePublicationExternal{
{
Name: utils.GenerateVolumePublishName(volume, nodeName),
NodeName: nodeName,
VolumeName: volume,
},
}

mockCtrl := gomock.NewController(t)
mockRestClient := mockControllerAPI.NewMockTridentController(mockCtrl)
mockRestClient.EXPECT().ListVolumePublicationsForNode(
ctx, nodeName,
).Return(desiredPublicationState, errors.New("api error"))

nodeServer := &Plugin{
role: CSINode,
nodeName: nodeName,
restClient: mockRestClient,
enableForceDetach: true,
}
err := nodeServer.performNodeCleanup(ctx)
assert.Error(t, err, "expected an error")
}

func TestPerformNodeCleanup_ShouldFailToDiscoverActualPublicationsFromHost(t *testing.T) {
ctx := context.Background()
nodeName := "bar"
volume := "pvc-85987a99-648d-4d84-95df-47d0256ca2ab"
desiredPublicationState := []*utils.VolumePublicationExternal{
{
Name: utils.GenerateVolumePublishName(volume, nodeName),
NodeName: nodeName,
VolumeName: volume,
},
}
actualPublicationState := map[string]*utils.VolumeTrackingInfo{
volume: {
VolumePublishInfo: utils.VolumePublishInfo{},
StagingTargetPath: "/var/lib/kubelet/plugins/kubernetes.io/csi/csi.trident.netapp.io/" +
"6b1f46a23d50f8d6a2e2f24c63c3b6e73f82e8b982bdb41da4eb1d0b49d787dd/globalmount",
PublishedPaths: map[string]struct{}{
"/var/lib/kubelet/pods/b9f476af-47f4-42d8-8cfa-70d49394d9e3/volumes/kubernetes.io~csi/" +
volume + "/mount": {},
},
},
}

mockCtrl := gomock.NewController(t)
mockRestClient := mockControllerAPI.NewMockTridentController(mockCtrl)
mockNodeHelper := mockNodeHelpers.NewMockNodeHelper(mockCtrl)
mockRestClient.EXPECT().ListVolumePublicationsForNode(ctx, nodeName).Return(desiredPublicationState, nil)
mockNodeHelper.EXPECT().ListVolumeTrackingInfo(ctx).Return(actualPublicationState, errors.New("file I/O error"))

nodeServer := &Plugin{
role: CSINode,
nodeName: nodeName,
restClient: mockRestClient,
nodeHelper: mockNodeHelper,
enableForceDetach: true,
}
err := nodeServer.performNodeCleanup(ctx)
assert.Error(t, err, "expected an error")
}

func TestUpdateNodePublicationState_NodeNotCleanable(t *testing.T) {
ctx := context.Background()
nodeState := utils.NodeDirty
Expand Down
8 changes: 8 additions & 0 deletions frontend/csi/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,12 +309,20 @@ func (p *Plugin) Activate() error {

if p.role == CSINode || p.role == CSIAllInOne {
p.nodeRegisterWithController(ctx, 0) // Retry indefinitely

// Cleanup any stale volume publication state immediately so self-healing works with current data.
if err := p.performNodeCleanup(ctx); err != nil {
Logc(ctx).WithError(err).Warn("Failed to clean node; self-healing features may be unreliable.")
}

// Populate the published sessions IFF iSCSI/NVMe self-healing is enabled.
if p.iSCSISelfHealingInterval > 0 || p.nvmeSelfHealingInterval > 0 {
p.populatePublishedSessions(ctx)
}

p.startISCSISelfHealingThread(ctx)
p.startNVMeSelfHealingThread(ctx)

if p.enableForceDetach {
p.startReconcilingNodePublications(ctx)
}
Expand Down

0 comments on commit 370f5e4

Please sign in to comment.