Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release/0.0.8 #112

Merged
merged 19 commits into from
Oct 12, 2024
Merged
1 change: 1 addition & 0 deletions .gitpod.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# Learn more from ready-to-use templates: https://www.gitpod.io/docs/introduction/getting-started/quickstart

tasks:
# - before: mvn clean && docker ps -aq | xargs docker stop | xargs docker rm && docker rmi shihuili1218/klein-jepsen-control shihuili1218/klein-jepsen-node
- init: mvn clean install -DskipTests=true && chmod -R +x klein-jepsen/docker && klein-jepsen/docker/bin/up


4 changes: 2 additions & 2 deletions klein-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
<parent>
<artifactId>klein</artifactId>
<groupId>com.ofcoder.klein</groupId>
<version>0.0.7</version>
<version>0.0.8</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<groupId>com.ofcoder.klein.common</groupId>
<artifactId>klein-common</artifactId>
<version>0.0.7</version>
<version>0.0.8</version>
<packaging>jar</packaging>

<properties>
Expand Down
4 changes: 2 additions & 2 deletions klein-consensus/klein-consensus-facade/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
<parent>
<artifactId>klein-consensus</artifactId>
<groupId>com.ofcoder.klein.consensus</groupId>
<version>0.0.7</version>
<version>0.0.8</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<groupId>com.ofcoder.klein.consensus.facade</groupId>
<artifactId>klein-consensus-facade</artifactId>
<version>0.0.7</version>
<version>0.0.8</version>
<packaging>jar</packaging>

<properties>
Expand Down
4 changes: 2 additions & 2 deletions klein-consensus/klein-consensus-paxos/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
<parent>
<artifactId>klein-consensus</artifactId>
<groupId>com.ofcoder.klein.consensus</groupId>
<version>0.0.7</version>
<version>0.0.8</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<groupId>com.ofcoder.klein.consensus.paxos</groupId>
<artifactId>klein-consensus-paxos</artifactId>
<version>0.0.7</version>
<version>0.0.8</version>
<packaging>jar</packaging>

<properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.ofcoder.klein.storage.facade.Snap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Learner Role.
Expand All @@ -42,8 +41,6 @@ public interface Learner extends Role<ConsensusProp> {

long getLastCheckpoint();

Set<String> getGroups();

Map<String, SMApplier> getSms();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -110,11 +109,6 @@ public long getLastCheckpoint() {
.max().orElse(-1L);
}

@Override
public Set<String> getGroups() {
return sms.keySet();
}

private Map<String, Snap> generateSnap() {
Map<String, SMApplier> sms = new HashMap<>(this.sms);
ConcurrentMap<String, Snap> result = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -494,7 +488,7 @@ public SnapSyncRes handleSnapSyncRequest(final SnapSyncReq req) {
Map<String, Snap> allSnaps = generateSnap();
res.getImages().putAll(allSnaps);
} else {
for (String group : RuntimeAccessor.getLearner().getGroups()) {
for (String group : sms.keySet()) {
Snap lastSnap = logManager.getLastSnap(group);
if (lastSnap != null && lastSnap.getCheckpoint() > req.getCheckpoint()) {
res.getImages().put(group, lastSnap);
Expand Down
4 changes: 2 additions & 2 deletions klein-consensus/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
<parent>
<artifactId>klein</artifactId>
<groupId>com.ofcoder.klein</groupId>
<version>0.0.7</version>
<version>0.0.8</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<groupId>com.ofcoder.klein.consensus</groupId>
<artifactId>klein-consensus</artifactId>
<packaging>pom</packaging>
<version>0.0.7</version>
<version>0.0.8</version>

<modules>
<module>klein-consensus-paxos</module>
Expand Down
4 changes: 2 additions & 2 deletions klein-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
<parent>
<artifactId>klein</artifactId>
<groupId>com.ofcoder.klein</groupId>
<version>0.0.7</version>
<version>0.0.8</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<groupId>com.ofcoder.klein.core</groupId>
<artifactId>klein-core</artifactId>
<version>0.0.7</version>
<version>0.0.8</version>
<packaging>jar</packaging>

<properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.TimeUnit;

import com.ofcoder.klein.common.OnlyForTest;
import com.ofcoder.klein.consensus.facade.Result;

/**
* Klein Cache.
Expand Down Expand Up @@ -52,10 +53,12 @@ public interface KleinCache {
* @param data cache value
* @param <D> cache value type
* @param apply if ture, wait sm apply before return
* @param ttl expire
* @param unit expire unit
* @return put result
*/
@OnlyForTest("for jepsen")
<D extends Serializable> boolean put(String key, D data, boolean apply);
<D extends Serializable> Result.State put(String key, D data, boolean apply, Long ttl, TimeUnit unit);

/**
* put element to cache and set expire.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,17 @@
}

@Override
public <D extends Serializable> boolean put(final String key, final D data, final boolean apply) {
public <D extends Serializable> Result.State put(final String key, final D data, final boolean apply, final Long ttl, final TimeUnit unit) {
CacheMessage message = new CacheMessage();
message.setCacheName(cacheName);
message.setData(data);
message.setKey(key);
message.setOp(CacheMessage.PUT);
if (ttl > 0) {
message.setExpire(TrueTime.currentTimeMillis() + unit.toMillis(ttl));

Check warning on line 81 in klein-core/src/main/java/com/ofcoder/klein/core/cache/KleinCacheImpl.java

View check run for this annotation

Codecov / codecov/patch

klein-core/src/main/java/com/ofcoder/klein/core/cache/KleinCacheImpl.java#L81

Added line #L81 was not covered by tests
}
Result result = consensus.propose(message, apply);
return Result.State.SUCCESS.equals(result.getState());
return result.getState();

Check warning on line 84 in klein-core/src/main/java/com/ofcoder/klein/core/cache/KleinCacheImpl.java

View check run for this annotation

Codecov / codecov/patch

klein-core/src/main/java/com/ofcoder/klein/core/cache/KleinCacheImpl.java#L84

Added line #L84 was not covered by tests
}

@Override
Expand Down
4 changes: 2 additions & 2 deletions klein-example/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
<parent>
<artifactId>klein</artifactId>
<groupId>com.ofcoder.klein</groupId>
<version>0.0.7</version>
<version>0.0.8</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<groupId>com.ofcoder.klein.example</groupId>
<artifactId>klein-example</artifactId>
<version>0.0.7</version>
<version>0.0.8</version>
<packaging>jar</packaging>

<properties>
Expand Down
2 changes: 1 addition & 1 deletion klein-jepsen/docker/bin/build-docker-compose
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ DBS=""
member=""

for ((n=1;n<=NODE_COUNT;n++)); do
member+=${n}:n${n}:1218\;
member+=${n}:n${n}:1218:false\;
done;

# For each node
Expand Down
1 change: 1 addition & 0 deletions klein-jepsen/docker/template/server.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
klein.id=%%N%%
klein.port=1218
klein.ip=n%%N%%
klein.outsider=false
klein.members=%%MEMBER%%
4 changes: 4 additions & 0 deletions klein-jepsen/klein-jepsen-server/deploy/stop.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,7 @@ then
kill $pid
fi
echo "Done!"

rm -rf /data

echo "Cleanup!"
4 changes: 2 additions & 2 deletions klein-jepsen/klein-jepsen-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
<parent>
<artifactId>klein-jepsen</artifactId>
<groupId>com.ofcoder.klein.jepsen</groupId>
<version>0.0.7</version>
<version>0.0.8</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<groupId>com.ofcoder.klein.jepsen.server</groupId>
<artifactId>klein-jepsen-server</artifactId>
<version>0.0.7</version>
<version>0.0.8</version>
<packaging>jar</packaging>

<properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,17 @@ public boolean put(final Integer value) throws UnsupportedEncodingException {

try {
LOG.info("seq: {}, put: {} on node: {}", req.getSeq(), value, endpoint.getId());
boolean o = client.sendRequestSync(endpoint, param, 3000);
LOG.info("seq: {}, put: {} on node: {}, result: {}", req.getSeq(), value, endpoint.getId(), o);
if (!o) {
throw new IllegalArgumentException("seq: " + req.getSeq() + ", put: " + value + " on node: " + endpoint.getId() + ", occur proposal conflict");
// see: com.ofcoder.klein.consensus.facade.Result.State
String result = client.sendRequestSync(endpoint, param, 3000);
LOG.info("seq: {}, put: {} on node: {}, result: {}", req.getSeq(), value, endpoint.getId(), result);
if (!"SUCCESS".equals(result)) {
throw new IllegalArgumentException("seq: " + req.getSeq() + ", put: " + value + " on node: " + endpoint.getId() + ", " + result);
}
return true;
} catch (IllegalArgumentException e) {
throw e;
} catch (Exception e) {
throw new IllegalArgumentException("seq: " + req.getSeq() + ", put: " + value + " on node: " + endpoint.getId() + ", result: err");
throw new IllegalArgumentException("seq: " + req.getSeq() + ", put: " + value + " on node: " + endpoint.getId() + ", result: UNKNOWN");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.nio.ByteBuffer;

import com.ofcoder.klein.consensus.facade.Result;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -41,21 +42,15 @@ public PutProcessor(final KleinCache cache) {

@Override
public void handleRequest(final PutReq request, final RpcContext context) {
if (request.getTtl() <= 0) {
try {
LOG.info("put operator, begin, seq: {}", request.getSeq());
boolean put = cache.put(request.getKey(), request.getData(), true);
LOG.info("put operator, end, seq: {}, result: {}", request.getSeq(), put);
context.response(ByteBuffer.wrap(Hessian2Util.serialize(put)));
} catch (Exception e) {
LOG.error(e.getMessage());
context.response(ByteBuffer.wrap(Hessian2Util.serialize(false)));
LOG.info("put operator, err, seq: {}, result: err", request.getSeq());
}
} else {
boolean put = cache.put(request.getKey(), request.getData(), request.getTtl(), request.getUnit());
context.response(ByteBuffer.wrap(Hessian2Util.serialize(put)));
LOG.info("put operator, err, seq: {}", request.getSeq());
try {
LOG.info("put operator, begin, seq: {}", request.getSeq());
Result.State put = cache.put(request.getKey(), request.getData(), false, request.getTtl(), request.getUnit());
LOG.info("put operator, end, seq: {}, result: {}", request.getSeq(), put);
context.response(ByteBuffer.wrap(Hessian2Util.serialize(put.name())));
} catch (Exception e) {
LOG.error(e.getMessage());
context.response(ByteBuffer.wrap(Hessian2Util.serialize(Result.State.UNKNOWN.name())));
LOG.info("put operator, err, seq: {}, result: err", request.getSeq());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
*/
public class PutReq extends BaseReq {
private Serializable data;
private long ttl;
private TimeUnit unit;
private long ttl = -1;
private TimeUnit unit = TimeUnit.SECONDS;

public Serializable getData() {
return data;
Expand Down
5 changes: 0 additions & 5 deletions klein-jepsen/klein-jepsen-test/nodes

This file was deleted.

9 changes: 7 additions & 2 deletions klein-jepsen/klein-jepsen-test/project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,17 @@
:url "http://www.eclipse.org/legal/epl-v10.html"}
:main jepsen.klein
:jvm-opts ["-Xms2g" "-Xmx2g" "-server"]
:dependencies [[org.clojure/clojure "1.9.0"]
[jepsen "0.1.11"]
:dependencies [
[org.clojure/clojure "1.10.0"]
[jepsen "0.1.19"]
; [org.clojure/clojure "1.9.0"]
; [jepsen "0.1.11"]
[clj-ssh "0.5.14"]
[cider/cider-nrepl "0.17.0-SNAPSHOT"]
[org.clojure/tools.nrepl "0.2.13" :exclusions [org.clojure/clojure]]
[net.java.dev.jna/jna "4.5.1"]
[javax.xml.bind/jaxb-api "2.3.1"]
[org.glassfish.jaxb/jaxb-runtime "2.3.1"]
[com.ofcoder.klein.jepsen.server/klein-jepsen-server "0.0.1"]
]
)
2 changes: 1 addition & 1 deletion klein-jepsen/klein-jepsen-test/run_test.sh
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
unset SSH_AUTH_SOCK
lein run test --time-limit 6000 --concurrency 5 --test-count 5 --username root --password 123456 $@
lein run test --time-limit 600 --concurrency 5 --test-count 2 --username root --password 123456 $@
Loading
Loading