Skip to content

Commit

Permalink
Auth unauth (#20)
Browse files Browse the repository at this point in the history
* added explict authorize and unauthorize methods

* disable autologging in on rpc and wrote a test for this behavor

* updated README

* fixed README typo

* Added race condition test

* check for auth success in tests setup

* added warnings when auth/unauth is unnecessarily called

* added more detail to the README example

* added more auth tests

* removed unused import

* fixed main function

* added test for action after unauth
  • Loading branch information
dfirsht authored and boopt2 committed Apr 28, 2017
1 parent 21eb2d6 commit 2bf9e05
Show file tree
Hide file tree
Showing 9 changed files with 452 additions and 102 deletions.
19 changes: 16 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,25 @@ Applications use the CSync class to create a connection to a specific CSync serv
final CSync csync = new CSync.builder()
.host("localhost")
.port(6005)
.provider("demo")
.token("demoToken")
.build();
```

Note: Update the `host` and `port` to your specific csync server instance. Also, `provider` and `token` can be updated to allow other providers to authenticate the csync session.
Note: Update the `host` and `port` to your specific csync server instance.

## Authenticating
```
csync.authenticate("demo", "demoToken")
.onComplete((exception, didSucceed) -> {
if(didSucceed) {
//We successfully logged in
}
else {
ex.printStackTrace()
}
});
```

Note: The provider and token can be updated to allow other providers to authenticate the csync session.

## Listening to values on a key
```
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@ public static void main(String args[]) throws Exception {


final CSync csync = CSync.builder()
.token("demoToken")
.provider("demo")
.build();
csync.authenticate("demo", "demoToken");

csync.pub("something", "hello", Acls.Private)
.then(() -> csync.pub("something", "hello again", Acls.PublicRead))
Expand Down
24 changes: 12 additions & 12 deletions src/main/java/com/ibm/csync/CSync.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import com.ibm.csync.functional.Futur;
import com.ibm.csync.impl.CSyncImpl;
import com.ibm.csync.impl.commands.Pub;
import okhttp3.ws.WebSocket;

import java.io.Closeable;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.HashMap;
Expand Down Expand Up @@ -69,9 +71,17 @@ public Futur<Long> del(final Key key, final Deadline dl) {



// Auth

@Override
public Futur<Boolean> authenticate(String provider, String token) {
return impl.ws.startSession(provider, token);
}



@Override
public Futur<Boolean> unauthenticate() {
return impl.ws.endSession();
}

// Blocking

Expand Down Expand Up @@ -175,16 +185,6 @@ public Builder path(final String path) {
return this;
}

public Builder provider(final String provider){
args.put("authProvider",provider);
return this;
}

public Builder token(final String token) {
args.put("token",token);
return this;
}

private static String encode(final String in) {
try {
return URLEncoder.encode(in,"UTF-8");
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/com/ibm/csync/CSyncAPI.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,15 @@
package com.ibm.csync;

import com.ibm.csync.functional.Futur;
import okhttp3.ws.WebSocket;

import java.io.IOError;
import java.io.IOException;

interface CSyncAPI {

Futur<Boolean> authenticate(String provider, String token);
Futur<Boolean> unauthenticate();
Futur<Long> pub(Key key, String data, Acl acl, Deadline dl);
Futur<Long> del(final Key key, final Deadline dl);
Timeout defaultTimeout();
Expand Down
83 changes: 74 additions & 9 deletions src/main/java/com/ibm/csync/impl/Transport.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Collections;
import java.util.Map;
import java.util.WeakHashMap;
Expand All @@ -55,6 +57,7 @@ public class Transport implements WebSocketListener {

private Promise<WebSocket> socketPromise = null;
private Futur<WebSocket> socketFutur = null;
private Promise<Boolean> finshedClosing = null;
private final Executor sendExec = Executors.newSingleThreadExecutor();
private WebSocket loginWebSocket = null; //Needs to be stored so that we callback when we are sure auth succeeded
// TODO: handle concurrency
Expand All @@ -63,11 +66,10 @@ public class Transport implements WebSocketListener {

private final Tracer tracer;

//private final String url;
private final Database db;
private final ScheduledExecutorService workers;

private final Request req;
private Request req;
private final OkHttpClient client;

Transport(final String url, final Database db, final ScheduledExecutorService workers, final Tracer tracer) {
Expand All @@ -77,17 +79,64 @@ public class Transport implements WebSocketListener {
this.tracer = tracer;

req = new Request.Builder()
.get()
.url(url)
.build();
.get()
.url(url)
.build();

client = new OkHttpClient.Builder()
.readTimeout(60, TimeUnit.SECONDS)
.writeTimeout(60, TimeUnit.SECONDS).build();

}

private synchronized Futur<WebSocket> connect() {
public synchronized Futur<Boolean> startSession(String provider, String token) {
String authURL = this.req.url().toString() + encodeAuthParameters(provider, token);
Promise<Boolean> sessionPromise = new Promise<>();
if(socketFutur != null) {
// Already logged in
logger.warn("Start session called while the session is already active");
sessionPromise.set(null, true);
}
else {
connect(authURL)
.onComplete(workers, (ex, ws) -> {
if (ex != null) {
sessionPromise.set(ex,false);
}
else {
sessionPromise.set(null,true);
}
}

);
}
return sessionPromise.futur;
}

public synchronized Futur<Boolean> endSession() {
if(socketFutur == null) {
// Already logged out
logger.warn("End session called while the session is already not active");
Promise<Boolean> willLogout = new Promise<>();
Futur<Boolean> logoutSuccess = willLogout.futur;
willLogout.set(null, true);
return logoutSuccess;
}
else {
if(finshedClosing != null) {
return finshedClosing.futur;
}
finshedClosing = new Promise<>();
socketFutur.consume(ws -> ws.close(1000,"session ended"));
return finshedClosing.futur;
}
}

private synchronized Futur<WebSocket> connect(String url) {
Request req = new Request.Builder()
.get()
.url(url)
.build();
if (socketFutur == null) {
socketPromise = new Promise<>(workers);
socketFutur = socketPromise.futur;
Expand All @@ -96,7 +145,22 @@ private synchronized Futur<WebSocket> connect() {
return socketFutur;
}

private String encodeAuthParameters(String provider, String token) {
try {
return "&authProvider=" + URLEncoder.encode(provider,"UTF-8")
+ "&token=" + URLEncoder.encode(token,"UTF-8");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
}

public synchronized <T> Futur<T> rpc(final String kind, final Object request, Class<T> cls, final Deadline dl) {
if(socketFutur == null) {
// Unauthenticated
Promise<T> failedPromise = new Promise<>();
failedPromise.set(new Exception("Unauthorized"), null);
return failedPromise.futur;
}
final Envelope requestEnv = new Envelope(kind, gson.toJsonTree(request));
final Long closure = requestEnv.closure;
final String outgoing = gson.toJson(requestEnv);
Expand All @@ -105,7 +169,7 @@ public synchronized <T> Futur<T> rpc(final String kind, final Object request, C
final Promise<Envelope> responseEnvelopePromise = new Promise<>(workers);
waitingForResponse.put(closure, responseEnvelopePromise);

return connect()
return socketFutur
.consume(sendExec,ws -> ws.sendMessage(RequestBody.create(WebSocket.TEXT, outgoing)))
.then(() -> responseEnvelopePromise.futur)
.map(env -> gson.fromJson(env.payload,cls))
Expand Down Expand Up @@ -193,7 +257,8 @@ public synchronized void onClose(int code, String reason) {
if (thePromise != null) {
thePromise.set(new Exception(reason),null);
}

if (finshedClosing != null) {
finshedClosing.set(null, true);
}
}

}
16 changes: 13 additions & 3 deletions src/test/java/com/ibm/csync/AdvanceTestsIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class AdvanceTestsIT {

Expand All @@ -37,13 +38,22 @@ public class AdvanceTestsIT {

@Before
public void setup() throws Exception{
int x = 50;
CompletableFuture<Boolean> future = new CompletableFuture<>();
csync = CSync.builder()
.token(System.getenv("CSYNC_DEMO_TOKEN"))
.provider(System.getenv("CSYNC_DEMO_PROVIDER"))
.host(System.getenv("CSYNC_HOST"))
.port(Integer.parseInt(System.getenv("CSYNC_PORT")))
.build();
csync.authenticate(System.getenv("CSYNC_DEMO_PROVIDER"), System.getenv("CSYNC_DEMO_TOKEN"))
.onComplete((ex, isSuccessful) -> {
if(ex == null) {
future.complete(true);
}
else {
fail("Unable to authenticate with the given credentials");
}
});

future.get(10, TimeUnit.SECONDS);
}

@After
Expand Down
Loading

0 comments on commit 2bf9e05

Please sign in to comment.