Skip to content

Commit

Permalink
fix #1206 change send signature to support permanent failures
Browse files Browse the repository at this point in the history
  • Loading branch information
petersilva committed Nov 25, 2024
1 parent a3f30e2 commit 1dea041
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 46 deletions.
100 changes: 56 additions & 44 deletions sarracenia/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2294,7 +2294,15 @@ def download(self, msg, options) -> int:
return 1

# generalized send...
def send(self, msg, options):
def send(self, msg, options) -> int:
"""
send/transfer one file based on message, return int:
return 0 -- failed temporarily, retry later.
return >0 -- OK download successful.
return <0 -- download failed permanently, retry not useful.
"""

self.o = options
sendTo=self.o.sendTo
logger.debug( f"{self.scheme}_transport sendTo: {sendTo}" )
Expand All @@ -2306,14 +2314,16 @@ def send(self, msg, options):
for plugin in self.plugins['send']:
try:
ok = plugin(msg)
if type(ok) is not bool:
logger.error( f"{plugin} returned {type(ok)}. Should return boolean" )
if type(ok) not in [ bool, int ]:
logger.error( f"{plugin} returned {type(ok)}. Must return boolean or integer, discarding." )
return -1
except Exception as ex:
logger.error( f'flowCallback plugin {plugin} crashed: {ex}' )
logger.debug( "details:", exc_info=True )

if not ok: return False
return True
if type(ok) is bool and not ok: return 0
elif type(ok) is int: return ok
return 1

if self.o.baseDir:
local_path = self.o.variableExpansion(self.o.baseDir,
Expand Down Expand Up @@ -2342,9 +2352,8 @@ def send(self, msg, options):
try:
os.chdir(local_dir)
except Exception as ex:
logger.error("could not chdir to %s to write: %s" % (local_dir, ex))
return False

logger.error("could not chdir locally to %s: %s" % (local_dir, ex))
return -1
try:

if not self.o.dry_run:
Expand All @@ -2361,7 +2370,7 @@ def send(self, msg, options):
self.proto[self.scheme] = sarracenia.transfer.Transfer.factory( self.scheme, options)

ok = self.proto[self.scheme].connect()
if not ok: return False
if not ok: return 0
self.cdir = None
self.metrics['flow']['transferConnected'] = True
self.metrics['flow']['transferConnectStart'] = time.time()
Expand All @@ -2382,7 +2391,7 @@ def send(self, msg, options):
msg['blocks']['method'] == 'inplace'):
logger.error("%s, inplace part file not supported" %
self.scheme)
return False
return -1

#=================================
# if umask, check that the protocol supports it ...
Expand Down Expand Up @@ -2414,7 +2423,7 @@ def send(self, msg, options):
cwd = self.proto[self.scheme].getcwd()
except Exception as ex:
logger.error( f"could not getcwd on {sendTo} : {ex}" )
return False
return 0

if cwd != new_dir:
logger.debug("%s_transport send cd to %s" %
Expand All @@ -2424,7 +2433,7 @@ def send(self, msg, options):
self.proto[self.scheme].cd_forced(new_dir)
except Exception as ex:
logger.error( f"could not chdir to {sendTo} {new_dir}: {ex}" )
return False
return -1

#=================================
# delete event
Expand All @@ -2446,14 +2455,14 @@ def send(self, msg, options):
self.proto[self.scheme].delete(new_file)
except Exception as ex:
logger.error( f"could not delete {sendTo} {msg['new_dir']}/{new_file}: {ex}" )
return False
return -1

msg.setReport(201, f'file or directory removed')
self.metrics['flow']['transferTxFiles'] += 1
self.metrics['flow']['transferTxLast'] = msg['report']['timeCompleted']
return True
return 1
logger.error("%s, delete not supported" % self.scheme)
return False
return -1

if 'rename' in msg['fileOp'] :
if hasattr(self.proto[self.scheme], 'delete'):
Expand All @@ -2463,14 +2472,14 @@ def send(self, msg, options):
self.proto[self.scheme].rename(msg['fileOp']['rename'], new_file)
except Exception as ex:
logger.error( f"could not rename {sendTo} (in {msg['new_dir']} {msg['fileOp']['rename']} to {new_file}: {ex}" )
return False
return -1

msg.setReport(201, f'file renamed')
self.metrics['flow']['transferTxFiles'] += 1
self.metrics['flow']['transferTxLast'] = msg['report']['timeCompleted']
return True
return 1
logger.error("%s, delete not supported" % self.scheme)
return False
return -1

if 'directory' in msg['fileOp'] :
if 'contentType' not in msg:
Expand All @@ -2482,13 +2491,13 @@ def send(self, msg, options):
self.proto[self.scheme].mkdir(new_file)
except Exception as ex:
logger.error( f"could not mkdir {sendTo} {msg['new_dir']}/{new_file}: {ex}" )
return False
return 0
msg.setReport(201, f'directory created')
self.metrics['flow']['transferTxFiles'] += 1
self.metrics['flow']['transferTxLast'] = msg['report']['timeCompleted']
return True
return 1
logger.error("%s, mkdir not supported" % self.scheme)
return False
return -1


#=================================
Expand All @@ -2505,10 +2514,10 @@ def send(self, msg, options):
self.proto[self.scheme].link(msg['fileOp']['hlink'], new_file)
except Exception as ex:
logger.error( f"could not link {sendTo} in {msg['new_dir']}{os.sep}{msg['fileOp']['hlink']} to {new_file}: {ex}" )
return False
return True
return 0
return 1
logger.error("%s, hardlinks not supported" % self.scheme)
return False
return -1
elif 'link' in msg['fileOp']:
if 'contentType' not in msg:
msg['contentType'] = 'text/link'
Expand All @@ -2519,13 +2528,13 @@ def send(self, msg, options):
self.proto[self.scheme].symlink(msg['fileOp']['link'], new_file)
except Exception as ex:
logger.error( f"could not symlink {sendTo} in {msg['new_dir']} {msg['fileOp']['link']} to {new_file}: {ex}" )
return False
return 0
msg.setReport(201, f'file linked')
self.metrics['flow']['transferTxFiles'] += 1
self.metrics['flow']['transferTxLast'] = msg['report']['timeCompleted']
return True
return 1
logger.error("%s, symlink not supported" % self.scheme)
return False
return -1

#=================================
# send event
Expand All @@ -2537,7 +2546,7 @@ def send(self, msg, options):
"product collision or base_dir not set, file %s does not exist"
% local_file)
time.sleep(0.01)
return False
return 0
elif 'size' not in msg:
msg['size'] = os.path.getsize(local_file)

Expand Down Expand Up @@ -2580,7 +2589,7 @@ def send(self, msg, options):
len_written = self.proto[self.scheme].put( msg, local_file, new_file)
except Exception as ex:
logger.error( f"could not send {local_dir}{os.sep}{local_file} to inflight=None {sendTo} {msg['new_dir']} ... {new_file}: {ex}" )
return False
return 0

elif (('blocks' in msg)
and (msg['blocks']['method'] == 'inplace')):
Expand All @@ -2589,8 +2598,8 @@ def send(self, msg, options):
self.proto[self.scheme].put(msg, local_file, new_file, offset,
new_offset, msg['size'])
except Exception as ex:
logger.error( f"could not send {local_dir}{os.sep}{local_file} inplace {sendTo} {msg['new_dir']}/{new_file}: {ex}" )
return False
logger.error( f"could not send {local_dir}{os.sep}{local_file} inplace {sendTo} {msg['new_dir']} ... {new_file}: {ex}" )
return 0

elif inflight == '.':
new_inflight_path = '.' + new_file
Expand All @@ -2604,12 +2613,12 @@ def send(self, msg, options):
msg, local_file, new_inflight_path)
except Exception as ex:
logger.error( f"could not send {local_dir}{os.sep}{local_file} inflight={inflight} {sendTo} {msg['new_dir']}/{new_file}: {ex}" )
return False
return 0
try:
self.proto[self.scheme].rename(new_inflight_path, new_file)
except Exception as ex:
logger.error( f"could not rename inflight={inflight} {sendTo} {msg['new_dir']}/{new_file}: {ex}" )
return False
return 0
else:
len_written = msg['size']

Expand All @@ -2624,12 +2633,12 @@ def send(self, msg, options):
len_written = self.proto[self.scheme].put(msg, local_file, new_inflight_path)
except Exception as ex:
logger.error( f"could not send {local_dir}{os.sep}{local_file} inflight={inflight} {sendTo} {msg['new_dir']}/{new_file}: {ex}" )
return False
return 0
try:
self.proto[self.scheme].rename(new_inflight_path, new_file)
except Exception as ex:
logger.error( f"could not rename inflight={inflight} {sendTo} {msg['new_dir']}/{new_file}: {ex}" )
return False
return 0
elif options.inflight[-1] == '/':
if not self.o.dry_run:
try:
Expand All @@ -2649,12 +2658,12 @@ def send(self, msg, options):
msg, local_file, new_inflight_path)
except Exception as ex:
logger.error( f"could not send {local_dir}{os.sep}{local_file} inflight={inflight} {sendTo} {msg['new_dir']}/{new_file}: {ex}" )
return False
return 0
try:
self.proto[self.scheme].rename(new_inflight_path, new_file)
except Exception as ex:
logger.error( f"could not rename inflight={inflight} {sendTo} {msg['new_dir']}/{new_file}: {ex}" )
return False
return 0
else:
len_written = msg['size']
elif inflight == 'umask':
Expand All @@ -2669,12 +2678,12 @@ def send(self, msg, options):
msg, local_file, new_file)
except Exception as ex:
logger.error( f"could not send {local_dir}{os.sep}{local_file} inflight={inflight} {sendTo} {msg['new_dir']}/{new_file}: {ex}" )
return False
return 0
try:
self.proto[self.scheme].put(msg, local_file, new_file)
except Exception as ex:
logger.error( f"could not rename inflight={inflight} {sendTo} {msg['new_dir']}/{new_file}: {ex}" )
return False
return 0
else:
len_written = msg['size']

Expand All @@ -2693,7 +2702,7 @@ def send(self, msg, options):
(local_path, str_range, new_dir, new_file, offset,
offset + msg['size'] - 1))

return True
return 1

except Exception as err:

Expand Down Expand Up @@ -2724,7 +2733,7 @@ def send(self, msg, options):
msg['new_file'])
logger.debug('Exception details: ', exc_info=True)

return False
return 0

# set_local_file_attributes
def set_local_file_attributes(self, local_file, msg):
Expand Down Expand Up @@ -2867,13 +2876,16 @@ def do_send(self):
if i != 1:
logger.warning("sending again, attempt %d" % i)

ok = self.send(msg, self.o)
if ok:
retval = self.send(msg, self.o)
if retval > 0:
self.worklist.ok.append(msg)
break
elif retval < 0:
self.worklist.rejected.append(msg)
break

i = i + 1
if not ok:
if retval == 0:
self.worklist.failed.append(msg)
self.worklist.incoming = []

Expand Down
4 changes: 2 additions & 2 deletions sarracenia/flowcb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,11 @@ def post(self,worklist) -> None::
to indicate failure to process a message, append to worklist.failed.
worklist.failed processing should occur in here as it will be zeroed out after this step.
def send(self,msg) -> bool::
def send(self,msg) -> int::
Task: looking at msg['new_dir'], msg['new_file'], and the self.o options perform a transfer
of a single file.
return True on a successful transfer, False otherwise.
return >0 on a successful transfer, 0 if failed but temporarily, <0 if failure is permanent.
if self.o.dry_run is set, simulate the output of a send without
performing it.
Expand Down

0 comments on commit 1dea041

Please sign in to comment.