October 15, 2024
October 10, 2024
Database Triggers
Live Share: Connect to in-browser PGlite with any Postgres client
Anatomy of a Throttler, part 2
October 09, 2024
MongoDB Realm & Device Sync Alternatives - Supabase
October 08, 2024
Building offline-first mobile apps with Supabase, Flutter and Brick
Why You Shouldn't Forget to Optimize the Data Layout
Why You Shouldn’t Forget to Optimize the Data Layout
When you want to speed up your program, the obvious step is to recall the learnings of your data structure class and optimize the algorithmic complexity.
Clearly, algorithms are the star of each program as swapping a hot O(n) algorithm with a lower complexity one, such as O(log n), yields almost arbitrary performance improvements.
However, the way data is structured also affects performance significantly: Programs run on physical machines with physical properties such as varying data latencies to caches, disks, or RAM.
After you optimized your algorithm, you need to consider these properties to achieve the best possible performance.
An optimized data layout takes your algorithms and access patterns into account when deciding on how to store the bytes of your data structure on physical storage.
Therefore, it can make your algorithms run several times faster.
In this blog post, we will show you an example where we can achieve a 4x better read performance by just changing the data layout according to our access pattern.
Intuitions for Distributed Consensus by Phil Eaton
This is an external talk of mine. Click here if you are not redirected.
September 30, 2024
Supabase Launch Week 12 Hackathon Winners
September 29, 2024
Build a serverless ACID database with this one neat trick (atomic PutIfAbsent)
Delta Lake is an open protocol for serverless ACID databases. Due to its simplicity, scalability, and the number of open-source implementations, it's quickly becoming the DuckDB of serverless transactional databases for analytics workloads. Iceberg is a contender too, and is similar in many ways. But since Delta Lake is simpler (simple != better) that's where we'll focus in this post.
Delta Lake has one of the most accessible database papers I've read (link). It's kind of like the movfuscator of databases.
Thanks to its simplicity, in this post we'll implement a Delta Lake-inspired serverless ACID database in 500 lines of Go code with zero dependencies. It will support creating tables, inserting rows into a table, and scanning all rows in a table. All while allowing concurrent readers and writers and achieving snapshot isolation.
There are other critical parts of Delta Lake we'll ignore: updating rows, deleting rows, checkpointing the transaction metadata log, compaction, and probably much more I'm not aware of. We must start somewhere.
All code for this post is available on GitHub.
Delta Lake basics
Delta Lake writes immutable data files to blob storage. It stores the names of new data files for a transaction in a metadata file. It handles concurrency (i.e. achieves snapshot isolation) with an atomic PutIfAbsent operation on the metadata file for the transaction.
This method of concurrency control works because the metadata files follow a naming scheme that includes the transaction id in the file name. When a new transaction starts, it finds all existing metadata files and picks its own transaction id by adding 1 to the largest transaction id it sees.
When a transaction goes to commit, writing the metadata file will fail if another transaction has already picked the same transaction id.
If a transaction does no writes and creates no tables, the transaction does not attempt to write any metadata file. Snapshot isolation!
Let's dig into the implementation.
Boilerplate
Let's give ourselves some nice assertion methods, a debug method, and
a uuid generator. In main.go:
package main
import (
"encoding/json"
"fmt"
"io"
"os"
"path"
"slices"
"strings"
)
func assert(b bool, msg string) {
if !b {
panic(msg)
}
}
func assertEq[C comparable](a C, b C, prefix string) {
if a != b {
panic(fmt.Sprintf("%s '%v' != '%v'", prefix, a, b))
}
}
var DEBUG = slices.Contains(os.Args, "--debug")
func debug(a ...any) {
if !DEBUG {
return
}
args := append([]any{"[DEBUG]"}, a...)
fmt.Println(args...)
}
// https://datatracker.ietf.org/doc/html/rfc4122#section-4.4
func uuidv4() string {
f, err := os.Open("/dev/random")
assert(err == nil, fmt.Sprintf("could not open /dev/random: %s", err))
defer f.Close()
buf := make([]byte, 16)
n, err := f.Read(buf)
assert(err == nil, fmt.Sprintf("could not read 16 bytes from /dev/random: %s", err))
assert(n == len(buf), "expected 16 bytes from /dev/random")
// Set bit 6 to 0
buf[8] &= ^(byte(1) << 6)
// Set bit 7 to 1
buf[8] |= 1 << 7
// Set version
buf[6] &= ^(byte(1) << 4)
buf[6] &= ^(byte(1) << 5)
buf[6] |= 1 << 6
buf[6] &= ^(byte(1) << 7)
return fmt.Sprintf("%x-%x-%x-%x-%x",
buf[:4],
buf[4:6],
buf[6:8],
buf[8:10],
buf[10:16])
}
Is that uuid method correct? Hopefully. Efficient? No. But it's preferable to avoid dependencies in pedagogical projects.
Moving on.
Blob storage requirements
As mentioned above, the basic requirement is that we support atomically writing some bytes to a location if the location doesn't already exist.
On top of that we also need the ability to list locations by prefix, and the ability to read the bytes at some location.
We'll diverge from Delta Lake in how we name files on disk. For one,
we'll keep all files in the same directory with a fixed prefix for
metadata and another table name prefix for each data file. This
simplifies the implementation of listPrefix a bit.
However, this also diverges from Delta Lake in that transactions
will represent all tables. In Delta Lake that is not so. Delta Lake
has a per-table transaction log. Only transactions that read and
write the same table in Delta Lake achieve snapshot isolation.
So let's set up an interface to describe these requirements:
type objectStorage interface {
// Must be atomic.
putIfAbsent(name string, bytes []byte) error
listPrefix(prefix string) ([]string, error)
read(name string) ([]byte, error)
}
And this is literally all we need to get ACID transactions. That's crazy!
Atomic Put and cloud blob storage
We could implement the atomic putIfAbsent part of this interface in
2024 using conditional
writes
on S3. Or we could implement this interface with the If-None-Match
header
on Azure Cloud Storage. Or we could implement this interface with the
x-goog-if-generation-match
header on
Google Cloud Storage.
Indeed a good exercise for the reader would be to implement this interface for other blob storage providers and see your serverless cloud database in action!
But the simplest method of all is to implement it on the filesystem, which is what we'll do next.
A filesystem blob store
If we had a server we could implement atomic putIfAbsent with a
mutex. But we're serverless baby. Thankfully, POSIX supports atomic
link
which will fail if the new name is already a file.
So we'll just create a temporary file and write out all bytes. Finally, we link the temporary file to the permanent name we intended. For cleanliness (not correctness), if there is an error at any point, we'll remove the temporary file.
type fileObjectStorage struct {
basedir string
}
func newFileObjectStorage(basedir string) *fileObjectStorage {
return &fileObjectStorage{basedir}
}
func (fos *fileObjectStorage) putIfAbsent(name string, bytes []byte) error {
tmpfilename := path.Join(fos.basedir, uuidv4())
f, err := os.OpenFile(tmpfilename, os.O_WRONLY|os.O_CREATE, 0644)
if err != nil {
return err
}
written := 0
bufSize := 1024 * 16
for written < len(bytes) {
toWrite := min(written+bufSize, len(bytes))
n, err := f.Write(bytes[written:toWrite])
if err != nil {
removeErr := os.Remove(tmpfilename)
assert(removeErr == nil, "could not remove")
return err
}
written += n
}
err = f.Sync()
if err != nil {
removeErr := os.Remove(tmpfilename)
assert(removeErr == nil, "could not remove")
return err
}
err = f.Close()
if err != nil {
removeErr := os.Remove(tmpfilename)
assert(removeErr == nil, "could not remove")
return err
}
filename := path.Join(fos.basedir, name)
err = os.Link(tmpfilename, filename)
if err != nil {
removeErr := os.Remove(tmpfilename)
assert(removeErr == nil, "could not remove")
return err
}
return nil
}
yencabulator
on HN pointed out that an earlier version of this post had a buggy
implementation of putIfAbsent (that attempted to manage
atomicity solely via O_EXCL | O_CREAT) would leave
around potentially bad metadata files if the os.Remove
call ever failed.
The link approach works around that because the file is
already fully and correctly written by the time we do the link.
listPrefix and read are minimal wrappers around filesystem APIs:
func (fos *fileObjectStorage) listPrefix(prefix string) ([]string, error) {
dir := path.Join(fos.basedir)
f, err := os.Open(dir)
if err != nil {
return nil, err
}
var files []string
for err != io.EOF {
var names []string
names, err = f.Readdirnames(100)
if err != nil && err != io.EOF {
return nil, err
}
for _, n := range names {
if prefix == "" || strings.HasPrefix(n, prefix) {
files = append(files, n)
}
}
}
err = f.Close()
return files, err
}
func (fos *fileObjectStorage) read(name string) ([]byte, error) {
filename := path.Join(fos.basedir, name)
return os.ReadFile(filename)
}
It is worth talking a bit about reading a directory though. Go doesn't
provide a nice iterator API for us and I didn't want to implement this
as callbacks with
path/filepath.WalkDir.
We could use os.File.ReadDir
but it allocates for all files in the directory. Sure, in a
pedagogical project we don't worry about millions of files. But the
ReadDir API, the error cases in particular, also isn't much simpler
than Readdirnames.
What's more, even though we iterated through batches of directory entries, and did prefix filtering before accumulating, we still could have considered returning an iterator here ourselves. It seems possible and likely that the number of data files grows quite large in a production system. But I was lazy.
It would be nice if Go introduced an actual iterator API for reading a directory. :)
Delta Lake and stale reads
In any case the ACID properties of Delta Lake (and Iceberg) don't depend on being able to read up-to-date data.
This is because concurrent (or stale) transactions that write will fail on commit. And also because all files written (even metadata files) are immutable.
Since all data is immutable, we will always be able to read at least a consistent snapshot of data. But we will never be able to get SERIALIZABLE read-only transactions. This is just how Delta Lake and Iceberg work. And it is a similar or better consistency level to what any major SQL database gives you by default.
You'll see what I mean later on when we implement transaction commits.
Transaction boilerplate
Now that we've got a blob storage abstraction and a filesystem implementation of it, let's start sketching out what a client and what a transaction looks like.
In Delta Lake, a transaction consists of a list of actions. An action might be to define a table's schema, or to add a data file, or to remove a data file, etc. In this post we'll only implement the first two actions.
type DataobjectAction struct {
Name string
Table string
}
type ChangeMetadataAction struct {
Table string
Columns []string
}
// an enum, only one field will be non-nil
type Action struct {
AddDataobject *DataobjectAction
ChangeMetadata *ChangeMetadataAction
// TODO: Support object removal.
// DeleteDataobject *DataobjectAction
}
These fields are all exported (i.e. capitalized, if you're not familiar with Go) because we will be writing them to disk when the transaction commits as the transaction's metadata.
In fact Actions and the transaction's id will be the only parts of
the transaction we write to disk. Everything else will be in-memory
state.
For our convenience we will track in memory a history of all previous actions, a mapping of table columns, and a mapping of unflushed data by table.
type transaction struct {
Id int
// Both are mapping table name to a list of actions on the table.
previousActions map[string][]Action
Actions map[string][]Action
// Mapping tables to column names.
tables map[string][]string
// Mapping table name to unflushed/in-memory rows. When rows
// are flushed, the dataobject that contains them is added to
// `tx.actions` above and `tx.unflushedDataPointer[table]` is
// reset to `0`.
unflushedData map[string]*[DATAOBJECT_SIZE][]any
unflushedDataPointer map[string]int
}
Only the current transaction will ever have
transaction.previousActions filled out. transaction.tables will be
populated when the transaction starts by reading through
transaction.previousActions for ChangeMetadataActions, and we will
also add onto it when we create a table in the current transaction.
We will append to transaction.Actions every time we write a new data
file and every time we create a new table.
We will add rows to transaction.unflushedData for a table until
transaction.unflushedDataPointer for that table reaches
DATAOBJECT_SIZE upon which time we will write that data to disk and
add a DataobjectAction entry to transaction.Actions.
Client boilerplate
A client will consist of an objectStorage implementation and a
possibly empty *transaction. Empty meaning there is no current
transaction.
type client struct {
os objectStorage
// Current transaction, if any. Only one transaction per
// client at a time. All reads and writes must be within a
// transaction.
tx *transaction
}
func newClient(os objectStorage) client {
return client{os, nil}
}
var (
errExistingTx = fmt.Errorf("Existing Transaction")
errNoTx = fmt.Errorf("No Transaction")
errTableExists = fmt.Errorf("Table Exists")
errNoTable = fmt.Errorf("No Such Table")
)
Client or database?
In a previous version of my code I named this client struct
database. But that's misleading. There is no central database. There
is just the client and the blob storage.
Clients work with transactions directly and only when attempting to commit does the blob storage abstraction let the client know if the transaction succeeded or not.
Starting a transaction
When we start a transaction, we will first read all existing transactions from disk and accumulate the actions from each prior transaction.
We will interpret ChangeMetadataActions and materialize them into a
current view of all tables.
And we will assign a transaction ID to this transaction to be 1 greater than the largest existing transaction ID we see.
Again it doesn't matter if the listPrefix call we use returns an
up-to-date list. Notably on blob storage there are few guarantees
about LIST operations recency. The Delta Lake paper mentions this too.
Out-of-date transactions attempting to write will be caught when we go to commit the transaction. Out-of-date transactions attempting only to read will still read a consistent snapshot.
func (d *client) newTx() error {
if d.tx != nil {
return errExistingTx
}
logPrefix := "_log_"
txLogFilenames, err := d.os.listPrefix(logPrefix)
if err != nil {
return err
}
tx := &transaction{}
tx.previousActions = map[string][]Action{}
tx.Actions = map[string][]Action{}
tx.tables = map[string][]string{}
tx.unflushedData = map[string]*[DATAOBJECT_SIZE][]any{}
tx.unflushedDataPointer = map[string]int{}
for _, txLogFilename := range txLogFilenames {
bytes, err := d.os.read(txLogFilename)
if err != nil {
return err
}
var oldTx transaction
err = json.Unmarshal(bytes, &oldTx)
if err != nil {
return err
}
// Transaction metadata files are sorted
// lexicographically so that the most recent
// transaction (i.e. the one with the largest
// transaction id) will be last and tx.Id will end up
// 1 greater than the most recent transaction ID we
// see on disk.
tx.Id = oldTx.Id + 1
for table, actions := range oldTx.Actions {
for _, action := range actions {
if action.AddDataobject != nil {
tx.previousActions[table] = append(tx.previousActions[table], action)
} else if action.ChangeMetadata != nil {
// Store the latest version of
// each table in memory for
// easy lookup.
mtd := action.ChangeMetadata
tx.tables[table] = mtd.Columns
} else {
panic(fmt.Sprintf("unsupported action: %v", action))
}
}
}
}
d.tx = tx
return nil
}
And we're set.
Creating a table
When we create a table, we need to add a ChangeMetadataAction to the
transactions Actions. And we also want to add the table info to the
in-memory transaction.tables field.
We don't do any of this durably. The change here will be written to disk on commit (if the transaction succeeds).
func (d *client) createTable(table string, columns []string) error {
if d.tx == nil {
return errNoTx
}
if _, exists := d.tx.tables[table]; exists {
return errTableExists
}
// Store it in the in-memory mapping.
d.tx.tables[table] = columns
// And also add it to the action history for future transactions.
d.tx.Actions[table] = append(d.tx.Actions[table], Action{
ChangeMetadata: &ChangeMetadataAction{
Table: table,
Columns: columns,
},
})
return nil
}
Easy peasy. Now for the fun part, writing data!
Writing a row
This is the next area where we'll diverge from Delta Lake. For the
sake of zero dependencies we are going to store data in-memory as an
array of array of any. And when we later write rows to disk we'll
write them as JSON. A real Delta Lake implementation would store data
in-memory in Apache Arrow format, and write to disk as Parquet.
In line with Delta Lake though we will buffer data in memory until we get 64K rows. When we get 64K rows for a particular table we will flush all those rows to disk. (When we go to commit a transaction we will flush any outstanding rows.)
func (d *client) writeRow(table string, row []any) error {
if d.tx == nil {
return errNoTx
}
if _, ok := d.tx.tables[table]; !ok {
return errNoTable
}
// Try to find an unflushed/in-memory dataobject for this table
pointer, ok := d.tx.unflushedDataPointer[table]
if !ok {
d.tx.unflushedDataPointer[table] = 0
d.tx.unflushedData[table] = &[DATAOBJECT_SIZE][]any{}
}
if pointer == DATAOBJECT_SIZE {
d.flushRows(table)
pointer = 0
}
d.tx.unflushedData[table][pointer] = row
d.tx.unflushedDataPointer[table]++
return nil
}
Now let's implement flushing.
Flushing a data object
Recall that data objects in Delta Lake (and Iceberg) are
immutable. Once we've got enough data to write a data object, we give
it a unique name, write it to disk, and add a AddObjectAction to the
transaction's list of Actions.
type dataobject struct {
Table string
Name string
Data [DATAOBJECT_SIZE][]any
Len int
}
func (d *client) flushRows(table string) error {
if d.tx == nil {
return errNoTx
}
// First write out dataobject if there is anything to write out.
pointer, exists := d.tx.unflushedDataPointer[table]
if !exists || pointer == 0 {
return nil
}
df := dataobject{
Table: table,
Name: uuidv4(),
Data: *d.tx.unflushedData[table],
Len: pointer,
}
bytes, err := json.Marshal(df)
if err != nil {
return err
}
err = d.os.putIfAbsent(fmt.Sprintf("_table_%s_%s", table, df.Name), bytes)
if err != nil {
return err
}
// Then record the newly written data file.
d.tx.Actions[table] = append(d.tx.Actions[table], Action{
AddDataobject: &DataobjectAction{
Table: table,
Name: df.Name,
},
})
// Reset in-memory pointer.
d.tx.unflushedDataPointer[table] = 0
return nil
}
That's it for writing data! Let's now look at reading data.
Scanning a table
We're going to make scanning mildly more complicated than it needed to
be in pedagogical code because we'll have client.scan() return an
iterator rather than an array with all rows.
The scanIterator will first read from in-memory (unflushed)
data. And then it will read through every data object for the table
that is still a part of this transaction. We will know which data
objects are still a part of this transaction by reading through all
AddDataobject actions. A future version of this project would also
eliminate data object files from the list by observing
DeleteDataobject actions. But we don't do that in this post.
func (d *client) scan(table string) (*scanIterator, error) {
if d.tx == nil {
return nil, errNoTx
}
var dataobjects []string
allActions := append(d.tx.previousActions[table], d.tx.Actions[table]...)
for _, action := range allActions {
if action.AddDataobject != nil {
dataobjects = append(dataobjects, action.AddDataobject.Name)
}
}
var unflushedRows [DATAOBJECT_SIZE][]any
if data, ok := d.tx.unflushedData[table]; ok {
unflushedRows = *data
}
return &scanIterator{
unflushedRows: unflushedRows,
unflushedRowsLen: d.tx.unflushedDataPointer[table],
d: d,
table: table,
dataobjects: dataobjects,
}, nil
}
The scanIterator needs to track where we are in in-memory rows, in
data objects, and within a particular data object.
type scanIterator struct {
d *client
table string
// First we iterate through unflushed rows.
unflushedRows [DATAOBJECT_SIZE][]any
unflushedRowsLen int
unflushedRowPointer int
// Then we move through each dataobject.
dataobjects []string
dataobjectsPointer int
// And within each dataobject we iterate through rows.
dataobject *dataobject
dataobjectRowPointer int
}
And the scanIterator will be driven by a next() method that goes
through in-memory data first and then through what's on disk.
func (d *client) readDataobject(table, name string) (*dataobject, error) {
bytes, err := d.os.read(fmt.Sprintf("_table_%s_%s", table, name))
if err != nil {
return nil, err
}
var do dataobject
err = json.Unmarshal(bytes, &do)
return &do, err
}
// returns (nil, nil) when done
func (si *scanIterator) next() ([]any, error) {
// Iterate through in-memory rows first.
if si.unflushedRowPointer < si.unflushedRowsLen {
row := si.unflushedRows[si.unflushedRowPointer]
si.unflushedRowPointer++
return row, nil
}
// If we've gotten through all dataobjects on disk we're done.
if si.dataobjectsPointer == len(si.dataobjects) {
return nil, nil
}
if si.dataobject == nil {
name := si.dataobjects[si.dataobjectsPointer]
o, err := si.d.readDataobject(si.table, name)
if err != nil {
return nil, err
}
si.dataobject = o
}
if si.dataobjectRowPointer > si.dataobject.Len {
si.dataobjectsPointer++
si.dataobject = nil
si.dataobjectRowPointer = 0
return si.next()
}
row := si.dataobject.Data[si.dataobjectRowPointer]
si.dataobjectRowPointer++
return row, nil
}
That's it for scanning a table! The final piece of the puzzle is committing a transaction.
Committing a transaction
When we commit a transaction we must flush any remaining data. A
read-only transaction (one which has no Actions) is immediately
done. There is no concurrency check.
Otherwise we will serialize transaction state and attempt to
atomically putIfAbsent.
The only way this will fail is if there is another concurrent writer.
func (d *client) commitTx() error {
if d.tx == nil {
return errNoTx
}
// Flush any outstanding data
for table := range d.tx.tables {
err := d.flushRows(table)
if err != nil {
d.tx = nil
return err
}
}
wrote := false
for _, actions := range d.tx.Actions {
if len(actions) > 0 {
wrote