"""Market sentiment briefing — Finnhub API client. Fetches sentiment data for watchlist tickers, generates daily briefing. Free tier: 60 calls/minute, sufficient for daily briefing. """ import asyncio import json import os from datetime import datetime, timedelta from pathlib import Path import httpx from icarus.core.config.staging import DATA_DIR FINNHUB_BASE = "https://finnhub.io/api/v1" # Load from .env if available try: from dotenv import load_dotenv load_dotenv('/home/hoffmann_admin/.openclaw/.env') except ImportError: pass FINNHUB_API_KEY = os.environ.get("FINNHUB_API_KEY") # Cache directory for freshness tracking CACHE_DIR = DATA_DIR / "market_cache" CACHE_DIR.mkdir(parents=True, exist_ok=True) # --------------------------------------------------------------------------- # Cache Management # --------------------------------------------------------------------------- def _cache_path(key: str) -> Path: """Get cache file path for a key.""" return CACHE_DIR / f"{key}.json" def _load_cache(key: str, max_age_minutes: int) -> dict | None: """Load cached data if fresh enough.""" cache_file = _cache_path(key) if not cache_file.exists(): return None try: with open(cache_file) as f: cached = json.load(f) cached_at = datetime.fromisoformat(cached.get("_cached_at", "2000-01-01")) age = datetime.now() - cached_at if age > timedelta(minutes=max_age_minutes): return None return cached.get("data") except (json.JSONDecodeError, KeyError, ValueError): return None def _save_cache(key: str, data: dict) -> None: """Save data to cache with timestamp.""" cache_file = _cache_path(key) with open(cache_file, "w") as f: json.dump({"_cached_at": datetime.now().isoformat(), "data": data}, f, indent=2) # --------------------------------------------------------------------------- # API calls (free tier only) # --------------------------------------------------------------------------- async def _finnhub_get(endpoint: str, params: dict | None = None) -> dict: """Make authenticated Finnhub API request.""" if not FINNHUB_API_KEY: raise RuntimeError("FINNHUB_API_KEY not set") url = f"{FINNHUB_BASE}{endpoint}" query = params or {} query["token"] = FINNHUB_API_KEY async with httpx.AsyncClient() as client: resp = await client.get(url, params=query, timeout=30.0) resp.raise_for_status() return resp.json() async def get_quote(ticker: str) -> dict: """Get real-time quote for ticker.""" # Check cache first (15 minutes for quotes) cached = _load_cache(f"quote_{ticker}", max_age_minutes=15) if cached: return cached data = await _finnhub_get("/quote", {"symbol": ticker}) _save_cache(f"quote_{ticker}", data) return data async def get_sentiment(ticker: str) -> dict: """Get insider sentiment (monthly aggregation).""" # Check cache first (24 hours for sentiment) cached = _load_cache(f"sentiment_{ticker}", max_age_minutes=1440) if cached: return cached today = datetime.now() from_date = (today - timedelta(days=90)).strftime("%Y-%m-%d") to_date = today.strftime("%Y-%m-%d") data = await _finnhub_get( "/stock/insider-sentiment", {"symbol": ticker, "from": from_date, "to": to_date} ) _save_cache(f"sentiment_{ticker}", data) return data async def get_recommendation(ticker: str) -> list[dict]: """Get analyst recommendation trends.""" # Check cache first (24 hours for recommendations) cached = _load_cache(f"rec_{ticker}", max_age_minutes=1440) if cached: return cached data = await _finnhub_get("/stock/recommendation", {"symbol": ticker}) _save_cache(f"rec_{ticker}", data) return data async def get_earnings_surprises(ticker: str) -> list[dict]: """Get earnings surprises (last 4 quarters).""" # Check cache first (6 hours for earnings) cached = _load_cache(f"earnings_{ticker}", max_age_minutes=360) if cached: return cached data = await _finnhub_get("/stock/earnings", {"symbol": ticker}) _save_cache(f"earnings_{ticker}", data) return data async def get_company_news(ticker: str, limit: int = 5) -> list[dict]: """Get company news.""" # Check cache first (2 hours for news) cached = _load_cache(f"news_{ticker}", max_age_minutes=120) if cached: return cached[:limit] today = datetime.now() from_date = (today - timedelta(days=7)).strftime("%Y-%m-%d") to_date = today.strftime("%Y-%m-%d") data = await _finnhub_get( "/company-news", {"symbol": ticker, "from": from_date, "to": to_date, "limit": limit} ) _save_cache(f"news_{ticker}", data) return data[:limit] async def get_basic_financials(ticker: str) -> dict: """Get basic financials (market cap, etc.).""" cached = _load_cache(f"fin_{ticker}", max_age_minutes=1440) if cached: return cached data = await _finnhub_get("/stock/metric", {"symbol": ticker, "metric": "all"}) _save_cache(f"fin_{ticker}", data) return data # --------------------------------------------------------------------------- # Briefing generation # --------------------------------------------------------------------------- async def generate_briefing(tickers: list[str] | None = None, premarket: bool = False) -> dict: """Generate daily market sentiment briefing. Uses free-tier endpoints: - Quote (price, change) - cached 15 min - Insider sentiment (mspr) - cached 24 hours - Analyst recommendations (buy/sell/hold) - cached 24 hours - Earnings surprises - cached 6 hours - Company news - cached 2 hours Args: tickers: List of ticker symbols. Uses saved watchlist if None. premarket: If True, uses previous close data for early morning briefings. """ # Import watchlist module here to avoid circular imports from icarus.core.market.watchlist import get_combined_watchlist from icarus.core.market.news import ( get_company_news, format_ticker_news, select_top_stories, get_market_news ) # Get combined watchlist (user + dynamic) watchlist_data = await get_combined_watchlist() if tickers is None: tickers = watchlist_data["all_tickers"] user_tickers = watchlist_data["user_tickers"] dynamic_tickers = watchlist_data["dynamic_tickers"] dynamic_reasons = watchlist_data["dynamic_reasons"] briefing = { "generated_at": datetime.now().isoformat(), "premarket": premarket, "user_tickers": user_tickers, "dynamic_tickers": dynamic_tickers, "dynamic_reasons": dynamic_reasons, "watchlist": tickers, "summary": { "bullish": 0, "bearish": 0, "neutral": 0, "total": len(tickers), }, "tickers": [], "alerts": [], "top_news": [], "market_news": [], "earnings_this_week": [], } # Fetch all ticker data all_news = {} for ticker in tickers: try: data = await _fetch_ticker_data(ticker, premarket=premarket) # Add news for this ticker try: news = await get_company_news(ticker, limit=3) data["news"] = news[:2] # Top 2 news items all_news[ticker] = news except Exception: data["news"] = [] # Mark if this is a dynamic ticker if ticker in dynamic_tickers: data["is_dynamic"] = True data["dynamic_reason"] = dynamic_reasons.get(ticker, "") else: data["is_dynamic"] = False briefing["tickers"].append(data) # Count signals if data.get("signal") == "bullish": briefing["summary"]["bullish"] += 1 elif data.get("signal") == "bearish": briefing["summary"]["bearish"] += 1 else: briefing["summary"]["neutral"] += 1 # Check for alerts if data.get("earnings_surprise"): briefing["alerts"].append({ "type": "earnings", "ticker": ticker, "message": data["earnings_surprise"], }) if data.get("analyst_change"): briefing["alerts"].append({ "type": "analyst", "ticker": ticker, "message": data["analyst_change"], }) # Check for insider sentiment alerts if data.get("insider_sentiment"): mspr = data["insider_sentiment"].get("mspr", 0) if abs(mspr) > 0.3: direction = "buying" if mspr > 0 else "selling" briefing["alerts"].append({ "type": "insider", "ticker": ticker, "message": f"Insider {direction} (MSPR: {mspr:+.2f})", }) except Exception as e: briefing["tickers"].append({ "ticker": ticker, "error": str(e), }) # Get top stories across all tickers try: top_stories = select_top_stories(all_news, max_stories=3) briefing["top_news"] = top_stories except Exception: pass # Get general market news try: briefing["market_news"] = await get_market_news(category="general", limit=2) except Exception: pass # Get earnings calendar try: from icarus.core.market.watchlist import get_earnings_calendar earnings = await get_earnings_calendar(days_ahead=7) # Filter to just tickers in our watchlist watchlist_set = set(t.upper() for t in tickers) briefing["earnings_this_week"] = [ e for e in earnings if e["ticker"] in watchlist_set ] except Exception: pass return briefing async def _fetch_ticker_data(ticker: str, premarket: bool = False) -> dict: """Fetch all data for a single ticker.""" quote = await get_quote(ticker) # Handle pre-market (no current price yet) if premarket and quote.get("dp") is None: quote["c"] = quote.get("pc", quote.get("c")) quote["dp"] = 0.0 sentiment, recs, earnings = await asyncio.gather( get_sentiment(ticker), get_recommendation(ticker), get_earnings_surprises(ticker), ) result = { "ticker": ticker, "price": quote.get("c"), "change": quote.get("d"), "change_percent": quote.get("dp"), } # Determine signal signals = [] # Analyst recommendations if recs and len(recs) > 0: latest = recs[0] buy = latest.get("buy", 0) sell = latest.get("sell", 0) hold = latest.get("hold", 0) total = buy + sell + hold if total > 0: buy_pct = buy / total sell_pct = sell / total if buy_pct > 0.6: signals.append("bullish") elif sell_pct > 0.4: signals.append("bearish") # Insider sentiment if sentiment.get("data"): recent = sentiment["data"][-1] if sentiment["data"] else None if recent: mspr = recent.get("mspr", 0) result["insider_sentiment"] = recent if mspr > 0.2: signals.append("bullish") elif mspr < -0.2: signals.append("bearish") # Determine overall signal if signals.count("bullish") > signals.count("bearish"): result["signal"] = "bullish" elif signals.count("bearish") > signals.count("bullish"): result["signal"] = "bearish" else: result["signal"] = "neutral" # Earnings surprise if earnings and len(earnings) > 0: latest = earnings[0] surprise = latest.get("surprisePercent") if surprise and abs(surprise) > 10: direction = "beat" if surprise > 0 else "miss" result["earnings_surprise"] = f"Q{latest.get('quarter')} {direction}: {surprise:+.1f}%" # Analyst change (compare current vs previous month) if recs and len(recs) > 1: current = recs[0] previous = recs[1] buy_change = current.get("buy", 0) - previous.get("buy", 0) if abs(buy_change) >= 2: direction = "upgraded" if buy_change > 0 else "downgraded" result["analyst_change"] = f"Analysts {direction} ({buy_change:+.0f} buys)" return result # --------------------------------------------------------------------------- # Formatting # --------------------------------------------------------------------------- def format_briefing(briefing: dict) -> str: """Format briefing as Telegram message.""" premarket = briefing.get("premarket", False) date_str = datetime.now().strftime("%Y-%m-%d") header = "🌅 Pre-Market Brief" if premarket else "📊 Market Close Brief" lines = [ f"{header} — {date_str}", "", ] # Summary summary = briefing["summary"] lines.append( f"📈 {summary['bullish']} bullish 📉 {summary['bearish']} bearish ➖ {summary['neutral']} neutral" ) lines.append("") # User watchlist lines.append("📊 *Your Watchlist:*") for t in briefing["tickers"]: if "error" in t: lines.append(f"❌ {t['ticker']}: {t['error']}") continue # Skip dynamic tickers in this section if t.get("is_dynamic"): continue emoji = {"bullish": "🟢", "bearish": "🔴", "neutral": "⚪"}.get(t.get("signal"), "⚪") change = t.get("change_percent", 0) or 0 change_emoji = "📈" if change > 0 else "📉" if change < 0 else "➖" price = t.get("price") price_str = f"${price:.2f}" if price is not None else "$N/A" line = f"• {emoji} *{t['ticker']}*: {price_str} {change_emoji} {change:+.2f}%" lines.append(line) # Add brief news if available if t.get("news"): for news_item in t["news"][:1]: # Just 1 news item per ticker if "headline" in news_item: sentiment_emoji = news_item.get("sentiment", {}).get("emoji", "") headline = news_item["headline"] if len(headline) > 60: headline = headline[:57] + "..." lines.append(f" {sentiment_emoji} _{headline}_") # Dynamic tickers section if briefing.get("dynamic_tickers"): lines.append("") lines.append("🔥 *Trending Today:*") for t in briefing["tickers"]: if not t.get("is_dynamic"): continue if "error" in t: continue emoji = {"bullish": "🟢", "bearish": "🔴", "neutral": "⚪"}.get(t.get("signal"), "⚪") change = t.get("change_percent", 0) or 0 change_emoji = "📈" if change > 0 else "📉" if change < 0 else "➖" price = t.get("price") price_str = f"${price:.2f}" if price is not None else "$N/A" reason = t.get("dynamic_reason", "") line = f"• {emoji} *{t['ticker']}*: {price_str} {change_emoji} {change:+.2f}%" if reason: line += f" — {reason}" lines.append(line) # Add brief news if t.get("news"): for news_item in t["news"][:1]: if "headline" in news_item: sentiment_emoji = news_item.get("sentiment", {}).get("emoji", "") headline = news_item["headline"] if len(headline) > 50: headline = headline[:47] + "..." lines.append(f" {sentiment_emoji} _{headline}_") # Top News section if briefing.get("top_news"): lines.append("") lines.append("📰 *Top Stories:*") for item in briefing["top_news"][:3]: sentiment_emoji = item.get("sentiment", {}).get("emoji", "📰") ticker = item.get("ticker", "Market") headline = item.get("headline", "") source = item.get("source", "") time_ago = item.get("time_ago", "") # Truncate headline if len(headline) > 70: headline = headline[:67] + "..." lines.append(f"{sentiment_emoji} *{ticker}*: {headline}") lines.append(f" _({source}, {time_ago})_") # Market news if briefing.get("market_news"): for item in briefing["market_news"][:2]: sentiment_emoji = item.get("sentiment", {}).get("emoji", "🌍") headline = item.get("headline", "") source = item.get("source", "") if len(headline) > 70: headline = headline[:67] + "..." lines.append(f"{sentiment_emoji} {headline} _({source})_") # Insider Sentiment highlights insider_alerts = [a for a in briefing.get("alerts", []) if a["type"] == "insider"] if insider_alerts: lines.append("") lines.append("🎯 *Insider Activity:*") for alert in insider_alerts[:3]: lines.append(f"• {alert['ticker']}: {alert['message']}") # Earnings This Week if briefing.get("earnings_this_week"): lines.append("") lines.append("📅 *Earnings This Week:*") for earning in briefing["earnings_this_week"][:4]: ticker = earning["ticker"] date = earning["date"] days = earning["days_until"] q = earning.get("quarter", "Q?") when = "today" if days == 0 else f"in {days} days" if days > 0 else "recently" lines.append(f"• {ticker}: {q} {when} ({date})") # Other alerts non_insider_alerts = [a for a in briefing.get("alerts", []) if a["type"] != "insider"] if non_insider_alerts: lines.append("") lines.append("🚨 *Alerts:*") for alert in non_insider_alerts[:5]: emoji = {"earnings": "💰", "analyst": "📊"}.get(alert["type"], "🔔") lines.append(f"{emoji} {alert['ticker']}: {alert['message']}") lines.append("") lines.append("Reply `watchlist` to edit tickers.") return "\n".join(lines) def extract_sentiment_from_headline(headline: str, summary: str = "") -> dict: """Extract sentiment from headline text.""" from icarus.core.market.news import _extract_sentiment return _extract_sentiment(headline, summary)