diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml
index 8e6fbc23b..3e64db865 100644
--- a/.github/workflows/flow.yml
+++ b/.github/workflows/flow.yml
@@ -1,4 +1,4 @@
-name: sr_insects Flow Tests.
+name: Flow Tests / AMQP

 on:
   pull_request:
@@ -36,7 +36,7 @@ jobs:

     runs-on: ${{ matrix.osver }}

-    name: ${{ matrix.which_test }} test on ${{ matrix.osver }}
+    name: ${{ matrix.which_test }} on ${{ matrix.osver }}
     timeout-minutes: 40

     steps:
diff --git a/.github/workflows/flow_amqp_consumer.yml b/.github/workflows/flow_amqp_consumer.yml
index a813b4a5a..a5d861bd5 100644
--- a/.github/workflows/flow_amqp_consumer.yml
+++ b/.github/workflows/flow_amqp_consumer.yml
@@ -1,11 +1,11 @@
-name: sr_insects Flow Tests using AMQP Consumer.
+name: Flow Tests / AMQP Consumer

 on:
   pull_request:
     types: [opened, edited, reopened]
   push:
     branches:
-      - issue_457_amqp_consumer
+      - development
     paths-ignore:
       - '.github/**'
@@ -33,7 +33,7 @@ jobs:

     runs-on: ${{ matrix.osver }}

-    name: ${{ matrix.which_test }} test on ${{ matrix.osver }}
+    name: ${{ matrix.which_test }} on ${{ matrix.osver }}
     timeout-minutes: 40

     steps:
@@ -48,7 +48,7 @@ jobs:
           sudo sh -c 'echo "MaxStartups 750" >> /etc/ssh/sshd_config'
           sudo systemctl restart ssh
           echo "amqp_consumer True" >> ${HOME}/.config/sr3/default.conf
-          echo "set sarracenia.moth.amqpconsumer.AMQPConsumer.logLevel debug" >> ${HOME}/.config/sr3/default.conf
+          #echo "set sarracenia.moth.amqpconsumer.AMQPConsumer.logLevel debug" >> ${HOME}/.config/sr3/default.conf

       - name: Setup ${{ matrix.which_test }} test.
         run: |
diff --git a/.github/workflows/flow_mqtt.yml b/.github/workflows/flow_mqtt.yml
index 097bd5694..e5f8d5921 100644
--- a/.github/workflows/flow_mqtt.yml
+++ b/.github/workflows/flow_mqtt.yml
@@ -1,4 +1,4 @@
-name: Sr3 sr_insects Flow Tests using MQTT.
+name: Flow Tests / MQTT

 on:
   pull_request:
@@ -33,7 +33,7 @@ jobs:

     runs-on: ${{ matrix.osver }}

-    name: ${{ matrix.which_test }} test on ${{ matrix.osver }}
+    name: ${{ matrix.which_test }} on ${{ matrix.osver }}
     timeout-minutes: 45

     steps:
diff --git a/.github/workflows/flow_redis.yml b/.github/workflows/flow_redis.yml
index 242bfbad6..5502e0cda 100644
--- a/.github/workflows/flow_redis.yml
+++ b/.github/workflows/flow_redis.yml
@@ -1,4 +1,4 @@
-name: Flow Tests (with Redis).
+name: Flow Tests / Redis

 on:
   pull_request:
@@ -6,6 +6,7 @@ on:
   push:
     branches:
       - v03_disabled
+    paths-ignore:
       - '.github/**'
       - 'debian/changelog'
diff --git a/.github/workflows/unit-test.yml b/.github/workflows/unit-test.yml
index dd05730de..29057e35e 100644
--- a/.github/workflows/unit-test.yml
+++ b/.github/workflows/unit-test.yml
@@ -2,27 +2,30 @@ name: Unit Testing

 on:
   pull_request:
-    types: [opened, edited, reopened]
-  push:
-    paths-ignore:
-      - '.github/**'
+    types: [opened, edited, reopened, closed, synchronize]
+    paths:
+      - '**.py'
+    branches:
+      - 'development'
+  # push:
+  #   paths-ignore:
+  #     - '.github/**'
+  workflow_dispatch:

+permissions:
+  contents: write
+  checks: write
+  pull-requests: write

 jobs:
   build:
+    runs-on: ubuntu-22.04

-    strategy:
-      fail-fast: false
-      matrix:
-        osver: [ "ubuntu-20.04", "ubuntu-22.04" ]
-
-    runs-on: ${{ matrix.osver }}
-
-    name: Unit test on ${{ matrix.osver }}
+    name: Unit test
     timeout-minutes: 10

     steps:
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4

       - name: Install dependencies
         run: |
@@ -38,42 +41,72 @@ jobs:
             --cov-config=tests/.coveragerc --cov=sarracenia --cov-report=html --cov-report=lcov --cov-report=xml \
             --html=tests/report.html --self-contained-html

+      - name: Publish Test Results
+        uses: EnricoMi/publish-unit-test-result-action/linux@v2
+        if: ${{ always() }}
+        with:
+          files: |
+            tests/junit/test-results.xml
+
+      # - name: Publish Coverage Results
+      #   uses: orgoro/coverage@v3.1
+      #   with:
+      #     coverageFile: tests/coverage/coverage.xml
+
+      # - name: Pytest coverage comment
+      #   uses: MishaKav/pytest-coverage-comment@main
+      #   if: ${{ always() }}
+      #   with:
+      #     #pytest-coverage-path: tests/coverage/coverage.lcov
+      #     pytest-xml-coverage-path: tests/coverage/coverage.xml
+      #     #title: My Coverage Report Title
+      #     #badge-title: My Badge Coverage Title
+      #     #hide-badge: false
+      #     #hide-report: false
+      #     create-new-comment: false
+      #     hide-comment: false
+      #     report-only-changed-files: true
+      #     remove-link-from-badge: false
+      #     #unique-id-for-comment: python3.8
+      #     junitxml-path: tests/junit/test-results.xml
+      #     junitxml-title: Test Results
+
       - name: Upload pytest junit results
-        uses: actions/upload-artifact@v3
+        uses: actions/upload-artifact@v4
         with:
-          name: results-junit-${{ matrix.osver }}
+          name: results-junit
           path: tests/junit/test-results.xml
         # Use always() to always run this step to publish test results when there are test failures
         if: ${{ always() }}

       - name: Upload pytest HTML report
-        uses: actions/upload-artifact@v3
+        uses: actions/upload-artifact@v4
         with:
-          name: results-report-${{ matrix.osver }}
+          name: results-report
           path: tests/report.html
         # Use always() to always run this step to publish test results when there are test failures
         if: ${{ always() }}

       - name: Upload code coverage report (HTML)
-        uses: actions/upload-artifact@v3
+        uses: actions/upload-artifact@v4
         with:
-          name: coverage-report-${{ matrix.osver }}
+          name: coverage-report
           path: tests/coverage/html_report
         # Use always() to always run this step to publish test results when there are test failures
         if: ${{ always() }}

       - name: Upload code coverage report (LCOV)
-        uses: actions/upload-artifact@v3
+        uses: actions/upload-artifact@v4
         with:
-          name: coverage-lcov-${{ matrix.osver }}
+          name: coverage-lcov
           path: tests/coverage/coverage.lcov
         # Use always() to always run this step to publish test results when there are test failures
         if: ${{ always() }}

       - name: Upload code coverage report (XML)
-        uses: actions/upload-artifact@v3
+        uses: actions/upload-artifact@v4
         with:
-          name: coverage-xml-${{ matrix.osver }}
+          name: coverage-xml
           path: tests/coverage/coverage.xml
         # Use always() to always run this step to publish test results when there are test failures
         if: ${{ always() }}
diff --git a/debian/changelog b/debian/changelog
index e3654188c..5915e40fe 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,3 +1,49 @@
+metpx-sr3 (3.00.53) unstable; urgency=medium
+
+  * NEW! merge PR #1019 Azure Transfer protocol driver (experimental)
+  * NEW! merge PR #1030 WMO-00 Accumulated file support (experimental.)
+  * NEW! PR #1040 #1041 exchangeSplitOverride + documentation of overrides.
+  * PR #1038 prefer local plugins over app provided ones.
+  * PR #1037 clearer, more succinct error messages for sender.
+  * api improvement Message.updatePaths() with no paths has defaults.
+  * api improvement part of above, Message.getContent() now works for local files.
+  * api improvement Message.updatePaths() now makes reasonable guesses about missing arguments.
+  * improved support for WMO WIS2 messages.
+  * fix #980 additional header verification of receipt of AM messages.
+  * fix #908 AM server must reap dead children.
+  * fix #1027 Eumetsat Poll plugin improvements.
+  * fix #1023,#1025 topic processing improvements, topicCopy option added.
+  * fix #1022 various fixes to work better with DMS.
+  * fix #1020 Added sourceFromMessage option for v2 compatibility.
+  * fix #1012 *persist* is now a configurable option (was always on.)
+  * fix #947 sr3 status should ignore files if not .inc or .conf
+  * NEW! merge PR #1010 S3 Transfer protocol driver (experimental)
+  * fix #1004 make persistence an option (was always persistent until now.)
+  * fix #1002 sftp accelerator failure when colons in the destination name.
+  * fix #997 flowcb/gather/am refactor to fix many issues.
+  * fix #994 resolved issues with unit tests.
+  * fix #992 parsing of module declared options to ensure correct type.
+  * fix #988 flowcb/poll/NASA_CMR podack checksum calculation changed.
+  * fix #986 poll stop re-writing "directory" as path for poll.
+  * fix #984 refactored flow/run routine 288 lines -> 159. more
+    logical/readable.
+  * fix #983 add S3 sender plugin.
+  * fix #982 no cleanup when statehost set.
+  * fix #979, #980 AM fixes.
+  * fix #978 housekeeping log messages reduced from 16 lines to less than 5.
+  * fix #976 remove "downloaded" log message when nothing was downloaded.
+  * fix #971 sr3 overview crash.
+  * fix #967 define hostname as a string option (was defaulting to list.)
+  * fix #974 refactor poll to consume messages more frequently in wVip.
+  * #974 poll implement 5 minute default interval
+  * fix #973 AM ... add missing tokIsYear better AM message filtering.
+  * fix #971 Add missing to list of possible process status
+  * docs: Add topicPrefix for #961
+  * docs: some revisions for clarity/syntax.
+  * some fixes for misnaming of some (rare) files received by gather/am
+
+ -- Peter Silva Thu, 17 May 2024 09:31:04 -0400
+
 metpx-sr3 (3.00.52) unstable; urgency=medium

   * copernicus marine data store polling support ( #959 )
diff --git a/docs/source/Contribution/Development.rst b/docs/source/Contribution/Development.rst
index 66f3cfda8..b1476fcee 100644
--- a/docs/source/Contribution/Development.rst
+++ b/docs/source/Contribution/Development.rst
@@ -117,18 +117,22 @@ To run the sr_insects tests, the repository must be cloned with the development

 A gate for merging to development is for a second developer to run the flow_tests.
**For v03, these tests must run: static_flow, flakey_broker, dynamic_flow, transform_flow**

-Planned by 2022/04/11:

- * stable will be merged from (version 3) development, so the default version for new work is sr3.
- * launchpad has recipes to produce metpx-sr3 packages from the stable branch.
+ * launchpad has recipes to produce metpx-sr3 packages from various branches.
 * The *MetPX Daily* repository is a snapshot of the development branch.
 * The *MetPX Pre-Release* repository should receive versions ending in rcX (release candidate)
+   The packages here come from the pre-release branch, which comes from snapshots of the development branch.
+   There is also a pre-release-py36 branch for building pre-release packages for older operating systems.

- * The *MetPX* repository should only contain stable releases that have graduated from the rcX series.
+ * stable comes from snapshots of the (version 3) pre-release branch.
+ * The *MetPX* repository should only contain stable releases that have graduated from the rcX series.
+   There is a stable_py36 branch to build packages for older operating systems that have
+   python 3.6 (redhat 8, ubuntu 18, ubuntu 20) or are too old to use the hatchling installer.
+

 sr_insects
 ~~~~~~~~~~

@@ -342,17 +346,20 @@ The options are detailed below:

 Committing Code
 ~~~~~~~~~~~~~~~

-What should be done prior to committing to the stable branch?
+What should be done prior to committing to the development branch?

 Checklist:

 - do development on some other branch. Usually the branch will be named after the issue being addressed. Example: issue240, if we give up on an initial approach and start another one, there may be issue240_2 for a second attempt. There may also be feature branches, such as v03.
-- **sr_insects tests works** (See Testing) The stable branch should always be functional, do not commit code if the sr_insects tests are not working.
+
+- **sr_insects tests work** (See Testing) The development branch should always be functional, do not commit code if the sr_insects tests are not working.
+
 - Natural consequence: if the code changes mean tests need to change, include the test change in the commit.
+
 - **update doc/** manual pages should get their updates ideally at the same time as the code.

-Usually there will be many such cycles on a development branch before one is ready
+Usually there will be many such cycles on an issueXXX branch before one is ready
 to issue a pull request. Eventually, we get to `Commits to the Development Branch`_

@@ -566,7 +573,7 @@ Running Flow Tests

 This section documents these steps in much more detail. Before one can run the
 sr_insects tests, some pre-requisites must be taken care of.

-Note that there is Github Actions integration for at least the stable branch
+Note that there is Github Actions integration for at least the development branch
 to verify functionality on a variety of python versions. Consult::

     https://github.com/MetPX/sarracenia/actions

@@ -829,7 +836,7 @@ Then check show it went with flow_check.sh::

 If the flow_check.sh passes, then one has a reasonable confidence
 in the overall functionality of the python application, but the
 test coverage is not exhaustive. This is the lowest gate for committing
-changes to thy python code into the stable branch. It is more qualitative sampling of the most
+changes to the python code into the development branch. It is a more qualitative sampling of the most
 common use cases rather than a thorough examination of all functionality.
 While not thorough, it is good to know the flows are working.
@@ -1215,6 +1222,8 @@ A Second developer will review the pull request and the reviewer will decide on
 merging is appropriate. The developer is expected to examine each commit, and
 understand it to some degree.

+If the pull request involves substantial changes, new functionality, or modifications to critical code structure, it is recommended to have a third developer also review the pull request. The expectations of this reviewer are the same as those of the second.
+
 The github Actions looks at pull requests and will run flow tests on them. If the
 tests pass, then that is a good qualitative indicator, however the tests are a bit
 fragile at the moment, so if they fail, it would be ideal for the reviewer to run

@@ -1228,13 +1237,16 @@ Key Branches

 There is a long running discussion about `Which Version is stable `_
 The current set up is that there are four principal branches:

-* stable branch is the release version of sr3, merging from development. used to build sr3 packages in the
+* stable branch is the release version of sr3, merging from pre-release. Used to build sr3 packages in the
  `MetPX `_ repository.

-* development ... The `version 3 `_ work in progress branch is a next version of sarracenia in development.
+* development ... The `version 3 `_ work in progress branch is the next version of sarracenia.
  the development branch is used to build sr3 packages for the
  `Daily `_ and `Pre-Release `_ repositories on launchpad.net.

+* stable_py36 and pre-release-py36 are branched from stable and pre-release respectively to adjust for building
+  packages on older operating systems that have older versions of python (and no support for hatchling.)
+
 * issue branches to be merged to development, it should be start with issueXXX or suggested branch names from github are ok also.

 * sometimes, multiple branches are needed for a single issue, say for variations of a fix, eg. issueXXX_2_do_it_this_way .
diff --git a/docs/source/Explanation/CommandLineGuide.rst b/docs/source/Explanation/CommandLineGuide.rst
index 9133b1ca5..6ad09967a 100644
--- a/docs/source/Explanation/CommandLineGuide.rst
+++ b/docs/source/Explanation/CommandLineGuide.rst
@@ -934,11 +934,23 @@ POLLING

 Polling is doing the same job as a post, except for files on a remote server.
 In the case of a poll, the post will have its url built from the *pollUrl*
-option, with the product's path (*directory*/"matched file"). There is one
-post per file. The file's size is taken from the directory "ls"... but its
+option, with the product's path (*path*/"matched file"). There is one
+post per file. The file's size is taken from the directory "ls"... but its
 checksum cannot be determined, so the default identity method is "cod",
 asking clients to calculate the identity Checksum On Download.

+To set when to poll, use the *scheduled_interval* or *scheduled_hour* and *scheduled_minute*
+settings. For example::
+
+  scheduled_interval 30m
+
+to poll the remote resources every thirty minutes. Alternatively::
+
+  scheduled_hour 1,13,19
+  scheduled_minute 27
+
+specifies that the poll be run at 1:27, 13:27, and 19:27 each day.
+
 By default, sr_poll sends its post notification message to the broker with default exchange
 (the prefix *xs_* followed by the broker username). The *post_broker* is mandatory.
 It can be given incomplete if it is well defined in the credentials.conf file.
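Putting these options together, a minimal poll configuration might look like the
following sketch (the broker, host name, directory, and accept pattern here are
hypothetical placeholders, not values taken from this repository)::

  post_broker amqps://tsource@localbroker/
  pollUrl sftp://user@remote.example.com/
  path /data/incoming
  scheduled_interval 30m
  accept .*\.grib2$

Every thirty minutes this would list /data/incoming on the remote server and
post one notification message per file matching the accept pattern.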
 Refer to `sr3_post(1) <../Reference/sr3_post.1.html>`_ -
 to understand the complete notification process.
 Refer to `sr_post(7) <../Reference/sr_post.7.html>`_ -
 to understand the complete notification format.
-
 These options set what files the user wants to be notified for and where
 it will be placed, and under which name.

-- **directory (default: .)**
+- **path (default: .)**
 - **accept [rename=] (must be set)**
 - **reject (optional)**
 - **permDefault (default: 0o400)**

@@ -959,15 +970,7 @@ These options...

 nodupe_fileAgeMax should be less than nodupe_ttl when using duplicate suppression,
 to avoid re-ingesting of files that have aged out of the nodupe cache.

-The option *filename* can be used to set a global rename to the products.
-Ex.:
-
-**filename rename=/naefs/grib2/**
-
-For all posts created, the *rename* option would be set to '/naefs/grib2/filename'
-because I specified a directory (path that ends with /).
-
-The option *directory* defines where to get the files on the server.
+The option *path* defines where to get the files on the server.
 Combined with **accept** / **reject** options, the user can select the
 files of interest and their directories of residence.

@@ -975,12 +978,8 @@ The **accept** and **reject** options use regular expressions (regexp) to ma
 These options are processed sequentially.
 The URL of a file that matches a **reject** pattern is not published.
 Files matching an **accept** pattern are published.

-Again a *rename* can be added to the *accept* option... matching products
-for that *accept* option would get renamed as described... unless the *accept* matches
-one file, the *rename* option should describe a directory into which the files
-will be placed (prepending instead of replacing the file name).

 The path can have some patterns. These supported patterns concern date/time.
 They are fixed...

 **${YYYY} current year**

 ::

-  ex. directory /mylocaldirectory/myradars
-  accept .*RADAR.*
+  ex. path /mylocaldirectory/myradars
+  path /mylocaldirectory/mygribs
+  path /mylocaldirectory/${YYYYMMDD}/mydailies

-  directory /mylocaldirectory/mygribs
+  accept .*RADAR.*
   reject .*Reg.*
   accept .*GRIB.*
-
-  directory /mylocaldirectory/${YYYYMMDD}/mydailies
   accept .*observations.*

 The **permDefault** option allows users to specify a linux-style numeric octal

@@ -1115,7 +1113,8 @@ notify about the new product.

 The notification protocol is defined here `sr_post(7) <../Reference/sr_post.7.html>`_

-**poll** connects to a *broker*. Every *sleep* seconds, it connects to
+**poll** connects to a *broker*. Every *scheduled_interval* seconds (or using a
+combination of *scheduled_hour* and *scheduled_minute*), it connects to
 a *pollUrl* (sftp, ftp, ftps). For each of the *directory* defined, it lists the contents.
 Polling is only intended to be used for recently modified files.
 The *fileAgeMax* option eliminates files that are too old
diff --git a/docs/source/Explanation/Concepts.rst b/docs/source/Explanation/Concepts.rst
index 0ba1a295b..78231feb3 100644
--- a/docs/source/Explanation/Concepts.rst
+++ b/docs/source/Explanation/Concepts.rst
@@ -71,25 +71,27 @@ The components just have different default settings:
    :align: center

   +------------------------+--------------------------+------------------------+
-  | Component              | Use of the algorithm     | Config File Equivalent |
+  | Component              | Use of the algorithm     | Key Option Settings    |
   +------------------------+--------------------------+------------------------+
   +------------------------+--------------------------+------------------------+
   | *subscribe*            | Gather = gather.message  | flowMain subscribe     |
   |                        |                          |                        |
-  | Download file from a   | Filter                   |                        |
-  | pump.                  |                          |                        |
+  | Download files from a  | Filter                   | mirror off             |
+  | pump.                  |                          | (all others it is on)  |
   |                        | Work = Download          |                        |
-  | default mirror=False   |                          |                        |
-  | All others it is True  | Post = optional          |                        |
+  |                        | Post = optional          |                        |
+  |                        |                          |                        |
   +------------------------+--------------------------+------------------------+
   | *sarra*                | Gather = gather.message  | flowMain sarra         |
   |                        |                          |                        |
-  | Used on pumps.         |                          |                        |
-  |                        |                          |                        |
-  | Download file from a   | Filter                   |                        |
+  | Used on pumps.         |                          | mirror on              |
+  |                        |                          | download on            |
+  | Download files from a  | Filter                   |                        |
   | pump                   |                          |                        |
   |                        |                          |                        |
-  | publish it.            |                          |                        |
+  | Publish a message      |                          |                        |
+  | downstream for         |                          |                        |
+  | consumers              |                          |                        |
   |                        |                          |                        |
   | Subscribers to         | Work = Download          |                        |
   | this pump can          |                          |                        |
@@ -98,55 +100,59 @@ The components just have different default settings:
   +------------------------+--------------------------+------------------------+
   | *poll*                 | Gather                   | flowMain poll          |
   |                        | if has_vip: poll         |                        |
+  |                        |                          | pollUrl                |
+  |                        |                          | path                   |
   |                        |                          |                        |
-  | Find files on other    | Filter                   |                        |
-  | servers to post to     |                          |                        |
+  | Find files on other    | Filter                   | download off           |
+  | servers to post to     |                          | mirror on              |
   | a pump.                | if has_vip:              |                        |
-  |                        | Work = nil               |                        |
-  | Uses has_vip*          |                          |                        |
-  | (see notes below)      | Post = yes               |                        |
+  |                        | Work = Download          | scheduled_interval     |
+  | Uses has_vip*          |                          | scheduled_hour         |
+  | (see notes below)      | Post = yes               | scheduled_minute       |
+  |                        |                          |                        |
   +------------------------+--------------------------+------------------------+
   | *shovel*               | Gather = gather.message  | acceptUnmatched True   |
   |                        |                          |                        |
   |                        |                          | nodupe_ttl 0           |
-  | Move notification      | Filter (shovel cache=off)|                        |
-  | messages around.       |                          | callback gather.message|
+  | Move notification      | Filter (shovel cache=off)| download off           |
+  | messages around.       |                          |                        |
+  |                        | Work = nil               | callback gather.message|
   |                        |                          |                        |
-  |                        | Work = nil               | callback post.message  |
+  |                        | Post = yes               | callback post.message  |
   |                        |                          |                        |
-  |                        | Post = yes               |                        |
   +------------------------+--------------------------+------------------------+
   | *winnow*               | Gather = gather.message  | acceptUnmatched True   |
   |                        |                          |                        |
   |                        |                          | nodupe_ttl 300         |
-  | Move notification      | Filter (shovel cache=off)|                        |
+  | Move notification      | Filter (shovel cache=on) |                        |
   | messages around        |                          | callback gather.message|
   |                        |                          |                        |
   |                        | Work = nil               | callback post.message  |
-  | suppress duplicates    |                          |                        |
-  |                        | Post = yes               |                        |
+  | Suppress duplicates    |                          |                        |
+  | through caching and    |                          |                        |
+  | a shared VIP           | Post = yes               |                        |
+  |                        |                          |                        |
   +------------------------+--------------------------+------------------------+
-  | *post/watch*           | Gather = gather.file     |                        |
+  | *post/watch*           | Gather = gather.file     | path /file/to/post     |
   |                        |                          |                        |
   | Find file on a         | Filter                   | sleep -1 # for post    |
-  |                        |                          |                        |
-  | local server to        |                          | sleep 5 # for watch    |
-  | publish                | Work = nil               |                        |
+  | **local** server to    |                          |                        |
+  | publish                |                          | sleep 5 # for watch    |
+  |                        | Work = nil               |                        |
   |                        |                          | callback gather.file   |
   |                        |                          |                        |
   |                        | Post = yes               | callback post.message  |
   |                        | Message?, File?          |                        |
+  |                        |                          |                        |
   +------------------------+--------------------------+------------------------+
   | *sender*               | Gather = gather.message  | flowMain sender        |
   |                        |                          |                        |
-  | Send files from a      | Filter                   |                        |
-  | pump.                  |                          |                        |
-  |                        |                          |                        |
-  | publish message after  |                          |                        |
+  | Send files from a      | Filter                   | sendTo                 |
+  | pump somewhere else    |                          |                        |
   |                        | Do = sendfile            |                        |
+  | Optional:              |                          |                        |
+  | Publish message after  |                          |                        |
   |                        |                          |                        |
-  |                        | Outlet = optional        |                        |
   +------------------------+--------------------------+------------------------+

 In the left hand column, one can see the name and general description of each component.
diff --git a/docs/source/Explanation/SarraPluginDev.rst b/docs/source/Explanation/SarraPluginDev.rst
index cc7364354..8927b46c5 100644
--- a/docs/source/Explanation/SarraPluginDev.rst
+++ b/docs/source/Explanation/SarraPluginDev.rst
@@ -585,12 +585,17 @@ for detailed information about call signatures and return values, etc...
 |                     | permanent name.                                    |
 |                     |                                                    |
 |                     | return the new name for the downloaded/sent file.  |
+|                     |                                                    |
 +---------------------+----------------------------------------------------+
 | download(self,msg)  | replace built-in downloader return true on success |
 |                     | takes message as argument.                         |
 +---------------------+----------------------------------------------------+
 | gather(self)        | gather messages from a source, returns a list of   |
 |                     | messages.                                          |
+|                     | can also return a tuple where the first element    |
+|                     | is a boolean flag keep_going indicating whether    |
+|                     | to continue gather processing.                     |
+|                     |                                                    |
 +---------------------+----------------------------------------------------+
 |                     | Called every housekeeping interval (minutes)       |
 |                     | used to clean cache, check for occasional issues.  |
diff --git a/docs/source/How2Guides/Email_Ingesting_With_Sarracenia.rst b/docs/source/How2Guides/Email_Ingesting_With_Sarracenia.rst
index ff9c0c0f0..44f823d81 100644
--- a/docs/source/How2Guides/Email_Ingesting_With_Sarracenia.rst
+++ b/docs/source/How2Guides/Email_Ingesting_With_Sarracenia.rst
@@ -50,7 +50,7 @@ What did we get?::

     post_broker amqp://tsource@${FLOWBROKER}
     post_exchange xs_tsource
-    sleep 60
+    scheduled_interval 60

     pollUrl ://@:/

diff --git a/docs/source/How2Guides/FlowCallbacks.rst b/docs/source/How2Guides/FlowCallbacks.rst
index 8469f2412..564f69e17 100644
--- a/docs/source/How2Guides/FlowCallbacks.rst
+++ b/docs/source/How2Guides/FlowCallbacks.rst
@@ -215,7 +215,11 @@ Other entry_points, extracted from sarracenia/flowcb/__init__.py ::

     def gather(self):
-        Task: gather notification messages from a source... return a list of notification messages.
+        Task: gather notification messages from a source... return either:
+              * a list of notification messages, or
+              * a tuple, (bool:keep_going, list of messages), where setting
+                keep_going to False curtails further gathers in this cycle.
+
         return []

     def metrics_report(self) -> dict:

@@ -282,6 +286,92 @@ of the one from the message, as the original is necessary for successful upstrea

 * msg['new_subtopic'] ... the subtopic hierarchy that will be encoded in the notification message for downstream consumers.

+Override Fields
+---------------
+
+To change processing of messages, one can set overrides to change how built-in algorithms work.
+For example:
+
+* msg['nodupe_override'] = { 'key': ..., 'path': ... } changes how the duplicate detection operates.
+* msg['topic'] ... defines the topic of a published message (instead of being calculated from other fields.)
+* msg['exchangeSplitOverride'] = int ... changes how post_exchangeSplit chooses among multiple post exchanges.
+
+
+Customizing Duplicate Suppression
+---------------------------------
+
+The built-in processing for duplicates is to use the identity field as a key, and store the path as the value.
+So if a file is received with the same key, and the path is already present, then it is considered a duplicate
+and dropped.
+
+In some cases, we may want only the file name to be used, so if any file with the same name is received twice,
+regardless of content, then it should be considered a duplicate and dropped. This is useful when multiple systems
+are producing the same products, but they are not bitwise identical. The built-in flowcb that implements
+that functionality is below::
+
+
+    import logging
+    from sarracenia.flowcb import FlowCB
+
+    logger = logging.getLogger(__name__)
+
+
+    class Name(FlowCB):
+        """
+        Override the comparison so that files with the same name,
+        regardless of what directory they are in, are considered the same.
+        This is useful when receiving data from two different sources (two different trees)
+        and winnowing between them.
+        """
+        def after_accept(self, worklist):
+            for m in worklist.incoming:
+                if not 'nodupe_override' in m:
+                    m['_deleteOnPost'] |= set(['nodupe_override'])
+                    m['nodupe_override'] = {}
+
+                m['nodupe_override']['path'] = m['relPath'].split('/')[-1]
+                m['nodupe_override']['key'] = m['relPath'].split('/')[-1]
+
+
+Customizing post_exchangeSplit
+------------------------------
+
+The exchangeSplit function allows a single flow to send outputs to different exchanges,
+numbered 1...n to provide load distribution. The built-in processing does this in a
+fixed way based on the hash of the identity field. The purpose of exchangeSplit is to
+allow a common set of downstream paths to receive a subset of the total flow, and for
+products with similar "routing" to land on the same downstream node. For example, a file
+with a given checksum, for winnowing to work, has to land on the same downstream node.
+
+It could be that, rather than using a checksum, one would prefer to use some other
+method to decide which exchange is used::
+
+    import hashlib
+    import logging
+    import os
+
+    from sarracenia.flowcb import FlowCB
+
+    logger = logging.getLogger(__name__)
+
+
+    class Distbydir(FlowCB):
+        """
+        Override the use of the identity field so that products can be grouped by directory in the relPath.
+        This ensures that all products received from the same directory get posted to the same
+        exchange when post_exchangeSplit is active.
+        """
+        def after_accept(self, worklist):
+            for m in worklist.incoming:
+                m['_deleteOnPost'] |= set(['exchangeSplitOverride'])
+                # hash the name of the directory containing the file, then turn
+                # the first hexadecimal digit of the digest into an integer.
+                d = m['relPath'].split(os.sep)[-2]
+                m['exchangeSplitOverride'] = int(hashlib.md5(d.encode('utf-8')).hexdigest()[0], 16)
+
+
+This routine sets the exchangeSplitOverride field, which needs to be an integer
+that is used to pick among the n exchanges defined when post_exchangeSplit
+is active. This routine calculates a checksum of the directory containing the
+file and then converts the first (hexadecimal) character of that checksum
+to an integer. If the directory is the same, the exchange chosen will be the same.
+
+
 Sample Flowcb Sub-Class
 -----------------------
diff --git a/docs/source/How2Guides/Hydro_Examples.rst b/docs/source/How2Guides/Hydro_Examples.rst
index 5e398516d..07b580579 100644
--- a/docs/source/How2Guides/Hydro_Examples.rst
+++ b/docs/source/How2Guides/Hydro_Examples.rst
@@ -40,7 +40,7 @@ station observations and predictions data through a GET RESTful web service, ava
 and Currents website `_. For example, if you want to access the water temperature
 data from the last hour in Honolulu, you can navigate to
 `https://tidesandcurrents.noaa.gov/api/datagetter?range=1&station=1612340&product=water_temperature&units=metric&time_zone=gmt&application=web_services&format=csv`.
 A new observation gets recorded every six minutes, so if you wanted to advertise solely new data through
-Sarracenia, you would configure an sr_poll instance to connect to the API, sleep every hour, and build
+Sarracenia, you would configure an sr_poll instance to connect to the API, set a one hour *scheduled_interval*, and build
 it a GET request to announce every time it woke up (this operates under the potentially misguided
 assumption that the data source is maintaining their end of the bargain). To download this shiny new file,
 you would connect an sr_subscribe to the same exchange it got announced on, and it would retrieve the URL, which a *do_download*
diff --git a/docs/source/How2Guides/UPGRADING.rst b/docs/source/How2Guides/UPGRADING.rst
index 3611a1696..dc93d8283 100644
--- a/docs/source/How2Guides/UPGRADING.rst
+++ b/docs/source/How2Guides/UPGRADING.rst
@@ -39,6 +39,14 @@ Installation Instructions

     git
 ---

+3.0.53
+------
+
+*CHANGE*: *directory* option in poll will no longer be converted to *path* silently.
+Use *path* explicitly instead. It is still converted when upgrading from v2 with
+*sr3 convert*, but in v3 configurations, *directory* now acts as it does in all other
+components, as a download specifier.
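As an illustration of the 3.0.53 change, a v3 poll configuration should name the
remote tree to list with *path*; a sketch (the host and directory below are
hypothetical)::

  pollUrl sftp://user@remote.example.com/
  path /data/upstream
  accept .*

Relying on *directory /data/upstream* for this no longer works in a hand-written
v3 poll configuration, although *sr3 convert* still rewrites it when migrating
from v2.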
+
 3.0.52
 ------

diff --git a/docs/source/How2Guides/subscriber.rst b/docs/source/How2Guides/subscriber.rst
index 1b4f444f0..aa1b7f55c 100644
--- a/docs/source/How2Guides/subscriber.rst
+++ b/docs/source/How2Guides/subscriber.rst
@@ -26,11 +26,10 @@ view the tree with a web browser.

 For simple immediate needs, one can download data using the browser
 itself, or a standard tool such as wget.

-The usual intent is to automatically download the data
-wanted to a directory on a subscriber
+The usual intent is to automatically download the desired data to a directory on a subscriber
 machine where other software can process it. Please note:

-- The tool is entirely command line driven (there is no GUI) More accurately, it is mostly configuration file driven.
+- The tool is entirely command line driven (there is no GUI). More accurately, it is mostly configuration file driven.
   Most of the *interface* involves using a text editor to modify configuration files.
 - While written to be compatible with other environments, the focus is on Linux usage.
 - The tool can be used as either an end-user tool, or a system-wide transfer engine.

@@ -47,13 +46,13 @@ A data pump's web server will just expose web accessible folders

 and the root of the tree is the date, in YYYYMMDD format. These dates
 do not represent anything about the data other than when it was put
 into the pumping network, and since Sarracenia
-always uses Universal Co-ordinated Time, the dates might not correspond
+always uses Universal Co-ordinated Time, the dates might not correspond
 to the current date/time in the location of the subscriber::

     Index of /

     Name                    Last modified      Size  Description
-    Parent Directory                             -
+    Parent Directory                             -
     20151105/               2015-11-27 06:44    -
     20151106/               2015-11-27 06:44    -
     20151107/               2015-11-27 06:44    -

@@ -61,7 +60,7 @@ the current date/time in the location of the subscriber::
     20151109/               2015-11-27 06:44    -
     20151110/               2015-11-27 06:44    -

-A variable number of days are stored on each data pump, for those
+A variable number of days are stored on each data pump; for those
 with an emphasis on real-time reliable delivery, the number of days
 will be shorter. For other pumps, where long term outages need to be
 tolerated, more days will be kept.

@@ -74,7 +73,7 @@ way to the visible ones::

     Index of /20151110

     Name                    Last modified      Size  Description
-    Parent Directory                             -
+    Parent Directory                             -
     UNIDATA-UCAR/           2015-11-27 06:44    -
     NOAAPORT/               2015-11-27 06:44    -
     MSC-CMC/                2015-11-27 06:44    -

@@ -83,22 +82,22 @@ way to the visible ones::
     NWS-OPSNET/             2015-11-27 06:44    -

 The data under each of these directories was obtained from the named
-source. In these examples, it is actually injected by DataInterchange
+source. In these examples, it is actually injected by Data Interchange
 staff, and the names are chosen to represent the origin of the data.
-The original Environment and Climate Change Canada data mart, is
+The original Environment and Climate Change Canada data mart is
 one "source" in this sense, showing up on hpfx as WXO-DD, or the same
 tree being available at the root of::

   https://dd.weather.gc.ca

-once down to the viewing the content from a given source,
+Once down to viewing the content from a given source,
 products are organized in a way defined by the source::

     Icon  Name                    Last modified      Size  Description
-    [TXT] about_dd_apropos.txt    2021-05-17 13:23  1.0K
+    [TXT] about_dd_apropos.txt    2021-05-17 13:23  1.0K
     [DIR] air_quality/            2020-12-10 14:47    -
     [DIR] alerts/                 2022-07-13 12:00    -
     [DIR] analysis/               2022-07-13 13:17    -

@@ -122,7 +121,7 @@ The configuration files are a few lines of configuration, and sr3 includes
 some examples.

-To list the available configurations with *sr3 list* ::
+You can list the available configurations with *sr3 list*::

    $ sr3 list examples
    Sample Configurations: (from: /usr/lib/python3/dist-packages/sarracenia/examples )

@@ -142,7 +141,7 @@ broadcast system for weather data in the 1980's. It is a continuous stream of te
 and each message is limited to 14000 bytes. The service was transitioned to an internet
 streaming feed in the early 2000's, and the streaming version is still fed to those
 interested in air and maritime navigation across the country.

-One can download a continuous feed of such traditional weather bulletins from the original datamart using the subscribe/dd_amis.conf
+One can download a continuous feed of such traditional weather bulletins from the original data mart using the subscribe/dd_amis.conf
 configuration example::

    $ sr3 add subscribe/dd_amis.conf

@@ -162,6 +161,7 @@ To view a configuration, give it to `sr3 list` as an argument::

    # this is a feed of wmo bulletin (a set called AMIS in the old times)

    broker amqps://dd.weather.gc.ca/
+   topicPrefix v02.post

    # instances: number of downloading processes to run at once. defaults to 1. Not enough for this case
    instances 5

@@ -174,24 +174,24 @@ To view a configuration, give it to `sr3 list` as an argument::

    directory /tmp/dd_amis
    accept .*

-Then it can be run interactively *sr3 foreground subscribe/dd_amis* or as a service
-with *sr3 start subscribe/dd_amis* in both cases, files will be downloaded from
+Then it can be run interactively with *sr3 foreground subscribe/dd_amis* or as a service
+with *sr3 start subscribe/dd_amis*. In both cases, files will be downloaded from
 dd.weather.gc.ca into the local machine's /tmp/dd_amis directory.

-more:
+More information:

 * `CLI Introduction (Jupyter Notebook) <../Tutorials/1_CLI_introduction.html>`_
-* `Setup a Remote Subscriber <../Tutorials/Setup_a_remote_subscriber.html>`_
+* `Set up a Remote Subscriber <../Tutorials/Setup_a_remote_subscriber.html>`_
 * `Options in the configuration file <../Reference/sr3_options.7.rst>`_

-Server Side Resources Allocated for Subscribers
+Server-Side Resources Allocated for Subscribers
 -----------------------------------------------

 Every configuration results in corresponding resources being declared on the
 broker, whose lifetime is controlled by the *expire* setting. The default *expire*
 is set to 300 seconds to avoid cluttering up servers with small experiments. Set *expire*
-the the value that makes the most sense for your application (long enough to cover
-outages you may experience.) In a configuration file, something like::
+to the value that makes the most sense for your application (long enough to cover
+outages you may experience). In a configuration file, something like::
   expire 3h

@@ -209,7 +209,7 @@ subscriber starts up again, the queued notification messages are forwarded to th

 So when the *subtopic* option is changed, since it is already defined on the
 server, one ends up adding a binding rather than replacing it. For example,
 if one has a subtopic that contains SATELLITE, and then stops the subscriber,
-edit the file and now the topic contains only RADAR, when the subscriber is
+edits the file and now the topic contains only RADAR, when the subscriber is
 restarted, not only will all the queued satellite files be sent to the consumer,
 but the RADAR is added to the bindings, rather than replacing them, so the
 subscriber will get both the SATELLITE and RADAR data even though the configuration

@@ -219,7 +219,7 @@ Also, if one is experimenting, and a queue is to be stopped for a very long
 time, it may accumulate a large number of notification messages. The total number of
 notification messages on a data pump has an effect on the pump performance for all users. It
 is therefore advisable to have the pump de-allocate resources when they will not be needed
-for an extended periods, or when experimenting with different settings.
+for an extended period, or when experimenting with different settings.

 Working with Multiple Configurations

@@ -234,8 +234,7 @@ that directory: dd_amis.conf and hpfx_amis.conf, one could then run::

    fractal%

-to start the CMC downloading configuration. One can use by
-using the sr3 command to start/stop multiple configurations at once.
+to start the CMC downloading configuration. One can use the sr3 command to start/stop multiple configurations at once.
 The sr3 command will go through the default directories and start up
 all the configurations it finds::

@@ -263,11 +262,11 @@ all the configurations it finds::

 will start up some sr3 processes as configured by CMC.conf and others to match
 hpfx_amis.conf. Sr3 stop will also do what you would expect. As will sr3 status.

-Note that there are 5 sr_subscribe processes start with the CMC
+Note that there are 5 sr_subscribe processes that start with the CMC
 configuration and 3 NWS ones. These are *instances* and share the same
 download queue.
-more:
+More information:

 * `Command line Guide <../Explanation/CommandLineGuide.html>`_
 * `Sr3 Manual page <../Reference/sr3.1.html>`_

@@ -386,6 +385,7 @@ Note the following::

    $ sr3 edit subscribe/swob

    broker amqps://anonymous@dd.weather.gc.ca
+   topicPrefix v02.post
    accept .*/observations/swob-ml/.*
    #write all SWOBS into the current working directory

@@ -412,6 +412,7 @@ then the option mirror should be set::

    $ sr3 edit subscribe/swob

    broker amqps://anonymous@dd.weather.gc.ca
+   topicPrefix v02.post
    subtopic observations.swob-ml.#
    directory /tmp
    mirror True

@@ -428,6 +429,7 @@ it will cause files to be placed relative to that directory::

    $ sr3 edit subscribe/ddi_ninjo_part1.conf

    broker amqps://ddi.cmc.ec.gc.ca/
+   topicPrefix v02.post
    subtopic ec.ops.*.*.ninjo-a.#
    directory /tmp/apps/ninjo/import/point/reports/in

@@ -804,6 +806,7 @@ $ mknod /home/peter/test/.rxpipe p

    $ sr3 edit subscribe/swob

    broker amqps://anonymous@dd.weather.gc.ca
+   topicPrefix v02.post
    subtopic observations.swob-ml.#
    rxpipe_name /home/peter/test/.rxpipe

@@ -839,7 +842,7 @@ package, then one can add the following to a subscriber configuration file::

    broker amqps://dd.weather.gc.ca
-   topicPredix v02.post
+   topicPrefix v02.post
    batch 1
    callback clamav
    subtopic observations.swob-ml.#
diff --git a/docs/source/Reference/sr3_options.7.rst b/docs/source/Reference/sr3_options.7.rst
index 3ef81e96e..245270105 100644
--- a/docs/source/Reference/sr3_options.7.rst
+++ b/docs/source/Reference/sr3_options.7.rst
@@ -686,7 +686,10 @@ durable (default: True)

 The AMQP **durable** option, on queue declarations. If set to True, the
 broker will preserve the queue across broker reboots.
-It means writes the queue is on disk if the broker is restarted.
+The queue will be written to and recovered from disk if the broker is restarted.
+
+Note: only *persistent* messages will remain in a durable queue after a broker restart.
+Persistent messages can be published by enabling the **persistent** option (it is enabled by default).

 fileEvents
 ----------------------------

@@ -1305,6 +1308,18 @@ a file to be accepted.

 The **permDefault** options specifies a mask, that is the permissions must be
 at least what is specified.

+persistent (default: True)
+----------------------------------
+
+The **persistent** option sets the *delivery_mode* for an AMQP message. When True,
+persistent messages will be published (``delivery_mode=2``); when False, transient
+(non-durable, ``delivery_mode=1``) messages will be published.
+
+Persistent messages are written to disk by the broker, and will survive a broker restart.
+Any transient messages in a queue will be lost when a broker is restarted. Note: persistent
+messages will only survive a broker restart *when they reside in a durable queue*. Non-durable
+queues, including all messages inside them, will be lost when a broker is restarted.
+
 pollUrl
 -------------

@@ -1578,6 +1593,25 @@ sanity_log_dead (default: 1.5*housekeeping)

 The **sanity_log_dead** option sets how long to consider too long before restarting
 a component.

+scheduled_interval,scheduled_hour,scheduled_minute
+--------------------------------------------------
+
+When working with scheduled flows, such as polls, one can configure a duration
+(no units defaults to seconds; suffixes: m for minutes, h for hours) at which to run a
+given activity::
+
+  scheduled_interval 30
+
+to run the flow or poll every 30 seconds. If no duration is set, then the
+flowcb.scheduled.Scheduled class will look for the other two time specifiers::
+
+  scheduled_hour 1,4,5,23
+  scheduled_minute 14,17,29
+
+
+which will have the poll run each day at: 01:14, 01:17, 01:29, and at the same
+minutes past 04:00, 05:00, and 23:00.
+
 shim_defer_posting_to_exit (EXPERIMENTAL)
 -----------------------------------------

@@ -1613,11 +1647,14 @@ shim_skip_parent_open_files (EXPERIMENTAL)

 sleep