The Problem
Modern applications frequently integrate with third-party services through OAuth, which requires careful management of access tokens that expire over time. The challenge becomes particularly acute when many concurrent operations need the same user’s token at once. Without proper coordination, multiple processes may attempt the refresh simultaneously, risking OAuth provider rate limiting, token invalidation, or inconsistent application state.
Consider a FastAPI application making 100 parallel API calls to a third-party service. When the OAuth token expires, all 100 operations might detect the need for refresh simultaneously, creating a classic distributed systems coordination problem.
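To make the failure mode concrete, here is a minimal asyncio simulation of that scenario (all names are illustrative, not from a real codebase): 100 coroutines each check an expired token and, with no coordination, every one of them triggers its own refresh.

```python
import asyncio

async def demo():
    refresh_calls = 0
    token_valid = False

    async def naive_get_token():
        nonlocal refresh_calls, token_valid
        if not token_valid:          # all 100 coroutines see the expired token
            await asyncio.sleep(0)   # yield, simulating network latency
            refresh_calls += 1       # ...so every one of them refreshes
            token_valid = True

    await asyncio.gather(*(naive_get_token() for _ in range(100)))
    return refresh_calls

calls = asyncio.run(demo())
print(calls)  # 100 refresh calls for a single expired token
```

Because each coroutine yields between checking the flag and setting it, every one of the 100 observes the stale state, which is exactly the thundering-herd behavior described above.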
The Classical Solutions
Optimistic Locking
Optimistic locking assumes conflicts are rare and uses version numbers or timestamps to detect them at commit time. It is efficient under low contention but suffers from retry storms when conflicts become frequent. For OAuth token refresh, this means multiple processes attempt the refresh simultaneously, with all but one failing and retrying: wasteful, and potentially problematic given OAuth provider rate limits.
from datetime import datetime, timedelta


class OptimisticTokenManager:
    def __init__(self, db_pool):
        self.db_pool = db_pool

    async def get_valid_token(self, user_id):
        async with self.db_pool.acquire() as conn:
            # Read the current token along with its version
            token_data = await conn.fetchrow("""
                SELECT access_token, refresh_token, expires_at, version
                FROM oauth_tokens WHERE user_id = $1
            """, user_id)
            if not token_data:
                raise Exception("No token found")
            access_token, refresh_token, expires_at, version = token_data

            # Still valid with a comfortable margin: no refresh needed
            if expires_at > datetime.now() + timedelta(minutes=5):
                return access_token

            # Attempt the refresh with optimistic locking
            new_token = await self._call_oauth_refresh(refresh_token)

            # Update only if the version hasn't changed since we read it
            result = await conn.execute("""
                UPDATE oauth_tokens
                SET access_token = $1, refresh_token = $2,
                    expires_at = $3, version = version + 1
                WHERE user_id = $4 AND version = $5
            """, new_token.access_token, new_token.refresh_token,
                new_token.expires_at, user_id, version)

            if result == "UPDATE 0":
                # Version conflict: another process refreshed first - retry
                return await self.get_valid_token(user_id)
            return new_token.access_token
Database Advisory Locks (Fencing)
PostgreSQL’s advisory locks provide a more predictable solution by preventing conflicts proactively. Using pg_advisory_lock(), only one process can refresh the token while others wait. This guarantees exactly one refresh operation per token expiration cycle.
import zlib
from datetime import datetime, timedelta


class FencingTokenManager:
    def __init__(self, db_pool):
        self.db_pool = db_pool

    async def get_valid_token(self, user_id):
        async with self.db_pool.acquire() as conn:
            # Check whether the token needs a refresh at all
            token_data = await conn.fetchrow("""
                SELECT access_token, refresh_token, expires_at
                FROM oauth_tokens WHERE user_id = $1
            """, user_id)
            if not token_data:
                raise Exception("No token found")
            access_token, refresh_token, expires_at = token_data
            if expires_at > datetime.now() + timedelta(minutes=5):
                return access_token

            # Derive a stable 32-bit lock id. Note: Python's built-in
            # hash() is randomized per process for strings, so it must
            # NOT be used here - each process would lock a different id.
            lock_id = zlib.crc32(str(user_id).encode()) % (2**31)

            # Acquire the advisory lock (blocks until available)
            await conn.execute("SELECT pg_advisory_lock($1)", lock_id)
            try:
                # Double-check: another process may have refreshed already
                fresh_data = await conn.fetchrow("""
                    SELECT access_token, refresh_token, expires_at
                    FROM oauth_tokens WHERE user_id = $1
                """, user_id)
                if fresh_data["expires_at"] > datetime.now() + timedelta(minutes=5):
                    return fresh_data["access_token"]  # refreshed elsewhere

                # We hold the lock: perform the actual refresh
                new_token = await self._call_oauth_refresh(fresh_data["refresh_token"])
                await conn.execute("""
                    UPDATE oauth_tokens
                    SET access_token = $1, refresh_token = $2, expires_at = $3
                    WHERE user_id = $4
                """, new_token.access_token, new_token.refresh_token,
                    new_token.expires_at, user_id)
                return new_token.access_token
            finally:
                # Always release the lock, even if the refresh fails
                await conn.execute("SELECT pg_advisory_unlock($1)", lock_id)
However, advisory locks introduce a significant problem in high-concurrency scenarios: connection pool exhaustion. When 100 concurrent operations hit the same advisory lock, all 100 database connections become blocked, potentially starving other parts of the application.
The Connection Pool Dilemma
The core issue isn’t the advisory lock itself, but rather that blocking 100 database connections simultaneously can exhaust connection pools. FastAPI applications typically use connection pools with 10-50 connections, meaning the blocking behavior can impact the entire application’s database access patterns.
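One partial mitigation, before abandoning database locks entirely, is PostgreSQL’s non-blocking variant pg_try_advisory_lock(): losers get an immediate “false” back, release their connection, sleep briefly, and re-check the cache instead of pinning a pooled connection while they wait. The sketch below simulates that pattern in-process, with an asyncio.Lock standing in for the database lock (all names and timings are illustrative):

```python
import asyncio

async def main():
    lock = asyncio.Lock()  # stand-in for pg_try_advisory_lock()
    cache = {}             # stand-in for the oauth_tokens row
    refreshes = 0

    async def get_token():
        nonlocal refreshes
        while "token" not in cache:
            if lock.locked():
                # "Try-lock" failed: back off WITHOUT holding a
                # connection, then re-check whether someone refreshed.
                await asyncio.sleep(0.01)
                continue
            async with lock:
                if "token" not in cache:       # double-check after acquiring
                    await asyncio.sleep(0.05)  # simulated OAuth refresh call
                    refreshes += 1
                    cache["token"] = "fresh-token"
        return cache["token"]

    tokens = await asyncio.gather(*(get_token() for _ in range(100)))
    return refreshes, tokens

refreshes, tokens = asyncio.run(main())
print(refreshes)  # exactly one refresh for 100 concurrent callers
```

The same shape applies with the real database function: a failed try-lock means someone else is refreshing, so the caller can release its connection and poll the table instead of blocking on the lock.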
This led to exploring application-level coordination using asyncio locks and in-memory caching, which avoids database connection blocking but requires careful state management across application restarts:
import asyncio
from datetime import datetime, timedelta


class ApplicationLevelTokenManager:
    def __init__(self, db_pool):
        self.db_pool = db_pool
        self.token_cache = {}    # user_id -> (token, expires_at)
        self.refresh_locks = {}  # user_id -> asyncio.Lock

    async def get_valid_token(self, user_id):
        # Check the in-memory cache first (no DB connection needed)
        if user_id in self.token_cache:
            token, expires_at = self.token_cache[user_id]
            if expires_at > datetime.now() + timedelta(minutes=5):
                return token

        # Need a refresh: coordinate with an application-level lock.
        # setdefault is safe here because there is no await between
        # the check and the insert.
        lock = self.refresh_locks.setdefault(user_id, asyncio.Lock())
        async with lock:
            # Double-check the cache (another coroutine may have refreshed)
            if user_id in self.token_cache:
                token, expires_at = self.token_cache[user_id]
                if expires_at > datetime.now() + timedelta(minutes=5):
                    return token

            # Actually refresh (only ONE DB connection is used)
            async with self.db_pool.acquire() as conn:
                token_data = await conn.fetchrow("""
                    SELECT refresh_token FROM oauth_tokens WHERE user_id = $1
                """, user_id)
                new_token = await self._call_oauth_refresh(token_data["refresh_token"])
                await conn.execute("""
                    UPDATE oauth_tokens
                    SET access_token = $1, refresh_token = $2, expires_at = $3
                    WHERE user_id = $4
                """, new_token.access_token, new_token.refresh_token,
                    new_token.expires_at, user_id)

            # Cache the result for subsequent callers
            self.token_cache[user_id] = (new_token.access_token, new_token.expires_at)
            return new_token.access_token
An Alternative Approach: Timestamp-Based Claims
A potentially elegant solution emerged: using timestamp-based fencing with a claim mechanism. The approach involves adding a refresh_started_at column to track ongoing refresh operations:
-- Add the fencing column
ALTER TABLE oauth_tokens ADD COLUMN refresh_started_at TIMESTAMP;
import asyncio
from datetime import datetime, timedelta


class TimestampClaimTokenManager:
    def __init__(self, db_pool):
        self.db_pool = db_pool
        self.refresh_timeout = 30  # seconds before a stale claim can be stolen

    async def get_valid_token(self, user_id):
        async with self.db_pool.acquire() as conn:
            # Check whether the token needs a refresh
            token_data = await conn.fetchrow("""
                SELECT access_token, expires_at, refresh_started_at
                FROM oauth_tokens WHERE user_id = $1
            """, user_id)
            if not token_data:
                raise Exception("No token found")
            access_token, expires_at, refresh_started_at = token_data

            # Token is still valid with a comfortable margin
            if expires_at > datetime.now() + timedelta(minutes=5):
                return access_token

            # Token needs a refresh: try to claim the operation
            return await self._refresh_with_claim(user_id, conn)

    async def _refresh_with_claim(self, user_id, conn):
        claim_timestamp = datetime.now()

        # Try to claim the refresh: succeeds only if no claim exists or
        # the previous claim is older than the timeout (i.e. stale)
        result = await conn.fetchrow("""
            UPDATE oauth_tokens
            SET refresh_started_at = $1
            WHERE user_id = $2
              AND (
                  refresh_started_at IS NULL
                  OR refresh_started_at < $3
              )
            RETURNING refresh_started_at, refresh_token
        """,
            claim_timestamp,
            user_id,
            datetime.now() - timedelta(seconds=self.refresh_timeout),
        )

        if not result:
            # Failed to claim - wait briefly and retry from the top
            await asyncio.sleep(0.1)
            return await self.get_valid_token(user_id)

        returned_timestamp, refresh_token = result

        # RETURNING yields the post-update value, so a returned row already
        # means the claim succeeded; the comparison is an explicit sanity check
        if returned_timestamp == claim_timestamp:
            try:
                # We hold the claim: do the actual refresh
                new_token = await self._call_oauth_refresh(refresh_token)
                # Store the new token and clear the claim
                await conn.execute("""
                    UPDATE oauth_tokens
                    SET access_token = $1,
                        refresh_token = $2,
                        expires_at = $3,
                        refresh_started_at = NULL
                    WHERE user_id = $4
                """, new_token.access_token, new_token.refresh_token,
                    new_token.expires_at, user_id)
                return new_token.access_token
            except Exception:
                # Clear the claim on failure so another process can take over
                await conn.execute("""
                    UPDATE oauth_tokens
                    SET refresh_started_at = NULL
                    WHERE user_id = $1
                """, user_id)
                raise
        else:
            # Someone else holds the claim - wait and retry
            await asyncio.sleep(0.1)
            return await self.get_valid_token(user_id)
This pattern attempts to “claim” the right to refresh by updating the timestamp, succeeding only if no other process holds a recent claim. Because RETURNING yields the post-update value, a returned row already indicates a successful claim; comparing the returned timestamp with the submitted value acts as an explicit sanity check.
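The claim semantics can be sanity-checked in isolation with an in-memory stand-in for the database row (illustrative names; the real version runs the equivalent UPDATE ... WHERE in SQL):

```python
import asyncio
from datetime import datetime, timedelta

async def main():
    row = {"refresh_started_at": None}   # stand-in for the oauth_tokens row
    timeout = timedelta(seconds=30)
    winners = 0

    def try_claim(now):
        # Mirrors: UPDATE ... WHERE refresh_started_at IS NULL
        #          OR refresh_started_at < now - timeout
        started = row["refresh_started_at"]
        if started is None or started < now - timeout:
            row["refresh_started_at"] = now
            return True
        return False

    async def worker():
        nonlocal winners
        await asyncio.sleep(0)           # yield, as a real request would
        if try_claim(datetime.now()):
            winners += 1

    await asyncio.gather(*(worker() for _ in range(100)))
    return winners

winners = asyncio.run(main())
print(winners)  # only one worker wins the claim
```

Once the first worker writes its timestamp, every later claim attempt sees a recent (non-stale) claim and fails the WHERE condition, which is what forces the losers into the sleep-and-retry path.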
Lessons Learned
This exploration revealed several important insights about distributed systems design:
Multiple Valid Approaches: Each concurrency control pattern has its place. Optimistic locking works well for low-contention scenarios, advisory locks provide strong guarantees but can block resources, application-level coordination avoids database blocking but requires careful state management, and timestamp-based claims offer a middle ground.
Understanding Trade-offs: Every approach has costs. Optimistic locking suffers under high contention, advisory locks can exhaust connection pools, application-level solutions require restart handling, and timestamp-based claims need careful timeout tuning.
Implementation Details Matter: The difference between a working solution and a problematic one often lies in subtle implementation details—proper error handling, timeout values, retry strategies, and failure recovery mechanisms.
Context Drives Choice: The “best” solution depends heavily on specific constraints. For single-instance applications, simple application-level coordination might suffice. For distributed systems with high concurrency, more sophisticated approaches become necessary.
Conclusion
The journey from identifying a race condition in OAuth token refresh to exploring various concurrency control mechanisms illustrates the complexity inherent in distributed systems design. Each approach—optimistic locking, advisory locks, application-level coordination, and timestamp-based claims—offers distinct advantages and trade-offs.
The key insight is that there are no universal solutions in distributed systems. Success depends on matching the concurrency control mechanism to your specific constraints: transaction volume, acceptable latency, infrastructure complexity, and failure tolerance requirements.
For OAuth token refresh specifically, the choice might depend on your deployment architecture. Single-instance applications might benefit from application-level coordination, while distributed systems might prefer database-level approaches. High-throughput systems might choose optimistic locking despite retry costs, while systems prioritizing predictability might accept the overhead of advisory locks.
The most important skill isn’t memorizing every pattern, but rather understanding the fundamental trade-offs and being able to reason about which approach best fits your specific system’s needs. Whether you choose well-established patterns or explore novel approaches, the foundation remains the same: careful analysis, thorough testing, and robust error handling.
2 Comments
Your other option is to have one process refresh the OAuth token earlier than expiry; that way you never have to lock or wait for a token refresh.
The only downside is that tokens are renewed sooner than strictly required, but it keeps the whole system flowing nicely without any interruptions.
It’s similar to how Let’s Encrypt hands out 90-day certificates but automatically renews them when about 30 days are left.
That definitely is a feasible option. I used that approach in one of my projects and it’s been working well.
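For completeness, the proactive-renewal idea from this thread can be sketched as a background task that renews well before expiry, so callers only ever read the current token and never lock or wait (all names and timings here are illustrative):

```python
import asyncio

async def main():
    token = {"value": "initial"}
    lifetime = 0.09   # simulated token lifetime in seconds
    renewals = 0

    async def proactive_renewer():
        # Renew at ~2/3 of the lifetime, well before expiry, so callers
        # never observe an expired token - analogous to renewing a 90-day
        # certificate with ~30 days left.
        nonlocal renewals
        for _ in range(3):
            await asyncio.sleep(lifetime * 2 / 3)
            renewals += 1
            token["value"] = f"renewed-{renewals}"

    renew_task = asyncio.create_task(proactive_renewer())
    await asyncio.sleep(0.4)                       # let renewal cycles pass
    readers = [token["value"] for _ in range(10)]  # callers just read
    await renew_task
    return renewals, readers

renewals, readers = asyncio.run(main())
print(renewals, readers[0])
```

The coordination problem does not disappear entirely (in a multi-instance deployment the renewer itself still needs to be a single process), but it moves refresh off the request path completely.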