Skip to content

Watermark Persistence

← Back to Examples

Demonstrates file-backed watermark persistence across multiple polls:

  • Implement get/set callbacks using a temp file
  • Poll twice: batch 1 catches first 2 txns, batch 2 catches next 2 txns
  • Verify watermark advances between polls
  • Verify no duplication across polls
  • Explain at-least-once delivery semantics
  • LocalNet running (via algokit localnet start)

From the repository’s examples/subscriber directory:

Terminal window
cd examples/subscriber
uv run python 11_watermark_persistence.py

View source on GitHub

11_watermark_persistence.py
"""Example 11: Watermark Persistence
Demonstrates file-backed watermark persistence across multiple polls:
- Implement get/set callbacks using a temp file
- Poll twice: batch 1 catches first 2 txns, batch 2 catches next 2 txns
- Verify watermark advances between polls
- Verify no duplication across polls
- Explain at-least-once delivery semantics
Prerequisites:
- LocalNet running (via `algokit localnet start`)
"""
import json
import tempfile
from pathlib import Path
from algokit_utils import AlgoAmount, AlgorandClient, PaymentParams
from shared import (
ALGOD_CONFIG,
KMD_CONFIG,
print_header,
print_info,
print_step,
print_success,
shorten_address,
)
from algokit_subscriber import (
AlgorandSubscriber,
AlgorandSubscriberConfig,
SubscriberConfigFilter,
TransactionFilter,
WatermarkPersistence,
)
def file_watermark(path: Path) -> WatermarkPersistence:
"""Create a file-backed WatermarkPersistence using a JSON file."""
def get_watermark() -> int:
if not path.exists():
return 0
data = json.loads(path.read_text())
return int(data.get("watermark", 0))
def set_watermark(value: int) -> None:
path.write_text(json.dumps({"watermark": value}))
return WatermarkPersistence(get=get_watermark, set=set_watermark)
def create_subscriber(
algorand: AlgorandClient,
sender: str,
receiver: str,
persistence: WatermarkPersistence,
) -> AlgorandSubscriber:
"""Create a subscriber configured for payment filtering."""
return AlgorandSubscriber(
config=AlgorandSubscriberConfig(
filters=[
SubscriberConfigFilter(
name="payments",
filter=TransactionFilter(sender=sender, receiver=receiver),
),
],
watermark_persistence=persistence,
sync_behaviour="sync-oldest",
max_rounds_to_sync=100,
),
algod_client=algorand.client.algod,
)
def send_batch(
algorand: AlgorandClient,
sender: str,
receiver: str,
amount: AlgoAmount,
notes: list[str],
) -> int | None:
"""Send a batch of payments. Returns first confirmed round."""
first_round: int | None = None
for note in notes:
result = algorand.send.payment(
PaymentParams(
sender=sender,
receiver=receiver,
amount=amount,
note=note.encode(),
)
)
confirmed_round = result.confirmation.confirmed_round
if first_round is None:
first_round = confirmed_round
print_info(f"Sent {note}: round {confirmed_round}")
return first_round
def poll_and_display(
algorand: AlgorandClient,
sender: str,
receiver: str,
persistence: WatermarkPersistence,
) -> list[dict[str, str]]:
"""Poll once and display matched transactions. Returns list of {note, id}."""
subscriber = create_subscriber(algorand, sender, receiver, persistence)
result = subscriber.poll_once()
txns = result.subscribed_transactions
print_info(f"Transactions matched: {len(txns)}")
results = []
for txn in txns:
note = txn.note.decode("utf-8") if txn.note else ""
print_info(f" {note}: id: {txn.id_[:12]}...")
results.append({"note": note, "id": txn.id_})
return results
def print_delivery_box() -> None:
"""Print the at-least-once delivery semantics explanation."""
print()
print(" ┌─────────────────────────────────────────────────────────────┐")
print(" │ Watermark Persistence & Delivery Semantics │")
print(" ├─────────────────────────────────────────────────────────────┤")
print(" │ │")
print(" │ The watermark is updated AFTER processing completes: │")
print(" │ │")
print(" │ 1. get() -> read current watermark │")
print(" │ 2. Fetch transactions from watermark to tip │")
print(" │ 3. Fire on/on_batch handlers │")
print(" │ 4. set(newWatermark) -> persist new watermark │")
print(" │ │")
print(" │ If the process crashes between steps 3 and 4, the │")
print(" │ watermark is NOT updated. On restart, the same │")
print(" │ transactions will be re-fetched and re-processed. │")
print(" │ │")
print(" │ This gives AT-LEAST-ONCE delivery: │")
print(" │ - Every transaction is guaranteed to be processed │")
print(" │ - Some transactions MAY be processed more than once │")
print(" │ - Handlers should be idempotent (safe to re-run) │")
print(" │ │")
print(" │ To achieve exactly-once semantics, persist the watermark │")
print(" │ in the same atomic transaction as your business logic │")
print(" │ (e.g., in a database transaction). │")
print(" │ │")
print(" └─────────────────────────────────────────────────────────────┘")
print()
def main() -> None:
print_header("11 — Watermark Persistence")
# Step 1: Connect to LocalNet
print_step(1, "Connect to LocalNet")
algorand = AlgorandClient.from_config(algod_config=ALGOD_CONFIG, kmd_config=KMD_CONFIG)
status = algorand.client.algod.status()
print_info(f"Current round: {status.last_round}")
print_success("Connected to LocalNet")
# Step 2: Create and fund accounts
print_step(2, "Create and fund accounts")
dispenser = algorand.account.localnet_dispenser().addr
sender = algorand.account.random().addr
receiver = algorand.account.random().addr
algorand.send.payment(
PaymentParams(sender=dispenser, receiver=sender, amount=AlgoAmount(algo=100))
)
algorand.send.payment(
PaymentParams(sender=dispenser, receiver=receiver, amount=AlgoAmount(algo=10))
)
print_info(f"Sender: {shorten_address(sender)}")
print_info(f"Receiver: {shorten_address(receiver)}")
print_success("Accounts created and funded")
# Step 3: Set up file-backed watermark persistence
print_step(3, "Set up file-backed watermark persistence")
tmp_dir = tempfile.mkdtemp(prefix="algokit_watermark_")
watermark_path = Path(tmp_dir) / "watermark.json"
persistence = file_watermark(watermark_path)
print_info(f"Watermark file: {watermark_path}")
print_info(f"Initial watermark: {persistence.get()}")
print_success("File-backed watermark persistence configured")
try:
_run_example(algorand, sender, receiver, persistence, watermark_path)
finally:
# Step 12: Clean up temp file
print_step(12, "Clean up temp file")
if watermark_path.exists():
watermark_path.unlink()
Path(tmp_dir).rmdir()
print_info(f"Deleted: {watermark_path}")
print_success("Temp file cleaned up")
print_header("Example complete")
def _run_example(
algorand: AlgorandClient,
sender: str,
receiver: str,
persistence: WatermarkPersistence,
watermark_path: Path,
) -> None:
"""Execute both polls and verify results."""
# Step 4: Send first batch of 2 payments (1 ALGO each)
print_step(4, "Send first batch of 2 payments")
first_round = send_batch(
algorand,
sender,
receiver,
AlgoAmount(algo=1),
["batch1-txn1", "batch1-txn2"],
)
print_success("First batch of 2 payments sent")
# Step 5: Set initial watermark to isolate test transactions
print_step(5, "Set initial watermark to isolate test transactions")
assert first_round is not None
start_watermark = first_round - 1
persistence.set(start_watermark)
print_info(f"Watermark set to: {start_watermark}")
print_success("Watermark positioned before first batch")
# Step 6: First poll — expect 2 transactions from batch 1
print_step(6, "First poll — expect 2 transactions from batch 1")
poll1 = poll_and_display(algorand, sender, receiver, persistence)
assert len(poll1) == 2, f"Expected 2 transactions in first poll, got {len(poll1)}"
print_success("First poll caught exactly 2 transactions")
# Step 7: Verify watermark persisted to file
print_step(7, "Verify watermark persisted to file")
saved_watermark = persistence.get()
file_content = json.loads(watermark_path.read_text())["watermark"]
print_info(f"File content: {file_content}")
print_info(f"Watermark value: {saved_watermark}")
assert saved_watermark is not None
assert saved_watermark > start_watermark, (
f"Watermark should have advanced past {start_watermark}, but is {saved_watermark}"
)
print_success(f"Watermark advanced: {start_watermark} -> {saved_watermark}")
# Step 8: Send second batch of 2 payments (2 ALGO each)
print_step(8, "Send second batch of 2 payments")
send_batch(
algorand,
sender,
receiver,
AlgoAmount(algo=2),
["batch2-txn1", "batch2-txn2"],
)
print_success("Second batch of 2 payments sent")
# Step 9: Second poll — expect only 2 NEW transactions from batch 2
print_step(9, "Second poll — expect only 2 NEW transactions from batch 2")
poll2 = poll_and_display(algorand, sender, receiver, persistence)
assert len(poll2) == 2, f"Expected 2 transactions in second poll, got {len(poll2)}"
poll2_notes = [t["note"] for t in poll2]
assert all(n.startswith("batch2") for n in poll2_notes), (
f"Expected only batch2 transactions, got: {', '.join(poll2_notes)}"
)
print_success("Second poll caught exactly 2 NEW transactions (batch2 only)")
# Step 10: Verify final watermark advanced again
print_step(10, "Verify final watermark")
final_watermark = persistence.get()
print_info(f"Final watermark: {final_watermark}")
assert final_watermark is not None
assert final_watermark > saved_watermark, (
f"Final watermark should have advanced past {saved_watermark}, but is {final_watermark}"
)
print_success(f"Watermark advanced: {saved_watermark} -> {final_watermark}")
# Step 11: At-least-once delivery semantics
print_step(11, "At-least-once delivery semantics")
print_delivery_box()
if __name__ == "__main__":
main()