Lifecycle Hooks & Error Handling
Description
Section titled “Description”Demonstrates lifecycle hooks and retry patterns.
- Hook execution order: on_before_poll -> processing -> on_poll -> inspect
- start(inspect) callback in continuous polling
- Error recovery with on_error and retry logic
Prerequisites
Section titled “Prerequisites”- LocalNet running (via
algokit localnet start)
Run This Example
Section titled “Run This Example”From the repository’s examples/subscriber directory:
cd examples/subscriberuv run python 15_lifecycle_hooks.py"""Example 15: Lifecycle Hooks & Error Handling
Demonstrates lifecycle hooks and retry patterns.- Hook execution order: on_before_poll -> processing -> on_poll -> inspect- start(inspect) callback in continuous polling- Error recovery with on_error and retry logic
Prerequisites:- LocalNet running (via `algokit localnet start`)"""
import threadingimport time
from algokit_utils import AlgoAmount, AlgorandClient, PaymentParamsfrom shared import ( ALGOD_CONFIG, KMD_CONFIG, print_header, print_info, print_step, print_success, shorten_address,)
from algokit_subscriber import ( AlgorandSubscriber, AlgorandSubscriberConfig, SubscriberConfigFilter, TransactionFilter, WatermarkPersistence,)from algokit_subscriber.types.subscription import ( BeforePollMetadata, TransactionSubscriptionResult,)
MAX_RETRIES = 3
def part_a(algorand: AlgorandClient, sender: str, watermark_a: int) -> int: """Part A: Hook execution order with poll_once().""" timeline: list[str] = [] watermark = watermark_a
def get_wm() -> int: return watermark
def set_wm(w: int) -> None: nonlocal watermark watermark = w
subscriber_a = AlgorandSubscriber( config=AlgorandSubscriberConfig( filters=[ SubscriberConfigFilter( name="payments", filter=TransactionFilter(sender=sender), ) ], frequency_in_seconds=1, wait_for_block_when_at_tip=False, sync_behaviour="sync-oldest", watermark_persistence=WatermarkPersistence(get=get_wm, set=set_wm), ), algod_client=algorand.client.algod, )
# Register lifecycle hooks subscriber_a.on_before_poll( lambda metadata, _: timeline.append( f"on_before_poll(watermark={metadata.watermark}," f" current_round={metadata.current_round})" ) )
subscriber_a.on("payments", lambda txn, _: timeline.append(f'on("payments") — txn {txn.id_}'))
subscriber_a.on_poll( lambda result, _: timeline.append( f"on_poll(txns={len(result.subscribed_transactions)}," f" rounds=[{result.synced_round_range[0]}," f" {result.synced_round_range[1]}])" ) )
print_info('Hooks registered: on_before_poll, on("payments"), on_poll')
# Execute a single poll poll_result = subscriber_a.poll_once() print_info(f"Poll matched: {len(poll_result.subscribed_transactions)} transaction(s)")
# Print the timeline print_success("Hook execution order:") for i, entry in enumerate(timeline): print_info(f" {i + 1}: {entry}")
# Verify order: on_before_poll -> on("payments") -> on_poll assert len(timeline) >= 3, f"Expected at least 3 timeline entries, got {len(timeline)}" assert timeline[0].startswith("on_before_poll"), ( f"Expected first hook to be on_before_poll, got: {timeline[0]}" ) assert timeline[1].startswith('on("payments")'), ( f'Expected second hook to be on("payments"), got: {timeline[1]}' ) assert timeline[-1].startswith("on_poll"), ( f"Expected last hook to be on_poll, got: {timeline[-1]}" ) print_success("Order verified: on_before_poll -> [transaction processing] -> on_poll") return watermark
def part_b( algorand: AlgorandClient, sender: str, watermark_b: int,) -> int: """Part B: start(inspect) callback in continuous loop.""" watermark = watermark_b timeline: list[str] = []
def get_wm() -> int: return watermark
def set_wm(w: int) -> None: nonlocal watermark watermark = w
subscriber_b = AlgorandSubscriber( config=AlgorandSubscriberConfig( filters=[ SubscriberConfigFilter( name="payments", filter=TransactionFilter(sender=sender), ) ], frequency_in_seconds=1, wait_for_block_when_at_tip=True, sync_behaviour="sync-oldest", watermark_persistence=WatermarkPersistence(get=get_wm, set=set_wm), ), algod_client=algorand.client.algod, )
subscriber_b.on_before_poll( lambda metadata, _: timeline.append( f"on_before_poll(watermark={metadata.watermark}," f" current_round={metadata.current_round})" ) )
subscriber_b.on("payments", lambda txn, _: timeline.append(f'on("payments") — txn {txn.id_}'))
subscriber_b.on_poll( lambda result, _: timeline.append( f"on_poll(txns={len(result.subscribed_transactions)}," f" rounds=[{result.synced_round_range[0]}," f" {result.synced_round_range[1]}])" ) )
def inspect_cb(result: TransactionSubscriptionResult) -> None: timeline.append( f"inspect(txns={len(result.subscribed_transactions)}," f" new_watermark={result.new_watermark})" )
# start() blocks in Python, run in background thread thread = threading.Thread( target=subscriber_b.start, kwargs={"inspect": inspect_cb, "suppress_log": True}, daemon=True, ) thread.start()
# Wait for subscriber to catch up then stop time.sleep(3) subscriber_b.stop("part-b-done") thread.join(timeout=5)
print_success("Timeline with start(inspect):") for i, entry in enumerate(timeline): print_info(f" {i + 1}: {entry}")
# Verify inspect appears after on_poll poll_idx = next( (i for i, e in enumerate(timeline) if e.startswith("on_poll")), -1, ) inspect_idx = next( (i for i, e in enumerate(timeline) if e.startswith("inspect")), -1, ) assert poll_idx != -1, "Expected on_poll entry in timeline" assert inspect_idx != -1, "Expected inspect entry in timeline" assert inspect_idx > poll_idx, ( f"Expected inspect (idx={inspect_idx}) after on_poll (idx={poll_idx})" ) print_success( "Order verified: on_before_poll -> [transaction processing] -> on_poll -> inspect" ) return watermark
def part_c( algorand: AlgorandClient, sender: str, watermark_c: int,) -> tuple[int, int, int]: """Part C: Error recovery with on_error + retry logic.""" watermark = watermark_c retry_count = 0 errors_caught = 0 successful_polls = 0 poll_number = 0 error_timeline: list[str] = []
def get_wm() -> int: return watermark
def set_wm(w: int) -> None: nonlocal watermark watermark = w
subscriber_c = AlgorandSubscriber( config=AlgorandSubscriberConfig( filters=[ SubscriberConfigFilter( name="payments", filter=TransactionFilter(sender=sender), ) ], frequency_in_seconds=1, wait_for_block_when_at_tip=True, sync_behaviour="sync-oldest-start-now", watermark_persistence=WatermarkPersistence(get=get_wm, set=set_wm), ), algod_client=algorand.client.algod, )
def before_poll_c(_metadata: BeforePollMetadata, _ev: str) -> None: nonlocal poll_number poll_number += 1 error_timeline.append(f"on_before_poll (poll #{poll_number})")
def on_poll_c( _result: TransactionSubscriptionResult, _ev: str, ) -> None: nonlocal successful_polls # Simulate an error on the first poll if poll_number == 1: error_timeline.append("on_poll — throwing simulated error!") raise RuntimeError("Simulated processing error") successful_polls += 1 error_timeline.append(f"on_poll — success (poll #{poll_number})")
def on_error_c(error: Exception, _ev: str) -> None: nonlocal retry_count, errors_caught errors_caught += 1 retry_count += 1 message = str(error) error_timeline.append( f'on_error — caught: "{message}" (retry {retry_count}/{MAX_RETRIES})' ) if retry_count > MAX_RETRIES: error_timeline.append("on_error — max retries exceeded, stopping") subscriber_c.stop("max retries exceeded") return # In Python start() blocks and the loop retries automatically, # unlike TS where start() is non-blocking and must be re-called. error_timeline.append("on_error — retrying (loop continues)")
subscriber_c.on_before_poll(before_poll_c) subscriber_c.on_poll(on_poll_c) subscriber_c.on_error(on_error_c)
print_info("Starting subscriber: will throw on first poll, then recover")
# start() blocks in Python, run in background thread thread = threading.Thread( target=subscriber_c.start, kwargs={"suppress_log": True}, daemon=True, ) thread.start()
# Wait for error + recovery + successful poll time.sleep(5) subscriber_c.stop("part-c-done") thread.join(timeout=5)
print_success("Error recovery timeline:") for i, entry in enumerate(error_timeline): print_info(f" {i + 1}: {entry}")
print_info(f"Errors caught: {errors_caught}") print_info(f"Retries used: {retry_count}") print_info(f"Successful polls after recovery: {successful_polls}")
return errors_caught, retry_count, successful_polls
def main() -> None: print_header("15 — Lifecycle Hooks & Error Handling")
# Step 1: Connect to LocalNet print_step(1, "Connect to LocalNet") algorand = AlgorandClient.from_config( algod_config=ALGOD_CONFIG, kmd_config=KMD_CONFIG, ) status = algorand.client.algod.status() print_info(f"Current round: {status.last_round}") print_success("Connected to LocalNet")
# Step 2: Create and fund sender account print_step(2, "Create and fund sender account") dispenser = algorand.account.localnet_dispenser().addr sender = algorand.account.random().addr algorand.send.payment( PaymentParams( sender=dispenser, receiver=sender, amount=AlgoAmount(algo=10), ) ) print_info(f"Sender: {shorten_address(sender)}")
# Part A: Hook execution order with poll_once() print_step(3, "Part A — Hook execution order (poll_once)")
# Send a transaction so we have something to match txn1 = algorand.send.payment( PaymentParams( sender=sender, receiver=sender, amount=AlgoAmount(algo=1), note=b"lifecycle txn 1", ) ) first_round = txn1.confirmation.confirmed_round assert first_round is not None print_info(f"Sent txn: {txn1.tx_id}")
watermark_a = first_round - 1 watermark_a = part_a(algorand, sender, watermark_a)
# Part B: start(inspect) callback print_step(4, "Part B — start(inspect) callback")
# Send 2 more transactions for i in range(2, 4): algorand.send.payment( PaymentParams( sender=sender, receiver=sender, amount=AlgoAmount(algo=1), note=f"lifecycle txn {i}".encode(), ) ) print_info("Sent: 2 more transactions")
watermark_b = part_b(algorand, sender, watermark_a)
# Part C: Error recovery with on_error print_step(5, "Part C — Error recovery with on_error")
# Send a transaction so there's something to process algorand.send.payment( PaymentParams( sender=sender, receiver=sender, amount=AlgoAmount(algo=1), note=b"lifecycle error test", ) )
errors_caught, _retry_count, successful_polls = part_c( algorand, sender, watermark_b, )
assert errors_caught >= 1, f"Expected at least 1 error caught, got {errors_caught}" assert successful_polls >= 1, ( f"Expected at least 1 successful poll after recovery, got {successful_polls}" ) print_success("Error recovery verified: error -> on_error -> retry -> success")
# Step 6: Summary print_step(6, "Summary")
print_success("Lifecycle hook execution order:") print_info( " 1: on_before_poll(metadata) — before each poll, receives { watermark, current_round }" ) print_info(" 2: [transaction processing] — filter matching, mapper, on(), on_batch()") print_info( " 3: on_poll(result) — after processing, receives TransactionSubscriptionResult" ) print_info( " 4: inspect(result) — in start() loop only, after on_poll, same result object" )
print_success("Error handling:") print_info(" -: on_error(error) replaces default throw-on-error behavior") print_info(" -: In Python, start() loop retries automatically after on_error returns") print_info(f" -: Demonstrated retry up to {MAX_RETRIES} times before giving up")
print_header("Example complete")
if __name__ == "__main__": main()Other examples
Section titled “Other examples”- Basic Poll Once
- Continuous Subscriber
- Payment Filters
- Asset Transfer
- App Call Subscription
- Multiple Named Filters
- Balance Change Tracking
- ARC-28 Event Subscription
- Inner Transaction Subscription
- Batch Handling & Data Mappers
- Watermark Persistence
- Sync Behaviours
- Custom Filters
- Stateless Subscriptions
- Lifecycle Hooks & Error Handling