Use Case
1. Messages are streamed to a Kafka topic.
2. We need to generate a timeout event for every message after X minutes.
Approach:
1. Stream every message from the Kafka source topic using the Kafka Streams API. Message keys and values are strings.
2. Group the messages by their arrival timestamp, building a map of the form:
arrived timestamp -> Map<Key, Value>
3. Run a schedule every 1 minute.
4. Check whether (current time - arrived timestamp) >= timeout, i.e. whether the timeout threshold has been reached:
a) yes: forward the message and delete it from the state store
b) no: do nothing
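Before bringing in Kafka, the approach can be modelled in plain Java. The following is a hypothetical in-memory sketch. The class name `TimeoutBuckets` and the use of millisecond timestamps are assumptions. In the real application, the persistent state store takes the place of the `TreeMap`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of the approach: arrived timestamp -> Map<key, value>.
final class TimeoutBuckets {
    private final long timeoutMs;
    // TreeMap keeps buckets ordered by arrival time, oldest first.
    private final TreeMap<Long, Map<String, String>> buckets = new TreeMap<>();

    TimeoutBuckets(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // Step 2: group the message under its arrival timestamp.
    void add(long arrivedAtMs, String key, String value) {
        buckets.computeIfAbsent(arrivedAtMs, ts -> new LinkedHashMap<>()).put(key, value);
    }

    // Steps 3-4: meant to be called once per minute; returns and removes expired buckets.
    List<Map<String, String>> expire(long nowMs) {
        List<Map<String, String>> expired = new ArrayList<>();
        Iterator<Map.Entry<Long, Map<String, String>>> it = buckets.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Map<String, String>> bucket = it.next();
            if (nowMs - bucket.getKey() >= timeoutMs) {
                expired.add(bucket.getValue()); // a) timed out: forward and delete
                it.remove();
            } else {
                break; // b) this bucket and all newer ones are not due yet
            }
        }
        return expired;
    }

    int pendingBuckets() {
        return buckets.size();
    }
}
```

Because the buckets are sorted by arrival time, the check can stop at the first bucket that has not yet timed out.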
Technology used: Kafka Streams, a state store, and the Kafka Processor API
1. Topology
Build a Kafka topology that reads from the input topic, attaches a persistent state store (providing its key serde and value serde), applies the processor, and finally writes to a sink topic.
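The post does not show `createDelayTopologyWithStateStore()` itself. A minimal sketch using the Kafka Streams `Topology` API might look like the following. The topic names, the store name `delay-store`, and the decision to pass the processor supplier and map serde in as parameters are assumptions. Passing them in keeps the sketch self-contained, whereas the post's own version takes no arguments.

```java
import java.util.Map;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// Sketch only: topic and store names below are hypothetical placeholders.
final class DelayTopology {
    static final String INPUT_TOPIC = "input-topic";
    static final String TIMEOUT_TOPIC = "timeout-topic";
    static final String STORE_NAME = "delay-store";

    static Topology createDelayTopologyWithStateStore(
            ProcessorSupplier<String, String, Long, Map<String, String>> processorSupplier,
            Serde<Map<String, String>> mapSerde) {
        // Persistent (RocksDB-backed) store: arrived timestamp -> Map<key, value>.
        StoreBuilder<KeyValueStore<Long, Map<String, String>>> storeBuilder =
                Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore(STORE_NAME), Serdes.Long(), mapSerde);

        Topology topology = new Topology();
        // Source: keys and values on the input topic are plain strings.
        topology.addSource("source", new StringDeserializer(), new StringDeserializer(), INPUT_TOPIC);
        topology.addProcessor("delay-processor", processorSupplier, "source");
        // Connect the store to the processor so it can look it up by name in init().
        topology.addStateStore(storeBuilder, "delay-processor");
        // Sink: Long key (arrived timestamp) and Map<String, String> value.
        topology.addSink("sink", TIMEOUT_TOPIC,
                Serdes.Long().serializer(), mapSerde.serializer(), "delay-processor");
        return topology;
    }
}
```

The store name given to `Stores.persistentKeyValueStore` must match the name the processor passes to `getStateStore`.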
2. Processor
The processor class holds the business logic: it sets up a schedule (similar to a cron job) that runs every 1 minute.
We override init() to get the state store and register the schedule. From then on, the punctuate method is called every minute.
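The following sketch shows what such a processor could look like. It assumes Kafka Streams 3.x and its `org.apache.kafka.streams.processor.api` interfaces, in which the callback passed to `schedule()` plays the role of the punctuate method. The store name `delay-store`, the timeout supplied through the constructor, and the use of the record timestamp as the arrival time are assumptions, not the author's code.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Sketch: STORE_NAME (hypothetical) must match the store registered in the topology.
final class DelayProcessor implements Processor<String, String, Long, Map<String, String>> {
    static final String STORE_NAME = "delay-store";
    private final long timeoutMs;
    private ProcessorContext<Long, Map<String, String>> context;
    private KeyValueStore<Long, Map<String, String>> store;

    DelayProcessor(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    @Override
    public void init(ProcessorContext<Long, Map<String, String>> context) {
        this.context = context;
        this.store = context.getStateStore(STORE_NAME);
        // Wall-clock punctuation fires every minute even if no new records arrive.
        context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, this::punctuate);
    }

    @Override
    public void process(Record<String, String> record) {
        // Assumption: the record timestamp stands in for the "arrived timestamp".
        long arrived = record.timestamp();
        Map<String, String> existing = store.get(arrived);
        Map<String, String> bucket = existing == null ? new HashMap<>() : new HashMap<>(existing);
        bucket.put(record.key(), record.value());
        store.put(arrived, bucket);
    }

    // (current time - arrived timestamp) >= timeout
    static boolean isExpired(long arrivedMs, long nowMs, long timeoutMs) {
        return nowMs - arrivedMs >= timeoutMs;
    }

    private void punctuate(long nowMs) {
        List<Long> expiredKeys = new ArrayList<>();
        try (KeyValueIterator<Long, Map<String, String>> it = store.all()) {
            while (it.hasNext()) {
                KeyValue<Long, Map<String, String>> entry = it.next();
                if (isExpired(entry.key, nowMs, timeoutMs)) {
                    // a) timed out: forward to the sink
                    context.forward(new Record<>(entry.key, entry.value, nowMs));
                    expiredKeys.add(entry.key);
                }
                // b) otherwise do nothing
            }
        }
        // Delete after the iterator is closed instead of modifying the store mid-iteration.
        expiredKeys.forEach(store::delete);
    }
}
```

`PunctuationType.WALL_CLOCK_TIME` is chosen so that the check runs every minute even when the input topic is idle. With `STREAM_TIME`, punctuation would advance only as new records arrive.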
3. Related Serdes (serializers and deserializers)
3.1) Serdes for the state store
key serde: Serdes.Long() (built-in Kafka serde)
value serde: new MapStringSerde() (custom serde)
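```java
// MapSerializer and MapDeserializer are custom (de)serializers for Map<String, String>.
public static final class MapStringSerde extends Serdes.WrapperSerde<Map<String, String>> {
    public MapStringSerde() {
        super(new MapSerializer(), new MapDeserializer());
    }
}
```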
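The post does not show `MapSerializer` and `MapDeserializer`. One possible implementation is sketched below. It assumes a simple binary format: an entry count followed by key/value pairs written with `DataOutputStream.writeUTF`. Note that `writeUTF` rejects null values and strings longer than 65,535 encoded bytes, so a JSON-based format may suit real payloads better.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical wire format: int entry count, then (key, value) pairs via writeUTF.
final class MapSerializer implements Serializer<Map<String, String>> {
    @Override
    public byte[] serialize(String topic, Map<String, String> data) {
        if (data == null) {
            return null; // Kafka convention: null in, null out
        }
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(data.size());
            for (Map.Entry<String, String> e : data.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeUTF(e.getValue());
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new SerializationException("Failed to serialize map", e);
        }
    }
}

final class MapDeserializer implements Deserializer<Map<String, String>> {
    @Override
    public Map<String, String> deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int size = in.readInt();
            Map<String, String> map = new LinkedHashMap<>();
            for (int i = 0; i < size; i++) {
                String key = in.readUTF();
                String value = in.readUTF();
                map.put(key, value);
            }
            return map;
        } catch (IOException e) {
            throw new SerializationException("Failed to deserialize map", e);
        }
    }
}
```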
3.2) Serializers for the sink
key serializer: new LongSerializer()
value serializer: new MapSerializer()
4. Creating the stream with the topology and starting it
```java
final KafkaStreams streams = new KafkaStreams(createDelayTopologyWithStateStore(), Config.getStreamsConfig());
streams.start();
```
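`Config.getStreamsConfig()` is the author's helper and is not shown. Below is a minimal sketch of what it could return. The application id, broker address, and state directory are placeholders.

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

// Hypothetical stand-in for the post's Config helper; all values are placeholders.
final class Config {
    static Properties getStreamsConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "delay-timeout-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Defaults only; the topology sets its own (de)serializers explicitly.
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        // Local directory where the persistent (RocksDB) state store is kept.
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
        return props;
    }
}
```

In practice, it is also common to register `Runtime.getRuntime().addShutdownHook(new Thread(streams::close))` after `streams.start()`, so the application and its state store are closed cleanly on shutdown.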
Once the stream is started, it reads from Kafka, runs the schedule every 1 minute, and forwards timed-out messages to the sink topic.