-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
jeffreyvanhelden
committed
Jun 19, 2020
1 parent
b241bcc
commit 91760dd
Showing
6 changed files
with
187 additions
and
13 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,16 +1,14 @@ | ||
# time-is-up | ||
|
||
|
||
TODO | ||
- use GEO_DISTANCE for variable GEOHASH length | ||
- station-sink equivalent: elastic sink or websocket? | ||
- outer join trace_to_estimate and estimate_t | ||
- test tombstone deletes unmoved and track | ||
- station-sink equivalent | ||
- simplify messages | ||
- new version tile sink connector -> load through hub command | ||
- tombstone delete through message retention 1 hour? | ||
- use GEO_DISTANCE for variable GEOHASH length | ||
- order counting query | ||
- deeplearning UDF? | ||
- implement delete mechanism at pick-up | ||
|
||
- https://www.confluent.io/blog/importance-of-distributed-tracing-for-apache-kafka-based-applications/ | ||
- https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/: AWS: (AWS) m3.xlarge instances with SSD. | ||
- https://www.confluent.io/blog/importance-of-distributed-tracing-for-apache-kafka-based-applications/ | ||
- https://github.com/openzipkin-contrib/brave-kafka-interceptor |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
86 changes: 86 additions & 0 deletions
86
src/main/java/guru/bonacci/timesup/streams/MoverProducer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
package guru.bonacci.timesup.streams; | ||
|
||
import java.text.SimpleDateFormat; | ||
import java.time.Duration; | ||
import java.util.Date; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import org.apache.kafka.clients.producer.ProducerConfig; | ||
import org.apache.kafka.clients.producer.ProducerRecord; | ||
import org.apache.kafka.clients.producer.RecordMetadata; | ||
import org.apache.kafka.common.serialization.StringSerializer; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import guru.bonacci.timesup.model.TheMover.Mover; | ||
import reactor.core.publisher.Flux; | ||
import reactor.kafka.sender.KafkaSender; | ||
import reactor.kafka.sender.SenderOptions; | ||
import reactor.kafka.sender.SenderRecord; | ||
|
||
public class MoverProducer { | ||
|
||
private static final Logger log = LoggerFactory.getLogger(MoverProducer.class.getName()); | ||
|
||
static final String BOOTSTRAP_SERVERS = "localhost:29092"; | ||
static final String SCHEMA_REGISTRY = "http://127.0.0.1:8081"; | ||
static final String TOPIC = "testmover"; | ||
|
||
private final KafkaSender<String, Mover> sender; | ||
private final SimpleDateFormat dateFormat; | ||
|
||
public MoverProducer(String bootstrapServers) { | ||
|
||
Map<String, Object> props = new HashMap<>(); | ||
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); | ||
props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producer"); | ||
props.put(ProducerConfig.ACKS_CONFIG, "all"); | ||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); | ||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, | ||
"io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer"); | ||
props.put("schema.registry.url", SCHEMA_REGISTRY); | ||
SenderOptions<String, Mover> senderOptions = SenderOptions.create(props); | ||
|
||
sender = KafkaSender.create(senderOptions); | ||
dateFormat = new SimpleDateFormat("HH:mm:ss:SSS z dd MMM yyyy"); | ||
} | ||
|
||
// String loc = step.getLeft() + "," + step.getRight(); | ||
public void sendMessages(String topic, CountDownLatch latch) throws InterruptedException { | ||
sender.send(Flux.interval(Duration.ofSeconds(1)) | ||
.map(i -> SenderRecord.create(toRecord(topic), i))) | ||
.doOnError(e -> log.error("Send failed", e)) | ||
.subscribe(r -> { | ||
RecordMetadata metadata = r.recordMetadata(); | ||
System.out.printf("Message %d sent successfully, topic-partition=%s-%d offset=%d timestamp=%s\n", | ||
r.correlationMetadata(), | ||
metadata.topic(), | ||
metadata.partition(), | ||
metadata.offset(), | ||
dateFormat.format(new Date(metadata.timestamp()))); | ||
latch.countDown(); | ||
}); | ||
} | ||
|
||
ProducerRecord<String, Mover> toRecord(String topic) { | ||
Mover record = Mover.newBuilder().setId("foo").setLat(1.1f).setLon(1.0f).build(); | ||
// record = null; | ||
return new ProducerRecord<>(topic, "foo", record); | ||
} | ||
|
||
public void close() { | ||
sender.close(); | ||
} | ||
|
||
public static void main(String[] args) throws Exception { | ||
int count = 1; | ||
CountDownLatch latch = new CountDownLatch(count); | ||
MoverProducer producer = new MoverProducer(BOOTSTRAP_SERVERS); | ||
producer.sendMessages(TOPIC, latch); | ||
latch.await(5, TimeUnit.MINUTES); | ||
producer.close(); | ||
} | ||
} |
86 changes: 86 additions & 0 deletions
86
src/main/java/guru/bonacci/timesup/streams/UnmovedProducer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
package guru.bonacci.timesup.streams; | ||
|
||
import java.text.SimpleDateFormat; | ||
import java.time.Duration; | ||
import java.util.Date; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import org.apache.kafka.clients.producer.ProducerConfig; | ||
import org.apache.kafka.clients.producer.ProducerRecord; | ||
import org.apache.kafka.clients.producer.RecordMetadata; | ||
import org.apache.kafka.common.serialization.StringSerializer; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import guru.bonacci.timesup.model.TheUnmoved.Unmoved; | ||
import reactor.core.publisher.Flux; | ||
import reactor.kafka.sender.KafkaSender; | ||
import reactor.kafka.sender.SenderOptions; | ||
import reactor.kafka.sender.SenderRecord; | ||
|
||
public class UnmovedProducer { | ||
|
||
private static final Logger log = LoggerFactory.getLogger(UnmovedProducer.class.getName()); | ||
|
||
static final String BOOTSTRAP_SERVERS = "localhost:29092"; | ||
static final String SCHEMA_REGISTRY = "http://127.0.0.1:8081"; | ||
static final String TOPIC = "testunmoved"; | ||
|
||
private final KafkaSender<String, Unmoved> sender; | ||
private final SimpleDateFormat dateFormat; | ||
|
||
public UnmovedProducer(String bootstrapServers) { | ||
|
||
Map<String, Object> props = new HashMap<>(); | ||
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); | ||
props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producer"); | ||
props.put(ProducerConfig.ACKS_CONFIG, "all"); | ||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); | ||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, | ||
"io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer"); | ||
props.put("schema.registry.url", SCHEMA_REGISTRY); | ||
SenderOptions<String, Unmoved> senderOptions = SenderOptions.create(props); | ||
|
||
sender = KafkaSender.create(senderOptions); | ||
dateFormat = new SimpleDateFormat("HH:mm:ss:SSS z dd MMM yyyy"); | ||
} | ||
|
||
// String loc = step.getLeft() + "," + step.getRight(); | ||
public void sendMessages(String topic, CountDownLatch latch) throws InterruptedException { | ||
sender.send(Flux.interval(Duration.ofSeconds(1)) | ||
.map(i -> SenderRecord.create(toRecord(topic), i))) | ||
.doOnError(e -> log.error("Send failed", e)) | ||
.subscribe(r -> { | ||
RecordMetadata metadata = r.recordMetadata(); | ||
System.out.printf("Message %d sent successfully, topic-partition=%s-%d offset=%d timestamp=%s\n", | ||
r.correlationMetadata(), | ||
metadata.topic(), | ||
metadata.partition(), | ||
metadata.offset(), | ||
dateFormat.format(new Date(metadata.timestamp()))); | ||
latch.countDown(); | ||
}); | ||
} | ||
|
||
ProducerRecord<String, Unmoved> toRecord(String topic) { | ||
Unmoved record = Unmoved.newBuilder().setId("bar").setLatitude(1.0f).setLongitude(1.0f).build(); | ||
// record = null; | ||
return new ProducerRecord<>(topic, "foo", record); | ||
} | ||
|
||
public void close() { | ||
sender.close(); | ||
} | ||
|
||
public static void main(String[] args) throws Exception { | ||
int count = 1; | ||
CountDownLatch latch = new CountDownLatch(count); | ||
UnmovedProducer producer = new UnmovedProducer(BOOTSTRAP_SERVERS); | ||
producer.sendMessages(TOPIC, latch); | ||
latch.await(5, TimeUnit.MINUTES); | ||
producer.close(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters