Skip to content

Commit

Permalink
remove unnecessary thread spawns
Browse files Browse the repository at this point in the history
  • Loading branch information
joshmossas committed Jul 18, 2024
1 parent 82386b8 commit bb64b23
Showing 1 changed file with 100 additions and 118 deletions.
218 changes: 100 additions & 118 deletions tests/clients/rust/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,56 +453,49 @@ mod tests {
#[tokio::test]
async fn stream_messages_test() {
let config = get_config(headers());
let client = Arc::new(TestClient::create(config));
let error_count = Arc::new(Mutex::new(0));
let error_count_ref = Arc::clone(&error_count);
let msg_count = Arc::new(Mutex::new(0));
let msg_count_ref = Arc::clone(&msg_count);
let open_count = Arc::new(Mutex::new(0));
let open_count_ref = Arc::clone(&open_count);
let thread = tokio::spawn(async move {
client
.tests
.stream_messages(
ChatMessageParams {
channel_id: "12345".to_string(),
},
&mut |event, _| match event {
SseEvent::Message(msg) => {
let mut msg_count = msg_count_ref.lock().unwrap();
*msg_count += 1;
match msg {
crate::test_client::ChatMessage::Text { channel_id, .. } => {
assert_eq!(channel_id, "12345".to_string());
}
crate::test_client::ChatMessage::Image { channel_id, .. } => {
assert_eq!(channel_id, "12345".to_string());
}
crate::test_client::ChatMessage::Url { channel_id, .. } => {
assert_eq!(channel_id, "12345".to_string());
}
let client = TestClient::create(config);
let mut error_count = 0;
let mut msg_count = 0;
let mut open_count = 0;
client
.tests
.stream_messages(
ChatMessageParams {
channel_id: "12345".to_string(),
},
&mut |event, controller| match event {
SseEvent::Message(msg) => {
msg_count += 1;
match msg {
crate::test_client::ChatMessage::Text { channel_id, .. } => {
assert_eq!(channel_id, "12345".to_string());
}
crate::test_client::ChatMessage::Image { channel_id, .. } => {
assert_eq!(channel_id, "12345".to_string());
}
crate::test_client::ChatMessage::Url { channel_id, .. } => {
assert_eq!(channel_id, "12345".to_string());
}
}
SseEvent::Error { .. } => {
let mut error_count = error_count_ref.lock().unwrap();
*error_count += 1;
}
SseEvent::Open => {
let mut open_count = open_count_ref.lock().unwrap();
*open_count += 1;
if msg_count >= 20 {
controller.abort();
}
SseEvent::Close => {}
},
None,
None,
)
.await
});
tokio::time::sleep(Duration::from_millis(1000)).await;
thread.abort();
assert_eq!(*open_count.lock().unwrap(), 1);
assert!(*msg_count.lock().unwrap() > 0,);
assert_eq!(*error_count.lock().unwrap(), 0);
}
SseEvent::Error { .. } => {
error_count += 1;
}
SseEvent::Open => {
open_count += 1;
}
SseEvent::Close => {}
},
None,
None,
)
.await;
assert_eq!(open_count, 1);
assert_eq!(msg_count, 20);
assert_eq!(error_count, 0);
}

#[tokio::test]
Expand Down Expand Up @@ -560,84 +553,73 @@ mod tests {
#[tokio::test]
async fn stream_auto_reconnect_test() {
let config = get_config(headers());
let client = Arc::new(TestClient::create(config));
let open_count = Arc::new(Mutex::new(0));
let open_count_ref = Arc::clone(&open_count);
let msg_count = Arc::new(Mutex::new(0));
let msg_count_ref = Arc::clone(&msg_count);
let thread = tokio::spawn(async move {
client
.tests
.stream_auto_reconnect(
AutoReconnectParams { message_count: 10 },
&mut |event, _| match event {
SseEvent::Message(_) => {
let mut msg_count = msg_count_ref.lock().unwrap();
*msg_count += 1;
}
SseEvent::Error(_) => {}
SseEvent::Open => {
let mut open_count = open_count_ref.lock().unwrap();
*open_count += 1;
let client = TestClient::create(config);
let mut open_count = 0;
let mut msg_count = 0;
client
.tests
.stream_auto_reconnect(
AutoReconnectParams { message_count: 10 },
&mut |event, controller| match event {
SseEvent::Message(_) => {
msg_count += 1;
}
SseEvent::Error(_) => {}
SseEvent::Open => {
open_count += 1;
if open_count >= 5 {
controller.abort();
}
SseEvent::Close => {}
},
None,
None,
)
.await;
});
tokio::time::sleep(Duration::from_millis(5000)).await;
thread.abort();
assert!(*open_count.lock().unwrap() > 1);
assert!(*msg_count.lock().unwrap() > 10);
}
SseEvent::Close => {}
},
None,
None,
)
.await;
assert_eq!(open_count, 5);
assert!(msg_count > 10);
}

#[tokio::test]
async fn stream_connection_error_test_test() {
let config = get_config(headers());
let client = Arc::new(TestClient::create(config));
let open_count = Arc::new(Mutex::new(0));
let open_count_ref = Arc::clone(&open_count);
let error_count = Arc::new(Mutex::new(0));
let error_count_ref = Arc::clone(&error_count);
let msg_count = Arc::new(Mutex::new(0));
let msg_count_ref = Arc::clone(&msg_count);
let thread = tokio::spawn(async move {
client
.tests
.stream_connection_error_test(
StreamConnectionErrorTestParams {
status_code: 411,
status_message: "Invalid request".to_string(),
},
&mut |event, _| match event {
SseEvent::Message(_) => {
let mut msg_count = msg_count_ref.lock().unwrap();
*msg_count += 1;
}
SseEvent::Error(err) => {
assert_eq!(err.code, 411);
assert_eq!(err.message, "Invalid request".to_string());
let mut err_count = error_count_ref.lock().unwrap();
*err_count += 1;
}
SseEvent::Open => {
let mut open_count = open_count_ref.lock().unwrap();
*open_count += 1;
let client = TestClient::create(config);
let mut open_count = 0;
let mut error_count = 0;
let mut msg_count = 0;

client
.tests
.stream_connection_error_test(
StreamConnectionErrorTestParams {
status_code: 411,
status_message: "Invalid request".to_string(),
},
&mut |event, controller| match event {
SseEvent::Message(_) => {
msg_count += 1;
}
SseEvent::Error(err) => {
assert_eq!(err.code, 411);
assert_eq!(err.message, "Invalid request".to_string());
error_count += 1;
if error_count >= 5 {
controller.abort();
}
SseEvent::Close => {}
},
None,
None,
)
.await
});
tokio::time::sleep(Duration::from_millis(3000)).await;
thread.abort();
assert!(*error_count.lock().unwrap() > 1);
assert!(*open_count.lock().unwrap() > 1);
assert!(*msg_count.lock().unwrap() == 0);
}
SseEvent::Open => {
open_count += 1;
}
SseEvent::Close => {}
},
None,
None,
)
.await;
assert!(error_count > 5);
assert!(open_count > 5);
assert!(msg_count == 0);
}

#[tokio::test]
Expand Down

0 comments on commit bb64b23

Please sign in to comment.