Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support the usage of com.aerospike/aerospike-client with versions 4.9 and up #70

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,21 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

### [4.0.0] - 2024-01-14

#### Changed

* Enable the usage of `com.aerospike/aerospike-client` with versions 4.9 and up.
* Introduce a `aerospike-clj.batch-client` namespace.
This namespace extends the `aerospike-clj.client` namespace with batch operations using the `AerospikeBatchOps`
protocol, which is relevant for Java client library versions 6.0.0 and up.
* Introduce a `aerospike-clj.batch-policy` namespace.
This namespace contains functions to create batch policies for the Aerospike Java client library.

#### Removed

* Remove `:result-code` from the `get-batch`'s response.

### [3.1.0] - 2023-08-22

#### Added
Expand Down
17 changes: 16 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ An opinionated Clojure library wrapping Aerospike Java Client.

# Requirements
- Java 8
- Clojure 1.8
- Aerospike server version >= `4.9.0`
- Clojure version >= `1.11.0`

Expand Down Expand Up @@ -111,6 +110,22 @@ user=> @(aero/get-single c "index" "set-name")
Aerospike returns a TTL on the queried records that is epoch style, but with a different "beginning of time" which is "2010-01-01T00:00:00Z".
Call `expiry-unix` with the returned TTL to get a TTL relative to the UNIX epoch.

### Using the `AerospikeBatchOps` protocol

Since library version `4.0.0`, the implementation of the `AerospikeBatchOps` protocol is available as an extension to
the `aerospike-clj.client` namespace.
The extension is available via the `aerospike-clj.batch-client` namespace.
To use it, require the `aerospike-clj.batch-client` namespace.
The implementation of the `AerospikeBatchOps` protocol is implemented in a separate namespace to allow older versions of
the `com.aerospike/aerospike-client` dependency to be used.
Please note that the `aerospike-clj.batch-client` namespace requires the `com.aerospike/aerospike-client` dependency to
be version `6.0.0` or higher.

### Setting up a client policy for `com.aerospike/aerospike-client` with version 6.0.0 and above

The `com.aerospike/aerospike-client` dependency version `6.0.0` is a breaking change.
To set batch operation policies, please use the `aerospike-clj.batch-policy` namespace.

## Testing
### Unit tests
Executed via running `lein test`.
Expand Down
4 changes: 2 additions & 2 deletions project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject com.appsflyer/aerospike-clj "3.1.0"
(defproject com.appsflyer/aerospike-clj "4.0.0-SNAPSHOT"
:description "An Aerospike Clojure client."
:url "https://github.com/AppsFlyer/aerospike-clj"
:license {:name "Eclipse Public License"
Expand All @@ -23,7 +23,7 @@
[cheshire "5.11.0"]
[tortue/spy "2.14.0"]
[com.fasterxml.jackson.core/jackson-databind "2.11.2"]
[clj-kondo "2023.09.07"]
[clj-kondo "2023.12.15"]
[com.clojure-goes-fast/clj-java-decompiler "0.3.4"]]
:eftest {:multithread? false
:report eftest.report.junit/report
Expand Down
51 changes: 51 additions & 0 deletions src/main/clojure/aerospike_clj/batch_client.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
(ns aerospike-clj.batch-client
"This namespace contains implementation of the [[AerospikeBatchOps]] protocol.
Clients should `require` this namespace to extend the protocol to the [[SimpleAerospikeClient]].
This protocol is only usable with `com.aerospike/aerospike-client` version 6.0.0 or higher."
(:require [aerospike-clj.aerospike-record :as record]
[aerospike-clj.client :as client]
[aerospike-clj.collections :as collections]
[aerospike-clj.protocols :as pt]
[promesa.core :as p])
(:import (aerospike_clj.client SimpleAerospikeClient)
(com.aerospike.client AerospikeClient AerospikeException BatchRecord)
(com.aerospike.client.async EventLoop EventLoops)
(com.aerospike.client.listener BatchOperateListListener)
(com.aerospike.client.policy BatchPolicy)
(java.util List)))

(deftype ^:private AsyncBatchOperateListListener [op-future]
BatchOperateListListener
(^void onSuccess [_this ^List records ^boolean _status]
(p/resolve! op-future records))
(^void onFailure [_this ^AerospikeException ex]
(p/reject! op-future ex)))

(defn- batch-record->map [^BatchRecord batch-record]
(let [k (.key batch-record)]
(-> (record/record->map (.record batch-record))
(assoc :index (.toString (.userKey k)))
(assoc :set (.setName k))
(assoc :result-code (.resultCode batch-record)))))

(extend-type SimpleAerospikeClient
pt/AerospikeBatchOps
(batch-operate
([this batch-records]
(pt/batch-operate this batch-records {}))
([this batch-records conf]
(let [op-future (p/deferred)
policy (:policy conf)
batch-list (if (list? batch-records)
batch-records
(into [] batch-records))
start-time (System/nanoTime)
transcoder (:transcoder conf identity)]
(.operate ^AerospikeClient (.-client this)
^EventLoop (.next ^EventLoops (.-el this))
^BatchOperateListListener (AsyncBatchOperateListListener. op-future)
^BatchPolicy policy
^List batch-list)
(-> op-future
(p/then' (comp transcoder #(collections/->list batch-record->map %)) (.-completion-executor this))
(client/register-events (.-client-events this) :batch-operate nil start-time conf))))))
51 changes: 51 additions & 0 deletions src/main/clojure/aerospike_clj/batch_policy.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
(ns aerospike-clj.batch-policy
"This namespace contains functions to create batch policies.
These policies are only usable with `com.aerospike/aerospike-client` version 6.0.0 or higher."
(:require [aerospike-clj.policy :as policy])
(:import #_{:clj-kondo/ignore [:unused-import]}
(com.aerospike.client.policy BatchPolicy
BatchWritePolicy
ClientPolicy
CommitLevel
GenerationPolicy
RecordExistsAction)))

(defn map->batch-policy
"Create a `BatchPolicy` from a map.
This function is slow due to possible reflection."
^BatchPolicy [conf]
(let [bp (BatchPolicy. (policy/map->policy conf))
conf (merge {"timeoutDelay" 3000} conf)]
(policy/set-java bp conf "allowInline")
(policy/set-java bp conf "respondAllKeys")
(policy/set-java bp conf "maxConcurrentThreads")
(policy/set-java bp conf "sendSetName")
bp))

(defn map->batch-write-policy
"Create a `BatchWritePolicy` from a map. Enumeration names should start with capitalized letter.
This function is slow due to possible reflection."
^BatchWritePolicy [conf]
(let [p (BatchWritePolicy.)]
(policy/set-java-enum p conf "RecordExistsAction")
(policy/set-java-enum p conf "CommitLevel")
(policy/set-java-enum p conf "GenerationPolicy")
(policy/set-java p conf "filterExp")
(policy/set-java p conf "generation")
(policy/set-java p conf "expiration")
(policy/set-java p conf "durableDelete")
(policy/set-java p conf "sendKey")
p))

(defn add-batch-write-policy
"Set the [[batchWritePolicyDefault]] or the [[batchParentPolicyWriteDefault]] in a [[ClientPolicy]]."
[^ClientPolicy client-policy conf]
(set! (.batchParentPolicyWriteDefault client-policy) (get conf "batchParentPolicyWriteDefault" (map->batch-policy conf)))
(set! (.batchWritePolicyDefault client-policy) (get conf "batchWritePolicyDefault" (map->batch-write-policy conf))))

(defn create-client-policy
"This is a wrapper around [[policy/create-client-policy]] that adds the batch policies to the client policy."
^ClientPolicy [event-loops conf]
(let [client-policy (policy/create-client-policy event-loops conf)]
(add-batch-write-policy client-policy conf)
client-policy))
42 changes: 9 additions & 33 deletions src/main/clojure/aerospike_clj/client.clj
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,18 @@
[clojure.tools.logging :as log]
[promesa.core :as p]
[promesa.exec :as p-exec])
(:import (aerospike_clj.listeners AsyncBatchListListener AsyncBatchOperateListListener AsyncDeleteListener
(:import (aerospike_clj.listeners AsyncBatchListListener AsyncDeleteListener
AsyncExistsArrayListener AsyncExistsListener AsyncInfoListener
AsyncRecordListener AsyncRecordSequenceListener AsyncWriteListener)
(com.aerospike.client BatchRecord Host Key)
(com.aerospike.client Host Key)
(com.aerospike.client AerospikeClient BatchRead Bin Key Operation)
(com.aerospike.client.async EventLoop EventLoops NioEventLoops)
(com.aerospike.client.cluster Node)
(com.aerospike.client.listener BatchOperateListListener)
(com.aerospike.client.policy BatchPolicy ClientPolicy InfoPolicy
Policy RecordExistsAction ScanPolicy
WritePolicy)
(java.time Instant)
(java.util Arrays List)
(java.util Arrays)
(java.util.concurrent Executor)))

(def
Expand Down Expand Up @@ -66,7 +65,7 @@
(p/catch (fn [op-exception]
(pt/on-failure client-events op-name op-exception index op-start-time))))))

(defn- register-events [op-future default-client-events op-name index op-start-time conf]
(defn register-events [op-future default-client-events op-name index op-start-time conf]
(let [client-events (:client-events conf default-client-events)]
(if (empty? client-events)
op-future
Expand All @@ -80,12 +79,11 @@
(create-key ^Key [this as-namespace set-name]
(as-key/create-key this as-namespace set-name)))

(defn- batch-record->map [^BatchRecord batch-record]
(let [k (.key batch-record)]
(-> (record/record->map (.record batch-record))
(defn batch-read->map [^BatchRead batch-read]
(let [k (.key batch-read)]
(-> (record/record->map (.record batch-read))
(assoc :index (.toString (.userKey k)))
(assoc :set (.setName k))
(assoc :result-code (.resultCode batch-record)))))
(assoc :set (.setName k)))))

(defn- map->batch-read ^BatchRead [batch-read-map dbns]
(let [k ^Key (pt/create-key (:index batch-read-map) dbns (:set batch-read-map))]
Expand Down Expand Up @@ -183,7 +181,7 @@
^BatchPolicy (:policy conf)
batch-reads-arr)
(-> op-future
(p/then' #(collections/->list batch-record->map %) completion-executor)
(p/then' #(collections/->list batch-read->map %) completion-executor)
(p/then' (:transcoder conf identity))
(register-events client-events :read-batch nil start-time conf))))

Expand Down Expand Up @@ -367,28 +365,6 @@
(p/then' record/record->map completion-executor)
(register-events client-events :operate index start-time conf)))))

pt/AerospikeBatchOps
(batch-operate [this batch-records]
(pt/batch-operate this batch-records {}))

(batch-operate [_this batch-records conf]
(let [op-future (p/deferred)
policy (:policy conf)
batch-list (if (list? batch-records)
batch-records
(into [] batch-records))
start-time (System/nanoTime)
transcoder (:transcoder conf identity)]
(.operate ^AerospikeClient client
^EventLoop (.next ^EventLoops el)
^BatchOperateListListener (AsyncBatchOperateListListener. op-future)
^BatchPolicy policy
^List batch-list)
(-> op-future
(p/then' (comp transcoder #(collections/->list batch-record->map %)) completion-executor)
(register-events client-events :batch-operate nil start-time conf))))


pt/AerospikeSetOps
(scan-set [_this aero-namespace set-name conf]
(when-not (fn? (:callback conf))
Expand Down
19 changes: 6 additions & 13 deletions src/main/clojure/aerospike_clj/listeners.clj
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
(ns aerospike-clj.listeners
(:require [promesa.core :as p]
[aerospike-clj.aerospike-record :as record])
(:import (java.util List Map)
(com.aerospike.client Key Record AerospikeException AerospikeException$QueryTerminated)
(com.aerospike.client.listener RecordListener WriteListener DeleteListener
ExistsListener BatchListListener RecordSequenceListener InfoListener ExistsArrayListener BatchOperateListListener)))
(:require [aerospike-clj.aerospike-record :as record]
[promesa.core :as p])
(:import (com.aerospike.client AerospikeException AerospikeException$QueryTerminated Key Record)
(com.aerospike.client.listener BatchListListener DeleteListener
ExistsArrayListener ExistsListener InfoListener RecordListener RecordSequenceListener WriteListener)
(java.util List Map)))

(deftype AsyncExistsListener [op-future]
ExistsListener
Expand Down Expand Up @@ -66,10 +66,3 @@
(p/reject! op-future ex))
(^void onSuccess [_this ^"[Lcom.aerospike.client.Key;" _keys ^"[Z" exists]
(p/resolve! op-future exists)))

(deftype AsyncBatchOperateListListener [op-future]
BatchOperateListListener
(^void onSuccess [_this ^List records ^boolean _status]
(p/resolve! op-future records))
(^void onFailure [_this ^AerospikeException ex]
(p/reject! op-future ex)))
31 changes: 1 addition & 30 deletions src/main/clojure/aerospike_clj/policy.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#_{:clj-kondo/ignore [:unused-import]}
[com.aerospike.client.policy Policy ClientPolicy WritePolicy RecordExistsAction
GenerationPolicy BatchPolicy CommitLevel
AuthMode ReadModeAP ReadModeSC Replica BatchWritePolicy]))
AuthMode ReadModeAP ReadModeSC Replica]))

(defmacro set-java [obj conf obj-name]
`(when (some? (get ~conf ~obj-name))
Expand Down Expand Up @@ -38,21 +38,6 @@
(set-java p conf "totalTimeout")
p))

(defn map->batch-write-policy
"Create a `BatchWritePolicy` from a map. Enumeration names should start with capitalized letter.
This function is slow due to possible reflection."
^BatchWritePolicy [conf]
(let [p (BatchWritePolicy.)]
(set-java-enum p conf "RecordExistsAction")
(set-java-enum p conf "CommitLevel")
(set-java-enum p conf "GenerationPolicy")
(set-java p conf "filterExp")
(set-java p conf "generation")
(set-java p conf "expiration")
(set-java p conf "durableDelete")
(set-java p conf "sendKey")
p))

(defn map->batch-policy
"Create a `BatchPolicy` from a map.
This function is slow due to possible reflection."
Expand Down Expand Up @@ -92,18 +77,6 @@
(set! (.recordExistsAction wp) record-exists-action)
wp)))

(defn batch-write-policy
"Create a write policy to be passed to put methods via `{:policy wp}`.
Also used in `update` and `create`.
The default policy in case the record exists is `RecordExistsAction/UPDATE`."
(^BatchWritePolicy [client expiration]
(batch-write-policy client expiration (RecordExistsAction/UPDATE)))
(^BatchWritePolicy [client expiration record-exists-action]
(let [wp (BatchWritePolicy. (.getBatchWritePolicyDefault ^AerospikeClient client))]
(set! (.expiration wp) expiration)
(set! (.recordExistsAction wp) record-exists-action)
wp)))

(defn set-policy
"Create a write policy with UPDATE record exists action.
in case of new entry, create it
Expand Down Expand Up @@ -187,8 +160,6 @@
(set! (.readPolicyDefault cp) (get conf "readPolicyDefault" (map->policy conf)))
(set! (.writePolicyDefault cp) (get conf "writePolicyDefault" (map->write-policy conf)))
(set! (.batchPolicyDefault cp) (get conf "batchPolicyDefault" (map->batch-policy conf)))
(set! (.batchParentPolicyWriteDefault cp) (get conf "batchParentPolicyWriteDefault" (map->batch-policy conf)))
(set! (.batchWritePolicyDefault cp) (get conf "batchWritePolicyDefault" (map->batch-write-policy conf)))

(set-java-enum cp conf "AuthMode")
(set-java cp conf "clusterName")
Expand Down
Loading
Loading