Skip to content

Commit

Permalink
fix rtc with retty pipelines
Browse files Browse the repository at this point in the history
  • Loading branch information
yngrtc committed Dec 19, 2023
1 parent faa71f8 commit 6b2742d
Show file tree
Hide file tree
Showing 30 changed files with 361 additions and 346 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ members = [
"stun",
"turn",
]
resolver = "2"

[profile.dev]
opt-level = 0
53 changes: 28 additions & 25 deletions dtls/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,31 @@ shared = { path = "../shared", package = "shared", default-features = false, fea
retty = "0.24.0"
bytes = "1.4.0"
byteorder = "1"
rand_core = "0.6.3"
hkdf = "~0.12.1"
p256 = { version = "0.11.1", features = ["default", "ecdh", "ecdsa"] }
p384 = "0.11.2"
rand = "0.8.5"
hmac = "0.12.1"
sec1 = { version = "0.3.0", features = [ "std" ] }
sha1 = "0.10.5"
sha2 = "0.10.6"
aes = "0.6.0"
block-modes = "0.7.0"
aes-gcm = "0.10.1"
ccm = "0.3.0"
x25519-dalek = { version = "2.0.0-rc.2", features = ["static_secrets"] }
x509-parser = "0.13.2"
rand_core = "0.6"
hkdf = "0.12"
p256 = { version = "0.13", features = ["default", "ecdh", "ecdsa"] }
p384 = "0.13"
rand = "0.8"
hmac = "0.12"
sec1 = { version = "0.7", features = [ "std" ] }
sha1 = "0.10"
sha2 = "0.10"
aes = "0.8"
cbc = { version = "0.1", features = [ "block-padding", "alloc"] }
aes-gcm = "0.10"
ccm = "0.5"
x25519-dalek = { version = "2", features = ["static_secrets"] }
x509-parser = "0.15"
der-parser = "8.1"
rcgen = "0.10.0"
ring = "0.16.19"
webpki = "0.21.4"
rustls = { version = "0.19.0", features = ["dangerous_configuration"]}
bincode = "1.3"
serde = { version = "1.0.110", features = ["derive"] }
subtle = "2.4"
log = "0.4.16"
thiserror = "1.0"
pem = { version = "1", optional = true }
rcgen = "0.11"
ring = "0.17"
rustls = { version = "0.21", features = ["dangerous_configuration"]}
bincode = "1"
serde = { version = "1", features = ["derive"] }
subtle = "2"
log = "0.4"
thiserror = "1"
pem = { version = "3", optional = true }

[dev-dependencies]
local-sync = "0.1.0"
Expand All @@ -63,6 +62,10 @@ path = "examples/dtls_echo_server.rs"
name = "dtls_client"
path = "examples/dtls_client.rs"

[[example]]
name = "dtls_client_selfsign"
path = "examples/dtls_client_selfsign.rs"

#[[example]]
#name = "dial_psk"
#path = "examples/dial/psk/dial_psk.rs"
Expand Down
206 changes: 206 additions & 0 deletions dtls/examples/dtls_client_selfsign.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
use bytes::BytesMut;
use clap::Parser;
use futures::StreamExt;
use std::{io::Write, net::SocketAddr, str::FromStr, time::Instant};

use dtls::config::{ConfigBuilder, ExtendedMasterSecretType};
use dtls::crypto::Certificate;
use dtls::dtls_handlers::dtls_endpoint_handler::DtlsEndpointHandler;

use retty::bootstrap::BootstrapUdpClient;
use retty::channel::{
Handler, InboundContext, InboundHandler, OutboundContext, OutboundHandler, Pipeline,
};
use retty::codec::string_codec::TaggedString;
use retty::executor::{yield_local, LocalExecutorBuilder};
use retty::transport::{AsyncTransport, AsyncTransportWrite, TaggedBytesMut, TransportContext};

////////////////////////////////////////////////////////////////////////////////////////////////////

struct EchoDecoder;
struct EchoEncoder;
struct EchoHandler {
decoder: EchoDecoder,
encoder: EchoEncoder,
}

impl EchoHandler {
fn new() -> Self {
EchoHandler {
decoder: EchoDecoder,
encoder: EchoEncoder,
}
}
}

impl InboundHandler for EchoDecoder {
type Rin = TaggedBytesMut;
type Rout = TaggedString;

fn read(&mut self, _ctx: &InboundContext<Self::Rin, Self::Rout>, msg: Self::Rin) {
let message = String::from_utf8(msg.message.to_vec()).unwrap();
println!(
"received back: {} from {:?}",
message, msg.transport.peer_addr
);
}
fn poll_timeout(&mut self, _ctx: &InboundContext<Self::Rin, Self::Rout>, _eto: &mut Instant) {
//last handler, no need to fire_poll_timeout
}
}

impl OutboundHandler for EchoEncoder {
type Win = TaggedString;
type Wout = TaggedBytesMut;

fn write(&mut self, ctx: &OutboundContext<Self::Win, Self::Wout>, msg: Self::Win) {
ctx.fire_write(TaggedBytesMut {
now: msg.now,
transport: msg.transport,
message: BytesMut::from(msg.message.as_bytes()),
});
}
}

impl Handler for EchoHandler {
type Rin = TaggedBytesMut;
type Rout = TaggedString;
type Win = TaggedString;
type Wout = TaggedBytesMut;

fn name(&self) -> &str {
"EchoHandler"
}

fn split(
self,
) -> (
Box<dyn InboundHandler<Rin = Self::Rin, Rout = Self::Rout>>,
Box<dyn OutboundHandler<Win = Self::Win, Wout = Self::Wout>>,
) {
(Box::new(self.decoder), Box::new(self.encoder))
}
}

#[derive(Parser)]
#[command(name = "DTLS Echo Client")]
#[command(author = "Rusty Rain <y@liu.mx>")]
#[command(version = "0.1.0")]
#[command(about = "An example of dtls client", long_about = None)]
struct Cli {
#[arg(short, long)]
debug: bool,
#[arg(long, default_value_t = format!("127.0.0.1"))]
host: String,
#[arg(long, default_value_t = 3489)]
port: u16,
#[arg(long, default_value_t = format!("DEBUG"))]
log_level: String,
}

fn main() -> anyhow::Result<()> {
let cli = Cli::parse();
let host = cli.host;
let port = cli.port;
let log_level = log::LevelFilter::from_str(&cli.log_level)?;
if cli.debug {
env_logger::Builder::new()
.format(|buf, record| {
writeln!(
buf,
"{}:{} [{}] {} - {}",
record.file().unwrap_or("unknown"),
record.line().unwrap_or(0),
record.level(),
chrono::Local::now().format("%H:%M:%S.%6f"),
record.args()
)
})
.filter(None, log_level)
.init();
}

println!("Connecting {}:{}...", host, port);

let transport = TransportContext {
local_addr: SocketAddr::from_str("0.0.0.0:0")?,
peer_addr: SocketAddr::from_str(&format!("{}:{}", host, port))?,
ecn: None,
};

LocalExecutorBuilder::default().run(async move {
let certificate = Certificate::generate_self_signed(vec!["localhost".to_owned()]).unwrap();

let handshake_config = ConfigBuilder::default()
.with_certificates(vec![certificate])
.with_insecure_skip_verify(true)
.with_extended_master_secret(ExtendedMasterSecretType::Require)
.build(true, Some(transport.peer_addr))
.unwrap();

let mut bootstrap = BootstrapUdpClient::new();
bootstrap.pipeline(Box::new(
move |writer: AsyncTransportWrite<TaggedBytesMut>| {
let pipeline: Pipeline<TaggedBytesMut, TaggedString> = Pipeline::new();

let local_addr = writer.get_local_addr();
let peer_addr = writer.get_peer_addr();

let async_transport_handler = AsyncTransport::new(writer);
let dtls_handler = DtlsEndpointHandler::new(
local_addr,
handshake_config.clone(),
true,
peer_addr,
None,
);
let echo_handler = EchoHandler::new();

pipeline.add_back(async_transport_handler);
pipeline.add_back(dtls_handler);
pipeline.add_back(echo_handler);
pipeline.finalize()
},
));

bootstrap.bind(transport.local_addr).await.unwrap();

let pipeline = bootstrap.connect(transport.peer_addr).await.unwrap();
yield_local();

println!("Enter bye to stop");
let (mut tx, mut rx) = futures::channel::mpsc::channel(8);
std::thread::spawn(move || {
let mut buffer = String::new();
while std::io::stdin().read_line(&mut buffer).is_ok() {
match buffer.trim_end() {
"" => break,
line => {
if tx.try_send(line.to_string()).is_err() {
break;
}
if line == "bye" {
break;
}
}
};
buffer.clear();
}
});
while let Some(line) = rx.next().await {
pipeline.write(TaggedString {
now: Instant::now(),
transport,
message: format!("{}\r\n", line),
});
if line == "bye" {
pipeline.close();
break;
}
}

bootstrap.graceful_stop().await;
});

Ok(())
}
22 changes: 11 additions & 11 deletions dtls/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use std::collections::HashMap;
use std::fmt;
use std::net::SocketAddr;
use std::rc::Rc;
use std::sync::Arc;
use std::time::Duration;

/// Config is used to configure a DTLS client or server.
Expand Down Expand Up @@ -334,13 +333,11 @@ impl ConfigBuilder {
insecure_verification: self.insecure_verification,
verify_peer_certificate: self.verify_peer_certificate.take(),
roots_cas: self.roots_cas,
client_cert_verifier: if self.client_auth as u8
>= ClientAuthType::VerifyClientCertIfGiven as u8
{
Some(rustls::AllowAnyAuthenticatedClient::new(self.client_cas))
} else {
None
},
server_cert_verifier: Rc::new(rustls::client::WebPkiVerifier::new(
rustls::RootCertStore::empty(),
None,
)),
client_cert_verifier: None,
retransmit_interval,
initial_epoch: 0,
maximum_transmission_unit,
Expand Down Expand Up @@ -369,8 +366,8 @@ pub struct HandshakeConfig {
pub(crate) insecure_verification: bool,
pub(crate) verify_peer_certificate: Option<VerifyPeerCertificateFn>,
pub(crate) roots_cas: rustls::RootCertStore,
pub(crate) server_cert_verifier: Rc<dyn rustls::ServerCertVerifier>,
pub(crate) client_cert_verifier: Option<Arc<dyn rustls::ClientCertVerifier>>,
pub(crate) server_cert_verifier: Rc<dyn rustls::client::ServerCertVerifier>,
pub(crate) client_cert_verifier: Option<Rc<dyn rustls::server::ClientCertVerifier>>,
pub(crate) retransmit_interval: std::time::Duration,
pub(crate) initial_epoch: u16,
pub(crate) maximum_transmission_unit: usize,
Expand Down Expand Up @@ -420,7 +417,10 @@ impl Default for HandshakeConfig {
insecure_verification: false,
verify_peer_certificate: None,
roots_cas: rustls::RootCertStore::empty(),
server_cert_verifier: Rc::new(rustls::WebPKIVerifier::new()),
server_cert_verifier: Rc::new(rustls::client::WebPkiVerifier::new(
rustls::RootCertStore::empty(),
None,
)),
client_cert_verifier: None,
retransmit_interval: std::time::Duration::from_secs(0),
initial_epoch: 0,
Expand Down
26 changes: 12 additions & 14 deletions dtls/src/crypto/crypto_cbc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,18 @@

// https://github.com/RustCrypto/block-ciphers

use aes::cipher::{block_padding::Pkcs7, BlockDecryptMut, BlockEncryptMut, KeyIvInit};
use p256::elliptic_curve::subtle::ConstantTimeEq;
use rand::Rng;
use std::io::Cursor;
use std::ops::Not;

use crate::content::*;
use crate::prf::*;
use crate::record_layer::record_layer_header::*;
use shared::error::*;

use aes::Aes256;
use block_modes::BlockModeError;
use block_modes::{BlockMode, Cbc};
use rand::Rng;
use subtle::ConstantTimeEq;

use super::padding::DtlsPadding;
type Aes256Cbc = Cbc<Aes256, DtlsPadding>;
type Aes256CbcEnc = cbc::Encryptor<aes::Aes256>;
type Aes256CbcDec = cbc::Decryptor<aes::Aes256>;

// State needed to handle encrypted input/output
#[derive(Clone)]
Expand Down Expand Up @@ -73,8 +69,8 @@ impl CryptoCbc {
let mut iv: Vec<u8> = vec![0; Self::BLOCK_SIZE];
rand::thread_rng().fill(iv.as_mut_slice());

let write_cbc = Aes256Cbc::new_var(&self.local_key, &iv)?;
let encrypted = write_cbc.encrypt_vec(&payload);
let write_cbc = Aes256CbcEnc::new_from_slices(&self.local_key, &iv)?;
let encrypted = write_cbc.encrypt_padded_vec_mut::<Pkcs7>(&payload);

// Prepend unencrypte header with encrypted payload
let mut r = vec![];
Expand Down Expand Up @@ -102,9 +98,11 @@ impl CryptoCbc {
let body = &body[Self::BLOCK_SIZE..];
//TODO: add body.len() check

let read_cbc = Aes256Cbc::new_var(&self.remote_key, iv)?;
let read_cbc = Aes256CbcDec::new_from_slices(&self.remote_key, iv)?;

let decrypted = read_cbc.decrypt_vec(body)?;
let decrypted = read_cbc
.decrypt_padded_vec_mut::<Pkcs7>(body)
.map_err(|_| Error::ErrInvalidPacketLength)?;

let recv_mac = &decrypted[decrypted.len() - Self::MAC_SIZE..];
let decrypted = &decrypted[0..decrypted.len() - Self::MAC_SIZE];
Expand All @@ -118,7 +116,7 @@ impl CryptoCbc {
)?;

if recv_mac.ct_eq(&mac).not().into() {
return Err(BlockModeError.into());
return Err(Error::ErrInvalidMac);
}

let mut d = Vec::with_capacity(RECORD_LAYER_HEADER_SIZE + decrypted.len());
Expand Down
Loading

0 comments on commit 6b2742d

Please sign in to comment.