Part of the Sanchayam series.
Why a Queue
A price fetch is an external API call. It can be slow, it can fail, it needs rate limiting, and it should not block a user request. The right place for it is a background worker draining a persistent queue, not an inline synchronous call.
When the frontend requests holdings data and a price is missing or stale, the backend responds immediately with what it has and enqueues the asset for a fetch. The next time the user loads the page, the price is ready. There is no blocking wait.
The Queue Table
CREATE TABLE price_fetch_queue (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
asset_id UUID NOT NULL REFERENCES assets(id),
status price_fetch_status NOT NULL DEFAULT 'pending',
priority INT NOT NULL DEFAULT 2,
retry_count INT NOT NULL DEFAULT 0,
queued_at TIMESTAMP NOT NULL DEFAULT NOW(),
started_at TIMESTAMP,
completed_at TIMESTAMP,
error TEXT
);
CREATE UNIQUE INDEX price_fetch_queue_active ON price_fetch_queue (asset_id)
WHERE status IN ('pending', 'in_progress');
The unique partial index prevents duplicate queue entries for the same asset while it is pending or in progress. Enqueuing an already-queued asset is a no-op:
export async function enqueue(assetId: string, priority = 2): Promise<void> {
await sql`
INSERT INTO price_fetch_queue (asset_id, status, priority, queued_at)
VALUES (${assetId}, 'pending', ${priority}, NOW())
ON CONFLICT DO NOTHING
`
}
Priority 1 is user-request priority (triggered when a user loads a page and the price is missing). Priority 2 is background refresh. The worker processes lower numbers first.
FOR UPDATE SKIP LOCKED
The worker loop picks items one at a time using FOR UPDATE SKIP LOCKED:
const [item] = await sql`
SELECT pq.id, pq.asset_id, pq.retry_count, a.symbol, a.data_type
FROM price_fetch_queue pq
JOIN assets a ON a.id = pq.asset_id
WHERE pq.status = 'pending'
ORDER BY pq.priority ASC, pq.queued_at ASC
LIMIT 1
FOR UPDATE OF pq SKIP LOCKED
`
FOR UPDATE locks the selected row so no other worker can pick it. SKIP LOCKED means if the top row is already locked by another transaction, skip it and check the next one. This allows multiple workers to drain the same queue without coordination overhead and without a row ever being picked twice.
Provider Registry
Providers (currently Twelve Data, Yahoo Finance, and mfapi.in) are stored in the data_collectors table with encrypted API keys. On startup, syncProviders loads all active providers, decrypts their keys using AES-256-GCM, and stores them in a module-level registry Map:
function decryptKey(encHex: string, ivHex: string, keyHex: string): string {
const key = Buffer.from(keyHex, 'hex')
const iv = Buffer.from(ivHex, 'hex')
const data = Buffer.from(encHex, 'hex')
const tag = data.slice(data.length - 16)
const encrypted = data.slice(0, data.length - 16)
const decipher = crypto.createDecipheriv('aes-256-gcm', key, iv)
decipher.setAuthTag(tag)
return decipher.update(encrypted).toString('utf8') + decipher.final('utf8')
}
The decrypted key lives only in memory. The database stores ciphertext. If the database is compromised, API keys are not exposed.
Provider Routing
The provider_routing table maps asset data types to providers. A broad rule covers all symbols of a given type. A symbol-level rule overrides the broad rule for a specific asset:
CREATE UNIQUE INDEX provider_routing_type_broad ON provider_routing (data_type) WHERE symbol IS NULL;
CREATE UNIQUE INDEX provider_routing_type_symbol ON provider_routing (data_type, symbol) WHERE symbol IS NOT NULL;
Resolution in the worker:
async function resolveProvider(dataType: string, symbol: string | null): Promise<Provider | null> {
const [row] = await sql`
SELECT pr.collector_name FROM provider_routing pr
JOIN data_collectors dc ON dc.name = pr.collector_name
WHERE pr.data_type = ${dataType}
AND (pr.symbol = ${symbol} OR pr.symbol IS NULL)
AND pr.is_active = true AND dc.is_active = true
ORDER BY pr.symbol NULLS LAST
LIMIT 1
`
if (!row) return null
return registry.get(row.collector_name) ?? null
}
ORDER BY pr.symbol NULLS LAST ensures a symbol-level override wins over the broad type rule. By default, all equity types route to Twelve Data. An individual asset can be overridden to Yahoo Finance or mfapi.in without touching code.
Rate Limit Enforcement via collector_call_log
Provider rate limits are not tracked with in-process counters. In-process counters reset on restart and do not survive crashes. Instead, rate limits are enforced by checking the last call time in collector_call_log:
async function enforceRateLimit(provider: Provider): Promise<void> {
const [lastCall] = await sql`
SELECT called_at FROM collector_call_log
WHERE collector_name = ${provider.name}
ORDER BY called_at DESC
LIMIT 1
`
if (!lastCall) return
const minDelay = Math.ceil(60000 / provider.rateLimitPerMin)
const elapsed = Date.now() - new Date(lastCall.called_at).getTime()
if (elapsed < minDelay) {
await sleep(minDelay - elapsed)
}
}
Every successful or failed API call is logged to collector_call_log. The worker checks the log before each fetch and waits if the minimum interval has not elapsed. This works correctly across restarts - the log persists in the database.
Twelve Data’s free tier allows 8 calls per minute. The worker calculates 60000 / 8 = 7500ms minimum gap between calls.
Stale Price Detection
Every minute, enqueueStalePrices finds assets that need a refresh:
const stale = await sql`
SELECT DISTINCT a.id FROM assets a
JOIN holdings h ON h.asset_id = a.id
WHERE a.update_mode = 'api'
AND a.is_deleted = false AND a.is_active = true
AND h.is_deleted = false AND h.status = 'active'
AND (
NOT EXISTS (SELECT 1 FROM asset_price_history p WHERE p.asset_id = a.id)
OR (
NOT EXISTS (
SELECT 1 FROM asset_price_history p
WHERE p.asset_id = a.id AND p.recorded_at >= NOW() - INTERVAL '24 hours'
)
AND a.price_last_consumed_at > NOW() - INTERVAL '48 hours'
)
)
`
An asset qualifies if it has no price data at all, or if its latest price is older than 24 hours and someone has looked at it within the last 48 hours. Assets that nobody has viewed in two days are skipped. This keeps the API call volume proportional to actual usage rather than the size of the asset catalog.
Retry and Failure
The worker retries a failed fetch up to 3 times before marking it permanently failed:
if (retries >= 3) {
await sql`UPDATE price_fetch_queue SET status = 'failed', ... WHERE id = ${item.id}`
await emit('PRICE_FETCH_FAILED', { assetId, symbol, retries, error: message })
} else {
await sql`UPDATE price_fetch_queue SET status = 'pending', retry_count = ${retries}, ... WHERE id = ${item.id}`
}
On permanent failure, a PRICE_FETCH_FAILED notification event is created. The notification service picks it up and emails the admin. The queue row is left in failed status so the admin can see which assets stopped receiving updates and why.
Prices are stored in minor units as integers:
const priceMinor = Math.round(parseFloat(price) * (10 ** asset.decimals))
INR has 2 decimals, so a price of 2847.50 becomes 284750. No floating point in stored values.