Skip to content

Sync Behaviours

← Back to Examples

Demonstrates all 4 sync behaviours and maxRoundsToSync comparison:

  • sync-oldest: incremental catchup from watermark
  • skip-sync-newest: jump to tip, discard history
  • sync-oldest-start-now: hybrid first-start behavior
  • fail: throw when too far behind
  • LocalNet running (via algokit localnet start)

From the repository’s examples/subscriber directory:

Terminal window
cd examples/subscriber
uv run python 12_sync_behaviours.py

View source on GitHub

12_sync_behaviours.py
"""Example 12: Sync Behaviours
Demonstrates all 4 sync behaviours and maxRoundsToSync comparison:
- sync-oldest: incremental catchup from watermark
- skip-sync-newest: jump to tip, discard history
- sync-oldest-start-now: hybrid first-start behavior
- fail: throw when too far behind
Prerequisites:
- LocalNet running (via `algokit localnet start`)
"""
from dataclasses import dataclass
from algokit_utils import AlgoAmount, AlgorandClient, PaymentParams
from shared import (
ALGOD_CONFIG,
KMD_CONFIG,
print_header,
print_info,
print_step,
print_success,
)
from algokit_subscriber import (
AlgorandSubscriber,
AlgorandSubscriberConfig,
SubscriberConfigFilter,
TransactionFilter,
WatermarkPersistence,
)
from algokit_subscriber.types.subscription import (
SyncBehaviour,
TransactionSubscriptionResult,
)
# Table column widths
_NAME_W = 32
_RANGE_W = 23
_TXNS_W = 12
_NOTE_W = 42
@dataclass
class BehaviourResult:
"""Outcome of a single sync-behaviour demonstration."""
name: str
synced_round_range: tuple[int, int]
current_round: int
starting_watermark: int
new_watermark: int
txn_count: int
note: str
def poll_with_behaviour(
algorand: AlgorandClient,
sender_addr: str,
watermark: int,
sync_behaviour: SyncBehaviour,
max_rounds: int,
) -> TransactionSubscriptionResult:
"""Create a subscriber with given sync behaviour and poll once."""
wm = watermark
def get_wm() -> int:
return wm
def set_wm(w: int) -> None:
nonlocal wm
wm = w
subscriber = AlgorandSubscriber(
config=AlgorandSubscriberConfig(
filters=[
SubscriberConfigFilter(
name="payments",
filter=TransactionFilter(sender=sender_addr),
),
],
sync_behaviour=sync_behaviour,
max_rounds_to_sync=max_rounds,
watermark_persistence=WatermarkPersistence(get=get_wm, set=set_wm),
),
algod_client=algorand.client.algod,
)
return subscriber.poll_once()
def _print_poll_details(
result: TransactionSubscriptionResult,
watermark: int,
max_rounds: int,
) -> None:
"""Print common details for a poll result."""
print_info(f"Watermark: {watermark}")
print_info(f"maxRoundsToSync: {max_rounds}")
sr = result.synced_round_range
print_info(f"syncedRoundRange: [{sr[0]}, {sr[1]}]")
print_info(f"currentRound (tip): {result.current_round}")
txn_count = len(result.subscribed_transactions)
print_info(f"Transactions matched: {txn_count}")
def _to_result(
name: str,
result: TransactionSubscriptionResult,
note: str,
) -> BehaviourResult:
"""Convert a TransactionSubscriptionResult to BehaviourResult."""
return BehaviourResult(
name=name,
synced_round_range=result.synced_round_range,
current_round=result.current_round,
starting_watermark=result.starting_watermark,
new_watermark=result.new_watermark,
txn_count=len(result.subscribed_transactions),
note=note,
)
def _table_row(
name: str,
rng: str,
txns: int,
note: str,
) -> str:
"""Format a single table row."""
return (
f" │ {name.ljust(_NAME_W)} │ "
f"{rng.ljust(_RANGE_W)} │ "
f"{str(txns).ljust(_TXNS_W)} │ "
f"{note.ljust(_NOTE_W)} │"
)
def _table_border(ch: str) -> str:
"""Build a horizontal table border (ch = ┌├└)."""
sep = {"┌": ("┬", "┐"), "├": ("┼", "┤"), "└": ("┴", "┘")}
mid, end = sep[ch]
return (
f" {ch}{'─' * (_NAME_W + 2)}{mid}"
f"{'─' * (_RANGE_W + 2)}{mid}"
f"{'─' * (_TXNS_W + 2)}{mid}"
f"{'─' * (_NOTE_W + 2)}{end}"
)
def demonstrate_sync_oldest(
algorand: AlgorandClient,
sender_addr: str,
old_watermark: int,
small_max: int,
) -> BehaviourResult:
"""Step 4: sync-oldest — starts from watermark, syncs forward."""
print_step(
4,
"sync-oldest — starts from watermark, syncs forward (limited by maxRoundsToSync)",
)
result = poll_with_behaviour(algorand, sender_addr, old_watermark, "sync-oldest", small_max)
_print_poll_details(result, old_watermark, small_max)
print("")
print(" Explanation: sync-oldest starts from watermark+1 and syncs forward")
print(f" only {small_max} rounds. It does NOT jump to the tip. This is useful for")
print(" incremental catchup — each poll processes a batch of rounds.")
print_success("sync-oldest demonstrated")
note = f"Syncs {small_max} rounds from watermark"
return _to_result("sync-oldest", result, note)
def demonstrate_skip_sync_newest(
algorand: AlgorandClient,
sender_addr: str,
old_watermark: int,
small_max: int,
) -> BehaviourResult:
"""Step 5: skip-sync-newest — jumps to tip."""
print_step(5, "skip-sync-newest — jumps to tip, only sees latest rounds")
result = poll_with_behaviour(
algorand,
sender_addr,
old_watermark,
"skip-sync-newest",
small_max,
)
_print_poll_details(result, old_watermark, small_max)
print("")
print(" Explanation: skip-sync-newest jumps to currentRound - maxRoundsToSync + 1.")
print(" It discards all old history and only sees the newest rounds. Useful for")
print(" real-time notifications where you don't care about catching up.")
print_success("skip-sync-newest demonstrated")
note = f"Jumps to tip, scans last {small_max} rounds"
return _to_result("skip-sync-newest", result, note)
def demonstrate_start_now(
algorand: AlgorandClient,
sender_addr: str,
small_max: int,
) -> BehaviourResult:
"""Step 6: sync-oldest-start-now — wm=0 starts from tip."""
print_step(
6,
"sync-oldest-start-now — when watermark=0, starts from current round (not round 1)",
)
result = poll_with_behaviour(algorand, sender_addr, 0, "sync-oldest-start-now", small_max)
print_info("Watermark: 0 (fresh start)")
print_info(f"maxRoundsToSync: {small_max}")
sr = result.synced_round_range
print_info(f"syncedRoundRange: [{sr[0]}, {sr[1]}]")
print_info(f"currentRound (tip): {result.current_round}")
txn_count = len(result.subscribed_transactions)
print_info(f"Transactions matched: {txn_count}")
print("")
print(" Explanation: When watermark=0, sync-oldest-start-now behaves like")
print(" skip-sync-newest — it jumps to the tip instead of syncing from round 1.")
print(" This avoids scanning the entire chain history on first startup.")
print(" Once watermark > 0 (after first poll), it behaves like sync-oldest.")
print_success("sync-oldest-start-now demonstrated")
note = "Watermark=0: jumps to tip like skip-sync-newest"
return _to_result("sync-oldest-start-now (wm=0)", result, note)
def demonstrate_fail(
algorand: AlgorandClient,
sender_addr: str,
old_watermark: int,
small_max: int,
tip_round: int,
) -> BehaviourResult:
"""Step 7: fail — throws when gap exceeds maxRoundsToSync."""
print_step(
7,
"fail — throws when gap between watermark and tip exceeds maxRoundsToSync",
)
fail_error: Exception | None = None
try:
poll_with_behaviour(algorand, sender_addr, old_watermark, "fail", small_max)
except ValueError as err:
fail_error = err
fail_br = BehaviourResult(
name="fail",
synced_round_range=(0, 0),
current_round=tip_round,
starting_watermark=old_watermark,
new_watermark=old_watermark,
txn_count=0,
note="",
)
if fail_error:
print_info(f"Error thrown: {fail_error}")
print("")
print(
" Explanation: fail throws an error when currentRound - watermark > maxRoundsToSync."
)
print(" This is useful for strict deployments where falling behind is unacceptable.")
print(" The operator must investigate why the subscriber fell behind.")
print_success("fail behaviour demonstrated (error thrown as expected)")
fail_br.note = "Throws error — gap too large"
return fail_br
print_info("No error: Gap was within maxRoundsToSync, no error thrown")
fail_br.note = "No error — gap within threshold"
return fail_br
def demonstrate_max_rounds_comparison(
algorand: AlgorandClient,
sender_addr: str,
old_watermark: int,
small_max: int,
large_max: int,
) -> tuple[TransactionSubscriptionResult, TransactionSubscriptionResult]:
"""Step 8: compare small vs large maxRoundsToSync."""
print_step(8, "maxRoundsToSync effect — compare different values")
small_result = poll_with_behaviour(
algorand, sender_addr, old_watermark, "sync-oldest", small_max
)
large_result = poll_with_behaviour(
algorand, sender_addr, old_watermark, "sync-oldest", large_max
)
sr = small_result.synced_round_range
lr = large_result.synced_round_range
s_txns = len(small_result.subscribed_transactions)
l_txns = len(large_result.subscribed_transactions)
print_info(f"sync-oldest maxRoundsToSync={small_max}: range=[{sr[0]}, {sr[1]}], txns={s_txns}")
print_info(f"sync-oldest maxRoundsToSync={large_max}: range=[{lr[0]}, {lr[1]}], txns={l_txns}")
print("")
print(" With a small maxRoundsToSync, sync-oldest only processes a few rounds per poll.")
print(
" With a large maxRoundsToSync "
"(or when gap < maxRoundsToSync), "
"it processes all rounds to the tip."
)
print_success("maxRoundsToSync comparison demonstrated")
return small_result, large_result
def print_comparison_table(
results: list[BehaviourResult],
small_result: TransactionSubscriptionResult,
large_result: TransactionSubscriptionResult,
small_max: int,
large_max: int,
) -> None:
"""Step 9: Print comparison table."""
print_step(9, "Comparison table")
print("")
print(_table_border("┌"))
print(
_table_row("Behaviour", "syncedRoundRange", 0, "Note").replace(
f" {str(0).ljust(_TXNS_W)} ",
f" {'Txn Count'.ljust(_TXNS_W)} ",
)
)
print(_table_border("├"))
for r in results:
if r.name == "fail":
rng = "N/A (error thrown)"
else:
sr = r.synced_round_range
rng = f"[{sr[0]}, {sr[1]}]"
print(_table_row(r.name, rng, r.txn_count, r.note))
# maxRoundsToSync comparison rows
sr = small_result.synced_round_range
lr = large_result.synced_round_range
s_txns = len(small_result.subscribed_transactions)
l_txns = len(large_result.subscribed_transactions)
print(_table_border("├"))
print(
_table_row(
f"sync-oldest (max={small_max})",
f"[{sr[0]}, {sr[1]}]",
s_txns,
f"Limited to {small_max} rounds",
)
)
print(
_table_row(
f"sync-oldest (max={large_max})",
f"[{lr[0]}, {lr[1]}]",
l_txns,
"Syncs all rounds to tip",
)
)
print(_table_border("└"))
print("")
def _print_summary_box() -> None:
"""Step 10: Print summary explanation box."""
w = 65
print_step(10, "Summary")
print("")
print(f" ┌{'─' * w}┐")
print(f" │{' Sync Behaviour Guide':<{w}}│")
print(f" ├{'─' * w}┤")
lines = [
"",
" sync-oldest:",
" Processes rounds incrementally from watermark forward.",
" Safe for catching up. Requires archival node for old data.",
"",
" skip-sync-newest:",
" Jumps to the tip, discards old history.",
" Best for real-time notifications only.",
"",
" sync-oldest-start-now:",
" Hybrid — skips history on first start (wm=0), then catches",
" up incrementally like sync-oldest afterward.",
"",
" fail:",
" Throws if too far behind. Forces operator intervention.",
"",
" maxRoundsToSync (default 500):",
" Controls rounds per poll. Affects staleness tolerance for",
" skip-sync-newest/fail, and catchup speed for sync-oldest.",
"",
]
for line in lines:
print(f" │{line:<{w}}│")
print(f" └{'─' * w}┘")
print("")
def verify_results(
results: list[BehaviourResult],
old_watermark: int,
) -> None:
"""Verify each behaviour produced expected round ranges."""
sync_oldest = results[0]
skip_newest = results[1]
start_now = results[2]
fail_result = results[3]
# sync-oldest should start from watermark+1
assert sync_oldest.synced_round_range[0] == old_watermark + 1, (
f"sync-oldest should start at watermark+1="
f"{old_watermark + 1}, "
f"got {sync_oldest.synced_round_range[0]}"
)
# skip-sync-newest should start near the tip
assert skip_newest.synced_round_range[0] > old_watermark + 1, (
f"skip-sync-newest should start well past watermark, "
f"got {skip_newest.synced_round_range[0]}"
)
# sync-oldest-start-now with wm=0 should also start near tip
assert start_now.synced_round_range[0] > old_watermark + 1, (
f"sync-oldest-start-now (wm=0) should start near tip, "
f"got {start_now.synced_round_range[0]}"
)
# fail should have 0 txns (errored out)
assert fail_result.txn_count == 0, f"fail should have 0 txns, got {fail_result.txn_count}"
def main() -> None:
print_header("12 — Sync Behaviours")
# Step 1: Connect to LocalNet
print_step(1, "Connect to LocalNet")
algorand = AlgorandClient.from_config(algod_config=ALGOD_CONFIG, kmd_config=KMD_CONFIG)
status = algorand.client.algod.status()
print_info(f"Current round: {status.last_round}")
print_success("Connected to LocalNet")
# Step 2: Create and fund accounts
print_step(2, "Create and fund accounts")
dispenser = algorand.account.localnet_dispenser().addr
sender = algorand.account.random().addr
receiver = algorand.account.random().addr
algorand.send.payment(
PaymentParams(
sender=dispenser,
receiver=sender,
amount=AlgoAmount(algo=100),
)
)
algorand.send.payment(
PaymentParams(
sender=dispenser,
receiver=receiver,
amount=AlgoAmount(algo=10),
)
)
print_success("Accounts created and funded")
# Step 3: Send 6 transactions to create round history
print_step(3, "Send 6 transactions to create round history")
txn_rounds: list[int] = []
for i in range(1, 7):
result = algorand.send.payment(
PaymentParams(
sender=sender,
receiver=receiver,
amount=AlgoAmount(algo=1),
note=f"sync-test-{i}".encode(),
)
)
assert result.confirmation is not None
rnd = result.confirmation.confirmed_round
assert rnd is not None
txn_rounds.append(rnd)
print_info(f"Txn {i}: round {rnd}")
first_txn_round = txn_rounds[0]
last_txn_round = txn_rounds[-1]
print_info(f"Transaction round range: {first_txn_round} to {last_txn_round}")
print_success("6 transactions sent")
tip_round = algorand.client.algod.status().last_round
print_info(f"Current tip: {tip_round}")
_run_demonstrations(algorand, sender, first_txn_round, tip_round)
print_header("Example complete")
def _run_demonstrations(
algorand: AlgorandClient,
sender: str,
first_txn_round: int,
tip_round: int,
) -> None:
"""Run all sync behaviour demonstrations and print results."""
old_watermark = first_txn_round - 1
small_max = 3
large_max = 500
results: list[BehaviourResult] = []
# Step 4-7: Demonstrate each behaviour
results.append(demonstrate_sync_oldest(algorand, sender, old_watermark, small_max))
results.append(demonstrate_skip_sync_newest(algorand, sender, old_watermark, small_max))
results.append(demonstrate_start_now(algorand, sender, small_max))
results.append(demonstrate_fail(algorand, sender, old_watermark, small_max, tip_round))
# Step 8: maxRoundsToSync comparison
small_result, large_result = demonstrate_max_rounds_comparison(
algorand, sender, old_watermark, small_max, large_max
)
# Verify expected round ranges
verify_results(results, old_watermark)
# Step 9: Comparison table
print_comparison_table(results, small_result, large_result, small_max, large_max)
# Step 10: Summary
_print_summary_box()
if __name__ == "__main__":
main()