Watermark Persistence
Description
Section titled “Description”Demonstrates file-backed watermark persistence across multiple polls:
- Implement get/set callbacks using a temp file
- Poll twice: batch 1 catches first 2 txns, batch 2 catches next 2 txns
- Verify watermark advances between polls
- Verify no duplication across polls
- Explain at-least-once delivery semantics
Prerequisites
Section titled “Prerequisites”- LocalNet running (via
algokit localnet start)
Run This Example
Section titled “Run This Example”From the repository’s examples/subscriber directory:
cd examples/subscriberuv run python 11_watermark_persistence.py"""Example 11: Watermark Persistence
Demonstrates file-backed watermark persistence across multiple polls:- Implement get/set callbacks using a temp file- Poll twice: batch 1 catches first 2 txns, batch 2 catches next 2 txns- Verify watermark advances between polls- Verify no duplication across polls- Explain at-least-once delivery semantics
Prerequisites:- LocalNet running (via `algokit localnet start`)"""
import jsonimport tempfilefrom pathlib import Path
from algokit_utils import AlgoAmount, AlgorandClient, PaymentParamsfrom shared import ( ALGOD_CONFIG, KMD_CONFIG, print_header, print_info, print_step, print_success, shorten_address,)
from algokit_subscriber import ( AlgorandSubscriber, AlgorandSubscriberConfig, SubscriberConfigFilter, TransactionFilter, WatermarkPersistence,)
def file_watermark(path: Path) -> WatermarkPersistence: """Create a file-backed WatermarkPersistence using a JSON file."""
def get_watermark() -> int: if not path.exists(): return 0 data = json.loads(path.read_text()) return int(data.get("watermark", 0))
def set_watermark(value: int) -> None: path.write_text(json.dumps({"watermark": value}))
return WatermarkPersistence(get=get_watermark, set=set_watermark)
def create_subscriber( algorand: AlgorandClient, sender: str, receiver: str, persistence: WatermarkPersistence,) -> AlgorandSubscriber: """Create a subscriber configured for payment filtering.""" return AlgorandSubscriber( config=AlgorandSubscriberConfig( filters=[ SubscriberConfigFilter( name="payments", filter=TransactionFilter(sender=sender, receiver=receiver), ), ], watermark_persistence=persistence, sync_behaviour="sync-oldest", max_rounds_to_sync=100, ), algod_client=algorand.client.algod, )
def send_batch( algorand: AlgorandClient, sender: str, receiver: str, amount: AlgoAmount, notes: list[str],) -> int | None: """Send a batch of payments. Returns first confirmed round.""" first_round: int | None = None for note in notes: result = algorand.send.payment( PaymentParams( sender=sender, receiver=receiver, amount=amount, note=note.encode(), ) ) confirmed_round = result.confirmation.confirmed_round if first_round is None: first_round = confirmed_round print_info(f"Sent {note}: round {confirmed_round}") return first_round
def poll_and_display( algorand: AlgorandClient, sender: str, receiver: str, persistence: WatermarkPersistence,) -> list[dict[str, str]]: """Poll once and display matched transactions. Returns list of {note, id}.""" subscriber = create_subscriber(algorand, sender, receiver, persistence) result = subscriber.poll_once() txns = result.subscribed_transactions
print_info(f"Transactions matched: {len(txns)}") results = [] for txn in txns: note = txn.note.decode("utf-8") if txn.note else "" print_info(f" {note}: id: {txn.id_[:12]}...") results.append({"note": note, "id": txn.id_}) return results
def print_delivery_box() -> None: """Print the at-least-once delivery semantics explanation.""" print() print(" ┌─────────────────────────────────────────────────────────────┐") print(" │ Watermark Persistence & Delivery Semantics │") print(" ├─────────────────────────────────────────────────────────────┤") print(" │ │") print(" │ The watermark is updated AFTER processing completes: │") print(" │ │") print(" │ 1. get() -> read current watermark │") print(" │ 2. Fetch transactions from watermark to tip │") print(" │ 3. Fire on/on_batch handlers │") print(" │ 4. set(newWatermark) -> persist new watermark │") print(" │ │") print(" │ If the process crashes between steps 3 and 4, the │") print(" │ watermark is NOT updated. On restart, the same │") print(" │ transactions will be re-fetched and re-processed. │") print(" │ │") print(" │ This gives AT-LEAST-ONCE delivery: │") print(" │ - Every transaction is guaranteed to be processed │") print(" │ - Some transactions MAY be processed more than once │") print(" │ - Handlers should be idempotent (safe to re-run) │") print(" │ │") print(" │ To achieve exactly-once semantics, persist the watermark │") print(" │ in the same atomic transaction as your business logic │") print(" │ (e.g., in a database transaction). │") print(" │ │") print(" └─────────────────────────────────────────────────────────────┘") print()
def main() -> None: print_header("11 — Watermark Persistence")
# Step 1: Connect to LocalNet print_step(1, "Connect to LocalNet") algorand = AlgorandClient.from_config(algod_config=ALGOD_CONFIG, kmd_config=KMD_CONFIG) status = algorand.client.algod.status() print_info(f"Current round: {status.last_round}") print_success("Connected to LocalNet")
# Step 2: Create and fund accounts print_step(2, "Create and fund accounts") dispenser = algorand.account.localnet_dispenser().addr sender = algorand.account.random().addr receiver = algorand.account.random().addr algorand.send.payment( PaymentParams(sender=dispenser, receiver=sender, amount=AlgoAmount(algo=100)) ) algorand.send.payment( PaymentParams(sender=dispenser, receiver=receiver, amount=AlgoAmount(algo=10)) ) print_info(f"Sender: {shorten_address(sender)}") print_info(f"Receiver: {shorten_address(receiver)}") print_success("Accounts created and funded")
# Step 3: Set up file-backed watermark persistence print_step(3, "Set up file-backed watermark persistence") tmp_dir = tempfile.mkdtemp(prefix="algokit_watermark_") watermark_path = Path(tmp_dir) / "watermark.json" persistence = file_watermark(watermark_path) print_info(f"Watermark file: {watermark_path}") print_info(f"Initial watermark: {persistence.get()}") print_success("File-backed watermark persistence configured")
try: _run_example(algorand, sender, receiver, persistence, watermark_path) finally: # Step 12: Clean up temp file print_step(12, "Clean up temp file") if watermark_path.exists(): watermark_path.unlink() Path(tmp_dir).rmdir() print_info(f"Deleted: {watermark_path}") print_success("Temp file cleaned up")
print_header("Example complete")
def _run_example( algorand: AlgorandClient, sender: str, receiver: str, persistence: WatermarkPersistence, watermark_path: Path,) -> None: """Execute both polls and verify results.""" # Step 4: Send first batch of 2 payments (1 ALGO each) print_step(4, "Send first batch of 2 payments") first_round = send_batch( algorand, sender, receiver, AlgoAmount(algo=1), ["batch1-txn1", "batch1-txn2"], ) print_success("First batch of 2 payments sent")
# Step 5: Set initial watermark to isolate test transactions print_step(5, "Set initial watermark to isolate test transactions") assert first_round is not None start_watermark = first_round - 1 persistence.set(start_watermark) print_info(f"Watermark set to: {start_watermark}") print_success("Watermark positioned before first batch")
# Step 6: First poll — expect 2 transactions from batch 1 print_step(6, "First poll — expect 2 transactions from batch 1") poll1 = poll_and_display(algorand, sender, receiver, persistence) assert len(poll1) == 2, f"Expected 2 transactions in first poll, got {len(poll1)}" print_success("First poll caught exactly 2 transactions")
# Step 7: Verify watermark persisted to file print_step(7, "Verify watermark persisted to file") saved_watermark = persistence.get() file_content = json.loads(watermark_path.read_text())["watermark"] print_info(f"File content: {file_content}") print_info(f"Watermark value: {saved_watermark}") assert saved_watermark is not None assert saved_watermark > start_watermark, ( f"Watermark should have advanced past {start_watermark}, but is {saved_watermark}" ) print_success(f"Watermark advanced: {start_watermark} -> {saved_watermark}")
# Step 8: Send second batch of 2 payments (2 ALGO each) print_step(8, "Send second batch of 2 payments") send_batch( algorand, sender, receiver, AlgoAmount(algo=2), ["batch2-txn1", "batch2-txn2"], ) print_success("Second batch of 2 payments sent")
# Step 9: Second poll — expect only 2 NEW transactions from batch 2 print_step(9, "Second poll — expect only 2 NEW transactions from batch 2") poll2 = poll_and_display(algorand, sender, receiver, persistence) assert len(poll2) == 2, f"Expected 2 transactions in second poll, got {len(poll2)}" poll2_notes = [t["note"] for t in poll2] assert all(n.startswith("batch2") for n in poll2_notes), ( f"Expected only batch2 transactions, got: {', '.join(poll2_notes)}" ) print_success("Second poll caught exactly 2 NEW transactions (batch2 only)")
# Step 10: Verify final watermark advanced again print_step(10, "Verify final watermark") final_watermark = persistence.get() print_info(f"Final watermark: {final_watermark}") assert final_watermark is not None assert final_watermark > saved_watermark, ( f"Final watermark should have advanced past {saved_watermark}, but is {final_watermark}" ) print_success(f"Watermark advanced: {saved_watermark} -> {final_watermark}")
# Step 11: At-least-once delivery semantics print_step(11, "At-least-once delivery semantics") print_delivery_box()
if __name__ == "__main__": main()Other examples
Section titled “Other examples”- Basic Poll Once
- Continuous Subscriber
- Payment Filters
- Asset Transfer
- App Call Subscription
- Multiple Named Filters
- Balance Change Tracking
- ARC-28 Event Subscription
- Inner Transaction Subscription
- Batch Handling & Data Mappers
- Watermark Persistence
- Sync Behaviours
- Custom Filters
- Stateless Subscriptions
- Lifecycle Hooks & Error Handling