Skip to content

Lifecycle Hooks & Error Handling

← Back to Examples

Demonstrates lifecycle hooks and retry patterns.

  • Hook execution order: on_before_poll -> processing -> on_poll -> inspect
  • start(inspect) callback in continuous polling
  • Error recovery with on_error and retry logic
  • LocalNet running (via algokit localnet start)

From the repository’s examples/subscriber directory:

Terminal window
cd examples/subscriber
uv run python 15_lifecycle_hooks.py

View source on GitHub

15_lifecycle_hooks.py
"""Example 15: Lifecycle Hooks & Error Handling
Demonstrates lifecycle hooks and retry patterns.
- Hook execution order: on_before_poll -> processing -> on_poll -> inspect
- start(inspect) callback in continuous polling
- Error recovery with on_error and retry logic
Prerequisites:
- LocalNet running (via `algokit localnet start`)
"""
import threading
import time
from algokit_utils import AlgoAmount, AlgorandClient, PaymentParams
from shared import (
ALGOD_CONFIG,
KMD_CONFIG,
print_header,
print_info,
print_step,
print_success,
shorten_address,
)
from algokit_subscriber import (
AlgorandSubscriber,
AlgorandSubscriberConfig,
SubscriberConfigFilter,
TransactionFilter,
WatermarkPersistence,
)
from algokit_subscriber.types.subscription import (
BeforePollMetadata,
TransactionSubscriptionResult,
)
MAX_RETRIES = 3
def part_a(algorand: AlgorandClient, sender: str, watermark_a: int) -> int:
"""Part A: Hook execution order with poll_once()."""
timeline: list[str] = []
watermark = watermark_a
def get_wm() -> int:
return watermark
def set_wm(w: int) -> None:
nonlocal watermark
watermark = w
subscriber_a = AlgorandSubscriber(
config=AlgorandSubscriberConfig(
filters=[
SubscriberConfigFilter(
name="payments",
filter=TransactionFilter(sender=sender),
)
],
frequency_in_seconds=1,
wait_for_block_when_at_tip=False,
sync_behaviour="sync-oldest",
watermark_persistence=WatermarkPersistence(get=get_wm, set=set_wm),
),
algod_client=algorand.client.algod,
)
# Register lifecycle hooks
subscriber_a.on_before_poll(
lambda metadata, _: timeline.append(
f"on_before_poll(watermark={metadata.watermark},"
f" current_round={metadata.current_round})"
)
)
subscriber_a.on("payments", lambda txn, _: timeline.append(f'on("payments") — txn {txn.id_}'))
subscriber_a.on_poll(
lambda result, _: timeline.append(
f"on_poll(txns={len(result.subscribed_transactions)},"
f" rounds=[{result.synced_round_range[0]},"
f" {result.synced_round_range[1]}])"
)
)
print_info('Hooks registered: on_before_poll, on("payments"), on_poll')
# Execute a single poll
poll_result = subscriber_a.poll_once()
print_info(f"Poll matched: {len(poll_result.subscribed_transactions)} transaction(s)")
# Print the timeline
print_success("Hook execution order:")
for i, entry in enumerate(timeline):
print_info(f" {i + 1}: {entry}")
# Verify order: on_before_poll -> on("payments") -> on_poll
assert len(timeline) >= 3, f"Expected at least 3 timeline entries, got {len(timeline)}"
assert timeline[0].startswith("on_before_poll"), (
f"Expected first hook to be on_before_poll, got: {timeline[0]}"
)
assert timeline[1].startswith('on("payments")'), (
f'Expected second hook to be on("payments"), got: {timeline[1]}'
)
assert timeline[-1].startswith("on_poll"), (
f"Expected last hook to be on_poll, got: {timeline[-1]}"
)
print_success("Order verified: on_before_poll -> [transaction processing] -> on_poll")
return watermark
def part_b(
algorand: AlgorandClient,
sender: str,
watermark_b: int,
) -> int:
"""Part B: start(inspect) callback in continuous loop."""
watermark = watermark_b
timeline: list[str] = []
def get_wm() -> int:
return watermark
def set_wm(w: int) -> None:
nonlocal watermark
watermark = w
subscriber_b = AlgorandSubscriber(
config=AlgorandSubscriberConfig(
filters=[
SubscriberConfigFilter(
name="payments",
filter=TransactionFilter(sender=sender),
)
],
frequency_in_seconds=1,
wait_for_block_when_at_tip=True,
sync_behaviour="sync-oldest",
watermark_persistence=WatermarkPersistence(get=get_wm, set=set_wm),
),
algod_client=algorand.client.algod,
)
subscriber_b.on_before_poll(
lambda metadata, _: timeline.append(
f"on_before_poll(watermark={metadata.watermark},"
f" current_round={metadata.current_round})"
)
)
subscriber_b.on("payments", lambda txn, _: timeline.append(f'on("payments") — txn {txn.id_}'))
subscriber_b.on_poll(
lambda result, _: timeline.append(
f"on_poll(txns={len(result.subscribed_transactions)},"
f" rounds=[{result.synced_round_range[0]},"
f" {result.synced_round_range[1]}])"
)
)
def inspect_cb(result: TransactionSubscriptionResult) -> None:
timeline.append(
f"inspect(txns={len(result.subscribed_transactions)},"
f" new_watermark={result.new_watermark})"
)
# start() blocks in Python, run in background thread
thread = threading.Thread(
target=subscriber_b.start,
kwargs={"inspect": inspect_cb, "suppress_log": True},
daemon=True,
)
thread.start()
# Wait for subscriber to catch up then stop
time.sleep(3)
subscriber_b.stop("part-b-done")
thread.join(timeout=5)
print_success("Timeline with start(inspect):")
for i, entry in enumerate(timeline):
print_info(f" {i + 1}: {entry}")
# Verify inspect appears after on_poll
poll_idx = next(
(i for i, e in enumerate(timeline) if e.startswith("on_poll")),
-1,
)
inspect_idx = next(
(i for i, e in enumerate(timeline) if e.startswith("inspect")),
-1,
)
assert poll_idx != -1, "Expected on_poll entry in timeline"
assert inspect_idx != -1, "Expected inspect entry in timeline"
assert inspect_idx > poll_idx, (
f"Expected inspect (idx={inspect_idx}) after on_poll (idx={poll_idx})"
)
print_success(
"Order verified: on_before_poll -> [transaction processing] -> on_poll -> inspect"
)
return watermark
def part_c(
algorand: AlgorandClient,
sender: str,
watermark_c: int,
) -> tuple[int, int, int]:
"""Part C: Error recovery with on_error + retry logic."""
watermark = watermark_c
retry_count = 0
errors_caught = 0
successful_polls = 0
poll_number = 0
error_timeline: list[str] = []
def get_wm() -> int:
return watermark
def set_wm(w: int) -> None:
nonlocal watermark
watermark = w
subscriber_c = AlgorandSubscriber(
config=AlgorandSubscriberConfig(
filters=[
SubscriberConfigFilter(
name="payments",
filter=TransactionFilter(sender=sender),
)
],
frequency_in_seconds=1,
wait_for_block_when_at_tip=True,
sync_behaviour="sync-oldest-start-now",
watermark_persistence=WatermarkPersistence(get=get_wm, set=set_wm),
),
algod_client=algorand.client.algod,
)
def before_poll_c(_metadata: BeforePollMetadata, _ev: str) -> None:
nonlocal poll_number
poll_number += 1
error_timeline.append(f"on_before_poll (poll #{poll_number})")
def on_poll_c(
_result: TransactionSubscriptionResult,
_ev: str,
) -> None:
nonlocal successful_polls
# Simulate an error on the first poll
if poll_number == 1:
error_timeline.append("on_poll — throwing simulated error!")
raise RuntimeError("Simulated processing error")
successful_polls += 1
error_timeline.append(f"on_poll — success (poll #{poll_number})")
def on_error_c(error: Exception, _ev: str) -> None:
nonlocal retry_count, errors_caught
errors_caught += 1
retry_count += 1
message = str(error)
error_timeline.append(
f'on_error — caught: "{message}" (retry {retry_count}/{MAX_RETRIES})'
)
if retry_count > MAX_RETRIES:
error_timeline.append("on_error — max retries exceeded, stopping")
subscriber_c.stop("max retries exceeded")
return
# In Python start() blocks and the loop retries automatically,
# unlike TS where start() is non-blocking and must be re-called.
error_timeline.append("on_error — retrying (loop continues)")
subscriber_c.on_before_poll(before_poll_c)
subscriber_c.on_poll(on_poll_c)
subscriber_c.on_error(on_error_c)
print_info("Starting subscriber: will throw on first poll, then recover")
# start() blocks in Python, run in background thread
thread = threading.Thread(
target=subscriber_c.start,
kwargs={"suppress_log": True},
daemon=True,
)
thread.start()
# Wait for error + recovery + successful poll
time.sleep(5)
subscriber_c.stop("part-c-done")
thread.join(timeout=5)
print_success("Error recovery timeline:")
for i, entry in enumerate(error_timeline):
print_info(f" {i + 1}: {entry}")
print_info(f"Errors caught: {errors_caught}")
print_info(f"Retries used: {retry_count}")
print_info(f"Successful polls after recovery: {successful_polls}")
return errors_caught, retry_count, successful_polls
def main() -> None:
print_header("15 — Lifecycle Hooks & Error Handling")
# Step 1: Connect to LocalNet
print_step(1, "Connect to LocalNet")
algorand = AlgorandClient.from_config(
algod_config=ALGOD_CONFIG,
kmd_config=KMD_CONFIG,
)
status = algorand.client.algod.status()
print_info(f"Current round: {status.last_round}")
print_success("Connected to LocalNet")
# Step 2: Create and fund sender account
print_step(2, "Create and fund sender account")
dispenser = algorand.account.localnet_dispenser().addr
sender = algorand.account.random().addr
algorand.send.payment(
PaymentParams(
sender=dispenser,
receiver=sender,
amount=AlgoAmount(algo=10),
)
)
print_info(f"Sender: {shorten_address(sender)}")
# Part A: Hook execution order with poll_once()
print_step(3, "Part A — Hook execution order (poll_once)")
# Send a transaction so we have something to match
txn1 = algorand.send.payment(
PaymentParams(
sender=sender,
receiver=sender,
amount=AlgoAmount(algo=1),
note=b"lifecycle txn 1",
)
)
first_round = txn1.confirmation.confirmed_round
assert first_round is not None
print_info(f"Sent txn: {txn1.tx_id}")
watermark_a = first_round - 1
watermark_a = part_a(algorand, sender, watermark_a)
# Part B: start(inspect) callback
print_step(4, "Part B — start(inspect) callback")
# Send 2 more transactions
for i in range(2, 4):
algorand.send.payment(
PaymentParams(
sender=sender,
receiver=sender,
amount=AlgoAmount(algo=1),
note=f"lifecycle txn {i}".encode(),
)
)
print_info("Sent: 2 more transactions")
watermark_b = part_b(algorand, sender, watermark_a)
# Part C: Error recovery with on_error
print_step(5, "Part C — Error recovery with on_error")
# Send a transaction so there's something to process
algorand.send.payment(
PaymentParams(
sender=sender,
receiver=sender,
amount=AlgoAmount(algo=1),
note=b"lifecycle error test",
)
)
errors_caught, _retry_count, successful_polls = part_c(
algorand,
sender,
watermark_b,
)
assert errors_caught >= 1, f"Expected at least 1 error caught, got {errors_caught}"
assert successful_polls >= 1, (
f"Expected at least 1 successful poll after recovery, got {successful_polls}"
)
print_success("Error recovery verified: error -> on_error -> retry -> success")
# Step 6: Summary
print_step(6, "Summary")
print_success("Lifecycle hook execution order:")
print_info(
" 1: on_before_poll(metadata) — before each poll, receives { watermark, current_round }"
)
print_info(" 2: [transaction processing] — filter matching, mapper, on(), on_batch()")
print_info(
" 3: on_poll(result) — after processing, receives TransactionSubscriptionResult"
)
print_info(
" 4: inspect(result) — in start() loop only, after on_poll, same result object"
)
print_success("Error handling:")
print_info(" -: on_error(error) replaces default throw-on-error behavior")
print_info(" -: In Python, start() loop retries automatically after on_error returns")
print_info(f" -: Demonstrated retry up to {MAX_RETRIES} times before giving up")
print_header("Example complete")
if __name__ == "__main__":
main()