Since I joined Bronto last year, I've been spending most of my time building new reporting infrastructure for our platform. The core of our solution aims to give our customers fast reports and dashboards while ensuring correctness at every step. At Bronto's scale, we need this system to aggregate billions of inbound marketing-related events per day. That's a hard number to imagine, so think of it this way: we need to handle millions of events every minute, or hundreds of thousands of events per second. Horizontal scaling is a requirement in almost everything we build.

To give our customers extremely fast access to their reports, we've built a system that can safely pre-aggregate the results for the vast majority of queries our customers request. Each pre-aggregation is composed of a metric, such as opens per delivery, and a granularity, such as hourly or all-time. Our reporting stack is able to source events from many Bronto services and combine them into these pre-aggregations, which can in turn be served very quickly.

After surveying the landscape of tools available to us, we decided to build our aggregation system on top of Kafka and Spark Streaming. In this two-part post, I'll explain how we use these core technologies to create a system that's safe and performant.
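
To make that concrete, you can picture a pre-aggregation as a small keyed record: a metric, a granularity, and the value we pre-compute for it. The Scala sketch below is illustrative only; the type and field names are mine, not our actual data model.

// Illustrative sketch of a pre-aggregation: a metric plus a granularity,
// keyed by the entity being counted and the time bucket it falls into.
sealed trait Granularity
case object Hourly  extends Granularity
case object Daily   extends Granularity
case object AllTime extends Granularity

case class PreAggregation(
  metric: String,           // e.g. "opens_per_delivery"
  granularity: Granularity, // hourly, daily, all-time, ...
  entityId: Long,           // e.g. a delivery id
  bucketStart: Long,        // epoch millis of the bucket (0 for all-time)
  value: Long               // the pre-computed count served to reports
)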

[Figure: the aggregation system]

At a high level, we create a persistent log of all events in Kafka, use Spark Streaming to aggregate the events, and write those aggregates to a data store. Events from throughout Bronto go through an enrichment and validation pipeline (not pictured) before ending up in the Kafka topics used by our Spark Streaming process. The Spark Streaming process then aggregates the events in each batch based on our list of supported metrics, and this aggregation, based only on events from the batch, is written to our database.

Our solution requires a transactional database to ensure safety, but these writes in particular do not have to be transactional. To understand why, we need to talk a bit about Spark Streaming and its fault tolerance guarantees.

Exactly-once semantics and Spark Streaming are hot topics, but the best way to understand exactly what guarantees Spark Streaming offers is to read Spark Streaming's own documentation on fault tolerance. The key takeaway is that, from an end-to-end perspective, Spark Streaming offers at-least-once semantics. If your system requires exactly-once semantics to ensure correctness, the writes from your Spark workers to your database need to be idempotent. Our reporting platform accomplishes this by writing the results of each batch along with a batch identifier, ensuring that for any instance of a metric there is only one row per batch. A simplified SQL schema might look like:

CREATE TABLE opens_per_delivery (
  batch_id    BIGINT,
  delivery_id BIGINT,
  num_opens   BIGINT,
  UNIQUE INDEX ix_batch_unique (batch_id, delivery_id)
);
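
To show how these pieces fit together, here is a minimal Scala sketch of the shape of such a job: a Kafka direct stream, a per-batch count of opens per delivery, and an insert that carries the batch identifier. It is illustrative only, not our production code; the topic name, broker address, JDBC URL, and event format are all assumptions, and here the Spark batch time stands in for our batch identifier.

// A minimal sketch of a per-batch aggregation job (Spark 1.5-era Kafka 0.8 direct API).
// All names here (topic, broker, JDBC URL, event format) are assumptions for illustration.
import java.sql.DriverManager

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

object OpensPerDeliveryJob {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("opens-per-delivery"), Seconds(60))

    val kafkaParams = Map("metadata.broker.list" -> "kafka:9092")    // assumed broker
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("open-events"))                          // assumed topic

    stream.foreachRDD { (rdd: RDD[(String, String)], batchTime: Time) =>
      val batchId = batchTime.milliseconds                           // stands in for the batch identifier

      rdd.map { case (_, value) =>
          (value.split(',')(0).toLong, 1L)                           // assumed "deliveryId,..." payload
        }
        .reduceByKey(_ + _)                                          // opens per delivery, this batch only
        .foreachPartition { counts =>
          val conn = DriverManager.getConnection("jdbc:mysql://db/reporting") // assumed URL
          // MySQL-style INSERT IGNORE: the unique index on (batch_id, delivery_id)
          // turns a replayed write of the same batch into a no-op, so retries stay safe.
          val stmt = conn.prepareStatement(
            "INSERT IGNORE INTO opens_per_delivery (batch_id, delivery_id, num_opens) VALUES (?, ?, ?)")
          counts.foreach { case (deliveryId, numOpens) =>
            stmt.setLong(1, batchId)
            stmt.setLong(2, deliveryId)
            stmt.setLong(3, numOpens)
            stmt.executeUpdate()
          }
          stmt.close()
          conn.close()
        }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}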
For a given batch identifier and delivery identifier, there can only be one aggregated count of opens. So when we execute writes against this table, the unique index blocks us from issuing duplicate writes. And since Spark requires computations to be deterministic, we know that our count of opens for that batch/delivery pair won't ever change, no matter how many times we recount it.

The remaining problem is one I call the "stable batches" problem. Our log of events stored in Kafka is divided up into batches such that every event lands in exactly one batch. If a terminal fault causes a batch to fail, the batch can be retried, but it must be exactly the same batch.

Spark checkpointing does this for you, but its implementation does not fit Bronto's needs. In particular, checkpoints aren't portable across Spark versions, configuration is locked down by the checkpoint, and there is relatively limited visibility into the batches themselves. In short, it's a good general-purpose solution that didn't work for Bronto.

Instead, we keep our own record of each batch and implement our own retry mechanism (a minimal sketch of this bookkeeping appears at the end of this post). In general, we still rely on Spark Streaming to create the batches for us so that we can take advantage of its more sophisticated batch management features, like backpressure, which was introduced in Spark 1.5. These records are stored in a database that can be queried by our Spark driver processes, as well as by dashboards and administrative tools. Using stable batches gives us the idempotency we need to survive failures.

We've also designed our batch management system with a focus on visibility. We can even look at logs of our Kafka data to figure out what data was in a batch long after that batch was processed. Having a system that can survive failures safely is a great starting place, but being able to introspect even non-critical failures is what pushes a system over the line into production-ready.

For more, check out part two of this series, which explains how we drastically increased the read performance of our system.

See also: Cody Koeninger has done some highly insightful write-ups about fault tolerance semantics when using Spark Streaming and Kafka. Our initial design was already in place when we came across his work, but it would have saved us a lot of time to have understood all of this at the start.
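
As promised above, here is a minimal sketch of what recording a batch might look like. It assumes the direct stream from the earlier example and a hypothetical batch_records table; it is not our batch manager, just an illustration of the offset bookkeeping that makes a batch stable.

// Illustrative only. Assumes a hypothetical batch_records table with columns
// (batch_id, topic, kafka_partition, from_offset, until_offset, status).
import java.sql.DriverManager

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

object BatchRecorder {
  def record(stream: InputDStream[(String, String)]): Unit = {
    stream.foreachRDD { (rdd: RDD[(String, String)], batchTime: Time) =>
      // The Kafka direct stream exposes the exact offsets behind this batch.
      val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // This runs on the driver; batch records are tiny, so a plain JDBC write is fine.
      val conn = DriverManager.getConnection("jdbc:mysql://db/reporting")   // assumed URL
      val stmt = conn.prepareStatement(
        "INSERT INTO batch_records (batch_id, topic, kafka_partition, from_offset, until_offset, status) " +
          "VALUES (?, ?, ?, ?, ?, 'PENDING')")
      ranges.foreach { r =>
        stmt.setLong(1, batchTime.milliseconds)
        stmt.setString(2, r.topic)
        stmt.setInt(3, r.partition)
        stmt.setLong(4, r.fromOffset)
        stmt.setLong(5, r.untilOffset)
        stmt.executeUpdate()
      }
      stmt.close()
      conn.close()

      // The aggregation from the earlier sketch would run here, after which the
      // record can be marked COMPLETE. If the batch fails, an administrative
      // process can rebuild exactly the same batch with KafkaUtils.createRDD
      // over these stored offset ranges and retry it.
    }
  }
}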