Calculating speed, bearing and distance using Kafka Streams Processor API

Sometimes the classic Kafka Streams DSL is not enough. The Processor API lets you define a processor freely and, best of all, use a State Store. In this case we will calculate the speed, bearing and distance of public transport vehicles in Warsaw.

Data Source

Vehicle positions come from the Warsaw public transport open data API (api.um.warszawa.pl). Each record is a JSON object like this:

{"Lines":"130","Lon":21.003338,"VehicleNumber":"1000","Time":"2020-04-23 20:22:55","Lat":52.206166,"Brigade":"3"}

Big Picture

  • The producer sends records to Kafka
  • Kafka Streams filters out duplicates and old records, then calculates speed, distance and bearing
  • The consumer retrieves records from Kafka and loads them into Elasticsearch
  • We admire the maps and dashboards

Here we will focus on the part related to Kafka Streams.

Why the Processor API?

To calculate speed, distance and bearing we need to compare every incoming position with the last known position of the same vehicle. The Processor API gives us exactly that: full control over per-record processing and direct access to a State Store in which we keep the previous record for each vehicle.

Reading data from Kafka

public class InputZtmRecord {
    @SerializedName("Lines")
    public String lines;
    @SerializedName("Lon")
    public double lon;
    @SerializedName("Lat")
    public double lat;
    @SerializedName("VehicleNumber")
    public String vehicleNumber;
    @SerializedName("Brigade")
    public String brigade;
    @SerializedName("Time")
    public Date time;
}

Ultimately, we map it to the target ZtmRecord class.

public class ZtmRecord {
    public String lines;
    public double lon;
    public double lat;
    public String vehicleNumber;
    public String brigade;
    public Date time;
    public double speed;
    public double distance;
    public double bearing;
}

I wanted to keep some flexibility between the input and output classes. I also experimented with other date types, but eventually stayed with the old java.util.Date due to SerDe issues. The deserializer maps one class onto the other.
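
The deserializer itself is not shown here, but the @SerializedName annotations imply Gson. A minimal sketch of what InputZtmRecordDeserializer could look like (an illustration, not the exact code from the repository):

import java.nio.charset.StandardCharsets;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.kafka.common.serialization.Deserializer;

public class InputZtmRecordDeserializer implements Deserializer<InputZtmRecord> {

    // "Time" arrives as e.g. "2020-04-23 20:22:55", so Gson needs a matching date pattern
    private final Gson gson = new GsonBuilder()
            .setDateFormat("yyyy-MM-dd HH:mm:ss")
            .create();

    @Override
    public InputZtmRecord deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        return gson.fromJson(new String(data, StandardCharsets.UTF_8), InputZtmRecord.class);
    }
}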

@Test
public void ztmRecordSerializationWorks() {
    InputZtmRecordDeserializer deserializer = new InputZtmRecordDeserializer();
    String rawRecord = "{\"Lines\": \"204\", \"Lon\": 21.043399, \"VehicleNumber\": \"1042\", \"Time\": \"2020-04-24 21:14:34\", \"Lat\": 52.26617, \"Brigade\": \"04\"}";
    InputZtmRecord record = deserializer.deserialize(null, rawRecord.getBytes());
    Assert.assertEquals("204", record.lines);
    Assert.assertEquals(21.043399, record.lon, 0.0001);
    Assert.assertEquals(52.26617, record.lat, 0.0001);
    Assert.assertEquals("04", record.brigade);
    Assert.assertEquals("1042", record.vehicleNumber);
    Assert.assertEquals(Date.from(LocalDateTime.of(2020, 4, 24, 21, 14, 34).atZone(ZoneId.systemDefault()).toInstant()), record.time);
}

Topology

public Topology createTopology() {
    final Serde<ZtmRecord> outputZtmRecordSerde = Serdes.serdeFrom(new GenericSerializer(), new ZtmRecordDeserializer());
    StoreBuilder<KeyValueStore<String, ZtmRecord>> ztmStoreBuilder = Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("ztmStore"),
            Serdes.String(),
            outputZtmRecordSerde);
    Topology topology = new Topology();
    topology.addSource("Source", new StringDeserializer(), new InputZtmRecordToZtmRecordDeserializer(), INPUT_TOPIC)
            .addProcessor("ZtmProcess", () -> new ZtmProcessor(), "Source")
            .addStateStore(ztmStoreBuilder, "ZtmProcess")
            .addSink("Sink", OUTPUT_TOPIC, new StringSerializer(), new GenericSerializer(), "ZtmProcess");
    return topology;
}

ztmStoreBuilder -> we define the state store that the processor will use
addSource -> we define which topic we read from and how to deserialize the records
addProcessor -> we add the processor and point to its parent node ("Source")
addStateStore -> we attach the previously created state store to the processor
addSink -> we define the output of our stream
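
The topology by itself does not run; it still has to be wrapped in a KafkaStreams instance. A minimal sketch, assuming a made-up application id and the same local broker address as the producer script later in this post:

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

public class ZtmStreamApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ztm-stream");          // assumption
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");  // local Kafka

        KafkaStreams streams = new KafkaStreams(new ZtmStream().createTopology(), props);
        streams.start();

        // close the streams application cleanly on JVM shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}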

Processor

public class ZtmProcessor implements Processor<String, ZtmRecord> {

    public static final int SUSPICIOUS_SPEED = 120;

    private ProcessorContext context;
    private KeyValueStore<String, ZtmRecord> ztmRecordStore;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        ztmRecordStore = (KeyValueStore<String, ZtmRecord>) context.getStateStore("ztmStore");
    }

    @Override
    public void process(String key, ZtmRecord record) {
        // some awesome code
    }

    @Override
    public void close() {
    }
}

All the magic happens in the process method, but first, in init(), we fetch the StateStore by its name and keep a reference to the ProcessorContext. The context lets us read metadata about the current record (e.g. headers) and perform actions such as forwarding the record downstream.
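
A quick illustration (not from the original processor) of what the context exposes inside process():

// metadata of the record currently being processed
String topic = context.topic();
int partition = context.partition();
long offset = context.offset();
Headers headers = context.headers();

// pass a record on to the downstream node(s)
context.forward(key, record);

With that in place, the actual process() method: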

@Override
public void process(String key, ZtmRecord record) {
    ZtmRecord previousRecord = ztmRecordStore.get(key);
    if (previousRecord == null) {
        ztmRecordStore.put(key, record);
        context.forward(key, record);
        return;
    }
    if (previousRecord.time.compareTo(record.time) >= 0) {
        return; // ignore old/same record
    }
    record = calculateRecord(previousRecord, record);
    if (record.speed > SUSPICIOUS_SPEED) {
        return; // probably a measurement error
    }
    ztmRecordStore.put(key, record);
    context.forward(key, record);
}

private ZtmRecord calculateRecord(ZtmRecord previousRecord, ZtmRecord record) {
    double lat1 = previousRecord.lat;
    double lat2 = record.lat;
    double lon1 = previousRecord.lon;
    double lon2 = record.lon;
    record.distance = GeoTools.calculateDistanceInKilometers(lat1, lat2, lon1, lon2);
    record.bearing = GeoTools.calculateBearing(lat1, lat2, lon1, lon2);
    record.speed = GeoTools.calculateSpeed(record.distance, previousRecord.time, record.time);
    return record;
}

ztmRecordStore.get(key) -> retrieve the previous record for this vehicle
first if -> if there is no predecessor, save the record in the store, forward it and stop
second if -> ignore duplicates and out-of-order (old) records
calculateRecord -> calculate speed, distance and bearing
speed check -> I assumed that speeds over 120 km/h are GPS sensor errors, so such records are dropped
final put/forward -> save the record in the KeyValueStore and pass it downstream

A lot of bugs will probably surface once I hook up the source and start pushing records to Elasticsearch. I moved the calculations to a separate GeoTools class.

public class GeoTools {

    public static double calculateDistanceInKilometers(double lat1, double lat2, double lon1, double lon2) {
        if ((lat1 == lat2) && (lon1 == lon2)) {
            return 0;
        } else {
            // spherical law of cosines
            double theta = lon1 - lon2;
            double dist = Math.sin(Math.toRadians(lat1)) * Math.sin(Math.toRadians(lat2))
                    + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2)) * Math.cos(Math.toRadians(theta));
            dist = Math.acos(dist);
            dist = Math.toDegrees(dist);
            dist = dist * 60 * 1.1515; // degrees -> statute miles
            dist = dist * 1.609344;    // statute miles -> kilometers
            return dist;
        }
    }

    public static double calculateBearing(double lat1, double lat2, double lon1, double lon2) {
        double latitude1 = Math.toRadians(lat1);
        double latitude2 = Math.toRadians(lat2);
        double longDiff = Math.toRadians(lon2 - lon1);
        double y = Math.sin(longDiff) * Math.cos(latitude2);
        double x = Math.cos(latitude1) * Math.sin(latitude2)
                - Math.sin(latitude1) * Math.cos(latitude2) * Math.cos(longDiff);
        return (Math.toDegrees(Math.atan2(y, x)) + 360) % 360;
    }

    public static double calculateSpeed(double distance, Date previousTime, Date time) {
        double millis = time.getTime() - previousTime.getTime();
        double hours = millis / (1000 * 60 * 60);
        if (hours <= 0) {
            return 0;
        }
        return distance / hours;
    }
}
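
A quick sanity check of calculateSpeed: the two records in the computation test below are 41 seconds apart (20:16:53 -> 20:17:34) and roughly 0.1734 km from each other, so the speed comes out at 0.1734 / (41 / 3600) ≈ 15.23 km/h, which is what the last test asserts.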

Tests

TopologyTestDriver lets us pipe records through the whole topology without a running Kafka cluster:

public class ZtmStreamTest {

    TopologyTestDriver testDriver;

    StringSerializer stringSerializer = new StringSerializer();
    StringDeserializer stringDeserializer = new StringDeserializer();
    ZtmRecordDeserializer ztmRecordDeserializer = new ZtmRecordDeserializer();
    private TestInputTopic<String, String> inputTopic;
    private TestOutputTopic<String, ZtmRecord> outputTopic;

    @Before
    public void prepareTopologyTestDriver() {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wiaderko-ztm-stream-test");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "doesnt-matter:1337");

        ZtmStream ztmStream = new ZtmStream();
        Topology topology = ztmStream.createTopology();
        testDriver = new TopologyTestDriver(topology, config);
        inputTopic = testDriver.createInputTopic(ZtmStream.INPUT_TOPIC, stringSerializer, stringSerializer);
        outputTopic = testDriver.createOutputTopic(ZtmStream.OUTPUT_TOPIC, stringDeserializer, ztmRecordDeserializer);
    }

    @After
    public void closeTestDriver() {
        testDriver.close();
    }

    @Test
    public void streamDropsSameRecords() {
        String firstRecord = "{\"Lines\": \"108\", \"Lon\": 21.076998, \"VehicleNumber\": \"1037\", \"Time\": \"2020-04-25 20:16:53\", \"Lat\": 52.200958, \"Brigade\": \"51\"}";
        String secondRecord = "{\"Lines\": \"108\", \"Lon\": 21.076998, \"VehicleNumber\": \"1037\", \"Time\": \"2020-04-25 20:16:53\", \"Lat\": 52.200958, \"Brigade\": \"51\"}";
        inputTopic.pipeInput("1037", firstRecord);
        inputTopic.pipeInput("1037", secondRecord);
        List<ZtmRecord> output = outputTopic.readValuesToList();
        Assert.assertEquals(1, output.size());
    }

    @Test
    public void streamDropsOldRecords() {
        String firstRecord = "{\"Lines\": \"108\", \"Lon\": 21.076998, \"VehicleNumber\": \"1037\", \"Time\": \"2020-04-25 20:17:53\", \"Lat\": 52.200958, \"Brigade\": \"51\"}";
        String secondRecord = "{\"Lines\": \"108\", \"Lon\": 21.076998, \"VehicleNumber\": \"1037\", \"Time\": \"2020-04-25 20:16:53\", \"Lat\": 52.200958, \"Brigade\": \"51\"}";
        inputTopic.pipeInput("1037", firstRecord);
        inputTopic.pipeInput("1037", secondRecord);
        List<ZtmRecord> output = outputTopic.readValuesToList();
        Assert.assertEquals(1, output.size());
    }

    @Test
    public void streamComputesAreCorrectly() {
        String firstRecord = "{\"Lines\": \"108\", \"Lon\": 21.076998, \"VehicleNumber\": \"1037\", \"Time\": \"2020-04-25 20:16:53\", \"Lat\": 52.200958, \"Brigade\": \"51\"}";
        String secondRecord = "{\"Lines\": \"108\", \"Lon\": 21.078823, \"VehicleNumber\": \"1037\", \"Time\": \"2020-04-25 20:17:34\", \"Lat\": 52.199871, \"Brigade\": \"51\"}";
        inputTopic.pipeInput("1037", firstRecord);
        inputTopic.pipeInput("1037", secondRecord);
        List<ZtmRecord> output = outputTopic.readValuesToList();
        ZtmRecord record = output.get(1); // the second record carries the calculated values
        Assert.assertTrue(record.bearing > 0);
        Assert.assertTrue(record.speed > 0);
        Assert.assertTrue(record.distance > 0);
        Assert.assertEquals(0.1734, record.distance, 0.0001);
        Assert.assertEquals(134, record.bearing, 1);
        Assert.assertEquals(15.227, record.speed, 0.001);
    }
}

Does it work with a real Kafka?

A short Python script fetches vehicle positions from the Warsaw API and pushes them to the input topic, keyed by vehicle number:

import requests
import json
from kafka import KafkaProducer

token = 'paste-your-token-here'
url = 'https://api.um.warszawa.pl/api/action/busestrams_get/'
resource_id = 'f2e5503e-927d-4ad3-9500-4ab9e55deb59'

bus_params = {'apikey': token, 'type': 1, 'resource_id': resource_id}
tram_params = {'apikey': token, 'type': 2, 'resource_id': resource_id}

r = requests.get(url=url, params=bus_params)
data = r.json()

producer = KafkaProducer(bootstrap_servers=['localhost:29092'],
                         value_serializer=lambda x: json.dumps(x).encode('utf-8'),
                         key_serializer=lambda x: x)

for record in data['result']:
    print(record)
    future = producer.send('ztm-input', value=record, key=record["VehicleNumber"].encode('utf-8'))
    result = future.get(timeout=60)

Repository

Branch: kafka_streams_201

Summary

Finally, a word about the value of tests: I can't imagine manually feeding records into the stream every time just to check the implementation.

Software Developer, Big Data Engineer, Blogger (https://wiadrodanych.pl), Amateur Cyclist & Triathlete, @maciej_szymczyk