📄 sentiment.py 19,039 bytes May 01, 2026 📋 Raw

"""Market sentiment briefing — Finnhub API client.

Fetches sentiment data for watchlist tickers, generates daily briefing.
Free tier: 60 calls/minute, sufficient for daily briefing.
"""

import asyncio
import json
import os
from datetime import datetime, timedelta
from pathlib import Path

import httpx

from icarus.core.config.staging import DATA_DIR

FINNHUB_BASE = "https://finnhub.io/api/v1"

Load from .env if available

try:
from dotenv import load_dotenv
load_dotenv('/home/hoffmann_admin/.openclaw/.env')
except ImportError:
pass

FINNHUB_API_KEY = os.environ.get("FINNHUB_API_KEY")

Cache directory for freshness tracking

CACHE_DIR = DATA_DIR / "market_cache"
CACHE_DIR.mkdir(parents=True, exist_ok=True)

---------------------------------------------------------------------------

Cache Management

---------------------------------------------------------------------------

def _cache_path(key: str) -> Path:
"""Get cache file path for a key."""
return CACHE_DIR / f"{key}.json"

def _load_cache(key: str, max_age_minutes: int) -> dict | None:
"""Load cached data if fresh enough."""
cache_file = _cache_path(key)
if not cache_file.exists():
return None

try:
    with open(cache_file) as f:
        cached = json.load(f)

    cached_at = datetime.fromisoformat(cached.get("_cached_at", "2000-01-01"))
    age = datetime.now() - cached_at

    if age > timedelta(minutes=max_age_minutes):
        return None

    return cached.get("data")
except (json.JSONDecodeError, KeyError, ValueError):
    return None

def _save_cache(key: str, data: dict) -> None:
"""Save data to cache with timestamp."""
cache_file = _cache_path(key)
with open(cache_file, "w") as f:
json.dump({"_cached_at": datetime.now().isoformat(), "data": data}, f, indent=2)

---------------------------------------------------------------------------

API calls (free tier only)

---------------------------------------------------------------------------

async def _finnhub_get(endpoint: str, params: dict | None = None) -> dict:
"""Make authenticated Finnhub API request."""
if not FINNHUB_API_KEY:
raise RuntimeError("FINNHUB_API_KEY not set")

url = f"{FINNHUB_BASE}{endpoint}"
query = params or {}
query["token"] = FINNHUB_API_KEY

async with httpx.AsyncClient() as client:
    resp = await client.get(url, params=query, timeout=30.0)
    resp.raise_for_status()
    return resp.json()

async def get_quote(ticker: str) -> dict:
"""Get real-time quote for ticker."""
# Check cache first (15 minutes for quotes)
cached = load_cache(f"quote{ticker}", max_age_minutes=15)
if cached:
return cached

data = await _finnhub_get("/quote", {"symbol": ticker})
_save_cache(f"quote_{ticker}", data)
return data

async def get_sentiment(ticker: str) -> dict:
"""Get insider sentiment (monthly aggregation)."""
# Check cache first (24 hours for sentiment)
cached = load_cache(f"sentiment{ticker}", max_age_minutes=1440)
if cached:
return cached

today = datetime.now()
from_date = (today - timedelta(days=90)).strftime("%Y-%m-%d")
to_date = today.strftime("%Y-%m-%d")
data = await _finnhub_get(
    "/stock/insider-sentiment",
    {"symbol": ticker, "from": from_date, "to": to_date}
)
_save_cache(f"sentiment_{ticker}", data)
return data

async def get_recommendation(ticker: str) -> list[dict]:
"""Get analyst recommendation trends."""
# Check cache first (24 hours for recommendations)
cached = load_cache(f"rec{ticker}", max_age_minutes=1440)
if cached:
return cached

data = await _finnhub_get("/stock/recommendation", {"symbol": ticker})
_save_cache(f"rec_{ticker}", data)
return data

async def get_earnings_surprises(ticker: str) -> list[dict]:
"""Get earnings surprises (last 4 quarters)."""
# Check cache first (6 hours for earnings)
cached = load_cache(f"earnings{ticker}", max_age_minutes=360)
if cached:
return cached

data = await _finnhub_get("/stock/earnings", {"symbol": ticker})
_save_cache(f"earnings_{ticker}", data)
return data

async def get_company_news(ticker: str, limit: int = 5) -> list[dict]:
"""Get company news."""
# Check cache first (2 hours for news)
cached = load_cache(f"news{ticker}", max_age_minutes=120)
if cached:
return cached[:limit]

today = datetime.now()
from_date = (today - timedelta(days=7)).strftime("%Y-%m-%d")
to_date = today.strftime("%Y-%m-%d")
data = await _finnhub_get(
    "/company-news",
    {"symbol": ticker, "from": from_date, "to": to_date, "limit": limit}
)
_save_cache(f"news_{ticker}", data)
return data[:limit]

async def get_basic_financials(ticker: str) -> dict:
"""Get basic financials (market cap, etc.)."""
cached = load_cache(f"fin{ticker}", max_age_minutes=1440)
if cached:
return cached

data = await _finnhub_get("/stock/metric", {"symbol": ticker, "metric": "all"})
_save_cache(f"fin_{ticker}", data)
return data

---------------------------------------------------------------------------

Briefing generation

---------------------------------------------------------------------------

async def generate_briefing(tickers: list[str] | None = None, premarket: bool = False) -> dict:
"""Generate daily market sentiment briefing.

Uses free-tier endpoints:
- Quote (price, change) - cached 15 min
- Insider sentiment (mspr) - cached 24 hours
- Analyst recommendations (buy/sell/hold) - cached 24 hours
- Earnings surprises - cached 6 hours
- Company news - cached 2 hours

Args:
    tickers: List of ticker symbols. Uses saved watchlist if None.
    premarket: If True, uses previous close data for early morning briefings.
"""
# Import watchlist module here to avoid circular imports
from icarus.core.market.watchlist import get_combined_watchlist
from icarus.core.market.news import (
    get_company_news, format_ticker_news, select_top_stories, get_market_news
)

# Get combined watchlist (user + dynamic)
watchlist_data = await get_combined_watchlist()
if tickers is None:
    tickers = watchlist_data["all_tickers"]

user_tickers = watchlist_data["user_tickers"]
dynamic_tickers = watchlist_data["dynamic_tickers"]
dynamic_reasons = watchlist_data["dynamic_reasons"]

briefing = {
    "generated_at": datetime.now().isoformat(),
    "premarket": premarket,
    "user_tickers": user_tickers,
    "dynamic_tickers": dynamic_tickers,
    "dynamic_reasons": dynamic_reasons,
    "watchlist": tickers,
    "summary": {
        "bullish": 0,
        "bearish": 0,
        "neutral": 0,
        "total": len(tickers),
    },
    "tickers": [],
    "alerts": [],
    "top_news": [],
    "market_news": [],
    "earnings_this_week": [],
}

# Fetch all ticker data
all_news = {}
for ticker in tickers:
    try:
        data = await _fetch_ticker_data(ticker, premarket=premarket)

        # Add news for this ticker
        try:
            news = await get_company_news(ticker, limit=3)
            data["news"] = news[:2]  # Top 2 news items
            all_news[ticker] = news
        except Exception:
            data["news"] = []

        # Mark if this is a dynamic ticker
        if ticker in dynamic_tickers:
            data["is_dynamic"] = True
            data["dynamic_reason"] = dynamic_reasons.get(ticker, "")
        else:
            data["is_dynamic"] = False

        briefing["tickers"].append(data)

        # Count signals
        if data.get("signal") == "bullish":
            briefing["summary"]["bullish"] += 1
        elif data.get("signal") == "bearish":
            briefing["summary"]["bearish"] += 1
        else:
            briefing["summary"]["neutral"] += 1

        # Check for alerts
        if data.get("earnings_surprise"):
            briefing["alerts"].append({
                "type": "earnings",
                "ticker": ticker,
                "message": data["earnings_surprise"],
            })

        if data.get("analyst_change"):
            briefing["alerts"].append({
                "type": "analyst",
                "ticker": ticker,
                "message": data["analyst_change"],
            })

        # Check for insider sentiment alerts
        if data.get("insider_sentiment"):
            mspr = data["insider_sentiment"].get("mspr", 0)
            if abs(mspr) > 0.3:
                direction = "buying" if mspr > 0 else "selling"
                briefing["alerts"].append({
                    "type": "insider",
                    "ticker": ticker,
                    "message": f"Insider {direction} (MSPR: {mspr:+.2f})",
                })

    except Exception as e:
        briefing["tickers"].append({
            "ticker": ticker,
            "error": str(e),
        })

# Get top stories across all tickers
try:
    top_stories = select_top_stories(all_news, max_stories=3)
    briefing["top_news"] = top_stories
except Exception:
    pass

# Get general market news
try:
    briefing["market_news"] = await get_market_news(category="general", limit=2)
except Exception:
    pass

# Get earnings calendar
try:
    from icarus.core.market.watchlist import get_earnings_calendar
    earnings = await get_earnings_calendar(days_ahead=7)
    # Filter to just tickers in our watchlist
    watchlist_set = set(t.upper() for t in tickers)
    briefing["earnings_this_week"] = [
        e for e in earnings 
        if e["ticker"] in watchlist_set
    ]
except Exception:
    pass

return briefing

async def _fetch_ticker_data(ticker: str, premarket: bool = False) -> dict:
"""Fetch all data for a single ticker."""
quote = await get_quote(ticker)

# Handle pre-market (no current price yet)
if premarket and quote.get("dp") is None:
    quote["c"] = quote.get("pc", quote.get("c"))
    quote["dp"] = 0.0

sentiment, recs, earnings = await asyncio.gather(
    get_sentiment(ticker),
    get_recommendation(ticker),
    get_earnings_surprises(ticker),
)

result = {
    "ticker": ticker,
    "price": quote.get("c"),
    "change": quote.get("d"),
    "change_percent": quote.get("dp"),
}

# Determine signal
signals = []

# Analyst recommendations
if recs and len(recs) > 0:
    latest = recs[0]
    buy = latest.get("buy", 0)
    sell = latest.get("sell", 0)
    hold = latest.get("hold", 0)
    total = buy + sell + hold
    if total > 0:
        buy_pct = buy / total
        sell_pct = sell / total
        if buy_pct > 0.6:
            signals.append("bullish")
        elif sell_pct > 0.4:
            signals.append("bearish")

# Insider sentiment
if sentiment.get("data"):
    recent = sentiment["data"][-1] if sentiment["data"] else None
    if recent:
        mspr = recent.get("mspr", 0)
        result["insider_sentiment"] = recent
        if mspr > 0.2:
            signals.append("bullish")
        elif mspr < -0.2:
            signals.append("bearish")

# Determine overall signal
if signals.count("bullish") > signals.count("bearish"):
    result["signal"] = "bullish"
elif signals.count("bearish") > signals.count("bullish"):
    result["signal"] = "bearish"
else:
    result["signal"] = "neutral"

# Earnings surprise
if earnings and len(earnings) > 0:
    latest = earnings[0]
    surprise = latest.get("surprisePercent")
    if surprise and abs(surprise) > 10:
        direction = "beat" if surprise > 0 else "miss"
        result["earnings_surprise"] = f"Q{latest.get('quarter')} {direction}: {surprise:+.1f}%"

# Analyst change (compare current vs previous month)
if recs and len(recs) > 1:
    current = recs[0]
    previous = recs[1]
    buy_change = current.get("buy", 0) - previous.get("buy", 0)
    if abs(buy_change) >= 2:
        direction = "upgraded" if buy_change > 0 else "downgraded"
        result["analyst_change"] = f"Analysts {direction} ({buy_change:+.0f} buys)"

return result

---------------------------------------------------------------------------

Formatting

---------------------------------------------------------------------------

def format_briefing(briefing: dict) -> str:
"""Format briefing as Telegram message."""
premarket = briefing.get("premarket", False)
date_str = datetime.now().strftime("%Y-%m-%d")

header = "🌅 Pre-Market Brief" if premarket else "📊 Market Close Brief"

lines = [
    f"{header} — {date_str}",
    "",
]

# Summary
summary = briefing["summary"]
lines.append(
    f"📈 {summary['bullish']} bullish  📉 {summary['bearish']} bearish  ➖ {summary['neutral']} neutral"
)
lines.append("")

# User watchlist
lines.append("📊 *Your Watchlist:*")
for t in briefing["tickers"]:
    if "error" in t:
        lines.append(f"❌ {t['ticker']}: {t['error']}")
        continue

    # Skip dynamic tickers in this section
    if t.get("is_dynamic"):
        continue

    emoji = {"bullish": "🟢", "bearish": "🔴", "neutral": "⚪"}.get(t.get("signal"), "⚪")
    change = t.get("change_percent", 0) or 0
    change_emoji = "📈" if change > 0 else "📉" if change < 0 else "➖"
    price = t.get("price")
    price_str = f"${price:.2f}" if price is not None else "$N/A"

    line = f"• {emoji} *{t['ticker']}*: {price_str} {change_emoji} {change:+.2f}%"
    lines.append(line)

    # Add brief news if available
    if t.get("news"):
        for news_item in t["news"][:1]:  # Just 1 news item per ticker
            if "headline" in news_item:
                sentiment_emoji = news_item.get("sentiment", {}).get("emoji", "")
                headline = news_item["headline"]
                if len(headline) > 60:
                    headline = headline[:57] + "..."
                lines.append(f"  {sentiment_emoji} _{headline}_")

# Dynamic tickers section
if briefing.get("dynamic_tickers"):
    lines.append("")
    lines.append("🔥 *Trending Today:*")
    for t in briefing["tickers"]:
        if not t.get("is_dynamic"):
            continue
        if "error" in t:
            continue

        emoji = {"bullish": "🟢", "bearish": "🔴", "neutral": "⚪"}.get(t.get("signal"), "⚪")
        change = t.get("change_percent", 0) or 0
        change_emoji = "📈" if change > 0 else "📉" if change < 0 else "➖"
        price = t.get("price")
        price_str = f"${price:.2f}" if price is not None else "$N/A"
        reason = t.get("dynamic_reason", "")

        line = f"• {emoji} *{t['ticker']}*: {price_str} {change_emoji} {change:+.2f}%"
        if reason:
            line += f" — {reason}"
        lines.append(line)

        # Add brief news
        if t.get("news"):
            for news_item in t["news"][:1]:
                if "headline" in news_item:
                    sentiment_emoji = news_item.get("sentiment", {}).get("emoji", "")
                    headline = news_item["headline"]
                    if len(headline) > 50:
                        headline = headline[:47] + "..."
                    lines.append(f"  {sentiment_emoji} _{headline}_")

# Top News section
if briefing.get("top_news"):
    lines.append("")
    lines.append("📰 *Top Stories:*")
    for item in briefing["top_news"][:3]:
        sentiment_emoji = item.get("sentiment", {}).get("emoji", "📰")
        ticker = item.get("ticker", "Market")
        headline = item.get("headline", "")
        source = item.get("source", "")
        time_ago = item.get("time_ago", "")

        # Truncate headline
        if len(headline) > 70:
            headline = headline[:67] + "..."

        lines.append(f"{sentiment_emoji} *{ticker}*: {headline}")
        lines.append(f"   _({source}, {time_ago})_")

# Market news
if briefing.get("market_news"):
    for item in briefing["market_news"][:2]:
        sentiment_emoji = item.get("sentiment", {}).get("emoji", "🌍")
        headline = item.get("headline", "")
        source = item.get("source", "")
        if len(headline) > 70:
            headline = headline[:67] + "..."
        lines.append(f"{sentiment_emoji} {headline} _({source})_")

# Insider Sentiment highlights
insider_alerts = [a for a in briefing.get("alerts", []) if a["type"] == "insider"]
if insider_alerts:
    lines.append("")
    lines.append("🎯 *Insider Activity:*")
    for alert in insider_alerts[:3]:
        lines.append(f"• {alert['ticker']}: {alert['message']}")

# Earnings This Week
if briefing.get("earnings_this_week"):
    lines.append("")
    lines.append("📅 *Earnings This Week:*")
    for earning in briefing["earnings_this_week"][:4]:
        ticker = earning["ticker"]
        date = earning["date"]
        days = earning["days_until"]
        q = earning.get("quarter", "Q?")
        when = "today" if days == 0 else f"in {days} days" if days > 0 else "recently"
        lines.append(f"• {ticker}: {q} {when} ({date})")

# Other alerts
non_insider_alerts = [a for a in briefing.get("alerts", []) if a["type"] != "insider"]
if non_insider_alerts:
    lines.append("")
    lines.append("🚨 *Alerts:*")
    for alert in non_insider_alerts[:5]:
        emoji = {"earnings": "💰", "analyst": "📊"}.get(alert["type"], "🔔")
        lines.append(f"{emoji} {alert['ticker']}: {alert['message']}")

lines.append("")
lines.append("Reply `watchlist` to edit tickers.")

return "\n".join(lines)

def extract_sentiment_from_headline(headline: str, summary: str = "") -> dict:
"""Extract sentiment from headline text."""
from icarus.core.market.news import _extract_sentiment
return _extract_sentiment(headline, summary)