Continuous Subscriber
Description
Section titled “Description”Demonstrates continuous polling with start/stop and event handlers.
- Create a subscriber with frequency_in_seconds and wait_for_block_when_at_tip
- Register event handlers with subscriber.on()
- Start continuous polling with subscriber.start()
- Graceful shutdown with signal handlers
Prerequisites
Section titled “Prerequisites”- LocalNet running (via
algokit localnet start)
Run This Example
Section titled “Run This Example”From the repository’s examples/subscriber directory:
cd examples/subscriberuv run python 02_continuous_subscriber.py"""Example 02: Continuous Subscriber
Demonstrates continuous polling with start/stop and event handlers.- Create a subscriber with frequency_in_seconds and wait_for_block_when_at_tip- Register event handlers with subscriber.on()- Start continuous polling with subscriber.start()- Graceful shutdown with signal handlers
Prerequisites:- LocalNet running (via `algokit localnet start`)"""
import signalimport sysimport threadingimport time
from algokit_utils import AlgoAmount, AlgorandClient, PaymentParamsfrom shared import ( ALGOD_CONFIG, KMD_CONFIG, print_header, print_info, print_step, print_success,)
from algokit_subscriber import ( AlgorandSubscriber, AlgorandSubscriberConfig, SubscriberConfigFilter, TransactionFilter, in_memory_watermark,)
def main() -> None: print_header("02 — Continuous Subscriber")
# Step 1: Connect to LocalNet print_step(1, "Connect to LocalNet") algorand = AlgorandClient.from_config(algod_config=ALGOD_CONFIG, kmd_config=KMD_CONFIG) print_info(f"Current round: {algorand.client.algod.status().last_round}") print_success("Connected to LocalNet")
# Step 2: Create and fund sender account print_step(2, "Create and fund sender account") dispenser = algorand.account.localnet_dispenser().addr sender = algorand.account.random().addr algorand.send.payment( PaymentParams(sender=dispenser, receiver=sender, amount=AlgoAmount(algo=10)) ) print_info(f"Sender: {sender}")
# Step 3: Create AlgorandSubscriber print_step(3, "Create AlgorandSubscriber") subscriber = AlgorandSubscriber( config=AlgorandSubscriberConfig( filters=[ SubscriberConfigFilter( name="payments", filter=TransactionFilter(sender=sender), ) ], frequency_in_seconds=1, wait_for_block_when_at_tip=True, sync_behaviour="sync-oldest-start-now", watermark_persistence=in_memory_watermark(0), ), algod_client=algorand.client.algod, ) print_success("Subscriber created (freq=1s, sync-oldest-start-now)")
# Step 4: Register event handlers print_step(4, "Register event handlers") matched_txns: list[str] = []
def on_payment(txn: object, _event_name: str) -> None: matched_txns.append(txn.id_) # type: ignore[attr-defined] print_info(f"Matched payment: {txn.id_}") # type: ignore[attr-defined]
subscriber.on("payments", on_payment) print_success('Registered on("payments") listener')
# Step 5: Register SIGINT/SIGTERM handlers for graceful shutdown print_step(5, "Register signal handlers")
def handle_signal(signum: int, _frame: object) -> None: sig_name = signal.Signals(signum).name print_info(f"Signal received: {sig_name}") subscriber.stop(sig_name) sys.exit(0)
for sig in (signal.SIGINT, signal.SIGTERM): signal.signal(sig, handle_signal) print_success("Registered SIGINT and SIGTERM handlers")
# Step 6: Start continuous subscriber print_step(6, "Start continuous subscriber")
def run_subscriber() -> None: subscriber.start( inspect=lambda result: print_info( f"Poll: round range " f"[{result.synced_round_range[0]}, " f"{result.synced_round_range[1]}] — " f"{len(result.subscribed_transactions)} matched, " f"watermark {result.new_watermark}" ), suppress_log=True, )
subscriber_thread = threading.Thread(target=run_subscriber, daemon=True) subscriber_thread.start() print_success("Subscriber started")
# Step 7: Send 3 payment transactions with short delays print_step(7, "Send 3 payment transactions") for i in range(1, 4): result = algorand.send.payment( PaymentParams( sender=sender, receiver=sender, amount=AlgoAmount(algo=1), note=f"continuous txn {i}".encode(), ) ) round_num = result.confirmation.confirmed_round print_info(f"Txn {i}: {result.tx_id} (round {round_num})") time.sleep(0.5) print_success("Sent 3 payment transactions")
# Step 8: Wait for subscriber to catch up, then stop print_step(8, "Wait for subscriber to catch up, then stop") time.sleep(4) print_info("Auto-stop: stopping after ~4 seconds") subscriber.stop("example-done") subscriber_thread.join(timeout=5)
# Step 9: Verify matched transactions print_step(9, "Verify matched transactions")
assert len(matched_txns) >= 3, ( f"Expected at least 3 matched transactions, got {len(matched_txns)}" ) print_success(f"{len(matched_txns)} transactions matched (>= 3)")
for txn_id in matched_txns: print_info(f"Matched txn: {txn_id}")
print_header("Example complete")
if __name__ == "__main__": main()Other examples
Section titled “Other examples”- Basic Poll Once
- Continuous Subscriber
- Payment Filters
- Asset Transfer
- App Call Subscription
- Multiple Named Filters
- Balance Change Tracking
- ARC-28 Event Subscription
- Inner Transaction Subscription
- Batch Handling & Data Mappers
- Watermark Persistence
- Sync Behaviours
- Custom Filters
- Stateless Subscriptions
- Lifecycle Hooks & Error Handling