OAuth Token Refresh in Distributed Systems

The Problem

Modern applications frequently integrate with third-party services through OAuth, which requires careful management of access tokens that expire over time. The challenge becomes particularly acute when multiple concurrent operations need to refresh the same user's token. Without coordination, this creates a race condition: several processes attempt the refresh at once, potentially triggering OAuth provider rate limiting, token invalidation, or inconsistent application state.

Consider a FastAPI application making 100 parallel API calls to a third-party service. When the OAuth token expires, all 100 operations might detect the need for refresh simultaneously, creating a classic distributed systems coordination problem.
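The race is easy to reproduce with a small, self-contained asyncio sketch. The token store and OAuth call are stubbed out here (`fake_oauth_refresh` is a hypothetical stand-in); the point is only that every task observes the expired token before any refresh completes:

```python
import asyncio

refresh_calls = 0
token_expired = True

async def fake_oauth_refresh():
    # Hypothetical stand-in for a real OAuth refresh round trip
    global refresh_calls, token_expired
    refresh_calls += 1
    await asyncio.sleep(0.01)  # simulate network latency
    token_expired = False      # refresh "succeeds"

async def make_api_call():
    # Every task sees the expired token before the first refresh finishes
    if token_expired:
        await fake_oauth_refresh()

async def main():
    await asyncio.gather(*(make_api_call() for _ in range(100)))
    print(f"refresh calls: {refresh_calls}")

asyncio.run(main())
```

All 100 tasks run their expiry check before the first refresh's network wait completes, so the provider receives 100 refresh requests instead of one.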

The Classical Solutions

Optimistic Locking

Optimistic locking assumes conflicts are rare and uses version numbers or timestamps to detect conflicts at commit time. While efficient under low contention, it suffers from retry storms when conflicts become frequent. For OAuth token refresh, this means multiple processes would attempt refresh simultaneously, with all but one failing and retrying—wasteful and potentially problematic with OAuth provider rate limits.

from datetime import datetime, timedelta

class OptimisticTokenManager:
    def __init__(self, db_pool):
        self.db_pool = db_pool

    async def get_valid_token(self, user_id):
        async with self.db_pool.acquire() as conn:
            # Read current token with version
            token_data = await conn.fetchrow("""
                SELECT access_token, refresh_token, expires_at, version
                FROM oauth_tokens WHERE user_id = $1
            """, user_id)

            if not token_data:
                raise Exception("No token found")

            access_token, refresh_token, expires_at, version = token_data

            # Check if token needs refresh
            if expires_at > datetime.now() + timedelta(minutes=5):
                return access_token

            # Attempt refresh with optimistic locking
            try:
                new_token = await self._call_oauth_refresh(refresh_token)

                # Update only if version hasn't changed
                result = await conn.execute("""
                    UPDATE oauth_tokens 
                    SET access_token = $1, refresh_token = $2, 
                        expires_at = $3, version = version + 1
                    WHERE user_id = $4 AND version = $5
                """, new_token.access_token, new_token.refresh_token, 
                     new_token.expires_at, user_id, version)

                if result == "UPDATE 0":
                    # Version conflict - retry
                    return await self.get_valid_token(user_id)

                return new_token.access_token

            except Exception:
                # Refresh failed; the caller can retry or surface the error
                raise

Database Advisory Locks (Fencing)

PostgreSQL’s advisory locks provide a more predictable solution by preventing conflicts proactively. Using pg_advisory_lock(), only one process can refresh the token while others wait. This guarantees exactly one refresh operation per token expiration cycle.

import zlib
from datetime import datetime, timedelta

class FencingTokenManager:
    def __init__(self, db_pool):
        self.db_pool = db_pool

    async def get_valid_token(self, user_id):
        async with self.db_pool.acquire() as conn:
            # Check if token needs refresh
            token_data = await conn.fetchrow("""
                SELECT access_token, refresh_token, expires_at 
                FROM oauth_tokens WHERE user_id = $1
            """, user_id)

            if not token_data:
                raise Exception("No token found")

            access_token, refresh_token, expires_at = token_data

            if expires_at > datetime.now() + timedelta(minutes=5):
                return access_token

            # Acquire advisory lock (blocks until available).
            # Python's built-in hash() is randomized per process, so use a
            # stable function to derive the same 31-bit lock ID everywhere.
            lock_id = zlib.crc32(str(user_id).encode()) % (2**31)
            await conn.execute("SELECT pg_advisory_lock($1)", lock_id)

            try:
                # Double-check if another process already refreshed
                fresh_data = await conn.fetchrow("""
                    SELECT access_token, refresh_token, expires_at 
                    FROM oauth_tokens WHERE user_id = $1
                """, user_id)

                if fresh_data[2] > datetime.now() + timedelta(minutes=5):
                    return fresh_data[0]  # Another process refreshed it

                # Actually refresh the token
                new_token = await self._call_oauth_refresh(fresh_data[1])

                await conn.execute("""
                    UPDATE oauth_tokens 
                    SET access_token = $1, refresh_token = $2, expires_at = $3
                    WHERE user_id = $4
                """, new_token.access_token, new_token.refresh_token, 
                     new_token.expires_at, user_id)

                return new_token.access_token

            finally:
                # Always release the lock
                await conn.execute("SELECT pg_advisory_unlock($1)", lock_id)

However, advisory locks introduce a significant problem in high-concurrency scenarios: connection pool exhaustion. When 100 concurrent operations hit the same advisory lock, all 100 database connections become blocked, potentially starving other parts of the application.

The Connection Pool Dilemma

The core issue isn’t the advisory lock itself, but rather that blocking 100 database connections simultaneously can exhaust connection pools. FastAPI applications typically use connection pools with 10-50 connections, meaning the blocking behavior can impact the entire application’s database access patterns.

This led to exploring application-level coordination using asyncio locks and in-memory caching, which avoids database connection blocking but requires careful state management across application restarts:

import asyncio
from datetime import datetime, timedelta

class ApplicationLevelTokenManager:
    def __init__(self, db_pool):
        self.db_pool = db_pool
        self.token_cache = {}  # user_id -> (token, expires_at)
        self.refresh_locks = {}  # user_id -> asyncio.Lock

    async def get_valid_token(self, user_id):
        # Check in-memory cache first (no DB connection needed)
        if user_id in self.token_cache:
            token, expires_at = self.token_cache[user_id]
            if expires_at > datetime.now() + timedelta(minutes=5):
                return token

        # Need to refresh - use application-level lock
        if user_id not in self.refresh_locks:
            self.refresh_locks[user_id] = asyncio.Lock()

        async with self.refresh_locks[user_id]:
            # Double-check cache (another coroutine might have refreshed)
            if user_id in self.token_cache:
                token, expires_at = self.token_cache[user_id]
                if expires_at > datetime.now() + timedelta(minutes=5):
                    return token

            # Actually refresh (only ONE DB connection used)
            async with self.db_pool.acquire() as conn:
                token_data = await conn.fetchrow("""
                    SELECT refresh_token FROM oauth_tokens WHERE user_id = $1
                """, user_id)

                new_token = await self._call_oauth_refresh(token_data[0])

                await conn.execute("""
                    UPDATE oauth_tokens 
                    SET access_token = $1, refresh_token = $2, expires_at = $3
                    WHERE user_id = $4
                """, new_token.access_token, new_token.refresh_token, 
                     new_token.expires_at, user_id)

                # Cache the result
                self.token_cache[user_id] = (new_token.access_token, new_token.expires_at)
                return new_token.access_token

An Alternative Approach: Timestamp-Based Claims

A potentially elegant solution emerged: using timestamp-based fencing with a claim mechanism. The approach involves adding a refresh_started_at column to track ongoing refresh operations:

-- Add the fencing column
ALTER TABLE oauth_tokens ADD COLUMN refresh_started_at TIMESTAMP;

import asyncio
from datetime import datetime, timedelta

class TimestampClaimTokenManager:
    def __init__(self, db_pool):
        self.db_pool = db_pool
        self.refresh_timeout = 30  # seconds

    async def get_valid_token(self, user_id):
        async with self.db_pool.acquire() as conn:
            # Check if token needs refresh
            token_data = await conn.fetchrow("""
                SELECT access_token, expires_at, refresh_started_at
                FROM oauth_tokens WHERE user_id = $1
            """, user_id)

            if not token_data:
                raise Exception("No token found")

            access_token, expires_at, refresh_started_at = token_data

            # Token is still valid
            if expires_at > datetime.now() + timedelta(minutes=5):
                return access_token

            # Token needs refresh - try to claim it
            return await self._refresh_with_claim(user_id, conn)

    async def _refresh_with_claim(self, user_id, conn):
        claim_timestamp = datetime.now()

        # Try to claim the refresh operation
        result = await conn.fetchrow("""
            UPDATE oauth_tokens 
            SET refresh_started_at = $1
            WHERE user_id = $2 
            AND (
                refresh_started_at IS NULL 
                OR refresh_started_at < $3
            )
            RETURNING refresh_started_at, refresh_token
        """, 
        claim_timestamp, 
        user_id, 
        datetime.now() - timedelta(seconds=self.refresh_timeout)
        )

        if not result:
            # Failed to claim - wait and retry
            await asyncio.sleep(0.1)
            return await self.get_valid_token(user_id)

        # RETURNING yields the post-update values, so any row coming back
        # means our conditional UPDATE matched and we hold the claim
        _, refresh_token = result

        try:
            new_token = await self._call_oauth_refresh(refresh_token)

            # Update with new token and clear the claim
            await conn.execute("""
                UPDATE oauth_tokens 
                SET access_token = $1, 
                    refresh_token = $2, 
                    expires_at = $3,
                    refresh_started_at = NULL
                WHERE user_id = $4
            """, new_token.access_token, new_token.refresh_token, 
                 new_token.expires_at, user_id)

            return new_token.access_token

        except Exception:
            # Clear the claim on failure so other processes can take over
            await conn.execute("""
                UPDATE oauth_tokens 
                SET refresh_started_at = NULL 
                WHERE user_id = $1
            """, user_id)
            raise

This pattern attempts to “claim” the right to refresh by updating the timestamp; the conditional UPDATE matches only if no other process holds a recent claim. Because RETURNING reflects the post-update row, any returned row confirms the claim succeeded, and a claim left behind by a crashed process can be taken over once it exceeds the timeout.

Lessons Learned

This exploration revealed several important insights about distributed systems design:

Multiple Valid Approaches: Each concurrency control pattern has its place. Optimistic locking works well for low-contention scenarios, advisory locks provide strong guarantees but can block resources, application-level coordination avoids database blocking but requires careful state management, and timestamp-based claims offer a middle ground.

Understanding Trade-offs: Every approach has costs. Optimistic locking suffers under high contention, advisory locks can exhaust connection pools, application-level solutions require restart handling, and timestamp-based claims need careful timeout tuning.

Implementation Details Matter: The difference between a working solution and a problematic one often lies in subtle implementation details—proper error handling, timeout values, retry strategies, and failure recovery mechanisms.

Context Drives Choice: The “best” solution depends heavily on specific constraints. For single-instance applications, simple application-level coordination might suffice. For distributed systems with high concurrency, more sophisticated approaches become necessary.

Conclusion

The journey from identifying a race condition in OAuth token refresh to exploring various concurrency control mechanisms illustrates the complexity inherent in distributed systems design. Each approach—optimistic locking, advisory locks, application-level coordination, and timestamp-based claims—offers distinct advantages and trade-offs.

The key insight is that there are no universal solutions in distributed systems. Success depends on matching the concurrency control mechanism to your specific constraints: transaction volume, acceptable latency, infrastructure complexity, and failure tolerance requirements.

For OAuth token refresh specifically, the choice might depend on your deployment architecture. Single-instance applications might benefit from application-level coordination, while distributed systems might prefer database-level approaches. High-throughput systems might choose optimistic locking despite retry costs, while systems prioritizing predictability might accept the overhead of advisory locks.

The most important skill isn’t memorizing every pattern, but rather understanding the fundamental trade-offs and being able to reason about which approach best fits your specific system’s needs. Whether you choose well-established patterns or explore novel approaches, the foundation remains the same: careful analysis, thorough testing, and robust error handling.


2 Comments

  1. Your other option is to get one process to refresh the OAuth token earlier than expiry; that way you never have to lock or wait for token refresh.

    The only downside is that tokens are renewed sooner than required, but it keeps the whole system flowing nicely without any interruptions.

    It’s similar to how Let’s Encrypt hands out 90-day certificates but automatically renews them when about 30 days are left.

    1. That definitely is a feasible option. I used that approach in one of my projects and it’s been working well.
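For illustration, the proactive-renewal idea suggested above might be sketched like this. The token store and refresh call are hypothetical stand-ins, and the sweep interval is shortened for the demo; in production the loop would run for the process lifetime, ideally on a single instance or leader:

```python
import asyncio
from datetime import datetime, timedelta

RENEWAL_WINDOW = timedelta(minutes=20)  # refresh this long before expiry
SWEEP_INTERVAL = 0.01                   # seconds between sweeps (demo value)

tokens = {  # user_id -> expires_at (stand-in for the oauth_tokens table)
    1: datetime.now() + timedelta(minutes=5),  # inside the renewal window
    2: datetime.now() + timedelta(hours=1),    # still fresh
}
refreshed = []

async def refresh_token(user_id):
    # Stand-in for the real OAuth refresh + database update
    tokens[user_id] = datetime.now() + timedelta(hours=1)
    refreshed.append(user_id)

async def renewal_sweep():
    for user_id, expires_at in list(tokens.items()):
        if expires_at <= datetime.now() + RENEWAL_WINDOW:
            await refresh_token(user_id)

async def main():
    for _ in range(3):  # a real loop would run forever
        await renewal_sweep()
        await asyncio.sleep(SWEEP_INTERVAL)
    print(refreshed)

asyncio.run(main())
```

Only the token inside the window is renewed, and once renewed it stays out of the window on later sweeps, so request paths never have to block on a refresh at all.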
