Sync Behaviours
Description
Section titled “Description”Demonstrates all 4 sync behaviours and maxRoundsToSync comparison:
- sync-oldest: incremental catchup from watermark
- skip-sync-newest: jump to tip, discard history
- sync-oldest-start-now: hybrid first-start behavior
- fail: throw when too far behind
Prerequisites
Section titled “Prerequisites”- LocalNet running (via
algokit localnet start)
Run This Example
Section titled “Run This Example”From the repository’s examples/subscriber directory:
cd examples/subscriberuv run python 12_sync_behaviours.py"""Example 12: Sync Behaviours
Demonstrates all 4 sync behaviours and maxRoundsToSync comparison:- sync-oldest: incremental catchup from watermark- skip-sync-newest: jump to tip, discard history- sync-oldest-start-now: hybrid first-start behavior- fail: throw when too far behind
Prerequisites:- LocalNet running (via `algokit localnet start`)"""
from dataclasses import dataclass
from algokit_utils import AlgoAmount, AlgorandClient, PaymentParamsfrom shared import ( ALGOD_CONFIG, KMD_CONFIG, print_header, print_info, print_step, print_success,)
from algokit_subscriber import ( AlgorandSubscriber, AlgorandSubscriberConfig, SubscriberConfigFilter, TransactionFilter, WatermarkPersistence,)from algokit_subscriber.types.subscription import ( SyncBehaviour, TransactionSubscriptionResult,)
# Table column widths_NAME_W = 32_RANGE_W = 23_TXNS_W = 12_NOTE_W = 42
@dataclassclass BehaviourResult: """Outcome of a single sync-behaviour demonstration."""
name: str synced_round_range: tuple[int, int] current_round: int starting_watermark: int new_watermark: int txn_count: int note: str
def poll_with_behaviour( algorand: AlgorandClient, sender_addr: str, watermark: int, sync_behaviour: SyncBehaviour, max_rounds: int,) -> TransactionSubscriptionResult: """Create a subscriber with given sync behaviour and poll once.""" wm = watermark
def get_wm() -> int: return wm
def set_wm(w: int) -> None: nonlocal wm wm = w
subscriber = AlgorandSubscriber( config=AlgorandSubscriberConfig( filters=[ SubscriberConfigFilter( name="payments", filter=TransactionFilter(sender=sender_addr), ), ], sync_behaviour=sync_behaviour, max_rounds_to_sync=max_rounds, watermark_persistence=WatermarkPersistence(get=get_wm, set=set_wm), ), algod_client=algorand.client.algod, ) return subscriber.poll_once()
def _print_poll_details( result: TransactionSubscriptionResult, watermark: int, max_rounds: int,) -> None: """Print common details for a poll result.""" print_info(f"Watermark: {watermark}") print_info(f"maxRoundsToSync: {max_rounds}") sr = result.synced_round_range print_info(f"syncedRoundRange: [{sr[0]}, {sr[1]}]") print_info(f"currentRound (tip): {result.current_round}") txn_count = len(result.subscribed_transactions) print_info(f"Transactions matched: {txn_count}")
def _to_result( name: str, result: TransactionSubscriptionResult, note: str,) -> BehaviourResult: """Convert a TransactionSubscriptionResult to BehaviourResult.""" return BehaviourResult( name=name, synced_round_range=result.synced_round_range, current_round=result.current_round, starting_watermark=result.starting_watermark, new_watermark=result.new_watermark, txn_count=len(result.subscribed_transactions), note=note, )
def _table_row( name: str, rng: str, txns: int, note: str,) -> str: """Format a single table row.""" return ( f" │ {name.ljust(_NAME_W)} │ " f"{rng.ljust(_RANGE_W)} │ " f"{str(txns).ljust(_TXNS_W)} │ " f"{note.ljust(_NOTE_W)} │" )
def _table_border(ch: str) -> str: """Build a horizontal table border (ch = ┌├└).""" sep = {"┌": ("┬", "┐"), "├": ("┼", "┤"), "└": ("┴", "┘")} mid, end = sep[ch] return ( f" {ch}{'─' * (_NAME_W + 2)}{mid}" f"{'─' * (_RANGE_W + 2)}{mid}" f"{'─' * (_TXNS_W + 2)}{mid}" f"{'─' * (_NOTE_W + 2)}{end}" )
def demonstrate_sync_oldest( algorand: AlgorandClient, sender_addr: str, old_watermark: int, small_max: int,) -> BehaviourResult: """Step 4: sync-oldest — starts from watermark, syncs forward.""" print_step( 4, "sync-oldest — starts from watermark, syncs forward (limited by maxRoundsToSync)", ) result = poll_with_behaviour(algorand, sender_addr, old_watermark, "sync-oldest", small_max) _print_poll_details(result, old_watermark, small_max) print("") print(" Explanation: sync-oldest starts from watermark+1 and syncs forward") print(f" only {small_max} rounds. It does NOT jump to the tip. This is useful for") print(" incremental catchup — each poll processes a batch of rounds.") print_success("sync-oldest demonstrated") note = f"Syncs {small_max} rounds from watermark" return _to_result("sync-oldest", result, note)
def demonstrate_skip_sync_newest( algorand: AlgorandClient, sender_addr: str, old_watermark: int, small_max: int,) -> BehaviourResult: """Step 5: skip-sync-newest — jumps to tip.""" print_step(5, "skip-sync-newest — jumps to tip, only sees latest rounds") result = poll_with_behaviour( algorand, sender_addr, old_watermark, "skip-sync-newest", small_max, ) _print_poll_details(result, old_watermark, small_max) print("") print(" Explanation: skip-sync-newest jumps to currentRound - maxRoundsToSync + 1.") print(" It discards all old history and only sees the newest rounds. Useful for") print(" real-time notifications where you don't care about catching up.") print_success("skip-sync-newest demonstrated") note = f"Jumps to tip, scans last {small_max} rounds" return _to_result("skip-sync-newest", result, note)
def demonstrate_start_now( algorand: AlgorandClient, sender_addr: str, small_max: int,) -> BehaviourResult: """Step 6: sync-oldest-start-now — wm=0 starts from tip.""" print_step( 6, "sync-oldest-start-now — when watermark=0, starts from current round (not round 1)", ) result = poll_with_behaviour(algorand, sender_addr, 0, "sync-oldest-start-now", small_max) print_info("Watermark: 0 (fresh start)") print_info(f"maxRoundsToSync: {small_max}") sr = result.synced_round_range print_info(f"syncedRoundRange: [{sr[0]}, {sr[1]}]") print_info(f"currentRound (tip): {result.current_round}") txn_count = len(result.subscribed_transactions) print_info(f"Transactions matched: {txn_count}") print("") print(" Explanation: When watermark=0, sync-oldest-start-now behaves like") print(" skip-sync-newest — it jumps to the tip instead of syncing from round 1.") print(" This avoids scanning the entire chain history on first startup.") print(" Once watermark > 0 (after first poll), it behaves like sync-oldest.") print_success("sync-oldest-start-now demonstrated") note = "Watermark=0: jumps to tip like skip-sync-newest" return _to_result("sync-oldest-start-now (wm=0)", result, note)
def demonstrate_fail( algorand: AlgorandClient, sender_addr: str, old_watermark: int, small_max: int, tip_round: int,) -> BehaviourResult: """Step 7: fail — throws when gap exceeds maxRoundsToSync.""" print_step( 7, "fail — throws when gap between watermark and tip exceeds maxRoundsToSync", ) fail_error: Exception | None = None try: poll_with_behaviour(algorand, sender_addr, old_watermark, "fail", small_max) except ValueError as err: fail_error = err
fail_br = BehaviourResult( name="fail", synced_round_range=(0, 0), current_round=tip_round, starting_watermark=old_watermark, new_watermark=old_watermark, txn_count=0, note="", )
if fail_error: print_info(f"Error thrown: {fail_error}") print("") print( " Explanation: fail throws an error when currentRound - watermark > maxRoundsToSync." ) print(" This is useful for strict deployments where falling behind is unacceptable.") print(" The operator must investigate why the subscriber fell behind.") print_success("fail behaviour demonstrated (error thrown as expected)") fail_br.note = "Throws error — gap too large" return fail_br
print_info("No error: Gap was within maxRoundsToSync, no error thrown") fail_br.note = "No error — gap within threshold" return fail_br
def demonstrate_max_rounds_comparison( algorand: AlgorandClient, sender_addr: str, old_watermark: int, small_max: int, large_max: int,) -> tuple[TransactionSubscriptionResult, TransactionSubscriptionResult]: """Step 8: compare small vs large maxRoundsToSync.""" print_step(8, "maxRoundsToSync effect — compare different values") small_result = poll_with_behaviour( algorand, sender_addr, old_watermark, "sync-oldest", small_max ) large_result = poll_with_behaviour( algorand, sender_addr, old_watermark, "sync-oldest", large_max )
sr = small_result.synced_round_range lr = large_result.synced_round_range s_txns = len(small_result.subscribed_transactions) l_txns = len(large_result.subscribed_transactions) print_info(f"sync-oldest maxRoundsToSync={small_max}: range=[{sr[0]}, {sr[1]}], txns={s_txns}") print_info(f"sync-oldest maxRoundsToSync={large_max}: range=[{lr[0]}, {lr[1]}], txns={l_txns}") print("") print(" With a small maxRoundsToSync, sync-oldest only processes a few rounds per poll.") print( " With a large maxRoundsToSync " "(or when gap < maxRoundsToSync), " "it processes all rounds to the tip." ) print_success("maxRoundsToSync comparison demonstrated") return small_result, large_result
def print_comparison_table( results: list[BehaviourResult], small_result: TransactionSubscriptionResult, large_result: TransactionSubscriptionResult, small_max: int, large_max: int,) -> None: """Step 9: Print comparison table.""" print_step(9, "Comparison table") print("") print(_table_border("┌")) print( _table_row("Behaviour", "syncedRoundRange", 0, "Note").replace( f" {str(0).ljust(_TXNS_W)} ", f" {'Txn Count'.ljust(_TXNS_W)} ", ) ) print(_table_border("├"))
for r in results: if r.name == "fail": rng = "N/A (error thrown)" else: sr = r.synced_round_range rng = f"[{sr[0]}, {sr[1]}]" print(_table_row(r.name, rng, r.txn_count, r.note))
# maxRoundsToSync comparison rows sr = small_result.synced_round_range lr = large_result.synced_round_range s_txns = len(small_result.subscribed_transactions) l_txns = len(large_result.subscribed_transactions) print(_table_border("├")) print( _table_row( f"sync-oldest (max={small_max})", f"[{sr[0]}, {sr[1]}]", s_txns, f"Limited to {small_max} rounds", ) ) print( _table_row( f"sync-oldest (max={large_max})", f"[{lr[0]}, {lr[1]}]", l_txns, "Syncs all rounds to tip", ) ) print(_table_border("└")) print("")
def _print_summary_box() -> None: """Step 10: Print summary explanation box.""" w = 65 print_step(10, "Summary") print("") print(f" ┌{'─' * w}┐") print(f" │{' Sync Behaviour Guide':<{w}}│") print(f" ├{'─' * w}┤") lines = [ "", " sync-oldest:", " Processes rounds incrementally from watermark forward.", " Safe for catching up. Requires archival node for old data.", "", " skip-sync-newest:", " Jumps to the tip, discards old history.", " Best for real-time notifications only.", "", " sync-oldest-start-now:", " Hybrid — skips history on first start (wm=0), then catches", " up incrementally like sync-oldest afterward.", "", " fail:", " Throws if too far behind. Forces operator intervention.", "", " maxRoundsToSync (default 500):", " Controls rounds per poll. Affects staleness tolerance for", " skip-sync-newest/fail, and catchup speed for sync-oldest.", "", ] for line in lines: print(f" │{line:<{w}}│") print(f" └{'─' * w}┘") print("")
def verify_results( results: list[BehaviourResult], old_watermark: int,) -> None: """Verify each behaviour produced expected round ranges.""" sync_oldest = results[0] skip_newest = results[1] start_now = results[2] fail_result = results[3]
# sync-oldest should start from watermark+1 assert sync_oldest.synced_round_range[0] == old_watermark + 1, ( f"sync-oldest should start at watermark+1=" f"{old_watermark + 1}, " f"got {sync_oldest.synced_round_range[0]}" )
# skip-sync-newest should start near the tip assert skip_newest.synced_round_range[0] > old_watermark + 1, ( f"skip-sync-newest should start well past watermark, " f"got {skip_newest.synced_round_range[0]}" )
# sync-oldest-start-now with wm=0 should also start near tip assert start_now.synced_round_range[0] > old_watermark + 1, ( f"sync-oldest-start-now (wm=0) should start near tip, " f"got {start_now.synced_round_range[0]}" )
# fail should have 0 txns (errored out) assert fail_result.txn_count == 0, f"fail should have 0 txns, got {fail_result.txn_count}"
def main() -> None: print_header("12 — Sync Behaviours")
# Step 1: Connect to LocalNet print_step(1, "Connect to LocalNet") algorand = AlgorandClient.from_config(algod_config=ALGOD_CONFIG, kmd_config=KMD_CONFIG) status = algorand.client.algod.status() print_info(f"Current round: {status.last_round}") print_success("Connected to LocalNet")
# Step 2: Create and fund accounts print_step(2, "Create and fund accounts") dispenser = algorand.account.localnet_dispenser().addr sender = algorand.account.random().addr receiver = algorand.account.random().addr algorand.send.payment( PaymentParams( sender=dispenser, receiver=sender, amount=AlgoAmount(algo=100), ) ) algorand.send.payment( PaymentParams( sender=dispenser, receiver=receiver, amount=AlgoAmount(algo=10), ) ) print_success("Accounts created and funded")
# Step 3: Send 6 transactions to create round history print_step(3, "Send 6 transactions to create round history") txn_rounds: list[int] = [] for i in range(1, 7): result = algorand.send.payment( PaymentParams( sender=sender, receiver=receiver, amount=AlgoAmount(algo=1), note=f"sync-test-{i}".encode(), ) ) assert result.confirmation is not None rnd = result.confirmation.confirmed_round assert rnd is not None txn_rounds.append(rnd) print_info(f"Txn {i}: round {rnd}")
first_txn_round = txn_rounds[0] last_txn_round = txn_rounds[-1] print_info(f"Transaction round range: {first_txn_round} to {last_txn_round}") print_success("6 transactions sent")
tip_round = algorand.client.algod.status().last_round print_info(f"Current tip: {tip_round}")
_run_demonstrations(algorand, sender, first_txn_round, tip_round)
print_header("Example complete")
def _run_demonstrations( algorand: AlgorandClient, sender: str, first_txn_round: int, tip_round: int,) -> None: """Run all sync behaviour demonstrations and print results.""" old_watermark = first_txn_round - 1 small_max = 3 large_max = 500
results: list[BehaviourResult] = []
# Step 4-7: Demonstrate each behaviour results.append(demonstrate_sync_oldest(algorand, sender, old_watermark, small_max)) results.append(demonstrate_skip_sync_newest(algorand, sender, old_watermark, small_max)) results.append(demonstrate_start_now(algorand, sender, small_max)) results.append(demonstrate_fail(algorand, sender, old_watermark, small_max, tip_round))
# Step 8: maxRoundsToSync comparison small_result, large_result = demonstrate_max_rounds_comparison( algorand, sender, old_watermark, small_max, large_max )
# Verify expected round ranges verify_results(results, old_watermark)
# Step 9: Comparison table print_comparison_table(results, small_result, large_result, small_max, large_max)
# Step 10: Summary _print_summary_box()
if __name__ == "__main__": main()Other examples
Section titled “Other examples”- Basic Poll Once
- Continuous Subscriber
- Payment Filters
- Asset Transfer
- App Call Subscription
- Multiple Named Filters
- Balance Change Tracking
- ARC-28 Event Subscription
- Inner Transaction Subscription
- Batch Handling & Data Mappers
- Watermark Persistence
- Sync Behaviours
- Custom Filters
- Stateless Subscriptions
- Lifecycle Hooks & Error Handling