Skip to content

Stateless Subscriptions

← Back to Examples

Demonstrates getSubscribedTransactions for serverless patterns.

  • Use the stateless function instead of the AlgorandSubscriber class
  • Manage watermark externally between calls
  • Verify no overlap between consecutive calls
  • LocalNet running (via algokit localnet start)

From the repository’s examples/subscriber directory:

Terminal window
cd examples/subscriber
uv run python 14_stateless_subscriptions.py

View source on GitHub

14_stateless_subscriptions.py
"""Example 14: Stateless Subscriptions
Demonstrates getSubscribedTransactions for serverless patterns.
- Use the stateless function instead of the AlgorandSubscriber class
- Manage watermark externally between calls
- Verify no overlap between consecutive calls
Prerequisites:
- LocalNet running (via `algokit localnet start`)
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING
from algokit_utils import AlgoAmount, AlgorandClient, PaymentParams
from shared import (
ALGOD_CONFIG,
KMD_CONFIG,
print_header,
print_info,
print_step,
print_success,
shorten_address,
)
from algokit_subscriber import (
NamedTransactionFilter,
TransactionFilter,
TransactionSubscriptionParams,
get_subscribed_transactions,
)
if TYPE_CHECKING:
from algokit_algod_client import AlgodClient
@dataclass
class PollResult:
"""Simplified result from a stateless poll."""
transactions: list[str]
new_watermark: int
round_range: tuple[int, int]
def stateless_poll(algod: AlgodClient, watermark: int, sender_addr: str) -> PollResult:
"""Simulate a serverless handler: watermark in -> result out."""
result = get_subscribed_transactions(
subscription=TransactionSubscriptionParams(
filters=[
NamedTransactionFilter(
name="payments",
filter=TransactionFilter(sender=sender_addr),
),
],
watermark=watermark,
sync_behaviour="sync-oldest",
max_rounds_to_sync=100,
),
algod=algod,
)
return PollResult(
transactions=[txn.id_ for txn in result.subscribed_transactions],
new_watermark=result.new_watermark,
round_range=result.synced_round_range,
)
def send_payment(
algorand: AlgorandClient,
sender: str,
amount: int,
note: str,
label: str,
) -> tuple[str, int]:
"""Send a self-payment and print its ID and round."""
result = algorand.send.payment(
PaymentParams(
sender=sender,
receiver=sender,
amount=AlgoAmount(algo=amount),
note=note.encode(),
)
)
print_info(f"{label} ID: {result.tx_id}")
rnd = result.confirmation.confirmed_round
print_info(f"{label} round: {rnd}")
tx_id = result.tx_id
assert tx_id is not None
assert rnd is not None
return tx_id, rnd
def print_poll_result(poll: PollResult) -> None:
"""Print poll result details."""
print_info(f"Transactions found: {len(poll.transactions)}")
print_info(f"Round range: [{poll.round_range[0]}, {poll.round_range[1]}]")
print_info(f"New watermark: {poll.new_watermark}")
for tx_id in poll.transactions:
print_info(f" Matched txn: {tx_id}")
def print_contrast() -> None:
"""Print contrast between stateless and stateful approaches."""
print()
print(" getSubscribedTransactions (stateless):")
print(" - No class instantiation, no event system")
print(" - Caller manages watermark externally (DB, file, env var)")
print(" - Single function call: params in -> result out")
print(" - Ideal for serverless functions, cron jobs, Lambda/Cloud Functions")
print(" - No polling loop — caller controls when/how often to call")
print()
print(" AlgorandSubscriber (stateful):")
print(" - Class with start/stop, event emitters (on, onBatch)")
print(" - Built-in watermark persistence (get/set callbacks)")
print(" - Built-in polling loop with configurable frequency")
print(" - Ideal for long-running services and real-time subscriptions")
print()
def main() -> None:
print_header("14 — getSubscribedTransactions (Stateless)")
# Step 1: Connect to LocalNet
print_step(1, "Connect to LocalNet")
algorand = AlgorandClient.from_config(algod_config=ALGOD_CONFIG, kmd_config=KMD_CONFIG)
algod = algorand.client.algod
status = algod.status()
print_info(f"Current round: {status.last_round}")
print_success("Connected to LocalNet")
# Step 2: Create and fund sender account
print_step(2, "Create and fund sender account")
dispenser = algorand.account.localnet_dispenser().addr
sender = algorand.account.random().addr
algorand.send.payment(
PaymentParams(
sender=dispenser,
receiver=sender,
amount=AlgoAmount(algo=10),
)
)
print_info(f"Sender: {shorten_address(sender)}")
# Step 3: Send first batch of 2 payments
print_step(3, "Send first batch of payments (2 transactions)")
_tx1_id, tx1_round = send_payment(algorand, sender, 1, "stateless batch-1 txn-1", "Txn 1")
_tx2_id, _tx2_round = send_payment(algorand, sender, 2, "stateless batch-1 txn-2", "Txn 2")
print_success("Sent 2 payments")
# Step 4: First stateless call — watermark = firstRound - 1
print_step(4, "First stateless call (watermark = firstRound - 1)")
initial_watermark = tx1_round - 1
print_info(f"Input watermark: {initial_watermark}")
first_call = stateless_poll(algod, initial_watermark, sender)
print_poll_result(first_call)
assert len(first_call.transactions) == 2, (
f"Expected 2 in first call, got {len(first_call.transactions)}"
)
print_success("First call returned 2 transactions")
# Step 5: Send second batch of 2 payments
print_step(5, "Send second batch of payments (2 transactions)")
_tx3_id, _tx3_round = send_payment(algorand, sender, 3, "stateless batch-2 txn-3", "Txn 3")
_tx4_id, _tx4_round = send_payment(algorand, sender, 4, "stateless batch-2 txn-4", "Txn 4")
print_success("Sent 2 more payments")
# Step 6: Second stateless call — uses newWatermark from first call
print_step(6, "Second stateless call (watermark from first call)")
print_info(f"Input watermark: {first_call.new_watermark}")
second_call = stateless_poll(algod, first_call.new_watermark, sender)
print_poll_result(second_call)
assert len(second_call.transactions) == 2, (
f"Expected 2 in second call, got {len(second_call.transactions)}"
)
print_success("Second call returned only new transactions")
# Step 7: Verify no overlap
print_step(7, "Verify no overlap between calls")
first_ids = set(first_call.transactions)
overlap = [t for t in second_call.transactions if t in first_ids]
assert len(overlap) == 0, f"Found {len(overlap)} overlapping transactions"
print_success("No overlap — second call returned only new transactions")
# Step 8: Contrast with AlgorandSubscriber class
print_step(8, "Contrast: getSubscribedTransactions vs AlgorandSubscriber")
print_contrast()
# Step 9: Summary
print_step(9, "Summary")
print_info(f"First call watermark: {initial_watermark} -> {first_call.new_watermark}")
print_info(f"Second call watermark: {first_call.new_watermark} -> {second_call.new_watermark}")
total = len(first_call.transactions) + len(second_call.transactions)
print_info(f"Total transactions: {total}")
print_success("Stateless subscription pattern demonstrated successfully")
print_header("Example complete")
if __name__ == "__main__":
main()