An alternative to cursor pagination

Candra Ramsi · 3 days ago

I'm writing this to get an engineering regret off my chest. These are my personal thoughts.

First, some context. I have been part of a multi-year project that splits an existing SaaS product from one AWS region into separate instances in several AWS regions, to improve availability and reliability through better isolation. This has been greatly celebrated internally. The migration's main motivation is consolidation of infrastructure improvements. Part of migrating infrastructure is moving existing data, and for the past 6 months I've primarily been working on moving Elasticsearch data.

Elasticsearch vs SQL database

Elasticsearch differs from an SQL database in several ways. Here are some worth mentioning:

- Elasticsearch can only return up to 10,000 docs per search request (by default)
   - An SQL database can technically SELECT un-LIMIT-ed data FROM a table
- Elasticsearch is a distributed data store and ingests documents in batches
   - This trade-off produces eventually consistent behavior
   - e.g. 3 back-to-back requests to _count?account_id=12345 may output 900, 901, 897
   - This happens because these requests hit different Elasticsearch machines that may or may not have the latest data
- Elasticsearch under CPU or memory pressure may give an inaccurate output instead of just timing out
   - This trade-off produces rare inconsistent output
   - e.g. 3 back-to-back requests to _count?account_id=12345 may output 1,000, 900, 1,000
   - This is suspected to happen because some machines may time out while others don't, producing an inconsistent aggregated output
- Specific to the datastore I'm working on, the data is insert-only, without updates or deletes
   - This will be important later on

Cursor pagination

Because of the 10,000-docs-per-request limit, some form of pagination is required: most of our customers' data consists of more than 10,000 ES documents.

One obvious pagination scheme is cursor pagination:

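```python
cursor = 0

while True:
    # up to 10,000 docs with cursor <= timestamp < now, sorted by timestamp ascending
    docs = fetchSource(cursor, now(), limit=10_000)
    bulkIndexTarget(docs)
    if len(docs) < 10_000:
        break  # a partial page means we have caught up
    # ">=" re-reads docs at the boundary timestamp; harmless if the same _id is reused
    cursor = max(doc["timestamp"] for doc in docs)
```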

There is one big benefit to cursor pagination: everyone can understand it within a few minutes to an hour. I definitely underestimated how much an easy-to-understand algorithm benefits everyone. If ops can understand it, they are better able to tune the machines to the usage pattern. If engineers can understand it, you get buy-in quicker.

However, there are a few downsides of cursor pagination that make it insufficient for this project:

- Cursor pagination may miss copying documents due to the eventually consistent and rarely inconsistent behavior of Elasticsearch (see above)
- Cursor pagination may miss copying documents because our writer micro-service doesn't write documents in timestamp order; a document that lands behind the cursor is never revisited
- Data transfer of 100s of millions of documents may take a few days
   - This means resumability of the cursor pagination is essential during disruptions such as OOMs, deployments, etc.
   - This means the cursor has to be stored durably
   - This means one more point of failure and one more storage to take care of
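A tiny in-memory simulation makes the out-of-order problem concrete. This is a sketch, not our mover: sets of hypothetical `(doc_id, timestamp)` tuples stand in for the source and target indices, `cursor_copy` is a made-up helper, and a page size of 3 stands in for the 10,000-doc limit. A resumed cursor run copies the new document but never goes back for the late one:

```python
def cursor_copy(source, target, cursor, page_size):
    """One cursor-pagination run over in-memory sets; returns the cursor to resume from.

    Assumes fewer than page_size docs share a timestamp (otherwise the cursor stalls).
    """
    while True:
        # next page: docs with timestamp >= cursor, sorted by timestamp ascending
        page = sorted((d for d in source if d[1] >= cursor), key=lambda d: d[1])[:page_size]
        target.update(page)  # target is a set, so re-copying boundary docs is harmless
        if len(page) < page_size:
            # partial page: caught up; resume later from the newest timestamp seen
            return max((ts for _, ts in page), default=cursor)
        cursor = page[-1][1]


source = {(f"doc{ts}", ts) for ts in range(1, 11)}
target = set()
cursor = cursor_copy(source, target, cursor=0, page_size=3)

# The writer service then inserts one late doc "in the past" and one new doc.
source |= {("late", 5), ("new", 12)}
cursor = cursor_copy(source, target, cursor=cursor, page_size=3)

print(sorted(source - target))  # [('late', 5)]
print(cursor)                   # 12
```

The resumed run starts at timestamp 10, so the document stamped 5 is invisible to it no matter how many times it runs.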

Merkle Tree + BFS pagination

What if there were a paginator that could handle out-of-order insertions?

4 months into the project, an idea emerged from reading the Merkle tree Wikipedia page (seeing the illustration on that page helped me grasp the data structure).

The idea is: “If we could quickly check whether a huge range of data is the same in the source and target Elasticsearch, we could quickly find the needle in a haystack through a process of elimination.”

We could start from a time range spanning the earliest document up to now. Let's say the earliest document is from 2010. If the count of ES documents between 2010 and now differs between source and target, we could divide the time range in two and repeat the process on each half until either:

- the counts are equal between source and target, in which case there is no need to copy or check further, or
- the range has reached some manageable chunk size, and we could just copy over that chunk.

Footnotes:

We can use “now” as the upper bound because we don't have documents with timestamps in the future.

We can use the count of documents as a cryptographic hash equivalent because our data is insert-only and the target only ever receives documents copied from the source, so matching counts over a range mean matching contents.

Interestingly, the Merkle tree is a data structure that is used in BitTorrent.

It's not necessary, but it's funny to mention that we don't move our customer data using the BitTorrent protocol.
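Here is a small sketch of why a count can stand in for a hash in this setting. It models documents as hypothetical `(doc_id, timestamp)` tuples and assumes that nothing is updated or deleted and that the target only ever receives copies of source documents, so the target's documents in any range are a subset of the source's. Under that assumption equal counts mean equal contents; once deletes enter the picture, equal counts can hide a difference:

```python
def count(docs, lo, hi):
    """Number of docs with lo <= timestamp < hi (stand-in for a _count query)."""
    return sum(1 for _, ts in docs if lo <= ts < hi)


source = {("a", 1), ("b", 2), ("c", 3)}
target = {("a", 1), ("c", 3)}  # a subset of source, as insert-only copying guarantees
assert target <= source

print(count(source, 0, 4), count(target, 0, 4))    # 3 2  -> something in [0, 4) is missing
print(count(source, 3, 4) == count(target, 3, 4))  # True -> [3, 4) is in sync

# Counter-example if deletes were allowed: "x" was copied, then deleted from source,
# and "b" was inserted; the counts match but the contents do not.
stale_target = {("a", 1), ("x", 2), ("c", 3)}
print(count(source, 0, 4) == count(stale_target, 0, 4), source == stale_target)  # True False
```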

Here is a snippet of the BFS + Merkle-tree-like implementation:

```python
import time


def merkle_copy():
    # BFS queue of half-open time ranges [lo, hi) still to be checked
    q = [(0, now())]
    head = 0

    while head < len(q):
        lo, hi = q[head]
        head += 1

        # checksums (document counts) are equal between source and target
        source_count = countSource(lo, hi)
        if source_count == countTarget(lo, hi):
            continue

        # We've reached a manageable chunk
        if source_count <= 10_000:
            docs = fetchSource(lo, hi, limit=10_000)
            bulkIndexTarget(docs)
            continue

        # There are more than 10,000 docs with the same timestamp in ms
        if hi - lo <= 1:
            raise RuntimeError("panic!!")

        mid = lo + (hi - lo) // 2
        q.append((lo, mid))
        q.append((mid, hi))


def copy():
    while True:
        # redo operation in case there are new documents inserted in source
        merkle_copy()
        time.sleep(10)


# current time in epoch milliseconds (assumed helper; not shown in the original)
def now():
    return int(time.time() * 1000)


# count number of documents in source Elasticsearch with lo <= timestamp < hi
def countSource(lo, hi):
    pass


# count number of documents in target Elasticsearch with lo <= timestamp < hi
def countTarget(lo, hi):
    pass


# fetch documents in source with lo <= timestamp < hi
def fetchSource(lo, hi, limit):
    pass


# insert documents using the bulk API in 1 remote call
def bulkIndexTarget(docs):
    pass
```
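For experimentation, here is a self-contained, in-memory variant of the same BFS loop. It is a sketch under assumptions, not production code: sets of hypothetical `(doc_id, timestamp)` tuples replace the two indices, the inner `count` replaces `countSource`/`countTarget`, a set update replaces the bulk API call, and `limit=4` replaces the 10,000-doc chunk. It uses `collections.deque` instead of a list plus a head index, which behaves the same. The second run shows it picking up a late, out-of-order document:

```python
from collections import deque


def merkle_bfs_copy(source, target, lo, hi, limit=4):
    """Copy every doc in [lo, hi) that the target is missing, checking ranges breadth-first."""

    def count(docs, a, b):
        # stand-in for a _count query over a <= timestamp < b
        return sum(1 for _, ts in docs if a <= ts < b)

    queue = deque([(lo, hi)])
    while queue:
        a, b = queue.popleft()
        n = count(source, a, b)
        if n == count(target, a, b):
            continue  # counts match: this range is already in sync
        if n <= limit:
            # manageable chunk: "bulk index" it (re-copying existing docs is harmless)
            target.update(d for d in source if a <= d[1] < b)
            continue
        if b - a <= 1:
            raise RuntimeError(f"more than {limit} docs share timestamp {a}")
        mid = a + (b - a) // 2
        queue.append((a, mid))
        queue.append((mid, b))


source = {(f"doc{ts}", ts) for ts in range(0, 64, 2)}  # 32 docs, 2 ms apart
target = set()
merkle_bfs_copy(source, target, 0, 64)
print(source == target)  # True

source.add(("late", 7))  # out-of-order insert, far behind any cursor
merkle_bfs_copy(source, target, 0, 64)
print(source == target)  # True
```

On the second run only the ranges containing timestamp 7 have mismatched counts, so only those are split and copied; everything else is skipped after a single pair of counts.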

This algorithm works quite well at finding an out-of-order inserted needle in a haystack. However, BFS works poorly when there is no data in the target Elasticsearch, which is unfortunately a common scenario at the beginning of every customer data migration. Because of the breadth-first nature of the algorithm, the mover has to do a lot of checking before moving any piece of data: an estimated n / 10,000 × 2 _count API calls for n documents.
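To put a number on that estimate, here is a sketch that only counts how many _count calls a BFS over an empty target makes before its first bulk copy. The setup is hypothetical: one million documents, one per millisecond, the 10,000-doc chunk limit, and two _count calls (source and target) per range checked:

```python
from bisect import bisect_left
from collections import deque


def bfs_calls_before_first_copy(timestamps, lo, hi, limit):
    """Number of _count calls BFS makes before its first bulk copy when the target is empty."""
    ts = sorted(timestamps)
    calls = 0
    queue = deque([(lo, hi)])
    while queue:
        a, b = queue.popleft()
        calls += 2  # one _count on the source, one on the (empty) target
        n = bisect_left(ts, b) - bisect_left(ts, a)  # source docs with a <= ts < b
        if n == 0:
            continue  # matches the empty target, nothing to copy
        if n <= limit:
            return calls  # the first bulk copy would happen here
        if b - a <= 1:
            raise RuntimeError(f"more than {limit} docs share timestamp {a}")
        mid = a + (b - a) // 2
        queue.append((a, mid))
        queue.append((mid, b))
    return calls


print(bfs_calls_before_first_copy(range(1_000_000), 0, 1_000_000, limit=10_000))  # 256
```

BFS has to check all 127 ranges on the first 7 levels before it reaches a range small enough to copy, giving 256 calls, the same order of magnitude as 2 × 1,000,000 / 10,000 = 200.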

Due to this limitation, we came up with a two-tier solution. First, use cursor pagination to move the majority of the data. Then, switch to the BFS Merkle paginator to pick up the needles in the haystack. This has proven to work reliably in production.

The regret

The keen-eyed among you may have noticed already: an alternative to BFS is DFS.

```python
def merkle_dfs_copy():
    # pre-allocate 101 elements to allow up to roughly 2¹⁰⁰ milliseconds
    stack = [(0, 0)] * 101
    stack[0] = (0, now())
    top = 0

    while top >= 0:
        lo, hi = stack[top]
        top -= 1

        source_count = countSource(lo, hi)
        if source_count == countTarget(lo, hi):
            continue

        if source_count <= 10_000:
            docs = fetchSource(lo, hi, limit=10_000)
            bulkIndexTarget(docs)
            continue

        if hi - lo <= 1:
            raise RuntimeError("panic!!")

        mid = lo + (hi - lo) // 2
        # push the later half first so the earlier half [lo, mid) is processed next
        top += 1
        stack[top] = (mid, hi)
        top += 1
        stack[top] = (lo, mid)
```
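The same experiment with a depth-first, in-memory variant shows the difference. Again this is a hedged sketch: sorted Python lists of hypothetical timestamps stand in for the indices, `bisect` stands in for _count, slice assignment stands in for the bulk API, and a plain list with `append`/`pop` replaces the pre-allocated stack. With one million documents (one per millisecond) and an empty target, DFS reaches its first copy after 16 _count calls, one pair per level from the root down to a chunk of at most 10,000 docs, and a rerun picks up a late insert:

```python
from bisect import bisect_left, insort


def merkle_dfs_copy(source, target, lo, hi, limit):
    """DFS over [lo, hi) on sorted timestamp lists; copies missing docs into target.

    Returns how many _count calls were made before the first bulk copy (None if none).
    """
    def count(ts, a, b):
        return bisect_left(ts, b) - bisect_left(ts, a)  # docs with a <= ts < b

    calls, first_copy_at = 0, None
    stack = [(lo, hi)]
    while stack:
        a, b = stack.pop()
        calls += 2  # one _count on the source, one on the target
        n = count(source, a, b)
        if n == count(target, a, b):
            continue
        if n <= limit:
            if first_copy_at is None:
                first_copy_at = calls
            # "bulk index" the chunk: make the target's [a, b) slice match the source's
            chunk = source[bisect_left(source, a):bisect_left(source, b)]
            target[bisect_left(target, a):bisect_left(target, b)] = chunk
            continue
        if b - a <= 1:
            raise RuntimeError(f"more than {limit} docs share timestamp {a}")
        mid = a + (b - a) // 2
        stack.append((mid, b))  # pushed first, so the earlier half [a, mid) is popped next
        stack.append((a, mid))
    return first_copy_at


source = list(range(1_000_000))  # one doc per ms, as sorted timestamps
target = []                      # empty target: the slow case for BFS
print(merkle_dfs_copy(source, target, 0, 1_000_000, limit=10_000))  # 16
print(target == source)                                             # True

insort(source, 123_456)  # a late, out-of-order insert
merkle_dfs_copy(source, target, 0, 1_000_000, limit=10_000)
print(target == source)  # True
```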

DFS:

- Could resume a data migration quickly without a durable cursor
   - It needs only about log2(milliseconds between 2010 and now) ≈ 39 levels of splitting before it continues copying the next chunk of data.
- Could start a data migration quickly when there is no data in the target Elasticsearch
   - It needs only about log2(milliseconds between 2010 and now) ≈ 39 levels of splitting before it starts copying the first chunk of data.
- Could also handle out-of-order insertion
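A quick back-of-the-envelope check of that depth, assuming millisecond timestamps (as the panic check in the code implies) and a hypothetical “now” of 2024-01-01 UTC; any date between late 2018 and mid-2027 gives the same answer. Each level halves the range, so the depth is the base-2 logarithm of the span. The same span measured in seconds would give 29 instead:

```python
import math
from datetime import datetime, timezone

start = datetime(2010, 1, 1, tzinfo=timezone.utc)
end = datetime(2024, 1, 1, tzinfo=timezone.utc)  # hypothetical "now"
span_ms = (end - start).total_seconds() * 1000

print(f"{span_ms:.3e} ms")                   # 4.418e+11 ms
print(math.ceil(math.log2(span_ms)))         # 39 levels down to 1 ms ranges
print(math.ceil(math.log2(span_ms / 1000)))  # 29 if the timestamps were in seconds
```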

DFS didn't cross my mind 2 months ago, and I can't implement the DFS version now because the project is coming to an end. It would be grossly irresponsible to rewrite a huge chunk of code when the two-tier solution has proven to work quite well. But it would have been cool to have a one-size-fits-all solution.

Thank you for reading this far!

Let me know if you come up with a creative way to do pagination.