Stateless Subscriptions
Description
Section titled “Description”Demonstrates getSubscribedTransactions for serverless patterns.
- Use the stateless function instead of the AlgorandSubscriber class
- Manage watermark externally between calls
- Verify no overlap between consecutive calls
Prerequisites
Section titled “Prerequisites”- LocalNet running (via
algokit localnet start)
Run This Example
Section titled “Run This Example”From the repository’s examples/subscriber directory:
cd examples/subscriberuv run python 14_stateless_subscriptions.py"""Example 14: Stateless Subscriptions
Demonstrates getSubscribedTransactions for serverless patterns.- Use the stateless function instead of the AlgorandSubscriber class- Manage watermark externally between calls- Verify no overlap between consecutive calls
Prerequisites:- LocalNet running (via `algokit localnet start`)"""
from __future__ import annotations
from dataclasses import dataclassfrom typing import TYPE_CHECKING
from algokit_utils import AlgoAmount, AlgorandClient, PaymentParamsfrom shared import ( ALGOD_CONFIG, KMD_CONFIG, print_header, print_info, print_step, print_success, shorten_address,)
from algokit_subscriber import ( NamedTransactionFilter, TransactionFilter, TransactionSubscriptionParams, get_subscribed_transactions,)
if TYPE_CHECKING: from algokit_algod_client import AlgodClient
@dataclassclass PollResult: """Simplified result from a stateless poll."""
transactions: list[str] new_watermark: int round_range: tuple[int, int]
def stateless_poll(algod: AlgodClient, watermark: int, sender_addr: str) -> PollResult: """Simulate a serverless handler: watermark in -> result out.""" result = get_subscribed_transactions( subscription=TransactionSubscriptionParams( filters=[ NamedTransactionFilter( name="payments", filter=TransactionFilter(sender=sender_addr), ), ], watermark=watermark, sync_behaviour="sync-oldest", max_rounds_to_sync=100, ), algod=algod, ) return PollResult( transactions=[txn.id_ for txn in result.subscribed_transactions], new_watermark=result.new_watermark, round_range=result.synced_round_range, )
def send_payment( algorand: AlgorandClient, sender: str, amount: int, note: str, label: str,) -> tuple[str, int]: """Send a self-payment and print its ID and round.""" result = algorand.send.payment( PaymentParams( sender=sender, receiver=sender, amount=AlgoAmount(algo=amount), note=note.encode(), ) ) print_info(f"{label} ID: {result.tx_id}") rnd = result.confirmation.confirmed_round print_info(f"{label} round: {rnd}") tx_id = result.tx_id assert tx_id is not None assert rnd is not None return tx_id, rnd
def print_poll_result(poll: PollResult) -> None: """Print poll result details.""" print_info(f"Transactions found: {len(poll.transactions)}") print_info(f"Round range: [{poll.round_range[0]}, {poll.round_range[1]}]") print_info(f"New watermark: {poll.new_watermark}") for tx_id in poll.transactions: print_info(f" Matched txn: {tx_id}")
def print_contrast() -> None: """Print contrast between stateless and stateful approaches.""" print() print(" getSubscribedTransactions (stateless):") print(" - No class instantiation, no event system") print(" - Caller manages watermark externally (DB, file, env var)") print(" - Single function call: params in -> result out") print(" - Ideal for serverless functions, cron jobs, Lambda/Cloud Functions") print(" - No polling loop — caller controls when/how often to call") print() print(" AlgorandSubscriber (stateful):") print(" - Class with start/stop, event emitters (on, onBatch)") print(" - Built-in watermark persistence (get/set callbacks)") print(" - Built-in polling loop with configurable frequency") print(" - Ideal for long-running services and real-time subscriptions") print()
def main() -> None: print_header("14 — getSubscribedTransactions (Stateless)")
# Step 1: Connect to LocalNet print_step(1, "Connect to LocalNet") algorand = AlgorandClient.from_config(algod_config=ALGOD_CONFIG, kmd_config=KMD_CONFIG) algod = algorand.client.algod status = algod.status() print_info(f"Current round: {status.last_round}") print_success("Connected to LocalNet")
# Step 2: Create and fund sender account print_step(2, "Create and fund sender account") dispenser = algorand.account.localnet_dispenser().addr sender = algorand.account.random().addr algorand.send.payment( PaymentParams( sender=dispenser, receiver=sender, amount=AlgoAmount(algo=10), ) ) print_info(f"Sender: {shorten_address(sender)}")
# Step 3: Send first batch of 2 payments print_step(3, "Send first batch of payments (2 transactions)") _tx1_id, tx1_round = send_payment(algorand, sender, 1, "stateless batch-1 txn-1", "Txn 1") _tx2_id, _tx2_round = send_payment(algorand, sender, 2, "stateless batch-1 txn-2", "Txn 2") print_success("Sent 2 payments")
# Step 4: First stateless call — watermark = firstRound - 1 print_step(4, "First stateless call (watermark = firstRound - 1)") initial_watermark = tx1_round - 1 print_info(f"Input watermark: {initial_watermark}") first_call = stateless_poll(algod, initial_watermark, sender) print_poll_result(first_call) assert len(first_call.transactions) == 2, ( f"Expected 2 in first call, got {len(first_call.transactions)}" ) print_success("First call returned 2 transactions")
# Step 5: Send second batch of 2 payments print_step(5, "Send second batch of payments (2 transactions)") _tx3_id, _tx3_round = send_payment(algorand, sender, 3, "stateless batch-2 txn-3", "Txn 3") _tx4_id, _tx4_round = send_payment(algorand, sender, 4, "stateless batch-2 txn-4", "Txn 4") print_success("Sent 2 more payments")
# Step 6: Second stateless call — uses newWatermark from first call print_step(6, "Second stateless call (watermark from first call)") print_info(f"Input watermark: {first_call.new_watermark}") second_call = stateless_poll(algod, first_call.new_watermark, sender) print_poll_result(second_call) assert len(second_call.transactions) == 2, ( f"Expected 2 in second call, got {len(second_call.transactions)}" ) print_success("Second call returned only new transactions")
# Step 7: Verify no overlap print_step(7, "Verify no overlap between calls") first_ids = set(first_call.transactions) overlap = [t for t in second_call.transactions if t in first_ids] assert len(overlap) == 0, f"Found {len(overlap)} overlapping transactions" print_success("No overlap — second call returned only new transactions")
# Step 8: Contrast with AlgorandSubscriber class print_step(8, "Contrast: getSubscribedTransactions vs AlgorandSubscriber") print_contrast()
# Step 9: Summary print_step(9, "Summary") print_info(f"First call watermark: {initial_watermark} -> {first_call.new_watermark}") print_info(f"Second call watermark: {first_call.new_watermark} -> {second_call.new_watermark}") total = len(first_call.transactions) + len(second_call.transactions) print_info(f"Total transactions: {total}") print_success("Stateless subscription pattern demonstrated successfully")
print_header("Example complete")
if __name__ == "__main__": main()Other examples
Section titled “Other examples”- Basic Poll Once
- Continuous Subscriber
- Payment Filters
- Asset Transfer
- App Call Subscription
- Multiple Named Filters
- Balance Change Tracking
- ARC-28 Event Subscription
- Inner Transaction Subscription
- Batch Handling & Data Mappers
- Watermark Persistence
- Sync Behaviours
- Custom Filters
- Stateless Subscriptions
- Lifecycle Hooks & Error Handling