Skip to content

Watermark Persistence

← Back to Examples

This example demonstrates file-backed watermark persistence across polls.

  • Implement get/set callbacks for file-based watermark storage
  • Verify watermark advances after each poll
  • Demonstrate at-least-once delivery semantics
  • LocalNet running (via algokit localnet start)

From the repository’s examples/subscriber directory:

Terminal window
cd examples/subscriber
npx tsx 11-watermark-persistence.ts

View source on GitHub

11-watermark-persistence.ts
/**
* Example: Watermark Persistence
*
* This example demonstrates file-backed watermark persistence across polls.
* - Implement get/set callbacks for file-based watermark storage
* - Verify watermark advances after each poll
* - Demonstrate at-least-once delivery semantics
*
* Prerequisites:
* - LocalNet running (via `algokit localnet start`)
*/
import fs from 'node:fs'
import os from 'node:os'
import path from 'node:path'
import { algo, AlgorandClient } from '@algorandfoundation/algokit-utils'
import { AlgorandSubscriber } from '@algorandfoundation/algokit-subscriber'
import { printHeader, printStep, printInfo, printSuccess, printError, shortenAddress, formatAlgo } from './shared/utils.js'
async function main() {
printHeader('11 — Watermark Persistence')
// Step 1: Connect to LocalNet
printStep(1, 'Connect to LocalNet')
const algorand = AlgorandClient.defaultLocalNet()
const status = await algorand.client.algod.status()
printInfo(`Current round: ${status.lastRound.toString()}`)
printSuccess('Connected to LocalNet')
// Step 2: Create and fund accounts
printStep(2, 'Create and fund accounts')
const sender = await algorand.account.fromEnvironment('WM_SENDER', algo(100))
const receiver = await algorand.account.fromEnvironment('WM_RECEIVER', algo(10))
const senderAddr = sender.addr.toString()
const receiverAddr = receiver.addr.toString()
printInfo(`Sender: ${shortenAddress(senderAddr)}`)
printInfo(`Receiver: ${shortenAddress(receiverAddr)}`)
printSuccess('Accounts created and funded')
// Step 3: Set up file-backed watermark persistence
printStep(3, 'Set up file-backed watermark persistence')
const watermarkFile = path.join(os.tmpdir(), 'example-watermark.txt')
// Clean up any leftover file from a previous run
if (fs.existsSync(watermarkFile)) {
fs.unlinkSync(watermarkFile)
}
const watermarkPersistence = {
get: async (): Promise<bigint> => {
if (fs.existsSync(watermarkFile)) {
const content = fs.readFileSync(watermarkFile, 'utf-8').trim()
return BigInt(content)
}
return 0n
},
set: async (newWatermark: bigint): Promise<void> => {
fs.writeFileSync(watermarkFile, newWatermark.toString(), 'utf-8')
},
}
printInfo(`Watermark file: ${watermarkFile}`)
printInfo(`Initial watermark: ${(await watermarkPersistence.get()).toString()}`)
printSuccess('File-backed watermark persistence configured')
// Step 4: Send first batch of 2 transactions
printStep(4, 'Send first batch of 2 payments')
let firstRound: bigint | undefined
for (const note of ['batch1-txn1', 'batch1-txn2']) {
const result = await algorand.send.payment({
sender: sender.addr,
receiver: receiver.addr,
amount: algo(1),
note,
})
const round = result.confirmation.confirmedRound!
if (!firstRound) firstRound = round
printInfo(`Sent ${note}: round ${round}`)
}
printSuccess('First batch of 2 payments sent')
// Step 5: Set initial watermark so we only scan from our first transaction
printStep(5, 'Set initial watermark to isolate test transactions')
const startWatermark = firstRound! - 1n
await watermarkPersistence.set(startWatermark)
printInfo(`Watermark set to: ${startWatermark.toString()}`)
printSuccess('Watermark positioned before first batch')
// Step 6: First poll — should catch exactly 2 transactions
printStep(6, 'First poll — expect 2 transactions from batch 1')
function createSubscriber() {
return new AlgorandSubscriber(
{
filters: [
{
name: 'payments',
filter: {
sender: senderAddr,
receiver: receiverAddr,
},
},
],
syncBehaviour: 'sync-oldest',
maxRoundsToSync: 100,
watermarkPersistence,
},
algorand.client.algod,
)
}
const subscriber1 = createSubscriber()
const result1 = await subscriber1.pollOnce()
const poll1Txns = result1.subscribedTransactions
printInfo(`Transactions matched: ${poll1Txns.length.toString()}`)
for (const txn of poll1Txns) {
const note = txn.note ? Buffer.from(txn.note).toString('utf-8') : ''
printInfo(` ${note}: id: ${txn.id.slice(0, 12)}...`)
}
if (poll1Txns.length !== 2) {
throw new Error(`Expected 2 transactions in first poll, got ${poll1Txns.length}`)
}
printSuccess('First poll caught exactly 2 transactions')
// Step 7: Verify watermark was saved to file
printStep(7, 'Verify watermark persisted to file')
const savedWatermark = await watermarkPersistence.get()
const fileContent = fs.readFileSync(watermarkFile, 'utf-8').trim()
printInfo(`File content: ${fileContent}`)
printInfo(`Watermark value: ${savedWatermark.toString()}`)
if (savedWatermark <= startWatermark) {
throw new Error(`Watermark should have advanced past ${startWatermark}, but is ${savedWatermark}`)
}
printSuccess(`Watermark advanced: ${startWatermark} -> ${savedWatermark}`)
// Step 8: Send second batch of 2 transactions
printStep(8, 'Send second batch of 2 payments')
for (const note of ['batch2-txn1', 'batch2-txn2']) {
const result = await algorand.send.payment({
sender: sender.addr,
receiver: receiver.addr,
amount: algo(2),
note,
})
printInfo(`Sent ${note}: round ${result.confirmation.confirmedRound}`)
}
printSuccess('Second batch of 2 payments sent')
// Step 9: Second poll — should catch ONLY the 2 new transactions (not the old ones)
printStep(9, 'Second poll — expect only 2 NEW transactions from batch 2')
// Create a fresh subscriber instance — it reads watermark from file,
// proving persistence works across subscriber instances
const subscriber2 = createSubscriber()
const result2 = await subscriber2.pollOnce()
const poll2Txns = result2.subscribedTransactions
printInfo(`Transactions matched: ${poll2Txns.length.toString()}`)
for (const txn of poll2Txns) {
const note = txn.note ? Buffer.from(txn.note).toString('utf-8') : ''
printInfo(` ${note}: id: ${txn.id.slice(0, 12)}...`)
}
if (poll2Txns.length !== 2) {
throw new Error(`Expected 2 transactions in second poll, got ${poll2Txns.length}`)
}
// Verify these are batch2 transactions, not batch1
const poll2Notes = poll2Txns.map((txn) => (txn.note ? Buffer.from(txn.note).toString('utf-8') : ''))
const allBatch2 = poll2Notes.every((note) => note.startsWith('batch2'))
if (!allBatch2) {
throw new Error(`Expected only batch2 transactions, got: ${poll2Notes.join(', ')}`)
}
printSuccess('Second poll caught exactly 2 NEW transactions (batch2 only)')
// Step 10: Verify final watermark advanced again
printStep(10, 'Verify final watermark')
const finalWatermark = await watermarkPersistence.get()
printInfo(`Final watermark: ${finalWatermark.toString()}`)
if (finalWatermark <= savedWatermark) {
throw new Error(`Final watermark should have advanced past ${savedWatermark}, but is ${finalWatermark}`)
}
printSuccess(`Watermark advanced: ${savedWatermark} -> ${finalWatermark}`)
// Step 11: Explain at-least-once delivery semantics
printStep(11, 'At-least-once delivery semantics')
console.log()
console.log(' ┌─────────────────────────────────────────────────────────────┐')
console.log(' │ Watermark Persistence & Delivery Semantics │')
console.log(' ├─────────────────────────────────────────────────────────────┤')
console.log(' │ │')
console.log(' │ The watermark is updated AFTER processing completes: │')
console.log(' │ │')
console.log(' │ 1. get() -> read current watermark │')
console.log(' │ 2. Fetch transactions from watermark to tip │')
console.log(' │ 3. Fire on/onBatch handlers │')
console.log(' │ 4. set(newWatermark) -> persist new watermark │')
console.log(' │ │')
console.log(' │ If the process crashes between steps 3 and 4, the │')
console.log(' │ watermark is NOT updated. On restart, the same │')
console.log(' │ transactions will be re-fetched and re-processed. │')
console.log(' │ │')
console.log(' │ This gives AT-LEAST-ONCE delivery: │')
console.log(' │ - Every transaction is guaranteed to be processed │')
console.log(' │ - Some transactions MAY be processed more than once │')
console.log(' │ - Handlers should be idempotent (safe to re-run) │')
console.log(' │ │')
console.log(' │ To achieve exactly-once semantics, persist the watermark │')
console.log(' │ in the same atomic transaction as your business logic │')
console.log(' │ (e.g., in a database transaction). │')
console.log(' │ │')
console.log(' └─────────────────────────────────────────────────────────────┘')
console.log()
// Step 12: Clean up temp file
printStep(12, 'Clean up temp file')
fs.unlinkSync(watermarkFile)
printInfo(`Deleted: ${watermarkFile}`)
printSuccess('Temp file cleaned up')
printHeader('Example complete')
}
main().catch((err) => {
printError(err.message)
process.exit(1)
})