Skip to content

Commit

Permalink
Merge pull request #57 from RADAR-base/release-0.3.0
Browse files Browse the repository at this point in the history
Release 0.3.0
  • Loading branch information
yatharthranjan committed May 14, 2020
2 parents 98c260e + 6bafb92 commit 743ebeb
Show file tree
Hide file tree
Showing 20 changed files with 1,016 additions and 74 deletions.
7 changes: 4 additions & 3 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ insert_final_newline = true
charset = utf-8
indent_style = space
indent_size = 2
continuation_indent_size = 4
ij_continuation_indent_size = 4
max_line_length=100

[*.gradle,*.py]
[{*.gradle, *.py}]
indent_size = 4
continuation_indent_size = 8
ij_continuation_indent_size = 8
8 changes: 4 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@ subprojects {
apply plugin: 'java-library'

group = 'org.radarbase'
version = '0.2.4'
version = '0.3.0'

sourceCompatibility = 1.8
targetCompatibility = 1.8

repositories {
mavenCentral()
maven { url "http://packages.confluent.io/maven/" }
maven { url "http://repo.maven.apache.org/maven2" }
maven { url "https://packages.confluent.io/maven/" }
maven { url "https://repo.maven.apache.org/maven2" }
jcenter()
maven { url 'http://oss.jfrog.org/artifactory/oss-snapshot-local/' }
maven { url 'https://oss.jfrog.org/artifactory/oss-snapshot-local/' }
}
}

Expand Down
1 change: 1 addition & 0 deletions kafka-connect-fitbit-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ dependencies {

implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: jacksonVersion
implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: jacksonVersion
implementation 'com.google.firebase:firebase-admin:6.12.2'

// Included in connector runtime
compileOnly group: 'org.apache.kafka', name: 'connect-api', version: kafkaVersion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.NonEmptyString;
import org.apache.kafka.common.config.ConfigDef.Range;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Validator;
import org.apache.kafka.common.config.ConfigDef.Width;
Expand Down Expand Up @@ -123,21 +122,22 @@ public class FitbitRestSourceConnectorConfig extends RestSourceConnectorConfig {
private static final String FITBIT_INTRADAY_CALORIES_TOPIC_DISPLAY = "Intraday calories topic";
private static final String FITBIT_INTRADAY_CALORIES_TOPIC_DEFAULT = "connect_fitbit_intraday_calories";

private final UserRepository userRepository;
public static final String FITBIT_USER_REPOSITORY_FIRESTORE_FITBIT_COLLECTION_CONFIG = "fitbit.user.firebase.collection.fitbit.name";
private static final String FITBIT_USER_REPOSITORY_FIRESTORE_FITBIT_COLLECTION_DOC = "Firestore Collection for retrieving Fitbit Auth details. Only used when a Firebase based user repository is used.";
private static final String FITBIT_USER_REPOSITORY_FIRESTORE_FITBIT_COLLECTION_DISPLAY = "Firebase Fitbit collection name.";
private static final String FITBIT_USER_REPOSITORY_FIRESTORE_FITBIT_COLLECTION_DEFAULT = "fitbit";

public static final String FITBIT_USER_REPOSITORY_FIRESTORE_USER_COLLECTION_CONFIG = "fitbit.user.firebase.collection.user.name";
private static final String FITBIT_USER_REPOSITORY_FIRESTORE_USER_COLLECTION_DOC = "Firestore Collection for retrieving User details. Only used when a Firebase based user repository is used.";
private static final String FITBIT_USER_REPOSITORY_FIRESTORE_USER_COLLECTION_DISPLAY = "Firebase User collection name.";
private static final String FITBIT_USER_REPOSITORY_FIRESTORE_USER_COLLECTION_DEFAULT = "users";

private UserRepository userRepository;
private final Headers clientCredentials;

@SuppressWarnings("unchecked")
public FitbitRestSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig, boolean doLog) {
super(config, parsedConfig, doLog);

try {
userRepository = ((Class<? extends UserRepository>)
getClass(FITBIT_USER_REPOSITORY_CONFIG)).getDeclaredConstructor().newInstance();
} catch (IllegalAccessException | InstantiationException
| InvocationTargetException | NoSuchMethodException e) {
throw new ConnectException("Invalid class for: " + SOURCE_PAYLOAD_CONVERTER_CONFIG, e);
}

String credentialString = getFitbitClient() + ":" + getFitbitClientSecret();
String credentialsBase64 = Base64.getEncoder().encodeToString(
credentialString.getBytes(StandardCharsets.UTF_8));
Expand Down Expand Up @@ -318,6 +318,26 @@ public String toString() {
++orderInGroup,
Width.SHORT,
FITBIT_INTRADAY_CALORIES_TOPIC_DISPLAY)

.define(FITBIT_USER_REPOSITORY_FIRESTORE_FITBIT_COLLECTION_CONFIG,
Type.STRING,
FITBIT_USER_REPOSITORY_FIRESTORE_FITBIT_COLLECTION_DEFAULT,
Importance.LOW,
FITBIT_USER_REPOSITORY_FIRESTORE_FITBIT_COLLECTION_DOC,
group,
++orderInGroup,
Width.SHORT,
FITBIT_USER_REPOSITORY_FIRESTORE_FITBIT_COLLECTION_DISPLAY)

.define(FITBIT_USER_REPOSITORY_FIRESTORE_USER_COLLECTION_CONFIG,
Type.STRING,
FITBIT_USER_REPOSITORY_FIRESTORE_USER_COLLECTION_DEFAULT,
Importance.LOW,
FITBIT_USER_REPOSITORY_FIRESTORE_USER_COLLECTION_DOC,
group,
++orderInGroup,
Width.SHORT,
FITBIT_USER_REPOSITORY_FIRESTORE_USER_COLLECTION_DISPLAY)
;
}

Expand All @@ -333,11 +353,32 @@ public String getFitbitClientSecret() {
return getPassword(FITBIT_API_SECRET_CONFIG).value();
}

public UserRepository getUserRepository(UserRepository reuse) {
if (reuse != null && reuse.getClass().equals(getClass(FITBIT_USER_REPOSITORY_CONFIG))) {
userRepository = reuse;
} else {
userRepository = createUserRepository();
}
userRepository.initialize(this);
return userRepository;
}

public UserRepository getUserRepository() {
userRepository.initialize(this);
return userRepository;
}

@SuppressWarnings("unchecked")
public UserRepository createUserRepository() {
try {
return ((Class<? extends UserRepository>)
getClass(FITBIT_USER_REPOSITORY_CONFIG)).getDeclaredConstructor().newInstance();
} catch (IllegalAccessException | InstantiationException
| InvocationTargetException | NoSuchMethodException e) {
throw new ConnectException("Invalid class for: " + SOURCE_PAYLOAD_CONVERTER_CONFIG, e);
}
}

public String getFitbitIntradayStepsTopic() {
return getString(FITBIT_INTRADAY_STEPS_TOPIC_CONFIG);
}
Expand Down Expand Up @@ -398,4 +439,12 @@ public Duration getTooManyRequestsCooldownInterval() {
public String getFitbitIntradayCaloriesTopic() {
return getString(FITBIT_INTRADAY_CALORIES_TOPIC_CONFIG);
}

public String getFitbitUserRepositoryFirestoreFitbitCollection() {
return getString(FITBIT_USER_REPOSITORY_FIRESTORE_FITBIT_COLLECTION_CONFIG);
}

public String getFitbitUserRepositoryFirestoreUserCollection() {
return getString(FITBIT_USER_REPOSITORY_FIRESTORE_USER_COLLECTION_CONFIG);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,11 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.radarbase.connect.rest.AbstractRestSourceConnector;
import org.radarbase.connect.rest.fitbit.user.User;
import org.radarbase.connect.rest.fitbit.user.UserRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -42,26 +41,32 @@ public class FitbitSourceConnector extends AbstractRestSourceConnector {
private static final Logger logger = LoggerFactory.getLogger(FitbitSourceConnector.class);
private ScheduledExecutorService executor;
private Set<? extends User> configuredUsers;

private UserRepository repository;

@Override
public void start(Map<String, String> props) {
super.start(props);
executor = Executors.newSingleThreadScheduledExecutor();

executor.scheduleAtFixedRate(() -> {
try {
logger.info("Requesting latest user details...");
Set<? extends User> newUsers = getConfig(props, false).getUserRepository().stream()
.collect(Collectors.toSet());
if (configuredUsers != null && !newUsers.equals(configuredUsers)) {
logger.info("User info mismatch found. Requesting reconfiguration...");
reconfigure();
if (repository.hasPendingUpdates()) {
try {
logger.info("Requesting latest user details...");
repository.applyPendingUpdates();
Set<? extends User> newUsers =
getConfig(props, false).getUserRepository(repository).stream()
.collect(Collectors.toSet());
if (configuredUsers != null && !newUsers.equals(configuredUsers)) {
logger.info("User info mismatch found. Requesting reconfiguration...");
reconfigure();
}
} catch (IOException e) {
logger.warn("Failed to refresh users: {}", e.toString());
}
} catch (IOException e) {
logger.warn("Failed to refresh users: {}", e.toString());
} else {
logger.info("No pending updates found. Not attempting to refresh users.");
}
},0, 5, TimeUnit.MINUTES);
}, 0, 5, TimeUnit.MINUTES);
}

@Override
Expand All @@ -78,7 +83,9 @@ private FitbitRestSourceConnectorConfig getConfig(Map<String, String> conf, bool

@Override
public FitbitRestSourceConnectorConfig getConfig(Map<String, String> conf) {
return getConfig(conf, true);
FitbitRestSourceConnectorConfig connectorConfig = getConfig(conf, true);
repository = connectorConfig.getUserRepository(repository);
return connectorConfig;
}

@Override
Expand All @@ -94,14 +101,16 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
private List<Map<String, String>> configureTasks(int maxTasks) {
Map<String, String> baseConfig = config.originalsStrings();
FitbitRestSourceConnectorConfig fitbitConfig = getConfig(baseConfig);
if (repository == null) {
repository = fitbitConfig.getUserRepository(null);
}
// Divide the users over tasks
try {

List<Map<String, String>> userTasks = fitbitConfig.getUserRepository().stream()
.map(User::getId)
// group users based on their hashCode
// in principle this allows for more efficient reconfigurations for a fixed number of tasks,
// since that allows existing tasks to only handle small modifications users to handle.
List<Map<String, String>> userTasks = fitbitConfig.getUserRepository(repository).stream()
.map(User::getVersionedId)
// group users based on their hashCode, in principle, this allows for more efficient
// reconfigurations for a fixed number of tasks, since that allows existing tasks to
// only handle small modifications users to handle.
.collect(Collectors.groupingBy(
u -> Math.abs(u.hashCode()) % maxTasks,
Collectors.joining(",")))
Expand All @@ -114,7 +123,7 @@ private List<Map<String, String>> configureTasks(int maxTasks) {
.collect(Collectors.toList());
this.configuredUsers = fitbitConfig.getUserRepository().stream()
.collect(Collectors.toSet());
logger.info("Received userTask Configs {}" , userTasks);
logger.info("Received userTask Configs {}", userTasks);
return userTasks;
} catch (IOException ex) {
throw new ConfigException("Cannot read users", ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public OkHttpClient getClient(User user) {
public Map<String, Map<String, Object>> getPartitions(String route) {
try {
return userRepository.stream()
.collect(Collectors.toMap(User::getId, u -> getPartition(route, u)));
.collect(Collectors.toMap(User::getVersionedId, u -> getPartition(route, u)));
} catch (IOException e) {
logger.warn("Failed to initialize user partitions for route {}: {}", route, e.toString());
return Collections.emptyMap();
Expand All @@ -114,7 +114,7 @@ public Map<String, Map<String, Object>> getPartitions(String route) {

public Map<String, Object> getPartition(String route, User user) {
Map<String, Object> partition = new HashMap<>(4);
partition.put("user", user.getId());
partition.put("user", user.getVersionedId());
partition.put("route", route);
return partition;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public Request authenticate(Route requestRoute, Response response) throws IOExce
.header("Authorization", "Bearer " + newAccessToken)
.build();
} catch (NotAuthorizedException ex) {
logger.error("Cannot get a new refresh token for user {}. Cancelling request.", user);
logger.error("Cannot get a new refresh token for user {}. Cancelling request.", user, ex);
return null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public void initialize(RestSourceConnectorConfig config) {
@Override
public void requestSucceeded(RestRequest request, SourceRecord record) {
lastPollPerUser.put(((FitbitRestRequest) request).getUser().getId(), lastPoll);
String userKey = ((FitbitRestRequest) request).getUser().getId();
String userKey = ((FitbitRestRequest) request).getUser().getVersionedId();
Instant offset = Instant.ofEpochMilli((Long) record.sourceOffset().get(TIMESTAMP_OFFSET_KEY));
offsets.put(userKey, offset);
}
Expand All @@ -154,7 +154,7 @@ public void requestEmpty(RestRequest request) {
FitbitRestRequest fitbitRequest = (FitbitRestRequest) request;
Instant endOffset = fitbitRequest.getDateRange().end().toInstant();
if (DAYS.between(endOffset, lastPoll) >= HISTORICAL_TIME_DAYS) {
String key = fitbitRequest.getUser().getId();
String key = fitbitRequest.getUser().getVersionedId();
offsets.put(key, endOffset);
}
}
Expand Down Expand Up @@ -197,7 +197,7 @@ public Stream<FitbitRestRequest> requests() {
return userRepository.stream()
.map(u -> new AbstractMap.SimpleImmutableEntry<>(u, nextPoll(u)))
.filter(u -> lastPoll.isAfter(u.getValue()))
.sorted(Comparator.comparing(Map.Entry::getValue))
.sorted(Map.Entry.comparingByValue())
.flatMap(u -> this.createRequests(u.getKey()))
.filter(Objects::nonNull);
} catch (IOException e) {
Expand All @@ -216,7 +216,7 @@ public Instant getTimeOfNextRequest() {
}

private Map<String, Object> getPartition(User user) {
return partitions.computeIfAbsent(user.getId(),
return partitions.computeIfAbsent(user.getVersionedId(),
k -> generator.getPartition(routeName, user));
}

Expand Down Expand Up @@ -279,7 +279,7 @@ public Instant getLastPoll() {
}

protected Instant getOffset(User user) {
return offsets.getOrDefault(user.getId(), user.getStartDate().minus(ONE_NANO));
return offsets.getOrDefault(user.getVersionedId(), user.getStartDate().minus(ONE_NANO));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
public class LocalUser implements User {
private static final Pattern ILLEGAL_CHARACTERS_PATTERN = Pattern.compile("[^a-zA-Z0-9_-]");
private String id;
private String version;
private String externalUserId;
private String projectId;
private String userId;
Expand Down Expand Up @@ -94,9 +95,19 @@ public void setFitbitUserId(String id) {
this.externalUserId = id;
}

@Override
public String getVersion() {
return version;
}

public void setVersion(String version) {
this.version = version;
}

public LocalUser copy() {
LocalUser copy = new LocalUser();
copy.id = id;
copy.version = version;
copy.externalUserId = externalUserId;
copy.projectId = projectId;
copy.userId = userId;
Expand All @@ -116,7 +127,8 @@ public synchronized SchemaAndValue getObservationKey(AvroData avroData) {

@Override
public String toString() {
return "LocalUser{" + "id='" + id + '\''
return "LocalUser{id='" + id + '\''
+ ", version='" + version + '\''
+ ", externalUserId='" + externalUserId + '\''
+ ", projectId='" + projectId + '\''
+ ", userId='" + userId + '\''
Expand All @@ -134,19 +146,17 @@ public boolean equals(Object o) {
}
LocalUser localUser = (LocalUser) o;
return Objects.equals(id, localUser.id)
&& Objects.equals(version, localUser.version)
&& Objects.equals(externalUserId, localUser.externalUserId)
&& Objects.equals(projectId, localUser.projectId)
&& Objects.equals(userId, localUser.userId)
&& Objects.equals(sourceId, localUser.sourceId)
&& Objects.equals(startDate, localUser.startDate)
&& Objects.equals(endDate, localUser.endDate)
&& Objects.equals(observationKey, localUser.observationKey);
&& Objects.equals(endDate, localUser.endDate);
}

@Override
public int hashCode() {

return Objects
.hash(id, externalUserId, projectId, userId, sourceId, startDate, endDate, observationKey);
return Objects.hash(id, version);
}
}
Loading

0 comments on commit 743ebeb

Please sign in to comment.