Skip to content

Commit

Permalink
I am happy with this example
Browse files Browse the repository at this point in the history
  • Loading branch information
robamu committed Oct 5, 2023
1 parent 50b82f1 commit 94b89d5
Show file tree
Hide file tree
Showing 4 changed files with 291 additions and 5 deletions.
280 changes: 280 additions & 0 deletions examples/cfdp/file-copy-example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
#!/usr/bin/env python3
"""This example shows a end-to-end transfer of a small file using the CFDP high level
components provided by the tmtccmd package."""
import copy
import logging
import os
import threading
import time
from logging import basicConfig
from multiprocessing import Queue
from pathlib import Path
from queue import Empty
from typing import Any

from spacepackets.cfdp import GenericPduPacket
from spacepackets.cfdp.defs import ChecksumType, ConditionCode, TransmissionMode
from spacepackets.cfdp.pdu import AbstractFileDirectiveBase
from spacepackets.util import ByteFieldU16

from tmtccmd.cfdp.defs import CfdpStates, TransactionId
from tmtccmd.cfdp.handler.dest import DestHandler
from tmtccmd.cfdp.handler.source import SourceHandler
from tmtccmd.cfdp.mib import (
DefaultFaultHandlerBase,
IndicationCfg,
LocalEntityCfg,
RemoteEntityCfg,
RemoteEntityCfgTable,
)
from tmtccmd.cfdp.request import PutRequest
from tmtccmd.cfdp.user import (
CfdpUserBase,
FileSegmentRecvdParams,
MetadataRecvParams,
TransactionFinishedParams,
)
from tmtccmd.util.seqcnt import SeqCountProvider

SOURCE_ENTITY_ID = ByteFieldU16(1)
DEST_ENTITY_ID = ByteFieldU16(2)

FILE_CONTENT = "Hello World!\n"
FILE_SEGMENT_SIZE = len(FILE_CONTENT)
SOURCE_FILE = Path("/tmp/cfdp-test-source.txt")
DEST_FILE = Path("/tmp/cfdp-test-dest.txt")


_LOGGER = logging.getLogger()


REMOTE_CFG_FOR_SOURCE_ENTITY = RemoteEntityCfg(
entity_id=SOURCE_ENTITY_ID,
max_file_segment_len=FILE_SEGMENT_SIZE,
closure_requested=True,
crc_on_transmission=False,
default_transmission_mode=TransmissionMode.UNACKNOWLEDGED,
crc_type=ChecksumType.CRC_32,
check_limit=None,
)
REMOTE_CFG_FOR_DEST_ENTITY = copy.copy(REMOTE_CFG_FOR_SOURCE_ENTITY)
REMOTE_CFG_FOR_DEST_ENTITY.entity_id = DEST_ENTITY_ID

# These queues will be used to exchange PDUs between threads.
SOURCE_TO_DEST_QUEUE = Queue()
DEST_TO_SOURCE_QUEUE = Queue()


class CfdpFaultHandler(DefaultFaultHandlerBase):
def notice_of_suspension_cb(self, cond: ConditionCode):
_LOGGER.warn(f"Received Notice of Suspension with condition code {cond!r}")

def notice_of_cancellation_cb(self, cond: ConditionCode):
_LOGGER.warn(f"Received Notice of Cancellation with condition code {cond!r}")

def abandoned_cb(self, cond: ConditionCode):
_LOGGER.warn(f"Received Abandoned Fault with condition code {cond!r}")

def ignore_cb(self, cond: ConditionCode):
_LOGGER.warn(f"Received Ignored Fault with condition code {cond!r}")


class CfdpUser(CfdpUserBase):
def __init__(self, base_str: str):
self.base_str = base_str
super().__init__()

def transaction_indication(self, transaction_id: TransactionId):
"""This indication is used to report the transaction ID to the CFDP user"""
_LOGGER.info(f"{self.base_str}: Transaction.indication for {transaction_id}")

def eof_sent_indication(self, transaction_id: TransactionId):
_LOGGER.info(f"{self.base_str}: EOF-Sent.indication for {transaction_id}")

def transaction_finished_indication(self, params: TransactionFinishedParams):
_LOGGER.info(
f"{self.base_str}: Transaction-Finished.indication for {params.transaction_id}."
)

def metadata_recv_indication(self, params: MetadataRecvParams):
_LOGGER.info(
f"{self.base_str}: Metadata-Recv.indication for {params.transaction_id}."
)

def file_segment_recv_indication(self, params: FileSegmentRecvdParams):
_LOGGER.info(
f"{self.base_str}: File-Segment-Recv.indication for {params.transaction_id}."
)

def report_indication(self, transaction_id: TransactionId, status_report: Any):
# TODO: p.28 of the CFDP standard specifies what information the status report parameter
# could contain. I think it would be better to not hardcode the type of the status
# report here, but something like Union[any, CfdpStatusReport] with CfdpStatusReport
# being an implementation which supports all three information suggestions would be
# nice
pass

def suspended_indication(
self, transaction_id: TransactionId, cond_code: ConditionCode
):
_LOGGER.info(
f"{self.base_str}: Suspended.indication for {transaction_id} | Condition Code: {cond_code}"
)

def resumed_indication(self, transaction_id: TransactionId, progress: int):
_LOGGER.info(
f"{self.base_str}: Resumed.indication for {transaction_id} | Progress: {progress} bytes"
)

def fault_indication(
self, transaction_id: TransactionId, cond_code: ConditionCode, progress: int
):
_LOGGER.info(
f"{self.base_str}: Fault.indication for {transaction_id} | Condition Code: {cond_code} | "
f"Progress: {progress} bytes"
)

def abandoned_indication(
self, transaction_id: TransactionId, cond_code: ConditionCode, progress: int
):
_LOGGER.info(
f"{self.base_str}: Abandoned.indication for {transaction_id} | Condition Code: {cond_code} |"
f" Progress: {progress} bytes"
)

def eof_recv_indication(self, transaction_id: TransactionId):
_LOGGER.info(f"{self.base_str}: EOF-Recv.indication for {transaction_id}")


def main():
basicConfig(level=logging.INFO)
if SOURCE_FILE.exists():
os.remove(SOURCE_FILE)
if DEST_FILE.exists():
os.remove(DEST_FILE)
with open(SOURCE_FILE, "w") as file:
file.write(FILE_CONTENT)
# Enable all indications.
src_indication_cfg = IndicationCfg()
src_fault_handler = CfdpFaultHandler()
src_entity_cfg = LocalEntityCfg(
SOURCE_ENTITY_ID, src_indication_cfg, src_fault_handler
)
# 16 bit sequence count for transactions.
src_seq_count_provider = SeqCountProvider(16)
src_user = CfdpUser("SRC ENTITY")
source_handler = SourceHandler(src_entity_cfg, src_seq_count_provider, src_user)
# Spawn a new thread and move the source handler there. This is scalable: If multiple number
# of concurrent file operations are required, a new thread with a new source handler can
# be spawned for each one.
source_thread = threading.Thread(
target=source_entity_handler, args=[source_handler]
)

# Enable all indications.
dest_indication_cfg = IndicationCfg()
dest_fault_handler = CfdpFaultHandler()
dest_entity_cfg = LocalEntityCfg(
DEST_ENTITY_ID, dest_indication_cfg, dest_fault_handler
)
dest_user = CfdpUser("DEST ENTITY")
remote_cfg_table = RemoteEntityCfgTable()
remote_cfg_table.add_config(REMOTE_CFG_FOR_SOURCE_ENTITY)
dest_handler = DestHandler(dest_entity_cfg, dest_user, remote_cfg_table)
# Spawn a new thread and move the destination handler there. This is scalable. One example
# approach could be to keep a dictionary of active file copy operations, where the transaction
# ID is the key. If a new Metadata PDU with a new transaction ID is detected, a new
# destination handler in a new thread could be spawned to handle the file copy operation.
dest_thread = threading.Thread(target=dest_entity_handler, args=[dest_handler])

source_thread.start()
dest_thread.start()
source_thread.join()
dest_thread.join()

src_file_content = None
with open(SOURCE_FILE) as file:
src_file_content = file.read()
dest_file_content = None
with open(DEST_FILE) as file:
dest_file_content = file.read()
assert src_file_content == dest_file_content
_LOGGER.info("Source and destination file content are equal. Deleting files.")
if SOURCE_FILE.exists():
os.remove(SOURCE_FILE)
if DEST_FILE.exists():
os.remove(DEST_FILE)
_LOGGER.info("Done.")


def source_entity_handler(source_handler: SourceHandler):
# This put request could in principle also be sent from something like a front end application.
put_request = PutRequest(
destination_id=DEST_ENTITY_ID,
source_file=SOURCE_FILE,
dest_file=DEST_FILE,
trans_mode=TransmissionMode.UNACKNOWLEDGED,
closure_requested=True,
)
no_packet_received = False
print(f"SRC HANDLER: Inserting Put Request: {put_request}")
with open(SOURCE_FILE) as file:
file_content = file.read()
print(f"File content of source file {SOURCE_FILE}: {file_content}")
assert source_handler.put_request(put_request, REMOTE_CFG_FOR_DEST_ENTITY)
while True:
try:
# We are getting the packets from a Queue here, they could for example also be polled
# from a network.
packet: AbstractFileDirectiveBase = DEST_TO_SOURCE_QUEUE.get(False)
source_handler.insert_packet(packet)
no_packet_received = False
except Empty:
no_packet_received = True
fsm_result = source_handler.state_machine()
if fsm_result.states.packet_ready:
SOURCE_TO_DEST_QUEUE.put(fsm_result.pdu_holder.pdu)
source_handler.confirm_packet_sent_advance_fsm()
no_packet_sent = False
else:
no_packet_sent = True
if no_packet_received and no_packet_sent:
time.sleep(0.5)
# Transaction done
if fsm_result.states.state == CfdpStates.IDLE:
_LOGGER.info("Source entity operation done.")
break


def dest_entity_handler(dest_handler: DestHandler):
first_packet = True
no_packet_received = False
while True:
try:
packet: GenericPduPacket = SOURCE_TO_DEST_QUEUE.get(False)
dest_handler.insert_packet(packet)
no_packet_received = False
if first_packet:
first_packet = False
except Empty:
no_packet_received = True
fsm_result = dest_handler.state_machine()
if fsm_result.states.packet_ready:
DEST_TO_SOURCE_QUEUE.put(fsm_result.pdu_holder.pdu)
dest_handler.confirm_packet_sent_advance_fsm()
no_packet_sent = False
else:
no_packet_sent = True
if no_packet_received and no_packet_sent:
time.sleep(0.5)
# Transaction done
if not first_packet and fsm_result.states.state == CfdpStates.IDLE:
_LOGGER.info("Destination entity operation done.")
break
with open(DEST_FILE) as file:
file_content = file.read()
print(f"File content of destination file {DEST_FILE}: {file_content}")


if __name__ == "__main__":
main()
8 changes: 7 additions & 1 deletion tmtccmd/cfdp/handler/dest.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,13 +191,19 @@ def __init__(

"""This is the primary call to run the state machine after packet insertion and/or after
having sent any packets which need to be sent to the sender of a file transaction."""

def state_machine(self) -> FsmResult:
if self.states.state == CfdpStates.IDLE:
self.__idle_fsm()
if self.states.state == CfdpStates.BUSY_CLASS_1_NACKED:
self.__busy_naked_fsm()
return FsmResult(self.states, self.pdu_holder)

def closure_requested(self) -> bool:
"""Returns whether a closure was requested for the current transaction. Please note that
this variable is only valid as long as the state is not IDLE"""
return self._params.closure_requested

def finish(self):
self._params.reset()
# Not fully sure this is the best approach, but I think this is ok for now
Expand Down Expand Up @@ -282,7 +288,7 @@ def __transaction_start_metadata_pdu_to_params(self, metadata_pdu: MetadataPdu):
elif metadata_pdu.pdu_header.trans_mode == TransmissionMode.ACKNOWLEDGED:
self.states.state = CfdpStates.BUSY_CLASS_2_ACKED
self._crc_helper.checksum_type = metadata_pdu.checksum_type
self._closure_requested = metadata_pdu.closure_requested
self._params.closure_requested = metadata_pdu.closure_requested
if metadata_pdu.dest_file_name is None:
self._params.fp.no_file_data = True
else:
Expand Down
2 changes: 1 addition & 1 deletion tmtccmd/cfdp/handler/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ def advance_fsm(self):
"""
if self.states.packet_ready:
raise PacketSendNotConfirmed(
f"Must send current packet {self.pdu_holder.base} before "
f"Must send current packet {self.pdu_holder.pdu} before "
"advancing state machine"
)
if self.states.state == CfdpStates.BUSY_CLASS_1_NACKED:
Expand Down
6 changes: 3 additions & 3 deletions tmtccmd/cfdp/user.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
from abc import abstractmethod, ABC
from dataclasses import dataclass
from typing import List, Optional
from typing import List, Optional, Any

from spacepackets.cfdp import ConditionCode, FileStoreResponseTlv, MessageToUserTlv
from spacepackets.cfdp.pdu.file_data import RecordContinuationState
Expand Down Expand Up @@ -30,7 +30,7 @@ class TransactionFinishedParams:
file_status: FileDeliveryStatus
delivery_code: DeliveryCode
fs_responses: Optional[List[FileStoreResponseTlv]] = None
status_report: Optional[any] = None
status_report: Optional[Any] = None


@dataclass
Expand Down Expand Up @@ -93,7 +93,7 @@ def file_segment_recv_indication(self, params: FileSegmentRecvdParams):
print(params)

@abstractmethod
def report_indication(self, transaction_id: TransactionId, status_report: any):
def report_indication(self, transaction_id: TransactionId, status_report: Any):
# TODO: p.28 of the CFDP standard specifies what information the status report parameter
# could contain. I think it would be better to not hardcode the type of the status
# report here, but something like Union[any, CfdpStatusReport] with CfdpStatusReport
Expand Down

0 comments on commit 94b89d5

Please sign in to comment.