Let's build a distributed Postgres proof of concept
What is CockroachDB under the hood? Take a look at its go.mod and notice a number of dependencies that do a lot of work: a PostgreSQL wire protocol implementation, a storage layer, a Raft implementation for distributed consensus. And, while not part of go.mod, it also builds on third-party code for PostgreSQL's grammar definition.
To be absurdly reductionist, CockroachDB is just the glue around these libraries. With that reductionist mindset, let's try building a distributed Postgres proof of concept ourselves! We'll use only four major external libraries: for parsing SQL, handling Postgres's wire protocol, handling Raft, and handling the storage of table metadata and rows themselves.
For a not-reductionist understanding of the CockroachDB internals, I recommend following the excellent Cockroach Engineering blog and Jordan Lewis's Hacking CockroachDB Twitch stream.
By the end of this post, in around 600 lines of code, we'll have a distributed "Postgres implementation" that will accept writes (CREATE TABLE, INSERT) on the leader and accept reads (SELECT) on any node. All nodes will contain the same data.
Here is a sample interaction against the leader:
$ psql -h localhost -p 6000
psql (13.4, server 0.0.0)
Type "help" for help.
phil=> create table x (age int, name text);
CREATE ok
phil=> insert into x values(14, 'garry'), (20, 'ted');
could not interpret result from server: INSERT ok
INSERT ok
phil=> select name, age from x;
name | age
---------+-----
"garry" | 14
"ted" | 20
(2 rows)
And against a follower (note the different port):
$ psql -h 127.0.0.1 -p 6001
psql (13.4, server 0.0.0)
Type "help" for help.
phil=> select age, name from x;
age | name
-----+---------
20 | "ted"
14 | "garry"
(2 rows)
All code for this post is available on GitHub in the fondly named WaterbugDB repo.
Plan of attack
Influenced by Philip O'Toole's talk on rqlite at Hacker Nights, we'll have a Postgres wire protocol server in front. As it receives queries, it will respond to SELECTs immediately. For CREATE TABLEs and INSERTs, it will instead send the entire query string to the Raft cluster. Each process that is part of the Raft cluster will implement the appropriate functions for handling Raft messages. In this case the messages will just be to create a table or insert data.
So every running process will run a Postgres wire protocol server, a Raft server, and an HTTP server that, as you'll see, is an implementation detail for how processes join the same Raft cluster.
Every running process will have its own directory for storing data.
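To make that concrete, here is a rough sketch of the per-process configuration we'll need: a unique node id, a port for each of the three servers, and a data directory derived from the node id. The flag names and config struct here are illustrative (using only the standard library's flag package), not prescribed by anything so far.

type config struct {
	id       string // unique Raft node id for this process
	raftPort string // port the Raft server listens on
	httpPort string // port for the cluster-join HTTP endpoint
	pgPort   string // port the Postgres wire protocol server listens on
}

func getConfig() config {
	cfg := config{}
	flag.StringVar(&cfg.id, "node-id", "", "Unique Raft node id")
	flag.StringVar(&cfg.raftPort, "raft-port", "2222", "Raft server port")
	flag.StringVar(&cfg.httpPort, "http-port", "8222", "HTTP server port")
	flag.StringVar(&cfg.pgPort, "pg-port", "6000", "Postgres wire protocol port")
	flag.Parse()
	return cfg
}

Each process would then use something like "data_" + cfg.id as its data directory so that two processes on the same machine never share files.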
Raft
There is likely a difference between Raft, the paper, and Raft, the implementations. When I refer to Raft in the rest of this post I'm going to be referring to an implementation.
And although CockroachDB uses etcd's Raft implementation, I didn't realize that when I started building this project. I used Hashicorp's Raft implementation.
Raft allows us to reliably keep multiple nodes in sync with a log of messages. Each node in the Raft cluster implements a finite state machine (FSM) with three operations: apply, snapshot, and restore. Our finite state machine will embed a postgres engine we'll build out after this to handle query execution.
package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"net"
"net/http"
"os"
"path"
"strings"
"time"
"github.com/google/uuid"
"github.com/hashicorp/raft"
"github.com/hashicorp/raft-boltdb"
"github.com/jackc/pgproto3/v2"
pgquery "github.com/pganalyze/pg_query_go/v2"
bolt "go.etcd.io/bbolt"
)
type pgFsm struct {
pe *pgEngine
}
From what I understand, the snapshot operation allows Raft to truncate logs. It is used in conjunction with restore: on startup, if there is a snapshot, restore is called so you can load it. Afterwards, all logs not yet snapshotted are replayed through the apply operation.
To keep this implementation simple we'll just fail all snapshots so restore will never be called and all logs will be replayed every time on startup through the apply operation. This is of course inefficient but it keeps the code simpler.
When we write the startup code we'll need to delete the database so that these apply calls happen fresh.
type snapshotNoop struct{}
func (sn snapshotNoop) Persist(sink raft.SnapshotSink) error {
return sink.Cancel()
}
func (sn snapshotNoop) Release() {}
func (pf *pgFsm) Snapshot() (raft.FSMSnapshot, error) {
return snapshotNoop{}, nil
}
func (pf *pgFsm) Restore(rc io.ReadCloser) error {
return fmt.Errorf("Nothing to restore")
}
Finally, applying is receiving a single message and applying it for the node. In this project the message will be a CREATE TABLE or INSERT query. So we'll parse the query and pass it to the postgres engine for execution.
func (pf *pgFsm) Apply(log *raft.Log) interface{} {
switch log.Type {
case raft.LogCommand:
ast, err := pgquery.Parse(string(log.Data))
if err != nil {
panic(fmt.Errorf("Could not parse payload: %s", err))
}
err = pf.pe.execute(ast)
if err != nil {
panic(err)
}
default:
panic(fmt.Errorf("Unknown raft log type: %#v", log.Type))
}
return nil
}
Panicking here is actually the advised behavior.
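For orientation, these FSM Apply calls fire when the leader commits a new log entry, which in our case will happen when the wire protocol server forwards a write query. With Hashicorp's Raft, submitting a query string as a log entry looks roughly like this (a sketch, not the exact handler code; the helper name and the one-second timeout are arbitrary):

func forwardToRaft(r *raft.Raft, query string) error {
	// Apply appends the query bytes to the replicated log; every node's
	// FSM then sees them in its Apply method above.
	future := r.Apply([]byte(query), time.Second)
	if err := future.Error(); err != nil {
		return fmt.Errorf("Could not apply to raft cluster: %s", err)
	}
	return nil
}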
Raft server
Now we can set up the actual Raft server and pass it an instance of this FSM. This is a bunch of boilerplate that would matter in production installs, but for us it basically just means telling Raft where to run and how to store its own internal data, including its all-important message log.
func setupRaft(dir, nodeId, raftAddress string, pf *pgFsm) (*raft.Raft, error) {
os.MkdirAll(dir, os.ModePerm)
store, err := raftboltdb.NewBoltStore(path.Join(dir, "bolt"))
if err != nil {
return nil, fmt.Errorf("Could not create bolt store: %s", err)
}
snapshots, err := raft.NewFileSnapshotStore(path.Join(dir, "snapshot"), 2, os.Stderr)
if err != nil {
return nil, fmt.Errorf("Could not create snapshot store: %s", err)
}
tcpAddr, err := net.ResolveTCPAddr("tcp", raftAddress)
if err != nil {
return nil, fmt.Errorf("Could not resolve address: %s", err)
}
transport, err := raft.NewTCPTransport(raftAddress, tcpAddr, 10, time.Second*10, os.Stderr)
if err != nil {
return nil, fmt.Errorf("Could not create tcp transport: %s", err)
}
raftCfg := raft.DefaultConfig()
raftCfg.LocalID = raft.ServerID(nodeId)
r, err := raft.NewRaft(raftCfg, pf, store, store, snapshots, transport)
if err != nil {
return nil, fmt.Errorf("Could not create raft instance: %s", err)
}
// Cluster consists of unjoined leaders. Picking a leader and
// creating a real cluster is done manually after startup.
r.BootstrapCluster(raft.Configuration{
Servers: []raft.Server{
{
ID: raft.ServerID(nodeId),
Address: transport.LocalAddr(),
},
},
})
return r, nil
}
Every instance of this process will run this and will start off as a leader in a new cluster. We'll expose an HTTP server that allows a leader to talk to other leaders and tell them to stop leading and follow it. This HTTP endpoint is how we'll get from N processes with N leaders and N clusters to N processes with 1 leader and 1 cluster.
That's basically it for the core Raft bits. So let's build out that HTTP server and follow endpoint.
HTTP follow endpoint
Our HTTP server will have just one endpoint. It tells process (a) to contact another process (b) so that process (b) joins process (a)'s cluster.
The HTTP server will need process (a)'s Raft instance to be able to start this join action. And in order for Raft to know how to contact process (b), we'll need to tell it both process (b)'s unique Raft node id (we'll assign a unique id ourselves when we start the process) and process (b)'s Raft server address.
type httpServer struct {
r *raft.Raft
}
func (hs httpServer) addFollowerHandler(w http.ResponseWriter, r *http.Request) {
followerId := r.URL.Query().Get("id")
followerAddr := r.URL.Query().Get("addr")
if hs.r.State() != raft.Leader {
// Set the status code before writing the JSON body; writing the body
// first would implicitly send a 200.
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(struct {
Error string `json:"error"`
}{
"Not the leader",
})
return
}
err := hs.r.AddVoter(raft.ServerID(followerId), raft.ServerAddress(followerAddr), 0, 0).Error()
if err != nil {
log.Printf("Failed to add follower: %s", err)
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
w.WriteHeader(http.StatusOK)
}
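As a usage sketch, the handler might be mounted and exercised like this; the /add-follower-to-cluster path and the ports are illustrative choices, not fixed by anything above:

func runHTTPServer(r *raft.Raft, port string) {
	hs := httpServer{r}
	// Expose the single follow endpoint on this process's HTTP port.
	http.HandleFunc("/add-follower-to-cluster", hs.addFollowerHandler)
	// This blocks, so in the real startup path it would run alongside the
	// Raft and Postgres servers (e.g. one of them in a goroutine).
	log.Fatal(http.ListenAndServe(":"+port, nil))
}

Then, to have a process with Raft node id node2 and Raft address localhost:2223 join the cluster led by the process whose HTTP server is on port 8222, you'd hit:

curl 'http://localhost:8222/add-follower-to-cluster?id=node2&addr=localhost:2223'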
That's it! Let's move on to the query engine.
Query engine
The query engine is a wrapper around a storage layer. We'll bring in bbolt.
I originally built this with Cockroach's pebble, but pebble has a transitive dependency on a C library whose function names conflict with function names in the C library that pg_query_go wraps.
type pgEngine struct {
db *bolt.DB
bucketName []byte
}
func newPgEngine(db *bolt.DB) *pgEngine {
return &pgEngine{db, []byte("data")}
}
bbolt organizes data into buckets. Buckets might be a natural way to store table rows (one bucket per table) but to keep the implementation simple we'll put all table metadata and row data into a single `data` bucket.
The entrypoint we called in the Raft apply implementation above was execute. It took a parsed list of statements. We'll iterate over the statements, figure out the kind of each statement, and call out to a dedicated helper for each kind.
func (pe *pgEngine) execute(tree *pgquery.ParseResult) error {
for _, stmt := range tree.GetStmts() {
n := stmt.GetStmt()
if c := n.GetCreateStmt(); c != nil {
return pe.executeCreate(c)
}
if c := n.GetInsertStmt(); c != nil {
return pe.executeInsert(c)
}
if c := n.GetSelectStmt(); c != nil {
_, err := pe.executeSelect(c)
return err
}
return fmt.Errorf("Unknown statement type: %s", stmt)
}
return nil
}
The pg_query_go docs are not super helpful. I had to build a separate AST explorer program to make it easier to understand this parser.
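If you want to explore the parse tree yourself, a minimal explorer can just dump the JSON form of whatever query you pass on the command line. This sketch uses pg_query_go's ParseToJSON and is not the explorer program mentioned above:

package main

import (
	"fmt"
	"log"
	"os"

	pgquery "github.com/pganalyze/pg_query_go/v2"
)

// Print the parse tree of the SQL passed as the first argument so you can
// see which nodes (CreateStmt, InsertStmt, SelectStmt, ...) to handle.
func main() {
	tree, err := pgquery.ParseToJSON(os.Args[1])
	if err != nil {
		log.Fatalf("Could not parse query: %s", err)
	}
	fmt.Println(tree)
}

Running it with a query like create table x (age int, name text) shows the CreateStmt node and the namespaced type names we'll dig out below.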
Let's start with creating a table.
Create table
When a table is created, we'll need to store its metadata.
type tableDefinition struct {
Name string
ColumnNames []string
ColumnTypes []string
}
First we pull that metadata out of the AST.
func (pe *pgEngine) executeCreate(stmt *pgquery.CreateStmt) error {
tbl := tableDefinition{}
tbl.Name = stmt.Relation.Relname
for _, c := range stmt.TableElts {
cd := c.GetColumnDef()
tbl.ColumnNames = append(tbl.ColumnNames, cd.Colname)
// Names is namespaced. So `INT` is pg_catalog.int4. `BIGINT` is pg_catalog.int8.
var columnType string
for _, n := range cd.TypeName.Names {
if columnType != "" {
columnType += "."
}
columnType += n.GetString_().Str
}
tbl.ColumnTypes = append(tbl.ColumnTypes, columnType)
}
Now we need to store this in the storage layer. The easiest/dumbest way to do this is to serialize the metadata to JSON and store it with the key tables_${tableName}.
tableBytes, err := json.Marshal(tbl)
if err != nil {
return fmt.Errorf("Could not marshal table: %s", err)
}
err = pe.db.Update(func(tx *bolt.Tx) error {
bkt, err := tx.CreateBucketIfNotExists(pe.bucketName)
if err != nil {
return err
}
return bkt.Put([]byte("tables_"+tbl.Name), tableBytes)
})
if err != nil {
return fmt.Errorf("Could not set key-value: %s", err)
}
return nil
}
Next we'll build a helper to reverse that operation, pulling out table metadata from the storage layer by the table name:
func (pe *pgEngine) getTableDefinition(name string) (*tableDefinition, error) {
var tbl tableDefinition
err := pe.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(pe.bucketName)
if bkt == nil {
return fmt.Errorf("Table does not exist")
}
valBytes := bkt.Get([]byte("tables_" + name))
err := json.Unmarshal(valBytes, &tbl)
if err != nil {
return fmt.Errorf("Could not unmarshal table: %s", err)
}
return nil
})
return &tbl, err
}
That's it for our basic CREATE TABLE support! Let's do INSERT next.
Insert row
Our INSERT support will only handle literal/constant VALUES.
func (pe *pgEngine) executeInsert(stmt *pgquery.InsertStmt) error {
tblName := stmt.Relation.Relname
slct := stmt.GetSelectStmt().GetSelectStmt()
for _, values := range slct.ValuesLists {
var rowData []any
for _, value := range values.GetList().Items {
if c := value.GetAConst(); c != nil {
if s := c.Val.GetString_(); s != nil {
rowData = append(rowData, s.Str)
continue
}
if i := c.Val.GetInteger(); i != nil {
rowData = append(rowData, i.Ival)
continue
}
}
return fmt.Errorf("Unknown value type: %s", value)
}
It would be better to abstract this VALUES code into a helper so it could be used by SELECTs too, but out of laziness we'll just keep it here.
Next we need to write the row to the storage layer. We'll serialize the row data to JSON (inefficient because we know the row structure, but JSON is easy). We'll store the row with a prefix including the table name and we'll give its key a unique UUID. When we're iterating over rows in the table we'll be able to do a prefix scan that will recover just the rows in this table.
rowBytes, err := json.Marshal(rowData)
if err != nil {
return fmt.Errorf("Could not marshal row: %s", err)
}
id := uuid.New().String()
err = pe.db.Update(func(tx *bolt.Tx) error {
bkt, err := tx.CreateBucketIfNotExists(pe.bucketName)
if err != nil {
return err
}
return bkt.Put([]byte("rows_"+tblName+"_"+id), rowBytes)
})
if err != nil {
return fmt.Errorf("Could not store row: %s", err)
}
}
return nil
}
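To make the storage layout concrete: after the CREATE TABLE and INSERTs from the sample session at the top, the single data bucket would hold one metadata key and one key per row, roughly like this (the UUIDs are shortened and made up):

tables_x -> JSON-serialized tableDefinition for x
rows_x_5b9d... -> [14, "garry"]
rows_x_f2c1... -> [20, "ted"]

Because every row key for a table shares the rows_<table>_ prefix, a prefix scan recovers exactly that table's rows.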
Finally we can move on to support SELECT!
Select rows
Unlike CREATE TABLE and INSERT, SELECT will need to return rows, column names, and, because the Postgres wire protocol wants it, column types.
type pgResult struct {
fieldNames []string
fieldTypes []string
rows [][]any
}
First we pull out the table name and the fields selected, looking up field types in the table metadata.
func (pe *pgEngine) executeSelect(stmt *pgquery.SelectStmt) (*pgResult, error) {
tblName := stmt.FromClause[0].GetRangeVar().Relname
tbl, err := pe.getTableDefinition(tblName)
if err != nil {
return nil, err
}
results := &pgResult{}
for _, c := range stmt.TargetList {
fieldName := c.GetResTarget().Val.GetColumnRef().Fields[0].GetString_().Str
results.fieldNames = append(results.fieldNames, fieldName)
fieldType := ""
for i, cn := range tbl.ColumnNames {
if cn == fieldName {
fieldType = tbl.ColumnTypes[i]
}
}
if fieldType == "" {
return nil, fmt.Errorf("Unknown field: %s", fieldName)
}
results.fieldTypes = append(results.fieldTypes, fieldType)
}
Finally, we do a prefix scan to grab all rows in the table from the storage layer.
prefix := []byte("rows_" + tblName + "_")
err = pe.db.View(func(tx *bolt.Tx) error {
c := tx.Bucket(pe.bucketName).Cursor()
for k, v := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix); k, v = c.Next() {
var row []any
err = json.Unmarshal(v, &row)
if err != nil {
return fmt.Errorf("Unable to unmarshal row: %s", err)
}
var targetRow []any
for _, target := range results.fieldNames {
for i, field := range tbl.ColumnNames {
if target == field {
targetRow = append(targetRow, row[i])
}
}
}
results.rows = append(results.rows, targetRow)
}
return nil
})
if err != nil {
return nil, err
}
return results, nil
}
That's it for SELECT! The last function we'll implement is a helper for deleting all data in the storage layer. This will be called on startup, before Raft logs are applied, so the database always ends up in a consistent state.
func (pe *pgEngine) delete() error {
return pe.db.Update(func(tx *bolt.Tx) error {
bkt := tx.Bucket(pe.bucketName)
if bkt != nil {
return tx.DeleteBucket(pe.bucketName)
}
return nil
})
}
And we're ready to move on to the final layer, the Postgres wire protocol.
Postgres wire protocol server
jackc/pgproto3 is an implementation of the Postgres wire protocol for Go. It allows us to implement a server that can respond to requests by Postgres clients like psql.
It works by wrapping a TCP connection. So we'll start by building a function that does the TCP serving loop.
func runPgServer(port string, db *bolt.DB, r *raft.Raft) {
ln, err := net.Listen("tcp", "localhost:"+port)
if err != nil {
log.Fatal(err)
}
for {
conn, err := ln.Accept()
if err != nil {
log.Fatal(err)
}
pc := pgConn{conn, db, r}
go pc.handle()
}
}
The pgConn instance needs access to the database directly so it can respond to SELECTs. And it needs the Raft instance for all other queries.
type pgConn struct {
conn net.Conn
db *bolt.DB
r *raft.Raft
}
The handle function we called above will grab the current message via the pgproto3 package and handle startup messages and regular messages.
func (pc pgConn) handle() {
pgc := pgproto3.NewBackend(pgproto3.NewChunkReader(pc.conn), pc.conn)
defer pc.conn.Close()
err := pc.handleStartupMessage(pgc)
if err != nil {
log.Println(err)
return
}
for {
err := pc.handleMessage(pgc)
if err != nil {
log.Println(err)
return
}
}
}
Startup messages include authorization and SSL checks. We'll allow anything in the former and respond "no" to the latter.
func (pc pgConn) handleStartupMessage(pgconn *pgproto3.Backend) error {
startupMessage, err := pgconn.ReceiveStartupMessage()
if err != nil {
return fmt.Errorf("Error receiving startup message: %s", err)
}
switch startupMessage.(type) {
case *pgproto3.StartupMessage:
buf := (&pgproto3.AuthenticationOk{}).Encode(nil)
buf = (&pgproto3.ReadyForQuery{TxStatus: 'I'}).Encode(buf)
_, err = pc.conn.Write(buf)
if err != nil {
return fmt.Errorf("Error sending ready for query: %s", err)
}
return nil
case *pgproto3.SSLRequest:
_, err = pc.conn.Write([]byte("N"))
if err != nil {
return fmt.Errorf("Error sending deny SSL request: %s", err)
}
return pc.handleStartupMessage(pgconn)
default:
return fmt.Errorf("Unknown startup message: %#v", startupMessage)
}
}
Within the main handleMessage logic we'll check the type of message.
func (pc pgConn) handleMessage(pgc *pgproto3.Backend) error {
msg, err := pgc.Receive()
if err != nil {
return fmt.Errorf("Error receiving message: %s", err)
}
switch t := msg.(type) {
case *pgproto3.Query: