March 10th, 2020 • By Jon de Andrés Frías
In this two-part series of blog posts, we’ll explain how Kafka has helped us remove parts of our architecture that we consider to be “legacy”.
During the development of a project, we sometimes need to make architecture or software design decisions that may not be the best ones from a purely technical, perfectionist perspective. The compromise between business needs and the ideal engineering solution can push you to adopt a particular approach. That’s how companies create technical debt, which at some stage comes to be considered legacy software or legacy design.
Eventually, a company can decide to address that technical debt and invest resources in removing it. At Rollbar we are using Kafka to improve the service that ingests data into our databases.
The first stage of our event processing pipeline, and of the ingestion service, is our API. These are a few of the main responsibilities of the API:
You can see that storing the occurrences in the database is not the API’s responsibility. Instead, our API writes the received payloads to disk using a Node library we wrote a while ago, batchelor. This library allows us to stream the received payloads into a file that rotates after a defined period of time or when it reaches a maximum file size. We call these files the offline files. Each line in the offline file represents a payload with its output row column values:
Writing the received payloads to disk gives us low latencies when inserting them in batches into the database, since there is no network transmission between the API and the loader. The offline loader processes read those files in order of creation time, get a batch of payloads, and insert them into one of our databases. We run one offline loader process per GCE instance, which is enough to handle all our traffic, and we rarely observe contention at this stage.
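The loader flow described above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not our actual loader: the file descriptor shape (`name`, `createdAt`, `lines`), the helper names, and the `insertBatch` callback are all hypothetical.

```javascript
// Hypothetical sketch of the offline loader's batching logic.
// The descriptor shape { name, createdAt, lines } is an assumption made
// for illustration; it is not the real offline file format.

// Oldest files first, so payloads are inserted in order of file creation.
function orderByCreation(files) {
  return [...files].sort((a, b) => a.createdAt - b.createdAt);
}

// Split an array of lines into batches of at most batchSize entries.
function toBatches(lines, batchSize) {
  const batches = [];
  for (let i = 0; i < lines.length; i += batchSize) {
    batches.push(lines.slice(i, i + batchSize));
  }
  return batches;
}

// Walk the files in creation order and hand each batch to insertBatch,
// a caller-supplied function standing in for the real database insert.
function loadOfflineFiles(files, batchSize, insertBatch) {
  let inserted = 0;
  for (const file of orderByCreation(files)) {
    for (const batch of toBatches(file.lines, batchSize)) {
      insertBatch(batch);
      inserted += batch.length;
    }
  }
  return inserted;
}
```

Keeping the insert behind a callback makes the batching logic easy to exercise without a database.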
As explained above, we are using a very custom solution for our ingestion stage. Over time we've detected a few pain points with this solution:
We thought a queue or distributed log system would help us move our API to Kubernetes and get rid of our very custom solution for ingesting occurrences into the database, so we started evaluating Kafka to replace our offline files. Along with Kafka, we were also evaluating Cloud Pub/Sub, since we run Rollbar on Google Cloud Platform (GCP). However, we decided not to use GCP storage or job broker solutions in order to simplify the infrastructure for our on-premises solution.
Kafka fits our processing model very well for the following reasons:
Let's go deeper into the engineering solution we adopted.
When deciding the size and configuration of the Kafka cluster, it's important to be able to store the data for the retention period your engineering team needs. Sizing is not the same when the retention period is one day as when it is one week. At the same time, you need to understand the nature of the data, the number of events per second the system receives, and the size of those events.
As an example, we'll assume the system receives 10,000 events per second and the average payload size is around 2.5 KB. From those two figures we can calculate the ingestion throughput as 10,000 * 2,500 bytes = 25,000,000 bytes/s, which is roughly 24,400 KB/s, or around 24 MB/s of input throughput.
We want to keep a whole week of data in our Kafka cluster, which means we need to store at least 24,400 KB/s * 60 seconds * 60 minutes * 24 hours * 7 days = 14,757,120,000 KB = 13.74 TB. So the topic size will be 13.74 TB at the end of a week, and that would be the required disk size if we didn't use replication at all.
For high-availability production systems, the recommended replication factor is at least 3, which means our topic will use 3 * 13.74 TB = 41.22 TB. Since the number of nodes should be higher than the replication factor, we can use 4 nodes for now.
Assuming the replicas are equally distributed in the cluster, we'll need at least 41.22 TB / 4 nodes = 10.3 TB in each broker. That would be enough for the topic we've described; however, the system may have more topics, which will increase the disk size needed in each broker. In that case, we'd have 2 options:
The first option might be more expensive since you need to add more nodes to your cluster, but at the same time you'll improve its availability.
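The sizing arithmetic above can be captured in a small helper. This is a sketch using the example figures from the text; the function and field names are hypothetical, and KB, MB and TB are treated as binary units (1 KB = 1024 bytes). Because it does not round the throughput to 24,400 KB/s, its results come out slightly higher than the rounded figures in the text (roughly 13.75 TB per week instead of 13.74 TB).

```javascript
// Sizing sketch for a single topic, using binary units (1 KB = 1024 bytes).
function clusterSizing({ eventsPerSecond, avgPayloadBytes, retentionDays, replicationFactor, brokers }) {
  const bytesPerSecond = eventsPerSecond * avgPayloadBytes;
  const kbPerSecond = bytesPerSecond / 1024;
  const retentionSeconds = retentionDays * 24 * 60 * 60;
  // KB -> TB is three steps of 1024 (KB -> MB -> GB -> TB).
  const topicTB = (kbPerSecond * retentionSeconds) / 1024 ** 3;
  const replicatedTB = topicTB * replicationFactor;
  // Assumes replicas are spread evenly across the brokers.
  const perBrokerTB = replicatedTB / brokers;
  return { kbPerSecond, topicTB, replicatedTB, perBrokerTB };
}

const sizing = clusterSizing({
  eventsPerSecond: 10000,
  avgPayloadBytes: 2500,
  retentionDays: 7,
  replicationFactor: 3,
  brokers: 4,
});
console.log(sizing);
```

Re-running the helper with a different retention period or broker count shows quickly how each choice changes the disk needed per broker.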
The number of partitions of the topic defines the degree of parallelism for the topic, that is, the number of processes in the same consumer group that can consume messages from the topic. If the topic has N partitions, you can run up to N processes for the same consumer group, one process per partition.
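To make the N-partitions limit concrete, here is a simplified illustration of how partitions spread across the processes of one consumer group. It is not Kafka's real assignment logic, only a round-robin stand-in, and the function name and consumer IDs are hypothetical. The point it shows is that each partition has exactly one owner in the group, so any process beyond the Nth has nothing to consume.

```javascript
// Simplified round-robin stand-in for partition assignment within one
// consumer group. NOT Kafka's actual assignor implementation.
function assignPartitions(numPartitions, consumerIds) {
  const assignment = new Map(consumerIds.map((id) => [id, []]));
  for (let p = 0; p < numPartitions; p++) {
    const owner = consumerIds[p % consumerIds.length];
    assignment.get(owner).push(p);
  }
  return assignment;
}

// 4 partitions and 6 processes: two processes are left without partitions.
const assignment = assignPartitions(4, ['c1', 'c2', 'c3', 'c4', 'c5', 'c6']);
const idle = [...assignment]
  .filter(([, partitions]) => partitions.length === 0)
  .map(([id]) => id);
console.log(idle); // [ 'c5', 'c6' ]
```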
One way to choose the number of partitions is to understand the performance of the producer and the consumer. Most of the time the consumer will be slower than the producer, since you'll want to do some processing for every batch you consume from the topic.
kafka-producer-perf-test.sh will allow you to measure the performance of the cluster when producing messages:
./kafka-producer-perf-test.sh \
  --topic test \
  --num-records 10000 \
  --payload-file ./payload.json \
  --throughput -1 \
  --producer-props acks=1 bootstrap.servers=kafka:9092
To measure your consumer performance, you can prepare a testing environment with one topic that has a single partition. That way you have a single process consuming from a single partition, and you can learn the real performance of your consumer. You can run different tests for different batch sizes, for example:
| Batch Size | Consumer Throughput |
The consumer is able to process around 2,650 msgs/s with a batch size of 2,000. You could size the topic based on the current number of incoming events, 10,000/s, but it's always a good idea to think about the future and prepare your topic for the moment your incoming traffic grows, for example to 20,000 incoming events per second. The number of processes needed for that throughput would be 20,000 / 2,650 = 7.5, so the topic would need at least 8 partitions.
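The same calculation as a small helper, rounding up because a fraction of a consumer process is not possible. The function name is hypothetical; the figures are the ones from the example above.

```javascript
// Partitions (and consumer processes) needed to keep up with a target rate,
// given the measured throughput of a single consumer process.
function partitionsNeeded(targetMsgsPerSecond, consumerMsgsPerSecond) {
  return Math.ceil(targetMsgsPerSecond / consumerMsgsPerSecond);
}

console.log(partitionsNeeded(10000, 2650)); // 4
console.log(partitionsNeeded(20000, 2650)); // 8
```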
Kafka has tons of settings you can configure, and it can be intimidating at first. Depending on your configuration, your Kafka cluster will provide more throughput, less latency, or more availability. Also, as mentioned before, you can configure the retention period of your data, and depending on that setting you'll need to size your cluster differently. These are a few of the settings we found important and for which we didn't use the default values:
auto.create.topics.enable will allow the broker to create a new topic if it doesn't exist when receiving the first message. We set this one to false so we prevent topics from being created with the default number of partitions, num.partitions. This forces you to define the number of partitions for your topic and avoids having a system working with an incorrect number of partitions.
min.insync.replicas defines the minimum number of in-sync replicas needed when a producer publishes a message and requires acknowledgement from all in-sync replicas. A higher value will provide stronger durability guarantees and more latency, while a lower value will provide less latency since the partition leader will wait for fewer brokers to replicate the message. If the number of in-sync replicas doesn’t reach the configured value, the message will not be published to the topic.
log.retention.hours defines the number of hours Kafka will keep your incoming messages in the broker. Setting a high value will allow you to process very old data, but you'll need to size your cluster accordingly and that will be more expensive.
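As a rough way to reason about these settings together, the sketch below relates the replication factor to min.insync.replicas and converts the one-week retention from the sizing example into hours. The helper name is hypothetical, and the min.insync.replicas value of 2 is only an illustrative assumption, not necessarily the value we run with.

```javascript
// When producers require acknowledgement from all in-sync replicas, a
// partition keeps accepting writes while at least minInsyncReplicas replicas
// are in sync. This returns how many replicas can be lost before writes fail.
function brokerFailuresTolerated(replicationFactor, minInsyncReplicas) {
  if (minInsyncReplicas < 1 || minInsyncReplicas > replicationFactor) {
    throw new RangeError('minInsyncReplicas must be between 1 and replicationFactor');
  }
  return replicationFactor - minInsyncReplicas;
}

// Replication factor 3 (from the sizing example) and an assumed
// min.insync.replicas of 2: one broker can be down and writes still succeed.
console.log(brokerFailuresTolerated(3, 2)); // 1

// One week of retention expressed in the unit log.retention.hours expects.
const retentionHours = 7 * 24;
console.log(retentionHours); // 168
```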
Our API was written using Node.js, so we needed to choose one of the existing Node.js libraries available for Kafka:
kafkajs is the one with the most activity on GitHub, but we decided to choose node-rdkafka for a few reasons:
The producing logic in our API is simple: we prepare a message following the ingestion schema we defined, serialize it as JSON, and publish it using the related account ID as the message key. That means all messages for the same account are published to the same partition. Publishing all messages of an account to a single partition preserves the order of events across all the projects of the account and reduces the per-account parallel operations in the next stages of the pipeline.
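A minimal sketch of that producing logic is shown below. The message fields (`account_id`, `payload`) and the function names are hypothetical, since our real ingestion schema isn't shown here. The `produce` call follows node-rdkafka's argument order (topic, partition, message, key, timestamp), and the producer is passed in as a parameter so the logic can be tried with a stand-in object.

```javascript
// Hypothetical message shape; the real ingestion schema is not shown here.
function buildMessage(accountId, payload) {
  return {
    // The account ID is the message key, so every message of an account
    // maps to the same partition.
    key: String(accountId),
    value: Buffer.from(JSON.stringify({ account_id: accountId, payload })),
  };
}

// `producer` is expected to be a connected node-rdkafka Producer
// (or any object exposing a compatible produce method).
function publishOccurrence(producer, topic, accountId, payload) {
  const { key, value } = buildMessage(accountId, payload);
  // A null partition lets the client's partitioner choose it from the key.
  producer.produce(topic, null, value, key, Date.now());
}

// Usage with a stand-in producer that only records what it is given.
const sent = [];
const fakeProducer = {
  produce: (topic, partition, value, key) => sent.push({ topic, partition, key, value }),
};
publishOccurrence(fakeProducer, 'occurrences', 42, { level: 'error' });
console.log(sent[0].key, sent[0].value.toString());
```

The topic name `occurrences` is also just a placeholder for the example.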
Besides the broker settings, you'll also want to configure your producer depending on your needs. A couple of important settings for which we don't use the default values are:
request.required.acks sets the number of brokers that must acknowledge a produced message for it to be considered persisted in the cluster. A value of -1 requires the message to be acknowledged by all in-sync replicas. Waiting for more acknowledgements provides less throughput but a stronger guarantee that the message has been received and committed.
message.max.bytes is the maximum allowed message size. The default is 1000012, which is too low for our requirements.
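Put together, a producer configuration along these lines might look like the sketch below. The broker address is reused from the perf-test example, and the `message.max.bytes` value is purely hypothetical since the real one isn't stated here. node-rdkafka's `Producer` constructor takes a global configuration and a topic configuration; the lines that need the library are left as comments so the snippet runs on its own.

```javascript
// Global (client-level) settings. The 5 MB limit is a hypothetical value
// chosen only for illustration.
const globalConfig = {
  'metadata.broker.list': 'kafka:9092',
  'message.max.bytes': 5 * 1024 * 1024,
};

// Topic-level settings: -1 waits for all in-sync replicas to acknowledge.
const topicConfig = {
  'request.required.acks': -1,
};

// With node-rdkafka installed, the producer would be created like this:
// const Kafka = require('node-rdkafka');
// const producer = new Kafka.Producer(globalConfig, topicConfig);
// producer.connect();

console.log(globalConfig, topicConfig);
```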
While Kafka is not a complicated technology to use, it's important to take care of a few broker and producer configuration settings so that you achieve the behavior and performance your product expects. It's also important to understand what your incoming traffic looks like and the throughput your system is expected to deliver.
In this post we’ve covered the broker configuration, topic design, and producer configuration. In the next one, we’ll go through our consumer logic, how we monitor it, and how we deployed the new system.