Skip to content

Continuous Subscriber

← Back to Examples

This example demonstrates continuous polling with start/stop and event handlers.

  • Create a subscriber with frequencyInSeconds and waitForBlockWhenAtTip
  • Register event handlers with subscriber.on()
  • Start continuous polling with subscriber.start()
  • Graceful shutdown with signal handlers
  • LocalNet running (via algokit localnet start)

From the repository’s examples/subscriber directory:

Terminal window
cd examples/subscriber
npx tsx 02-continuous-subscriber.ts

View source on GitHub

02-continuous-subscriber.ts
/**
* Example: Continuous Subscriber
*
* This example demonstrates continuous polling with start/stop and event handlers.
* - Create a subscriber with frequencyInSeconds and waitForBlockWhenAtTip
* - Register event handlers with subscriber.on()
* - Start continuous polling with subscriber.start()
* - Graceful shutdown with signal handlers
*
* Prerequisites:
* - LocalNet running (via `algokit localnet start`)
*/
import { algo, AlgorandClient } from '@algorandfoundation/algokit-utils'
import { AlgorandSubscriber } from '@algorandfoundation/algokit-subscriber'
import { printHeader, printStep, printInfo, printSuccess, printError, shortenAddress } from './shared/utils.js'
async function main() {
printHeader('02 — Continuous Subscriber')
// Step 1: Set up AlgorandClient for LocalNet
printStep(1, 'Connect to LocalNet')
const algorand = AlgorandClient.defaultLocalNet()
const status = await algorand.client.algod.status()
printInfo(`Current round: ${status.lastRound.toString()}`)
printSuccess('Connected to LocalNet')
// Step 2: Fund a sender account via KMD
printStep(2, 'Create and fund sender account')
const sender = await algorand.account.fromEnvironment('CONTINUOUS_SENDER', algo(10))
const senderAddr = sender.addr.toString()
printInfo(`Sender: ${shortenAddress(senderAddr)}`)
// Step 3: Create subscriber with continuous polling config
printStep(3, 'Create AlgorandSubscriber')
let watermark = 0n
const subscriber = new AlgorandSubscriber(
{
filters: [
{
name: 'payments',
filter: {
sender: senderAddr,
},
},
],
frequencyInSeconds: 1,
waitForBlockWhenAtTip: true,
syncBehaviour: 'sync-oldest-start-now',
watermarkPersistence: {
get: async () => watermark,
set: async (w: bigint) => {
watermark = w
},
},
},
algorand.client.algod,
)
printInfo(`frequencyInSeconds: 1`)
printInfo(`waitForBlockWhenAtTip: true`)
printInfo(`syncBehaviour: sync-oldest-start-now`)
printSuccess('Subscriber created')
// Step 4: Register event handler for matched payments
printStep(4, 'Register event handlers')
const matchedTxns: string[] = []
subscriber.on('payments', (txn) => {
matchedTxns.push(txn.id)
printInfo(`Matched payment: ${txn.id}`)
})
printSuccess('Registered on("payments") listener')
// Step 5: Register SIGINT/SIGTERM handlers for graceful shutdown
printStep(5, 'Register signal handlers')
const signals: NodeJS.Signals[] = ['SIGINT', 'SIGTERM']
for (const signal of signals) {
process.on(signal, async () => {
printInfo(`Signal received: ${signal}`)
await subscriber.stop(signal)
process.exit(0)
})
}
printSuccess('Registered SIGINT and SIGTERM handlers')
// Step 6: Start continuous polling with inspect callback
printStep(6, 'Start continuous subscriber')
subscriber.start((pollResult) => {
printInfo(
`Poll: round range [${pollResult.syncedRoundRange[0]}, ${pollResult.syncedRoundRange[1]}] — ` +
`${pollResult.subscribedTransactions.length} matched, watermark ${pollResult.newWatermark}`,
)
})
printSuccess('Subscriber started')
// Step 7: Send 3 payment transactions with short delays
printStep(7, 'Send 3 payment transactions')
const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms))
for (let i = 1; i <= 3; i++) {
const result = await algorand.send.payment({
sender: sender.addr,
receiver: sender.addr,
amount: algo(1),
note: `continuous txn ${i}`,
})
printInfo(`Txn ${i} ID: ${result.txIds.at(-1)}`)
printInfo(`Txn ${i} round: ${result.confirmation!.confirmedRound!.toString()}`)
await delay(500)
}
printSuccess('Sent 3 payment transactions')
// Step 8: Auto-stop after ~4 seconds
printStep(8, 'Wait for subscriber to catch up, then stop')
await new Promise<void>((resolve) => {
setTimeout(async () => {
printInfo(`Auto-stop: stopping after ~4 seconds`)
await subscriber.stop('example-done')
resolve()
}, 4000)
})
// Step 9: Verify at least 3 matched transactions
printStep(9, 'Verify matched transactions')
printInfo(`Total matched: ${matchedTxns.length.toString()}`)
if (matchedTxns.length < 3) {
printError(`Expected at least 3 matched transactions, got ${matchedTxns.length}`)
throw new Error(`Expected at least 3 matched transactions, got ${matchedTxns.length}`)
}
printSuccess(`${matchedTxns.length} transactions matched (>= 3)`)
for (const id of matchedTxns) {
printInfo(`Matched txn: ${id}`)
}
printHeader('Example complete')
process.exit(0)
}
main().catch((err) => {
printError(err.message)
process.exit(1)
})