a curated list of database news from authoritative sources

November 17, 2025

Nested Loop and Hash Join for MongoDB $lookup

SQL databases offer several join algorithms. The query planner selects the most efficient one by evaluating cardinality and estimated cost. For example, a Nested Loop join is ideal when the outer table has few rows and an index allows fast access to the inner table. In contrast, a Hash Join is better suited for situations where the outer table contains many rows and the inner table must be fully scanned, resulting in fewer costly loops.

While MongoDB provides similar algorithms, adapted to flexible documents, as a NoSQL database it shifts more responsibility to the developer. Developers must design for optimal data access starting with the data model, which has the advantage of more predictable performance.

I'll base my example on a question from Reddit: Optimizing a MongoDB JOIN with $lookup and $limit. I use a collection of users, where each user references a profile. The profile has a status. The query lists the users with no profile or with a profile whose status equals 2.

In my demo, I set up two profile keys: "_id," which is automatically indexed in MongoDB, and an "ID" field, which is not. This setup illustrates both situations—an indexed lookup table versus a non-indexed one. Normally, you'd use just one method, depending on which join algorithm you favor.

db.profiles.drop()
db.users.drop()

db.profiles.insertMany([
  { _id:102, ID: 102, status: 2 },
  { _id:201, ID: 201, status: 1 },
  { _id:302, ID: 302, status: 2 },
  { _id:403, ID: 403, status: 3 }
]);

db.users.insertMany([
  { name: "Alice" , profileID: 403 },   // profile status = 3
  { name: "Bob", profileID: 102 },      // profile status = 2
  { name: "Charlie", profileID: 201 },  // profile status = 1
  { name: "Diana", profileID: 102 },    // profile status = 2
  { name: "Eve" },                      // no profile
  { name: "Franck" , profileID: 403 },  // profile status = 3
  { name: "Gaspar" , profileID: 403 },  // profile status = 3
  { name: "Hans" , profileID: 403 },    // profile status = 3
  { name: "Iona" , profileID: 403 },    // profile status = 3
  { name: "Jane" , profileID: 403 },    // profile status = 3
  { name: "Karl" , profileID: 403 },    // profile status = 3
  { name: "Lili" },                     // no profile
  { name: "Math" },                     // no profile
  { name: "Niall" },                    // no profile
  { name: "Oscar" , profileID: 403 },   // status = 3  
  { name: "Paula" , profileID: 102 },   // status = 2  
  { name: "Quentin" , profileID: 201 }, // status = 1  
  { name: "Ravi" , profileID: 102 },    // status = 2  
  { name: "Sofia" },                    // no profile  
  { name: "Takumi" , profileID: 403 },  // status = 3  
  { name: "Uma" , profileID: 403 },     // status = 3  
  { name: "Viktor" , profileID: 403 },  // status = 3  
  { name: "Wafa" , profileID: 403 },    // status = 3  
  { name: "Ximena" , profileID: 403 },  // status = 3  
  { name: "Yara" },                     // no profile  
  { name: "Zubair" },                   // no profile 
]);

Here is my query on this small data set:

db.users.aggregate([  
  {  
    $lookup: {  
      from: "profiles",  
      localField: "profileID",  
      foreignField: "ID",  
      as: "profile"  
    }  
  },  
  {  
    $match: {  
      $or: [  
        { profile: { $eq: [] } }, // no profile  
        { profile: { $elemMatch: { status: 2 } } } // profile status = 2  
      ]  
    }  
  },  
  {  
    $project: {  
      _id: 0,  
      name: 1,  
      "profile.status": 1 // keep only the status field from joined profile  
    }  
  }  
]);  

Here is the result:

[
  { name: 'Bob', profile: [ { status: 2 } ] },
  { name: 'Diana', profile: [ { status: 2 } ] },
  { name: 'Eve', profile: [] },
  { name: 'Lili', profile: [] },
  { name: 'Math', profile: [] },
  { name: 'Niall', profile: [] },
  { name: 'Paula', profile: [ { status: 2 } ] },
  { name: 'Ravi', profile: [ { status: 2 } ] },
  { name: 'Sofia', profile: [] },
  { name: 'Yara', profile: [] },
  { name: 'Zubair', profile: [] },
]
Type "it" for more
test>

To scale the number of users, I multiply each existing user by 10,000 using the following script:

const currentUsers = db.users.find({},{_id:0, name:1, profileID:1}).toArray();
currentUsers.forEach(userDoc => {
    print(`Inserting 10,000 documents for: ${JSON.stringify(userDoc)}`);
    const newUsers = [];
    for (let i = 0; i < 10000; i++) {
      const clone = { ...userDoc };        // copy name and profileID
      clone.name = `${i}${clone.name}`;    // make each clone's name unique
      newUsers.push(clone);
    }
    db.users.insertMany(newUsers);         // one batched insert per original user
})

I now have 260,026 users.

Indexed nested loop join

I run my query with explain("executionStats") and extract the most important statistics about the time and the number of documents examined:

x=db.users.aggregate([
  { "$lookup" : {
    "from" : "profiles",
    "localField" : "profileID",
    "foreignField" : "_id",
    "as" : "profile" }},
  {
    $match: {
      $or: [
        { "profile": { $eq: [] } }, 
        { "profile": { $elemMatch: { status: 2 } } },
      ]
    }
  },
]).explain("executionStats")

xs=x["stages"][0]["$cursor"]["executionStats"];
xp=x["stages"][0]["$cursor"]["queryPlanner"]["winningPlan"]["queryPlan"];
print("nReturned:",           xs["nReturned"],
      "executionTimeMillis:", xs["executionTimeMillis"],
      "totalKeysExamined:",   xs["totalKeysExamined"],
      "totalDocsExamined:",   xs["totalDocsExamined"],
      xp["stage"]+" strategy:",  xp["strategy"],
)

The lookup stage returned all documents in 2.5 seconds, as the join must happen before filtering:

 nReturned: 260026
 executionTimeMillis: 2456
 totalKeysExamined: 190019
 totalDocsExamined: 450045
 EQ_LOOKUP strategy: IndexedLoopJoin

The equality lookup used an indexed loop join strategy, with an index scan for each document:

  • nReturned: 260026: All local documents with their joined profile arrays
  • executionTimeMillis: 2456: Total execution time including both join and filter stages
  • totalKeysExamined: 190019: Only keys that found matches in the profiles collection's index on "_id" are counted (lookup_query_stats). The index can determine "key not found" without actually examining a key entry.
  • totalDocsExamined: 450045: users collection scan (260,026) + profile documents fetched (190,019)

The number of profile documents examined is high compared to the number of profiles in the collection, so another algorithm can be faster: read all profiles once and look them up from a hash table.

Hash Join on small unindexed table

MongoDB 8.0 chooses a hash join under the following conditions (the thresholds can be inspected as shown after this list):

  • allowDiskUse: true is set (required for spilling if hash table exceeds memory)
  • Foreign collection is small enough - controlled by these parameters: internalQueryCollectionMaxNoOfDocumentsToChooseHashJoin (default: 10,000 docs), internalQueryCollectionMaxDataSizeBytesToChooseHashJoin (default: 100 MB), and internalQueryCollectionMaxStorageSizeBytesToChooseHashJoin (default: 100 MB)
  • No compatible index exists, disk use is allowed and hash join is more efficient
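
As a quick check (a sketch, assuming you have privileges to run getParameter against the server), these thresholds can be read with an admin command:

// Read the current hash-join thresholds (defaults listed above)
db.adminCommand({
  getParameter: 1,
  internalQueryCollectionMaxNoOfDocumentsToChooseHashJoin: 1,
  internalQueryCollectionMaxDataSizeBytesToChooseHashJoin: 1,
  internalQueryCollectionMaxStorageSizeBytesToChooseHashJoin: 1
})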

To show this, I use the "ID" field, which is not indexed, for the lookup:

x=db.users.aggregate([
  { "$lookup" : {
    "from" : "profiles",
    "localField" : "profileID",
    "foreignField" : "ID",
    "as" : "profile" }},
  {
    $match: {
      $or: [
        { "profile": { $eq: [] } }, 
        { "profile": { $elemMatch: { status: 2 } } },
      ]
    }
  },
],{ allowDiskUse: true } ).explain("executionStats")

xs=x["stages"][0]["$cursor"]["executionStats"];
xp=x["stages"][0]["$cursor"]["queryPlanner"]["winningPlan"]["queryPlan"];
print("nReturned:",           xs["nReturned"],
      "executionTimeMillis:", xs["executionTimeMillis"],
      "totalKeysExamined:",   xs["totalKeysExamined"],
      "totalDocsExamined:",   xs["totalDocsExamined"],
      xp["stage"]+" strategy:",  xp["strategy"],
)

The HashJoin completed 3.3x faster (750ms vs 2,456ms) with significantly different execution patterns:

 nReturned: 260026
 executionTimeMillis: 750
 totalKeysExamined: 0
 totalDocsExamined: 260030
 EQ_LOOKUP strategy: HashJoin

HashJoin works in two phases, with no index required (totalKeysExamined: 0):

  • Build Phase - Scans the foreign collection once to build an in-memory hash table keyed by foreignField values. It read the 4 profiles.
  • Probe Phase - Scans the local collection once, probing the hash table for each local key. It read the 260,026 users.

The total is 260,030 documents examined.

Nested loop join without index

A third option is a nested loop join that scans the foreign collection in each loop, rather than using an index or building a hash table. I disable disk usage to avoid the hash join, and use the non-indexed field to avoid the indexed nested loop:

x=db.users.aggregate([
  { "$lookup" : {
    "from" : "profiles",
    "localField" : "profileID",
    "foreignField" : "ID",
    "as" : "profile" }},
  {
    $match: {
      $or: [
        { "profile": { $eq: [] } }, 
        { "profile": { $elemMatch: { status: 2 } } },
      ]
    }
  },
],{ allowDiskUse: false } ).explain("executionStats")

xs=x["stages"][0]["$cursor"]["executionStats"];
xp=x["stages"][0]["$cursor"]["queryPlanner"]["winningPlan"]["queryPlan"];
print("nReturned:",           xs["nReturned"],
      "executionTimeMillis:", xs["executionTimeMillis"],
      "totalKeysExamined:",   xs["totalKeysExamined"],
      "totalDocsExamined:",   xs["totalDocsExamined"],
      xp["stage"]+" strategy:",  xp["strategy"],
)

Here are the execution statistics:

 nReturned: 260026
 executionTimeMillis: 1618
 totalKeysExamined: 0
 totalDocsExamined: 1300130
 EQ_LOOKUP strategy: NestedLoopJoin

As with the other algorithms, 260,026 users were read. Since there was no index on the foreign field, there's no index scan at all (totalKeysExamined: 0). This caused the system to scan the 4 profiles for each user, resulting in a total of 260,026 + 4 × 260,026 = 1,300,130 documents examined.

In this example, this approach examines about three times more documents than IndexedLoopJoin and five times more than HashJoin, because NestedLoopJoin repeatedly scans the foreign collection. Interestingly, because the lookup collection is very small and sequential scans are cache-friendly, its execution time is actually lower than that of the indexed nested loop in this case, as index seeks incur additional costs.

Summary

As with any database, it is important to understand the join algorithms. When you use a lookup in the aggregation pipeline, MongoDB selects the join strategy based on the query, the collections, and the existing indexes. The three algorithms are:

  1. IndexedLoopJoin - Best when:

    • Compatible index exists on foreignField
    • Low to medium match rate
    • Foreign collection is large
  2. HashJoin - Best when:

    • allowDiskUse: true is set
    • Foreign collection is small (< 10,000 docs, < 100MB)
    • High match rate or no compatible index
  3. NestedLoopJoin - Fallback when:

    • No compatible index exists
    • allowDiskUse: false prevents HashJoin
    • Acceptable only for tiny foreign collections

Unlike SQL databases, where the optimizer makes all decisions but can also lead to surprises, MongoDB shifts responsibility to developers. You must:

  • Design your schema with join performance in mind. For bounded relationships, embedding may be the best solution.
  • Understand your data (collection sizes, match rates) to predict which strategy will perform best.
  • Test different strategies by creating/dropping indexes and toggling allowDiskUse (see the sketch after this list).
  • Measure performance using explain("executionStats") to validate your choices.
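
For example, on the collections above, one hypothetical test cycle could look like this (my own sketch, reusing the "ID" field from the demo):

// Make IndexedLoopJoin eligible on the "ID" lookup field
db.profiles.createIndex({ ID: 1 })
// ... re-run the aggregation with explain("executionStats") as shown earlier ...
// Drop it again to fall back to HashJoin (allowDiskUse: true) or NestedLoopJoin (allowDiskUse: false)
db.profiles.dropIndex({ ID: 1 })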

This design favors predictability and control over relying on automatic optimization. So when you hear statements like 'joins are slow' or 'lookups are fast', take time to understand the facts, how these operations are actually executed, before forming an opinion.

November 14, 2025

Amazon DocumentDB New Query Planner

In September 2025, AWS announced it will contribute to the open-source DocumentDB extension for PostgreSQL, originally developed by Microsoft for CosmosDB and donated to the Linux Foundation. Amazon DocumentDB plans to migrate to this extension, but some improvements were already on the roadmap for 2025 such as a new query planner (also known as NQP or planner_version 2.0). Here is a great improvement for a simple query: get the last ten orders for a product in a specific country.

I created the "orders" collection and an index that covers the filtering and pagination for my use case:

db.orders.createIndex({  
  country_id: 1,  
  "order_details.product_id": 1,  
  created_at: -1  
});  

I place 20,000 orders across three countries, averaging about 6,667 per country:

let products = []; for (let p = 1; p <= 20; p++) {  
  products.push({ product_id: p, qty: (p % 5) + 1 });  
}  

let docs = []; for (let i = 20000; i > 0; i--) {  
  const country_id = (i % 3); 
  const order_details = products.slice(0, 1 + Math.floor(i / 1000));  
  docs.push({  
    country_id: country_id,  
    order_details: order_details,  
    created_at: new Date(Date.now() - (20000 - i) * 60000)  
  });  
}  
db.orders.insertMany(docs);  

The following query retrieves the 10 most recent orders from country "1" that include product "15" in their order details array, sorted by order creation date in descending order:

db.orders.find(  
    { country_id: 1,
      order_details: { $elemMatch: { product_id: 15 } } 
} ).sort({ created_at: -1 }).limit(10)  
;  

The compound index on ("country_id", "order_details.product_id", "created_at") is optimized for filtering and sorting directly through the index. It effectively demonstrates Amazon DocumentDB’s new query planner (NQP) enhancements — particularly in multi-key index bounds calculation, $elemMatch optimization, and sort removal — resulting in fewer documents scanned and faster query execution than with planner v1.

MongoDB

I began by executing this command in the reference for a document database—MongoDB:

test> db.orders.find(
...     { country_id: 1, order_details: { $elemMatch: { product_id: 15 } } }
...   ).sort({ created_at: -1 }).limit(10).explain("executionStats")

{
  explainVersion: '1',
  queryPlanner: {
    namespace: 'test.orders',
    parsedQuery: {
      '$and': [
        {
          order_details: { '$elemMatch': { product_id: { '$eq': 15 } } }
        },
        { country_id: { '$eq': 1 } }
      ]
    },
    indexFilterSet: false,
    queryHash: 'F529912D',
    planCacheShapeHash: 'F529912D',
    planCacheKey: '0FAD6C41',
    optimizationTimeMillis: 0,
    maxIndexedOrSolutionsReached: false,
    maxIndexedAndSolutionsReached: false,
    maxScansToExplodeReached: false,
    prunedSimilarIndexes: false,
    winningPlan: {
      isCached: false,
      stage: 'LIMIT',
      limitAmount: 10,
      inputStage: {
        stage: 'FETCH',
        filter: {
          order_details: { '$elemMatch': { product_id: { '$eq': 15 } } }
        },
        inputStage: {
          stage: 'IXSCAN',
          keyPattern: {
            country_id: 1,
            'order_details.product_id': 1,
            created_at: -1
          },
          indexName: 'country_id_1_order_details.product_id_1_created_at_-1',
          isMultiKey: true,
          multiKeyPaths: {
            country_id: [],
            'order_details.product_id': [ 'order_details' ],
            created_at: []
          },
          isUnique: false,
          isSparse: false,
          isPartial: false,
          indexVersion: 2,
          direction: 'forward',
          indexBounds: {
            country_id: [ '[1, 1]' ],
            'order_details.product_id': [ '[15, 15]' ],
            created_at: [ '[MaxKey, MinKey]' ]
          }
        }
      }
    },
    rejectedPlans: []
  },
  executionStats: {
    executionSuccess: true,
    nReturned: 10,
    executionTimeMillis: 0,
    totalKeysExamined: 10,
    totalDocsExamined: 10,
    executionStages: {
      isCached: false,
      stage: 'LIMIT',
      nReturned: 10,
      executionTimeMillisEstimate: 0,
      works: 11,
      advanced: 10,
      needTime: 0,
      needYield: 0,
      saveState: 0,
      restoreState: 0,
      isEOF: 1,
      limitAmount: 10,
      inputStage: {
        stage: 'FETCH',
        filter: {
          order_details: { '$elemMatch': { product_id: { '$eq': 15 } } }
        },
        nReturned: 10,
        executionTimeMillisEstimate: 0,
        works: 10,
        advanced: 10,
        needTime: 0,
        needYield: 0,
        saveState: 0,
        restoreState: 0,
        isEOF: 0,
        docsExamined: 10,
        alreadyHasObj: 0,
        inputStage: {
          stage: 'IXSCAN',
          nReturned: 10,
          executionTimeMillisEstimate: 0,
          works: 10,
          advanced: 10,
          needTime: 0,
          needYield: 0,
          saveState: 0,
          restoreState: 0,
          isEOF: 0,
          keyPattern: {
            country_id: 1,
            'order_details.product_id': 1,
            created_at: -1
          },
          indexName: 'country_id_1_order_details.product_id_1_created_at_-1',
          isMultiKey: true,
          multiKeyPaths: {
            country_id: [],
            'order_details.product_id': [ 'order_details' ],
            created_at: []
          },
          isUnique: false,
          isSparse: false,
          isPartial: false,
          indexVersion: 2,
          direction: 'forward',
          indexBounds: {
            country_id: [ '[1, 1]' ],
            'order_details.product_id': [ '[15, 15]' ],
            created_at: [ '[MaxKey, MinKey]' ]
          },
          keysExamined: 10,
          seeks: 1,
          dupsTested: 10,
          dupsDropped: 0
        }
      }
    }
  },
  queryShapeHash: '4919DD1DFF01E767C00E497A5C23ADA7B3AC64059E2D454E3975CE17B836BC8A',
  command: {
    find: 'orders',
    filter: {
      country_id: 1,
      order_details: { '$elemMatch': { product_id: 15 } }
    },
    sort: { created_at: -1 },
    limit: 10,
    '$db': 'test'
  },
  serverInfo: {
    host: '365bf79b0370',
    port: 27017,
    version: '8.2.1',
    gitVersion: '3312bdcf28aa65f5930005e21c2cb130f648b8c3'
  },
  serverParameters: {
    internalQueryFacetBufferSizeBytes: 104857600,
    internalQueryFacetMaxOutputDocSizeBytes: 104857600,
    internalLookupStageIntermediateDocumentMaxSizeBytes: 104857600,
    internalDocumentSourceGroupMaxMemoryBytes: 104857600,
    internalQueryMaxBlockingSortMemoryUsageBytes: 104857600,
    internalQueryProhibitBlockingMergeOnMongoS: 0,
    internalQueryMaxAddToSetBytes: 104857600,
    internalDocumentSourceSetWindowFieldsMaxMemoryBytes: 104857600,
    internalQueryFrameworkControl: 'trySbeRestricted',
    internalQueryPlannerIgnoreIndexWithCollationForRegex: 1
  },
  ok: 1,
  '$clusterTime': {
    clusterTime: Timestamp({ t: 1763157330, i: 1 }),
    signature: {
      hash: Binary.createFromBase64('AAAAAAAAAAAAAAAAAAAAAAAAAAA=', 0),
      keyId: Long('0')
    }
  },
  operationTime: Timestamp({ t: 1763157330, i: 1 })
}

This is an efficient execution plan that examines only ten index entries (keysExamined: 10) to retrieve the ten documents in the result (nReturned: 10).
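
For quick side-by-side comparisons, the few numbers discussed here can be pulled out of the explain document directly (a small mongosh sketch, using the same query as above):

const x = db.orders.find(
    { country_id: 1, order_details: { $elemMatch: { product_id: 15 } } }
  ).sort({ created_at: -1 }).limit(10).explain("executionStats")
print("nReturned:",         x.executionStats.nReturned,
      "totalKeysExamined:", x.executionStats.totalKeysExamined,
      "totalDocsExamined:", x.executionStats.totalDocsExamined)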

Oracle

Before examining DocumentDB, I tested the same query on Oracle Autonomous Database to compare it with a less efficient execution plan:

db.aggregate([ { $sql : {
 statement: "alter session set  \
 statistics_level=all",
 resetSession  : false }} ]);

db.orders.find(  
    { country_id: 1, order_details: { $elemMatch: { product_id: 15 } } }  
  ).sort({ created_at: -1 }).limit(10)  
; 

db.aggregate( [ { $sql : `
select * from 
 dbms_xplan.display_cursor(format=>'ALLSTATS LAST +PROJECTION +PEEKED_BINDS')
` } ] ).forEach(row => print(row.PLAN_TABLE_OUTPUT));

SQL_ID  cx0upnw3dsmrt, child number 1
-------------------------------------
select /*+ FIRST_ROWS(10) */ "DATA",rawtohex("RESID"),"ETAG" from
"ORA"."orders" where JSON_EXISTS("DATA",'$?( (@.country_id.numberOnly()
== $B0) && ( exists(@.order_details[*]?( (@.product_id.numberOnly() ==
$B1) )) ) )' passing :1 as "B0", :2 as "B1" type(strict)) order by
JSON_QUERY("DATA", '$.created_at[*].max()') desc nulls last fetch next
10 rows only

Plan hash value: 345710747

-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| Id  | Operation                              | Name                                                              | Starts | E-Rows | A-Rows |   A-Time   | Buffers |  OMem |  1Mem | Used-Mem |
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|   0 | SELECT STATEMENT                       |                                                                   |      1 |        |     10 |00:00:00.01 |    1604 |       |       |          |
|*  1 |  COUNT STOPKEY                         |                                                                   |      1 |        |     10 |00:00:00.01 |    1604 |       |       |          |
|   2 |   VIEW                                 |                                                                   |      1 |      1 |     10 |00:00:00.01 |    1604 |       |       |          |
|*  3 |    SORT ORDER BY STOPKEY               |                                                                   |      1 |      1 |     10 |00:00:00.01 |    1604 |  9216 |  9216 | 8192  (0)|
|   4 |     TABLE ACCESS BY INDEX ROWID BATCHED| orders                                                            |      1 |      1 |   2000 |00:00:00.01 |    1604 |       |       |          |
|   5 |      HASH UNIQUE                       |                                                                   |      1 |      1 |   2000 |00:00:00.01 |      19 |  1323K|  1323K| 1479K (0)|
|*  6 |       INDEX RANGE SCAN (MULTI VALUE)   | $ora:orders.country_id_1_order_details.product_id_1_created_at_-1 |      1 |      1 |   2000 |00:00:00.01 |      19 |       |       |          |
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Peeked Binds (identified by position):
--------------------------------------

   1 - (NUMBER): 1
   2 - (NUMBER): 15

Predicate Information (identified by operation id):
---------------------------------------------------

   1 - filter(ROWNUM<=10)
   3 - filter(ROWNUM<=10)
   6 - access("orders"."SYS_NC00005$"=SYS_CONS_ANY_SCALAR(:1, 3) AND "orders"."SYS_NC00006$"=SYS_CONS_ANY_SCALAR(:2, 3))

Column Projection Information (identified by operation id):
-----------------------------------------------------------

   1 - "from$_subquery$_002"."DATA"[JSON,8200], "from$_subquery$_002"."RAWTOHEX("RESID")"[VARCHAR2,4000], "from$_subquery$_002"."ETAG"[RAW,16]
   2 - "from$_subquery$_002"."DATA"[JSON,8200], "from$_subquery$_002"."RAWTOHEX("RESID")"[VARCHAR2,4000], "from$_subquery$_002"."ETAG"[RAW,16]
   3 - (#keys=1) JSON_VALUE( /*+ QJSNMD5_TC_JCMP_JV */ JSON_QUERY("DATA" /*+ LOB_BY_VALUE */  FORMAT OSON , '$.created_at[*].max()' RETURNING JSON WITHOUT ARRAY WRAPPER NULL ON ERROR TYPE(LAX) ) FORMAT OSON , '$'
       RETURNING ANY ORA_RAWCOMPARE(32767) ERROR ON ERROR TYPE(LAX) )[32767], "DATA" /*+ LOB_BY_VALUE */ [JSON,8200], "orders"."RESID"[RAW,2000], "ETAG"[RAW,16]
   4 - "DATA" /*+ LOB_BY_VALUE */ [JSON,8200], "orders"."RESID"[RAW,2000], "ETAG"[RAW,16]
   5 - (#keys=2) "orders".ROWID[ROWID,10], SYSVARCOL[8]
   6 - "orders".ROWID[ROWID,10], "orders"."SYS_NC00005$"[RAW,4000], "orders"."SYS_NC00006$"[RAW,4000], SYSVARCOL[8]

Here, 2,000 index entries (A-Rows) were read, the corresponding documents fetched (TABLE ACCESS BY INDEX ROWID), then all sorted (SORT ORDER BY STOPKEY) and filtered (COUNT STOPKEY) down to the ten rows of the result. The index was used, but not efficiently, because it is multi-key (MULTI VALUE) and must be deduplicated afterwards (HASH UNIQUE), which doesn't preserve the key order. This is less efficient than MongoDB's multi-key indexes, where deduplication happens during the scan and preserves the ordering.

Amazon DocumentDB with query planner v1

I created a DocumentDB cluster on AWS, which defaults to the first version of the query planner. I ran the same query and displayed the execution plan (plannerVersion: 1):

rs0 [direct: primary] test> db.orders.find(
...     { country_id: 1, order_details: { $elemMatch: { product_id: 15 } } }
...   ).sort({ created_at: -1 }).limit(10).explain("executionStats")
... ;

{
  queryPlanner: {
    plannerVersion: 1,
    namespace: 'test.orders',
    winningPlan: {
      stage: 'SUBSCAN',
      inputStage: {
        stage: 'LIMIT_SKIP',
        inputStage: {
          stage: 'SORT',
          sortPattern: { created_at: -1 },
          inputStage: {
            stage: 'FETCH',
            inputStage: {
              stage: 'IXSCAN',
              indexName: 'country_id_1_order_details.product_id_1_created_at_-1'
            }
          }
        }
      }
    }
  },
  executionStats: {
    executionSuccess: true,
    executionTimeMillis: '43.478',
    planningTimeMillis: '0.318',
    executionStages: {
      stage: 'SUBSCAN',
      nReturned: '10',
      executionTimeMillisEstimate: '43.046',
      inputStage: {

Introducing fully managed Blue/Green deployments for Amazon Aurora Global Database

Today, we're introducing Amazon RDS Blue/Green support for Aurora Global Database, enabling database upgrades and modifications with minimal downtime. With just a few steps, you can create a blue/green deployment that establishes a fully managed staging (green) environment mirroring the existing production (blue) environment, including the primary and its associated secondary regions of the Global Database.

Op Color Plots

A lot of my work involves staring at visualizations trying to get an intuitive feeling for what a system is doing. I’ve been working on a new visualization for Jepsen, a distributed systems testing library. This is something I’ve had in the back of my head for years but never quite got around to.

A Jepsen test records a history of operations. Those operations often come in a few different flavors. For instance, if we’re testing a queue, we might send messages into the queue, and try to read them back at the end. It would be bad if some messages didn’t come back; that could mean data loss. It would also be bad if messages came out that were never enqueued; that could signify data corruption. A Jepsen checker for a queue might build up some data structures with statistics and examples of these different flavors: which records were lost, unexpected, and so on. Here’s an example from the NATS test I’ve been working on this month:

{:valid?             false,
 :attempt-count      529583,
 :acknowledged-count 529369,
 :read-count         242123,
 :ok-count           242123,
 :recovered-count    3
 :hole-count         159427,
 :lost-count         287249,
 :unexpected-count   0,
 :lost               #{"110-6014" ... "86-8234"},
 :holes              #{"110-4072" ...  "86-8234"},
 :unexpected         #{}}

You can tell just by eyeballing the numbers that most attempted writes were acknowledged, and about half of them were later read back. There were just three “recovered” writes where we didn’t know if they succeeded or not, and they later appeared. About half were lost: acknowledged but never read. About half of those were “holes”—writes which were missing even though some later write was read. And there’s a few examples, in case you want to go digging into the history and see what might have happened to specific writes.

At the same time, there are lots of qualitative questions that are hard to answer statistically. For instance, were the lost writes clustered together in time, or were they spread out? What faults might have happened to trigger write loss? Is data loss universal on short timescales, or do, say, 40% of writes survive? Is the rate of writes uniform over time, or do lost writes happen faster or slower? Did the data loss event destroy all records prior to some time, or did some survive? Could apparent data loss be attributed to slow delivery of messages, or is it likely that the data is truly missing?

This plot helps answer some of those questions. Time flows left to right. Each operation (measured by its invocation time) becomes a single point. Colors indicate the flavor of that operation: OK, lost, unknown, or so on. Operations are splayed out vertically, in such a way that the aggregate “shape” of the plot traces out the rough throughput of the system over time. Operations from the fault-injection system are shown as vertical lines and (for process kills) horizontal bars spanning them.

From this, we can see that data loss occurred in two large chunks, starting near a file-corruption operation at roughly 65 seconds and running until the end of the test. They were not evenly mixed: writes are lost in blocks. A few records survived around 87 seconds in, then everything later was lost as well. These OK records in the middle hint that this is “real” data loss, as opposed to readers lagging behind. The rate of OK and lost operations was essentially constant at ~6,800 records/sec. Unknown operations happened much slower–likely due to timeouts. Some, but not all, process kills caused throughput to tank. You can guess that some of them took down a majority of nodes, halting the cluster until nodes were restarted; others were recoverable after a few seconds.

Jepsen tests can range from a handful of operations to hundreds of millions, and our plots need to work for both extremes. In this case, the plot used single-pixel dots for frequent operations like ok and lost, but for the handful of unknown operations, switched to a larger cross style. These infrequent operations are often of the most interest, and could easily get lost in the noise, so it makes sense to visually emphasize them.

This isn’t a good plot yet. I am, for instance, running out of colors to represent all the kinds of faults, and that leads to awkward color-blind issues like the red/green pairing here. There’s a sort of aliasing/moire pattern caused by the point layout algorithm, which divides the history into 512 time windows, computes a height for each window based on throughput, and then spreads the window’s points along the y axis uniformly. I feel like I may be able to use some sort of adaptively-determined transparency and overlapping dots to get something a little closer to a density field, and that might read more clearly, but I’m wary of what might happen when some windows have lots of plots and others have only a few.

Despite these shortcomings, this plot has been remarkably useful! I’m using them to get an at-a-glance feel for how bad a given test run is, to figure out where in the history I should look, and to refine the checker itself.

Because there’s lots of ways you could interpret these plots—showing lost elements of a set, highlighting transaction anomalies in an SQL test, showing how read-only and read-write queries are affected by faults—I don’t really know what to name them yet. For now I’m calling them “op color plots”. They’re available in the current Jepsen 0.3.10-SNAPSHOT, and I’m hoping they’ll be useful in all kinds of tests to come.

A Tale of Two Databases: No-Op Updates in PostgreSQL and MySQL

I’m lazy when I’m speakin’ / I’m lazy when I walk / I’m lazy when I’m dancin’ / I’m lazy when I talk (X-Press 2 feat. David Byrne – Lazy). While preparing a blog post to compare how PostgreSQL and MySQL handle locks, as part of a series covering the different approaches to MVCC for these databases, […]

$5 PlanetScale is live

You can now create single node Postgres databases on PlanetScale starting at just $5.

November 13, 2025

Distributing Data in a Redis/Valkey Cluster: Slots, Hash Tags, and Hot Spots

When scaling Redis or its open source fork Valkey, a single instance can become a bottleneck. The solution is to create a sharded cluster, which partitions data across multiple nodes. Understanding how this partitioning works is crucial for designing efficient, scalable applications. This article explores the mechanics of key distribution, the use of […]

November 12, 2025

PostgreSQL OIDC Authentication with pg_oidc_validator

Among the new features introduced in PostgreSQL 18 is support for OAuth-based authentication. This opened the door for the community to create extensions that integrate systems providing Single Sign-On (SSO) through OAuth 2.0 authentication with PostgreSQL. The reason this integration was not added directly to the core of PostgreSQL is due to the particularities found in those […]

November 11, 2025

Disaggregated Database Management Systems

This paper is based on a panel discussion from the TPC Technology Conference 2022. It surveys how cloud hardware and software trends are reshaping database system architecture around the idea of disaggregation.

For me, the core action is in Section 4: Disaggregated Database Management Systems. Here the paper discusses three case studies (Google AlloyDB, Rockset, and Nova-LSM) to give a taste of the software side of the movement. Of course there are many more. You can find Aurora, Socrates, Taurus, and TaurusMM reviews on my blog. In addition, Amazon DSQL (which I worked on) is worth discussing soon. I’ll also revisit the PolarDB series of papers, which trace a fascinating arc from active log-replay storage toward simpler, compute-driven designs. Alibaba has been prolific in this space, but the direction they are ultimately advocating remains muddled across publications, which reflect conflicting goals/priorities.


AlloyDB

AlloyDB extends PostgreSQL with compute–storage disaggregation and HTAP support. Figure 4 in the paper shows its layered design: the primary node  (RW node) handles writes, a set of read pool replicas (RO nodes) provide scalable reads, and a shared distributed storage engine persists data in Google's Colossus file system. The read pools can be elastically scaled up or down with no data movement, because the data lives in disaggregated storage.

AlloyDB's hybrid nature enables it to combine transactional and analytical processing by maintaining both a row cache and a pluggable columnar engine. The columnar engine vectorizes execution and automatically converts hot data into columnar format when it benefits analytic queries.

Under the covers, the database storage engine materializes pages from logs and stores blocks on Colossus. Logs are written to regional log storage; log-processing servers (LPS) continuously replay and materialize pages in the zones where compute nodes run. Durability and availability are decoupled: the logs are durable in regional log storage, while LPS workers ensure the blocks are always available near the compute.

This is a nice example of disaggregation serving elasticity and performance: compute scales independently and HTAP workloads benefit from a unified, multi-format cache hierarchy.


Rockset

Rockset seems to be a poster child for disaggregation in real-time analytics. Rockset's architecture follows the Aggregator–Leaf–Tailer (ALT) pattern (Figure 6). ALT separates compute for writes (Tailers), compute for reads (Aggregators and Leaves), and storage. Tailers fetch new data from sources such as Kafka or S3. Leaves index that data into multiple index types (columnar, inverted, geo, document). Aggregators then run SQL queries on top of those indexes, scaling horizontally to serve high-concurrency, low-latency workloads.

The key insight is that real-time analytics demands strict isolation between writes and reads. Ingest bursts must not impact query latencies. Disaggregation makes that possible by letting each tier scale independently: more Tailers when ingest load spikes, more Aggregators when query demand surges, and more Leaves as data volume grows.

Rockset also shows why LSM-style storage engines (and append-only logs in general) are natural fits for disaggregation. RocksDB-Cloud never mutates SST files after creation. All SSTs are immutable and stored in cloud object stores like S3. This makes them safely shareable across servers. A compaction job can be sent from one server to another: server A hands the job to a stateless compute node B, which fetches SSTs, merges them, writes new SSTs to S3, and returns control. Storage and compaction compute are fully decoupled.


Memory Disaggregation

The panel also discussed disaggregated memory as an emerging frontier. Today's datacenters waste over half their DRAM capacity due to static provisioning. It's shocking, no? RDMA-based systems like Redy have shown that remote memory can be used elastically to extend caches. The paper looks ahead to CXL as the next step, since its coherent memory fabric can make remote memory behave like local memory. CXL promises fine-grained sharing and coherence.


Hardware Disaggregation

On the hardware side, the paper surveys how storage, GPUs, and memory are being split from servers and accessed via high-speed fabrics. An interesting case study here is Fungible's DPU-based approach. The DPU offloads data-centric tasks (networking, storage, security) from CPUs, enabling server cores to focus solely on application logic. In a way, the DPU is a hardware embodiment of disaggregation.


Future Directions

Disaggregated databases are already here. Yet there are still many open questions.

  • How do we automatically assemble microservice DBMSs on demand, choosing the right compute, memory, and storage tiers for a workload?
  • How do we co-design software and hardware across fabrics like CXL to avoid data movement while preserving performance isolation?
  • How do we verify the correctness of such dynamic compositions?
  • Can a DBMS learn to reconfigure itself (rebalancing compute and storage) to stay optimal under changing workload patterns?
  • How do we deal with fault-tolerance and availability issues, and develop new distributed systems protocols that exploit opportunities that open up in the disaggregated model?

As Swami said in the SIGMOD 2023 panel: "The customer value is here, and the technical problems will be solved in time. Thanks to the complexities of disaggregation problems, every database/systems assistant professor is going to get tenure figuring out how to solve them."

How does it scale? The most basic benchmark on MongoDB

Choosing a database requires ensuring that performance remains fast as your data grows. For example, if a query takes 10 milliseconds on a small dataset, it should still be quick as the data volume increases and should never approach the 100ms threshold that users perceive as waiting. Here’s a simple benchmark: we insert batches of 1,000 operations into random accounts, then query the account with the most recent operation in a specific category—an OLTP scenario using filtering and pagination. As the collection grows, a full collection scan would slow down, so secondary indexes are essential.

We create an accounts collection, where each account belongs to a category and holds multiple operations—a typical one-to-many relationship, with an index for our query on operations per category:

db.accounts.createIndex({
  category: 1,
  "operations.date": 1,
  "operations.amount": 1,
});

To increase data volume, this function inserts operations into accounts (randomly distributed to ten million accounts over three categories):

function insert(num) {
  const ops = [];
  for (let i = 0; i < num; i++) {
    const account  = Math.floor(Math.random() * 10_000_000) + 1;
    const category = Math.floor(Math.random() * 3);
    const operation = {
      date: new Date(),
      amount: Math.floor(Math.random() * 1000) + 1,
    };
    ops.push({
      updateOne: {
        filter: { _id: account },
        update: {
          $set: { category: category },
          $push: { operations: operation },
        },
        upsert: true,
      }
    });
  }
  db.accounts.bulkWrite(ops);
}

This adds 1,000 operations and should take less than one second:

let time = Date.now();
insert(1000);
console.log(`Elapsed ${Date.now() - time} ms`);

A typical query fetches the account, in a category, that had the latest operation:

function query(category) {
  return db.accounts.find(
    { category: category },
    { "operations.amount": 1 , "operations.date": 1 }
  )
    .sort({ "operations.date": -1 })
    .limit(1);
}

Such a query should take a few milliseconds:

let time = Date.now();
print(query(1).toArray());
console.log(`Elapsed ${Date.now() - time} ms`);

I repeatedly insert new operations, in batches of one thousand, in a loop, and measure the query time while the collection grows, stopping once I reach one billion operations randomly distributed across the accounts:

for (let i = 0; i < 1_000_000; i++) { 
  // more data  
  insert(1000);  
  // same query
  const start = Date.now();  
  const results = query(1).toArray();  
  const elapsed = Date.now() - start;  
  print(results);  
  console.log(`Elapsed ${elapsed} ms`);  
}  
console.log(`Total accounts: ${db.accounts.countDocuments()}`);  

In a scalable database, the response time should not increase significantly as the collection grows. I've run this on MongoDB, and the response time stays in single-digit milliseconds. I've also run it on an Oracle Autonomous Database with the MongoDB emulation, but I can't publish the results because Oracle Corporation forbids the publication of database benchmarks (the DeWitt Clause).

You can copy/paste this test and watch the elapsed time while the data grows on your own infrastructure.
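
To verify that the query stays index-driven as the data grows, a quick check with explain can be run at any point (a minimal sketch, reusing the query() helper defined above):

const stats = query(1).explain("executionStats").executionStats;
print("keysExamined:", stats.totalKeysExamined,
      "docsExamined:", stats.totalDocsExamined,
      "ms:",           stats.executionTimeMillis);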

November 10, 2025

The Future of Fact-Checking is Lies, I Guess

Last weekend I was trying to pull together sources for an essay and kept finding “fact check” pages from factually.co. For instance, a Kagi search for “pepper ball Chicago pastor” returned this Factually article as the second result:

Fact check: Did ice agents shoot a pastor with pepperballs in October in Chicago

The claim that “ICE agents shot a pastor with pepperballs in October” is not supported by the available materials supplied for review; none of the provided sources document a pastor being struck by pepperballs in October, and the only closely related reported incident involves a CBS Chicago reporter’s vehicle being hit by a pepper ball in late September [1][2]. Available reports instead describe ICE operations, clergy protests, and an internal denial of excessive force, but they do not corroborate the specific October pastor shooting allegation [3][4].

Here’s another “fact check”:

Fact check: Who was the pastor shot with a pepper ball by ICE

No credible reporting in the provided materials identifies a pastor who was shot with a pepper‑ball by ICE; multiple recent accounts instead document journalists, protesters, and community members being hit by pepper‑ball munitions at ICE facilities and demonstrations. The available sources (dated September–November 2025) describe incidents in Chicago, Los Angeles and Portland, note active investigations and protests, and show no direct evidence that a pastor was targeted or injured by ICE with a pepper ball [1] [2] [3] [4].

These certainly look authoritative. They’re written in complete English sentences, with professional diction and lots of nods to neutrality and skepticism. There are lengthy, point-by-point explanations with extensively cited sources. The second article goes so far as to suggest “who might be promoting a pastor-victim narrative”.

The problem is that both articles are false. This story was broadly reported, as in this October 8th Fox News article unambiguously titled “Video shows federal agent shoot Chicago pastor in head with pepper ball during Broadview ICE protest”. DHS Assistant Secretary Tricia McLaughlin even went on X to post about it. This event definitely happened, and it would not have been hard to find coverage at the time these articles were published. It was, quite literally, all over the news.

Or maybe they’re sort of true. Each summary disclaims that its findings are based on “the available materials supplied for review”, or “the provided materials”. This is splitting hairs. Source selection is an essential part of the fact-checking process, and Factually selects its own sources in response to user questions. Instead of finding authoritative sources, Factually selected irrelevant ones and spun them into a narrative which is the opposite of true. Many readers will not catch this distinction. Indeed, I second-guessed myself when I saw the Factually articles—and I read the original reporting when it happened.

“These conversations matter for democracy,” says the call-to-action at the top of every Factually article. The donation button urges readers to “support independent reporting.”

But this is not reporting. Reporters go places and talk to people. They take photographs and videos. They search through databases, file FOIA requests, read court transcripts, evaluate sources, and integrate all this with an understanding of social and historical context. People go to journalism school to do this.

What Factually does is different. It takes a question typed by a user and hands it to a Large Language Model, or LLM, to generate some query strings. It performs up to three Internet search queries, then feeds the top nine web pages it found to an LLM, and asks a pair of LLMs to spit out some text shaped like a fact check. This text may resemble the truth, or—as in these cases—utterly misrepresent it.

Calling Factually’s articles “fact checks” is a category error. A fact checker diligently investigates a contentious claim, reasons about it, and ascertains some form of ground truth. Fact checkers are held to a higher evidentiary standard; they are what you rely on when you want to be sure of something. The web pages on factually.co are fact-check-shaped slurry, extruded by a statistical model which does not understand what it is doing. They are fancy Mad Libs.

Sometimes the Mad Libs are right. Sometimes they’re blatantly wrong. Sometimes it is clear that the model simply has no idea what it is doing, as in this article where Factually is asked whether it “creates fake fact-checking articles”, and in response turns to web sites like Scam Adviser, which evaluate site quality based on things like domain age and the presence of an SSL certificate, or Scam Detector, which looks for malware and phishing. Neither of these sources has anything to do with content accuracy. When asked if Factually is often incorrect (people seem to ask this a lot), Factually’s LLM process selects sources like DHS Debunks Fake News Media Narratives from June, Buzzfeed’s 18 Science Facts You Believed in the 1990s That Are Now Totally Wrong, and vocabulary.com’s definition of the word “Wrong”. When asked about database safety, Factually confidently asserts that “Advocates who state that MongoDB is serializable typically refer to the database’s support for snapshot isolation,” omitting that Snapshot Isolation is a completely different, weaker property. Here’s a Factually article on imaginary “med beds” which cites this incoherent article claiming to have achieved quantum entanglement via a photograph. If a real fact checker shows you a paper like this with any degree of credulousness, you can safely ignore them.

The end result of this absurd process is high-ranking, authoritative-sounding web pages which sometimes tell the truth, and sometimes propagate lies. Factually has constructed a stochastic disinformation machine which exacerbates the very problems fact-checkers are supposed to solve.

Please stop doing this.

Comparing Integers and Doubles

During automated testing we stumbled upon a problem that boiled down to transitive comparisons: if a=b and a=c, then we assumed that b=c. Unfortunately that is not always the case, at least not in all systems. Consider the following SQL query:

select a=b, a=c, b=c
from (values(
   1234567890123456789.0::double precision,
   1234567890123456788::bigint,
   1234567890123456789::bigint)) s(a,b,c)

If you execute that in Postgres (or DuckDB, or SQL Server, or ...) the answer is (true, true, false). That is, the comparison is not transitive! Why does that happen? When these systems compare a bigint and a double, they promote the bigint to double and then compare. But a double has only 52 bits of mantissa, which means it will lose precision when promoting large integers to double, producing false positives in the comparison.
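
As an illustration (mine, not from the original post), the same precision loss is easy to reproduce in any environment whose numbers are IEEE 754 doubles, for example in plain JavaScript:

// Near 2^60 the spacing between adjacent doubles is 256,
// so both integers collapse to the same double value.
const a = Number(1234567890123456789n);
const b = Number(1234567890123456788n);
console.log(a === b);                                        // true: both become 1234567890123456768
console.log(1234567890123456789n === 1234567890123456788n); // false: bigint comparison is exact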

This behavior is highly undesirable, first because it confuses the optimizer, and second because (at least in our system) joins work very differently: Hash joins promote to the most restrictive type and discard all values that cannot be represented, as they will never produce a join partner for sure. For double/bigint joins that leads to observable differences between joins and plain comparisons, which is very bad.

How should we compare correctly? Conceptually the situation is clear: an IEEE 754 floating point with sign s, mantissa m, and exponent e represents the value (-1)^s*m*2^e, and we just have to compare the integer with that value. But there is no easy way to do that: if we do an int/double comparison in, e.g., C++, the compiler does the same promotion to double, messing up the comparison.

We can get the logic right by doing two conversions: We first convert the int to double and compare that. If the values are not equal, the order is clear and we can use that. Otherwise, we convert the double back to an integer and check if the conversion rounded up or down, and handle the result. Plus some extra checks to avoid undefined behavior (the conversion of intmax64->double->int64 is not defined) and to handle non-finite values, and we get: 

int cmpDoubleInt64(double a, int64_t b) {
   // handle intmax and nan
   if (!(a<=0x1.fffffffffffffp+62)) return 1;

   // fast path comparison
   double bd = b;
   if (a!=bd) return (a<bd)?-1:1;

   // handle loss of precision
   int64_t ai = a;
   if (ai!=b) return (ai<b)?-1:1;
   return 0;
}

Which is the logic that we now use. Who else does it correctly? Perhaps somewhat surprisingly, Python and SQLite. Other database systems (and programming languages) that we tested all lost precision during the comparison, leading to tons of problems. IMHO a proper int/double comparison should be available in every programming language, at least as a library function. But in most languages (and DBMSes) it isn't. You can use the code above if you ever have this problem.

Taurus MM: A Cloud-Native Shared-Storage Multi-Master Database

This VLDB'23 paper presents Taurus MM, Huawei's cloud-native, multi-master OLTP database built to scale write throughput in clusters of 2 to 16 masters. It extends the single-master TaurusDB design (which we reviewed yesterday) into a multi-master design while following its shared-storage architecture with separate compute and storage layers. Each master maintains its own write-ahead log (WAL) and executes transactions independently; there are no distributed transactions. All masters share the same Log Stores and Page Stores, and data is coordinated through new algorithms that reduce network traffic and preserve strong consistency.

The system uses pessimistic concurrency control to avoid frequent aborts on contended workloads. Consistency is maintained through two complementary mechanisms: a new clock design that makes causal ordering efficient, and a new hybrid locking protocol that cuts coordination cost.


Vector-Scalar (VS) Clocks

A core contribution is the Vector-Scalar (VS) clock, a new type of logical clock that combines the compactness of Lamport clocks with the causal precision/completeness of vector clocks.

Ordinary Lamport clocks are small but they fail to capture causality fully, in both directions. Vector clocks capture causality fully, but scale poorly. An 8-node vector clock adds 64 bytes to every message or log record, which turns into a prohibitive cost when millions of short lock and log messages per second are exchanged in a cluster. Taurus MM solves this by letting the local component of each node's VS clock behave like a Lamport clock, while keeping the rest of the vector to track other masters' progress. This hybrid makes the local counter advance faster (it reflects causally related global events, not just local ones) yet still yields vector-like ordering when needed.

VS clocks can stamp messages either with a scalar or a vector timestamp depending on context. Scalar timestamps are used when causality is already known, such as for operations serialized by locks or updates to the same page. Vector timestamps are used when causality is uncertain, such as across log flush buffers or when creating global snapshots.
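
To make the hybrid concrete, here is a toy sketch (my own illustration based on the description above, not the paper's pseudocode): the local slot is bumped like a Lamport counter, scalar stamps advance only the local slot, and vector stamps are merged element-wise.

// Toy VS clock for node `me` in an n-node cluster (illustrative only)
class VSClock {
  constructor(me, n) { this.me = me; this.v = new Array(n).fill(0); }
  tick() { this.v[this.me]++; return this.v[this.me]; }                 // local event, Lamport-style
  recvScalar(s) { this.v[this.me] = Math.max(this.v[this.me], s) + 1; } // causality already known
  recvVector(w) {                                                       // causality uncertain
    for (let i = 0; i < this.v.length; i++) this.v[i] = Math.max(this.v[i], w[i]);
    this.v[this.me]++;
  }
  stampScalar() { return this.tick(); }                                 // compact stamp
  stampVector() { this.tick(); return [...this.v]; }                    // full causal stamp
}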

I really like the VS clock algorithm, and how it keeps most timestamps compact while still preserving ordering semantics. It's conceptually related to Hybrid Logical Clocks (HLC) in that it keeps per-node clock values close and comparable, but VS clocks are purely logical, driven by Lamport-style counters instead of synchronized physical time. The approach enables rapid creation of globally consistent snapshots and reduces timestamp size and bandwidth consumption by up to 60%.

I enjoyed the paper's pedagogical style in Section 5, as it walks the reader through deciding whether each operation needs scalar or vector timestamps. This  makes it clear how we can enhance efficiency by applying the right level of causality tracking to each operation.


Hybrid Page-Row Locking

The second key contribution is a hybrid page-row locking protocol. Taurus MM maintains a Global Lock Manager (GLM) that manages page-level locks (S and X) across all masters. Each master also runs a Local Lock Manager (LLM) that handles row-level locks independently once it holds the covering page lock.

The GLM grants page locks, returning both the latest page version number and any row-lock info. Once a master holds a page lock, it can grant compatible row locks locally without contacting the GLM. When the master releases a page, it sends back the updated row-lock state so other masters can reconstruct the current state lazily.

Finally, row-lock changes don't need to be propagated immediately and are piggybacked on the page lock release flow. This helps reduce lock traffic dramatically. The GLM only intervenes when another master requests a conflicting page lock.

This separation of global page locks and local row locks resembles our 2014 Panopticon work, where we combined global visibility and local autonomy to limit coordination overhead.


Physical and Logical Consistency

Taurus MM distinguishes between physical and logical consistency. Physical consistency ensures structural correctness of pages. The master groups log records into log flush buffers (LFBs) so that each group ends at a physically consistent point (e.g., a B-tree split updates parent and children atomically within LFB bounds). Read replicas apply logs only up to group boundaries, avoiding partial structural states without distributed locks.

Logical consistency ensures isolation-level correctness for user transactions (Repeatable Read isolation). Row locks are held until commit, while readers can use consistent snapshots without blocking writers.


Ordering and Replication

Each master periodically advertises the location of its latest log records to all others in a lightweight, peer-to-peer fashion. This mechanism is new in Taurus MM. In single-master TaurusDB, the metadata service (via Metadata PLogs) tracked which log segments were active, but not the current write offsets within them (the master itself notified read replicas of the latest log positions). In Taurus MM, with multiple masters generating logs concurrently, each master broadcasts its current log positions to the others, avoiding a centralized metadata bottleneck.

To preserve global order, each master groups its recent log records (updates from multiple transactions and pages) into a log flush buffer (LFB) before sending it to the Log and Page Stores. Because each LFB may contain updates to many pages, different LFBs may touch unrelated pages. It becomes unclear which buffer depends on which, so the system uses vector timestamps to capture causal relationships between LFBs produced on different masters. Each master stamps an LFB with its current vector clock and also includes the timestamp of the previous LFB, allowing receivers to detect gaps or missing buffers. When an LFB reaches a Page Store, though, this global ordering is no longer needed. The Page Store processes each page independently, and all updates to a page are already serialized by that page's lock and carry their own scalar timestamps (LSNs). The Page Store simply replays each page's log records in increasing LSN order, ignoring the vector timestamp on the LFB. In short, vector timestamps ensure causal ordering across masters before the LFB reaches storage, and scalar timestamps ensure correct within-page ordering after.

For strict transaction consistency, a background thread exchanges full vector (VS) timestamps among masters so that every transaction sees all updates committed before it began. A master merges the received timestamps with an element-wise maximum and waits until its local clock surpasses the merged value before serving the read, which guarantees a globally up-to-date view. If VS were driven by physical rather than purely logical clocks, these wait times could shrink further.
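Here is how I picture the read gate, as a hedged sketch rather than the paper's actual code: merge the timestamps received from the other masters with an element-wise maximum, and hold the read until the local vector clock dominates the merged value.

# Sketch of the consistent-read gate (my interpretation, not the paper's code).

def merge(timestamps):
    """Element-wise maximum of vector timestamps received from the other masters."""
    merged = {}
    for ts in timestamps:
        for node, val in ts.items():
            merged[node] = max(merged.get(node, 0), val)
    return merged

def dominates(local, target):
    """True when the local clock has caught up with everything in the merged timestamp."""
    return all(local.get(node, 0) >= val for node, val in target.items())

# The master defers the read until dominates() holds, i.e. until it has applied
# every remote update that was committed before the transaction began.
target = merge([{"M1": 5, "M2": 4}, {"M1": 7, "M2": 2}])    # -> {"M1": 7, "M2": 4}
print(dominates({"M1": 8, "M2": 3}, target))                # False: still waiting on M2
print(dominates({"M1": 8, "M2": 4}, target))                # True: safe to serve the read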


Evaluation and Takeaways

Experiments on up to eight masters show good scaling on partitioned workloads and performance advantages over both Aurora Multi-Master (shared-storage, optimistic CC) and CockroachDB (shared-nothing, distributed commit).

The paper compares Taurus MM with CockroachDB using TPC-C–like OLTP workloads. CockroachDB follows a shared-nothing design, with each node managing its own storage and coordinating writes through per-key Raft consensus. Since Taurus MM uses four dedicated nodes for its shared storage layer, while CockroachDB combines compute and storage on the same nodes, the authors matched configurations by comparing 2 and 8 Taurus masters with 6- and 12-node CockroachDB clusters, respectively. For CockroachDB, they used its built-in TPC-C–like benchmark; for Taurus MM, the Percona TPC-C variant with zero think/keying time. Results for 1000 and 5000 warehouses show Taurus MM delivering 60% to 320% higher throughput and lower average and 95th-percentile latencies. The authors also report scaling efficiency, showing both systems scaling similarly on smaller datasets (1000 warehouses), but CockroachDB scaling slightly more efficiently on larger datasets with fewer conflicts. They attribute this to CockroachDB’s distributed-commit overhead, which dominates at smaller scales but diminishes once transactions touch only a subset of nodes, whereas Taurus MM maintains consistent performance by avoiding distributed commits altogether.

Taurus MM shows that multi-master can work in the cloud if coordination is carefully scoped. The VS clock is a general and reusable idea, as it provides a middle ground between Lamport and vector clocks. I think VS clocks are useful for other distributed systems that need lightweight causal ordering across different tasks/components.

But is the additional complexity worth it? Few workloads truly demand concurrent writes across primaries, and Amazon Aurora famously abandoned its own multi-master mode. Still, from a systems-design perspective, Taurus MM contributes a nice architectural lesson.

November 09, 2025

Taurus Database: How to be Fast, Available, and Frugal in the Cloud

This SIGMOD’20 paper presents TaurusDB, Huawei's disaggregated MySQL-based cloud database. TaurusDB refines the disaggregated architecture pioneered by Aurora and Socrates, and provides a simpler and cleaner separation of compute and storage. 

In my writeup on Aurora, I discussed how the "log is the database" approach reduces network load, since the compute primary only sends logs and the storage nodes apply them to reconstruct pages. But Aurora did conflate durability and availability somewhat, and used quorum-based replication of six replicas for both logs and pages.

In my review of Socrates, I explained how Socrates (Azure SQL Database Hyperscale) separates durability and availability by splitting the system into four layers: compute, log, page, and storage. Durability (logs) ensures data is not lost after a crash. Availability (pages/storage) ensures data can still be served while some replicas or nodes fail. Socrates stores pages separately from logs to improve performance, but the excessive layering introduces significant architectural overhead.

Taurus takes this further and uses different replication and consistency schemes for logs and pages, exploiting their distinct access patterns. Logs are append-only and used for durability. Log records are independent, so they can be written to any available Log Store nodes. As long as three healthy Log Stores exist, writes can proceed without quorum coordination. Pages, however, depend on previous versions. A Page Store must reconstruct the latest version by applying logs to old pages. To leverage this asymmetry, Taurus uses synchronous, reconfigurable replication for Log Stores to ensure durability, and asynchronous replication for Page Stores to improve scalability, latency, and availability.


But hey, why do we disaggregate in the first place?

Traditional databases were designed for local disks and dedicated servers. In the cloud, this model wastes resources as shown in Figure 1. Each MySQL replica keeps its own full copy of the data, while the underlying virtual disks already store three replicas for reliability. Three database instances mean nine copies in total, and every transactional update is executed three times. This setup is clearly redundant, costly, and inefficient.

Disaggregation fixes this and also brings true elasticity! Compute and storage are separated because they behave differently. Compute is expensive and variable; storage is cheaper and grows slowly. Compute can be stateless and scaled quickly, while storage must remain durable. Separating them allows faster scaling, shared I/O at storage, better resource use, and the capability of scaling compute to zero and restarting quickly when needed.


Architecture overview

Taurus has two physical layers, compute and storage, and four logical components: Log Stores, Page Stores, the Storage Abstraction Layer (SAL), and the database front end. Keeping only two layers minimizes cross-network hops.

The database front end (a modified MySQL) handles queries, transactions, and log generation. The master handles writes; read replicas serve reads.

Log Store stores (well duh!) write-ahead logs as fixed-size, append-only objects called PLogs. These are synchronously replicated across three nodes. Taurus favors reconfiguration-based replication: if one of the three replicas fails or lags, a new PLog is created on healthy Log Stores. Metadata PLogs track which PLogs are active.
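A minimal sketch of reconfiguration-based replication, with invented helper names: a write must be acknowledged by all three replicas of the active PLog; if one is slow or down, the writer seals that PLog, opens a new one on three healthy Log Stores, and records the switch in a metadata PLog.

# Sketch of reconfiguration-based PLog replication (invented names, not Taurus's actual API).
import random

class LogStore:
    def __init__(self, name, up=True):
        self.name, self.up, self.records = name, up, []
    def append(self, plog_id, record):
        if not self.up:
            return False            # failed or lagging replica
        self.records.append((plog_id, record))
        return True

def append_log(record, active_plog, healthy_stores, metadata_plogs):
    """Write synchronously to all three replicas; on failure, seal and reconfigure."""
    acks = [s.append(active_plog["id"], record) for s in active_plog["replicas"]]
    if all(acks):
        return active_plog          # common case: three acks, write is durable

    # A replica failed or lagged: instead of waiting, seal this PLog and open a
    # new one on three healthy Log Stores, recording the switch in a metadata PLog.
    new_plog = {"id": active_plog["id"] + 1, "replicas": random.sample(healthy_stores, 3)}
    metadata_plogs.append({"sealed": active_plog["id"], "active": new_plog["id"]})
    for s in new_plog["replicas"]:
        s.append(new_plog["id"], record)
    return new_plog

stores = [LogStore(f"LS{i}") for i in range(5)]
stores[2].up = False                # one Log Store goes down
metadata = []
plog = append_log({"lsn": 1}, {"id": 0, "replicas": stores[:3]},
                  [s for s in stores if s.up], metadata)
print(plog["id"], metadata)         # reconfigured to PLog 1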

Page Store materializes and manages 10 GB slices of page data. Each page version is identified by an LSN, and the Page Store can reconstruct any version. Pages are written append-only, which is 2–5x faster than random writes and gentler on flash. Each slice maintains a lock-free Log Directory mapping (page, version) to log offset. Consolidation of logs into pages happens in memory. Taurus originally prioritized slices with the longest chain of unapplied logs, but later switched to oldest unapplied write first to prevent metadata buildup. A local buffer pool accelerates log application. For cache eviction, Taurus finds that LFU (least frequently used) performs better than LRU (least recently used), because it keeps hot pages in cache longer, reducing I/O and improving consolidation throughput.
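The per-slice Log Directory and version reconstruction can be sketched like this (a simplification with invented structure): the directory maps (page, version LSN) to a log offset, and reading a page at some LSN means taking a base image and replaying that page's records up to the requested LSN.

# Sketch of a Page Store slice: a log directory plus read-by-version (invented structure).

class Slice:
    def __init__(self):
        self.log = []             # appended log records: {"page", "lsn", "delta"}
        self.directory = {}       # (page, lsn) -> offset into self.log
        self.base_images = {}     # page -> (base_lsn, consolidated image)

    def append(self, record):
        self.directory[(record["page"], record["lsn"])] = len(self.log)
        self.log.append(record)

    def read_page(self, page, as_of_lsn):
        base_lsn, image = self.base_images.get(page, (0, ""))
        # Replay this page's records with base_lsn < lsn <= as_of_lsn, in LSN order.
        versions = sorted(lsn for (p, lsn) in self.directory
                          if p == page and base_lsn < lsn <= as_of_lsn)
        for lsn in versions:
            image += self.log[self.directory[(page, lsn)]]["delta"]
        return image

s = Slice()
s.append({"page": "P1", "lsn": 1, "delta": "A"})
s.append({"page": "P1", "lsn": 2, "delta": "B"})
print(s.read_page("P1", as_of_lsn=2))   # "AB": base image plus replayed deltas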

Storage Abstraction Layer (SAL) hides the storage complexity from MySQL by serving as an intermediary. It coordinates between Log Stores and Page Stores, manages slice placement, and tracks the Cluster Visible LSN, the latest globally consistent point. SAL advances CV-LSN only when the logs are durable in Log Stores and at least one Page Store has acknowledged them. SAL also batches writes per slice to reduce small I/Os.
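The advance rule can be stated compactly; this is my reading of the text, not the actual implementation: CV-LSN is the highest LSN that is both durable in the Log Stores and acknowledged by at least one replica of every slice.

# Sketch of the CV-LSN advance rule (my reading of the text, not the actual implementation).

def advance_cv_lsn(current_cv_lsn, log_durable_lsn, slice_persistent_lsns):
    """CV-LSN = highest LSN durable in Log Stores and acked by >=1 replica of every slice."""
    page_store_floor = min(max(replica_lsns) for replica_lsns in slice_persistent_lsns.values())
    return max(current_cv_lsn, min(log_durable_lsn, page_store_floor))

# Example: logs durable up to 120; slice A has a replica at 118, slice B at 125.
print(advance_cv_lsn(100, 120, {"A": [110, 118], "B": [125, 90]}))   # -> 118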


Write path and replication

Did you notice the lack of LogStore-to-PageStore communication in Figure 2 and Figure 3? The paper does not address this directly, but yes, there is no direct LogStore-to-PageStore communication. The SAL in the master mediates this instead. SAL first writes logs to the Log Stores for durability. Once acknowledged, SAL forwards the same logs to the relevant Page Stores. This ensures that Page Stores only see durable logs and lets SAL track exactly what each replica has received. SAL monitors per-slice persistent LSNs for Page Stores and resends missing logs from the Log Stores if it detects regressions.
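A self-contained sketch of the bookkeeping this implies (invented names, my simplification): SAL remembers the highest LSN it forwarded per slice and the highest LSN each Page Store replica has acknowledged, and resends any missing suffix from the Log Stores.

# Sketch of SAL's per-slice write-path bookkeeping (my simplification, invented names).

class SliceTracker:
    def __init__(self, replicas):
        self.sent_lsn = 0                                # highest LSN forwarded to Page Stores
        self.persistent_lsn = {r: 0 for r in replicas}   # highest LSN acked per replica

    def on_forward(self, last_lsn):
        self.sent_lsn = max(self.sent_lsn, last_lsn)

    def on_ack(self, replica, acked_lsn):
        """Record a replica's persistent LSN; return the range to resend if it lags."""
        self.persistent_lsn[replica] = max(self.persistent_lsn[replica], acked_lsn)
        if self.persistent_lsn[replica] < self.sent_lsn:
            # The replica is missing a suffix; SAL re-reads it from the Log Stores.
            return (self.persistent_lsn[replica] + 1, self.sent_lsn)
        return None

    def log_truncation_point(self):
        """Logs below this LSN are applied everywhere and could be truncated."""
        return min(self.persistent_lsn.values())

tracker = SliceTracker(["replica-1", "replica-2", "replica-3"])
tracker.on_forward(120)
print(tracker.on_ack("replica-1", 120))   # None: fully caught up
print(tracker.on_ack("replica-2", 95))    # (96, 120): resend this range from the Log Stores
print(tracker.log_truncation_point())     # 0: replica-3 has not acked anything yet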

I think this choice adds coupling and complexity. A chain-replication design, where Log Stores streamed logs directly to Page Stores, would simplify the system: SAL wouldn't need to track every Page Store's persistent LSN, and log truncation could be driven by the Log Stores once all replicas confirmed receipt, instead of being tracked by SAL.


Read path

Database front ends read data at page granularity. A dirty page in the buffer pool cannot be evicted until its logs have been written to at least one Page Store replica. This ensures that the latest version is always recoverable.

As mentioned above, SAL maintains the last LSN sent per slice. Reads are routed to the lowest-latency Page Store replica. If one is unavailable or behind, SAL retries with others.

Read replicas don't stream WAL directly from the master. Instead, the master publishes which PLog holds new updates. Replicas fetch logs from the Log Stores, apply them locally, and track their visible LSN. They don't advance past the Page Stores' persisted LSNs, keeping reads consistent. This design keeps replica lag below 20 ms even under high load and prevents the master from becoming a bandwidth bottleneck.
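A short sketch of the replica's visibility rule (my paraphrase of the described behavior): the replica advances its visible LSN to what it has applied locally, but never past the slowest slice's persisted LSN at the Page Stores, so any page it might need to read is already reconstructable there.

# Sketch of a read replica's visible-LSN rule (my paraphrase, not the actual code).

def advance_visible_lsn(applied_lsn, slice_persisted_lsns):
    """Visible LSN = what the replica has applied locally, capped by the slowest
    slice's persisted LSN at the Page Stores, so any read at this LSN is servable."""
    return min(applied_lsn, min(slice_persisted_lsns.values()))

print(advance_visible_lsn(205, {"A": 210, "B": 198}))   # -> 198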


Recovery model

If a Log Store fails temporarily, writes to its PLogs pause. For long failures, the cluster re-replicates its data to healthy nodes.

Page Store recovery is more involved. After short outages, a Page Store gossips with peers to catch up. For longer failures, the system creates a new replica by copying another's data. If recent logs were lost before replication, SAL detects gaps in persistent LSNs and replays the missing records from Log Stores. Gossip runs periodically but can be triggered early when lag is detected.

If the primary fails, SAL ensures all Page Stores have every log record persisted in Log Stores. This is the redo phase (similar to ARIES). Then the database front end performs undo for in-flight transactions.


Nitpicks

I can't refrain from bringing up a couple of issues.

First, RDMA appears in Figure 2 as part of the storage network but then disappears entirely until a brief mention in the final "future work" paragraph.

Second, the evaluation section feels underdeveloped. It lacks the depth expected from a system of this ambition. I skipped detailed discussion of it in my review, as it adds little insight beyond what the protocol discussion already conveys.