Calculating speed, bearing and distance using Kafka Streams Processor API

Maciej Szymczyk
Apr 30, 2020

Sometimes the classic Kafka Streams DSL is not enough. The Processor API lets you define a processor freely and, best of all, use a State Store. In this article we will calculate the speed, bearing and distance of public transport vehicles in Warsaw.

Data Source

In this article I will use records of city buses and trams in Warsaw. They come from the Open Data API of Warsaw (api.um.warszawa.pl). A single bus / tram record looks like this:

{"Lines":"130","Lon":21.003338,"VehicleNumber":"1000","Time":"2020-04-23 20:22:55","Lat":52.206166,"Brigade":"3"}

Big Picture

The plan is as follows:

  • A producer sends records to Kafka
  • Kafka Streams filters out duplicates and old records, then calculates speed, distance and bearing
  • A consumer reads the records from Kafka and writes them to Elasticsearch
  • We admire the maps and dashboards

Here we will focus on the part related to Kafka Streams.

Why the Processor API?

It allows the use of a State Store, i.e. keeping state between records of the stream. In my article on Apache Spark, I used Window Functions. Here the mechanism does not rely on micro-batches, so we can process the stream precisely, record by record. My plan is to keep the last record of each vehicle in a KeyValueStore and compare it with the next one.
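To make the plan concrete, here is a minimal sketch of the "remember the last value per key, compare it with the next one" pattern. It assumes a plain HashMap as a stand-in for the KeyValueStore, and the class and method names are hypothetical, not part of the project.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in: a HashMap plays the role of the KeyValueStore.
class LastSeenSketch {
    private final Map<String, Long> lastTimestampByVehicle = new HashMap<>();

    // Returns the milliseconds elapsed since the previous record of this vehicle,
    // or -1 when this is the first record seen for the key.
    long millisSincePrevious(String vehicleNumber, long timestampMillis) {
        // put() returns the value previously stored under the key (or null)
        Long previous = lastTimestampByVehicle.put(vehicleNumber, timestampMillis);
        return previous == null ? -1 : timestampMillis - previous;
    }
}
```

The real state store plays the same role as the map, except that it is managed by Kafka Streams and is reached through the ProcessorContext.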

Reading data from Kafka

We need a class that matches the records coming from Kafka, which is why the intermediate InputZtmRecord class was created.

public class InputZtmRecord {
    @SerializedName("Lines")
    public String lines;
    @SerializedName("Lon")
    public double lon;
    @SerializedName("Lat")
    public double lat;
    @SerializedName("VehicleNumber")
    public String vehicleNumber;
    @SerializedName("Brigade")
    public String brigade;
    @SerializedName("Time")
    public Date time;
}

Ultimately, we map it to the target ZtmRecord class.

public class ZtmRecord {
    public String lines;
    public double lon;
    public double lat;
    public String vehicleNumber;
    public String brigade;
    public Date time;
    public double speed;
    public double distance;
    public double bearing;
}

I wanted to keep some flexibility between the two classes. I also experimented with different date types, but eventually stayed with the old java.util.Date because of some SerDe issues. The deserializer maps one class to the other.

@Test
public void ztmRecordSerializationWorks() {
    InputZtmRecordDeserializer deserializer = new InputZtmRecordDeserializer();
    String rawRecord = "{\"Lines\": \"204\", \"Lon\": 21.043399, \"VehicleNumber\": \"1042\", \"Time\": \"2020-04-24 21:14:34\", \"Lat\": 52.26617, \"Brigade\": \"04\"}";
    InputZtmRecord record = deserializer.deserialize(null, rawRecord.getBytes());
    Assert.assertEquals("204", record.lines);
    Assert.assertEquals(21.043399, record.lon, 0.0001);
    Assert.assertEquals(52.26617, record.lat, 0.0001);
    Assert.assertEquals("04", record.brigade);
    Assert.assertEquals("1042", record.vehicleNumber);
    Assert.assertEquals(Date.from(LocalDateTime.of(2020, 4, 24, 21, 14, 34).atZone(ZoneOffset.systemDefault()).toInstant()), record.time);
}
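The deserializers themselves are not shown in this article. As a rough illustration only, the mapping step could look like the sketch below. It assumes the "yyyy-MM-dd HH:mm:ss" timestamp format seen in the sample record and the JVM's default time zone. The class names and the reduced set of fields are hypothetical, and JSON parsing is left out entirely.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

// Hypothetical, trimmed-down versions of the two record classes.
class RawRecordSketch {
    String vehicleNumber;
    String time; // e.g. "2020-04-24 21:14:34", as it arrives in the JSON
}

class MappedRecordSketch {
    String vehicleNumber;
    Date time;
    double speed; // stays 0 here; the processor fills it in later
}

class RecordMapperSketch {
    // Assumption: timestamps are interpreted in the JVM's default time zone.
    static MappedRecordSketch map(RawRecordSketch raw) {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        format.setLenient(false); // reject values such as month 13
        MappedRecordSketch mapped = new MappedRecordSketch();
        mapped.vehicleNumber = raw.vehicleNumber;
        try {
            mapped.time = format.parse(raw.time);
        } catch (ParseException e) {
            throw new IllegalArgumentException("Unparseable time: " + raw.time, e);
        }
        return mapped;
    }
}
```

Keeping the input and output classes separate means the output class can grow fields such as speed without touching the shape of the incoming JSON.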

Topology

The Processor API approach differs from the Kafka Streams DSL. The two can be combined, but here let's focus on processors. In the DSL, the data flow follows from the way we chain operations. Here we also build a pipeline, but the parents of each node in the directed acyclic graph are simply referenced by name, as strings.

public Topology createTopology() {
    // Serde used to store ZtmRecord values in the state store
    final Serde<ZtmRecord> outputZtmRecordSerde =
            Serdes.serdeFrom(new GenericSerializer(), new ZtmRecordDeserializer());
    StoreBuilder<KeyValueStore<String, ZtmRecord>> ztmStoreBuilder =
            Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore("ztmStore"),
                    Serdes.String(),
                    outputZtmRecordSerde
            );

    Topology topology = new Topology();
    topology.addSource("Source", new StringDeserializer(), new InputZtmRecordToZtmRecordDeserializer(), INPUT_TOPIC)
            .addProcessor("ZtmProcess", () -> new ZtmProcessor(), "Source")
            .addStateStore(ztmStoreBuilder, "ZtmProcess")
            .addSink("Sink", OUTPUT_TOPIC, new StringSerializer(), new GenericSerializer(), "ZtmProcess");

    return topology;
}

Stores.keyValueStoreBuilder(...) -> we define the state store that we will use in the processor
addSource(...) -> we define which topic we read data from
addProcessor(...) -> we add the processor and indicate its parent ("Source")
addStateStore(...) -> we attach the previously created state store to the processor
addSink(...) -> we add the output of our stream

Processor

First, I’ll show the processor outline. Next, we will discuss the processing flow.

public class ZtmProcessor implements Processor<String, ZtmRecord> {

    public static final int SUSPICIOUS_SPEED = 120;
    private ProcessorContext context;
    private KeyValueStore<String, ZtmRecord> ztmRecordStore;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        ztmRecordStore = (KeyValueStore) context.getStateStore("ztmStore");
    }

    @Override
    public void process(String key, ZtmRecord record) {
        // some awesome code
    }

    // some awesome code

    @Override
    public void close() {}
}

All the magic happens in the process method, but first, in init, you need to fetch the state store by its name ("ztmStore") and keep a reference to the ProcessorContext. The context gives access to data about the record (e.g. headers) and to actions such as forwarding the record downstream.

@Override
public void process(String key, ZtmRecord record) {
    ZtmRecord previousRecord = ztmRecordStore.get(key);
    if (previousRecord == null) {
        ztmRecordStore.put(key, record);
        context.forward(key, record);
        return;
    }
    if (previousRecord.time.compareTo(record.time) >= 0) {
        return; // ignore old/same record
    }
    record = calculateRecord(previousRecord, record);
    if (record.speed > SUSPICIOUS_SPEED) {
        return; // probably measurement error
    }
    ztmRecordStore.put(key, record);
    context.forward(key, record);
}

private ZtmRecord calculateRecord(ZtmRecord previousRecord, ZtmRecord record) {
    double lat1 = previousRecord.lat;
    double lat2 = record.lat;
    double lon1 = previousRecord.lon;
    double lon2 = record.lon;
    record.distance = GeoTools.calculateDistanceInKilometers(lat1, lat2, lon1, lon2);
    record.bearing = GeoTools.calculateBearing(lat1, lat2, lon1, lon2);
    record.speed = GeoTools.calculateSpeed(record.distance, previousRecord.time, record.time);
    return record;
}

ztmRecordStore.get(key) -> retrieve the previous record
previousRecord == null -> if there is no predecessor, save the record in the store and forward it
previousRecord.time.compareTo(record.time) >= 0 -> ignore duplicates and old records
calculateRecord(...) -> calculate speed, distance and bearing
record.speed > SUSPICIOUS_SPEED -> I assumed that speeds over 120 km/h are GPS sensor errors
ztmRecordStore.put(...) and context.forward(...) -> save the record to the KeyValueStore and pass it on
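The same branching can also be read as a small decision function. The sketch below is a hypothetical helper, not part of the project. It assumes timestamps as epoch milliseconds and a speed that has already been computed, and it mirrors the order of the checks in process.

```java
// Hypothetical helper: the branching of process() as a pure function.
class RecordDecisionSketch {
    enum Decision { FORWARD_FIRST, DROP_OLD_OR_DUPLICATE, DROP_SUSPICIOUS, FORWARD }

    static final double SUSPICIOUS_SPEED_KPH = 120;

    // previousMillis is null when the store holds no record for the vehicle yet.
    static Decision decide(Long previousMillis, long currentMillis, double speedKph) {
        if (previousMillis == null) {
            return Decision.FORWARD_FIRST;          // nothing to compare with
        }
        if (previousMillis >= currentMillis) {
            return Decision.DROP_OLD_OR_DUPLICATE;  // same or older timestamp
        }
        if (speedKph > SUSPICIOUS_SPEED_KPH) {
            return Decision.DROP_SUSPICIOUS;        // probably a measurement error
        }
        return Decision.FORWARD;
    }
}
```

One consequence of the original code is worth noting: a record dropped as suspicious is never written to the store, so the next record is compared against the last accepted one.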

Plenty of bugs will probably surface once the real source is attached and the records land in Elasticsearch. I moved the calculations to a separate GeoTools class.

public class GeoTools {

    public static double calculateDistanceInKilometers(double lat1, double lat2, double lon1, double lon2) {
        if ((lat1 == lat2) && (lon1 == lon2)) {
            return 0;
        } else {
            double theta = lon1 - lon2;
            double dist = Math.sin(Math.toRadians(lat1)) * Math.sin(Math.toRadians(lat2))
                    + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2)) * Math.cos(Math.toRadians(theta));
            dist = Math.acos(dist);
            dist = Math.toDegrees(dist);
            dist = dist * 60 * 1.1515;
            dist = dist * 1.609344;
            return dist;
        }
    }

    public static double calculateBearing(double lat1, double lat2, double lon1, double lon2) {
        double latitude1 = Math.toRadians(lat1);
        double latitude2 = Math.toRadians(lat2);
        double longDiff = Math.toRadians(lon2 - lon1);
        double y = Math.sin(longDiff) * Math.cos(latitude2);
        double x = Math.cos(latitude1) * Math.sin(latitude2)
                - Math.sin(latitude1) * Math.cos(latitude2) * Math.cos(longDiff);
        return (Math.toDegrees(Math.atan2(y, x)) + 360) % 360;
    }

    public static double calculateSpeed(double distance, Date previousTime, Date time) {
        double millis = time.getTime() - previousTime.getTime();
        double hours = millis / (1000 * 60 * 60);
        if (hours <= 0) {
            return 0;
        }
        return distance / hours;
    }
}
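As a cross-check of the distance and speed arithmetic, here is a sketch that uses the haversine formula instead of the spherical law of cosines used in GeoTools. This is a swapped-in technique, not the author's method. The class name, the mean Earth radius of 6371 km and the millisecond-based speed helper are assumptions made for the example.

```java
// Hypothetical cross-check, not part of the original GeoTools class.
class HaversineSketch {
    static final double EARTH_RADIUS_KM = 6371.0; // mean Earth radius (assumption)

    // Same parameter order as GeoTools: lat1, lat2, lon1, lon2 (degrees).
    static double distanceInKilometers(double lat1, double lat2, double lon1, double lon2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        // min() guards against rounding pushing sqrt(a) slightly above 1
        return 2 * EARTH_RADIUS_KM * Math.asin(Math.min(1.0, Math.sqrt(a)));
    }

    // Speed in km/h from a distance in km and an elapsed time in milliseconds.
    static double speedInKph(double distanceKm, long elapsedMillis) {
        if (elapsedMillis <= 0) {
            return 0;
        }
        return distanceKm / (elapsedMillis / 3_600_000.0);
    }
}
```

For two points about 41 seconds apart, such as (52.200958, 21.076998) and (52.199871, 21.078823), this should give roughly 0.17 km and 15 km/h. The haversine form tends to behave better numerically for points that are very close together, which is the typical case for consecutive readings from the same vehicle.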

Tests

public class ZtmStreamTest {

    TopologyTestDriver testDriver;

    StringSerializer stringSerializer = new StringSerializer();
    StringDeserializer stringDeserializer = new StringDeserializer();
    ZtmRecordDeserializer ztmRecordDeserializer = new ZtmRecordDeserializer();
    private TestInputTopic<String, String> inputTopic;
    private TestOutputTopic<String, ZtmRecord> outputTopic;

    @Before
    public void prepareTopologyTestDriver() {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wiaderko-ztm-stream-test");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "doesnt-matter:1337");

        ZtmStream ztmStream = new ZtmStream();
        Topology topology = ztmStream.createTopology();
        testDriver = new TopologyTestDriver(topology, config);
        inputTopic = testDriver.createInputTopic(ZtmStream.INPUT_TOPIC, stringSerializer, stringSerializer);
        outputTopic = testDriver.createOutputTopic(ZtmStream.OUTPUT_TOPIC, stringDeserializer, ztmRecordDeserializer);
    }

    @After
    public void closeTestDriver() {
        testDriver.close();
    }

    @Test
    public void streamDropsSameRecords() {
        String firstRecord = "{\"Lines\": \"108\", \"Lon\": 21.076998, \"VehicleNumber\": \"1037\", \"Time\": \"2020-04-25 20:16:53\", \"Lat\": 52.200958, \"Brigade\": \"51\"}";
        String secondRecord = "{\"Lines\": \"108\", \"Lon\": 21.076998, \"VehicleNumber\": \"1037\", \"Time\": \"2020-04-25 20:16:53\", \"Lat\": 52.200958, \"Brigade\": \"51\"}";
        inputTopic.pipeInput("1037", firstRecord);
        inputTopic.pipeInput("1037", secondRecord);
        List<ZtmRecord> output = outputTopic.readValuesToList();
        Assert.assertEquals(1, output.size());
    }

    @Test
    public void streamDropsOldRecords() {
        String firstRecord = "{\"Lines\": \"108\", \"Lon\": 21.076998, \"VehicleNumber\": \"1037\", \"Time\": \"2020-04-25 20:17:53\", \"Lat\": 52.200958, \"Brigade\": \"51\"}";
        String secondRecord = "{\"Lines\": \"108\", \"Lon\": 21.076998, \"VehicleNumber\": \"1037\", \"Time\": \"2020-04-25 20:16:53\", \"Lat\": 52.200958, \"Brigade\": \"51\"}";
        inputTopic.pipeInput("1037", firstRecord);
        inputTopic.pipeInput("1037", secondRecord);
        List<ZtmRecord> output = outputTopic.readValuesToList();
        Assert.assertEquals(1, output.size());
    }

    @Test
    public void streamComputesAreCorrectly() {
        String firstRecord = "{\"Lines\": \"108\", \"Lon\": 21.076998, \"VehicleNumber\": \"1037\", \"Time\": \"2020-04-25 20:16:53\", \"Lat\": 52.200958, \"Brigade\": \"51\"}";
        String secondRecord = "{\"Lines\": \"108\", \"Lon\": 21.078823, \"VehicleNumber\": \"1037\", \"Time\": \"2020-04-25 20:17:34\", \"Lat\": 52.199871, \"Brigade\": \"51\"}";
        inputTopic.pipeInput("1037", firstRecord);
        inputTopic.pipeInput("1037", secondRecord);
        List<ZtmRecord> output = outputTopic.readValuesToList();
        ZtmRecord record = output.get(1);
        Assert.assertTrue(record.bearing > 0);
        Assert.assertTrue(record.speed > 0);
        Assert.assertTrue(record.distance > 0);
        Assert.assertEquals(0.1734, record.distance, 0.0001);
        Assert.assertEquals(134, record.bearing, 1);
        Assert.assertEquals(15.227, record.speed, 0.001);
    }

}

Does it work with a real Kafka?

I wrote a simple Python script that pushes records into Kafka.

import requests
import json
from kafka import KafkaProducer

token = 'paste-your-token-here'
url = 'https://api.um.warszawa.pl/api/action/busestrams_get/'
resource_id = 'f2e5503e-927d-4ad3-9500-4ab9e55deb59'
bus_params = {'apikey': token, 'type': 1, 'resource_id': resource_id}
tram_params = {'apikey': token, 'type': 2, 'resource_id': resource_id}

r = requests.get(url=url, params=bus_params)
data = r.json()

producer = KafkaProducer(bootstrap_servers=['localhost:29092'],
                         value_serializer=lambda x: json.dumps(x).encode('utf-8'),
                         key_serializer=lambda x: x)

for record in data['result']:
    print(record)
    # the vehicle number is the key, so all records of one vehicle land in the same partition
    future = producer.send('ztm-input', value=record, key=record["VehicleNumber"].encode('utf-8'))
    result = future.get(timeout=60)

Repository

https://github.com/zorteran/wiadro-danych-kafka-streams/tree/kafka_streams_201

Branch: kafka_streams_201

Summary

As seen above, it works. The next step is to connect the application to the rest of the infrastructure and check whether the data makes sense. Maybe along the way I will try to dockerize Kafka Streams? 😉

I also want to mention the value of tests. I can't imagine pushing records into Kafka by hand every time I want to check the implementation.

Written by Maciej Szymczyk

Software Developer, Big Data Engineer, Blogger (https://wiadrodanych.pl), Amateur Cyclists & Triathlete, @maciej_szymczyk