Class BSV::Wallet::SolidQueueAdapter

Inherits: Object
Includes: BSV::Wallet::Interface::BroadcastQueue

PostgreSQL-backed asynchronous broadcast queue adapter.

Persists outbound transactions to the wallet_broadcast_jobs table and processes them in a background worker thread. Designed for multi-process deployments where InlineQueue's synchronous broadcast is unacceptable.

The worker uses SELECT ... FOR UPDATE SKIP LOCKED, so multiple processes can run SolidQueueAdapter against the same database safely: only one worker will claim a given job.
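
As a rough illustration of the claim step, the snippet below shows how a SKIP LOCKED claim is commonly written for PostgreSQL. The table name, the txid and status columns, and the unsent value come from this page; the id and created_at columns, the LIMIT, and the CLAIM_SQL constant are assumptions made for illustration and may not match the adapter's real schema or query.

```ruby
# Hypothetical claim query; not taken from the adapter's source.
# Rows already locked by another transaction are skipped instead of
# waited on, so concurrent workers receive different rows (or none).
CLAIM_SQL = <<~SQL
  SELECT id, txid
  FROM wallet_broadcast_jobs
  WHERE status = 'unsent'
  ORDER BY created_at
  LIMIT 1
  FOR UPDATE SKIP LOCKED
SQL
```

The row lock only lasts for the surrounding transaction, so a query like this has to run inside one, with the status update issued before the commit.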

Lifecycle

adapter = BSV::Wallet::SolidQueueAdapter.new(
  db: sequel_db,
  storage: postgres_store,
  broadcaster: arc_broadcaster
)
adapter.start   # spawns background worker thread
# ... process transactions ...
adapter.drain   # stop + join (blocks until current poll cycle completes)

Recovery

On restart, the worker's first poll naturally finds stale sending jobs (those whose locked_at is older than stale_threshold seconds) and re-broadcasts them. No special recovery code is needed.
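
The staleness rule described above can be sketched as a plain predicate. This is only an illustration: stale_job? is a hypothetical helper, the adapter most likely applies the same condition in SQL, and the 300-second threshold used in the example calls is an arbitrary value, not the real STALE_THRESHOLD.

```ruby
# Hypothetical helper: true when a 'sending' job has been locked for
# longer than stale_threshold seconds and may be picked up again.
def stale_job?(status:, locked_at:, now:, stale_threshold:)
  return false unless status == 'sending'
  return false if locked_at.nil?

  (now - locked_at) > stale_threshold
end

now = Time.at(1_000)
# 300 is an illustrative threshold, not the library's default.
fresh = stale_job?(status: 'sending', locked_at: now - 10, now: now, stale_threshold: 300)
stale = stale_job?(status: 'sending', locked_at: now - 301, now: now, stale_threshold: 300)
```

Here fresh is false and stale is true, which is why a job left in sending by a crashed process is retried once enough time has passed.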

Drain warning

drain blocks until the current poll cycle completes. If a job is mid-broadcast, this may take several seconds. Jobs enqueued after @running is set to false but before the worker thread exits remain unsent until the next start.

Constants

DEFAULT_POLL_INTERVAL

Default number of seconds between poll cycles.

MAX_ATTEMPTS

Maximum number of broadcast attempts before a job is abandoned. After this many failures the job remains failed permanently.
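
One way to picture the attempt limit is a check like the one below. It is a sketch under assumptions: the page does not give the value of MAX_ATTEMPTS, so EXAMPLE_MAX_ATTEMPTS is an arbitrary number, and retryable? is a hypothetical helper rather than part of the adapter.

```ruby
# Illustrative value only; the real constant is not stated on this page.
EXAMPLE_MAX_ATTEMPTS = 5

# Hypothetical check: a failed job is retried only while its attempt
# count is still below the limit. At the limit it stays failed.
def retryable?(status:, attempts:, max_attempts: EXAMPLE_MAX_ATTEMPTS)
  status == 'failed' && attempts < max_attempts
end
```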

STALE_THRESHOLD

Default number of seconds before a sending job is considered stale and eligible for retry. Configurable for testability.

Public Instance Methods

async?()

Returns true: this adapter executes broadcasts asynchronously.

  • @return [Boolean]

broadcast_enabled?()

Returns true. SolidQueueAdapter requires a broadcaster at construction time (ArgumentError is raised if nil is passed), so broadcast is always available.

  • @return [Boolean]

drain()

Stops the worker and blocks until the current poll cycle completes.

Safe to call when start has not been called (@worker_thread is nil).

  • @return [void]
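
The stop-then-join behaviour, including the nil-safe case, might look roughly like this. DrainSketch is a hypothetical stand-in written for this page, not the adapter's code; its start is deliberately simplified and its poll cycle is just a sleep.

```ruby
# Hypothetical stand-in showing drain as "stop, then join".
class DrainSketch
  def initialize(poll_interval: 0.01)
    @poll_interval = poll_interval
    @running = false
    @worker_thread = nil
  end

  # Simplified start; no mutex here because the focus is on drain.
  def start
    return if @running

    @running = true
    @worker_thread = Thread.new do
      sleep @poll_interval while @running # stand-in for one poll cycle
    end
    nil
  end

  # Non-blocking: only flips the flag.
  def stop
    @running = false
    nil
  end

  # Blocks until the worker finishes its current cycle.
  # The safe-navigation call makes this a no-op when start was never called.
  def drain
    stop
    @worker_thread&.join
    nil
  end

  def worker_alive?
    !@worker_thread.nil? && @worker_thread.alive?
  end
end
```

Calling drain on a fresh DrainSketch returns immediately, and calling it after start returns once the worker thread has exited.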

enqueue(payload)

Persists a transaction to the broadcast job queue and returns immediately.

Inserts a row into wallet_broadcast_jobs with status unsent. If a row already exists for the same txid (e.g. after a crash and restart), the UniqueConstraintViolation is rescued and the existing status is returned instead.

  • @param payload [Hash] broadcast payload (see BroadcastQueue module docs)
  • @return [Hash] { txid: String, broadcast_status: 'sending' }
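
The insert-or-return-existing behaviour can be sketched without a database as follows. Everything here is a stand-in: FakeUniqueViolation plays the role of the unique constraint error, FakeJobsTable replaces the jobs table, and enqueue_sketch takes a bare txid instead of the real payload hash. How the stored status maps to the returned broadcast_status on a duplicate is an assumption.

```ruby
# Stand-in for the database's unique constraint error.
class FakeUniqueViolation < StandardError; end

# Hypothetical in-memory table with a unique key on txid.
class FakeJobsTable
  def initialize
    @rows = {}
  end

  def insert(txid:, status:)
    raise FakeUniqueViolation, txid if @rows.key?(txid)

    @rows[txid] = status
  end

  def status_for(txid)
    @rows[txid]
  end
end

# Inserts a new job, or reports the existing job's status when the
# txid is already queued (for example after a crash and restart).
def enqueue_sketch(table, txid)
  table.insert(txid: txid, status: 'unsent')
  { txid: txid, broadcast_status: 'sending' }
rescue FakeUniqueViolation
  { txid: txid, broadcast_status: table.status_for(txid) }
end
```

Because the duplicate is handled by rescuing the constraint error rather than by checking first, two processes enqueueing the same txid at the same time cannot both insert a row.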

initialize(db:, storage:, broadcaster:, poll_interval: DEFAULT_POLL_INTERVAL, stale_threshold: STALE_THRESHOLD)

  • @param db [Sequel::Database] a Sequel database handle (shared with PostgresStore)
  • @param storage [Store] wallet storage adapter (must not be MemoryStore)
  • @param broadcaster [#broadcast] broadcaster object
  • @param poll_interval [Integer] seconds between worker poll cycles
  • @param stale_threshold [Integer] seconds before a sending job is retried
  • @raise [ArgumentError] if storage is a BSV::Wallet::Store::Memory
  • @raise [ArgumentError] if broadcaster is nil
  • @return [SolidQueueAdapter] a new instance of SolidQueueAdapter
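
The two argument checks listed above might be implemented along these lines. This is a sketch, not the adapter's constructor: MemoryStoreStandIn substitutes for the in-memory store class, AdapterArgsSketch is a hypothetical class, and the default values 5 and 300 are placeholders for DEFAULT_POLL_INTERVAL and STALE_THRESHOLD, whose real values are not given on this page.

```ruby
# Hypothetical stand-in for the in-memory store class.
class MemoryStoreStandIn; end

# Hypothetical constructor showing only the argument validation.
class AdapterArgsSketch
  attr_reader :poll_interval, :stale_threshold

  # 5 and 300 are placeholder defaults, not the library's constants.
  def initialize(db:, storage:, broadcaster:, poll_interval: 5, stale_threshold: 300)
    # A durable queue paired with a volatile store would lose wallet state on restart.
    raise ArgumentError, 'storage must not be an in-memory store' if storage.is_a?(MemoryStoreStandIn)
    raise ArgumentError, 'broadcaster is required' if broadcaster.nil?

    @db = db
    @storage = storage
    @broadcaster = broadcaster
    @poll_interval = poll_interval
    @stale_threshold = stale_threshold
  end
end
```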

start()

Spawns the background worker thread.

Safe to call multiple times: returns immediately if already running. The check-and-set is atomic under the mutex to prevent two concurrent start calls from spawning duplicate worker threads.

  • @return [void]
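
A minimal sketch of the atomic check-and-set, assuming a single mutex guards the running flag. SketchWorker, spawn_count, and join are hypothetical names introduced for illustration; the worker body is a placeholder loop rather than a real poll cycle.

```ruby
# Hypothetical stand-in for the adapter's worker lifecycle.
class SketchWorker
  attr_reader :spawn_count

  def initialize
    @mutex = Mutex.new
    @running = false
    @worker_thread = nil
    @spawn_count = 0
  end

  def start
    @mutex.synchronize do
      # Check and set happen under the same lock, so a second caller
      # always sees @running == true and returns without spawning.
      return if @running

      @running = true
      @spawn_count += 1
      @worker_thread = Thread.new { sleep 0.01 while running? }
    end
    nil
  end

  def stop
    @mutex.synchronize { @running = false }
    nil
  end

  def join
    @worker_thread&.join
  end

  private

  def running?
    @mutex.synchronize { @running }
  end
end
```

Without the lock, two threads could both read the flag as false before either sets it, and each would spawn its own worker.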

status(txid)

Returns the broadcast status for a previously enqueued transaction.

Reads directly from the jobs table, the authoritative source of truth for async broadcast status.

  • @param txid [String] hex transaction identifier
  • @return [String, nil] status string or nil if no job exists
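
Since enqueue returns before the broadcast happens, callers that need the outcome have to poll status. The helper below is one possible way to do that; wait_for_status and its done, timeout, and interval parameters are hypothetical and not part of the adapter. It works with any object that responds to status(txid).

```ruby
# Hypothetical polling helper. Returns the last status seen, which is
# either one of the statuses in `done` or whatever was current at timeout.
def wait_for_status(adapter, txid, done:, timeout: 5.0, interval: 0.05)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  loop do
    current = adapter.status(txid)
    return current if done.include?(current)
    return current if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline

    sleep interval
  end
end
```

The caller supplies the list of statuses that count as finished, because this page names only unsent, sending, and failed and does not list the success status.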

stop()

Signals the worker to stop after the current poll cycle.

Non-blocking: returns immediately without waiting for the thread.

  • @return [void]