
Commit 91760dd: tombstone testing
jeffreyvanhelden committed Jun 19, 2020 (1 parent: b241bcc)
Showing 6 changed files with 187 additions and 13 deletions.
12 changes: 5 additions & 7 deletions README.md
@@ -1,16 +1,14 @@
# time-is-up


TODO
- use GEO_DISTANCE for variable GEOHASH length
- station-sink equivalent: elastic sink or websocket?
- outer join trace_to_estimate and estimate_t
- test tombstone deletes unmoved and track
- station-sink equivalent
- simplify messages
- new version tile sink connector -> load through hub command
- tombstone delete through message retention 1 hour?
- use GEO_DISTANCE for variable GEOHASH length
- order counting query
- deeplearning UDF?
- implement delete mechanism at pick-up

- https://www.confluent.io/blog/importance-of-distributed-tracing-for-apache-kafka-based-applications/
- https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/: AWS m3.xlarge instances with SSD.
- https://www.confluent.io/blog/importance-of-distributed-tracing-for-apache-kafka-based-applications/
- https://github.com/openzipkin-contrib/brave-kafka-interceptor
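For the "tombstone delete through message retention 1 hour?" item above, a minimal sketch (not part of this commit) of what a one-hour tombstone retention could look like via the Kafka AdminClient. The topic name and bootstrap address are assumptions taken from the code below; on a compacted topic, delete.retention.ms bounds how long tombstones survive after compaction:

import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class RetentionSketch {

    public static void main(String[] args) throws Exception {
        try (Admin admin = Admin.create(
                Map.<String, Object>of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092"))) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "trace");
            // keep tombstones around for one hour after compaction, then let the cleaner drop them
            AlterConfigOp op = new AlterConfigOp(
                    new ConfigEntry("delete.retention.ms", "3600000"), AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(topic, List.of(op))).all().get();
        }
    }
}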
10 changes: 5 additions & 5 deletions ksqldb/queries.sql
@@ -79,9 +79,9 @@ INSERT INTO mover (id, lat, lon) VALUES ('thisisme', 0.91, 0.99);
INSERT INTO mover (id, lat, lon) VALUES ('thisisme', 1.0, 1.0);

-- mover + track = trace
-CREATE STREAM trace1
+CREATE STREAM trace
WITH (VALUE_FORMAT = 'protobuf',
-KAFKA_TOPIC = 'trace1',
+KAFKA_TOPIC = 'trace',
PARTITIONS = 2)
AS SELECT
mover.id AS mover_id,
@@ -98,12 +98,12 @@ CREATE STREAM trace1
CREATE SINK CONNECTOR tile WITH (
'tasks.max' = '1',
'connector.class' = 'guru.bonacci.kafka.connect.tile38.Tile38SinkConnector',
-'topics' = 'unmoved,trace1',
+'topics' = 'unmoved,trace',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'value.converter' = 'io.confluent.connect.protobuf.ProtobufConverter',
'value.converter.schema.registry.url' = 'http://schema-registry:8081',
'tile38.topic.unmoved' = 'SET unmoved event.ID POINT event.LATITUDE event.LONGITUDE',
-'tile38.topic.trace1' = 'SET trace event.MOVER_ID POINT event.LAT event.LON',
+'tile38.topic.trace' = 'SET trace event.MOVER_ID POINT event.LAT event.LON',
'tile38.host' = 'tile38',
'tile38.port' = 9851,
'errors.tolerance' = 'all',
@@ -177,4 +177,4 @@ CREATE STREAM homeward
estimate.togo_ms as togo_ms
FROM trace_to_estimate AS trace
INNER JOIN estimate_t as estimate ON estimate.hashkey = trace.hashkey
-PARTITION BY trace.unmoved_id;
+PARTITION BY trace.tracking_number;
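Two things change in this file: the trace1 stream and topic are renamed to trace (the connector's topics list and command template follow suit), and homeward is now keyed by tracking_number instead of unmoved_id. Since Kafka tombstones delete per key, the PARTITION BY column determines which key a later tombstone must carry to remove a homeward record downstream.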
5 changes: 5 additions & 0 deletions pom.xml
@@ -39,6 +39,11 @@
<artifactId>commons-lang3</artifactId>
<version>3.10</version>
</dependency>
+<dependency>
+<groupId>io.projectreactor.kafka</groupId>
+<artifactId>reactor-kafka</artifactId>
+<version>1.2.2.RELEASE</version>
+</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
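The new reactor-kafka dependency supplies the KafkaSender, SenderOptions, and SenderRecord types used by the two test producers added below.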
86 changes: 86 additions & 0 deletions src/main/java/guru/bonacci/timesup/streams/MoverProducer.java
@@ -0,0 +1,86 @@
package guru.bonacci.timesup.streams;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import guru.bonacci.timesup.model.TheMover.Mover;
import reactor.core.publisher.Flux;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

public class MoverProducer {

private static final Logger log = LoggerFactory.getLogger(MoverProducer.class.getName());

static final String BOOTSTRAP_SERVERS = "localhost:29092";
static final String SCHEMA_REGISTRY = "http://127.0.0.1:8081";
static final String TOPIC = "testmover";

private final KafkaSender<String, Mover> sender;
private final SimpleDateFormat dateFormat;

public MoverProducer(String bootstrapServers) {

Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producer");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
props.put("schema.registry.url", SCHEMA_REGISTRY);
SenderOptions<String, Mover> senderOptions = SenderOptions.create(props);

sender = KafkaSender.create(senderOptions);
dateFormat = new SimpleDateFormat("HH:mm:ss:SSS z dd MMM yyyy");
}

// String loc = step.getLeft() + "," + step.getRight();
public void sendMessages(String topic, CountDownLatch latch) throws InterruptedException {
sender.send(Flux.interval(Duration.ofSeconds(1))
.map(i -> SenderRecord.create(toRecord(topic), i)))
.doOnError(e -> log.error("Send failed", e))
.subscribe(r -> {
RecordMetadata metadata = r.recordMetadata();
System.out.printf("Message %d sent successfully, topic-partition=%s-%d offset=%d timestamp=%s\n",
r.correlationMetadata(),
metadata.topic(),
metadata.partition(),
metadata.offset(),
dateFormat.format(new Date(metadata.timestamp())));
latch.countDown();
});
}

ProducerRecord<String, Mover> toRecord(String topic) {
Mover record = Mover.newBuilder().setId("foo").setLat(1.1f).setLon(1.0f).build();
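// to test a tombstone delete, send a null value for this key instead: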
// record = null;
return new ProducerRecord<>(topic, "foo", record);
}

public void close() {
sender.close();
}

public static void main(String[] args) throws Exception {
int count = 1;
CountDownLatch latch = new CountDownLatch(count);
MoverProducer producer = new MoverProducer(BOOTSTRAP_SERVERS);
producer.sendMessages(TOPIC, latch);
latch.await(5, TimeUnit.MINUTES);
producer.close();
}
}
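Not part of the commit: a minimal sketch of how an explicit tombstone could be sent through the same KafkaSender, written as a hypothetical extra method on MoverProducer. A record with a null value is Kafka's delete marker for its key:

public void sendTombstone(String topic, String key) {
    // null value = tombstone: compacted topics and downstream tables drop this key
    ProducerRecord<String, Mover> tombstone = new ProducerRecord<>(topic, key, null);
    sender.send(Flux.just(SenderRecord.create(tombstone, key)))
          .doOnError(e -> log.error("Tombstone send failed", e))
          .subscribe(r -> log.info("Tombstone for key {} at offset {}",
                  key, r.recordMetadata().offset()));
}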
86 changes: 86 additions & 0 deletions src/main/java/guru/bonacci/timesup/streams/UnmovedProducer.java
@@ -0,0 +1,86 @@
package guru.bonacci.timesup.streams;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import guru.bonacci.timesup.model.TheUnmoved.Unmoved;
import reactor.core.publisher.Flux;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

public class UnmovedProducer {

private static final Logger log = LoggerFactory.getLogger(UnmovedProducer.class.getName());

static final String BOOTSTRAP_SERVERS = "localhost:29092";
static final String SCHEMA_REGISTRY = "http://127.0.0.1:8081";
static final String TOPIC = "testunmoved";

private final KafkaSender<String, Unmoved> sender;
private final SimpleDateFormat dateFormat;

public UnmovedProducer(String bootstrapServers) {

Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producer");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
props.put("schema.registry.url", SCHEMA_REGISTRY);
SenderOptions<String, Unmoved> senderOptions = SenderOptions.create(props);

sender = KafkaSender.create(senderOptions);
dateFormat = new SimpleDateFormat("HH:mm:ss:SSS z dd MMM yyyy");
}

// String loc = step.getLeft() + "," + step.getRight();
public void sendMessages(String topic, CountDownLatch latch) throws InterruptedException {
sender.send(Flux.interval(Duration.ofSeconds(1))
.map(i -> SenderRecord.create(toRecord(topic), i)))
.doOnError(e -> log.error("Send failed", e))
.subscribe(r -> {
RecordMetadata metadata = r.recordMetadata();
System.out.printf("Message %d sent successfully, topic-partition=%s-%d offset=%d timestamp=%s\n",
r.correlationMetadata(),
metadata.topic(),
metadata.partition(),
metadata.offset(),
dateFormat.format(new Date(metadata.timestamp())));
latch.countDown();
});
}

ProducerRecord<String, Unmoved> toRecord(String topic) {
Unmoved record = Unmoved.newBuilder().setId("bar").setLatitude(1.0f).setLongitude(1.0f).build();
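// to test a tombstone delete, send a null value for this key instead: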
// record = null;
return new ProducerRecord<>(topic, "foo", record);
}

public void close() {
sender.close();
}

public static void main(String[] args) throws Exception {
int count = 1;
CountDownLatch latch = new CountDownLatch(count);
UnmovedProducer producer = new UnmovedProducer(BOOTSTRAP_SERVERS);
producer.sendMessages(TOPIC, latch);
latch.await(5, TimeUnit.MINUTES);
producer.close();
}
}
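One way to verify that the tombstones actually land (again a sketch, not committed code): consume the topic with a byte-array deserializer and check for null values. Topic and group names here are assumptions:

import java.time.Duration;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TombstoneChecker {

    public static void main(String[] args) {
        Map<String, Object> props = Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092",
                ConsumerConfig.GROUP_ID_CONFIG, "tombstone-checker",
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("testmover"));
            ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(10));
            for (ConsumerRecord<String, byte[]> r : records) {
                // a tombstone arrives as a record whose value is null
                System.out.printf("key=%s tombstone=%b%n", r.key(), r.value() == null);
            }
        }
    }
}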
1 change: 0 additions & 1 deletion src/main/protobuf/the-unmoved.proto
@@ -7,7 +7,6 @@ option java_outer_classname = "TheUnmoved";

message Unmoved {
required string id = 1;
-optional string name = 2;
required float latitude = 3;
required float longitude = 4;
}
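Dropping the optional name field is wire-compatible in proto2, provided field number 2 is never reused for a new field; marking it with a reserved 2; statement in the message would make that explicit.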
