diff --git a/README.md b/README.md
index a03f710..68f7f59 100644
--- a/README.md
+++ b/README.md
@@ -1,16 +1,14 @@
# time-is-up
-
TODO
-- use GEO_DISTANCE for variable GEOHASH length
-- station-sink equivalent: elastic sink or websocket?
+- outer join trace_to_estimate and estimate_t
+- test tombstone deletes for unmoved and track
+- station-sink equivalent
- simplify messages
- new version tile sink connector -> load through hub command
-- tombstone delete through message retention 1 hour?
+- use GEO_DISTANCE for variable GEOHASH length (see sketch below)
- order counting query
-- deeplearning UDF?
+- implement delete mechanism at pick-up
-- https://www.confluent.io/blog/importance-of-distributed-tracing-for-apache-kafka-based-applications/
-- https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/: AWS: (AWS) m3.xlarge instances with SSD.
- https://www.confluent.io/blog/importance-of-distributed-tracing-for-apache-kafka-based-applications/
- https://github.com/openzipkin-contrib/brave-kafka-interceptor
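A possible shape for the GEO_DISTANCE TODO above, sketched here since it is not in this diff: derive the geohash length from the remaining distance, so nearby mover/target pairs land on a finer grid. Only GEO_DISTANCE is a built-in ksqlDB function; the GEOHASH(lat, lon, length) UDF and the column names are illustrative assumptions.

```sql
-- hypothetical: hashkey resolution increases as the mover closes in
SELECT
  mover_id,
  GEOHASH(lat, lon,  -- assumed UDF, not in this diff
    CASE
      WHEN GEO_DISTANCE(lat, lon, unmoved_lat, unmoved_lon, 'KM') < 1.0  THEN 7
      WHEN GEO_DISTANCE(lat, lon, unmoved_lat, unmoved_lon, 'KM') < 10.0 THEN 6
      ELSE 4
    END) AS hashkey
FROM trace
EMIT CHANGES;
```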
diff --git a/ksqldb/queries.sql b/ksqldb/queries.sql
index c9d8c42..3e9b69d 100644
--- a/ksqldb/queries.sql
+++ b/ksqldb/queries.sql
@@ -79,9 +79,9 @@ INSERT INTO mover (id, lat, lon) VALUES ('thisisme', 0.91, 0.99);
INSERT INTO mover (id, lat, lon) VALUES ('thisisme', 1.0, 1.0);
-- mover + track = trace
-CREATE STREAM trace1
+CREATE STREAM trace
WITH (VALUE_FORMAT = 'protobuf',
- KAFKA_TOPIC = 'trace1',
+ KAFKA_TOPIC = 'trace',
PARTITIONS = 2)
AS SELECT
mover.id AS mover_id,
@@ -98,12 +98,12 @@ CREATE STREAM trace1
CREATE SINK CONNECTOR tile WITH (
'tasks.max' = '1',
'connector.class' = 'guru.bonacci.kafka.connect.tile38.Tile38SinkConnector',
- 'topics' = 'unmoved,trace1',
+ 'topics' = 'unmoved,trace',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'value.converter' = 'io.confluent.connect.protobuf.ProtobufConverter',
'value.converter.schema.registry.url' = 'http://schema-registry:8081',
'tile38.topic.unmoved' = 'SET unmoved event.ID POINT event.LATITUDE event.LONGITUDE',
- 'tile38.topic.trace1' = 'SET trace event.MOVER_ID POINT event.LAT event.LON',
+ 'tile38.topic.trace' = 'SET trace event.MOVER_ID POINT event.LAT event.LON',
'tile38.host' = 'tile38',
'tile38.port' = 9851,
'errors.tolerance' = 'all',
@@ -177,4 +177,4 @@ CREATE STREAM homeward
estimate.togo_ms as togo_ms
FROM trace_to_estimate AS trace
INNER JOIN estimate_t as estimate ON estimate.hashkey = trace.hashkey
- PARTITION BY trace.unmoved_id;
+ PARTITION BY trace.tracking_number;
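For the outer-join TODO in the README: homeward above drops trace rows until a matching estimate arrives. A LEFT JOIN would keep them, with togo_ms staying NULL until estimate_t has the hashkey. A sketch, not part of this diff:

```sql
CREATE STREAM homeward_all AS
  SELECT
    trace.tracking_number AS tracking_number,
    estimate.togo_ms AS togo_ms   -- NULL while no estimate exists yet
  FROM trace_to_estimate AS trace
  LEFT JOIN estimate_t AS estimate ON estimate.hashkey = trace.hashkey
  PARTITION BY trace.tracking_number;
```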
diff --git a/pom.xml b/pom.xml
index 07ec5f6..7e26415 100644
--- a/pom.xml
+++ b/pom.xml
@@ -39,6 +39,11 @@
 			<artifactId>commons-lang3</artifactId>
 			<version>3.10</version>
 		</dependency>
+		<dependency>
+			<groupId>io.projectreactor.kafka</groupId>
+			<artifactId>reactor-kafka</artifactId>
+			<version>1.2.2.RELEASE</version>
+		</dependency>
 		<dependency>
 			<groupId>com.google.protobuf</groupId>
 			<artifactId>protobuf-java</artifactId>
diff --git a/src/main/java/guru/bonacci/timesup/streams/MoverProducer.java b/src/main/java/guru/bonacci/timesup/streams/MoverProducer.java
new file mode 100644
index 0000000..3ba48a5
--- /dev/null
+++ b/src/main/java/guru/bonacci/timesup/streams/MoverProducer.java
@@ -0,0 +1,86 @@
+package guru.bonacci.timesup.streams;
+
+import java.text.SimpleDateFormat;
+import java.time.Duration;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import guru.bonacci.timesup.model.TheMover.Mover;
+import reactor.core.publisher.Flux;
+import reactor.kafka.sender.KafkaSender;
+import reactor.kafka.sender.SenderOptions;
+import reactor.kafka.sender.SenderRecord;
+
+public class MoverProducer {
+
+ private static final Logger log = LoggerFactory.getLogger(MoverProducer.class);
+
+ static final String BOOTSTRAP_SERVERS = "localhost:29092";
+ static final String SCHEMA_REGISTRY = "http://127.0.0.1:8081";
+ static final String TOPIC = "testmover";
+
+ private final KafkaSender<String, Mover> sender;
+ private final SimpleDateFormat dateFormat;
+
+ public MoverProducer(String bootstrapServers) {
+
+ Map<String, Object> props = new HashMap<>();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producer");
+ props.put(ProducerConfig.ACKS_CONFIG, "all");
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+ "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
+ props.put("schema.registry.url", SCHEMA_REGISTRY);
+ SenderOptions<String, Mover> senderOptions = SenderOptions.create(props);
+
+ sender = KafkaSender.create(senderOptions);
+ dateFormat = new SimpleDateFormat("HH:mm:ss:SSS z dd MMM yyyy");
+ }
+
+// emits a record every second; the latch counts down once per acknowledged send
+ public void sendMessages(String topic, CountDownLatch latch) throws InterruptedException {
+ sender.send(Flux.interval(Duration.ofSeconds(1))
+ .map(i -> SenderRecord.create(toRecord(topic), i)))
+ .doOnError(e -> log.error("Send failed", e))
+ .subscribe(r -> {
+ RecordMetadata metadata = r.recordMetadata();
+ System.out.printf("Message %d sent successfully, topic-partition=%s-%d offset=%d timestamp=%s\n",
+ r.correlationMetadata(),
+ metadata.topic(),
+ metadata.partition(),
+ metadata.offset(),
+ dateFormat.format(new Date(metadata.timestamp())));
+ latch.countDown();
+ });
+ }
+
+ ProducerRecord<String, Mover> toRecord(String topic) {
+ Mover record = Mover.newBuilder().setId("foo").setLat(1.1f).setLon(1.0f).build();
+// sending a null value instead would produce a tombstone for key "foo"
+ return new ProducerRecord<>(topic, "foo", record);
+ }
+
+ public void close() {
+ sender.close();
+ }
+
+ public static void main(String[] args) throws Exception {
+ int count = 1;
+ CountDownLatch latch = new CountDownLatch(count);
+ MoverProducer producer = new MoverProducer(BOOTSTRAP_SERVERS);
+ producer.sendMessages(TOPIC, latch);
+ latch.await(5, TimeUnit.MINUTES);
+ producer.close();
+ }
+}
\ No newline at end of file
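The rewritten comment in toRecord points at the tombstone TODOs in the README. A minimal sketch of such a test, reusing MoverProducer's broker and serializer settings and assuming the Confluent protobuf serializer passes null values through unchanged (a record with a non-null key and a null value is a Kafka tombstone); MoverTombstone is hypothetical, not part of this diff:

```java
package guru.bonacci.timesup.streams;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import guru.bonacci.timesup.model.TheMover.Mover;

// hypothetical helper to exercise the delete path downstream
public class MoverTombstone {

	public static void main(String[] args) {
		Map<String, Object> props = new HashMap<>();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
				"io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
		props.put("schema.registry.url", "http://127.0.0.1:8081");

		// null value = tombstone; consumers treat it as a delete for key "foo"
		try (KafkaProducer<String, Mover> producer = new KafkaProducer<>(props)) {
			producer.send(new ProducerRecord<>("testmover", "foo", null));
		}
	}
}
```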
diff --git a/src/main/java/guru/bonacci/timesup/streams/UnmovedProducer.java b/src/main/java/guru/bonacci/timesup/streams/UnmovedProducer.java
new file mode 100644
index 0000000..75bab49
--- /dev/null
+++ b/src/main/java/guru/bonacci/timesup/streams/UnmovedProducer.java
@@ -0,0 +1,86 @@
+package guru.bonacci.timesup.streams;
+
+import java.text.SimpleDateFormat;
+import java.time.Duration;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import guru.bonacci.timesup.model.TheUnmoved.Unmoved;
+import reactor.core.publisher.Flux;
+import reactor.kafka.sender.KafkaSender;
+import reactor.kafka.sender.SenderOptions;
+import reactor.kafka.sender.SenderRecord;
+
+public class UnmovedProducer {
+
+ private static final Logger log = LoggerFactory.getLogger(UnmovedProducer.class);
+
+ static final String BOOTSTRAP_SERVERS = "localhost:29092";
+ static final String SCHEMA_REGISTRY = "http://127.0.0.1:8081";
+ static final String TOPIC = "testunmoved";
+
+ private final KafkaSender<String, Unmoved> sender;
+ private final SimpleDateFormat dateFormat;
+
+ public UnmovedProducer(String bootstrapServers) {
+
+ Map<String, Object> props = new HashMap<>();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producer");
+ props.put(ProducerConfig.ACKS_CONFIG, "all");
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+ "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
+ props.put("schema.registry.url", SCHEMA_REGISTRY);
+ SenderOptions<String, Unmoved> senderOptions = SenderOptions.create(props);
+
+ sender = KafkaSender.create(senderOptions);
+ dateFormat = new SimpleDateFormat("HH:mm:ss:SSS z dd MMM yyyy");
+ }
+
+// emits a record every second; the latch counts down once per acknowledged send
+ public void sendMessages(String topic, CountDownLatch latch) throws InterruptedException {
+ sender.send(Flux.interval(Duration.ofSeconds(1))
+ .map(i -> SenderRecord.create(toRecord(topic), i)))
+ .doOnError(e -> log.error("Send failed", e))
+ .subscribe(r -> {
+ RecordMetadata metadata = r.recordMetadata();
+ System.out.printf("Message %d sent successfully, topic-partition=%s-%d offset=%d timestamp=%s\n",
+ r.correlationMetadata(),
+ metadata.topic(),
+ metadata.partition(),
+ metadata.offset(),
+ dateFormat.format(new Date(metadata.timestamp())));
+ latch.countDown();
+ });
+ }
+
+ ProducerRecord<String, Unmoved> toRecord(String topic) {
+ Unmoved record = Unmoved.newBuilder().setId("bar").setLatitude(1.0f).setLongitude(1.0f).build();
+// sending a null value instead would produce a tombstone for this key
+ return new ProducerRecord<>(topic, "bar", record);
+ }
+
+ public void close() {
+ sender.close();
+ }
+
+ public static void main(String[] args) throws Exception {
+ int count = 1;
+ CountDownLatch latch = new CountDownLatch(count);
+ UnmovedProducer producer = new UnmovedProducer(BOOTSTRAP_SERVERS);
+ producer.sendMessages(TOPIC, latch);
+ latch.await(5, TimeUnit.MINUTES);
+ producer.close();
+ }
+}
\ No newline at end of file
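UnmovedProducer duplicates MoverProducer almost line for line; only the value type and the record factory differ. Should a third producer appear, the reactive plumbing could be hoisted into one generic class, roughly as below (ReactiveProducer is a sketch, not in this diff):

```java
package guru.bonacci.timesup.streams;

import java.time.Duration;
import java.util.Map;
import java.util.function.Supplier;

import org.apache.kafka.clients.producer.ProducerRecord;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

// hypothetical shared base: per-type producers supply only the record factory
public class ReactiveProducer<V> {

	private final KafkaSender<String, V> sender;

	public ReactiveProducer(Map<String, Object> producerProps) {
		this.sender = KafkaSender.create(SenderOptions.create(producerProps));
	}

	// emits one record per second until the returned subscription is disposed
	public Disposable sendEverySecond(Supplier<ProducerRecord<String, V>> records) {
		return sender.send(Flux.interval(Duration.ofSeconds(1))
						.map(i -> SenderRecord.create(records.get(), i)))
				.subscribe();
	}

	public void close() {
		sender.close();
	}
}
```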
diff --git a/src/main/protobuf/the-unmoved.proto b/src/main/protobuf/the-unmoved.proto
index 3011db7..b1dc8ca 100644
--- a/src/main/protobuf/the-unmoved.proto
+++ b/src/main/protobuf/the-unmoved.proto
@@ -7,7 +7,6 @@ option java_outer_classname = "TheUnmoved";
message Unmoved {
required string id = 1;
- optional string name = 2;
required float latitude = 3;
required float longitude = 4;
}
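One caveat to removing the name field: protobuf best practice is to reserve the freed tag (and optionally the field name) so a later edit cannot reuse number 2 with a different type, which would mis-decode old messages still sitting in Kafka. If that risk applies here, the message could read:

```proto
message Unmoved {
  reserved 2;
  reserved "name";
  required string id = 1;
  required float latitude = 3;
  required float longitude = 4;
}
```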