Continuous Subscriber
Description
This example demonstrates continuous polling with start/stop and event handlers.
- Create a subscriber with frequencyInSeconds and waitForBlockWhenAtTip
- Register event handlers with subscriber.on()
- Start continuous polling with subscriber.start()
- Graceful shutdown with signal handlers
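The lifecycle in the list above (configure, register handlers, start, stop) can be sketched without LocalNet. The block below is a minimal illustration only: `MiniPoller`, `WatermarkStore`, `PollResult`, and `demo` are hypothetical names made up for this sketch and are not part of `@algorandfoundation/algokit-subscriber`, and the sketch uses a plain `number` watermark where the real example uses a `bigint`. It assumes that a continuous subscriber amounts to a loop that reads a watermark, polls from it, dispatches matches to named handlers, stores the new watermark, and sleeps for `frequencyInSeconds`.

```typescript
// Illustrative sketch only: MiniPoller is a hypothetical class, not the
// AlgorandSubscriber implementation.

type Handler<T> = (item: T) => void

interface WatermarkStore {
  get: () => Promise<number>
  set: (w: number) => Promise<void>
}

interface PollResult<T> {
  matches: { filterName: string; item: T }[]
  newWatermark: number
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

class MiniPoller<T> {
  private readonly handlers = new Map<string, Handler<T>[]>()
  private readonly poll: (fromWatermark: number) => Promise<PollResult<T>>
  private readonly watermark: WatermarkStore
  private readonly frequencyInSeconds: number
  private running = false
  private loop: Promise<void> = Promise.resolve()

  constructor(
    poll: (fromWatermark: number) => Promise<PollResult<T>>,
    watermark: WatermarkStore,
    frequencyInSeconds: number,
  ) {
    this.poll = poll
    this.watermark = watermark
    this.frequencyInSeconds = frequencyInSeconds
  }

  // Register a handler for matches produced by a named filter.
  on(filterName: string, handler: Handler<T>): this {
    const existing = this.handlers.get(filterName) ?? []
    this.handlers.set(filterName, [...existing, handler])
    return this
  }

  // Begin polling in the background; calling start twice is a no-op.
  start(): void {
    if (this.running) return
    this.running = true
    this.loop = this.run()
  }

  // Ask the loop to finish its current iteration, then wait for it to exit.
  async stop(): Promise<void> {
    this.running = false
    await this.loop
  }

  private async run(): Promise<void> {
    while (this.running) {
      const result = await this.poll(await this.watermark.get())
      for (const { filterName, item } of result.matches) {
        for (const handler of this.handlers.get(filterName) ?? []) handler(item)
      }
      // Persist progress only after handlers have run for this batch.
      await this.watermark.set(result.newWatermark)
      if (this.running) await sleep(this.frequencyInSeconds * 1000)
    }
  }
}

// Drive the poller with a fake source that yields one match per poll.
async function demo(): Promise<{ seen: string[]; watermark: number }> {
  let watermark = 0
  const seen: string[] = []
  const poller = new MiniPoller<string>(
    async (from) => ({
      matches: [{ filterName: 'payments', item: `txn-${from + 1}` }],
      newWatermark: from + 1,
    }),
    {
      get: async () => watermark,
      set: async (w) => {
        watermark = w
      },
    },
    0.01,
  )
  poller.on('payments', (id) => {
    seen.push(id)
  })
  poller.start()
  await sleep(100)
  await poller.stop()
  return { seen, watermark }
}
```

In this sketch `stop()` waits for the in-flight iteration to finish, so the stored watermark always matches the number of batches the handlers have seen. Whether the real subscriber offers the same guarantee is not something this sketch establishes.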
Prerequisites
- LocalNet running (via algokit localnet start)
Run This Example
From the repository’s examples/subscriber directory:
cd examples/subscriber
npx tsx 02-continuous-subscriber.ts

/**
 * Example: Continuous Subscriber
 *
 * This example demonstrates continuous polling with start/stop and event handlers.
 * - Create a subscriber with frequencyInSeconds and waitForBlockWhenAtTip
 * - Register event handlers with subscriber.on()
 * - Start continuous polling with subscriber.start()
 * - Graceful shutdown with signal handlers
 *
 * Prerequisites:
 * - LocalNet running (via `algokit localnet start`)
 */
import { algo, AlgorandClient } from '@algorandfoundation/algokit-utils'
import { AlgorandSubscriber } from '@algorandfoundation/algokit-subscriber'
import { printHeader, printStep, printInfo, printSuccess, printError, shortenAddress } from './shared/utils.js'

async function main() {
  printHeader('02 — Continuous Subscriber')

  // Step 1: Set up AlgorandClient for LocalNet
  printStep(1, 'Connect to LocalNet')
  const algorand = AlgorandClient.defaultLocalNet()
  const status = await algorand.client.algod.status()
  printInfo(`Current round: ${status.lastRound.toString()}`)
  printSuccess('Connected to LocalNet')

  // Step 2: Fund a sender account via KMD
  printStep(2, 'Create and fund sender account')
  const sender = await algorand.account.fromEnvironment('CONTINUOUS_SENDER', algo(10))
  const senderAddr = sender.addr.toString()
  printInfo(`Sender: ${shortenAddress(senderAddr)}`)

  // Step 3: Create subscriber with continuous polling config
  printStep(3, 'Create AlgorandSubscriber')
  let watermark = 0n

  const subscriber = new AlgorandSubscriber(
    {
      filters: [
        {
          name: 'payments',
          filter: {
            sender: senderAddr,
          },
        },
      ],
      frequencyInSeconds: 1,
      waitForBlockWhenAtTip: true,
      syncBehaviour: 'sync-oldest-start-now',
      watermarkPersistence: {
        get: async () => watermark,
        set: async (w: bigint) => {
          watermark = w
        },
      },
    },
    algorand.client.algod,
  )
  printInfo(`frequencyInSeconds: 1`)
  printInfo(`waitForBlockWhenAtTip: true`)
  printInfo(`syncBehaviour: sync-oldest-start-now`)
  printSuccess('Subscriber created')

  // Step 4: Register event handler for matched payments
  printStep(4, 'Register event handlers')
  const matchedTxns: string[] = []

  subscriber.on('payments', (txn) => {
    matchedTxns.push(txn.id)
    printInfo(`Matched payment: ${txn.id}`)
  })
  printSuccess('Registered on("payments") listener')

  // Step 5: Register SIGINT/SIGTERM handlers for graceful shutdown
  printStep(5, 'Register signal handlers')
  const signals: NodeJS.Signals[] = ['SIGINT', 'SIGTERM']
  for (const signal of signals) {
    process.on(signal, async () => {
      printInfo(`Signal received: ${signal}`)
      await subscriber.stop(signal)
      process.exit(0)
    })
  }
  printSuccess('Registered SIGINT and SIGTERM handlers')

  // Step 6: Start continuous polling with inspect callback
  printStep(6, 'Start continuous subscriber')

  subscriber.start((pollResult) => {
    printInfo(
      `Poll: round range [${pollResult.syncedRoundRange[0]}, ${pollResult.syncedRoundRange[1]}] — ` +
        `${pollResult.subscribedTransactions.length} matched, watermark ${pollResult.newWatermark}`,
    )
  })
  printSuccess('Subscriber started')

  // Step 7: Send 3 payment transactions with short delays
  printStep(7, 'Send 3 payment transactions')

  const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms))

  for (let i = 1; i <= 3; i++) {
    const result = await algorand.send.payment({
      sender: sender.addr,
      receiver: sender.addr,
      amount: algo(1),
      note: `continuous txn ${i}`,
    })
    printInfo(`Txn ${i} ID: ${result.txIds.at(-1)}`)
    printInfo(`Txn ${i} round: ${result.confirmation!.confirmedRound!.toString()}`)
    await delay(500)
  }
  printSuccess('Sent 3 payment transactions')

  // Step 8: Auto-stop after ~4 seconds
  printStep(8, 'Wait for subscriber to catch up, then stop')

  await new Promise<void>((resolve) => {
    setTimeout(async () => {
      printInfo(`Auto-stop: stopping after ~4 seconds`)
      await subscriber.stop('example-done')
      resolve()
    }, 4000)
  })

  // Step 9: Verify at least 3 matched transactions
  printStep(9, 'Verify matched transactions')
  printInfo(`Total matched: ${matchedTxns.length.toString()}`)

  if (matchedTxns.length < 3) {
    printError(`Expected at least 3 matched transactions, got ${matchedTxns.length}`)
    throw new Error(`Expected at least 3 matched transactions, got ${matchedTxns.length}`)
  }
  printSuccess(`${matchedTxns.length} transactions matched (>= 3)`)

  for (const id of matchedTxns) {
    printInfo(`Matched txn: ${id}`)
  }

  printHeader('Example complete')
  process.exit(0)
}

main().catch((err) => {
  printError(err.message)
  process.exit(1)
})

Other examples
- Basic Poll Once
- Continuous Subscriber
- Payment Filters
- Asset Transfer Subscription
- App Call Subscription
- Multiple Named Filters
- Balance Change Tracking
- ARC-28 Event Subscription
- Inner Transaction Subscription
- Batch Handling & Data Mappers
- Watermark Persistence
- Sync Behaviours
- Custom Filters
- Stateless Subscriptions
- Lifecycle Hooks & Error Handling