Skip to content

ARC-28 Event Subscription

← Back to Examples

Demonstrates ARC-28 event parsing, filtering, and inspection:

  • Define event definitions matching ARC-56 spec
  • Configure config-level arc28_events for event parsing
  • Filter transactions by emitted event names
  • Inspect parsed event data (args, args_by_name)
  • LocalNet running (via algokit localnet start)

From the repository’s examples/subscriber directory:

Terminal window
cd examples/subscriber
uv run python 08_arc28_events.py

View source on GitHub

08_arc28_events.py
"""Example 08: ARC-28 Event Subscription
Demonstrates ARC-28 event parsing, filtering, and inspection:
- Define event definitions matching ARC-56 spec
- Configure config-level arc28_events for event parsing
- Filter transactions by emitted event names
- Inspect parsed event data (args, args_by_name)
Prerequisites:
- LocalNet running (via `algokit localnet start`)
"""
from pathlib import Path
from algokit_utils import (
AlgoAmount,
AlgorandClient,
AppClientMethodCallParams,
AppFactory,
AppFactoryParams,
PaymentParams,
)
from shared import (
ALGOD_CONFIG,
KMD_CONFIG,
print_header,
print_info,
print_step,
print_success,
shorten_address,
)
from algokit_subscriber import (
AlgorandSubscriber,
AlgorandSubscriberConfig,
Arc28Event,
Arc28EventArg,
Arc28EventFilter,
Arc28EventGroup,
SubscribedTransaction,
SubscriberConfigFilter,
TransactionFilter,
in_memory_watermark,
)
SPEC_PATH = Path(__file__).parent / "shared" / "artifacts" / "testing-app.arc56.json"
def _format_value(v: object) -> str:
if isinstance(v, list):
return "[" + ", ".join(str(x) for x in v) + "]"
return str(v)
def print_event_details(txn: SubscribedTransaction) -> None:
"""Print parsed ARC-28 event details for a transaction."""
print_info(f"Transaction: {txn.id_}")
filters = ", ".join(txn.filters_matched or [])
print_info(f" Filters matched: [{filters}]")
if not txn.arc28_events:
print_info(" Events: none")
return
for event in txn.arc28_events:
print_info(f" Event name: {event.event_name}")
print_info(f" Event signature: {event.event_signature}")
print_info(f" Event prefix: {event.event_prefix}")
print_info(f" Group name: {event.group_name}")
args_str = "[" + ", ".join(_format_value(a) for a in event.args) + "]"
by_name_str = (
"{"
+ ", ".join(f"{k}: {_format_value(v)}" for k, v in event.args_by_name.items())
+ "}"
)
print_info(f" Args (ordered): {args_str}")
print_info(f" Args (by name): {by_name_str}")
def verify_events(matched_txns: list[SubscribedTransaction]) -> None:
"""Verify emitSwapped and emitComplex transaction events."""
assert len(matched_txns) == 2, f"Expected 2 matched transactions, got {len(matched_txns)}"
# First transaction: emitSwapped — 1 Swapped event
swap_txn = matched_txns[0]
assert len(swap_txn.arc28_events) == 1, (
f"emitSwapped: expected 1 event, got {len(swap_txn.arc28_events)}"
)
assert swap_txn.arc28_events[0].event_name == "Swapped"
assert swap_txn.arc28_events[0].args[0] == 42
assert swap_txn.arc28_events[0].args[1] == 99
assert swap_txn.arc28_events[0].args_by_name["field1"] == 42
assert swap_txn.arc28_events[0].args_by_name["field2"] == 99
print_success("emitSwapped: Swapped event parsed correctly (field1=42, field2=99)")
# Second transaction: emitComplex — 1 Swapped + 1 Complex
complex_txn = matched_txns[1]
assert len(complex_txn.arc28_events) == 2, (
f"emitComplex: expected 2 events, got {len(complex_txn.arc28_events)}"
)
assert complex_txn.arc28_events[0].event_name == "Swapped"
assert complex_txn.arc28_events[1].event_name == "Complex"
print_success("emitComplex: Swapped + Complex events parsed correctly")
def print_summary(step: int, app_id: int, group: Arc28EventGroup) -> None:
"""Print the final summary."""
print_step(step, "Summary")
print_info(f"App ID: {app_id}")
print_info(f'Event group: "{group.group_name}" with {len(group.events)} event definitions')
print_info(f"process_for_app_ids: [{app_id}] — only parse events from this app")
print_info("continue_on_error: True — skip unparseable events")
print_info("Config-level arc28_events: Defines HOW events are parsed from app call logs")
print_info(
"Filter-level arc28_events: Defines WHICH transactions to match (by group + event name)"
)
print_info("emitSwapped result: 1 Swapped event with args [42, 99]")
print_info("emitComplex result: 2 events: Swapped [10, 20] + Complex [[1,2,3], 20]")
print_header("Example complete")
def main() -> None:
print_header("08 — ARC-28 Event Subscription")
# Step 1: Connect to LocalNet
print_step(1, "Connect to LocalNet")
algorand = AlgorandClient.from_config(algod_config=ALGOD_CONFIG, kmd_config=KMD_CONFIG)
status = algorand.client.algod.status()
print_info(f"Current round: {status.last_round}")
print_success("Connected to LocalNet")
# Step 2: Create and fund account
print_step(2, "Create and fund account")
dispenser = algorand.account.localnet_dispenser().addr
creator = algorand.account.random().addr
algorand.send.payment(
PaymentParams(sender=dispenser, receiver=creator, amount=AlgoAmount(algo=100))
)
print_info(f"Creator: {shorten_address(creator)}")
print_success("Account created and funded")
# Step 3: Deploy TestingApp via AppFactory
print_step(3, "Deploy TestingApp via AppFactory")
factory = AppFactory(
AppFactoryParams(
algorand=algorand,
app_spec=SPEC_PATH.read_text("utf-8"),
default_sender=creator,
)
)
app_client, create_result = factory.send.bare.create()
app_id = create_result.app_id
create_round = create_result.confirmation.confirmed_round
assert create_round is not None
print_info(f"App ID: {app_id}")
print_info(f"Create round: {create_round}")
print_success("TestingApp deployed")
# Step 4: Emit ARC-28 events via app calls
print_step(4, "Emit ARC-28 events via app calls")
swap_result = app_client.send.call(
AppClientMethodCallParams(method="emitSwapped(uint64,uint64)void", args=[42, 99])
)
print_info(f"emitSwapped(42, 99) txn: {swap_result.tx_ids[0]}")
complex_result = app_client.send.call(
AppClientMethodCallParams(
method="emitComplex(uint64,uint64,uint32[])void",
args=[10, 20, [1, 2, 3]],
)
)
print_info(f"emitComplex(10, 20, [1,2,3]) txn: {complex_result.tx_ids[0]}")
print_success("2 event-emitting app calls sent")
watermark_before = create_round - 1
# Step 5: Define ARC-28 event definitions
print_step(5, "Define ARC-28 event definitions")
swapped_event = Arc28Event(
name="Swapped",
args=[
Arc28EventArg(type="uint64", name="field1"),
Arc28EventArg(type="uint64", name="field2"),
],
)
complex_event = Arc28Event(
name="Complex",
args=[
Arc28EventArg(type="uint32[]", name="field1"),
Arc28EventArg(type="uint64", name="field2"),
],
)
print_info("Swapped signature: Swapped(uint64,uint64)")
print_info("Complex signature: Complex(uint32[],uint64)")
print_success("Event definitions ready")
# Step 6: Subscribe with arc28_events config (parsing) + filter (matching)
# KEY DISTINCTION:
# Config-level arc28_events (Arc28EventGroup[]): HOW to parse events
# Filter-level arc28_events: WHICH transactions to match
print_step(6, "Subscribe with arc28_events — event parsing + filtering")
arc28_event_group = Arc28EventGroup(
group_name="testing-app-events",
events=[swapped_event, complex_event],
process_for_app_ids=[app_id],
continue_on_error=True,
)
subscriber = AlgorandSubscriber(
config=AlgorandSubscriberConfig(
filters=[
SubscriberConfigFilter(
name="arc28-events",
filter=TransactionFilter(
app_id=app_id,
arc28_events=[
Arc28EventFilter(
group_name="testing-app-events",
event_name="Swapped",
),
Arc28EventFilter(
group_name="testing-app-events",
event_name="Complex",
),
],
),
),
],
arc28_events=[arc28_event_group],
watermark_persistence=in_memory_watermark(watermark_before),
sync_behaviour="sync-oldest",
max_rounds_to_sync=100,
),
algod_client=algorand.client.algod,
)
result = subscriber.poll_once()
matched_txns = result.subscribed_transactions
print_info(f"Matched count: {len(matched_txns)}")
# Step 7: Inspect parsed ARC-28 event data
print_step(7, "Inspect parsed ARC-28 event data")
for txn in matched_txns:
print_event_details(txn)
verify_events(matched_txns)
# Step 8: Demonstrate continueOnError behavior
print_step(8, "Demonstrate continue_on_error: True")
print_info(
"continue_on_error: True — if an event log cannot be decoded,"
" a warning is logged and the event is skipped"
)
print_info(
"Behavior: Without continue_on_error, a parse failure would"
" raise an error and halt processing"
)
print_success(
"continue_on_error: True is set on the event group"
" — unparseable events are silently skipped"
)
# Step 9: Summary
print_summary(9, app_id, arc28_event_group)
if __name__ == "__main__":
main()