Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(protocol): add serverless protocol library #16

Closed
wants to merge 10 commits into from
4 changes: 4 additions & 0 deletions .github/workflows/flow_lint_rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ jobs:
lint:
runs-on: ubuntu-latest
steps:
- run: |
sudo apt-get update
sudo apt-get install -y libudev-dev
- uses: actions/checkout@v4
name: Checkout project

Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[workspace]
members = [ "src/api","src/vmm", "src/cli"]
members = ["src/api", "src/serverless_protocol", "src/vmm", "src/cli"]
resolver = "2"
22 changes: 22 additions & 0 deletions src/serverless_protocol/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[package]
name = "serverless_protocol"
version = "0.1.0"
edition = "2021"

[lib]
name = "serverless_protocol"

[dependencies]
bincode = "1.3.3"
serde = { version = "1.0.197", features = ["derive"] }
serde_json = "1.0.115"
serialport = "4.3.0"
digest = "0.11.0-pre.8"
sha256 = "1.5.0"

[dev-dependencies]
clap = { version = "4.5.2", features = ["derive"] }

[[example]]
name = "serverless_protocol_example"
path = "examples/serverless_protocol_example.rs"
19 changes: 19 additions & 0 deletions src/serverless_protocol/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Serverless Protocol Over Serial

Allows the VMM and Agent to communicate over a serial connection.

## How to test it

1. Use socat to create a virtual serial port:

```bash
socat -d -d pty,raw,echo=0 pty,raw,echo=0
```

2. Run the example:

```bash
cargo run --example cargo run --example serverless_protocol_example -- --serial-path-a=<path_to_first_pty> --serial-path-b=<path_to_second_pty>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove typo

Suggested change
cargo run --example cargo run --example serverless_protocol_example -- --serial-path-a=<path_to_first_pty> --serial-path-b=<path_to_second_pty>
cargo run --example serverless_protocol_example -- --serial-path-a=<path_to_first_pty> --serial-path-b=<path_to_second_pty>

```

This example will show how processes can communicate over a serial connection.
75 changes: 75 additions & 0 deletions src/serverless_protocol/examples/serverless_protocol_example.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
extern crate clap;
extern crate serverless_protocol;

use std::{thread, time::Duration};

use clap::Parser;
use serverless_protocol::{
messages::{MessageType, Payload, StartMessage},
CloudletMessage, CloudletProtocol,
};

#[derive(Debug, Parser)]
#[command(version, about, long_about = None)]
struct Args {
#[arg(long)]
serial_path_a: String,

#[arg(long)]
serial_path_b: String,
}

fn main() {
let args = Args::parse();
println!("{:?}", args);

let mut handles: Vec<thread::JoinHandle<()>> = Vec::new();

let handle_a = thread::spawn(|| {
let serial_path = args.serial_path_a;

let serial_port = serialport::new(serial_path, 115_200)
.timeout(Duration::from_secs(10))
.open_native()
.expect("Failed to open serial port");

let mut protocol = CloudletProtocol::new(serial_port);

println!("waiting for message");

let message = protocol
.read_message()
.expect("Failed to read message from serial port");

println!("{:?}", message);
});

let handle_b = thread::spawn(|| {
let serial_path = args.serial_path_b;

let serial_port = serialport::new(serial_path, 115_200)
.timeout(Duration::from_secs(10))
.open_native()
.expect("Failed to open serial port");

let mut protocol = CloudletProtocol::new(serial_port);

let message = CloudletMessage::new(
MessageType::Start,
Payload::Start(StartMessage::new("Hello, World!".to_string())),
);

println!("sending message: {:?}", message);

protocol.send_message(message);

println!("message sent")
});

handles.push(handle_a);
handles.push(handle_b);

for handle in handles {
handle.join().unwrap();
}
}
181 changes: 181 additions & 0 deletions src/serverless_protocol/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
pub mod messages;

use std::io::{Read, Write};

use messages::{MessageType, Payload};
use serialport::TTYPort;

use sha256::digest;

const TERMINATOR: u8 = 0xC0;
const ESCAPE: u8 = 0xDB;
const ESCAPE_TERMINATOR: u8 = 0xDC;
const ESCAPE_ESCAPE: u8 = 0xDD;

#[derive(Debug)]
pub enum Error {
IoError(std::io::Error),
Utf8Error(std::str::Utf8Error),
ChecksumError,
MessageTypeDeserializationError(&'static str),
PayloadDeserializationError(serde_json::Error),
}

pub type Result<T> = std::result::Result<T, Error>;

#[derive(Debug)]
pub struct CloudletProtocol {
pub serial_port: TTYPort,
}

#[derive(Debug)]
pub struct CloudletMessage {
pub message_type: MessageType,
pub checksum: Vec<u8>,
pub payload: Payload,
}

pub fn create_checksum(message_type: &MessageType, payload: &Payload) -> Vec<u8> {
let type_bytes = message_type.to_owned() as u8;
let mut bytes = vec![type_bytes];

bytes.append(&mut bincode::serialize(payload).unwrap());

digest(&bytes).as_bytes().to_vec()
}

impl CloudletMessage {
pub fn new(message_type: MessageType, payload: Payload) -> CloudletMessage {
let checksum = create_checksum(&message_type, &payload);

CloudletMessage {
message_type,
checksum,
payload,
}
}
}

impl CloudletProtocol {
pub fn new(serial_port: TTYPort) -> CloudletProtocol {
CloudletProtocol { serial_port }
}

/// Escape a buffer to avoid the terminator byte and the escape byte
fn escape_buffer(buffer: Vec<u8>) -> Vec<u8> {
let mut result: Vec<u8> = Vec::new();
for byte in buffer {
match byte {
TERMINATOR => {
result.push(ESCAPE);
result.push(ESCAPE_TERMINATOR);
}
ESCAPE => {
result.push(ESCAPE);
result.push(ESCAPE_ESCAPE);
}
_ => {
result.push(byte);
}
}
}
result
}

/// Write a message to the serial port
/// Escape the message to avoid the terminator byte
/// and the escape byte
/// The message is terminated with the terminator byte
/// The message is formatted as follows:
///
/// # Arguments
///
/// * `message` - The message to send
pub fn send_message(&mut self, message: CloudletMessage) {
let mut buffer: Vec<u8> = Vec::new();
let message_type = message.message_type as u16;
buffer.append(&mut message.checksum.clone());
buffer.push((message_type >> 8) as u8);
buffer.push((message_type & 0xFF) as u8);
let json_payload = serde_json::to_string(&message.payload).unwrap();
buffer.extend(json_payload.as_bytes());

buffer = CloudletProtocol::escape_buffer(buffer);

buffer.push(TERMINATOR);

self.serial_port
.write_all(&buffer)
.expect("Failed to write message to serial port");
}

/// Read a message from the serial port
pub fn read_message(&mut self) -> Result<CloudletMessage> {
let mut buffer: Vec<u8> = Vec::new();
let mut byte = [0];

loop {
match self.serial_port.read_exact(&mut byte) {
Ok(_) => match byte[0] {
TERMINATOR => break,
ESCAPE => match self.serial_port.read_exact(&mut byte) {
Ok(_) => match byte[0] {
ESCAPE_TERMINATOR => buffer.push(TERMINATOR),
ESCAPE_ESCAPE => buffer.push(ESCAPE),
_ => {
return Err(Error::IoError(std::io::Error::new(
std::io::ErrorKind::Other,
"Invalid escape sequence",
)))
}
},
Err(e) => return Err(Error::IoError(e)),
},
_ => buffer.push(byte[0]),
},
Err(e) => return Err(Error::IoError(e)),
}
}

if buffer.len() < 4 {
return Err(Error::IoError(std::io::Error::new(
std::io::ErrorKind::Other,
"Message too short",
)));
}

let checksum = buffer[0..64].to_vec();
let message_type = u16::from_be_bytes([buffer[64], buffer[65]]);
let message_type =
MessageType::try_from(message_type).map_err(Error::MessageTypeDeserializationError)?;
let json_payload = String::from_utf8_lossy(&buffer[66..]).into_owned();

let payload =
match message_type {
MessageType::Start => serde_json::from_str(&json_payload)
.map_err(Error::PayloadDeserializationError)?,

MessageType::Exit => serde_json::from_str(&json_payload)
.map_err(Error::PayloadDeserializationError)?,

MessageType::Interrupt => serde_json::from_str(&json_payload)
.map_err(Error::PayloadDeserializationError)?,

MessageType::Ok => serde_json::from_str(&json_payload)
.map_err(Error::PayloadDeserializationError)?,

MessageType::Log => serde_json::from_str(&json_payload)
.map_err(Error::PayloadDeserializationError)?,
};

if checksum != create_checksum(&message_type, &payload) {
return Err(Error::ChecksumError);
}

Ok(CloudletMessage {
message_type,
checksum,
payload,
})
}
}
Loading