ARC-28 Event Subscription
Description
Section titled “Description”Demonstrates ARC-28 event parsing, filtering, and inspection:
- Define event definitions matching ARC-56 spec
- Configure config-level arc28_events for event parsing
- Filter transactions by emitted event names
- Inspect parsed event data (args, args_by_name)
Prerequisites
Section titled “Prerequisites”- LocalNet running (via
algokit localnet start)
Run This Example
Section titled “Run This Example”From the repository’s examples/subscriber directory:
cd examples/subscriberuv run python 08_arc28_events.py"""Example 08: ARC-28 Event Subscription
Demonstrates ARC-28 event parsing, filtering, and inspection:- Define event definitions matching ARC-56 spec- Configure config-level arc28_events for event parsing- Filter transactions by emitted event names- Inspect parsed event data (args, args_by_name)
Prerequisites:- LocalNet running (via `algokit localnet start`)"""
from pathlib import Path
from algokit_utils import ( AlgoAmount, AlgorandClient, AppClientMethodCallParams, AppFactory, AppFactoryParams, PaymentParams,)from shared import ( ALGOD_CONFIG, KMD_CONFIG, print_header, print_info, print_step, print_success, shorten_address,)
from algokit_subscriber import ( AlgorandSubscriber, AlgorandSubscriberConfig, Arc28Event, Arc28EventArg, Arc28EventFilter, Arc28EventGroup, SubscribedTransaction, SubscriberConfigFilter, TransactionFilter, in_memory_watermark,)
SPEC_PATH = Path(__file__).parent / "shared" / "artifacts" / "testing-app.arc56.json"
def _format_value(v: object) -> str: if isinstance(v, list): return "[" + ", ".join(str(x) for x in v) + "]" return str(v)
def print_event_details(txn: SubscribedTransaction) -> None: """Print parsed ARC-28 event details for a transaction.""" print_info(f"Transaction: {txn.id_}") filters = ", ".join(txn.filters_matched or []) print_info(f" Filters matched: [{filters}]") if not txn.arc28_events: print_info(" Events: none") return for event in txn.arc28_events: print_info(f" Event name: {event.event_name}") print_info(f" Event signature: {event.event_signature}") print_info(f" Event prefix: {event.event_prefix}") print_info(f" Group name: {event.group_name}") args_str = "[" + ", ".join(_format_value(a) for a in event.args) + "]" by_name_str = ( "{" + ", ".join(f"{k}: {_format_value(v)}" for k, v in event.args_by_name.items()) + "}" ) print_info(f" Args (ordered): {args_str}") print_info(f" Args (by name): {by_name_str}")
def verify_events(matched_txns: list[SubscribedTransaction]) -> None: """Verify emitSwapped and emitComplex transaction events.""" assert len(matched_txns) == 2, f"Expected 2 matched transactions, got {len(matched_txns)}" # First transaction: emitSwapped — 1 Swapped event swap_txn = matched_txns[0] assert len(swap_txn.arc28_events) == 1, ( f"emitSwapped: expected 1 event, got {len(swap_txn.arc28_events)}" ) assert swap_txn.arc28_events[0].event_name == "Swapped" assert swap_txn.arc28_events[0].args[0] == 42 assert swap_txn.arc28_events[0].args[1] == 99 assert swap_txn.arc28_events[0].args_by_name["field1"] == 42 assert swap_txn.arc28_events[0].args_by_name["field2"] == 99 print_success("emitSwapped: Swapped event parsed correctly (field1=42, field2=99)") # Second transaction: emitComplex — 1 Swapped + 1 Complex complex_txn = matched_txns[1] assert len(complex_txn.arc28_events) == 2, ( f"emitComplex: expected 2 events, got {len(complex_txn.arc28_events)}" ) assert complex_txn.arc28_events[0].event_name == "Swapped" assert complex_txn.arc28_events[1].event_name == "Complex" print_success("emitComplex: Swapped + Complex events parsed correctly")
def print_summary(step: int, app_id: int, group: Arc28EventGroup) -> None: """Print the final summary.""" print_step(step, "Summary") print_info(f"App ID: {app_id}") print_info(f'Event group: "{group.group_name}" with {len(group.events)} event definitions') print_info(f"process_for_app_ids: [{app_id}] — only parse events from this app") print_info("continue_on_error: True — skip unparseable events") print_info("Config-level arc28_events: Defines HOW events are parsed from app call logs") print_info( "Filter-level arc28_events: Defines WHICH transactions to match (by group + event name)" ) print_info("emitSwapped result: 1 Swapped event with args [42, 99]") print_info("emitComplex result: 2 events: Swapped [10, 20] + Complex [[1,2,3], 20]") print_header("Example complete")
def main() -> None: print_header("08 — ARC-28 Event Subscription")
# Step 1: Connect to LocalNet print_step(1, "Connect to LocalNet") algorand = AlgorandClient.from_config(algod_config=ALGOD_CONFIG, kmd_config=KMD_CONFIG) status = algorand.client.algod.status() print_info(f"Current round: {status.last_round}") print_success("Connected to LocalNet")
# Step 2: Create and fund account print_step(2, "Create and fund account") dispenser = algorand.account.localnet_dispenser().addr creator = algorand.account.random().addr algorand.send.payment( PaymentParams(sender=dispenser, receiver=creator, amount=AlgoAmount(algo=100)) ) print_info(f"Creator: {shorten_address(creator)}") print_success("Account created and funded")
# Step 3: Deploy TestingApp via AppFactory print_step(3, "Deploy TestingApp via AppFactory") factory = AppFactory( AppFactoryParams( algorand=algorand, app_spec=SPEC_PATH.read_text("utf-8"), default_sender=creator, ) ) app_client, create_result = factory.send.bare.create() app_id = create_result.app_id create_round = create_result.confirmation.confirmed_round assert create_round is not None print_info(f"App ID: {app_id}") print_info(f"Create round: {create_round}") print_success("TestingApp deployed")
# Step 4: Emit ARC-28 events via app calls print_step(4, "Emit ARC-28 events via app calls") swap_result = app_client.send.call( AppClientMethodCallParams(method="emitSwapped(uint64,uint64)void", args=[42, 99]) ) print_info(f"emitSwapped(42, 99) txn: {swap_result.tx_ids[0]}") complex_result = app_client.send.call( AppClientMethodCallParams( method="emitComplex(uint64,uint64,uint32[])void", args=[10, 20, [1, 2, 3]], ) ) print_info(f"emitComplex(10, 20, [1,2,3]) txn: {complex_result.tx_ids[0]}") print_success("2 event-emitting app calls sent") watermark_before = create_round - 1
# Step 5: Define ARC-28 event definitions print_step(5, "Define ARC-28 event definitions") swapped_event = Arc28Event( name="Swapped", args=[ Arc28EventArg(type="uint64", name="field1"), Arc28EventArg(type="uint64", name="field2"), ], ) complex_event = Arc28Event( name="Complex", args=[ Arc28EventArg(type="uint32[]", name="field1"), Arc28EventArg(type="uint64", name="field2"), ], ) print_info("Swapped signature: Swapped(uint64,uint64)") print_info("Complex signature: Complex(uint32[],uint64)") print_success("Event definitions ready")
# Step 6: Subscribe with arc28_events config (parsing) + filter (matching) # KEY DISTINCTION: # Config-level arc28_events (Arc28EventGroup[]): HOW to parse events # Filter-level arc28_events: WHICH transactions to match print_step(6, "Subscribe with arc28_events — event parsing + filtering") arc28_event_group = Arc28EventGroup( group_name="testing-app-events", events=[swapped_event, complex_event], process_for_app_ids=[app_id], continue_on_error=True, ) subscriber = AlgorandSubscriber( config=AlgorandSubscriberConfig( filters=[ SubscriberConfigFilter( name="arc28-events", filter=TransactionFilter( app_id=app_id, arc28_events=[ Arc28EventFilter( group_name="testing-app-events", event_name="Swapped", ), Arc28EventFilter( group_name="testing-app-events", event_name="Complex", ), ], ), ), ], arc28_events=[arc28_event_group], watermark_persistence=in_memory_watermark(watermark_before), sync_behaviour="sync-oldest", max_rounds_to_sync=100, ), algod_client=algorand.client.algod, ) result = subscriber.poll_once() matched_txns = result.subscribed_transactions print_info(f"Matched count: {len(matched_txns)}")
# Step 7: Inspect parsed ARC-28 event data print_step(7, "Inspect parsed ARC-28 event data") for txn in matched_txns: print_event_details(txn) verify_events(matched_txns)
# Step 8: Demonstrate continueOnError behavior print_step(8, "Demonstrate continue_on_error: True") print_info( "continue_on_error: True — if an event log cannot be decoded," " a warning is logged and the event is skipped" ) print_info( "Behavior: Without continue_on_error, a parse failure would" " raise an error and halt processing" ) print_success( "continue_on_error: True is set on the event group" " — unparseable events are silently skipped" )
# Step 9: Summary print_summary(9, app_id, arc28_event_group)
if __name__ == "__main__": main()Other examples
Section titled “Other examples”- Basic Poll Once
- Continuous Subscriber
- Payment Filters
- Asset Transfer
- App Call Subscription
- Multiple Named Filters
- Balance Change Tracking
- ARC-28 Event Subscription
- Inner Transaction Subscription
- Batch Handling & Data Mappers
- Watermark Persistence
- Sync Behaviours
- Custom Filters
- Stateless Subscriptions
- Lifecycle Hooks & Error Handling