Skip to content

Lifecycle Hooks & Error Handling

← Back to Examples

This example demonstrates lifecycle hooks and retry patterns.

  • Hook execution order: onBeforePoll -> processing -> onPoll -> inspect
  • start(inspect) callback in continuous polling
  • Error recovery with onError and retry logic
  • LocalNet running (via algokit localnet start)

From the repository’s examples/subscriber directory:

Terminal window
cd examples/subscriber
npx tsx 15-lifecycle-hooks.ts

View source on GitHub

15-lifecycle-hooks.ts
/**
* Example: Lifecycle Hooks & Error Handling
*
* This example demonstrates lifecycle hooks and retry patterns.
* - Hook execution order: onBeforePoll -> processing -> onPoll -> inspect
* - start(inspect) callback in continuous polling
* - Error recovery with onError and retry logic
*
* Prerequisites:
* - LocalNet running (via `algokit localnet start`)
*/
import { algo, AlgorandClient } from '@algorandfoundation/algokit-utils'
import { AlgorandSubscriber } from '@algorandfoundation/algokit-subscriber'
import { printHeader, printStep, printInfo, printSuccess, printError, shortenAddress } from './shared/utils.js'
async function main() {
printHeader('15 — Lifecycle Hooks & Error Handling')
// Step 1: Connect to LocalNet
printStep(1, 'Connect to LocalNet')
const algorand = AlgorandClient.defaultLocalNet()
const status = await algorand.client.algod.status()
printInfo(`Current round: ${status.lastRound.toString()}`)
printSuccess('Connected to LocalNet')
// Step 2: Create and fund sender account
printStep(2, 'Create and fund sender account')
const sender = await algorand.account.fromEnvironment('LIFECYCLE_SENDER', algo(10))
const senderAddr = sender.addr.toString()
printInfo(`Sender: ${shortenAddress(senderAddr)}`)
// ─────────────────────────────────────────────────────────────
// Part A: Demonstrate hook execution order with pollOnce()
// ─────────────────────────────────────────────────────────────
printStep(3, 'Part A — Hook execution order (pollOnce)')
// Send a transaction so we have something to match
const txn1 = await algorand.send.payment({
sender: sender.addr,
receiver: sender.addr,
amount: algo(1),
note: 'lifecycle txn 1',
})
const firstRound = txn1.confirmation!.confirmedRound!
printInfo(`Sent txn: ${txn1.txIds.at(-1)}`)
let watermarkA = firstRound - 1n
const timeline: string[] = []
const subscriberA = new AlgorandSubscriber(
{
filters: [
{
name: 'payments',
filter: { sender: senderAddr },
},
],
frequencyInSeconds: 1,
waitForBlockWhenAtTip: false,
syncBehaviour: 'sync-oldest',
watermarkPersistence: {
get: async () => watermarkA,
set: async (w: bigint) => {
watermarkA = w
},
},
},
algorand.client.algod,
)
// Register lifecycle hooks
subscriberA.onBeforePoll((metadata) => {
timeline.push(`onBeforePoll(watermark=${metadata.watermark}, currentRound=${metadata.currentRound})`)
})
subscriberA.on('payments', (txn) => {
timeline.push(`on("payments") — txn ${txn.id}`)
})
subscriberA.onPoll((result) => {
timeline.push(
`onPoll(txns=${result.subscribedTransactions.length}, rounds=[${result.syncedRoundRange[0]}, ${result.syncedRoundRange[1]}])`,
)
})
printInfo(`Hooks registered: onBeforePoll, on("payments"), onPoll`)
// Execute a single poll
const pollResult = await subscriberA.pollOnce()
printInfo(`Poll matched: ${pollResult.subscribedTransactions.length} transaction(s)`)
// Print the timeline
printSuccess('Hook execution order:')
for (let i = 0; i < timeline.length; i++) {
printInfo(` ${i + 1}: ${timeline[i]}`)
}
// Verify order: onBeforePoll -> on("payments") -> onPoll
if (timeline.length < 3) {
throw new Error(`Expected at least 3 timeline entries, got ${timeline.length}`)
}
if (!timeline[0].startsWith('onBeforePoll')) {
throw new Error(`Expected first hook to be onBeforePoll, got: ${timeline[0]}`)
}
if (!timeline[1].startsWith('on("payments")')) {
throw new Error(`Expected second hook to be on("payments"), got: ${timeline[1]}`)
}
if (!timeline[timeline.length - 1].startsWith('onPoll')) {
throw new Error(`Expected last hook to be onPoll, got: ${timeline[timeline.length - 1]}`)
}
printSuccess('Order verified: onBeforePoll -> [transaction processing] -> onPoll')
// ─────────────────────────────────────────────────────────────
// Part B: start(inspect) callback in the continuous loop
// ─────────────────────────────────────────────────────────────
printStep(4, 'Part B — start(inspect) callback')
// Send 2 more transactions
for (let i = 2; i <= 3; i++) {
await algorand.send.payment({
sender: sender.addr,
receiver: sender.addr,
amount: algo(1),
note: `lifecycle txn ${i}`,
})
}
printInfo(`Sent: 2 more transactions`)
let watermarkB = watermarkA
const timelineB: string[] = []
const subscriberB = new AlgorandSubscriber(
{
filters: [
{
name: 'payments',
filter: { sender: senderAddr },
},
],
frequencyInSeconds: 1,
waitForBlockWhenAtTip: true,
syncBehaviour: 'sync-oldest',
watermarkPersistence: {
get: async () => watermarkB,
set: async (w: bigint) => {
watermarkB = w
},
},
},
algorand.client.algod,
)
subscriberB.onBeforePoll((metadata) => {
timelineB.push(`onBeforePoll(watermark=${metadata.watermark}, currentRound=${metadata.currentRound})`)
})
subscriberB.on('payments', (txn) => {
timelineB.push(`on("payments") — txn ${txn.id}`)
})
subscriberB.onPoll((result) => {
timelineB.push(
`onPoll(txns=${result.subscribedTransactions.length}, rounds=[${result.syncedRoundRange[0]}, ${result.syncedRoundRange[1]}])`,
)
})
// start() with inspect callback — fires AFTER onPoll, at the end of each loop iteration
subscriberB.start((result) => {
timelineB.push(
`inspect(txns=${result.subscribedTransactions.length}, newWatermark=${result.newWatermark})`,
)
}, true) // suppressLog = true to reduce noise
// Wait for the subscriber to catch up
await new Promise<void>((resolve) => {
setTimeout(async () => {
await subscriberB.stop('part-b-done')
resolve()
}, 3000)
})
printSuccess('Timeline with start(inspect):')
for (let i = 0; i < timelineB.length; i++) {
printInfo(` ${i + 1}: ${timelineB[i]}`)
}
// Verify inspect appears after onPoll
const pollIdx = timelineB.findIndex((e) => e.startsWith('onPoll'))
const inspectIdx = timelineB.findIndex((e) => e.startsWith('inspect'))
if (pollIdx === -1 || inspectIdx === -1) {
throw new Error('Expected both onPoll and inspect entries in timeline')
}
if (inspectIdx <= pollIdx) {
throw new Error(`Expected inspect (idx=${inspectIdx}) to come after onPoll (idx=${pollIdx})`)
}
printSuccess('Order verified: onBeforePoll -> [transaction processing] -> onPoll -> inspect')
// ─────────────────────────────────────────────────────────────
// Part C: Error recovery with onError + retry logic
// ─────────────────────────────────────────────────────────────
printStep(5, 'Part C — Error recovery with onError')
const MAX_RETRIES = 3
let retryCount = 0
let errorsCaught = 0
let successfulPolls = 0
const errorTimeline: string[] = []
// We'll simulate errors by throwing in the on() handler for the first 2 polls
let pollNumber = 0
let watermarkC = watermarkB
const subscriberC = new AlgorandSubscriber(
{
filters: [
{
name: 'payments',
filter: { sender: senderAddr },
},
],
frequencyInSeconds: 1,
waitForBlockWhenAtTip: true,
syncBehaviour: 'sync-oldest-start-now',
watermarkPersistence: {
get: async () => watermarkC,
set: async (w: bigint) => {
watermarkC = w
},
},
},
algorand.client.algod,
)
subscriberC.onBeforePoll(() => {
pollNumber++
errorTimeline.push(`onBeforePoll (poll #${pollNumber})`)
})
subscriberC.onPoll(() => {
// Simulate an error on the first poll
if (pollNumber === 1) {
errorTimeline.push(`onPoll — throwing simulated error!`)
throw new Error('Simulated processing error')
}
successfulPolls++
errorTimeline.push(`onPoll — success (poll #${pollNumber})`)
})
subscriberC.onError(async (error) => {
errorsCaught++
retryCount++
const message = error instanceof Error ? error.message : String(error)
errorTimeline.push(`onError — caught: "${message}" (retry ${retryCount}/${MAX_RETRIES})`)
if (retryCount > MAX_RETRIES) {
errorTimeline.push(`onError — max retries exceeded, stopping`)
return
}
errorTimeline.push(`onError — restarting subscriber`)
// Brief pause before retry
await new Promise((r) => setTimeout(r, 500))
subscriberC.start(undefined, true)
})
// Send a transaction so there's something to process
await algorand.send.payment({
sender: sender.addr,
receiver: sender.addr,
amount: algo(1),
note: 'lifecycle error test',
})
printInfo(`Starting subscriber: will throw on first poll, then recover`)
subscriberC.start(undefined, true)
// Wait for error + recovery + successful poll
await new Promise<void>((resolve) => {
setTimeout(async () => {
await subscriberC.stop('part-c-done')
resolve()
}, 5000)
})
printSuccess('Error recovery timeline:')
for (let i = 0; i < errorTimeline.length; i++) {
printInfo(` ${i + 1}: ${errorTimeline[i]}`)
}
printInfo(`Errors caught: ${errorsCaught.toString()}`)
printInfo(`Retries used: ${retryCount.toString()}`)
printInfo(`Successful polls after recovery: ${successfulPolls.toString()}`)
if (errorsCaught < 1) {
throw new Error(`Expected at least 1 error caught, got ${errorsCaught}`)
}
if (successfulPolls < 1) {
throw new Error(`Expected at least 1 successful poll after recovery, got ${successfulPolls}`)
}
printSuccess('Error recovery verified: error -> onError -> retry -> success')
// ─────────────────────────────────────────────────────────────
// Summary
// ─────────────────────────────────────────────────────────────
printStep(6, 'Summary')
printSuccess('Lifecycle hook execution order:')
printInfo(` 1: onBeforePoll(metadata) — before each poll, receives { watermark, currentRound }`)
printInfo(` 2: [transaction processing] — filter matching, mapper, on(), onBatch()`)
printInfo(` 3: onPoll(result) — after processing, receives TransactionSubscriptionResult`)
printInfo(` 4: inspect(result) — in start() loop only, after onPoll, same result object`)
printSuccess('Error handling:')
printInfo(` -: onError(error) replaces default throw-on-error behavior`)
printInfo(` -: Can implement retry by calling subscriber.start() from within onError`)
printInfo(` -: Demonstrated retry up to ${MAX_RETRIES} times before giving up`)
printHeader('Example complete')
process.exit(0)
}
main().catch((err) => {
printError(err.message)
process.exit(1)
})