Skip to content

Batch Handling & Data Mappers

← Back to Examples

This example demonstrates mapper transforms with onBatch and on handler patterns.

  • Define a mapper to transform SubscribedTransaction[] to custom types
  • Compare onBatch (fires once per poll) vs on (fires per transaction)
  • Verify type safety with mapped data
  • LocalNet running (via algokit localnet start)

From the repository’s examples/subscriber directory:

Terminal window
cd examples/subscriber
npx tsx 10-batch-and-mappers.ts

View source on GitHub

10-batch-and-mappers.ts
/**
* Example: Batch Handling & Data Mappers
*
* This example demonstrates mapper transforms with onBatch and on handler patterns.
* - Define a mapper to transform SubscribedTransaction[] to custom types
* - Compare onBatch (fires once per poll) vs on (fires per transaction)
* - Verify type safety with mapped data
*
* Prerequisites:
* - LocalNet running (via `algokit localnet start`)
*/
import { algo, AlgorandClient } from '@algorandfoundation/algokit-utils'
import { AlgorandSubscriber } from '@algorandfoundation/algokit-subscriber'
import type { SubscribedTransaction } from '@algorandfoundation/algokit-subscriber/types/subscription'
import { printHeader, printStep, printInfo, printSuccess, printError, shortenAddress, formatAlgo } from './shared/utils.js'
/** Custom mapped type for payment summary */
interface PaymentSummary {
id: string
sender: string
receiver: string
amountInAlgos: number
note: string
}
async function main() {
printHeader('10 — Batch Handling & Data Mappers')
// Step 1: Connect to LocalNet
printStep(1, 'Connect to LocalNet')
const algorand = AlgorandClient.defaultLocalNet()
const status = await algorand.client.algod.status()
printInfo(`Current round: ${status.lastRound.toString()}`)
printSuccess('Connected to LocalNet')
// Step 2: Create and fund accounts
printStep(2, 'Create and fund accounts')
const sender = await algorand.account.fromEnvironment('BATCH_SENDER', algo(100))
const receiver = await algorand.account.fromEnvironment('BATCH_RECEIVER', algo(10))
const senderAddr = sender.addr.toString()
const receiverAddr = receiver.addr.toString()
printInfo(`Sender: ${shortenAddress(senderAddr)}`)
printInfo(`Receiver: ${shortenAddress(receiverAddr)}`)
printSuccess('Accounts created and funded')
// Step 3: Send 5 payment transactions with varying amounts and notes
printStep(3, 'Send 5 payment transactions')
const payments = [
{ amount: algo(1), note: 'payment-1' },
{ amount: algo(2), note: 'payment-2' },
{ amount: algo(3), note: 'payment-3' },
{ amount: algo(5), note: 'payment-4' },
{ amount: algo(8), note: 'payment-5' },
]
let firstRound: bigint | undefined
for (const p of payments) {
const result = await algorand.send.payment({
sender: sender.addr,
receiver: receiver.addr,
amount: p.amount,
note: p.note,
})
const round = result.confirmation.confirmedRound!
if (!firstRound) firstRound = round
printInfo(`Sent ${p.note}: ${formatAlgo(p.amount.microAlgo)} in round ${round}`)
}
printSuccess(`Sent ${payments.length} payments`)
// Step 4: Set up subscriber with a mapper that transforms SubscribedTransaction[] -> PaymentSummary[]
printStep(4, 'Configure subscriber with mapper')
const watermarkBefore = firstRound! - 1n
let watermark = watermarkBefore
const mapper = async (txns: SubscribedTransaction[]): Promise<PaymentSummary[]> => {
return txns.map((txn) => ({
id: txn.id,
sender: txn.paymentTransaction?.receiver ? (txn.sender ?? senderAddr) : senderAddr,
receiver: txn.paymentTransaction?.receiver ?? receiverAddr,
amountInAlgos: Number(txn.paymentTransaction?.amount ?? 0n) / 1_000_000,
note: txn.note ? Buffer.from(txn.note).toString('utf-8') : '',
}))
}
printInfo(`Mapper: SubscribedTransaction[] -> PaymentSummary[] { id, sender, receiver, amountInAlgos, note }`)
printSuccess('Mapper defined')
// Step 5: Create subscriber with mapper on the filter
printStep(5, 'Create subscriber with onBatch and on handlers')
const batchResults: PaymentSummary[][] = []
const individualResults: PaymentSummary[] = []
const subscriber = new AlgorandSubscriber(
{
filters: [
{
name: 'payments',
filter: {
sender: senderAddr,
receiver: receiverAddr,
},
mapper,
},
],
syncBehaviour: 'sync-oldest',
maxRoundsToSync: 100,
watermarkPersistence: {
get: async () => watermark,
set: async (w: bigint) => {
watermark = w
},
},
},
algorand.client.algod,
)
// Register onBatch handler — receives the full array of mapped items per poll
subscriber.onBatch<PaymentSummary>('payments', (batch) => {
batchResults.push(batch)
console.log(`\n [onBatch] Received batch of ${batch.length} items`)
})
// Register on handler — receives individual mapped items one at a time
subscriber.on<PaymentSummary>('payments', (item) => {
individualResults.push(item)
console.log(` [on] Received item: ${item.note}${item.amountInAlgos} ALGO`)
})
printInfo(`onBatch<PaymentSummary>: registered — fires once per poll with full array`)
printInfo(`on<PaymentSummary>: registered — fires once per transaction with individual item`)
// Step 6: Poll once to trigger handlers
printStep(6, 'Poll once — observe onBatch vs on firing')
const result = await subscriber.pollOnce()
printInfo(`Raw matched count: ${result.subscribedTransactions.length.toString()}`)
// Step 7: Verify onBatch behavior
printStep(7, 'Verify onBatch behavior')
printInfo(`onBatch fired: ${batchResults.length} time(s)`)
printInfo(`Batch size: ${batchResults[0]?.length.toString() ?? '0'}`)
if (batchResults.length !== 1) {
throw new Error(`Expected onBatch to fire exactly 1 time, got ${batchResults.length}`)
}
if (batchResults[0].length !== 5) {
throw new Error(`Expected batch size of 5, got ${batchResults[0].length}`)
}
printSuccess('onBatch fired once with all 5 items')
// Step 8: Verify on behavior
printStep(8, 'Verify on behavior')
printInfo(`on fired: ${individualResults.length} time(s)`)
if (individualResults.length !== 5) {
throw new Error(`Expected on to fire 5 times, got ${individualResults.length}`)
}
printSuccess('on fired once per transaction (5 times)')
// Step 9: Show type safety — mapped items are PaymentSummary, not SubscribedTransaction
printStep(9, 'Demonstrate type safety and mapped data')
console.log()
console.log(' Batch items (from onBatch):')
for (const item of batchResults[0]) {
printInfo(` ${item.note}: ${item.amountInAlgos} ALGO | ${shortenAddress(item.sender)} -> ${shortenAddress(item.receiver)}`)
}
console.log()
console.log(' Individual items (from on):')
for (const item of individualResults) {
printInfo(` ${item.note}: ${item.amountInAlgos} ALGO | id: ${item.id.slice(0, 12)}...`)
}
// Step 10: Show the difference between onBatch and on
printStep(10, 'Summary: onBatch vs on')
console.log()
console.log(' ┌─────────────────────────────────────────────────────────┐')
console.log(' │ onBatch<T>(filterName, listener) │')
console.log(' │ - Fires: once per poll │')
console.log(` │ - Receives: T[] (array of ${batchResults[0].length} PaymentSummary items) │`)
console.log(' │ - Use for: bulk inserts, batch processing │')
console.log(' ├─────────────────────────────────────────────────────────┤')
console.log(' │ on<T>(filterName, listener) │')
console.log(` │ - Fires: once per transaction (${individualResults.length} times) │`)
console.log(' │ - Receives: T (single PaymentSummary item) │')
console.log(' │ - Use for: per-item processing, logging │')
console.log(' ├─────────────────────────────────────────────────────────┤')
console.log(' │ mapper on filter config │')
console.log(' │ - Transforms: SubscribedTransaction[] -> T[] │')
console.log(' │ - Applied BEFORE both on and onBatch handlers │')
console.log(' │ - Type parameter <T> ensures type safety │')
console.log(' └─────────────────────────────────────────────────────────┘')
console.log()
printHeader('Example complete')
}
main().catch((err) => {
printError(err.message)
process.exit(1)
})