Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: seaprate blocking methods #72

Merged
merged 2 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 16 additions & 18 deletions relay_client/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,21 +142,13 @@ impl Client {
/// when fully processed by the relay.
/// Note: This function is experimental and will likely be removed in the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it still experimental?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We haven't thought about it, but we've changed the API a couple of times already. So yeah, I'd consider it experimental still.

/// future.
pub async fn subscribe_blocking(&self, topic: Topic) -> Response<rpc::Subscribe> {
self.request(rpc::Subscribe { topic, block: true }).await
pub async fn subscribe_blocking(&self, topic: Topic) -> Response<rpc::SubscribeBlocking> {
self.request(rpc::SubscribeBlocking { topic }).await
}

/// Unsubscribes from a topic.
pub async fn unsubscribe(
&self,
topic: Topic,
subscription_id: SubscriptionId,
) -> Response<rpc::Unsubscribe> {
self.request(rpc::Unsubscribe {
topic,
subscription_id,
})
.await
pub async fn unsubscribe(&self, topic: Topic) -> Response<rpc::Unsubscribe> {
self.request(rpc::Unsubscribe { topic }).await
}

/// Fetch mailbox messages for a specific topic.
Expand Down Expand Up @@ -265,12 +257,18 @@ impl Client {
pub async fn batch_subscribe_blocking(
&self,
topics: impl Into<Vec<Topic>>,
) -> Response<rpc::BatchSubscribe> {
self.request(rpc::BatchSubscribe {
topics: topics.into(),
block: true,
})
.await
) -> Result<
Vec<Result<SubscriptionId, Error<rpc::SubscriptionError>>>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why return SubscriptionId still? Maybe change to ()?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess doesn't hurt to have in the future if needed

Error<rpc::SubscriptionError>,
> {
Ok(self
.request(rpc::BatchSubscribeBlocking {
topics: topics.into(),
})
.await?
.into_iter()
.map(crate::convert_subscription_result)
.collect())
}

/// Unsubscribes from multiple topics.
Expand Down
13 changes: 12 additions & 1 deletion relay_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use {
::http::HeaderMap,
relay_rpc::{
auth::{SerializedAuthToken, RELAY_WEBSOCKET_ADDRESS},
domain::{MessageId, ProjectId},
domain::{MessageId, ProjectId, SubscriptionId},
rpc::{SubscriptionError, SubscriptionResult},
user_agent::UserAgent,
},
serde::Serialize,
Expand Down Expand Up @@ -170,6 +171,16 @@ impl Default for MessageIdGenerator {
}
}

#[inline]
fn convert_subscription_result(
res: SubscriptionResult,
) -> Result<SubscriptionId, error::Error<SubscriptionError>> {
match res {
SubscriptionResult::Id(id) => Ok(id),
SubscriptionResult::Error(err) => Err(ClientError::from(err).into()),
}
}

#[cfg(test)]
mod tests {
use {
Expand Down
43 changes: 26 additions & 17 deletions relay_client/src/websocket.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,28 @@
use {
self::connection::{connection_event_loop, ConnectionControl},
crate::{error::ClientError, ConnectionOptions},
crate::{
error::{ClientError, Error},
ConnectionOptions,
},
relay_rpc::{
domain::{MessageId, SubscriptionId, Topic},
rpc::{
BatchFetchMessages,
BatchReceiveMessages,
BatchSubscribe,
BatchSubscribeBlocking,
BatchUnsubscribe,
FetchMessages,
Publish,
Receipt,
Subscribe,
SubscribeBlocking,
Subscription,
SubscriptionError,
Unsubscribe,
},
},
std::{sync::Arc, time::Duration},
std::{future::Future, sync::Arc, time::Duration},
tokio::sync::{
mpsc::{self, UnboundedSender},
oneshot,
Expand Down Expand Up @@ -182,24 +188,17 @@ impl Client {
/// when fully processed by the relay.
/// Note: This function is experimental and will likely be removed in the
/// future.
pub fn subscribe_blocking(&self, topic: Topic) -> ResponseFuture<Subscribe> {
let (request, response) = create_request(Subscribe { topic, block: true });
pub fn subscribe_blocking(&self, topic: Topic) -> ResponseFuture<SubscribeBlocking> {
let (request, response) = create_request(SubscribeBlocking { topic });

self.request(request);

response
}

/// Unsubscribes from a topic.
pub fn unsubscribe(
&self,
topic: Topic,
subscription_id: SubscriptionId,
) -> EmptyResponseFuture<Unsubscribe> {
let (request, response) = create_request(Unsubscribe {
topic,
subscription_id,
});
pub fn unsubscribe(&self, topic: Topic) -> EmptyResponseFuture<Unsubscribe> {
let (request, response) = create_request(Unsubscribe { topic });

self.request(request);

Expand Down Expand Up @@ -240,15 +239,25 @@ impl Client {
pub fn batch_subscribe_blocking(
&self,
topics: impl Into<Vec<Topic>>,
) -> ResponseFuture<BatchSubscribe> {
let (request, response) = create_request(BatchSubscribe {
) -> impl Future<
Output = Result<
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bit ugly, I would use a type alias for this. clippy should have noticed this 🤔

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why isn't the function just async?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making the function async would capture self reference for the lifetime of that future, which I wanted to avoid initially. Now why would I want to avoid capturing self is another question. Maybe in the initial implementation there were some methods that required &mut self, so I didn't want to mix immutable and mutable borrowing, not sure. But yeah, now that the client API is stable-ish, we could just convert these methods to be async and return anonymous futures.

Vec<Result<SubscriptionId, Error<SubscriptionError>>>,
Error<SubscriptionError>,
>,
> {
let (request, response) = create_request(BatchSubscribeBlocking {
topics: topics.into(),
block: true,
});

self.request(request);

response
async move {
Ok(response
.await?
.into_iter()
.map(crate::convert_subscription_result)
.collect())
}
}

/// Unsubscribes from multiple topics.
Expand Down
100 changes: 86 additions & 14 deletions relay_rpc/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ impl ErrorResponse {
}

/// Data structure representing error response params.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
pub struct ErrorData {
/// Error code.
pub code: i32,
Expand All @@ -215,7 +215,9 @@ pub enum SubscriptionError {
SubscriberLimitExceeded,
}

/// Data structure representing subscribe request params.
/// Subscription request parameters. This request does not require the
/// subscription to be fully processed, and returns as soon as the server
/// receives it.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Subscribe {
/// The topic to subscribe to.
Expand Down Expand Up @@ -244,15 +246,36 @@ impl ServiceRequest for Subscribe {
}
}

/// Subscription request parameters. This request awaits the subscription to be
/// fully processed and returns possible errors.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct SubscribeBlocking {
/// The topic to subscribe to.
pub topic: Topic,
}

impl ServiceRequest for SubscribeBlocking {
type Error = SubscriptionError;
type Response = SubscriptionId;

fn validate(&self) -> Result<(), PayloadError> {
self.topic
.decode()
.map_err(|_| PayloadError::InvalidTopic)?;

Ok(())
}

fn into_params(self) -> Params {
Params::SubscribeBlocking(self)
}
}

/// Data structure representing unsubscribe request params.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Unsubscribe {
/// The topic to unsubscribe from.
pub topic: Topic,

/// The id of the subscription to unsubscribe from.
#[serde(rename = "id")]
pub subscription_id: SubscriptionId,
}

impl ServiceRequest for Unsubscribe {
Expand Down Expand Up @@ -317,7 +340,9 @@ pub struct FetchResponse {
pub has_more: bool,
}

/// Multi-topic subscription request parameters.
/// Multi-topic subscription request parameters. This request does not require
/// all subscriptions to be fully processed, and returns as soon as the server
/// receives it.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct BatchSubscribe {
/// The topics to subscribe to.
Expand All @@ -329,12 +354,9 @@ pub struct BatchSubscribe {
pub block: bool,
}

impl ServiceRequest for BatchSubscribe {
type Error = SubscriptionError;
type Response = Vec<SubscriptionId>;

fn validate(&self) -> Result<(), PayloadError> {
let batch_size = self.topics.len();
impl BatchSubscribe {
fn validate_topics(topics: &[Topic]) -> Result<(), PayloadError> {
let batch_size = topics.len();

if batch_size == 0 {
return Err(PayloadError::BatchEmpty);
Expand All @@ -344,18 +366,55 @@ impl ServiceRequest for BatchSubscribe {
return Err(PayloadError::BatchLimitExceeded);
}

for topic in &self.topics {
for topic in topics {
topic.decode().map_err(|_| PayloadError::InvalidTopic)?;
}

Ok(())
}
}

impl ServiceRequest for BatchSubscribe {
type Error = SubscriptionError;
type Response = Vec<SubscriptionId>;

fn validate(&self) -> Result<(), PayloadError> {
Self::validate_topics(&self.topics)
}

fn into_params(self) -> Params {
Params::BatchSubscribe(self)
}
}

/// Multi-topic subscription request parameters. This request awaits all
/// subscriptions to be fully processed and returns possible errors per topic.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct BatchSubscribeBlocking {
/// The topics to subscribe to.
pub topics: Vec<Topic>,
}

#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum SubscriptionResult {
Id(SubscriptionId),
Error(ErrorData),
}

impl ServiceRequest for BatchSubscribeBlocking {
type Error = SubscriptionError;
type Response = Vec<SubscriptionResult>;

fn validate(&self) -> Result<(), PayloadError> {
BatchSubscribe::validate_topics(&self.topics)
}

fn into_params(self) -> Params {
Params::BatchSubscribeBlocking(self)
}
}

/// Multi-topic unsubscription request parameters.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct BatchUnsubscribe {
Expand Down Expand Up @@ -696,6 +755,10 @@ pub enum Params {
#[serde(rename = "irn_subscribe", alias = "iridium_subscribe")]
Subscribe(Subscribe),

/// Parameters to blocking subscribe.
#[serde(rename = "irn_subscribeBlocking", alias = "iridium_subscribeBlocking")]
SubscribeBlocking(SubscribeBlocking),

/// Parameters to unsubscribe.
#[serde(rename = "irn_unsubscribe", alias = "iridium_unsubscribe")]
Unsubscribe(Unsubscribe),
Expand All @@ -708,6 +771,13 @@ pub enum Params {
#[serde(rename = "irn_batchSubscribe", alias = "iridium_batchSubscribe")]
BatchSubscribe(BatchSubscribe),

/// Parameters to blocking batch subscribe.
#[serde(
rename = "irn_batchSubscribeBlocking",
alias = "iridium_batchSubscribeBlocking"
)]
BatchSubscribeBlocking(BatchSubscribeBlocking),

/// Parameters to batch unsubscribe.
#[serde(rename = "irn_batchUnsubscribe", alias = "iridium_batchUnsubscribe")]
BatchUnsubscribe(BatchUnsubscribe),
Expand Down Expand Up @@ -779,9 +849,11 @@ impl Request {

match &self.params {
Params::Subscribe(params) => params.validate(),
Params::SubscribeBlocking(params) => params.validate(),
Params::Unsubscribe(params) => params.validate(),
Params::FetchMessages(params) => params.validate(),
Params::BatchSubscribe(params) => params.validate(),
Params::BatchSubscribeBlocking(params) => params.validate(),
Params::BatchUnsubscribe(params) => params.validate(),
Params::BatchFetchMessages(params) => params.validate(),
Params::Publish(params) => params.validate(),
Expand Down
Loading
Loading