Finance APIs Series — Part 5 of 5

Building Your Aggregator — One API to Rule Them All

The capstone project: build a production-grade data aggregator that unifies price, fundamental, sentiment, and options data from 5+ sources. Rate limiting, caching, failover, MCP integration, and Docker deployment.

System Architecture

The Aggregator Architecture

The goal: a single unified API that abstracts away the complexity of 5+ data providers. Your application code calls aggregator.get_quote("AAPL") and doesn't care whether the data comes from Polygon, Finnhub, or a local cache.

1. Providers Layer: Adapters for each API (Yahoo, Polygon, Finnhub, FMP, StockTwits). Handles authentication and response parsing.

2. Rate Limiter: Token bucket per provider. Queues requests when limits are hit. Round-robin across providers.

3. Cache Layer: Redis for hot data (quotes, 60s TTL). SQLite for cold data (daily bars, financials, infinite TTL).

4. Normalizer: Unified schema across all providers. Consistent field names, data types, and timestamps.

5. Failover: Primary → fallback → cache hierarchy. Automatic retry with exponential backoff.

6. MCP Server: Expose the aggregator as an MCP server for AI agents (Claude, GPT, etc.).
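
To make the goal concrete, here is what calling the finished aggregator looks like from application code. This is only a usage sketch: FinanceAggregator and the Quote dataclass it returns are built step by step in the sections that follow.

Python
import asyncio

async def main():
    aggregator = FinanceAggregator()   # assembled in the Failover section below
    quote = await aggregator.get_quote("AAPL")
    # The caller never knows (or cares) which provider or cache tier answered
    print(f"{quote.symbol}: {quote.price} ({quote.change_pct:+.2f}%) via {quote.source}")

asyncio.run(main())
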

Data Flow Architecture

The DailyTickers Approach

This is exactly how articles.dailytickers.com works. Our MCP Gateway aggregates data from multiple providers through a unified interface. When Claude generates an analysis article, it calls GetInstruments or QueryData — a single function that internally routes to the best available provider, with automatic failover and caching. The articles you read on this site are powered by this exact architecture.

Rate Limit Management

Every API has rate limits. The aggregator must respect them while maximizing throughput. Here's a production token bucket implementation:

Python
import time
import asyncio
from dataclasses import dataclass, field
from typing import Dict

@dataclass
class TokenBucket:
    """Token bucket rate limiter."""
    capacity: int          # Max tokens
    refill_rate: float     # Tokens per second
    tokens: float = field(init=False)
    last_refill: float = field(init=False)

    def __post_init__(self):
        self.tokens = self.capacity
        self.last_refill = time.monotonic()

    def _refill(self):
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now

    async def acquire(self, tokens=1):
        """Wait until tokens are available, then consume."""
        while True:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            # Wait for tokens to refill
            wait_time = (tokens - self.tokens) / self.refill_rate
            await asyncio.sleep(wait_time)

class RateLimitManager:
    """Manage rate limits for multiple API providers."""

    LIMITS = {
        "yahoo": TokenBucket(capacity=30, refill_rate=0.5),       # ~30 req/min
        "polygon": TokenBucket(capacity=5, refill_rate=5/60),        # 5 req/min (free)
        "finnhub": TokenBucket(capacity=60, refill_rate=1),          # 60 req/min
        "fmp": TokenBucket(capacity=5, refill_rate=250/(24*3600)),   # 250 req/day
        "twelvedata": TokenBucket(capacity=8, refill_rate=800/(24*3600)), # 800 req/day
        "stocktwits": TokenBucket(capacity=200, refill_rate=200/3600),  # 200 req/hr
    }

    async def acquire(self, provider: str):
        bucket = self.LIMITS.get(provider)
        if bucket:
            await bucket.acquire()

    def available_tokens(self, provider: str) -> float:
        bucket = self.LIMITS.get(provider)
        if bucket:
            bucket._refill()
            return bucket.tokens
        return 0
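
A quick way to see the limiter behave is to give a bucket a deliberately tiny capacity and watch later requests block until tokens refill. This is just a demonstration sketch, not part of the aggregator:

Python
import asyncio
import time

async def demo():
    # Tiny hypothetical limit: burst of 2, refilling 1 token per second
    bucket = TokenBucket(capacity=2, refill_rate=1.0)
    start = time.monotonic()
    for i in range(4):
        await bucket.acquire()
        print(f"request {i} at t={time.monotonic() - start:.1f}s")

asyncio.run(demo())
# Requests 0 and 1 fire immediately; requests 2 and 3 each wait ~1s for a refill.
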

Round-Robin Provider Selection

Python
class ProviderRouter:
    """Select the best available provider based on rate limits and data type."""

    PROVIDER_PRIORITY = {
        "quote": ["finnhub", "polygon", "twelvedata", "yahoo"],
        "bars_daily": ["yahoo", "polygon", "twelvedata"],
        "bars_intraday": ["polygon", "twelvedata", "finnhub"],
        "financials": ["fmp", "yahoo"],
        "sentiment": ["stocktwits", "finnhub"],
        "options": ["yahoo", "polygon"],
    }

    def __init__(self, rate_manager: RateLimitManager):
        self.rate_manager = rate_manager

    def select(self, data_type: str) -> str:
        """Pick the provider with the most available tokens."""
        providers = self.PROVIDER_PRIORITY.get(data_type, ["yahoo"])
        best = None
        best_tokens = -1

        for p in providers:
            tokens = self.rate_manager.available_tokens(p)
            if tokens > best_tokens:
                best = p
                best_tokens = tokens

        return best or providers[0]
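
Putting the router and the rate limiter together, a request path looks roughly like the sketch below: ask the router which provider currently has the most headroom for the data type, then block on that provider's bucket before calling it. The provider call itself is a placeholder here; the real adapters are wired in under Failover.

Python
async def fetch_quote_raw(symbol: str) -> dict:
    rate_manager = RateLimitManager()
    router = ProviderRouter(rate_manager)

    provider = router.select("quote")      # e.g. "finnhub" when its bucket is fullest
    await rate_manager.acquire(provider)   # waits here if that bucket is drained

    # Placeholder: call the chosen provider's adapter and return its raw response
    print(f"would call {provider} for {symbol}")
    return {}
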
Caching Layer

Two-Tier Caching Strategy

Cache Strategy
Hot (Redis, 60s TTL): Quotes, real-time data  —  Cold (SQLite, infinite): Daily bars, financials
Python
import json
import sqlite3
import hashlib
from datetime import datetime, timedelta
from typing import Optional, Any

class CacheLayer:
    """Two-tier cache: in-memory (hot) + SQLite (cold)."""

    TTL_CONFIG = {
        "quote": 60,               # 60 seconds
        "bars_intraday": 300,      # 5 minutes
        "bars_daily": 86400,       # 24 hours (daily bars don't change after close)
        "financials": 604800,      # 7 days (quarterly data)
        "sentiment": 300,          # 5 minutes
        "options": 120,             # 2 minutes
        "insider": 86400,          # 24 hours
    }

    def __init__(self, db_path="cache.db"):
        # Hot cache (in-memory dict, replace with Redis in production)
        self.hot = {}
        # Cold cache (SQLite)
        self.db = sqlite3.connect(db_path)
        self.db.execute("""
            CREATE TABLE IF NOT EXISTS cache (
                key TEXT PRIMARY KEY,
                data TEXT,
                data_type TEXT,
                created_at REAL,
                ttl INTEGER
            )
        """)

    def _key(self, data_type, symbol, **kwargs):
        raw = f"{data_type}:{symbol}:{json.dumps(kwargs, sort_keys=True)}"
        return hashlib.md5(raw.encode()).hexdigest()

    def get(self, data_type: str, symbol: str, **kwargs) -> Optional[Any]:
        key = self._key(data_type, symbol, **kwargs)
        ttl = self.TTL_CONFIG.get(data_type, 300)
        now = datetime.now().timestamp()

        # Check hot cache first
        if key in self.hot:
            entry = self.hot[key]
            if now - entry["created_at"] < ttl:
                return entry["data"]

        # Check cold cache
        row = self.db.execute(
            "SELECT data, created_at, ttl FROM cache WHERE key = ?", (key,)
        ).fetchone()

        if row and (now - row[1]) < row[2]:
            data = json.loads(row[0])
            # Promote to hot cache
            self.hot[key] = {"data": data, "created_at": row[1]}
            return data

        return None

    def set(self, data_type: str, symbol: str, data: Any, **kwargs):
        key = self._key(data_type, symbol, **kwargs)
        ttl = self.TTL_CONFIG.get(data_type, 300)
        now = datetime.now().timestamp()

        # Write to both caches
        self.hot[key] = {"data": data, "created_at": now}
        self.db.execute(
            "INSERT OR REPLACE INTO cache VALUES (?, ?, ?, ?, ?)",
            (key, json.dumps(data), data_type, now, ttl)
        )
        self.db.commit()
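
Usage is symmetric: try get() first, and only on a miss call a provider and set() the result so both tiers are warmed. A minimal sketch (the provider call is a placeholder):

Python
cache = CacheLayer()

def cached_quote(symbol: str) -> dict:
    hit = cache.get("quote", symbol)
    if hit is not None:
        return hit                        # served from the hot dict or SQLite

    data = {"price": 123.45}              # placeholder for a real provider call
    cache.set("quote", symbol, data)      # written to both tiers
    return data

print(cached_quote("AAPL"))   # miss -> provider
print(cached_quote("AAPL"))   # hit  -> cache (within the 60s quote TTL)
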

When to Use Redis vs. SQLite

SQLite is perfect for single-machine deployments — zero configuration, file-based, and handles millions of rows with sub-millisecond reads. Use it for the aggregator's local cache.

Redis is needed when you have multiple instances of the aggregator (horizontal scaling) or when you need pub/sub for real-time updates. For a single-machine setup, SQLite + an in-memory dict is simpler and faster.
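
If you do scale out, the hot tier is the only piece that changes. Below is a rough sketch of a Redis-backed hot cache using redis-py (already in requirements.txt); the class name and interface are illustrative, chosen to slot into CacheLayer in place of the in-memory dict:

Python
import json
import redis

class RedisHotCache:
    """Hot tier backed by Redis so multiple aggregator instances share it."""

    def __init__(self, url="redis://localhost:6379/0"):
        self.r = redis.Redis.from_url(url, decode_responses=True)

    def get(self, key: str):
        raw = self.r.get(key)
        return json.loads(raw) if raw else None

    def set(self, key: str, data, ttl: int):
        # SETEX expires the key automatically, so no manual TTL bookkeeping
        self.r.setex(key, ttl, json.dumps(data))
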

Data Normalization

Unified Schema

Different providers return data in wildly different formats. The normalizer converts everything to a consistent schema:

Field | Yahoo | Polygon | Finnhub | Normalized
Price | regularMarketPrice | lastTrade.p | c | price
Change % | regularMarketChangePercent | todaysChangePerc | dp | change_pct
Volume | regularMarketVolume | day.v | (missing) | volume
Open | regularMarketOpen | day.o | o | open
High | regularMarketDayHigh | day.h | h | high
Low | regularMarketDayLow | day.l | l | low
Timestamp | regularMarketTime (unix) | updated (ns) | t (unix) | timestamp (ISO)
Python
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Optional

@dataclass
class Quote:
    """Normalized quote — same structure regardless of source."""
    symbol: str
    price: float
    open: float
    high: float
    low: float
    close: float
    volume: int
    change_pct: float
    timestamp: str
    source: str

class Normalizer:
    """Convert provider-specific responses to unified schema."""

    @staticmethod
    def normalize_quote(raw: dict, source: str, symbol: str) -> Quote:
        if source == "yahoo":
            return Quote(
                symbol=symbol, price=raw.get("regularMarketPrice", 0),
                open=raw.get("regularMarketOpen", 0),
                high=raw.get("regularMarketDayHigh", 0),
                low=raw.get("regularMarketDayLow", 0),
                close=raw.get("regularMarketPreviousClose", 0),
                volume=raw.get("regularMarketVolume", 0),
                change_pct=raw.get("regularMarketChangePercent", 0),
                timestamp=datetime.now().isoformat(), source=source
            )
        elif source == "finnhub":
            return Quote(
                symbol=symbol, price=raw.get("c", 0),
                open=raw.get("o", 0), high=raw.get("h", 0),
                low=raw.get("l", 0), close=raw.get("pc", 0),
                volume=0,  # Finnhub quote doesn't include volume
                change_pct=raw.get("dp", 0),
                timestamp=datetime.fromtimestamp(raw.get("t", 0)).isoformat(),
                source=source
            )
        elif source == "polygon":
            tick = raw.get("ticker", {})
            return Quote(
                symbol=symbol, price=tick.get("lastTrade", {}).get("p", 0),
                open=tick.get("day", {}).get("o", 0),
                high=tick.get("day", {}).get("h", 0),
                low=tick.get("day", {}).get("l", 0),
                close=tick.get("prevDay", {}).get("c", 0),
                volume=tick.get("day", {}).get("v", 0),
                change_pct=tick.get("todaysChangePerc", 0),
                timestamp=datetime.now().isoformat(), source=source
            )
        raise ValueError(f"Unknown source: {source}")
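
To see the mapping table in action, here is the normalizer applied to a Finnhub-style quote payload. The numbers are made up; the field names follow Finnhub's documented c/o/h/l/pc/dp/t schema:

Python
raw_finnhub = {"c": 187.4, "o": 185.1, "h": 188.0, "l": 184.9,
               "pc": 184.2, "dp": 1.74, "t": 1700000000}

q = Normalizer.normalize_quote(raw_finnhub, source="finnhub", symbol="AAPL")
print(q.price, q.change_pct, q.source)
# -> 187.4 1.74 finnhub  (volume stays 0 because Finnhub quotes omit it)
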
Failover Strategy

Failover & Resilience

Python
import asyncio
import logging

logger = logging.getLogger("aggregator")

class FinanceAggregator:
    """The main aggregator — unified API over multiple providers."""

    def __init__(self):
        self.cache = CacheLayer()
        self.rate_manager = RateLimitManager()
        self.router = ProviderRouter(self.rate_manager)
        self.normalizer = Normalizer()
        self.providers = {
            "yahoo": YahooProvider(),
            "polygon": PolygonProvider("POLYGON_KEY"),
            "finnhub": FinnhubProvider("FINNHUB_KEY"),
            "fmp": FMPProvider("FMP_KEY"),
            "stocktwits": StockTwitsProvider(),
        }
        self.health = {p: True for p in self.providers}

    async def get_quote(self, symbol: str) -> Quote:
        """Get a quote with failover: cache -> primary -> fallback -> stale cache."""

        # 1. Check cache
        cached = self.cache.get("quote", symbol)
        if cached:
            return Quote(**cached)

        # 2. Try providers in priority order
        providers = ["finnhub", "polygon", "yahoo"]
        for provider_name in providers:
            if not self.health[provider_name]:
                continue

            try:
                await self.rate_manager.acquire(provider_name)
                provider = self.providers[provider_name]
                raw = provider.get_quote(symbol)
                quote = self.normalizer.normalize_quote(raw, provider_name, symbol)

                # Cache the result
                self.cache.set("quote", symbol, asdict(quote))
                self.health[provider_name] = True
                return quote

            except Exception as e:
                logger.warning(f"{provider_name} failed for {symbol}: {e}")
                self.health[provider_name] = False
                # Schedule health check in 60s
                asyncio.get_running_loop().call_later(60, self._reset_health, provider_name)

        # 3. Last resort: stale cache (ignore TTL)
        stale = self.cache.get("quote", symbol)  # Would need a force_stale param
        if stale:
            logger.warning(f"Serving stale cache for {symbol}")
            return Quote(**stale)

        raise Exception(f"All providers failed for {symbol}")

    def _reset_health(self, provider):
        self.health[provider] = True
        logger.info(f"Reset health for {provider}")
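
One piece of the architecture overview the class above doesn't yet implement is retry with exponential backoff: each provider gets exactly one attempt before failover moves on. A small helper along these lines (names are illustrative) can wrap the provider call inside the try block:

Python
import asyncio
import random

async def with_backoff(fn, *args, attempts: int = 3, base_delay: float = 0.5):
    """Retry a synchronous provider call with exponential backoff and jitter."""
    for attempt in range(attempts):
        try:
            return fn(*args)
        except Exception:
            if attempt == attempts - 1:
                raise                                   # let the aggregator fail over
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.1)
            await asyncio.sleep(delay)

# Inside get_quote(), the provider call could then become:
#   raw = await with_backoff(provider.get_quote, symbol)
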

Cost Optimization

Strategy | Monthly Cost | Coverage | Real-Time
Free Stack | $0 | Yahoo + Finnhub + FMP + StockTwits | Finnhub WS only
Budget Pro | $49 | + Polygon Starter ($29) + EODHD ($20) | SIP feed + global
Full Production | $319 | + Polygon Business ($199) + UW ($40) + Benzinga ($199) | Full OPRA + news

MCP Server Integration

Exposing Your Aggregator as an MCP Server

The Model Context Protocol (MCP) lets AI agents (Claude, GPT, etc.) call your aggregator as a tool. This is how DailyTickers's MCP Gateway works — Claude can call GetMarketOverview or QueryData to get real-time market data during article generation.

What Is MCP?

MCP (Model Context Protocol) is an open standard by Anthropic that lets AI models interact with external tools and data sources. An MCP server exposes "tools" (functions) that an AI can call. By wrapping your aggregator as an MCP server, any AI agent can query live market data, run screeners, or fetch financials — just by describing what they need in natural language.

Python
import json
from dataclasses import asdict
from mcp.server.fastmcp import FastMCP

# Initialize MCP server (FastMCP, from the official MCP Python SDK)
server = FastMCP("finance-aggregator")

# Initialize our aggregator
aggregator = FinanceAggregator()

@server.tool()
async def get_quote(symbol: str) -> str:
    """Get real-time quote for a stock symbol.

    Args:
        symbol: Stock ticker (e.g., AAPL, MSFT, TSLA)

    Returns:
        JSON with price, change, volume, and source
    """
    quote = await aggregator.get_quote(symbol)
    return json.dumps(asdict(quote), indent=2)

@server.tool()
async def get_bars(symbol: str, period: str = "1y", interval: str = "1d") -> str:
    """Get historical OHLCV bars for a stock.

    Args:
        symbol: Stock ticker
        period: Time period (1d, 5d, 1mo, 3mo, 6mo, 1y, 5y)
        interval: Bar interval (1m, 5m, 1h, 1d)

    Returns:
        JSON array of {date, open, high, low, close, volume}
    """
    bars = await aggregator.get_bars(symbol, period, interval)
    return json.dumps([asdict(b) for b in bars])

@server.tool()
async def get_financials(symbol: str, statement: str = "income") -> str:
    """Get financial statements for a company.

    Args:
        symbol: Stock ticker
        statement: Type (income, balance, cashflow, ratios)

    Returns:
        JSON with quarterly/annual financial data
    """
    data = await aggregator.get_financials(symbol, statement)
    return json.dumps(data)

@server.tool()
async def get_sentiment(symbol: str) -> str:
    """Get multi-source sentiment analysis for a stock.

    Args:
        symbol: Stock ticker

    Returns:
        JSON with composite sentiment, individual source scores, and convergence
    """
    sentiment = await aggregator.get_sentiment(symbol)
    return json.dumps(sentiment)

@server.tool()
async def market_overview() -> str:
    """Get a complete market overview: indices, sectors, commodities, crypto, rates.

    Returns:
        JSON with market snapshot across all asset classes
    """
    overview = await aggregator.get_market_overview()
    return json.dumps(overview)

# Run the server (stdio transport by default)
if __name__ == "__main__":
    server.run()

MCP Configuration (claude_desktop_config.json)

JSON
{
  "mcpServers": {
    "finance-aggregator": {
      "command": "python",
      "args": ["path/to/mcp_server.py"],
      "env": {
        "POLYGON_KEY": "your_key",
        "FINNHUB_KEY": "your_key",
        "FMP_KEY": "your_key"
      }
    }
  }
}
Docker Deployment

Docker Setup

Dockerfile
FROM python:3.12-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .

# Create data directory for SQLite cache
RUN mkdir -p /data

# Environment variables
ENV CACHE_DB=/data/cache.db
ENV LOG_LEVEL=INFO

# Expose MCP server port
EXPOSE 8080

CMD ["python", "mcp_server.py"]
docker-compose.yml
version: '3.8'

services:
  aggregator:
    build: .
    ports:
      - "8080:8080"
    environment:
      - POLYGON_KEY=${POLYGON_KEY}
      - FINNHUB_KEY=${FINNHUB_KEY}
      - FMP_KEY=${FMP_KEY}
    volumes:
      - cache_data:/data
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    restart: unless-stopped

volumes:
  cache_data:
  redis_data:
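
Note that the compose healthcheck polls http://localhost:8080/health, which the stdio MCP server itself doesn't serve. One option is to run a tiny companion HTTP endpoint in the same container; a sketch using aiohttp (already in requirements.txt), on the port the Dockerfile exposes:

Python
from aiohttp import web

async def health(_request: web.Request) -> web.Response:
    # Extend this to report per-provider status from FinanceAggregator.health
    return web.json_response({"status": "ok"})

app = web.Application()
app.router.add_get("/health", health)

if __name__ == "__main__":
    web.run_app(app, port=8080)
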
requirements.txt
yfinance>=0.2.31
requests>=2.31.0
aiohttp>=3.9.0
redis>=5.0.0
mcp>=0.1.0
scipy>=1.11.0
numpy>=1.26.0
pandas>=2.1.0
praw>=7.7.0
pytrends>=4.9.0
finnhub-python>=2.4.0

Series Conclusion

Key Takeaways — The Complete Series

What You've Learned

The Complete Free Stack

Data Type | Free Provider | Rate Limit | What You Get
Historical OHLCV | Yahoo Finance | ~2K req/hr | 20+ years, global, adjusted
Real-Time Quotes | Finnhub | 60 req/min | WebSocket + REST, US stocks
Technical Indicators | Twelve Data | 800 req/day | 100+ indicators, global
Financials | FMP | 250 req/day | Income, balance, cashflow, ratios
SEC Filings | SEC EDGAR | 10 req/sec | All filings, XBRL, full-text search
Social Sentiment | StockTwits | 200 req/hr | Messages, sentiment tags, trending
Reddit Mentions | ApeWisdom | Unlimited | WSB rankings, mention counts
Search Interest | Google Trends | Unlimited | Search volume index
Options Chains | Yahoo Finance | ~2K req/hr | Chains, IV, volume, OI
Congress Trades | QuiverQuant | Generous | STOCK Act disclosures

Total Cost of the Free Stack
10 data sources × $0/month = $0/month

What to Build Next

You now have the knowledge to build any financial data application. The same layered pattern of providers, rate limiting, caching, normalization, and failover carries over whether the end product is a stock screener, a portfolio tracker, or an AI research agent wired up over MCP.

How does the DailyTickers MCP Gateway work?

The DailyTickers MCP Gateway (mcp__dailytickers__* tools) is essentially a production version of the aggregator described in this series. It provides 58+ data types through a unified interface: quotes, bars, financials, sentiment, options, earnings, analyst actions, insider transactions, and more. When Claude generates the daily briefing or a stock analysis, it calls these MCP tools to get live market data — the exact same architecture you've learned in this series.

What if I want to sell my aggregator as a service?

If you're redistributing data, you need proper data licensing. Yahoo Finance explicitly forbids redistribution. SEC EDGAR data is public domain. For a commercial product, you'll need Polygon.io ($199/mo minimum for redistribution rights), or a direct exchange data license. The aggregator architecture works the same — you just swap in licensed providers.
