Batch Handling & Data Mappers
Description
Section titled “Description”This example demonstrates mapper transforms with onBatch and on handler patterns.
- Define a mapper to transform SubscribedTransaction[] to custom types
- Compare onBatch (fires once per poll) vs on (fires per transaction)
- Verify type safety with mapped data
Prerequisites
Section titled “Prerequisites”- LocalNet running (via
algokit localnet start)
Run This Example
Section titled “Run This Example”From the repository’s examples/subscriber directory:
cd examples/subscribernpx tsx 10-batch-and-mappers.ts/** * Example: Batch Handling & Data Mappers * * This example demonstrates mapper transforms with onBatch and on handler patterns. * - Define a mapper to transform SubscribedTransaction[] to custom types * - Compare onBatch (fires once per poll) vs on (fires per transaction) * - Verify type safety with mapped data * * Prerequisites: * - LocalNet running (via `algokit localnet start`) */import { algo, AlgorandClient } from '@algorandfoundation/algokit-utils'import { AlgorandSubscriber } from '@algorandfoundation/algokit-subscriber'import type { SubscribedTransaction } from '@algorandfoundation/algokit-subscriber/types/subscription'import { printHeader, printStep, printInfo, printSuccess, printError, shortenAddress, formatAlgo } from './shared/utils.js'
/** Custom mapped type for payment summary */interface PaymentSummary { id: string sender: string receiver: string amountInAlgos: number note: string}
async function main() { printHeader('10 — Batch Handling & Data Mappers')
// Step 1: Connect to LocalNet printStep(1, 'Connect to LocalNet') const algorand = AlgorandClient.defaultLocalNet() const status = await algorand.client.algod.status() printInfo(`Current round: ${status.lastRound.toString()}`) printSuccess('Connected to LocalNet')
// Step 2: Create and fund accounts printStep(2, 'Create and fund accounts') const sender = await algorand.account.fromEnvironment('BATCH_SENDER', algo(100)) const receiver = await algorand.account.fromEnvironment('BATCH_RECEIVER', algo(10)) const senderAddr = sender.addr.toString() const receiverAddr = receiver.addr.toString() printInfo(`Sender: ${shortenAddress(senderAddr)}`) printInfo(`Receiver: ${shortenAddress(receiverAddr)}`) printSuccess('Accounts created and funded')
// Step 3: Send 5 payment transactions with varying amounts and notes printStep(3, 'Send 5 payment transactions') const payments = [ { amount: algo(1), note: 'payment-1' }, { amount: algo(2), note: 'payment-2' }, { amount: algo(3), note: 'payment-3' }, { amount: algo(5), note: 'payment-4' }, { amount: algo(8), note: 'payment-5' }, ]
let firstRound: bigint | undefined for (const p of payments) { const result = await algorand.send.payment({ sender: sender.addr, receiver: receiver.addr, amount: p.amount, note: p.note, }) const round = result.confirmation.confirmedRound! if (!firstRound) firstRound = round printInfo(`Sent ${p.note}: ${formatAlgo(p.amount.microAlgo)} in round ${round}`) } printSuccess(`Sent ${payments.length} payments`)
// Step 4: Set up subscriber with a mapper that transforms SubscribedTransaction[] -> PaymentSummary[] printStep(4, 'Configure subscriber with mapper') const watermarkBefore = firstRound! - 1n let watermark = watermarkBefore
const mapper = async (txns: SubscribedTransaction[]): Promise<PaymentSummary[]> => { return txns.map((txn) => ({ id: txn.id, sender: txn.paymentTransaction?.receiver ? (txn.sender ?? senderAddr) : senderAddr, receiver: txn.paymentTransaction?.receiver ?? receiverAddr, amountInAlgos: Number(txn.paymentTransaction?.amount ?? 0n) / 1_000_000, note: txn.note ? Buffer.from(txn.note).toString('utf-8') : '', })) }
printInfo(`Mapper: SubscribedTransaction[] -> PaymentSummary[] { id, sender, receiver, amountInAlgos, note }`) printSuccess('Mapper defined')
// Step 5: Create subscriber with mapper on the filter printStep(5, 'Create subscriber with onBatch and on handlers')
const batchResults: PaymentSummary[][] = [] const individualResults: PaymentSummary[] = []
const subscriber = new AlgorandSubscriber( { filters: [ { name: 'payments', filter: { sender: senderAddr, receiver: receiverAddr, }, mapper, }, ], syncBehaviour: 'sync-oldest', maxRoundsToSync: 100, watermarkPersistence: { get: async () => watermark, set: async (w: bigint) => { watermark = w }, }, }, algorand.client.algod, )
// Register onBatch handler — receives the full array of mapped items per poll subscriber.onBatch<PaymentSummary>('payments', (batch) => { batchResults.push(batch) console.log(`\n [onBatch] Received batch of ${batch.length} items`) })
// Register on handler — receives individual mapped items one at a time subscriber.on<PaymentSummary>('payments', (item) => { individualResults.push(item) console.log(` [on] Received item: ${item.note} — ${item.amountInAlgos} ALGO`) })
printInfo(`onBatch<PaymentSummary>: registered — fires once per poll with full array`) printInfo(`on<PaymentSummary>: registered — fires once per transaction with individual item`)
// Step 6: Poll once to trigger handlers printStep(6, 'Poll once — observe onBatch vs on firing') const result = await subscriber.pollOnce() printInfo(`Raw matched count: ${result.subscribedTransactions.length.toString()}`)
// Step 7: Verify onBatch behavior printStep(7, 'Verify onBatch behavior') printInfo(`onBatch fired: ${batchResults.length} time(s)`) printInfo(`Batch size: ${batchResults[0]?.length.toString() ?? '0'}`)
if (batchResults.length !== 1) { throw new Error(`Expected onBatch to fire exactly 1 time, got ${batchResults.length}`) } if (batchResults[0].length !== 5) { throw new Error(`Expected batch size of 5, got ${batchResults[0].length}`) } printSuccess('onBatch fired once with all 5 items')
// Step 8: Verify on behavior printStep(8, 'Verify on behavior') printInfo(`on fired: ${individualResults.length} time(s)`)
if (individualResults.length !== 5) { throw new Error(`Expected on to fire 5 times, got ${individualResults.length}`) } printSuccess('on fired once per transaction (5 times)')
// Step 9: Show type safety — mapped items are PaymentSummary, not SubscribedTransaction printStep(9, 'Demonstrate type safety and mapped data') console.log() console.log(' Batch items (from onBatch):') for (const item of batchResults[0]) { printInfo(` ${item.note}: ${item.amountInAlgos} ALGO | ${shortenAddress(item.sender)} -> ${shortenAddress(item.receiver)}`) }
console.log() console.log(' Individual items (from on):') for (const item of individualResults) { printInfo(` ${item.note}: ${item.amountInAlgos} ALGO | id: ${item.id.slice(0, 12)}...`) }
// Step 10: Show the difference between onBatch and on printStep(10, 'Summary: onBatch vs on') console.log() console.log(' ┌─────────────────────────────────────────────────────────┐') console.log(' │ onBatch<T>(filterName, listener) │') console.log(' │ - Fires: once per poll │') console.log(` │ - Receives: T[] (array of ${batchResults[0].length} PaymentSummary items) │`) console.log(' │ - Use for: bulk inserts, batch processing │') console.log(' ├─────────────────────────────────────────────────────────┤') console.log(' │ on<T>(filterName, listener) │') console.log(` │ - Fires: once per transaction (${individualResults.length} times) │`) console.log(' │ - Receives: T (single PaymentSummary item) │') console.log(' │ - Use for: per-item processing, logging │') console.log(' ├─────────────────────────────────────────────────────────┤') console.log(' │ mapper on filter config │') console.log(' │ - Transforms: SubscribedTransaction[] -> T[] │') console.log(' │ - Applied BEFORE both on and onBatch handlers │') console.log(' │ - Type parameter <T> ensures type safety │') console.log(' └─────────────────────────────────────────────────────────┘') console.log()
printHeader('Example complete')}
main().catch((err) => { printError(err.message) process.exit(1)})Other examples
Section titled “Other examples”- Basic Poll Once
- Continuous Subscriber
- Payment Filters
- Asset Transfer Subscription
- App Call Subscription
- Multiple Named Filters
- Balance Change Tracking
- ARC-28 Event Subscription
- Inner Transaction Subscription
- Batch Handling & Data Mappers
- Watermark Persistence
- Sync Behaviours
- Custom Filters
- Stateless Subscriptions
- Lifecycle Hooks & Error Handling