Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

session: adjust naming in internal session functions #1156

Merged
merged 4 commits into from
Dec 30, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 74 additions & 68 deletions scylla/src/transport/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ impl Default for SessionConfig {
}
}

pub(crate) enum RunQueryResult<ResT> {
pub(crate) enum RunRequestResult<ResT> {
IgnoredWriteError,
Completed(ResT),
}
Expand Down Expand Up @@ -1203,8 +1203,8 @@ where

let span = RequestSpan::new_query(&query.contents);
let span_ref = &span;
let run_query_result = self
.run_query(
let run_request_result = self
.run_request(
statement_info,
&query.config,
execution_profile,
Expand Down Expand Up @@ -1257,13 +1257,13 @@ where
.instrument(span.span().clone())
.await?;

let response = match run_query_result {
RunQueryResult::IgnoredWriteError => NonErrorQueryResponse {
let response = match run_request_result {
RunRequestResult::IgnoredWriteError => NonErrorQueryResponse {
response: NonErrorResponse::Result(result::Result::Void),
tracing_id: None,
warnings: Vec::new(),
},
RunQueryResult::Completed(response) => response,
RunRequestResult::Completed(response) => response,
};

self.handle_set_keyspace_response(&response).await?;
Expand Down Expand Up @@ -1526,8 +1526,8 @@ where
}
}

let run_query_result: RunQueryResult<NonErrorQueryResponse> = self
.run_query(
let run_request_result: RunRequestResult<NonErrorQueryResponse> = self
.run_request(
statement_info,
&prepared.config,
execution_profile,
Expand Down Expand Up @@ -1558,13 +1558,13 @@ where
.instrument(span.span().clone())
.await?;

let response = match run_query_result {
RunQueryResult::IgnoredWriteError => NonErrorQueryResponse {
let response = match run_request_result {
RunRequestResult::IgnoredWriteError => NonErrorQueryResponse {
response: NonErrorResponse::Result(result::Result::Void),
tracing_id: None,
warnings: Vec::new(),
},
RunQueryResult::Completed(response) => response,
RunRequestResult::Completed(response) => response,
};

self.handle_set_keyspace_response(&response).await?;
Expand Down Expand Up @@ -1650,8 +1650,8 @@ where

let span = RequestSpan::new_batch();

let run_query_result = self
.run_query(
let run_request_result = self
.run_request(
statement_info,
&batch.config,
execution_profile,
Expand All @@ -1678,9 +1678,9 @@ where
.instrument(span.span().clone())
.await?;

let result = match run_query_result {
RunQueryResult::IgnoredWriteError => QueryResult::mock_empty(),
RunQueryResult::Completed(result) => {
let result = match run_request_result {
RunRequestResult::IgnoredWriteError => QueryResult::mock_empty(),
RunRequestResult::Completed(result) => {
span.record_result_fields(&result);
result
}
Expand Down Expand Up @@ -1916,26 +1916,27 @@ where
Ok(Some(tracing_info))
}

// This method allows to easily run a query using load balancing, retry policy etc.
// Requires some information about the query and a closure.
// The closure is used to do the query itself on a connection.
// - query will use connection.query()
// - execute will use connection.execute()
// If this query closure fails with some errors retry policy is used to perform retries
// On success this query's result is returned
/// This method allows to easily run a request using load balancing, retry policy etc.
/// Requires some information about the request and a closure.
/// The closure is used to execute the request once on a chosen connection.
/// - query will use connection.query()
/// - execute will use connection.execute()
///
/// If this closure fails with some errors, retry policy is used to perform retries.
/// On success, this request's result is returned.
// I tried to make this closures take a reference instead of an Arc but failed
// maybe once async closures get stabilized this can be fixed
async fn run_query<'a, QueryFut, ResT>(
async fn run_request<'a, QueryFut, ResT>(
&'a self,
statement_info: RoutingInfo<'a>,
statement_config: &'a StatementConfig,
execution_profile: Arc<ExecutionProfileInner>,
do_query: impl Fn(Arc<Connection>, Consistency, &ExecutionProfileInner) -> QueryFut,
run_request_once: impl Fn(Arc<Connection>, Consistency, &ExecutionProfileInner) -> QueryFut,
request_span: &'a RequestSpan,
) -> Result<RunQueryResult<ResT>, QueryError>
) -> Result<RunRequestResult<ResT>, QueryError>
where
QueryFut: Future<Output = Result<ResT, QueryError>>,
ResT: AllowedRunQueryResTType,
ResT: AllowedRunRequestResTType,
{
let history_listener_and_id: Option<(&'a dyn HistoryListener, history::QueryId)> =
statement_config
Expand All @@ -1947,11 +1948,11 @@ where

let runner = async {
let cluster_data = self.cluster.get_data();
let query_plan =
let request_plan =
load_balancing::Plan::new(load_balancer.as_ref(), &statement_info, &cluster_data);

// If a speculative execution policy is used to run query, query_plan has to be shared
// between different async functions. This struct helps to wrap query_plan in mutex so it
// If a speculative execution policy is used to run request, request_plan has to be shared
// between different async functions. This struct helps to wrap request_plan in mutex so it
// can be shared safely.
struct SharedPlan<'a, I>
where
Expand Down Expand Up @@ -1980,11 +1981,11 @@ where

match speculative_policy {
Some(speculative) if statement_config.is_idempotent => {
let shared_query_plan = SharedPlan {
iter: std::sync::Mutex::new(query_plan),
let shared_request_plan = SharedPlan {
iter: std::sync::Mutex::new(request_plan),
};

let execute_query_generator = |is_speculative: bool| {
let request_runner_generator = |is_speculative: bool| {
let history_data: Option<HistoryData> = history_listener_and_id
.as_ref()
.map(|(history_listener, query_id)| {
Expand All @@ -2005,11 +2006,11 @@ where
request_span.inc_speculative_executions();
}

self.execute_query(
&shared_query_plan,
&do_query,
self.run_request_speculative_fiber(
&shared_request_plan,
&run_request_once,
&execution_profile,
ExecuteQueryContext {
ExecuteRequestContext {
is_idempotent: statement_config.is_idempotent,
consistency_set_on_statement: statement_config.consistency,
retry_session: retry_policy.new_session(),
Expand All @@ -2027,7 +2028,7 @@ where
speculative_execution::execute(
speculative.as_ref(),
&context,
execute_query_generator,
request_runner_generator,
)
.await
}
Expand All @@ -2040,11 +2041,11 @@ where
query_id: *query_id,
speculative_id: None,
});
self.execute_query(
query_plan,
&do_query,
self.run_request_speculative_fiber(
request_plan,
&run_request_once,
&execution_profile,
ExecuteQueryContext {
ExecuteRequestContext {
is_idempotent: statement_config.is_idempotent,
consistency_set_on_statement: statement_config.consistency,
retry_session: retry_policy.new_session(),
Expand Down Expand Up @@ -2085,24 +2086,29 @@ where
result
}

async fn execute_query<'a, QueryFut, ResT>(
/// Executes the closure `run_request_once`, provided the load balancing plan and some information
/// about the request, including retry session.
/// If request fails, retry session is used to perform retries.
///
/// Returns None, if provided plan is empty.
async fn run_request_speculative_fiber<'a, QueryFut, ResT>(
&'a self,
query_plan: impl Iterator<Item = (NodeRef<'a>, Shard)>,
do_query: impl Fn(Arc<Connection>, Consistency, &ExecutionProfileInner) -> QueryFut,
request_plan: impl Iterator<Item = (NodeRef<'a>, Shard)>,
run_request_once: impl Fn(Arc<Connection>, Consistency, &ExecutionProfileInner) -> QueryFut,
execution_profile: &ExecutionProfileInner,
mut context: ExecuteQueryContext<'a>,
) -> Option<Result<RunQueryResult<ResT>, QueryError>>
mut context: ExecuteRequestContext<'a>,
) -> Option<Result<RunRequestResult<ResT>, QueryError>>
where
QueryFut: Future<Output = Result<ResT, QueryError>>,
ResT: AllowedRunQueryResTType,
ResT: AllowedRunRequestResTType,
{
let mut last_error: Option<QueryError> = None;
let mut current_consistency: Consistency = context
.consistency_set_on_statement
.unwrap_or(execution_profile.consistency);

'nodes_in_plan: for (node, shard) in query_plan {
let span = trace_span!("Executing query", node = %node.address);
'nodes_in_plan: for (node, shard) in request_plan {
let span = trace_span!("Executing request", node = %node.address);
'same_node_retries: loop {
trace!(parent: &span, "Execution started");
let connection = match node.connection_for_shard(shard).await {
Expand All @@ -2114,14 +2120,14 @@ where
"Choosing connection failed"
);
last_error = Some(e.into());
// Broken connection doesn't count as a failed query, don't log in metrics
// Broken connection doesn't count as a failed request, don't log in metrics
continue 'nodes_in_plan;
}
};
context.request_span.record_shard_id(&connection);

self.metrics.inc_total_nonpaged_queries();
let query_start = std::time::Instant::now();
let request_start = std::time::Instant::now();

trace!(
parent: &span,
Expand All @@ -2130,29 +2136,29 @@ where
);
let attempt_id: Option<history::AttemptId> =
context.log_attempt_start(connection.get_connect_address());
let query_result: Result<ResT, QueryError> =
do_query(connection, current_consistency, execution_profile)
let request_result: Result<ResT, QueryError> =
run_request_once(connection, current_consistency, execution_profile)
.instrument(span.clone())
.await;

let elapsed = query_start.elapsed();
last_error = match query_result {
let elapsed = request_start.elapsed();
last_error = match request_result {
Ok(response) => {
trace!(parent: &span, "Query succeeded");
trace!(parent: &span, "Request succeeded");
let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64);
context.log_attempt_success(&attempt_id);
execution_profile.load_balancing_policy.on_query_success(
context.query_info,
elapsed,
node,
);
return Some(Ok(RunQueryResult::Completed(response)));
return Some(Ok(RunRequestResult::Completed(response)));
}
Err(e) => {
trace!(
parent: &span,
last_error = %e,
"Query failed"
"Request failed"
);
self.metrics.inc_failed_nonpaged_queries();
execution_profile.load_balancing_policy.on_query_failure(
Expand Down Expand Up @@ -2195,7 +2201,7 @@ where
RetryDecision::DontRetry => break 'nodes_in_plan,

RetryDecision::IgnoreWriteError => {
return Some(Ok(RunQueryResult::IgnoredWriteError))
return Some(Ok(RunRequestResult::IgnoredWriteError))
}
};
}
Expand Down Expand Up @@ -2243,22 +2249,22 @@ where
}
}

// run_query, execute_query, etc have a template type called ResT.
// run_request, run_request_speculative_fiber, etc have a template type called ResT.
// There was a bug where ResT was set to QueryResponse, which could
// be an error response. This was not caught by retry policy which
// assumed all errors would come from analyzing Result<ResT, QueryError>.
// This trait is a guard to make sure that this mistake doesn't
// happen again.
// When using run_query make sure that the ResT type is NOT able
// When using run_request make sure that the ResT type is NOT able
// to contain any errors.
// See https://github.com/scylladb/scylla-rust-driver/issues/501
pub(crate) trait AllowedRunQueryResTType {}
pub(crate) trait AllowedRunRequestResTType {}

impl AllowedRunQueryResTType for Uuid {}
impl AllowedRunQueryResTType for QueryResult {}
impl AllowedRunQueryResTType for NonErrorQueryResponse {}
impl AllowedRunRequestResTType for Uuid {}
impl AllowedRunRequestResTType for QueryResult {}
impl AllowedRunRequestResTType for NonErrorQueryResponse {}

struct ExecuteQueryContext<'a> {
struct ExecuteRequestContext<'a> {
is_idempotent: bool,
consistency_set_on_statement: Option<Consistency>,
retry_session: Box<dyn RetrySession>,
Expand All @@ -2273,7 +2279,7 @@ struct HistoryData<'a> {
speculative_id: Option<history::SpeculativeId>,
}

impl ExecuteQueryContext<'_> {
impl ExecuteRequestContext<'_> {
fn log_attempt_start(&self, node_addr: SocketAddr) -> Option<history::AttemptId> {
self.history_data.as_ref().map(|hd| {
hd.listener
Expand Down
Loading