Skip to content

Continuous Subscriber

← Back to Examples

Demonstrates continuous polling with start/stop and event handlers.

  • Create a subscriber with frequency_in_seconds and wait_for_block_when_at_tip
  • Register event handlers with subscriber.on()
  • Start continuous polling with subscriber.start()
  • Graceful shutdown with signal handlers
  • LocalNet running (via algokit localnet start)

From the repository’s examples/subscriber directory:

Terminal window
cd examples/subscriber
uv run python 02_continuous_subscriber.py

View source on GitHub

02_continuous_subscriber.py
"""Example 02: Continuous Subscriber
Demonstrates continuous polling with start/stop and event handlers.
- Create a subscriber with frequency_in_seconds and wait_for_block_when_at_tip
- Register event handlers with subscriber.on()
- Start continuous polling with subscriber.start()
- Graceful shutdown with signal handlers
Prerequisites:
- LocalNet running (via `algokit localnet start`)
"""
import signal
import sys
import threading
import time
from algokit_utils import AlgoAmount, AlgorandClient, PaymentParams
from shared import (
ALGOD_CONFIG,
KMD_CONFIG,
print_header,
print_info,
print_step,
print_success,
)
from algokit_subscriber import (
AlgorandSubscriber,
AlgorandSubscriberConfig,
SubscriberConfigFilter,
TransactionFilter,
in_memory_watermark,
)
def main() -> None:
print_header("02 — Continuous Subscriber")
# Step 1: Connect to LocalNet
print_step(1, "Connect to LocalNet")
algorand = AlgorandClient.from_config(algod_config=ALGOD_CONFIG, kmd_config=KMD_CONFIG)
print_info(f"Current round: {algorand.client.algod.status().last_round}")
print_success("Connected to LocalNet")
# Step 2: Create and fund sender account
print_step(2, "Create and fund sender account")
dispenser = algorand.account.localnet_dispenser().addr
sender = algorand.account.random().addr
algorand.send.payment(
PaymentParams(sender=dispenser, receiver=sender, amount=AlgoAmount(algo=10))
)
print_info(f"Sender: {sender}")
# Step 3: Create AlgorandSubscriber
print_step(3, "Create AlgorandSubscriber")
subscriber = AlgorandSubscriber(
config=AlgorandSubscriberConfig(
filters=[
SubscriberConfigFilter(
name="payments",
filter=TransactionFilter(sender=sender),
)
],
frequency_in_seconds=1,
wait_for_block_when_at_tip=True,
sync_behaviour="sync-oldest-start-now",
watermark_persistence=in_memory_watermark(0),
),
algod_client=algorand.client.algod,
)
print_success("Subscriber created (freq=1s, sync-oldest-start-now)")
# Step 4: Register event handlers
print_step(4, "Register event handlers")
matched_txns: list[str] = []
def on_payment(txn: object, _event_name: str) -> None:
matched_txns.append(txn.id_) # type: ignore[attr-defined]
print_info(f"Matched payment: {txn.id_}") # type: ignore[attr-defined]
subscriber.on("payments", on_payment)
print_success('Registered on("payments") listener')
# Step 5: Register SIGINT/SIGTERM handlers for graceful shutdown
print_step(5, "Register signal handlers")
def handle_signal(signum: int, _frame: object) -> None:
sig_name = signal.Signals(signum).name
print_info(f"Signal received: {sig_name}")
subscriber.stop(sig_name)
sys.exit(0)
for sig in (signal.SIGINT, signal.SIGTERM):
signal.signal(sig, handle_signal)
print_success("Registered SIGINT and SIGTERM handlers")
# Step 6: Start continuous subscriber
print_step(6, "Start continuous subscriber")
def run_subscriber() -> None:
subscriber.start(
inspect=lambda result: print_info(
f"Poll: round range "
f"[{result.synced_round_range[0]}, "
f"{result.synced_round_range[1]}] — "
f"{len(result.subscribed_transactions)} matched, "
f"watermark {result.new_watermark}"
),
suppress_log=True,
)
subscriber_thread = threading.Thread(target=run_subscriber, daemon=True)
subscriber_thread.start()
print_success("Subscriber started")
# Step 7: Send 3 payment transactions with short delays
print_step(7, "Send 3 payment transactions")
for i in range(1, 4):
result = algorand.send.payment(
PaymentParams(
sender=sender,
receiver=sender,
amount=AlgoAmount(algo=1),
note=f"continuous txn {i}".encode(),
)
)
round_num = result.confirmation.confirmed_round
print_info(f"Txn {i}: {result.tx_id} (round {round_num})")
time.sleep(0.5)
print_success("Sent 3 payment transactions")
# Step 8: Wait for subscriber to catch up, then stop
print_step(8, "Wait for subscriber to catch up, then stop")
time.sleep(4)
print_info("Auto-stop: stopping after ~4 seconds")
subscriber.stop("example-done")
subscriber_thread.join(timeout=5)
# Step 9: Verify matched transactions
print_step(9, "Verify matched transactions")
assert len(matched_txns) >= 3, (
f"Expected at least 3 matched transactions, got {len(matched_txns)}"
)
print_success(f"{len(matched_txns)} transactions matched (>= 3)")
for txn_id in matched_txns:
print_info(f"Matched txn: {txn_id}")
print_header("Example complete")
if __name__ == "__main__":
main()