Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: Add POC async implementation, example using storescp #542

Merged
merged 49 commits into from
Oct 23, 2024

Conversation

naterichman
Copy link
Contributor

POC async implementation.

  • Created a feature flag tokio which controls the importing of read_pdu and write_pdu from either [read|write].rs or [read_nonblocking|write_nonblocking].rs
  • Created feature gated implementations of the methods in Client and Server which depend on using read_pdu/write_pdu or any read/write to the socket themselves.

A couple other notes:

  • We could consider using the maybe_async crate.
    • Since most of the code in read_pdu/write_pdu is copied for the sync/async implementation I thought this could be nice to use, but I found that I'd have to extend the macro to adjust the trait bounds for the sync/async implementation and I couldn't figure out the best way to do that
  • I had to change a little bit of the internal API for write_chunk_u16 and write_chunk_u32
  • For now I completely ignored the PDataReader and PDataWriter since I'm not exactly clear where they should be used or not used (storescp doesn't use those at all, and in my implementation of c-find I didn't end up using either at all either)
  • I just added the timeouts in another MR, but it seems like the concept of socket level timeouts doesn't exist in an async context? So Future work will be adding the timeout to the individual methods like send, receive, establish, etc. via something like tokio::time::timeout

Also note this is meant to just be one possible implementation, and I totally expect completely rewriting this MR however you want it to look! Thanks in advance!

@naterichman
Copy link
Contributor Author

I've been doing some more reading up, and unfortunately it seems like the options are

  1. Repeat a bunch of code either manually or with a nice macro to write truly sync and async versions of ul, but that would (and the current PR) break users expecting cargo install dicom-ul --all-features to work.
  2. Make the code async only, and expose a blocking client which just calls block_on for the blocking client

@Enet4
Copy link
Owner

Enet4 commented Jul 21, 2024

Much appreciated!

I've been doing some more reading up, and unfortunately it seems like the options are

  1. Repeat a bunch of code either manually or with a nice macro to write truly sync and async versions of ul, but that would (and the current PR) break users expecting cargo install dicom-ul --all-features to work.
  2. Make the code async only, and expose a blocking client which just calls block_on for the blocking client

There ought to be a bit of redundancy in this process unfortunately. Whether to go async or not requires a different function "color" which affects its inner workings altogether.

At best, there is a way to reduce the amount of redundancy, which is to write non-blocking and non-polling implementations of the PDU readers. While this can be a bit trickier than the existing logic at read_pdu, this would couple nicely with the concept of framing without requiring users to depend on tokio if they only intend to use the blocking API.

Let me know if more guidance is needed here.

@naterichman
Copy link
Contributor Author

Let me make sure I'm understanding where you're going. For the reader implementation, it would be something like:

  • Rename read_pdu to something like parse_pdu and leave the trait bound as R: Read, keeping the logic essentially as is but changing the return type to something like Result<Option<Pdu>> instead of Result<Pdu>
  • Making an async receive and leaving the existing receive method on the ClientAssociation, changing the logic to something like:
pub async fn receive(&mut self) -> Result<Pdu> {
    loop {

        if let Some(pdu) = parse_pdu(&mut self.buffer)? {
            return Ok(Some(pdu));
        }

        if 0 == self.stream.read_buf(&mut self.buffer).await? {
            if self.buffer.is_empty() {
                return Ok(None);
            } else {
                return Err("connection reset by peer".into());
            }
        }
    }
}

And similar for pub fn receive? (Mostly copy/pasting code from the framing article you linked 😄 )

@Enet4
Copy link
Owner

Enet4 commented Jul 23, 2024

Let me make sure I'm understanding where you're going. For the reader implementation, it would be something like:

  • Rename read_pdu to something like parse_pdu and leave the trait bound as R: Read, keeping the logic essentially as is but changing the return type to something like Result<Option<Pdu>> instead of Result<Pdu>

That could work if we use a reader that keeps a buffer of all partial data, so that it can be read multiple times until a complete frame is available. In practice, we may be better off making this function receive a value with a better trait bound than Read, providing a peekable source of bytes which can be extended as more data arrives from the network, and only consumes the front bytes when individual frames comprising those bytes have been fully processed. The bytes crate (part of the tokio ecosystem) offers some nice data types and traits for this, so it may be worth trying it out.

@naterichman
Copy link
Contributor Author

I'm not really following, sorry! What kind of trait bound might be better, something like Buf or BufMut? At that point why not just make read_pdu take &mut BytesMut or Cursor<&[u8]>. Any chance you could provide a little code snippet of something you had in mind so I know which direction to go?

@Enet4
Copy link
Owner

Enet4 commented Jul 24, 2024

What kind of trait bound might be better, something like Buf or BufMut? At that point why not just make read_pdu take &mut BytesMut or Cursor<&[u8]>.

Both BytesMut and Cursor<&[u8]> implement Buf, but they assume a contiguous portion of bytes. The way I see it, the trait would make room for memory usage optimizations by allowing us to read from chained portions of data (thinking network packet payloads).

Any chance you could provide a little code snippet of something you had in mind so I know which direction to go?

I would just try and see if you can tweak the implementation so that the function signature is fn read_pdu(x: impl Buf) -> Result<Option<Pdu>>. BufMut would offer write access, which we do not need for reading.

Change is working with async storescp, still need to try how it
would work with the sync version
Copy link
Contributor Author

@naterichman naterichman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Called out a few specific changes. I have it working again as POC for the storescp command, still need to figure out how the sync client would look. LMK your thoughts/if this is the direction you had in mind

ul/src/association/client.rs Outdated Show resolved Hide resolved
ul/src/pdu/reader.rs Outdated Show resolved Hide resolved
ul/src/association/client.rs Outdated Show resolved Hide resolved
* Finish exposing various methods of client/server as feature-gated
  async
* Finish async PDataWriter (still having issues)
@naterichman
Copy link
Contributor Author

Okay, I'm happier with the implementation now. I have a POC fully working for storescp, but I'm having trouble getting the AsyncPDataWriter working for use with storescu. Currently I get the association working fine, but the send_pdata section is not working.

Orthanc logs:

======================= END A-ASSOCIATE-AC ======================
T0807 10:13:53.602828          DICOM-1 CommandDispatcher.cpp:760] (dicom) Received Command:
===================== INCOMING DIMSE MESSAGE ====================
Message Type                  : C-STORE RQ
Presentation Context ID       : 1
Message ID                    : 1
Affected SOP Class UID        : MRImageStorage
Affected SOP Instance UID     : 1.3.6.1.4.1.14519.5.2.1.8421.4004.102660487069874494712993403337
Data Set                      : present
Priority                      : medium
======================= END DIMSE MESSAGE =======================
I0807 10:13:53.602850          DICOM-1 main.cpp:353] Incoming Store request from AET STORE-SCU on IP 127.0.0.1, calling AET ANY-SCP
E0807 10:13:53.603049          DICOM-1 StoreScp.cpp:273] Store SCP Failed: DIMSE Failed to receive message

Wireshark capture:
image

Can attach the actual capture file if needed

I'm hoping you could

  1. Go over existing implementation of read_pdu and write_pdu and changes to the interface of the server/client from a high level so I know if I'm approaching it right.
  2. Similar for high level review of how I've introduced the async functionality into storescu and storescp
  3. Help me with the AsyncPDataWriter issue hopefully

After that, I'd like to start writing tests, I just didn't want to write them yet if the interface is going to change.

And then I'd like to do some benchmarking of the async vs sync code and also the new sync code (with the framing) vs. the old sync code.

LMK your thoughts! Thanks again!

@Enet4 Enet4 added A-lib Area: library A-tool Area: tooling C-ul Crate: dicom-ul C-storescu Crate: dicom-storescu C-storescp Crate: dicom-storescp labels Aug 7, 2024
Copy link
Owner

@Enet4 Enet4 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for continuing with this PR. There is a lot to go through, and it is not clear from the code what the problem with PData reading/writing could be. I can try testing this against other platforms. Until then, I left some feedback inline for things which should be taken care of.

storescp/Cargo.toml Outdated Show resolved Hide resolved
Cargo.toml Outdated Show resolved Hide resolved
encoding/src/text.rs Outdated Show resolved Hide resolved
storescu/Cargo.toml Outdated Show resolved Hide resolved
storescu/src/main.rs Outdated Show resolved Hide resolved
ul/src/association/client.rs Outdated Show resolved Hide resolved
ul/src/association/client.rs Outdated Show resolved Hide resolved
ul/src/association/client.rs Outdated Show resolved Hide resolved
ul/src/association/pdata.rs Outdated Show resolved Hide resolved
@naterichman
Copy link
Contributor Author

Regarding the storescu to storescp. I was not able to reproduce that, I also added real concurrency to storescu so now there is a -c flag which represents how many tasks to spin up for sending a lot of files. I tried that a few times and had no issues.

Additionally, I changed the options for storescp so that it is no longer async by default.

Where are those files you were testing on? I'd like to see if I can reproduce using that file, or maybe its because you are on windows and there are some minor differences (I'm on linux).

I also updated the documentation in ul!

@Enet4
Copy link
Owner

Enet4 commented Oct 16, 2024

Where are those files you were testing on? I'd like to see if I can reproduce using that file, or maybe its because you are on windows and there are some minor differences (I'm on linux).

The file I used was MG1_JPLY from the WG04 test file set, though I'm not sure if the problem was specific to this file. It can also be downloaded from dicom-test-files.

I was using Linux this time, but I can try again on both machines in any case and get back to you. :)

Copy link
Owner

@Enet4 Enet4 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried the same test again and this time it worked! Aside from the comments inline, all that should be left to do here is fix the tests.

storescp/src/main.rs Outdated Show resolved Hide resolved
storescu/src/main.rs Show resolved Hide resolved
Copy link
Owner

@Enet4 Enet4 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took the liberty of making some corrections around feature-gating on "async" (this was explained in one of the comments inline). I only have a suggestion for one of the doctests which I would like you to validate, then we're very likely ready to merge.

ul/src/association/server.rs Outdated Show resolved Hide resolved
.github/workflows/rust.yml Show resolved Hide resolved
@Enet4 Enet4 linked an issue Oct 17, 2024 that may be closed by this pull request
@Enet4
Copy link
Owner

Enet4 commented Oct 18, 2024

I would have greatly wished to merge this, but right now the transfer goes wrong whenever the concurrency option in dicom-storescu is set, even when I set it to 1. For instance:

cargo run --bin dicom-storescu -- -c 1 ANYSCP@localhost:1111 .../dicom-test-files/data/WG04/JPLL

Our store SCP starts reporting errors of this kind at random:

024-10-18T13:50:03.021555Z  INFO dicom_storescp::store_async: New association from STORE-SCU
2024-10-18T13:50:03.031017Z  INFO dicom_storescp::store_async: Stored storage\1.3.6.1.4.1.5962.1.1.20.1.4.20040826185059.5457.dcm
2024-10-18T13:50:03.060996Z ERROR dicom_storescp: failed to read DICOM data object

Caused by these errors (recent errors listed first):
  1: Could not read data set token
  2: Could not read item header
  3: Could not decode element header at position 1311814
  4: Failed to read the item header
  5: failed to fill whole buffer

And often the files won't end up saved to disk (I had to set concurrency to a number greater than 1 and keep trying to find valid files in storage).

Also tested with another store SCP (Dicoogle), it also reports errors and returns status code 101h.

This could constitute a problem with PDU writing in async mode, so withholding concurrency might just be hiding the problem. I'm afraid this will stay blocked until we find a fix.

@naterichman
Copy link
Contributor Author

I was worried that it was a random/intermittent thing when you said it failed at first but when you ran it again it passed. I will try to reproduce and look into it!

Copy link
Owner

@Enet4 Enet4 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was looking into this right now! Still haven't gotten to the root of the problem, but encountered a few other things in the code worth looking into.


{
let mut pdata = scu.send_pdata(pc_selected.id).await;
pdata.write_all(&object_data).await.unwrap();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should probably turn this into a recoverable error.

Comment on lines +501 to 505
if let Err(e) = result {
error!("{}", Report::from_error(e));
if fail_first {
std::process::exit(-2);
}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is testing the error on task join, but not the application error returned by the task. This worked better on my machine.

Suggested change
if let Err(e) = result {
error!("{}", Report::from_error(e));
if fail_first {
std::process::exit(-2);
}
match result {
Err(e) => {
error!("{}", Report::from_error(e));
if fail_first {
std::process::exit(-2);
}
}
Ok(Err(e)) => {
error!("{}", Report::from_error(e));
if fail_first {
std::process::exit(-2);
}
}
Ok(Ok(_)) => {}
}

@naterichman
Copy link
Contributor Author

Will get to those too. Don't worry about figuring out the sendscu issue, its something within the messed up logic of self.writing in the implementation of AsyncPDataWriter.poll_write. I've had a hell of a time figuring out how to handle the underlying stream write call returining Poll::Pending, so I just need to figure out the correct logic for handling that, but it will get there!

* Make AsyncPDataWriter a proper state machine
* Remove use of `<stream>.write_all` which already loops over input data
  and removes some of our control, switch to manual use of `poll_write`
  on underlying stream
@naterichman
Copy link
Contributor Author

Okay I believe this is fixed. It only ever came up on sufficiently large files and I believe it had to do with how the tokio TCPStream write method works in that a lot of data can be written immediately, but with a large enough amount to transfer, it can sometimes return Pending instead of Ready. This explains why I had no issues sending a lot of smaller files and then was able to reproduce by trying to send a large US dicom.

I'll be honest feel much better about this implementation too since the original was mostly guided by chatGPT... This implementation I understand much better, and I left a decent amount of inline comments explaining it for future reference

Copy link
Owner

@Enet4 Enet4 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I did a few more tests and found no more issues. Terrific work!

@Enet4 Enet4 merged commit 389cd8a into Enet4:master Oct 23, 2024
4 checks passed
@Enet4 Enet4 added the breaking change Hint that this may require a major version bump on release label Nov 1, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-lib Area: library A-tool Area: tooling breaking change Hint that this may require a major version bump on release C-storescp Crate: dicom-storescp C-storescu Crate: dicom-storescu C-ul Crate: dicom-ul
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Starting the discussion on async support
3 participants