Skip to content

Commit

Permalink
Merge pull request #136 from RADAR-base/release-0.3.6
Browse files Browse the repository at this point in the history
Release 0.3.6
  • Loading branch information
blootsvoets committed Aug 9, 2018
2 parents 420d62c + 3a499fc commit af4e13c
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
2 changes: 1 addition & 1 deletion java-sdk/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ subprojects {
apply plugin: 'idea'

// Configuration
version = '0.3.5'
version = '0.3.6-SNAPSHOT'
group = 'org.radarcns'
ext.githubRepoName = 'RADAR-base/RADAR-Schemas'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.cluster.Broker;
import kafka.cluster.EndPoint;
import kafka.zk.KafkaZkClient;
import kafka.zookeeper.ZooKeeperClientException;
import net.sourceforge.argparse4j.inf.ArgumentParser;
Expand All @@ -24,6 +25,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters$;
import scala.collection.Seq;

/**
* Registers Kafka topics with Zookeeper.
Expand Down Expand Up @@ -62,8 +64,8 @@ public boolean waitForBrokers(int brokers) throws InterruptedException,
List<Broker> brokerList;
try {
// convert Scala sequence of servers to Java
brokerList = JavaConverters$.MODULE$
.seqAsJavaList(zkClient.getAllBrokersInCluster());
brokerList = asStream(zkClient.getAllBrokersInCluster())
.collect(Collectors.toList());
} catch (ZooKeeperClientException ex) {
logger.warn("Failed to reach zookeeper");
brokerList = Collections.emptyList();
Expand All @@ -74,7 +76,9 @@ public boolean waitForBrokers(int brokers) throws InterruptedException,
logger.info("Kafka brokers available. Starting topic creation.");

String bootstrapServers = brokerList.stream()
.map(b -> b.endPoints().mkString(","))
.map(Broker::endPoints)
.flatMap(KafkaTopics::asStream)
.map(EndPoint::connectionString)
.collect(Collectors.joining(","));

kafkaClient = AdminClient.create(Collections.singletonMap(
Expand All @@ -97,6 +101,10 @@ public boolean waitForBrokers(int brokers) throws InterruptedException,
return brokersAvailable;
}

private static <T> Stream<T> asStream(Seq<T> stream) {
return JavaConverters$.MODULE$.seqAsJavaList(stream).stream();
}

/**
* Create all topics in a catalogue.
* @param catalogue source catalogue to extract topic names from
Expand Down

0 comments on commit af4e13c

Please sign in to comment.