Skip to content

Commit

Permalink
feat(lwt): add lightweight transaction (LWT) support (#29)
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Boll <danielboll.academico@gmail.com>
  • Loading branch information
Daniel-Boll authored Jul 29, 2024
1 parent 08e4764 commit e2d11a7
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 23 deletions.
18 changes: 18 additions & 0 deletions examples/lwt.mts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { Cluster, Consistency, Query, SerialConsistency } from "../index.js"

const nodes = process.env.CLUSTER_NODES?.split(",") ?? ["127.0.0.1:9042"];

const cluster = new Cluster({ nodes });

const session = await cluster.connect();

await session.execute("CREATE KEYSPACE IF NOT EXISTS examples_ks WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}");
await session.execute("CREATE TABLE IF NOT EXISTS examples_ks.tab (a int PRIMARY KEY)");

const query = new Query("INSERT INTO examples_ks.tab (a) VALUES(?) IF NOT EXISTS");
query.setConsistency(Consistency.One);
query.setSerialConsistency(SerialConsistency.Serial);

await session.execute(query, [12345]);

console.log("Ok.");
2 changes: 1 addition & 1 deletion examples/tsconfig.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"extends": "../tsconfig.json",
"compilerOptions": {
"module": "ESNext",
"module": "NodeNext",
"target": "ESNext",
"moduleResolution": "NodeNext"
}
Expand Down
11 changes: 10 additions & 1 deletion index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,13 @@ export const enum Consistency {
Serial = 8,
LocalSerial = 9
}
export const enum SerialConsistency {
Serial = 8,
LocalSerial = 9
}
export interface ExecutionProfile {
consistency?: Consistency
serialConsistency?: SerialConsistency
requestTimeout?: number
}
export interface ConnectionOptions {
Expand Down Expand Up @@ -69,9 +74,13 @@ export class Cluster {
export type ScyllaQuery = Query
export class Query {
constructor(query: string)
setConsistency(consistency: Consistency): void
setSerialConsistency(serialConsistency: SerialConsistency): void
setPageSize(pageSize: number): void
}
export class ScyllaPreparedStatement {
setConsistency(consistency: Consistency): void
setSerialConsistency(serialConsistency: SerialConsistency): void
}
export class Metrics {
/** Returns counter for nonpaged queries */
Expand All @@ -95,7 +104,7 @@ export class Metrics {
}
export class ScyllaSession {
metrics(): Metrics
execute(query: string | ScyllaPreparedStatement, parameters?: Array<number | string | Uuid> | undefined | null): Promise<any>
execute(query: string | Query | ScyllaPreparedStatement, parameters?: Array<number | string | Uuid> | undefined | null): Promise<any>
query(scyllaQuery: Query, parameters?: Array<number | string | Uuid> | undefined | null): Promise<any>
prepare(query: string): Promise<ScyllaPreparedStatement>
/**
Expand Down
3 changes: 2 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -310,10 +310,11 @@ if (!nativeBinding) {
throw new Error(`Failed to load native binding`)
}

const { Compression, Consistency, Cluster, VerifyMode, Query, ScyllaPreparedStatement, Metrics, ScyllaSession, Uuid } = nativeBinding
const { Compression, Consistency, SerialConsistency, Cluster, VerifyMode, Query, ScyllaPreparedStatement, Metrics, ScyllaSession, Uuid } = nativeBinding

module.exports.Compression = Compression
module.exports.Consistency = Consistency
module.exports.SerialConsistency = SerialConsistency
module.exports.Cluster = Cluster
module.exports.VerifyMode = VerifyMode
module.exports.Query = Query
Expand Down
5 changes: 5 additions & 0 deletions src/cluster/execution_profile/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
pub mod consistency;
pub mod serial_consistency;

use self::consistency::Consistency;
use self::serial_consistency::SerialConsistency;

#[napi(object)]
#[derive(Copy, Clone)]
pub struct ExecutionProfile {
pub consistency: Option<Consistency>,
pub serial_consistency: Option<SerialConsistency>,
pub request_timeout: Option<u32>,
}

Expand All @@ -17,6 +20,8 @@ impl ExecutionProfile {
ec_builder = ec_builder.consistency(consistency.into());
}

ec_builder = ec_builder.serial_consistency(self.serial_consistency.map(|sc| sc.into()));

if let Some(request_timeout) = self.request_timeout {
ec_builder =
ec_builder.request_timeout(Some(std::time::Duration::from_secs(request_timeout.into())));
Expand Down
23 changes: 23 additions & 0 deletions src/cluster/execution_profile/serial_consistency.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#[napi]
pub enum SerialConsistency {
Serial = 0x0008,
LocalSerial = 0x0009,
}

impl From<SerialConsistency> for scylla::statement::SerialConsistency {
fn from(value: SerialConsistency) -> Self {
match value {
SerialConsistency::Serial => Self::Serial,
SerialConsistency::LocalSerial => Self::LocalSerial,
}
}
}

impl From<scylla::statement::SerialConsistency> for SerialConsistency {
fn from(value: scylla::statement::SerialConsistency) -> Self {
match value {
scylla::statement::SerialConsistency::Serial => Self::Serial,
scylla::statement::SerialConsistency::LocalSerial => Self::LocalSerial,
}
}
}
29 changes: 17 additions & 12 deletions src/query/scylla_prepared_statement.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,29 @@
use scylla::prepared_statement::PreparedStatement;

use crate::cluster::execution_profile::consistency::Consistency;
use crate::cluster::execution_profile::{
consistency::Consistency, serial_consistency::SerialConsistency,
};

#[napi]
pub struct ScyllaPreparedStatement {
pub (crate) prepared: PreparedStatement,
pub(crate) prepared: PreparedStatement,
}

#[napi]
impl ScyllaPreparedStatement {
pub fn new(prepared: PreparedStatement) -> Self {
Self { prepared }
}

pub fn new(prepared: PreparedStatement) -> Self {
Self {
prepared
}
}
#[napi]
pub fn set_consistency(&mut self, consistency: Consistency) {
self.prepared.set_consistency(consistency.into());
}

#[napi]
pub fn set_consistency(&mut self, consistency: Consistency) {
self.prepared.set_consistency(consistency.into());
}
#[napi]
pub fn set_serial_consistency(&mut self, serial_consistency: SerialConsistency) {
self
.prepared
.set_serial_consistency(Some(serial_consistency.into()));
}
}

15 changes: 13 additions & 2 deletions src/query/scylla_query.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::fmt::Display;

use crate::cluster::execution_profile::{
consistency::Consistency, serial_consistency::SerialConsistency,
};
use scylla::query::Query;
use scylla::statement::Consistency;

#[napi(js_name = "Query")]
pub struct ScyllaQuery {
Expand All @@ -23,10 +25,19 @@ impl ScyllaQuery {
}
}

#[napi]
pub fn set_consistency(&mut self, consistency: Consistency) {
self.query.set_consistency(consistency);
self.query.set_consistency(consistency.into());
}

#[napi]
pub fn set_serial_consistency(&mut self, serial_consistency: SerialConsistency) {
self
.query
.set_serial_consistency(Some(serial_consistency.into()));
}

#[napi]
pub fn set_page_size(&mut self, page_size: i32) {
self.query.set_page_size(page_size);
}
Expand Down
14 changes: 8 additions & 6 deletions src/session/scylla_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::helpers::query_results::QueryResult;
use crate::query::scylla_prepared_statement::ScyllaPreparedStatement;
use crate::query::scylla_query::ScyllaQuery;
use crate::types::uuid::Uuid;
use napi::bindgen_prelude::{Either, Either3};
use napi::bindgen_prelude::Either3;

use super::metrics;

Expand All @@ -26,7 +26,7 @@ impl ScyllaSession {
#[napi]
pub async fn execute(
&self,
query: Either<String, &ScyllaPreparedStatement>,
query: Either3<String, &ScyllaQuery, &ScyllaPreparedStatement>,
parameters: Option<Vec<Either3<u32, String, &Uuid>>>,
) -> napi::Result<serde_json::Value> {
let values = QueryParameter::parser(parameters.clone()).ok_or(napi::Error::new(
Expand All @@ -35,13 +35,15 @@ impl ScyllaSession {
))?;

let query_result = match query.clone() {
Either::A(query) => self.session.query(query, values).await,
Either::B(prepared) => self.session.execute(&prepared.prepared, values).await,
Either3::A(query) => self.session.query(query, values).await,
Either3::B(query) => self.session.query(query.query.clone(), values).await,
Either3::C(prepared) => self.session.execute(&prepared.prepared, values).await,
}
.map_err(|_| {
let query = match query {
Either::A(query) => query,
Either::B(prepared) => prepared.prepared.get_statement().to_string(),
Either3::A(query) => query,
Either3::B(query) => query.query.contents.clone(),
Either3::C(prepared) => prepared.prepared.get_statement().to_string(),
};

napi::Error::new(
Expand Down

0 comments on commit e2d11a7

Please sign in to comment.