Lifecycle Hooks & Error Handling
Description
Section titled “Description”This example demonstrates lifecycle hooks and retry patterns.
- Hook execution order: onBeforePoll -> processing -> onPoll -> inspect
- start(inspect) callback in continuous polling
- Error recovery with onError and retry logic
Prerequisites
Section titled “Prerequisites”- LocalNet running (via
algokit localnet start)
Run This Example
Section titled “Run This Example”From the repository’s examples/subscriber directory:
cd examples/subscribernpx tsx 15-lifecycle-hooks.ts/** * Example: Lifecycle Hooks & Error Handling * * This example demonstrates lifecycle hooks and retry patterns. * - Hook execution order: onBeforePoll -> processing -> onPoll -> inspect * - start(inspect) callback in continuous polling * - Error recovery with onError and retry logic * * Prerequisites: * - LocalNet running (via `algokit localnet start`) */import { algo, AlgorandClient } from '@algorandfoundation/algokit-utils'import { AlgorandSubscriber } from '@algorandfoundation/algokit-subscriber'import { printHeader, printStep, printInfo, printSuccess, printError, shortenAddress } from './shared/utils.js'
async function main() { printHeader('15 — Lifecycle Hooks & Error Handling')
// Step 1: Connect to LocalNet printStep(1, 'Connect to LocalNet') const algorand = AlgorandClient.defaultLocalNet() const status = await algorand.client.algod.status() printInfo(`Current round: ${status.lastRound.toString()}`) printSuccess('Connected to LocalNet')
// Step 2: Create and fund sender account printStep(2, 'Create and fund sender account') const sender = await algorand.account.fromEnvironment('LIFECYCLE_SENDER', algo(10)) const senderAddr = sender.addr.toString() printInfo(`Sender: ${shortenAddress(senderAddr)}`)
// ───────────────────────────────────────────────────────────── // Part A: Demonstrate hook execution order with pollOnce() // ───────────────────────────────────────────────────────────── printStep(3, 'Part A — Hook execution order (pollOnce)')
// Send a transaction so we have something to match const txn1 = await algorand.send.payment({ sender: sender.addr, receiver: sender.addr, amount: algo(1), note: 'lifecycle txn 1', }) const firstRound = txn1.confirmation!.confirmedRound! printInfo(`Sent txn: ${txn1.txIds.at(-1)}`)
let watermarkA = firstRound - 1n const timeline: string[] = []
const subscriberA = new AlgorandSubscriber( { filters: [ { name: 'payments', filter: { sender: senderAddr }, }, ], frequencyInSeconds: 1, waitForBlockWhenAtTip: false, syncBehaviour: 'sync-oldest', watermarkPersistence: { get: async () => watermarkA, set: async (w: bigint) => { watermarkA = w }, }, }, algorand.client.algod, )
// Register lifecycle hooks subscriberA.onBeforePoll((metadata) => { timeline.push(`onBeforePoll(watermark=${metadata.watermark}, currentRound=${metadata.currentRound})`) })
subscriberA.on('payments', (txn) => { timeline.push(`on("payments") — txn ${txn.id}`) })
subscriberA.onPoll((result) => { timeline.push( `onPoll(txns=${result.subscribedTransactions.length}, rounds=[${result.syncedRoundRange[0]}, ${result.syncedRoundRange[1]}])`, ) })
printInfo(`Hooks registered: onBeforePoll, on("payments"), onPoll`)
// Execute a single poll const pollResult = await subscriberA.pollOnce() printInfo(`Poll matched: ${pollResult.subscribedTransactions.length} transaction(s)`)
// Print the timeline printSuccess('Hook execution order:') for (let i = 0; i < timeline.length; i++) { printInfo(` ${i + 1}: ${timeline[i]}`) }
// Verify order: onBeforePoll -> on("payments") -> onPoll if (timeline.length < 3) { throw new Error(`Expected at least 3 timeline entries, got ${timeline.length}`) } if (!timeline[0].startsWith('onBeforePoll')) { throw new Error(`Expected first hook to be onBeforePoll, got: ${timeline[0]}`) } if (!timeline[1].startsWith('on("payments")')) { throw new Error(`Expected second hook to be on("payments"), got: ${timeline[1]}`) } if (!timeline[timeline.length - 1].startsWith('onPoll')) { throw new Error(`Expected last hook to be onPoll, got: ${timeline[timeline.length - 1]}`) } printSuccess('Order verified: onBeforePoll -> [transaction processing] -> onPoll')
// ───────────────────────────────────────────────────────────── // Part B: start(inspect) callback in the continuous loop // ───────────────────────────────────────────────────────────── printStep(4, 'Part B — start(inspect) callback')
// Send 2 more transactions for (let i = 2; i <= 3; i++) { await algorand.send.payment({ sender: sender.addr, receiver: sender.addr, amount: algo(1), note: `lifecycle txn ${i}`, }) } printInfo(`Sent: 2 more transactions`)
let watermarkB = watermarkA const timelineB: string[] = []
const subscriberB = new AlgorandSubscriber( { filters: [ { name: 'payments', filter: { sender: senderAddr }, }, ], frequencyInSeconds: 1, waitForBlockWhenAtTip: true, syncBehaviour: 'sync-oldest', watermarkPersistence: { get: async () => watermarkB, set: async (w: bigint) => { watermarkB = w }, }, }, algorand.client.algod, )
subscriberB.onBeforePoll((metadata) => { timelineB.push(`onBeforePoll(watermark=${metadata.watermark}, currentRound=${metadata.currentRound})`) })
subscriberB.on('payments', (txn) => { timelineB.push(`on("payments") — txn ${txn.id}`) })
subscriberB.onPoll((result) => { timelineB.push( `onPoll(txns=${result.subscribedTransactions.length}, rounds=[${result.syncedRoundRange[0]}, ${result.syncedRoundRange[1]}])`, ) })
// start() with inspect callback — fires AFTER onPoll, at the end of each loop iteration subscriberB.start((result) => { timelineB.push( `inspect(txns=${result.subscribedTransactions.length}, newWatermark=${result.newWatermark})`, ) }, true) // suppressLog = true to reduce noise
// Wait for the subscriber to catch up await new Promise<void>((resolve) => { setTimeout(async () => { await subscriberB.stop('part-b-done') resolve() }, 3000) })
printSuccess('Timeline with start(inspect):') for (let i = 0; i < timelineB.length; i++) { printInfo(` ${i + 1}: ${timelineB[i]}`) }
// Verify inspect appears after onPoll const pollIdx = timelineB.findIndex((e) => e.startsWith('onPoll')) const inspectIdx = timelineB.findIndex((e) => e.startsWith('inspect')) if (pollIdx === -1 || inspectIdx === -1) { throw new Error('Expected both onPoll and inspect entries in timeline') } if (inspectIdx <= pollIdx) { throw new Error(`Expected inspect (idx=${inspectIdx}) to come after onPoll (idx=${pollIdx})`) } printSuccess('Order verified: onBeforePoll -> [transaction processing] -> onPoll -> inspect')
// ───────────────────────────────────────────────────────────── // Part C: Error recovery with onError + retry logic // ───────────────────────────────────────────────────────────── printStep(5, 'Part C — Error recovery with onError')
const MAX_RETRIES = 3 let retryCount = 0 let errorsCaught = 0 let successfulPolls = 0 const errorTimeline: string[] = []
// We'll simulate errors by throwing in the on() handler for the first 2 polls let pollNumber = 0
let watermarkC = watermarkB const subscriberC = new AlgorandSubscriber( { filters: [ { name: 'payments', filter: { sender: senderAddr }, }, ], frequencyInSeconds: 1, waitForBlockWhenAtTip: true, syncBehaviour: 'sync-oldest-start-now', watermarkPersistence: { get: async () => watermarkC, set: async (w: bigint) => { watermarkC = w }, }, }, algorand.client.algod, )
subscriberC.onBeforePoll(() => { pollNumber++ errorTimeline.push(`onBeforePoll (poll #${pollNumber})`) })
subscriberC.onPoll(() => { // Simulate an error on the first poll if (pollNumber === 1) { errorTimeline.push(`onPoll — throwing simulated error!`) throw new Error('Simulated processing error') } successfulPolls++ errorTimeline.push(`onPoll — success (poll #${pollNumber})`) })
subscriberC.onError(async (error) => { errorsCaught++ retryCount++ const message = error instanceof Error ? error.message : String(error) errorTimeline.push(`onError — caught: "${message}" (retry ${retryCount}/${MAX_RETRIES})`)
if (retryCount > MAX_RETRIES) { errorTimeline.push(`onError — max retries exceeded, stopping`) return }
errorTimeline.push(`onError — restarting subscriber`) // Brief pause before retry await new Promise((r) => setTimeout(r, 500)) subscriberC.start(undefined, true) })
// Send a transaction so there's something to process await algorand.send.payment({ sender: sender.addr, receiver: sender.addr, amount: algo(1), note: 'lifecycle error test', })
printInfo(`Starting subscriber: will throw on first poll, then recover`) subscriberC.start(undefined, true)
// Wait for error + recovery + successful poll await new Promise<void>((resolve) => { setTimeout(async () => { await subscriberC.stop('part-c-done') resolve() }, 5000) })
printSuccess('Error recovery timeline:') for (let i = 0; i < errorTimeline.length; i++) { printInfo(` ${i + 1}: ${errorTimeline[i]}`) }
printInfo(`Errors caught: ${errorsCaught.toString()}`) printInfo(`Retries used: ${retryCount.toString()}`) printInfo(`Successful polls after recovery: ${successfulPolls.toString()}`)
if (errorsCaught < 1) { throw new Error(`Expected at least 1 error caught, got ${errorsCaught}`) } if (successfulPolls < 1) { throw new Error(`Expected at least 1 successful poll after recovery, got ${successfulPolls}`) } printSuccess('Error recovery verified: error -> onError -> retry -> success')
// ───────────────────────────────────────────────────────────── // Summary // ───────────────────────────────────────────────────────────── printStep(6, 'Summary')
printSuccess('Lifecycle hook execution order:') printInfo(` 1: onBeforePoll(metadata) — before each poll, receives { watermark, currentRound }`) printInfo(` 2: [transaction processing] — filter matching, mapper, on(), onBatch()`) printInfo(` 3: onPoll(result) — after processing, receives TransactionSubscriptionResult`) printInfo(` 4: inspect(result) — in start() loop only, after onPoll, same result object`)
printSuccess('Error handling:') printInfo(` -: onError(error) replaces default throw-on-error behavior`) printInfo(` -: Can implement retry by calling subscriber.start() from within onError`) printInfo(` -: Demonstrated retry up to ${MAX_RETRIES} times before giving up`)
printHeader('Example complete') process.exit(0)}
main().catch((err) => { printError(err.message) process.exit(1)})Other examples
Section titled “Other examples”- Basic Poll Once
- Continuous Subscriber
- Payment Filters
- Asset Transfer Subscription
- App Call Subscription
- Multiple Named Filters
- Balance Change Tracking
- ARC-28 Event Subscription
- Inner Transaction Subscription
- Batch Handling & Data Mappers
- Watermark Persistence
- Sync Behaviours
- Custom Filters
- Stateless Subscriptions
- Lifecycle Hooks & Error Handling