diff --git a/sarracenia/flow/__init__.py b/sarracenia/flow/__init__.py index f54e5baab..4ac62fccf 100644 --- a/sarracenia/flow/__init__.py +++ b/sarracenia/flow/__init__.py @@ -2294,7 +2294,15 @@ def download(self, msg, options) -> int: return 1 # generalized send... - def send(self, msg, options): + def send(self, msg, options) -> int: + """ + send/transfer one file based on message, return int: + + return 0 -- failed temporarily, retry later. + return >0 -- OK download successful. + return <0 -- download failed permanently, retry not useful. + """ + self.o = options sendTo=self.o.sendTo logger.debug( f"{self.scheme}_transport sendTo: {sendTo}" ) @@ -2306,14 +2314,16 @@ def send(self, msg, options): for plugin in self.plugins['send']: try: ok = plugin(msg) - if type(ok) is not bool: - logger.error( f"{plugin} returned {type(ok)}. Should return boolean" ) + if type(ok) not in [ bool, int ]: + logger.error( f"{plugin} returned {type(ok)}. Must return boolean or integer, discarding." ) + return -1 except Exception as ex: logger.error( f'flowCallback plugin {plugin} crashed: {ex}' ) logger.debug( "details:", exc_info=True ) - if not ok: return False - return True + if type(ok) is bool and not ok: return 0 + elif type(ok) is int: return ok + return 1 if self.o.baseDir: local_path = self.o.variableExpansion(self.o.baseDir, @@ -2342,9 +2352,8 @@ def send(self, msg, options): try: os.chdir(local_dir) except Exception as ex: - logger.error("could not chdir to %s to write: %s" % (local_dir, ex)) - return False - + logger.error("could not chdir locally to %s: %s" % (local_dir, ex)) + return -1 try: if not self.o.dry_run: @@ -2361,7 +2370,7 @@ def send(self, msg, options): self.proto[self.scheme] = sarracenia.transfer.Transfer.factory( self.scheme, options) ok = self.proto[self.scheme].connect() - if not ok: return False + if not ok: return 0 self.cdir = None self.metrics['flow']['transferConnected'] = True self.metrics['flow']['transferConnectStart'] = time.time() @@ -2382,7 +2391,7 @@ def send(self, msg, options): msg['blocks']['method'] == 'inplace'): logger.error("%s, inplace part file not supported" % self.scheme) - return False + return -1 #================================= # if umask, check that the protocol supports it ... @@ -2414,7 +2423,7 @@ def send(self, msg, options): cwd = self.proto[self.scheme].getcwd() except Exception as ex: logger.error( f"could not getcwd on {sendTo} : {ex}" ) - return False + return 0 if cwd != new_dir: logger.debug("%s_transport send cd to %s" % @@ -2424,7 +2433,7 @@ def send(self, msg, options): self.proto[self.scheme].cd_forced(new_dir) except Exception as ex: logger.error( f"could not chdir to {sendTo} {new_dir}: {ex}" ) - return False + return -1 #================================= # delete event @@ -2446,14 +2455,14 @@ def send(self, msg, options): self.proto[self.scheme].delete(new_file) except Exception as ex: logger.error( f"could not delete {sendTo} {msg['new_dir']}/{new_file}: {ex}" ) - return False + return -1 msg.setReport(201, f'file or directory removed') self.metrics['flow']['transferTxFiles'] += 1 self.metrics['flow']['transferTxLast'] = msg['report']['timeCompleted'] - return True + return 1 logger.error("%s, delete not supported" % self.scheme) - return False + return -1 if 'rename' in msg['fileOp'] : if hasattr(self.proto[self.scheme], 'delete'): @@ -2463,14 +2472,14 @@ def send(self, msg, options): self.proto[self.scheme].rename(msg['fileOp']['rename'], new_file) except Exception as ex: logger.error( f"could not rename {sendTo} (in {msg['new_dir']} {msg['fileOp']['rename']} to {new_file}: {ex}" ) - return False + return -1 msg.setReport(201, f'file renamed') self.metrics['flow']['transferTxFiles'] += 1 self.metrics['flow']['transferTxLast'] = msg['report']['timeCompleted'] - return True + return 1 logger.error("%s, delete not supported" % self.scheme) - return False + return -1 if 'directory' in msg['fileOp'] : if 'contentType' not in msg: @@ -2482,13 +2491,13 @@ def send(self, msg, options): self.proto[self.scheme].mkdir(new_file) except Exception as ex: logger.error( f"could not mkdir {sendTo} {msg['new_dir']}/{new_file}: {ex}" ) - return False + return 0 msg.setReport(201, f'directory created') self.metrics['flow']['transferTxFiles'] += 1 self.metrics['flow']['transferTxLast'] = msg['report']['timeCompleted'] - return True + return 1 logger.error("%s, mkdir not supported" % self.scheme) - return False + return -1 #================================= @@ -2505,10 +2514,10 @@ def send(self, msg, options): self.proto[self.scheme].link(msg['fileOp']['hlink'], new_file) except Exception as ex: logger.error( f"could not link {sendTo} in {msg['new_dir']}{os.sep}{msg['fileOp']['hlink']} to {new_file}: {ex}" ) - return False - return True + return 0 + return 1 logger.error("%s, hardlinks not supported" % self.scheme) - return False + return -1 elif 'link' in msg['fileOp']: if 'contentType' not in msg: msg['contentType'] = 'text/link' @@ -2519,13 +2528,13 @@ def send(self, msg, options): self.proto[self.scheme].symlink(msg['fileOp']['link'], new_file) except Exception as ex: logger.error( f"could not symlink {sendTo} in {msg['new_dir']} {msg['fileOp']['link']} to {new_file}: {ex}" ) - return False + return 0 msg.setReport(201, f'file linked') self.metrics['flow']['transferTxFiles'] += 1 self.metrics['flow']['transferTxLast'] = msg['report']['timeCompleted'] - return True + return 1 logger.error("%s, symlink not supported" % self.scheme) - return False + return -1 #================================= # send event @@ -2537,7 +2546,7 @@ def send(self, msg, options): "product collision or base_dir not set, file %s does not exist" % local_file) time.sleep(0.01) - return False + return 0 elif 'size' not in msg: msg['size'] = os.path.getsize(local_file) @@ -2580,7 +2589,7 @@ def send(self, msg, options): len_written = self.proto[self.scheme].put( msg, local_file, new_file) except Exception as ex: logger.error( f"could not send {local_dir}{os.sep}{local_file} to inflight=None {sendTo} {msg['new_dir']} ... {new_file}: {ex}" ) - return False + return 0 elif (('blocks' in msg) and (msg['blocks']['method'] == 'inplace')): @@ -2589,8 +2598,8 @@ def send(self, msg, options): self.proto[self.scheme].put(msg, local_file, new_file, offset, new_offset, msg['size']) except Exception as ex: - logger.error( f"could not send {local_dir}{os.sep}{local_file} inplace {sendTo} {msg['new_dir']}/{new_file}: {ex}" ) - return False + logger.error( f"could not send {local_dir}{os.sep}{local_file} inplace {sendTo} {msg['new_dir']} ... {new_file}: {ex}" ) + return 0 elif inflight == '.': new_inflight_path = '.' + new_file @@ -2604,12 +2613,12 @@ def send(self, msg, options): msg, local_file, new_inflight_path) except Exception as ex: logger.error( f"could not send {local_dir}{os.sep}{local_file} inflight={inflight} {sendTo} {msg['new_dir']}/{new_file}: {ex}" ) - return False + return 0 try: self.proto[self.scheme].rename(new_inflight_path, new_file) except Exception as ex: logger.error( f"could not rename inflight={inflight} {sendTo} {msg['new_dir']}/{new_file}: {ex}" ) - return False + return 0 else: len_written = msg['size'] @@ -2624,12 +2633,12 @@ def send(self, msg, options): len_written = self.proto[self.scheme].put(msg, local_file, new_inflight_path) except Exception as ex: logger.error( f"could not send {local_dir}{os.sep}{local_file} inflight={inflight} {sendTo} {msg['new_dir']}/{new_file}: {ex}" ) - return False + return 0 try: self.proto[self.scheme].rename(new_inflight_path, new_file) except Exception as ex: logger.error( f"could not rename inflight={inflight} {sendTo} {msg['new_dir']}/{new_file}: {ex}" ) - return False + return 0 elif options.inflight[-1] == '/': if not self.o.dry_run: try: @@ -2649,12 +2658,12 @@ def send(self, msg, options): msg, local_file, new_inflight_path) except Exception as ex: logger.error( f"could not send {local_dir}{os.sep}{local_file} inflight={inflight} {sendTo} {msg['new_dir']}/{new_file}: {ex}" ) - return False + return 0 try: self.proto[self.scheme].rename(new_inflight_path, new_file) except Exception as ex: logger.error( f"could not rename inflight={inflight} {sendTo} {msg['new_dir']}/{new_file}: {ex}" ) - return False + return 0 else: len_written = msg['size'] elif inflight == 'umask': @@ -2669,12 +2678,12 @@ def send(self, msg, options): msg, local_file, new_file) except Exception as ex: logger.error( f"could not send {local_dir}{os.sep}{local_file} inflight={inflight} {sendTo} {msg['new_dir']}/{new_file}: {ex}" ) - return False + return 0 try: self.proto[self.scheme].put(msg, local_file, new_file) except Exception as ex: logger.error( f"could not rename inflight={inflight} {sendTo} {msg['new_dir']}/{new_file}: {ex}" ) - return False + return 0 else: len_written = msg['size'] @@ -2693,7 +2702,7 @@ def send(self, msg, options): (local_path, str_range, new_dir, new_file, offset, offset + msg['size'] - 1)) - return True + return 1 except Exception as err: @@ -2724,7 +2733,7 @@ def send(self, msg, options): msg['new_file']) logger.debug('Exception details: ', exc_info=True) - return False + return 0 # set_local_file_attributes def set_local_file_attributes(self, local_file, msg): @@ -2867,13 +2876,16 @@ def do_send(self): if i != 1: logger.warning("sending again, attempt %d" % i) - ok = self.send(msg, self.o) - if ok: + retval = self.send(msg, self.o) + if retval > 0: self.worklist.ok.append(msg) break + elif retval < 0: + self.worklist.rejected.append(msg) + break i = i + 1 - if not ok: + if retval == 0: self.worklist.failed.append(msg) self.worklist.incoming = [] diff --git a/sarracenia/flowcb/__init__.py b/sarracenia/flowcb/__init__.py index e6c2f88c8..825114ad7 100755 --- a/sarracenia/flowcb/__init__.py +++ b/sarracenia/flowcb/__init__.py @@ -180,11 +180,11 @@ def post(self,worklist) -> None:: to indicate failure to process a message, append to worklist.failed. worklist.failed processing should occur in here as it will be zeroed out after this step. - def send(self,msg) -> bool:: + def send(self,msg) -> int:: Task: looking at msg['new_dir'], msg['new_file'], and the self.o options perform a transfer of a single file. - return True on a successful transfer, False otherwise. + return >0 on a successful transfer, 0 if failed but temporarily, <0 if failure is permanent. if self.o.dry_run is set, simulate the output of a send without performing it.