
Support mcap export format #143

Open
gbin opened this issue Dec 10, 2024 · 8 comments
Labels: enhancement, good first issue

Comments

@gbin
Collaborator

gbin commented Dec 10, 2024

mcap is a standard logging format used by log visualization and management tools.

We have a placeholder for log extraction here: https://github.com/copper-project/copper-rs/blob/master/core/cu29_export/src/lib.rs#L82

We would need to extend it to output a properly formed mcap file.

The Rust bindings are here: https://docs.rs/mcap/latest/mcap/
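For a rough idea of the shape this could take (purely illustrative, not the final exporter; the topic name and payload below are placeholders):

use std::{collections::BTreeMap, fs, io::BufWriter};
use mcap::{records::MessageHeader, Channel, Writer};

fn mcap_export_sketch() {
    // Open the target mcap file.
    let mut out = Writer::new(BufWriter::new(fs::File::create("export.mcap").unwrap())).unwrap();

    // Declare a channel; schemaless to begin with.
    let channel = Channel {
        topic: String::from("copper/log"),
        schema: None,
        message_encoding: String::from("application/octet-stream"),
        metadata: BTreeMap::default(),
    };
    let channel_id = out.add_channel(&channel).unwrap();

    // For each extracted log entry, write its serialized payload with its timestamps.
    let payload: &[u8] = &[]; // placeholder for a bincode-encoded entry
    out.write_to_known_channel(
        &MessageHeader { channel_id, sequence: 0, log_time: 0, publish_time: 0 },
        payload,
    )
    .unwrap();

    out.finish().unwrap();
}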

@gbin added the enhancement and good first issue labels on Dec 10, 2024
@eggyal

eggyal commented Dec 13, 2024

Hi, I'm interested in tackling this one.

Having delved a little into the codebase, my understanding is that while the parameters of existing log entries are typed (cu29_value::Value), the log entries themselves are untyped/schemaless. (They do have a format string into which a string rendering of each parameter is interpolated, but at best that format string can only be used to determine how many parameters are expected, not what their types are or should be.)

Is this a correct understanding of the status quo?

If so, unless it's possible to construct a schema from some other (out of band) source, it seems to me that:

  1. we can map each distinct CuLogEntry.msg_index to a distinct mcap::Channel;
  2. those channels can either be schemaless (not terribly useful) or else we can attempt to infer their schema from the log entry's format string and actual parameter values. Unfortunately, inference might not always work:
    • suppose a parameter happens to be an empty Value::Option or Value::Map: it is then impossible to ascertain what the type would have been if non-empty. We could keep scanning the log until a non-empty value of that parameter turns up, but that could be expensive for large logs, and there is no guarantee such a value ever appears
    • if, for some reason, different entries carry different value types for the same parameter, any inferred schema would be violated: could this arise (essentially requiring some sort of union type in the schema), or would it be a fatal error?
  3. Alternatively, we could add schema information to Copper's own log format (or use mcap internally).

Is this something that the project already has some thoughts about? My gut says using mcap internally might actually be quite sensible: but obviously that'd be a much more significant change that touches on many more areas of the codebase (and requires some consideration regarding migration/ongoing support of existing deployments/log files).
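For concreteness, here is a rough sketch of what (1) together with a schemaless (2) might look like using the mcap crate (the decoded-entry tuple and the topic naming are purely illustrative):

use std::collections::{BTreeMap, HashMap};
use std::{fs, io::BufWriter};
use mcap::{records::MessageHeader, Channel, Writer};

// Hypothetical shape of a decoded Copper log entry: (msg_index, time in ns, serialized params).
fn export_entries(entries: &[(u32, u64, Vec<u8>)]) {
    let mut out = Writer::new(BufWriter::new(fs::File::create("export.mcap").unwrap())).unwrap();
    let mut channels: HashMap<u32, u16> = HashMap::new();

    for (msg_index, log_time, payload) in entries {
        // One mcap channel per distinct CuLogEntry.msg_index, created lazily.
        let channel_id = *channels.entry(*msg_index).or_insert_with(|| {
            out.add_channel(&Channel {
                topic: format!("cu29/log/{msg_index}"),
                schema: None, // schemaless for now; an inferred or declared schema would go here
                message_encoding: String::from("application/octet-stream"),
                metadata: BTreeMap::default(),
            })
            .unwrap()
        });

        out.write_to_known_channel(
            &MessageHeader {
                channel_id,
                sequence: 0,
                log_time: *log_time,
                publish_time: *log_time,
            },
            payload,
        )
        .unwrap();
    }
    out.finish().unwrap();
}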

@gbin
Collaborator Author

gbin commented Dec 13, 2024

Hey @eggyal thank you for checking this thing out!

What about doing something like this (#151) and asking our users to add a Schema to their messages (perhaps optionally) so we can export it?

@gbin
Collaborator Author

gbin commented Dec 13, 2024

Using mcap internally, btw, is not really an option: the library allocates and introduces jitter, and it is also 3 to 4x slower than Copper's pretty brutal memmap :P.
Let's keep the conversion at log ingest... I'm attaching the benchmark results here for posterity:

(Initial fsync to get the pending IOs out of the way): 39.599 ms

Copper: End to End: 1.754 s
Copper: allocated: 0 bytes
Copper: mean jitter: 67 ns
Copper: max jitter: 877.055 µs

(fsync check after Copper): 9.219 ms

mcap: End to End: 5.220 s
mcap: allocated: 269583972 bytes
mcap: mean jitter: 1.097 µs
mcap: max jitter: 8.454 ms

(fsync check after Mcap): 9.811 ms

This is the proggy to reproduce:

use std::path::Path;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use cu29::bincode::{Decode, Encode};
use cu29::prelude::*;

#[derive(Default, Clone, Debug, Encode, Decode)]
struct TestPayload {
    number: u64,
    array: [u8; 10],
    float: f64,
}

fn global_system_fsync() {
    unsafe {
        libc::sync();
    }
}

struct ScopeTimer {
    start: CuDuration,
    msg: String,
    clk: RobotClock,
}

impl ScopeTimer {
    fn new(clk: &RobotClock, msg: &str) -> Self {
        Self {
            start: clk.now(),
            msg: msg.to_string(),
            clk: clk.clone(),
        }
    }
}

impl Drop for ScopeTimer {
    fn drop(&mut self) {
        println!("{}: {}", self.msg, self.clk.now() - self.start);
    }
}

const NB_MSG: usize = 20_000_000;

fn main() {

    let _a = ScopedAllocCounter::new();

    let clk = RobotClock::new();
    {
        let _t = ScopeTimer::new(&clk, "(Initial fsync to get the pending IOs out of the way)");
        global_system_fsync();
    }

    // Copper side.
    let file_path = "./ziooon.copper";
    let mut timings = CuDurationStatistics::new(CuDuration::from(Duration::from_millis(100)));
    let copper_allocations = {
        let _t = ScopeTimer::new(&clk, "Copper: End to End");
        let UnifiedLogger::Write(logger) = UnifiedLoggerBuilder::new()
            .write(true)
            .create(true)
            .file_base_name(Path::new(file_path))
            .preallocated_size(4 * 1024 * 1024 * 1024)
            .build()
            .expect("Failed to create logger")
        else {
            panic!("Failed to create logger")
        };
        let logger = Arc::new(Mutex::new(logger));

        let allocs_before = GLOBAL.get_allocated();
        let mut stream = stream_write(logger.clone(), UnifiedLogType::CopperList, 40 * 1024 * 1024);
        {
            for i in 0..NB_MSG {
                let bf = clk.now();
                let test_payload = TestPayload {
                    number: i as u64,
                    array: [(i % 255) as u8; 10],
                    float: i as f64,
                };
                let mut msg = CuMsg::<TestPayload>::new(Some(test_payload));
                msg.metadata.process_time = PartialCuTimeRange {
                    start: CuDuration(22).into(),
                    end: CuDuration(23).into(),
                };
                msg.metadata.tov = CuDuration(19).into();
                stream.log(&msg).unwrap();
                timings.record(clk.now() - bf);
            }
        }
        // here everything will be dropped and a final flush to disk will be done
        GLOBAL.get_allocated() - allocs_before
    };
    println!("Copper: allocated: {} bytes", copper_allocations);
    println!("Copper: mean jitter: {}", timings.jitter_mean());
    println!("Copper: max jitter: {}", timings.jitter_max());
    {
        // To verify that really, everything was flushed to disk, it should be close to immediate
        let _t = ScopeTimer::new(&clk, "(fsync check after Copper)");
        global_system_fsync();
    }


    let mut timings = CuDurationStatistics::new(CuDuration::from(Duration::from_millis(100)));

    // mcap side
    let mcap_allocations = {
        let _t = ScopeTimer::new(&clk, "Mcap: End to End");
        use mcap::{Channel, records::MessageHeader, Writer};
        use std::{collections::BTreeMap, fs, io::BufWriter};

        let mut out = Writer::new(
            BufWriter::new(fs::File::create("out.mcap").unwrap())
        ).unwrap();

        let my_channel = Channel {
            topic: String::from("cool stuff"),
            schema: None,
            message_encoding: String::from("application/octet-stream"),
            metadata: BTreeMap::default()
        };

        let channel_id = out.add_channel(&my_channel).unwrap();
        let mut buffer = [0u8; 1024];

        let allocs_before = GLOBAL.get_allocated();
        for i in 0..NB_MSG {
            let bf = clk.now();
            let data = TestPayload {
                number: i as u64,
                array: [(i % 255) as u8; 10],
                float: i as f64,
            };
            // apples-to-apples with Copper: same payload, bincode-encoded
            let length = bincode::encode_into_slice(
                data,
                &mut buffer,
                bincode::config::standard()
            ).unwrap();

            out.write_to_known_channel(
                &MessageHeader {
                    channel_id,
                    sequence: 32,
                    log_time: 23,
                    publish_time: 25
                },
                &buffer[..length]
            ).unwrap();
            timings.record(clk.now() - bf);
        }
        out.finish().unwrap();
        GLOBAL.get_allocated() - allocs_before
    };
    println!("mcap: allocated: {} bytes", mcap_allocations);
    println!("mcap: mean jitter: {}", timings.jitter_mean());
    println!("mcap: max jitter: {}", timings.jitter_max());
    {
        // To verify that really, everything was flushed to disk, it should be close to immediate
        let _t = ScopeTimer::new(&clk, "(fsync check after Mcap)");
        global_system_fsync();
    }

}

@gbin
Collaborator Author

gbin commented Dec 22, 2024

@eggyal if you have a draft of the PR feel free to push it and we can jam on it

@eggyal

eggyal commented Dec 23, 2024

I'm still a little way from a draft PR, but I've pushed my current WIP to my schemaful-encoding branch (diff).

As you can see, I've been working on a way to represent a type's schema in a serialisation-agnostic manner, but in such a way that an implementation of a serialisation format can extract the schema whilst being assured at compile-time (by Rust's type system) that the schema is fully coherent with the data.

Unfortunately, it's only in the last day or so that I realised that the sole serialisation format for which I so far have a sketch implementation (ROS 2.0 messages) has no schema support for union types, which means that messages cannot contain e.g. Option fields and the like. I intend to implement other (more capable) serialisation formats in due course, but I might nevertheless proceed with ROS 2.0 messages for the time being in order to make faster progress integrating with the Copper codebase and resolving this issue.

As regards next steps, I intend to modify the log format used internally by Copper to record the schema in some way.
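Purely to illustrate the general idea (a hypothetical shape, not what my branch actually contains): the point is that the emitted schema and the value encoding derive from one shared description, so they cannot drift apart.

// Hypothetical sketch only; names and structure are illustrative.
// A serialisation-agnostic description of a type's structure...
enum SchemaNode {
    Bool,
    U64,
    F64,
    Array { elem: Box<SchemaNode>, len: usize },
    Struct { fields: Vec<(&'static str, SchemaNode)> },
}

// ...and a trait tying a Rust type to that description, so that a backend
// (e.g. one emitting ROS 2 .msg text) renders the schema and encodes values
// from the same source of truth.
trait Schematic {
    fn schema() -> SchemaNode;
}

impl Schematic for u64 {
    fn schema() -> SchemaNode {
        SchemaNode::U64
    }
}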

@eggyal

eggyal commented Dec 23, 2024

PS: it's likely that some of the documentation/comments are outdated.

@gbin
Collaborator Author

gbin commented Dec 23, 2024

Cool, I'll have a look! For the Options, could we export them as a boolean + value, or something like that?

@eggyal

eggyal commented Dec 23, 2024

That was my first thought too, but it won't work because instances would always require the value to be occupied, and we might not be able to construct a sane value that is safe to be ignored/discarded.

But actually we could use BoundedArray bounded to 1 element, such that enums lower to one such field per variant plus an integer discriminant. 👍🏻
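To spell the lowering out (a sketch of the target shape only; field names are hypothetical):

// Hypothetical lowering of `Option<u64>` for a schema language without
// union types: an integer discriminant plus one array bounded to at most
// one element per variant (in ROS 2 terms, something like `uint64[<=1]`).
struct LoweredOptionU64 {
    discriminant: u8, // 0 = None, 1 = Some
    some: Vec<u64>,   // at most 1 element; empty unless discriminant == 1
}

// More generally, `enum E { A(u64), B(f32) }` would lower to:
struct LoweredE {
    discriminant: u8, // which variant is active
    a: Vec<u64>,      // at most 1 element, populated iff discriminant == 0
    b: Vec<f32>,      // at most 1 element, populated iff discriminant == 1
}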
