The capstone project: build a production-grade data aggregator that unifies price, fundamental, sentiment, and options data from 5+ sources. Rate limiting, caching, failover, MCP integration, and Docker deployment.
The goal: a single unified API that abstracts away the complexity of 5+ data providers. Your application code calls aggregator.get_quote("AAPL") and doesn't care whether the data comes from Polygon, Finnhub, or a local cache.
- **Provider adapters:** one adapter per API (Yahoo, Polygon, Finnhub, FMP, StockTwits), handling authentication and response parsing.
- **Rate limiter:** a token bucket per provider; queues requests when limits are hit and round-robins across providers.
- **Cache:** Redis for hot data (quotes, 60s TTL), SQLite for cold data (daily bars, financials, infinite TTL).
- **Normalizer:** a unified schema across all providers, with consistent field names, data types, and timestamps.
- **Failover:** primary → fallback → cache hierarchy, with automatic retry and exponential backoff.
- **MCP server:** exposes the aggregator as an MCP server for AI agents (Claude, GPT, etc.).
This is exactly how articles.dailytickers.com works. Our MCP Gateway aggregates data from multiple providers through a unified interface. When Claude generates an analysis article, it calls tools like GetInstruments or QueryData, single entry points that internally route to the best available provider with automatic failover and caching. The articles you read on this site are powered by this exact architecture.
Every API has rate limits. The aggregator must respect them while maximizing throughput. Here's a production token bucket implementation:
```python
import asyncio
import time
from dataclasses import dataclass, field


@dataclass
class TokenBucket:
    """Token bucket rate limiter."""
    capacity: int       # Max tokens
    refill_rate: float  # Tokens per second
    tokens: float = field(init=False)
    last_refill: float = field(init=False)

    def __post_init__(self):
        self.tokens = self.capacity
        self.last_refill = time.monotonic()

    def _refill(self):
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now

    async def acquire(self, tokens=1):
        """Wait until tokens are available, then consume."""
        while True:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            # Wait for tokens to refill
            wait_time = (tokens - self.tokens) / self.refill_rate
            await asyncio.sleep(wait_time)


class RateLimitManager:
    """Manage rate limits for multiple API providers."""
    LIMITS = {
        "yahoo": TokenBucket(capacity=30, refill_rate=0.5),             # ~30 req/min
        "polygon": TokenBucket(capacity=5, refill_rate=5 / 60),         # 5 req/min (free)
        "finnhub": TokenBucket(capacity=60, refill_rate=1),             # 60 req/min
        "fmp": TokenBucket(capacity=5, refill_rate=250 / (24 * 3600)),  # 250 req/day
        "twelvedata": TokenBucket(capacity=8, refill_rate=800 / (24 * 3600)),  # 800 req/day
        "stocktwits": TokenBucket(capacity=200, refill_rate=200 / 3600),       # 200 req/hr
    }

    async def acquire(self, provider: str):
        bucket = self.LIMITS.get(provider)
        if bucket:
            await bucket.acquire()

    def available_tokens(self, provider: str) -> float:
        bucket = self.LIMITS.get(provider)
        if bucket:
            bucket._refill()
            return bucket.tokens
        return 0
```
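A quick way to see the limiter pacing requests (a sketch, assuming the classes above are in scope; the numbers mirror Polygon's free tier):

```python
import asyncio
import time

async def demo():
    # 5-token bucket refilling at 5 tokens/minute: the first 5 acquires are
    # instant, then each subsequent one waits ~12 seconds for a refill.
    bucket = TokenBucket(capacity=5, refill_rate=5 / 60)
    start = time.monotonic()
    for i in range(7):
        await bucket.acquire()
        print(f"request {i + 1} at t={time.monotonic() - start:.1f}s")

asyncio.run(demo())
```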
```python
class ProviderRouter:
    """Select the best available provider based on rate limits and data type."""
    PROVIDER_PRIORITY = {
        "quote": ["finnhub", "polygon", "twelvedata", "yahoo"],
        "bars_daily": ["yahoo", "polygon", "twelvedata"],
        "bars_intraday": ["polygon", "twelvedata", "finnhub"],
        "financials": ["fmp", "yahoo"],
        "sentiment": ["stocktwits", "finnhub"],
        "options": ["yahoo", "polygon"],
    }

    def __init__(self, rate_manager: RateLimitManager):
        self.rate_manager = rate_manager

    def select(self, data_type: str) -> str:
        """Pick the provider with the most available tokens."""
        providers = self.PROVIDER_PRIORITY.get(data_type, ["yahoo"])
        best = None
        best_tokens = -1
        for p in providers:
            tokens = self.rate_manager.available_tokens(p)
            if tokens > best_tokens:
                best = p
                best_tokens = tokens
        return best or providers[0]
```
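Putting the two together, a minimal sketch of routing a single request (illustrative only; the provider names and priorities are the ones defined above):

```python
import asyncio

async def fetch_with_routing(data_type: str):
    rate_manager = RateLimitManager()
    router = ProviderRouter(rate_manager)
    # With fresh buckets, "quote" routes to finnhub (60 tokens available)
    provider = router.select(data_type)
    await rate_manager.acquire(provider)  # blocks here if the bucket is empty
    print(f"would call {provider} for {data_type}")

asyncio.run(fetch_with_routing("quote"))
```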
```python
import hashlib
import json
import sqlite3
from datetime import datetime
from typing import Any, Optional


class CacheLayer:
    """Two-tier cache: in-memory (hot) + SQLite (cold)."""
    TTL_CONFIG = {
        "quote": 60,           # 60 seconds
        "bars_intraday": 300,  # 5 minutes
        "bars_daily": 86400,   # 24 hours (daily bars don't change after close)
        "financials": 604800,  # 7 days (quarterly data)
        "sentiment": 300,      # 5 minutes
        "options": 120,        # 2 minutes
        "insider": 86400,      # 24 hours
    }

    def __init__(self, db_path="cache.db"):
        # Hot cache (in-memory dict, replace with Redis in production)
        self.hot = {}
        # Cold cache (SQLite)
        self.db = sqlite3.connect(db_path)
        self.db.execute("""
            CREATE TABLE IF NOT EXISTS cache (
                key TEXT PRIMARY KEY,
                data TEXT,
                data_type TEXT,
                created_at REAL,
                ttl INTEGER
            )
        """)

    def _key(self, data_type, symbol, **kwargs):
        raw = f"{data_type}:{symbol}:{json.dumps(kwargs, sort_keys=True)}"
        return hashlib.md5(raw.encode()).hexdigest()

    def get(self, data_type: str, symbol: str, **kwargs) -> Optional[Any]:
        key = self._key(data_type, symbol, **kwargs)
        ttl = self.TTL_CONFIG.get(data_type, 300)
        now = datetime.now().timestamp()

        # Check hot cache first
        if key in self.hot:
            entry = self.hot[key]
            if now - entry["created_at"] < ttl:
                return entry["data"]

        # Check cold cache
        row = self.db.execute(
            "SELECT data, created_at, ttl FROM cache WHERE key = ?", (key,)
        ).fetchone()
        if row and (now - row[1]) < row[2]:
            data = json.loads(row[0])
            # Promote to hot cache
            self.hot[key] = {"data": data, "created_at": row[1]}
            return data
        return None

    def set(self, data_type: str, symbol: str, data: Any, **kwargs):
        key = self._key(data_type, symbol, **kwargs)
        ttl = self.TTL_CONFIG.get(data_type, 300)
        now = datetime.now().timestamp()

        # Write to both caches
        self.hot[key] = {"data": data, "created_at": now}
        self.db.execute(
            "INSERT OR REPLACE INTO cache VALUES (?, ?, ?, ?, ?)",
            (key, json.dumps(data), data_type, now, ttl)
        )
        self.db.commit()
```
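A quick round trip through the cache (values are made up; `":memory:"` keeps the SQLite tier ephemeral for the demo):

```python
cache = CacheLayer(db_path=":memory:")
cache.set("quote", "AAPL", {"price": 212.4, "source": "finnhub"})  # fabricated values
print(cache.get("quote", "AAPL"))       # hit, served from the hot tier
print(cache.get("bars_daily", "AAPL"))  # miss -> None
```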
SQLite is perfect for single-machine deployments — zero configuration, file-based, and handles millions of rows with sub-millisecond reads. Use it for the aggregator's local cache.
Redis is needed when you have multiple instances of the aggregator (horizontal scaling) or when you need pub/sub for real-time updates. For a single-machine setup, SQLite + an in-memory dict is simpler and faster.
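If you do scale out, swapping the in-memory dict for a Redis-backed hot tier is a small change. A minimal sketch using redis-py, assuming a Redis instance on localhost:6379 (the docker-compose file later in this article starts one); the get/set call sites in CacheLayer would change accordingly:

```python
import json

import redis


class RedisHotCache:
    """Redis-backed replacement for CacheLayer's in-memory hot tier."""

    def __init__(self, host="localhost", port=6379):
        self.r = redis.Redis(host=host, port=port, decode_responses=True)

    def get(self, key: str):
        raw = self.r.get(key)
        return json.loads(raw) if raw is not None else None

    def set(self, key: str, data, ttl: int):
        # SETEX stores the value and expires it after `ttl` seconds,
        # so no manual created_at bookkeeping is needed.
        self.r.setex(key, ttl, json.dumps(data))
```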
Different providers return data in wildly different formats. The normalizer converts everything to a consistent schema:
| Field | Yahoo | Polygon | Finnhub | Normalized |
|---|---|---|---|---|
| Price | regularMarketPrice | lastTrade.p | c | price |
| Change % | regularMarketChangePercent | todaysChangePerc | dp | change_pct |
| Volume | regularMarketVolume | day.v | (not provided) | volume |
| Open | regularMarketOpen | day.o | o | open |
| High | regularMarketDayHigh | day.h | h | high |
| Low | regularMarketDayLow | day.l | l | low |
| Timestamp | regularMarketTime (unix) | updated (ns) | t (unix) | timestamp (ISO) |
```python
from dataclasses import asdict, dataclass
from datetime import datetime


@dataclass
class Quote:
    """Normalized quote — same structure regardless of source."""
    symbol: str
    price: float
    open: float
    high: float
    low: float
    close: float
    volume: int
    change_pct: float
    timestamp: str
    source: str


class Normalizer:
    """Convert provider-specific responses to unified schema."""

    @staticmethod
    def normalize_quote(raw: dict, source: str, symbol: str) -> Quote:
        if source == "yahoo":
            return Quote(
                symbol=symbol,
                price=raw.get("regularMarketPrice", 0),
                open=raw.get("regularMarketOpen", 0),
                high=raw.get("regularMarketDayHigh", 0),
                low=raw.get("regularMarketDayLow", 0),
                close=raw.get("regularMarketPreviousClose", 0),
                volume=raw.get("regularMarketVolume", 0),
                change_pct=raw.get("regularMarketChangePercent", 0),
                timestamp=datetime.now().isoformat(),
                source=source,
            )
        elif source == "finnhub":
            return Quote(
                symbol=symbol,
                price=raw.get("c", 0),
                open=raw.get("o", 0),
                high=raw.get("h", 0),
                low=raw.get("l", 0),
                close=raw.get("pc", 0),
                volume=0,  # Finnhub quote doesn't include volume
                change_pct=raw.get("dp", 0),
                timestamp=datetime.fromtimestamp(raw.get("t", 0)).isoformat(),
                source=source,
            )
        elif source == "polygon":
            tick = raw.get("ticker", {})
            return Quote(
                symbol=symbol,
                price=tick.get("lastTrade", {}).get("p", 0),
                open=tick.get("day", {}).get("o", 0),
                high=tick.get("day", {}).get("h", 0),
                low=tick.get("day", {}).get("l", 0),
                close=tick.get("prevDay", {}).get("c", 0),
                volume=tick.get("day", {}).get("v", 0),
                change_pct=tick.get("todaysChangePerc", 0),
                timestamp=datetime.now().isoformat(),
                source=source,
            )
        raise ValueError(f"Unknown source: {source}")
```
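For example, a Finnhub-style payload (values invented for illustration) normalizes like this:

```python
raw = {"c": 212.4, "o": 210.1, "h": 213.0, "l": 209.8,
       "pc": 211.0, "dp": 0.66, "t": 1700000000}  # fabricated example payload
quote = Normalizer.normalize_quote(raw, "finnhub", "AAPL")
print(quote.price, quote.change_pct, quote.source)  # 212.4 0.66 finnhub
```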
```python
import asyncio
import logging

logger = logging.getLogger("aggregator")


class FinanceAggregator:
    """The main aggregator — unified API over multiple providers."""

    def __init__(self):
        self.cache = CacheLayer()
        self.rate_manager = RateLimitManager()
        self.router = ProviderRouter(self.rate_manager)
        self.normalizer = Normalizer()
        self.providers = {
            "yahoo": YahooProvider(),
            "polygon": PolygonProvider("POLYGON_KEY"),
            "finnhub": FinnhubProvider("FINNHUB_KEY"),
            "fmp": FMPProvider("FMP_KEY"),
            "stocktwits": StockTwitsProvider(),
        }
        self.health = {p: True for p in self.providers}

    async def get_quote(self, symbol: str) -> Quote:
        """Get a quote with failover: cache -> primary -> fallback -> stale cache."""
        # 1. Check cache
        cached = self.cache.get("quote", symbol)
        if cached:
            return Quote(**cached)

        # 2. Try providers in priority order
        providers = ["finnhub", "polygon", "yahoo"]
        for provider_name in providers:
            if not self.health[provider_name]:
                continue
            try:
                await self.rate_manager.acquire(provider_name)
                provider = self.providers[provider_name]
                raw = provider.get_quote(symbol)
                quote = self.normalizer.normalize_quote(raw, provider_name, symbol)
                # Cache the result
                self.cache.set("quote", symbol, asdict(quote))
                self.health[provider_name] = True
                return quote
            except Exception as e:
                logger.warning(f"{provider_name} failed for {symbol}: {e}")
                self.health[provider_name] = False
                # Schedule health check in 60s
                asyncio.get_running_loop().call_later(
                    60, self._reset_health, provider_name
                )

        # 3. Last resort: stale cache (ignore TTL)
        stale = self.cache.get("quote", symbol)  # Would need a force_stale param
        if stale:
            logger.warning(f"Serving stale cache for {symbol}")
            return Quote(**stale)
        raise RuntimeError(f"All providers failed for {symbol}")

    def _reset_health(self, provider):
        self.health[provider] = True
        logger.info(f"Reset health for {provider}")
```
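Usage looks like this (a sketch: the provider adapter classes such as YahooProvider come from earlier parts of this series and are not shown here):

```python
import asyncio

async def main():
    agg = FinanceAggregator()
    quote = await agg.get_quote("AAPL")
    print(f"{quote.symbol}: {quote.price} (via {quote.source})")
    # A second call within 60s is answered from cache: no API request is made
    await agg.get_quote("AAPL")

asyncio.run(main())
```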
| Strategy | Monthly Cost | Coverage | Real-Time |
|---|---|---|---|
| Free Stack | $0 | Yahoo + Finnhub + FMP + StockTwits | Finnhub WS only |
| Budget Pro | $49 | + Polygon Starter ($29) + EODHD ($20) | SIP feed + global |
| Full Production | $319 | + Polygon Business ($199) + UW ($40) + Benzinga ($199) | Full OPRA + news |
The Model Context Protocol (MCP) lets AI agents (Claude, GPT, etc.) call your aggregator as a tool. This is how the DailyTickers MCP Gateway works: Claude can call GetMarketOverview or QueryData to get real-time market data during article generation.
MCP (Model Context Protocol) is an open standard by Anthropic that lets AI models interact with external tools and data sources. An MCP server exposes "tools" (functions) that an AI can call. By wrapping your aggregator as an MCP server, any AI agent can query live market data, run screeners, or fetch financials — just by describing what they need in natural language.
```python
import json
from dataclasses import asdict

from mcp.server.fastmcp import FastMCP

# Initialize MCP server (FastMCP turns decorated functions into MCP tools)
mcp = FastMCP("finance-aggregator")

# Initialize our aggregator
aggregator = FinanceAggregator()


@mcp.tool()
async def get_quote(symbol: str) -> str:
    """Get real-time quote for a stock symbol.

    Args:
        symbol: Stock ticker (e.g., AAPL, MSFT, TSLA)

    Returns:
        JSON with price, change, volume, and source
    """
    quote = await aggregator.get_quote(symbol)
    return json.dumps(asdict(quote), indent=2)


@mcp.tool()
async def get_bars(symbol: str, period: str = "1y", interval: str = "1d") -> str:
    """Get historical OHLCV bars for a stock.

    Args:
        symbol: Stock ticker
        period: Time period (1d, 5d, 1mo, 3mo, 6mo, 1y, 5y)
        interval: Bar interval (1m, 5m, 1h, 1d)

    Returns:
        JSON array of {date, open, high, low, close, volume}
    """
    bars = await aggregator.get_bars(symbol, period, interval)
    return json.dumps([asdict(b) for b in bars])


@mcp.tool()
async def get_financials(symbol: str, statement: str = "income") -> str:
    """Get financial statements for a company.

    Args:
        symbol: Stock ticker
        statement: Type (income, balance, cashflow, ratios)

    Returns:
        JSON with quarterly/annual financial data
    """
    data = await aggregator.get_financials(symbol, statement)
    return json.dumps(data)


@mcp.tool()
async def get_sentiment(symbol: str) -> str:
    """Get multi-source sentiment analysis for a stock.

    Args:
        symbol: Stock ticker

    Returns:
        JSON with composite sentiment, individual source scores, and convergence
    """
    sentiment = await aggregator.get_sentiment(symbol)
    return json.dumps(sentiment)


@mcp.tool()
async def market_overview() -> str:
    """Get a complete market overview: indices, sectors, commodities, crypto, rates.

    Returns:
        JSON with market snapshot across all asset classes
    """
    overview = await aggregator.get_market_overview()
    return json.dumps(overview)


# Run the server (stdio transport by default)
if __name__ == "__main__":
    mcp.run()
```
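To wire the server into Claude Desktop, register it in claude_desktop_config.json (restart Claude Desktop after editing so the tools appear):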
{
"mcpServers": {
"finance-aggregator": {
"command": "python",
"args": ["path/to/mcp_server.py"],
"env": {
"POLYGON_KEY": "your_key",
"FINNHUB_KEY": "your_key",
"FMP_KEY": "your_key"
}
}
}
}
```dockerfile
FROM python:3.12-slim

WORKDIR /app

# curl is needed for the docker-compose healthcheck below
RUN apt-get update && apt-get install -y --no-install-recommends curl \
    && rm -rf /var/lib/apt/lists/*

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .

# Create data directory for SQLite cache
RUN mkdir -p /data

# Environment variables
ENV CACHE_DB=/data/cache.db
ENV LOG_LEVEL=INFO

# Expose MCP server port
EXPOSE 8080

CMD ["python", "mcp_server.py"]
```
```yaml
version: '3.8'

services:
  aggregator:
    build: .
    ports:
      - "8080:8080"
    environment:
      - POLYGON_KEY=${POLYGON_KEY}
      - FINNHUB_KEY=${FINNHUB_KEY}
      - FMP_KEY=${FMP_KEY}
    volumes:
      - cache_data:/data
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    restart: unless-stopped

volumes:
  cache_data:
  redis_data:
```
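Bring the stack up with `docker compose up -d --build`; the named volumes persist the SQLite cache and Redis data across container restarts.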
```text
yfinance>=0.2.31
requests>=2.31.0
aiohttp>=3.9.0
redis>=5.0.0
mcp>=1.2.0
scipy>=1.11.0
numpy>=1.26.0
pandas>=2.1.0
praw>=7.7.0
pytrends>=4.9.0
finnhub-python>=2.4.0
```
| Data Type | Free Provider | Rate Limit | What You Get |
|---|---|---|---|
| Historical OHLCV | Yahoo Finance | ~2K req/hr | 20+ years, global, adjusted |
| Real-Time Quotes | Finnhub | 60 req/min | WebSocket + REST, US stocks |
| Technical Indicators | Twelve Data | 800 req/day | 100+ indicators, global |
| Financials | FMP | 250 req/day | Income, balance, cashflow, ratios |
| SEC Filings | SEC EDGAR | 10 req/sec | All filings, XBRL, full-text search |
| Social Sentiment | StockTwits | 200 req/hr | Messages, sentiment tags, trending |
| Reddit Mentions | ApeWisdom | Unlimited | WSB rankings, mention counts |
| Search Interest | Google Trends | Unlimited | Search volume index |
| Options Chains | Yahoo Finance | ~2K req/hr | Chains, IV, volume, OI |
| Congress Trades | QuiverQuant | Generous | STOCK Act disclosures |
You now have the pieces to build production-grade financial data applications of your own.
The DailyTickers MCP Gateway (mcp__dailytickers__* tools) is essentially a production version of the aggregator described in this series. It provides 58+ data types through a unified interface: quotes, bars, financials, sentiment, options, earnings, analyst actions, insider transactions, and more. When Claude generates the daily briefing or a stock analysis, it calls these MCP tools to get live market data — the exact same architecture you've learned in this series.
If you're redistributing data, you need proper data licensing. Yahoo Finance explicitly forbids redistribution. SEC EDGAR data is public domain. For a commercial product, you'll need Polygon.io ($199/mo minimum for redistribution rights), or a direct exchange data license. The aggregator architecture works the same — you just swap in licensed providers.