diff --git a/java-sdk/build.gradle b/java-sdk/build.gradle index 76d586fd..bd5362dc 100644 --- a/java-sdk/build.gradle +++ b/java-sdk/build.gradle @@ -17,7 +17,7 @@ subprojects { apply plugin: 'idea' // Configuration - version = '0.3.5' + version = '0.3.6-SNAPSHOT' group = 'org.radarcns' ext.githubRepoName = 'RADAR-base/RADAR-Schemas' diff --git a/java-sdk/radar-schemas-tools/src/main/java/org/radarcns/schema/registration/KafkaTopics.java b/java-sdk/radar-schemas-tools/src/main/java/org/radarcns/schema/registration/KafkaTopics.java index 6ab7aaaa..921cb499 100644 --- a/java-sdk/radar-schemas-tools/src/main/java/org/radarcns/schema/registration/KafkaTopics.java +++ b/java-sdk/radar-schemas-tools/src/main/java/org/radarcns/schema/registration/KafkaTopics.java @@ -11,6 +11,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import kafka.cluster.Broker; +import kafka.cluster.EndPoint; import kafka.zk.KafkaZkClient; import kafka.zookeeper.ZooKeeperClientException; import net.sourceforge.argparse4j.inf.ArgumentParser; @@ -24,6 +25,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.collection.JavaConverters$; +import scala.collection.Seq; /** * Registers Kafka topics with Zookeeper. @@ -62,8 +64,8 @@ public boolean waitForBrokers(int brokers) throws InterruptedException, List brokerList; try { // convert Scala sequence of servers to Java - brokerList = JavaConverters$.MODULE$ - .seqAsJavaList(zkClient.getAllBrokersInCluster()); + brokerList = asStream(zkClient.getAllBrokersInCluster()) + .collect(Collectors.toList()); } catch (ZooKeeperClientException ex) { logger.warn("Failed to reach zookeeper"); brokerList = Collections.emptyList(); @@ -74,7 +76,9 @@ public boolean waitForBrokers(int brokers) throws InterruptedException, logger.info("Kafka brokers available. Starting topic creation."); String bootstrapServers = brokerList.stream() - .map(b -> b.endPoints().mkString(",")) + .map(Broker::endPoints) + .flatMap(KafkaTopics::asStream) + .map(EndPoint::connectionString) .collect(Collectors.joining(",")); kafkaClient = AdminClient.create(Collections.singletonMap( @@ -97,6 +101,10 @@ public boolean waitForBrokers(int brokers) throws InterruptedException, return brokersAvailable; } + private static Stream asStream(Seq stream) { + return JavaConverters$.MODULE$.seqAsJavaList(stream).stream(); + } + /** * Create all topics in a catalogue. * @param catalogue source catalogue to extract topic names from