Background

The demos in the powersync-js monorepo provide minimal working examples that illustrate how to use PowerSync with different frameworks. They are not optimized for performance, which leaves room for improvement. This tutorial demonstrates how to improve the Supabase Connector’s performance by implementing two batching strategies that reduce the number of database operations.

Batching Strategies

The two batching strategies that will be implemented are:
  1. Sequential Merge Strategy, and
  2. Pre-sorted Batch Strategy

Sequential Merge Strategy

Overview:
  • Merge adjacent PUT and DELETE operations for the same table
  • Limit the number of operations that are merged into a single API request to Supabase
Shoutout to @christoffer_configura for the original implementation of this optimization.
async uploadData(database: AbstractPowerSyncDatabase): Promise<void> {
    const transaction = await database.getNextCrudTransaction();
    if (!transaction) {
        return;
    }
    /**
     * Maximum number of PUT or DELETE operations that are merged into a single API request to Supabase.
     * Larger numbers can speed up the sync process considerably, but watch out for possible payload size limitations.
     * A value of 1 or below disables merging.
     */
    const MERGE_BATCH_LIMIT = 100;
    let batchedOps: CrudEntry[] = [];

    try {
        console.log(`Processing transaction with ${transaction.crud.length} operations`);

        for (let i = 0; i < transaction.crud.length; i++) {
            const cruds = transaction.crud;
            const op = cruds[i];
            const table = this.client.from(op.table);
            batchedOps.push(op);

            let result: any;
            let batched = 1;

            switch (op.op) {
                case UpdateType.PUT:
                    const records = [{ ...cruds[i].opData, id: cruds[i].id }];
                    while (
                        i + 1 < cruds.length &&
                        cruds[i + 1].op === op.op &&
                        cruds[i + 1].table === op.table &&
                        batched < MERGE_BATCH_LIMIT
                    ) {
                        i++;
                        records.push({ ...cruds[i].opData, id: cruds[i].id });
                        batchedOps.push(cruds[i]);
                        batched++;
                    }
                    result = await table.upsert(records);
                    break;
                case UpdateType.PATCH:
                    batchedOps = [op];
                    result = await table.update(op.opData).eq('id', op.id);
                    break;
                case UpdateType.DELETE:
                    batchedOps = [op];
                    const ids = [op.id];
                    while (
                        i + 1 < cruds.length &&
                        cruds[i + 1].op === op.op &&
                        cruds[i + 1].table === op.table &&
                        batched < MERGE_BATCH_LIMIT
                    ) {
                        i++;
                        ids.push(cruds[i].id);
                        batchedOps.push(cruds[i]);
                        batched++;
                    }
                    result = await table.delete().in('id', ids);
                    break;
            }

            // Supabase returns request errors in `result.error` rather than throwing,
            // so surface them explicitly; otherwise failed writes would be silently dropped.
            if (result?.error) {
                console.error(result.error);
                throw new Error(`Could not ${op.op} data to Supabase table ${op.table}: ${JSON.stringify(result)}`);
            }
            if (batched > 1) {
                console.log(`Merged ${batched} ${op.op} operations for table ${op.table}`);
            }
        }
        await transaction.complete();
    } catch (ex: any) {
        console.debug(ex);
        if (typeof ex.code == 'string' && FATAL_RESPONSE_CODES.some((regex) => regex.test(ex.code))) {
            /**
             * Instead of blocking the queue with these errors,
             * discard the (rest of the) transaction.
             *
             * Note that these errors typically indicate a bug in the application.
             * If protecting against data loss is important, save the failing records
             * elsewhere instead of discarding, and/or notify the user.
             */
            console.error('Data upload error - discarding:', ex);
            await transaction.complete();
        } else {
            // Error may be retryable - e.g. network error or temporary server error.
            // Throwing an error here causes this call to be retried after a delay.
            throw ex;
        }
    }
}
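
To make the merging behaviour concrete, here is a small standalone sketch (not part of the connector) that models the same rule: consecutive operations with the same op type and table are coalesced up to the batch limit, while PATCH operations are always sent individually. The SimpleOp type, the mergeConsecutive helper, and the table names are illustrative only.

type SimpleOp = { op: 'PUT' | 'PATCH' | 'DELETE'; table: string; id: string };

/**
 * Groups consecutive entries that share the same table and op type,
 * capped at `limit` entries per batch. PATCH entries always form a batch of one.
 */
function mergeConsecutive(ops: SimpleOp[], limit: number): SimpleOp[][] {
    const batches: SimpleOp[][] = [];
    for (const op of ops) {
        const current = batches[batches.length - 1];
        const canMerge =
            current !== undefined &&
            op.op !== 'PATCH' &&
            current[0].op === op.op &&
            current[0].table === op.table &&
            current.length < limit;
        if (canMerge) {
            current.push(op);
        } else {
            batches.push([op]);
        }
    }
    return batches;
}

// Example: two PUTs on 'lists', one PATCH, then two DELETEs on 'todos'
// collapse into three batches, i.e. three Supabase calls instead of five.
const batches = mergeConsecutive(
    [
        { op: 'PUT', table: 'lists', id: '1' },
        { op: 'PUT', table: 'lists', id: '2' },
        { op: 'PATCH', table: 'lists', id: '1' },
        { op: 'DELETE', table: 'todos', id: '3' },
        { op: 'DELETE', table: 'todos', id: '4' }
    ],
    100
);
console.log(batches.map((b) => `${b[0].op} ${b[0].table} x${b.length}`));
// => [ 'PUT lists x2', 'PATCH lists x1', 'DELETE todos x2' ]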

Pre-sorted Batch Strategy

Overview:
  • Create three collections to group operations by type:
    • putOps: For PUT operations, organized by table name
    • deleteOps: For DELETE operations, organized by table name
    • patchOps: For PATCH operations (partial updates)
  • Loop through all operations, sort them into the three collections, and then process all operations in batches.
async uploadData(database: AbstractPowerSyncDatabase): Promise<void> {
    const transaction = await database.getNextCrudTransaction();
    if (!transaction) {
        return;
    }

    try {
        // Group operations by type and table
        const putOps: { [table: string]: any[] } = {};
        const deleteOps: { [table: string]: string[] } = {};
        let patchOps: CrudEntry[] = [];

        // Organize operations
        for (const op of transaction.crud) {
            switch (op.op) {
                case UpdateType.PUT:
                    if (!putOps[op.table]) {
                        putOps[op.table] = [];
                    }
                    putOps[op.table].push({ ...op.opData, id: op.id });
                    break;
                case UpdateType.PATCH:
                    patchOps.push(op);
                    break;
                case UpdateType.DELETE:
                    if (!deleteOps[op.table]) {
                        deleteOps[op.table] = [];
                    }
                    deleteOps[op.table].push(op.id);
                    break;
            }
        }

        // Execute bulk operations
        for (const table of Object.keys(putOps)) {
            const result = await this.client.from(table).upsert(putOps[table]);
            if (result.error) {
                console.error(result.error);
                throw new Error(`Could not bulk PUT data to Supabase table ${table}: ${JSON.stringify(result)}`);
            }
        }

        for (const table of Object.keys(deleteOps)) {
            const result = await this.client.from(table).delete().in('id', deleteOps[table]);
            if (result.error) {
                console.error(result.error);
                throw new Error(`Could not bulk DELETE data from Supabase table ${table}: ${JSON.stringify(result)}`);
            }
        }

        // Execute PATCH operations individually since they can't be easily batched
        for (const op of patchOps) {
            const result = await this.client.from(op.table).update(op.opData).eq('id', op.id);
            if (result.error) {
                console.error(result.error);
                throw new Error(`Could not PATCH data in Supabase: ${JSON.stringify(result)}`);
            }
        }

        await transaction.complete();
    } catch (ex: any) {
        console.debug(ex);
        if (typeof ex.code == 'string' && FATAL_RESPONSE_CODES.some((regex) => regex.test(ex.code))) {
            /**
             * Instead of blocking the queue with these errors,
             * discard the (rest of the) transaction.
             *
             * Note that these errors typically indicate a bug in the application.
             * If protecting against data loss is important, save the failing records
             * elsewhere instead of discarding, and/or notify the user.
             */
            console.error('Data upload error - discarding transaction:', ex);
            await transaction.complete();
        } else {
            // Error may be retryable - e.g. network error or temporary server error.
            // Throwing an error here causes this call to be retried after a delay.
            throw ex;
        }
    }
}
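
For comparison, the sketch below shows (with made-up table names, ids, and record data) what the three collections would contain for the same five operations used in the earlier example. Because the operations are grouped up front, the connector issues one upsert per table with PUTs, one bulk delete per table with DELETEs, and one update per PATCH, regardless of how the operations were interleaved in the original transaction.

// Hypothetical grouping result for:
//   PUT lists '1', PUT lists '2', PATCH lists '1', DELETE todos '3', DELETE todos '4'
const putOps = {
    lists: [
        { id: '1', name: 'Groceries' },
        { id: '2', name: 'Chores' }
    ]
};
const deleteOps = {
    todos: ['3', '4']
};
const patchOps = [{ table: 'lists', id: '1', opData: { name: 'Weekly groceries' } }];
// => one upsert on 'lists', one bulk delete on 'todos', and one update for the PATCH:
//    three Supabase calls in total, even if the operations were interleaved.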

Differences

Sequential merge strategy

const MERGE_BATCH_LIMIT = 100;
let batchedOps: CrudEntry[] = [];
  • Processes operations sequentially
  • Merges consecutive operations of the same type up to a batch limit
  • More dynamic/streaming approach

Pre-sorted batch strategy

const putOps: { [table: string]: any[] } = {};
const deleteOps: { [table: string]: string[] } = {};
let patchOps: CrudEntry[] = [];
  • Pre-sorts all operations by type and table
  • Processes each type in bulk after grouping

Sequential merge strategy

  • Uses a sliding window approach with MERGE_BATCH_LIMIT
  • Merges consecutive operations up to the limit
  • More granular control over batch sizes
  • Better for mixed operation types

Pre-sorted batch strategy

  • Groups ALL operations of the same type together
  • Executes one bulk operation per type per table
  • Better for large numbers of similar operations

Key similarities and differences

Key Similarities

  • Handling of CRUD operations (PUT, PATCH, DELETE) to sync local changes to Supabase
  • Transaction management with getNextCrudTransaction()
  • Similar error handling for fatal and retryable errors
  • Completing the transaction after successful processing

Key Differences

  • Operation grouping strategy
  • Batching methodology

Use cases

Sequential Merge Strategy

Choose this strategy when:
  • You need more granular control over batch sizes
  • You want more detailed operation logging
  • You need to handle mixed operation types more efficiently
Best for: Mixed operation types
Optimizes for: Memory efficiency
Trade-off: Potentially more network requests

Pre-sorted Batch Strategy

Choose this strategy when:
  • You have a large number of similar operations
  • You want to minimize the number of network requests

Best for: Large volumes of similar operations
Optimizes for: Minimal network requests
Trade-off: Higher memory usage