"""Market sentiment briefing — Finnhub API client.
Fetches sentiment data for watchlist tickers, generates daily briefing.
Free tier: 60 calls/minute, sufficient for daily briefing.
"""
import asyncio
import json
import os
from datetime import datetime, timedelta
from pathlib import Path
import httpx
from icarus.core.config.staging import DATA_DIR
FINNHUB_BASE = "https://finnhub.io/api/v1"
Load from .env if available
try:
from dotenv import load_dotenv
load_dotenv('/home/hoffmann_admin/.openclaw/.env')
except ImportError:
pass
FINNHUB_API_KEY = os.environ.get("FINNHUB_API_KEY")
Cache directory for freshness tracking
CACHE_DIR = DATA_DIR / "market_cache"
CACHE_DIR.mkdir(parents=True, exist_ok=True)
---------------------------------------------------------------------------
Cache Management
---------------------------------------------------------------------------
def _cache_path(key: str) -> Path:
"""Get cache file path for a key."""
return CACHE_DIR / f"{key}.json"
def _load_cache(key: str, max_age_minutes: int) -> dict | None:
"""Load cached data if fresh enough."""
cache_file = _cache_path(key)
if not cache_file.exists():
return None
try:
with open(cache_file) as f:
cached = json.load(f)
cached_at = datetime.fromisoformat(cached.get("_cached_at", "2000-01-01"))
age = datetime.now() - cached_at
if age > timedelta(minutes=max_age_minutes):
return None
return cached.get("data")
except (json.JSONDecodeError, KeyError, ValueError):
return None
def _save_cache(key: str, data: dict) -> None:
"""Save data to cache with timestamp."""
cache_file = _cache_path(key)
with open(cache_file, "w") as f:
json.dump({"_cached_at": datetime.now().isoformat(), "data": data}, f, indent=2)
---------------------------------------------------------------------------
API calls (free tier only)
---------------------------------------------------------------------------
async def _finnhub_get(endpoint: str, params: dict | None = None) -> dict:
"""Make authenticated Finnhub API request."""
if not FINNHUB_API_KEY:
raise RuntimeError("FINNHUB_API_KEY not set")
url = f"{FINNHUB_BASE}{endpoint}"
query = params or {}
query["token"] = FINNHUB_API_KEY
async with httpx.AsyncClient() as client:
resp = await client.get(url, params=query, timeout=30.0)
resp.raise_for_status()
return resp.json()
async def get_quote(ticker: str) -> dict:
"""Get real-time quote for ticker."""
# Check cache first (15 minutes for quotes)
cached = load_cache(f"quote{ticker}", max_age_minutes=15)
if cached:
return cached
data = await _finnhub_get("/quote", {"symbol": ticker})
_save_cache(f"quote_{ticker}", data)
return data
async def get_sentiment(ticker: str) -> dict:
"""Get insider sentiment (monthly aggregation)."""
# Check cache first (24 hours for sentiment)
cached = load_cache(f"sentiment{ticker}", max_age_minutes=1440)
if cached:
return cached
today = datetime.now()
from_date = (today - timedelta(days=90)).strftime("%Y-%m-%d")
to_date = today.strftime("%Y-%m-%d")
data = await _finnhub_get(
"/stock/insider-sentiment",
{"symbol": ticker, "from": from_date, "to": to_date}
)
_save_cache(f"sentiment_{ticker}", data)
return data
async def get_recommendation(ticker: str) -> list[dict]:
"""Get analyst recommendation trends."""
# Check cache first (24 hours for recommendations)
cached = load_cache(f"rec{ticker}", max_age_minutes=1440)
if cached:
return cached
data = await _finnhub_get("/stock/recommendation", {"symbol": ticker})
_save_cache(f"rec_{ticker}", data)
return data
async def get_earnings_surprises(ticker: str) -> list[dict]:
"""Get earnings surprises (last 4 quarters)."""
# Check cache first (6 hours for earnings)
cached = load_cache(f"earnings{ticker}", max_age_minutes=360)
if cached:
return cached
data = await _finnhub_get("/stock/earnings", {"symbol": ticker})
_save_cache(f"earnings_{ticker}", data)
return data
async def get_company_news(ticker: str, limit: int = 5) -> list[dict]:
"""Get company news."""
# Check cache first (2 hours for news)
cached = load_cache(f"news{ticker}", max_age_minutes=120)
if cached:
return cached[:limit]
today = datetime.now()
from_date = (today - timedelta(days=7)).strftime("%Y-%m-%d")
to_date = today.strftime("%Y-%m-%d")
data = await _finnhub_get(
"/company-news",
{"symbol": ticker, "from": from_date, "to": to_date, "limit": limit}
)
_save_cache(f"news_{ticker}", data)
return data[:limit]
async def get_basic_financials(ticker: str) -> dict:
"""Get basic financials (market cap, etc.)."""
cached = load_cache(f"fin{ticker}", max_age_minutes=1440)
if cached:
return cached
data = await _finnhub_get("/stock/metric", {"symbol": ticker, "metric": "all"})
_save_cache(f"fin_{ticker}", data)
return data
---------------------------------------------------------------------------
Briefing generation
---------------------------------------------------------------------------
async def generate_briefing(tickers: list[str] | None = None, premarket: bool = False) -> dict:
"""Generate daily market sentiment briefing.
Uses free-tier endpoints:
- Quote (price, change) - cached 15 min
- Insider sentiment (mspr) - cached 24 hours
- Analyst recommendations (buy/sell/hold) - cached 24 hours
- Earnings surprises - cached 6 hours
- Company news - cached 2 hours
Args:
tickers: List of ticker symbols. Uses saved watchlist if None.
premarket: If True, uses previous close data for early morning briefings.
"""
# Import watchlist module here to avoid circular imports
from icarus.core.market.watchlist import get_combined_watchlist
from icarus.core.market.news import (
get_company_news, format_ticker_news, select_top_stories, get_market_news
)
# Get combined watchlist (user + dynamic)
watchlist_data = await get_combined_watchlist()
if tickers is None:
tickers = watchlist_data["all_tickers"]
user_tickers = watchlist_data["user_tickers"]
dynamic_tickers = watchlist_data["dynamic_tickers"]
dynamic_reasons = watchlist_data["dynamic_reasons"]
briefing = {
"generated_at": datetime.now().isoformat(),
"premarket": premarket,
"user_tickers": user_tickers,
"dynamic_tickers": dynamic_tickers,
"dynamic_reasons": dynamic_reasons,
"watchlist": tickers,
"summary": {
"bullish": 0,
"bearish": 0,
"neutral": 0,
"total": len(tickers),
},
"tickers": [],
"alerts": [],
"top_news": [],
"market_news": [],
"earnings_this_week": [],
}
# Fetch all ticker data
all_news = {}
for ticker in tickers:
try:
data = await _fetch_ticker_data(ticker, premarket=premarket)
# Add news for this ticker
try:
news = await get_company_news(ticker, limit=3)
data["news"] = news[:2] # Top 2 news items
all_news[ticker] = news
except Exception:
data["news"] = []
# Mark if this is a dynamic ticker
if ticker in dynamic_tickers:
data["is_dynamic"] = True
data["dynamic_reason"] = dynamic_reasons.get(ticker, "")
else:
data["is_dynamic"] = False
briefing["tickers"].append(data)
# Count signals
if data.get("signal") == "bullish":
briefing["summary"]["bullish"] += 1
elif data.get("signal") == "bearish":
briefing["summary"]["bearish"] += 1
else:
briefing["summary"]["neutral"] += 1
# Check for alerts
if data.get("earnings_surprise"):
briefing["alerts"].append({
"type": "earnings",
"ticker": ticker,
"message": data["earnings_surprise"],
})
if data.get("analyst_change"):
briefing["alerts"].append({
"type": "analyst",
"ticker": ticker,
"message": data["analyst_change"],
})
# Check for insider sentiment alerts
if data.get("insider_sentiment"):
mspr = data["insider_sentiment"].get("mspr", 0)
if abs(mspr) > 0.3:
direction = "buying" if mspr > 0 else "selling"
briefing["alerts"].append({
"type": "insider",
"ticker": ticker,
"message": f"Insider {direction} (MSPR: {mspr:+.2f})",
})
except Exception as e:
briefing["tickers"].append({
"ticker": ticker,
"error": str(e),
})
# Get top stories across all tickers
try:
top_stories = select_top_stories(all_news, max_stories=3)
briefing["top_news"] = top_stories
except Exception:
pass
# Get general market news
try:
briefing["market_news"] = await get_market_news(category="general", limit=2)
except Exception:
pass
# Get earnings calendar
try:
from icarus.core.market.watchlist import get_earnings_calendar
earnings = await get_earnings_calendar(days_ahead=7)
# Filter to just tickers in our watchlist
watchlist_set = set(t.upper() for t in tickers)
briefing["earnings_this_week"] = [
e for e in earnings
if e["ticker"] in watchlist_set
]
except Exception:
pass
return briefing
async def _fetch_ticker_data(ticker: str, premarket: bool = False) -> dict:
"""Fetch all data for a single ticker."""
quote = await get_quote(ticker)
# Handle pre-market (no current price yet)
if premarket and quote.get("dp") is None:
quote["c"] = quote.get("pc", quote.get("c"))
quote["dp"] = 0.0
sentiment, recs, earnings = await asyncio.gather(
get_sentiment(ticker),
get_recommendation(ticker),
get_earnings_surprises(ticker),
)
result = {
"ticker": ticker,
"price": quote.get("c"),
"change": quote.get("d"),
"change_percent": quote.get("dp"),
}
# Determine signal
signals = []
# Analyst recommendations
if recs and len(recs) > 0:
latest = recs[0]
buy = latest.get("buy", 0)
sell = latest.get("sell", 0)
hold = latest.get("hold", 0)
total = buy + sell + hold
if total > 0:
buy_pct = buy / total
sell_pct = sell / total
if buy_pct > 0.6:
signals.append("bullish")
elif sell_pct > 0.4:
signals.append("bearish")
# Insider sentiment
if sentiment.get("data"):
recent = sentiment["data"][-1] if sentiment["data"] else None
if recent:
mspr = recent.get("mspr", 0)
result["insider_sentiment"] = recent
if mspr > 0.2:
signals.append("bullish")
elif mspr < -0.2:
signals.append("bearish")
# Determine overall signal
if signals.count("bullish") > signals.count("bearish"):
result["signal"] = "bullish"
elif signals.count("bearish") > signals.count("bullish"):
result["signal"] = "bearish"
else:
result["signal"] = "neutral"
# Earnings surprise
if earnings and len(earnings) > 0:
latest = earnings[0]
surprise = latest.get("surprisePercent")
if surprise and abs(surprise) > 10:
direction = "beat" if surprise > 0 else "miss"
result["earnings_surprise"] = f"Q{latest.get('quarter')} {direction}: {surprise:+.1f}%"
# Analyst change (compare current vs previous month)
if recs and len(recs) > 1:
current = recs[0]
previous = recs[1]
buy_change = current.get("buy", 0) - previous.get("buy", 0)
if abs(buy_change) >= 2:
direction = "upgraded" if buy_change > 0 else "downgraded"
result["analyst_change"] = f"Analysts {direction} ({buy_change:+.0f} buys)"
return result
---------------------------------------------------------------------------
Formatting
---------------------------------------------------------------------------
def format_briefing(briefing: dict) -> str:
"""Format briefing as Telegram message."""
premarket = briefing.get("premarket", False)
date_str = datetime.now().strftime("%Y-%m-%d")
header = "🌅 Pre-Market Brief" if premarket else "📊 Market Close Brief"
lines = [
f"{header} — {date_str}",
"",
]
# Summary
summary = briefing["summary"]
lines.append(
f"📈 {summary['bullish']} bullish 📉 {summary['bearish']} bearish ➖ {summary['neutral']} neutral"
)
lines.append("")
# User watchlist
lines.append("📊 *Your Watchlist:*")
for t in briefing["tickers"]:
if "error" in t:
lines.append(f"❌ {t['ticker']}: {t['error']}")
continue
# Skip dynamic tickers in this section
if t.get("is_dynamic"):
continue
emoji = {"bullish": "🟢", "bearish": "🔴", "neutral": "⚪"}.get(t.get("signal"), "⚪")
change = t.get("change_percent", 0) or 0
change_emoji = "📈" if change > 0 else "📉" if change < 0 else "➖"
price = t.get("price")
price_str = f"${price:.2f}" if price is not None else "$N/A"
line = f"• {emoji} *{t['ticker']}*: {price_str} {change_emoji} {change:+.2f}%"
lines.append(line)
# Add brief news if available
if t.get("news"):
for news_item in t["news"][:1]: # Just 1 news item per ticker
if "headline" in news_item:
sentiment_emoji = news_item.get("sentiment", {}).get("emoji", "")
headline = news_item["headline"]
if len(headline) > 60:
headline = headline[:57] + "..."
lines.append(f" {sentiment_emoji} _{headline}_")
# Dynamic tickers section
if briefing.get("dynamic_tickers"):
lines.append("")
lines.append("🔥 *Trending Today:*")
for t in briefing["tickers"]:
if not t.get("is_dynamic"):
continue
if "error" in t:
continue
emoji = {"bullish": "🟢", "bearish": "🔴", "neutral": "⚪"}.get(t.get("signal"), "⚪")
change = t.get("change_percent", 0) or 0
change_emoji = "📈" if change > 0 else "📉" if change < 0 else "➖"
price = t.get("price")
price_str = f"${price:.2f}" if price is not None else "$N/A"
reason = t.get("dynamic_reason", "")
line = f"• {emoji} *{t['ticker']}*: {price_str} {change_emoji} {change:+.2f}%"
if reason:
line += f" — {reason}"
lines.append(line)
# Add brief news
if t.get("news"):
for news_item in t["news"][:1]:
if "headline" in news_item:
sentiment_emoji = news_item.get("sentiment", {}).get("emoji", "")
headline = news_item["headline"]
if len(headline) > 50:
headline = headline[:47] + "..."
lines.append(f" {sentiment_emoji} _{headline}_")
# Top News section
if briefing.get("top_news"):
lines.append("")
lines.append("📰 *Top Stories:*")
for item in briefing["top_news"][:3]:
sentiment_emoji = item.get("sentiment", {}).get("emoji", "📰")
ticker = item.get("ticker", "Market")
headline = item.get("headline", "")
source = item.get("source", "")
time_ago = item.get("time_ago", "")
# Truncate headline
if len(headline) > 70:
headline = headline[:67] + "..."
lines.append(f"{sentiment_emoji} *{ticker}*: {headline}")
lines.append(f" _({source}, {time_ago})_")
# Market news
if briefing.get("market_news"):
for item in briefing["market_news"][:2]:
sentiment_emoji = item.get("sentiment", {}).get("emoji", "🌍")
headline = item.get("headline", "")
source = item.get("source", "")
if len(headline) > 70:
headline = headline[:67] + "..."
lines.append(f"{sentiment_emoji} {headline} _({source})_")
# Insider Sentiment highlights
insider_alerts = [a for a in briefing.get("alerts", []) if a["type"] == "insider"]
if insider_alerts:
lines.append("")
lines.append("🎯 *Insider Activity:*")
for alert in insider_alerts[:3]:
lines.append(f"• {alert['ticker']}: {alert['message']}")
# Earnings This Week
if briefing.get("earnings_this_week"):
lines.append("")
lines.append("📅 *Earnings This Week:*")
for earning in briefing["earnings_this_week"][:4]:
ticker = earning["ticker"]
date = earning["date"]
days = earning["days_until"]
q = earning.get("quarter", "Q?")
when = "today" if days == 0 else f"in {days} days" if days > 0 else "recently"
lines.append(f"• {ticker}: {q} {when} ({date})")
# Other alerts
non_insider_alerts = [a for a in briefing.get("alerts", []) if a["type"] != "insider"]
if non_insider_alerts:
lines.append("")
lines.append("🚨 *Alerts:*")
for alert in non_insider_alerts[:5]:
emoji = {"earnings": "💰", "analyst": "📊"}.get(alert["type"], "🔔")
lines.append(f"{emoji} {alert['ticker']}: {alert['message']}")
lines.append("")
lines.append("Reply `watchlist` to edit tickers.")
return "\n".join(lines)
def extract_sentiment_from_headline(headline: str, summary: str = "") -> dict:
"""Extract sentiment from headline text."""
from icarus.core.market.news import _extract_sentiment
return _extract_sentiment(headline, summary)