Skip to content

Commit

Permalink
Merge pull request #55 from RADAR-base/firebase_user_repo
Browse files Browse the repository at this point in the history
Firebase user repo
  • Loading branch information
yatharthranjan authored May 4, 2020
2 parents 1b7d783 + a26ca00 commit bba9625
Show file tree
Hide file tree
Showing 15 changed files with 975 additions and 54 deletions.
7 changes: 4 additions & 3 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ insert_final_newline = true
charset = utf-8
indent_style = space
indent_size = 2
continuation_indent_size = 4
ij_continuation_indent_size = 4
max_line_length=100

[*.gradle,*.py]
[{*.gradle, *.py}]
indent_size = 4
continuation_indent_size = 8
ij_continuation_indent_size = 8
1 change: 1 addition & 0 deletions kafka-connect-fitbit-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ dependencies {

implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: jacksonVersion
implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: jacksonVersion
implementation 'com.google.firebase:firebase-admin:6.12.2'

// Included in connector runtime
compileOnly group: 'org.apache.kafka', name: 'connect-api', version: kafkaVersion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.NonEmptyString;
import org.apache.kafka.common.config.ConfigDef.Range;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Validator;
import org.apache.kafka.common.config.ConfigDef.Width;
Expand Down Expand Up @@ -123,21 +122,22 @@ public class FitbitRestSourceConnectorConfig extends RestSourceConnectorConfig {
private static final String FITBIT_INTRADAY_CALORIES_TOPIC_DISPLAY = "Intraday calories topic";
private static final String FITBIT_INTRADAY_CALORIES_TOPIC_DEFAULT = "connect_fitbit_intraday_calories";

private final UserRepository userRepository;
public static final String FITBIT_USER_REPOSITORY_FIRESTORE_FITBIT_COLLECTION_CONFIG = "fitbit.user.firebase.collection.fitbit.name";
private static final String FITBIT_USER_REPOSITORY_FIRESTORE_FITBIT_COLLECTION_DOC = "Firestore Collection for retrieving Fitbit Auth details. Only used when a Firebase based user repository is used.";
private static final String FITBIT_USER_REPOSITORY_FIRESTORE_FITBIT_COLLECTION_DISPLAY = "Firebase Fitbit collection name.";
private static final String FITBIT_USER_REPOSITORY_FIRESTORE_FITBIT_COLLECTION_DEFAULT = "fitbit";

public static final String FITBIT_USER_REPOSITORY_FIRESTORE_USER_COLLECTION_CONFIG = "fitbit.user.firebase.collection.user.name";
private static final String FITBIT_USER_REPOSITORY_FIRESTORE_USER_COLLECTION_DOC = "Firestore Collection for retrieving User details. Only used when a Firebase based user repository is used.";
private static final String FITBIT_USER_REPOSITORY_FIRESTORE_USER_COLLECTION_DISPLAY = "Firebase User collection name.";
private static final String FITBIT_USER_REPOSITORY_FIRESTORE_USER_COLLECTION_DEFAULT = "users";

private UserRepository userRepository;
private final Headers clientCredentials;

@SuppressWarnings("unchecked")
public FitbitRestSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig, boolean doLog) {
super(config, parsedConfig, doLog);

try {
userRepository = ((Class<? extends UserRepository>)
getClass(FITBIT_USER_REPOSITORY_CONFIG)).getDeclaredConstructor().newInstance();
} catch (IllegalAccessException | InstantiationException
| InvocationTargetException | NoSuchMethodException e) {
throw new ConnectException("Invalid class for: " + SOURCE_PAYLOAD_CONVERTER_CONFIG, e);
}

String credentialString = getFitbitClient() + ":" + getFitbitClientSecret();
String credentialsBase64 = Base64.getEncoder().encodeToString(
credentialString.getBytes(StandardCharsets.UTF_8));
Expand Down Expand Up @@ -318,6 +318,26 @@ public String toString() {
++orderInGroup,
Width.SHORT,
FITBIT_INTRADAY_CALORIES_TOPIC_DISPLAY)

.define(FITBIT_USER_REPOSITORY_FIRESTORE_FITBIT_COLLECTION_CONFIG,
Type.STRING,
FITBIT_USER_REPOSITORY_FIRESTORE_FITBIT_COLLECTION_DEFAULT,
Importance.LOW,
FITBIT_USER_REPOSITORY_FIRESTORE_FITBIT_COLLECTION_DOC,
group,
++orderInGroup,
Width.SHORT,
FITBIT_USER_REPOSITORY_FIRESTORE_FITBIT_COLLECTION_DISPLAY)

.define(FITBIT_USER_REPOSITORY_FIRESTORE_USER_COLLECTION_CONFIG,
Type.STRING,
FITBIT_USER_REPOSITORY_FIRESTORE_USER_COLLECTION_DEFAULT,
Importance.LOW,
FITBIT_USER_REPOSITORY_FIRESTORE_USER_COLLECTION_DOC,
group,
++orderInGroup,
Width.SHORT,
FITBIT_USER_REPOSITORY_FIRESTORE_USER_COLLECTION_DISPLAY)
;
}

Expand All @@ -333,11 +353,32 @@ public String getFitbitClientSecret() {
return getPassword(FITBIT_API_SECRET_CONFIG).value();
}

public UserRepository getUserRepository(UserRepository reuse) {
if (reuse != null && reuse.getClass().equals(getClass(FITBIT_USER_REPOSITORY_CONFIG))) {
userRepository = reuse;
} else {
userRepository = createUserRepository();
}
userRepository.initialize(this);
return userRepository;
}

public UserRepository getUserRepository() {
userRepository.initialize(this);
return userRepository;
}

@SuppressWarnings("unchecked")
public UserRepository createUserRepository() {
try {
return ((Class<? extends UserRepository>)
getClass(FITBIT_USER_REPOSITORY_CONFIG)).getDeclaredConstructor().newInstance();
} catch (IllegalAccessException | InstantiationException
| InvocationTargetException | NoSuchMethodException e) {
throw new ConnectException("Invalid class for: " + SOURCE_PAYLOAD_CONVERTER_CONFIG, e);
}
}

public String getFitbitIntradayStepsTopic() {
return getString(FITBIT_INTRADAY_STEPS_TOPIC_CONFIG);
}
Expand Down Expand Up @@ -398,4 +439,12 @@ public Duration getTooManyRequestsCooldownInterval() {
public String getFitbitIntradayCaloriesTopic() {
return getString(FITBIT_INTRADAY_CALORIES_TOPIC_CONFIG);
}

public String getFitbitUserRepositoryFirestoreFitbitCollection() {
return getString(FITBIT_USER_REPOSITORY_FIRESTORE_FITBIT_COLLECTION_CONFIG);
}

public String getFitbitUserRepositoryFirestoreUserCollection() {
return getString(FITBIT_USER_REPOSITORY_FIRESTORE_USER_COLLECTION_CONFIG);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,11 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.radarbase.connect.rest.AbstractRestSourceConnector;
import org.radarbase.connect.rest.fitbit.user.User;
import org.radarbase.connect.rest.fitbit.user.UserRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -42,26 +41,32 @@ public class FitbitSourceConnector extends AbstractRestSourceConnector {
private static final Logger logger = LoggerFactory.getLogger(FitbitSourceConnector.class);
private ScheduledExecutorService executor;
private Set<? extends User> configuredUsers;

private UserRepository repository;

@Override
public void start(Map<String, String> props) {
super.start(props);
executor = Executors.newSingleThreadScheduledExecutor();

executor.scheduleAtFixedRate(() -> {
try {
logger.info("Requesting latest user details...");
Set<? extends User> newUsers = getConfig(props, false).getUserRepository().stream()
.collect(Collectors.toSet());
if (configuredUsers != null && !newUsers.equals(configuredUsers)) {
logger.info("User info mismatch found. Requesting reconfiguration...");
reconfigure();
if (repository.hasPendingUpdates()) {
try {
logger.info("Requesting latest user details...");
repository.applyPendingUpdates();
Set<? extends User> newUsers =
getConfig(props, false).getUserRepository(repository).stream()
.collect(Collectors.toSet());
if (configuredUsers != null && !newUsers.equals(configuredUsers)) {
logger.info("User info mismatch found. Requesting reconfiguration...");
reconfigure();
}
} catch (IOException e) {
logger.warn("Failed to refresh users: {}", e.toString());
}
} catch (IOException e) {
logger.warn("Failed to refresh users: {}", e.toString());
} else {
logger.info("No pending updates found. Not attempting to refresh users.");
}
},0, 5, TimeUnit.MINUTES);
}, 0, 5, TimeUnit.MINUTES);
}

@Override
Expand All @@ -78,7 +83,9 @@ private FitbitRestSourceConnectorConfig getConfig(Map<String, String> conf, bool

@Override
public FitbitRestSourceConnectorConfig getConfig(Map<String, String> conf) {
return getConfig(conf, true);
FitbitRestSourceConnectorConfig connectorConfig = getConfig(conf, true);
repository = connectorConfig.getUserRepository(repository);
return connectorConfig;
}

@Override
Expand All @@ -94,14 +101,16 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
private List<Map<String, String>> configureTasks(int maxTasks) {
Map<String, String> baseConfig = config.originalsStrings();
FitbitRestSourceConnectorConfig fitbitConfig = getConfig(baseConfig);
if (repository == null) {
repository = fitbitConfig.getUserRepository(null);
}
// Divide the users over tasks
try {

List<Map<String, String>> userTasks = fitbitConfig.getUserRepository().stream()
List<Map<String, String>> userTasks = fitbitConfig.getUserRepository(repository).stream()
.map(User::getVersionedId)
// group users based on their hashCode
// in principle this allows for more efficient reconfigurations for a fixed number of tasks,
// since that allows existing tasks to only handle small modifications users to handle.
// group users based on their hashCode, in principle, this allows for more efficient
// reconfigurations for a fixed number of tasks, since that allows existing tasks to
// only handle small modifications users to handle.
.collect(Collectors.groupingBy(
u -> Math.abs(u.hashCode()) % maxTasks,
Collectors.joining(",")))
Expand All @@ -114,7 +123,7 @@ private List<Map<String, String>> configureTasks(int maxTasks) {
.collect(Collectors.toList());
this.configuredUsers = fitbitConfig.getUserRepository().stream()
.collect(Collectors.toSet());
logger.info("Received userTask Configs {}" , userTasks);
logger.info("Received userTask Configs {}", userTasks);
return userTasks;
} catch (IOException ex) {
throw new ConfigException("Cannot read users", ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public Request authenticate(Route requestRoute, Response response) throws IOExce
.header("Authorization", "Bearer " + newAccessToken)
.build();
} catch (NotAuthorizedException ex) {
logger.error("Cannot get a new refresh token for user {}. Cancelling request.", user);
logger.error("Cannot get a new refresh token for user {}. Cancelling request.", user, ex);
return null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,23 +85,7 @@ public void initialize(RestSourceConnectorConfig config) {
}

@Override
public Stream<? extends User> stream() throws IOException {
Instant nextFetchTime = nextFetch.get();
Instant now = Instant.now();
if (!now.isAfter(nextFetchTime)
|| !nextFetch.compareAndSet(nextFetchTime, now.plus(FETCH_THRESHOLD))) {
logger.debug("Providing cached user information...");
return timedCachedUsers.stream();
}

logger.info("Requesting user information from webservice");
Request request = requestFor("users?source-type=FitBit").build();
this.timedCachedUsers =
this.<Users>makeRequest(request, USER_LIST_READER).getUsers().stream()
.filter(u -> u.isComplete()
&& (containedUsers.isEmpty() || containedUsers.contains(u.getVersionedId())))
.collect(Collectors.toSet());

public Stream<? extends User> stream() {
return this.timedCachedUsers.stream();
}

Expand All @@ -124,6 +108,28 @@ public String refreshAccessToken(User user) throws IOException, NotAuthorizedExc
return credentials.getAccessToken();
}

@Override
public boolean hasPendingUpdates() {
Instant nextFetchTime = nextFetch.get();
Instant now = Instant.now();
return now.isAfter(nextFetchTime);
}

@Override
public void applyPendingUpdates() throws IOException {
logger.info("Requesting user information from webservice");
Request request = requestFor("users?source-type=FitBit").build();
this.timedCachedUsers =
this.<Users>makeRequest(request, USER_LIST_READER).getUsers().stream()
.filter(
u ->
u.isComplete()
&& (containedUsers.isEmpty()
|| containedUsers.contains(u.getVersionedId())))
.collect(Collectors.toSet());
nextFetch.set(Instant.now().plus(FETCH_THRESHOLD));
}

private Request.Builder requestFor(String relativeUrl) {
HttpUrl url = baseUrl.resolve(relativeUrl);
if (url == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,19 @@ public interface UserRepository extends RestSourceTool {
* @throws java.util.NoSuchElementException if the user does not exists in this repository.
*/
String refreshAccessToken(User user) throws IOException, NotAuthorizedException;

/**
* The functions allows the repository to supply when there are pending updates.
* This gives more control to the user repository in updating and caching users.
* @return {@code true} if there are new updates available, {@code false} otherwise.
*/
boolean hasPendingUpdates();

/**
* Apply any pending updates to users. This could include, for instance, refreshing a cache
* of users with latest information.
* This is called when {@link #hasPendingUpdates()} is {@code true}.
* @throws IOException if there was an error when applying updates.
*/
void applyPendingUpdates() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,10 @@ private void updateUsers() {
|| !nextFetch.compareAndSet(nextFetchTime, now.plus(FETCH_THRESHOLD))) {
return;
}
forceUpdateUsers();
}

private void forceUpdateUsers() {
try {
Map<String, LockedUser> newMap = Files.walk(credentialsDir)
.filter(p -> Files.isRegularFile(p)
Expand All @@ -129,8 +132,6 @@ private void updateUsers() {

@Override
public Stream<LocalUser> stream() {
updateUsers();

Stream<LockedUser> users = this.users.values().stream()
.filter(lockedTest(u -> u.getOAuth2Credentials().hasRefreshToken()));
if (!configuredUsers.isEmpty()) {
Expand Down Expand Up @@ -177,6 +178,19 @@ public String refreshAccessToken(User user) throws IOException {
return refreshAccessToken(user, NUM_RETRIES);
}

@Override
public boolean hasPendingUpdates() {
Instant nextFetchTime = nextFetch.get();
Instant now = Instant.now();
return now.isAfter(nextFetchTime);
}

@Override
public void applyPendingUpdates() {
forceUpdateUsers();
nextFetch.set(Instant.now().plus(FETCH_THRESHOLD));
}

/**
* Refreshes the Fitbit access token on the current host, using the locally stored refresh token.
* If successful, the tokens are locally stored.
Expand Down
Loading

0 comments on commit bba9625

Please sign in to comment.