A curated list of database news from authoritative sources

December 10, 2024

DDIA: Chapter 11 - Stream Processing


Daily batch processes introduce significant latency, since input changes are reflected in the output only after a day. For fast-paced businesses, this is too slow. To reduce delays, stream processing runs more frequently (e.g., every second) or continuously, handling events as they happen.

In stream processing, a record is typically called an event—a small, immutable object containing details of an occurrence, often with a timestamp. Polling for new events becomes costly when striving for low-latency continuous processing. Frequent polling increases overhead as most requests return no new data. Instead, systems should notify consumers when new events are available. Messaging systems handle this by pushing events from producers to consumers.

Direct messaging systems require application code to handle message loss and assume producers and consumers are always online, limiting fault tolerance. Message brokers (or message queues) improve reliability by acting as intermediaries. Producers write messages to the broker, which clients (consumers) read. Examples include RabbitMQ, ActiveMQ, Azure Service Bus, and Google Cloud Pub/Sub.

Partitioned logs store messages as append-only logs, assigning offsets to messages within each partition. Log-based brokers like Apache Kafka, Amazon Kinesis Streams, and Twitter’s DistributedLog prioritize high throughput and message ordering. Google Cloud Pub/Sub offers a similar architecture but exposes a JMS-style API. Log-based brokers suit high-throughput scenarios with fast, ordered message processing. In contrast, JMS/AMQP brokers work better when messages are expensive to process, order isn't critical, and parallel processing is needed.
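To make the offset mechanics concrete, here is a minimal sketch of a partitioned, append-only log in Python. This is an invented illustration of the idea, not any broker's actual API: events with the same key are routed to the same partition, and each append receives the next offset in that partition.

```python
# A minimal sketch of a partitioned, append-only log with per-partition
# offsets, in the spirit of Kafka-style log-based brokers. All names here
# (PartitionedLog, append, read) are hypothetical, for illustration only.
import hashlib


class PartitionedLog:
    def __init__(self, num_partitions: int):
        self.partitions = [[] for _ in range(num_partitions)]

    def append(self, key: str, event: dict) -> tuple:
        # Events with the same key land in the same partition,
        # so per-key ordering is preserved.
        p = int(hashlib.md5(key.encode()).hexdigest(), 16) % len(self.partitions)
        self.partitions[p].append(event)
        return p, len(self.partitions[p]) - 1  # (partition, offset)

    def read(self, partition: int, offset: int) -> list:
        # A consumer reads sequentially from its last committed offset.
        return self.partitions[partition][offset:]


log = PartitionedLog(num_partitions=4)
partition, offset = log.append("user-42", {"action": "page_view"})
print(log.read(partition, offset))  # -> [{'action': 'page_view'}]
```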


Databases and Streams

Dual writes, where applications update multiple systems (e.g., a database, search index, and cache) concurrently or sequentially, are error-prone. Race conditions can occur, leading to inconsistent states (see Fig. 11-4). A better approach is to designate one system as the leader (e.g., the database) and make others, like the search index, its followers.

Traditionally, databases treat replication logs as internal details, not public APIs. However, Change Data Capture (CDC) extracts data changes from a database, often as a stream, enabling replication to other systems. For example, a database change log can update a search index in the same order as the changes occur, ensuring consistency (see Fig. 11-5). This makes derived data systems, like search indexes, consumers of the change stream.

CDC designates the source database as the leader, with derived systems as followers. Log-based message brokers like Kafka are ideal for delivering CDC events, as they maintain order. Large-scale CDC implementations include LinkedIn's Databus, Facebook's Wormhole, and Debezium for MySQL. Tools like Kafka Connect offer connectors for various databases.

To manage disk space, CDC uses snapshots in conjunction with logs. A snapshot corresponds to a specific log position, ensuring changes after the snapshot can be applied correctly. Some CDC tools automate this; others require manual handling.
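To illustrate the snapshot-plus-log idea, here is a hedged sketch in Python of how a derived system might catch up: bootstrap from a snapshot taken at a known log position, then apply only the changes after that position, in log order. The data structures are invented for illustration; real tools like Debezium or Kafka Connect handle this wiring for you.

```python
# A hedged sketch of CDC-style catch-up: bootstrap a derived index from a
# snapshot taken at a known log position, then apply subsequent changes in
# log order. Schemas here are invented for illustration.

snapshot = {"position": 2, "rows": {1: "alice", 2: "bob"}}

change_log = [
    {"position": 1, "op": "insert", "id": 1, "value": "alice"},
    {"position": 2, "op": "insert", "id": 2, "value": "bob"},
    {"position": 3, "op": "update", "id": 2, "value": "bobby"},
    {"position": 4, "op": "delete", "id": 1, "value": None},
]

# 1. Start from the snapshot, avoiding a replay of the entire log.
index = dict(snapshot["rows"])

# 2. Apply only the changes after the snapshot's log position, in order.
for change in change_log:
    if change["position"] <= snapshot["position"]:
        continue
    if change["op"] == "delete":
        index.pop(change["id"], None)
    else:  # insert or update
        index[change["id"]] = change["value"]

print(index)  # -> {2: 'bobby'}
```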

Event sourcing builds on immutability, recording all changes as an append-only log of events. This approach mirrors accounting, where transactions are never altered but corrected with new entries if mistakes occur. Immutable logs improve auditability, ease recovery from bugs, and preserve historical data for analytics. For instance, a customer adding and removing an item from a shopping cart generates two events. Though the cart's final state is empty, the log records the customer’s interest, which is valuable for analytics.

Event sourcing simplifies concurrency control. Instead of multi-object transactions, a single event encapsulates a user action, requiring only an atomic append to the log. Partitioning the log and application state similarly (e.g., by customer) enables single-threaded processing per partition, eliminating concurrency issues. For cross-partition events, additional coordination is needed.
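As a toy illustration of event sourcing (using the shopping-cart example above), here is a minimal Python sketch in which state is never updated in place: each user action is a single atomic append, and the current state is derived by replaying the log.

```python
# A minimal event-sourcing sketch, invented for illustration: state is
# derived by folding over the immutable, append-only event log.

events = []  # the append-only log is the system of record


def append_event(event: dict) -> None:
    events.append(event)  # a single atomic append per user action


def current_cart(customer: str) -> set:
    # Derive the current state by replaying the customer's events in order.
    cart = set()
    for e in events:
        if e["customer"] != customer:
            continue
        if e["type"] == "item_added":
            cart.add(e["item"])
        elif e["type"] == "item_removed":
            cart.discard(e["item"])
    return cart


append_event({"customer": "c1", "type": "item_added", "item": "book"})
append_event({"customer": "c1", "type": "item_removed", "item": "book"})

print(current_cart("c1"))  # -> set(): the cart is empty...
print(len(events))         # -> 2: ...but the interest is kept for analytics
```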


Processing Streams

Stream processing supports use cases like monitoring (e.g., fraud detection, trading systems, manufacturing) and involves joins, which combine data across streams and databases. Three join types include stream-stream joins for matching related events within a time window, stream-table joins for enriching events using database changelogs (sketched below), and table-table joins for producing materialized view updates. Fault tolerance in stream processing involves techniques like microbatching, checkpointing, transactions, and idempotent writes to ensure reliability in long-running processes.
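As an illustration of the stream-table join, here is a small Python sketch (with invented names and schemas): the processor keeps a local copy of a table, maintained from its changelog, and enriches each incoming event with a local lookup.

```python
# A hedged sketch of a stream-table join: maintain a local materialized
# copy of the table from its changelog, and enrich each incoming event
# by lookup. Names and schemas are invented for illustration.

user_table = {}  # local materialized copy of the users table


def on_changelog_record(record: dict) -> None:
    # Keep the local table current as database changes stream in.
    user_table[record["user_id"]] = record["profile"]


def on_activity_event(event: dict) -> dict:
    # Enrich the activity event with the user's profile as of this moment.
    return {**event, "profile": user_table.get(event["user_id"])}


on_changelog_record({"user_id": 7, "profile": {"plan": "pro"}})
enriched = on_activity_event({"user_id": 7, "action": "login"})
print(enriched)  # -> {'user_id': 7, 'action': 'login', 'profile': {'plan': 'pro'}}
```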

December 09, 2024

How to analyze usage from your MCP Server

Since Anthropic launched Model Context Protocol (MCP), thousands of developers have built MCP Servers to connect Claude AI to their unique contexts. Here's how to monitor your MCP Servers for errors and analyze usage with standard logging handlers, Tinybird, and Grafana.

Exploring the NaiadClock TLA+ model in TLA-Web

I have been impressed by the usability of TLA-Web from Will Schultz. Recently I have been using it for my TLA+ modeling of MongoDB catalog protocols internally, and I found it very useful for exploring and understanding behavior. This got me thinking that TLA-Web would also be really useful for exploring and understanding an unfamiliar spec picked up on the web.

To test my hunch, I browsed through the TLA+ spec examples here, and I came across this spec about the Naiad Clock. Since I had read the DBSP paper recently, this was all the more interesting to me. I had written about Naiad in 2014, and about dataflow systems more broadly in 2017.


Getting to the ASCII version of the spec

Unfortunately, it seemed I would not be able to play with the spec, because it only came in paper form: "The Naiad Clock Protocol: Specification, Model Checking, and Correctness Proof." The spec was available only as 13 pages of LaTeX symbols in Appendix A of that paper. I did briefly consider manually transforming it to the ASCII version by spending 2-3 hours, but then I remembered I am lazy, ADHD, and hahaha it is so funny that I even considered I could do this. Then I had a thought. What if I gave this PDF to ChatGPT and asked for the ASCII version back? Lo and behold, that worked almost flawlessly, with less than a minute's work!

I was so impressed that I tweeted about what a marvel this is. This could be a great way to salvage some TLA+ specs that only appear in LaTeX-formatted PDF publications.



There was only one place that didn't parse: ChatGPT was confused about prec versus preceq. This is understandable because preceq is a rare symbol; I haven't seen it used in a TLA+ spec before. The interesting thing is that it didn't even matter. I found that the prec definitions were superfluous; I could have just used leq[a][b] instead, which I ended up doing.


The spec

After I got to a compiling spec in a couple of minutes, I started looking for simplifications and improvements, and ended up making several over the next hour. For example, I parametrized the next actions to make the spec more readable. Parametrized next actions also let us select which version of an action to take next in TLA-Web, which makes interactive exploration/simulation possible, as we discuss soon.

In the spec, there are a lot of definitions early on about Delta vectors. It is better to skim over those, as the interesting part of the spec starts with the variable definitions towards the end. The protocol has the flavor of incrementalized computation, as I describe below.

Where is the clock? There is no clock, this is more of a distributed convergent progress tracker maintained across the processes. As the paper says: "The Naiad Clock Protocol oversees the progress of a computation running within Naiad."

Here is the ASCII version I ended up producing. I hope this helps someone else when modeling a similar protocol. The model checking takes a long time, even with small value choices for the parameters, e.g., Point = {p1, p2, p3} and Proc = {a, b}.

In a Naiad computation, each record is associated with a point in "virtual time," which represents its stage in the computation. These points form a partially ordered set. Operations transform input records from one set of points into output records at another set. The Procs {a,b} are just stateless compute nodes that perform that processing/operation.

This is a lot to take in. The spec also looks daunting, but TLA-Web makes it easier to explore and understand, and that is what I'll describe next.


TLA-Web

After loading the spec in TLA-Web, we get to choose an initial state. To keep things interesting, I chose a partial ordering of the points such that p3>p2 and p3>p1, while p1 and p2 are not ordered. So we have a DAG. This initial state might be hard to find because there are so many initial states allowed by the spec. So, if you click this link, these all come preselected/loaded, and you can explore the computation by clicking on the enabled actions in the left panel.



On the right panel for state visualization, try this. Click on (un)explode and choose to explode "points". This groups point-indexed variables together for better visualization. For an alternative process-centered view, you can try exploding "Procs" and following the computation through that lens as well.

As I mentioned above, "points" are important. Records go up through the points DAG. The processors {a,b} are just stateless compute nodes that help with that processing. The processors also exchange messages to inform each other and catch up on the current progress of records in the DAG. You can track the progress in the temp and nrec variables, which are point-indexed.

When clicking an enabled NextPerformOperation in the left pane, notice that you get options. In some of the options, consuming a record at p1 or p2 may lead to an increase at p3, i.e., a new record created for processing at p3. That makes things interesting.

An action chosen for processor a to process the record at p1 may create a record for p3. This action creates a local diff at temp[a] that hasn't yet been shared with processor b. Temp is basically the delta vector used to update the current state; temp[a] and temp[b] keep the delta updates.

After all NextPerformOperations are performed, and after you select all the send-update and receive-update actions to make these CRDTs converge, you reach quiescence, as in this computation trace.


Epilogue 

This was fun. As I predicted, TLA-Web proved really useful for exploring and understanding a new spec picked up on the web. I don't think I would have gotten a good sense of what the spec was doing without playing with it in TLA-Web.



Will keeps adding features to TLA-Web, even though he does this as a hobby project. He just added animation support: check out the Cabbage Goat Wolf puzzle and select the animation tab on the right. There are other examples in the repo. This kind of visualization/exploration tool is a huge boon for the TLA+ ecosystem. Will is on a roll: he just finished his PhD and joined me and Jesse on the distributed systems team at MongoDB Research today. Welcome Will!


December 07, 2024

Galloping Search

I recently learned about Galloping Search while building a distributed log called s3-log. It's used to search sorted items when the upper bound is unknown. In this short post, I will share my notes and the other alternatives I discovered for searching over unbounded items.
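As a refresher on the technique, here is a sketch of galloping (a.k.a. exponential) search in Python. This is an illustration, not the s3-log implementation: probe at exponentially growing offsets until the target is bracketed, then binary-search within the bracket. Over a remote log of unknown length, the list indexing would be replaced by a probe/fetch call.

```python
# A sketch of galloping (exponential) search over a sorted sequence:
# double the probe offset until we overshoot the target, then binary
# search inside the bracketed range. Helper names are invented here.
import bisect


def galloping_search(items, target):
    """Return the index of the first element >= target."""
    if not items or items[0] >= target:
        return 0
    # Gallop: double the bound until we pass the target or hit the end.
    # This takes O(log i) probes, where i is the target's position.
    bound = 1
    while bound < len(items) and items[bound] < target:
        bound *= 2
    # Binary search within the bracketed range (bound // 2, bound].
    lo, hi = bound // 2, min(bound, len(items))
    return bisect.bisect_left(items, target, lo, hi)


data = [2, 3, 5, 8, 13, 21, 34, 55]
print(galloping_search(data, 13))  # -> 4, found with only a few probes
```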

December 05, 2024

Supabase Queues

Durable Message Queues with Guaranteed Delivery in Postgres

High Performance Disk

Store up to 60 TB of data with 100x improved durability and 5x more IOPS

December 04, 2024

Supabase Cron

Schedule Recurring Jobs in Postgres

December 03, 2024

Building Real-Time Live Sports Viewer Analytics with Tinybird and AWS

Ever tried to show millions of viewers real-time stats about how many other people like them are watching the same event? It's a bit like trying to count grains of sand while they're being poured into your bucket. Fun times! Let's look at how to build this without breaking the bank (or your sanity).

The Challenge: Fan Engagement at Massive Scale

Imagine you're streaming a major live event and want to show each viewer some engaging stats:
* How many people in their state are watching?
* How

The History of the Decline and Fall of In-Memory Database Systems

In the early 2010s, the drop in memory prices combined with an overall increase in the reliability of computer hardware fueled a minor revolution in the world of database systems. Traditionally, slow but durable magnetic disk storage was the source of truth for a database system. Only when data needed to be analyzed or updated would it be briefly cached in memory by the buffer manager. And as memory got bigger and faster, the access latency of magnetic disks quickly became a bottleneck for many systems.