CEP: Detecting Missing Events

04 Dec, 2018 | 3 minutes read

Real-time stream processing analytics is of paramount significance for today’s agile business. There are many solutions out there like:

https://aws.amazon.com/kinesis/data-analytics/
https://www.confluent.io/product/ksql/

Amazon Kinesis Data Analytics is available for a couple of years.  Kafka’s KSQL has been released recently. Both products support SQL like continuous queries for data filtering,  transformation,  aggregations,  windowing etc.

However, it seems, up to my knowledge, one use case is not supported even if it is mentioned as a possible one in their documentation. That use case is detecting missing events. In particular, through SQL like stream joins you can match an order with shipment. However, I’m not aware that you can detect missing events. For example, this could be the scenario where we shall raise an alert if there is no shipment 2 hours after an order has been received on the stream.

Surprisingly (or not) one open source CEP engine released under Apache V2 License supports the use case of detecting missing events. That product is Siddhi, part of the WSO2 platform:
https://wso2.github.io/siddhi/documentation/siddhi-quckstart-4.0/

You can download and try the “Hello World” example in a couple of minutes as explained in the article above.

WSO2 CEP documentation contains a recipe how to detect non-occurrences through pattern matching:
https://docs.wso2.com/display/CEP400/Sample+0111+-+Detecting+non-occurrences+with+Patterns
Let’s modify a little bit this example and run it in the Siddhi editor that contains input stream simulator as well.

The Siddhi application is:

@App:name("DeliveryDetectApp")
@App:description("Delivered and Undelivered Arrivals Detection Application")

define stream arrivals_stream (trackingId string, customerName string);

define stream deliveries_stream (trackingId string);

define stream filter_stream (trackingId string, customerName string, deliveryId string);

@sink(type='log', prefix='DELIVERY_ALERT')
define stream alert_stream (trackingId string, customerName string);

@sink(type='log', prefix='DELIVERY_SUCCESS')
define stream success_stream (trackingId string, customerName string);

from arrivals_stream#window.time(1 minutes)
select *
insert expired events into overdue_deliveries_stream;
 
 
from every arrivalEvent = arrivals_stream ->
    deliveryEvent = deliveries_stream[arrivalEvent.trackingId == trackingId]
    or 
    overdue_delivery = overdue_deliveries_stream[arrivalEvent.trackingId == trackingId]
select 
    arrivalEvent.trackingId as trackingId, arrivalEvent.customerName as customerName, deliveryEvent.trackingId as deliveryId
insert into filter_stream;
 
@info(name='Undelivered') 
from filter_stream [(deliveryId is null)] 
select 
    trackingId, customerName
insert into alert_stream;

@info(name='Delivered') 
from filter_stream [not (deliveryId is null)] 
select 
    trackingId, customerName
insert into success_stream;

The trick here is defining the stream of expired events:

<script src=”https://gist.github.com/renatagosevska/f246ebcc1ce5dc96bea1a70305092344.js”></script>

Once we have this stream, we can apply pattern matching that will try to match the arrival event with:

  • Delivery event with the same tracking id
  • Overdue event with the same tracking id

You can try the whole scenario in the Siddhi simulator by running the application. In the simulator, enter the following events at this particular order:

arrivals_stream: trackingId=100, customerName=Marjan
arrivals_stream: trackingId=200, customerName=Marjan
arrivals_stream: trackingId=300, customerName=Marjan
arrivals_stream: trackingId=400, customerName=Marjan
deliveries_stream: trackingId=100
deliveries_stream:trackingId=400

Wait a minute after that. The log output will look similar to this one:

[2018-12-03_13-51-32_510] INFO  - DELIVERY_SUCCESS : Event{timestamp=1543841492507, data=[100, Marjan], isExpired=false} 
[2018-12-03_13-51-37_382] INFO  - DELIVERY_SUCCESS : Event{timestamp=1543841497380, data=[400, Marjan], isExpired=false} 
[2018-12-03_13-52-19_570] INFO  - DELIVERY_ALERT : Event{timestamp=1543841539569, data=[200, Marjan], isExpired=false} 
[2018-12-03_13-52-22_543] INFO  - DELIVERY_ALERT : Event{timestamp=1543841542542, data=[300, Marjan], isExpired=false}

It would be nice if we can get similar feature in KSQL in the near future. In the meantime, Siddhi provides connector for Kafka.