Over the last two years at LinkedIn, I’ve been working on a distributed key-value database called “Venice.” Venice is designed to be a significant improvement over Voldemort Read-Only for serving derived data. In late 2016, Venice started serving production traffic for batch use cases that were very similar to the existing uses of Voldemort Read-Only. In the time since then, we’ve built out the rest of the functionality to complete the dream that motivated the construction of Venice: the ability to consume data from both batch and streaming sources, which we’re calling “Venice Hybrid.”
In this blog post, we will cover some of the history of serving derived data and lambda architectures and explore how Venice solves the challenges inherent in combining batch and streaming data.
LinkedIn operates an online service that serves traffic to over 500 million members. The operation of this application requires two distinct types of data: primary data and derived data. Primary data is what we normally think about when storing data for a web application. For example: when you update your LinkedIn profile with your new job title, that is primary data. When someone else looks at your profile and sees that job title, we are serving them the primary data you provided.
The canonical example of derived data is the People You May Know (PYMK) feature on the LinkedIn application. Based on primary data, including connections and interests, we compute or derive a list of recommendations. These are members we think you might want to connect with on our platform. Traditionally, we recalculate derived data periodically. We take a snapshot in time of connection information, profile data, and other signals; we use that to generate a list of recommendations that correspond to that snapshot in time.
Serving derived data requires a different type of database compared to serving primary data. Members cannot change their PYMK recommendations, so we don’t need to optimize for online writes. We do need to support very low-latency online reads and we need to optimize for very high-speed bulk ingestion of complete datasets. For a long time, LinkedIn has used Voldemort Read-Only to serve derived data. Voldemort Read-Only includes the Build and Push Hadoop job to do bulk ingestion of complete datasets. The Build and Push job creates indexed database binary files for each partition on each storage node right on Hadoop. Then the job coordinates the upload of each partition to the different storage nodes. The Voldemort storage nodes can serve reads directly from these pre-indexed files, so the ingestion speed does not depend on any indexing or data organization operations on the local machine.
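As a rough illustration of the Build and Push flow, the sketch below hashes each record to a fixed partition and sorts the keys within each partition, so the resulting file can be binary-searched by a storage node with no further indexing work. The partitioner here is a hypothetical stand-in, not Voldemort's actual partitioning strategy.

```python
import hashlib

def partition_for(key: bytes, num_partitions: int) -> int:
    # Deterministic key-to-partition assignment (illustrative stand-in;
    # the real Build and Push job uses Voldemort's own strategy).
    return int.from_bytes(hashlib.md5(key).digest()[:4], "big") % num_partitions

def build_partition_files(records, num_partitions):
    # Group records by partition and sort each partition by key, so each
    # "file" can be served via binary search straight off disk.
    partitions = [[] for _ in range(num_partitions)]
    for key, value in records:
        partitions[partition_for(key, num_partitions)].append((key, value))
    return [sorted(p) for p in partitions]
```

Because the sorting and grouping happen on Hadoop, the storage nodes only have to download the finished files, which is what keeps bulk ingestion fast.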
Voldemort Read-Only represents a common model in the world of derived data: batch processing. At any point in time, we can generate a very accurate derived dataset by batch processing the current state of the world. However, that batch processing is expensive and slow so we often run it only daily or even less frequently. But what about everything that happens between runs? Until the next execution, everything that happens on the LinkedIn application is ignored. There is another popular model that tries to address this shortcoming. That model is often called the lambda architecture.
In a lambda architecture, you have two different data processing systems. One of those systems is the batch processing and serving system we’ve already seen: batch processing on a platform like Hadoop and then loading into a datastore like Voldemort Read-Only for serving reads. The other system is the speed layer. The speed layer provides those up-to-the-minute updates so you don’t need to serve data that is up to a day or more old. At LinkedIn, we have stream data in Kafka that includes changes that members make to their profiles and many other events. We use Samza to do processing on that stream data, and the Samza application writes into a database such as Espresso or Couchbase. This architecture allows us to generate near real-time updates to our recommendations based on live activity.
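To make the speed layer concrete, here is a toy sketch of the kind of incremental update a stream processor might fold into a serving store as events arrive. The event shape and the update rule are invented for illustration; they are not Samza's API or PYMK's actual logic.

```python
# Hypothetical speed-layer handler: fold a profile/graph event into a
# key-value store of fresh recommendation candidates for one member.
def apply_event(store: dict, event: dict) -> None:
    member = event["member_id"]
    recs = store.get(member, [])
    # Toy update rule (illustrative only): when a member makes a new
    # connection, surface that connection's own connections as candidates.
    if event["type"] == "new_connection":
        for candidate in event["their_connections"]:
            if candidate != member and candidate not in recs:
                recs.append(candidate)
    store[member] = recs
```

The point is that each event updates only the affected keys, which is why the speed layer can react in near real time while the batch layer recomputes everything from scratch.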
Often the speed layer has to make computational sacrifices in order to operate quickly on the stream data as it comes in. Batch processing still has the advantage of operating on the entire corpus of data, so in many architectures it still provides more accurate and reliable results. For this reason, it provides a lot of value to continue doing batch processing even when near real-time stream processing is available.
When a member comes to the PYMK page of the LinkedIn application, the PYMK service queries both the batch system (Voldemort Read-Only) and the speed layer, does some local reconciliation between the two, and serves a result.
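A minimal sketch of that read-time reconciliation might look like the following, assuming a made-up policy of preferring the fresher speed-layer results and de-duplicating against the batch results. The real PYMK ranking logic is more involved; this only shows where the merge burden sits in a lambda architecture, namely in the application.

```python
def read_pymk(member_id, batch_store, speed_store):
    # Query both layers; a missing entry in either layer degrades the
    # result rather than failing the request.
    batch_recs = batch_store.get(member_id, [])
    fresh_recs = speed_store.get(member_id, [])
    # Toy reconciliation policy (hypothetical): fresher results first,
    # then batch results that the speed layer hasn't already produced.
    merged = list(fresh_recs)
    for rec in batch_recs:
        if rec not in merged:
            merged.append(rec)
    return merged
```

Note that the application must wait for both stores before it can respond, which motivates the latency and availability criticisms discussed below.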
Doing Lambda better: Hybrid
Lambda architectures work, and they are widely used at LinkedIn and in the industry. They also have some downsides. The application needs to read from two different databases, which means the application always has to wait for the slower of the two responses. The application is also only available when both databases are up. In addition to this, the application is responsible for the added complexity of integrating with two different database libraries and for the logic of how to prioritize the results from each system.
Venice aims to solve these problems by providing a single database that accepts both batch and streaming ingestions and does the reconciliation at write time as a part of the infrastructure, so that each application team doesn’t need to reimplement a complex, multi-system lambda architecture—they can use one system that we provide and manage.
Another challenge in maintaining a lambda architecture is that batch and stream data processing require different systems. This problem is upstream of data serving, and Venice doesn’t address it. However, LinkedIn has contributed extensions to Samza that allow the same Samza code to run both on data in HDFS and on data streaming in Kafka, potentially eliminating the need to maintain two different data processing code paths. That is a topic for another blog post.
We’ve published a few blog posts about Venice already, but I’d like to take a moment to go over the way that Venice works for ingesting and serving data from Hadoop. The big difference between Venice and Voldemort Read-Only is the ingestion path. The Voldemort Read-Only Build and Push job builds an indexed database file on Hadoop for each database partition, and loads all of those partitions onto the various storage nodes in the cluster. Venice uses Kafka as a sort of write buffer. Any system that writes to Venice does so by writing to a Kafka topic, and the Venice storage nodes consume from that Kafka topic, persisting each record locally in order to serve queries.
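The ingestion model can be sketched as a simple consumer loop: whatever was written to the topic last for a given key wins, which is what makes Kafka behave as a write buffer in front of the store. This is a deliberate simplification; a real Venice storage node persists records to an on-disk store and tracks consumed offsets, and the class below is invented for illustration.

```python
# Illustrative sketch of the Venice ingestion model: writers produce to a
# Kafka topic, and each storage node consumes its partitions, persisting
# each record locally so it can serve reads from the local store.
class StorageNode:
    def __init__(self):
        self.local_store = {}  # stands in for an on-disk key-value store

    def consume(self, kafka_partition):
        # kafka_partition is an iterable of (offset, key, value) records,
        # in offset order; later writes to the same key overwrite earlier
        # ones, so Kafka effectively acts as a write buffer.
        for offset, key, value in kafka_partition:
            self.local_store[key] = value

    def get(self, key):
        return self.local_store.get(key)
```

Contrast this with Voldemort Read-Only, where the finished index files are built on Hadoop and shipped to the nodes: in Venice, every writer, batch or streaming, goes through the same Kafka front door.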
The core resource in Venice is a store. A store has schemas and owners, and it is isolated from other stores. A store represents a single use case for a team using Venice. When a team pushes a dataset into their Venice store, they create a new version of that store. Every bulk push of data into Venice is a complete snapshot of the dataset. Before starting the push, Venice allocates a new Kafka topic and provides that topic to the system pushing data. We do bulk data pushes from Hadoop, and the push is a MapReduce job. That job uses a defined partitioning strategy to partition all of the records across the different partitions of the Kafka topic. Now that the data is flowing into Kafka, the Venice storage nodes start consuming that data. The physical instantiation of the store version in the Venice storage nodes is also partitioned and uses the same partitioning as we use in Kafka. This means that each partition of the store in Venice is able to consume from a corresponding partition in Kafka. For simplicity, the diagram below doesn’t illustrate the partitioning.
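The key property described above is that the push job and the store version share a single partitioning strategy. The sketch below uses a CRC32-based partitioner as a hypothetical stand-in; what matters is that the Hadoop job routing records into Kafka and the storage nodes consuming them apply the same function, so store partition i can consume exactly Kafka partition i.

```python
import zlib

NUM_PARTITIONS = 4

def partition_for(key: str) -> int:
    # Stand-in partitioner (illustrative, not Venice's actual strategy).
    return zlib.crc32(key.encode()) % NUM_PARTITIONS

def push_to_kafka(records):
    # The MapReduce push job routes each record to its Kafka partition.
    topic = [[] for _ in range(NUM_PARTITIONS)]
    for key, value in records:
        topic[partition_for(key)].append((key, value))
    return topic

def ingest(topic):
    # Each store partition consumes only its corresponding Kafka partition.
    return [dict(partition) for partition in topic]

def get(store, key):
    # Reads apply the same partitioner to find the right store partition.
    return store[partition_for(key)].get(key)
```

Because the partitioning is aligned end to end, no record ever needs to be re-shuffled between Kafka and the storage nodes.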