Skip to content

Watermarking & Resilience

You can create reliable syncing / indexing services through a simple round watermarking capability that allows you to create resilient syncing services that can recover from an outage.

This works through the use of the watermark_persistence parameter in AlgorandSubscriber and watermark parameter in get_subscribed_transactions:

import algokit_subscriber as sub
def get_saved_watermark() -> int:
# Return the watermark from a persistence store e.g. database, redis, file system, etc.
pass
def save_watermark(new_watermark: int) -> None:
# Save the watermark to a persistence store e.g. database, redis, file system, etc.
pass
...
subscriber = sub.AlgorandSubscriber(
config=sub.AlgorandSubscriberConfig(
watermark_persistence=sub.WatermarkPersistence(
get=get_saved_watermark, set=save_watermark
),
...
),
...
)
# or:
watermark = get_saved_watermark()
result = sub.get_subscribed_transactions(
subscription=sub.TransactionSubscriptionParams(watermark=watermark, ...),
...
)
save_watermark(result.new_watermark)

By using a persistence store, you can gracefully respond to an outage of your subscriber. The next time it starts it will pick back up from the point where it last persisted. It’s worth noting this provides at least once delivery semantics so you need to handle duplicate events.

Alternatively, if you want to create at most once delivery semantics you could use the transactional outbox pattern and wrap a unit of work from an ACID persistence store (e.g. a SQL database with a serializable or repeatable read transaction) around the watermark retrieval, transaction processing and watermark persistence so the processing of transactions and watermarking of a single poll happens in a single atomic transaction. In this model, you would then process the transactions in a separate process from the persistence store (and likely have a flag on each transaction to indicate if it has been processed or not). You would need to be careful to ensure that you only have one subscriber actively running at a time to guarantee this delivery semantic. To ensure resilience you may want to have multiple subscribers running, but a primary node that actually executes based on retrieval of a distributed semaphore / lease.

If you are doing a quick test or creating an ephemeral subscriber that just needs to exist in-memory and doesn’t need to recover resiliently (useful with sync_behaviour of skip-sync-newest for instance) then you can use the in_memory_watermark() helper instead of a persistence store, e.g.:

subscriber = sub.AlgorandSubscriber(
config=sub.AlgorandSubscriberConfig(
watermark_persistence=sub.in_memory_watermark(),
...
),
...
)
# or:
watermark = 0
result = sub.get_subscribed_transactions(
subscription=sub.TransactionSubscriptionParams(watermark=watermark, ...),
...
)
watermark = result.new_watermark