Watermark Persistence
Description
Section titled “Description”This example demonstrates file-backed watermark persistence across polls.
- Implement get/set callbacks for file-based watermark storage
- Verify watermark advances after each poll
- Demonstrate at-least-once delivery semantics
Prerequisites
Section titled “Prerequisites”- LocalNet running (via
algokit localnet start)
Run This Example
Section titled “Run This Example”From the repository’s examples/subscriber directory:
cd examples/subscribernpx tsx 11-watermark-persistence.ts/** * Example: Watermark Persistence * * This example demonstrates file-backed watermark persistence across polls. * - Implement get/set callbacks for file-based watermark storage * - Verify watermark advances after each poll * - Demonstrate at-least-once delivery semantics * * Prerequisites: * - LocalNet running (via `algokit localnet start`) */import fs from 'node:fs'import os from 'node:os'import path from 'node:path'import { algo, AlgorandClient } from '@algorandfoundation/algokit-utils'import { AlgorandSubscriber } from '@algorandfoundation/algokit-subscriber'import { printHeader, printStep, printInfo, printSuccess, printError, shortenAddress, formatAlgo } from './shared/utils.js'
async function main() { printHeader('11 — Watermark Persistence')
// Step 1: Connect to LocalNet printStep(1, 'Connect to LocalNet') const algorand = AlgorandClient.defaultLocalNet() const status = await algorand.client.algod.status() printInfo(`Current round: ${status.lastRound.toString()}`) printSuccess('Connected to LocalNet')
// Step 2: Create and fund accounts printStep(2, 'Create and fund accounts') const sender = await algorand.account.fromEnvironment('WM_SENDER', algo(100)) const receiver = await algorand.account.fromEnvironment('WM_RECEIVER', algo(10)) const senderAddr = sender.addr.toString() const receiverAddr = receiver.addr.toString() printInfo(`Sender: ${shortenAddress(senderAddr)}`) printInfo(`Receiver: ${shortenAddress(receiverAddr)}`) printSuccess('Accounts created and funded')
// Step 3: Set up file-backed watermark persistence printStep(3, 'Set up file-backed watermark persistence') const watermarkFile = path.join(os.tmpdir(), 'example-watermark.txt')
// Clean up any leftover file from a previous run if (fs.existsSync(watermarkFile)) { fs.unlinkSync(watermarkFile) }
const watermarkPersistence = { get: async (): Promise<bigint> => { if (fs.existsSync(watermarkFile)) { const content = fs.readFileSync(watermarkFile, 'utf-8').trim() return BigInt(content) } return 0n }, set: async (newWatermark: bigint): Promise<void> => { fs.writeFileSync(watermarkFile, newWatermark.toString(), 'utf-8') }, }
printInfo(`Watermark file: ${watermarkFile}`) printInfo(`Initial watermark: ${(await watermarkPersistence.get()).toString()}`) printSuccess('File-backed watermark persistence configured')
// Step 4: Send first batch of 2 transactions printStep(4, 'Send first batch of 2 payments') let firstRound: bigint | undefined for (const note of ['batch1-txn1', 'batch1-txn2']) { const result = await algorand.send.payment({ sender: sender.addr, receiver: receiver.addr, amount: algo(1), note, }) const round = result.confirmation.confirmedRound! if (!firstRound) firstRound = round printInfo(`Sent ${note}: round ${round}`) } printSuccess('First batch of 2 payments sent')
// Step 5: Set initial watermark so we only scan from our first transaction printStep(5, 'Set initial watermark to isolate test transactions') const startWatermark = firstRound! - 1n await watermarkPersistence.set(startWatermark) printInfo(`Watermark set to: ${startWatermark.toString()}`) printSuccess('Watermark positioned before first batch')
// Step 6: First poll — should catch exactly 2 transactions printStep(6, 'First poll — expect 2 transactions from batch 1')
function createSubscriber() { return new AlgorandSubscriber( { filters: [ { name: 'payments', filter: { sender: senderAddr, receiver: receiverAddr, }, }, ], syncBehaviour: 'sync-oldest', maxRoundsToSync: 100, watermarkPersistence, }, algorand.client.algod, ) }
const subscriber1 = createSubscriber() const result1 = await subscriber1.pollOnce() const poll1Txns = result1.subscribedTransactions
printInfo(`Transactions matched: ${poll1Txns.length.toString()}`) for (const txn of poll1Txns) { const note = txn.note ? Buffer.from(txn.note).toString('utf-8') : '' printInfo(` ${note}: id: ${txn.id.slice(0, 12)}...`) }
if (poll1Txns.length !== 2) { throw new Error(`Expected 2 transactions in first poll, got ${poll1Txns.length}`) } printSuccess('First poll caught exactly 2 transactions')
// Step 7: Verify watermark was saved to file printStep(7, 'Verify watermark persisted to file') const savedWatermark = await watermarkPersistence.get() const fileContent = fs.readFileSync(watermarkFile, 'utf-8').trim() printInfo(`File content: ${fileContent}`) printInfo(`Watermark value: ${savedWatermark.toString()}`)
if (savedWatermark <= startWatermark) { throw new Error(`Watermark should have advanced past ${startWatermark}, but is ${savedWatermark}`) } printSuccess(`Watermark advanced: ${startWatermark} -> ${savedWatermark}`)
// Step 8: Send second batch of 2 transactions printStep(8, 'Send second batch of 2 payments') for (const note of ['batch2-txn1', 'batch2-txn2']) { const result = await algorand.send.payment({ sender: sender.addr, receiver: receiver.addr, amount: algo(2), note, }) printInfo(`Sent ${note}: round ${result.confirmation.confirmedRound}`) } printSuccess('Second batch of 2 payments sent')
// Step 9: Second poll — should catch ONLY the 2 new transactions (not the old ones) printStep(9, 'Second poll — expect only 2 NEW transactions from batch 2')
// Create a fresh subscriber instance — it reads watermark from file, // proving persistence works across subscriber instances const subscriber2 = createSubscriber() const result2 = await subscriber2.pollOnce() const poll2Txns = result2.subscribedTransactions
printInfo(`Transactions matched: ${poll2Txns.length.toString()}`) for (const txn of poll2Txns) { const note = txn.note ? Buffer.from(txn.note).toString('utf-8') : '' printInfo(` ${note}: id: ${txn.id.slice(0, 12)}...`) }
if (poll2Txns.length !== 2) { throw new Error(`Expected 2 transactions in second poll, got ${poll2Txns.length}`) }
// Verify these are batch2 transactions, not batch1 const poll2Notes = poll2Txns.map((txn) => (txn.note ? Buffer.from(txn.note).toString('utf-8') : '')) const allBatch2 = poll2Notes.every((note) => note.startsWith('batch2')) if (!allBatch2) { throw new Error(`Expected only batch2 transactions, got: ${poll2Notes.join(', ')}`) } printSuccess('Second poll caught exactly 2 NEW transactions (batch2 only)')
// Step 10: Verify final watermark advanced again printStep(10, 'Verify final watermark') const finalWatermark = await watermarkPersistence.get() printInfo(`Final watermark: ${finalWatermark.toString()}`)
if (finalWatermark <= savedWatermark) { throw new Error(`Final watermark should have advanced past ${savedWatermark}, but is ${finalWatermark}`) } printSuccess(`Watermark advanced: ${savedWatermark} -> ${finalWatermark}`)
// Step 11: Explain at-least-once delivery semantics printStep(11, 'At-least-once delivery semantics') console.log() console.log(' ┌─────────────────────────────────────────────────────────────┐') console.log(' │ Watermark Persistence & Delivery Semantics │') console.log(' ├─────────────────────────────────────────────────────────────┤') console.log(' │ │') console.log(' │ The watermark is updated AFTER processing completes: │') console.log(' │ │') console.log(' │ 1. get() -> read current watermark │') console.log(' │ 2. Fetch transactions from watermark to tip │') console.log(' │ 3. Fire on/onBatch handlers │') console.log(' │ 4. set(newWatermark) -> persist new watermark │') console.log(' │ │') console.log(' │ If the process crashes between steps 3 and 4, the │') console.log(' │ watermark is NOT updated. On restart, the same │') console.log(' │ transactions will be re-fetched and re-processed. │') console.log(' │ │') console.log(' │ This gives AT-LEAST-ONCE delivery: │') console.log(' │ - Every transaction is guaranteed to be processed │') console.log(' │ - Some transactions MAY be processed more than once │') console.log(' │ - Handlers should be idempotent (safe to re-run) │') console.log(' │ │') console.log(' │ To achieve exactly-once semantics, persist the watermark │') console.log(' │ in the same atomic transaction as your business logic │') console.log(' │ (e.g., in a database transaction). │') console.log(' │ │') console.log(' └─────────────────────────────────────────────────────────────┘') console.log()
// Step 12: Clean up temp file printStep(12, 'Clean up temp file') fs.unlinkSync(watermarkFile) printInfo(`Deleted: ${watermarkFile}`) printSuccess('Temp file cleaned up')
printHeader('Example complete')}
main().catch((err) => { printError(err.message) process.exit(1)})Other examples
Section titled “Other examples”- Basic Poll Once
- Continuous Subscriber
- Payment Filters
- Asset Transfer Subscription
- App Call Subscription
- Multiple Named Filters
- Balance Change Tracking
- ARC-28 Event Subscription
- Inner Transaction Subscription
- Batch Handling & Data Mappers
- Watermark Persistence
- Sync Behaviours
- Custom Filters
- Stateless Subscriptions
- Lifecycle Hooks & Error Handling