Class BSV::Wallet::SolidQueueAdapter ¶
Inherits: Object Includes: BSV::Wallet::Interface::BroadcastQueue
PostgreSQL-backed asynchronous broadcast queue adapter.
Persists outbound transactions to the wallet_broadcast_jobs table and processes them in a background worker thread. Designed for multi-process deployments where InlineQueue's synchronous broadcast is unacceptable.
The worker uses +SELECT ... FOR UPDATE SKIP LOCKED+ so multiple processes can run SolidQueueAdapter against the same database safely — only one worker will claim a given job.
Lifecycle¶
adapter = BSV::Wallet::SolidQueueAdapter.new(
db: sequel_db,
storage: postgres_store,
broadcaster: arc_broadcaster
)
adapter.start # spawns background worker thread
# ... process transactions ...
adapter.drain # stop + join (blocks until current poll cycle completes)
Recovery¶
On restart, the worker's first poll naturally finds stale sending jobs (those whose locked_at is older than stale_threshold seconds) and re-broadcasts them. No special recovery code is needed.
Drain warning¶
drain blocks until the current poll cycle completes. If a job is mid-broadcast this may take several seconds. Jobs enqueued after +@running = false+ but before the worker thread exits will remain unsent until the next start.
Constants¶
DEFAULT_POLL_INTERVAL ¶
Default number of seconds between poll cycles.
MAX_ATTEMPTS ¶
Maximum number of broadcast attempts before a job is abandoned. After this many failures the job remains failed permanently.
STALE_THRESHOLD ¶
Default number of seconds before a sending job is considered stale and eligible for retry. Configurable for testability.
Public Instance Methods¶
async?() ¶
Returns true — this adapter executes broadcast asynchronously. - @return [Boolean]
broadcast_enabled?() ¶
Returns true — SolidQueueAdapter requires a broadcaster at construction time (ArgumentError is raised if nil is passed), so broadcast is always available. - @return [Boolean]
drain() ¶
Stops the worker and blocks until the current poll cycle completes.
Safe to call when start has not been called (+@worker_thread+ is nil). - @return [void]
enqueue(payload) ¶
Persists a transaction to the broadcast job queue and returns immediately.
Inserts a row into wallet_broadcast_jobs with status unsent. If a row already exists for the same txid (e.g. after a crash and restart), the UniqueConstraintViolation is rescued and the existing status is returned instead. - @param payload [Hash] broadcast payload (see +BroadcastQueue+ module docs) - @return [Hash] +{ txid: String, broadcast_status: 'sending' }+
initialize(db:, storage:, broadcaster:, poll_interval: = DEFAULT_POLL_INTERVAL, stale_threshold: = STALE_THRESHOLD) ¶
- @param
db[Sequel::Database] a Sequel database handle (shared with PostgresStore) - @param
storage[Store] wallet storage adapter (must not be MemoryStore) - @param
broadcaster[#broadcast] broadcaster object - @param
poll_interval[Integer] seconds between worker poll cycles - @param
stale_threshold[Integer] seconds before a +sending+ job is retried - @raise [ArgumentError] if +storage+ is a +BSV::Wallet::Store::Memory+
- @raise [ArgumentError] if +broadcaster+ is +nil+
- @return [SolidQueueAdapter] a new instance of SolidQueueAdapter
start() ¶
Spawns the background worker thread.
Safe to call multiple times — returns immediately if already running. The check-and-set is atomic under the mutex to prevent two concurrent start calls from spawning duplicate worker threads. - @return [void]
status(txid) ¶
Returns the broadcast status for a previously enqueued transaction.
Reads directly from the jobs table — the authoritative source of truth for async broadcast status. - @param txid [String] hex transaction identifier - @return [String, nil] status string or +nil+ if no job exists
stop() ¶
Signals the worker to stop after the current poll cycle.
Non-blocking — returns immediately without waiting for the thread. - @return [void]