"""News fetching and sentiment extraction for market briefing. Fetches company news from Finnhub, extracts sentiment from headlines, and formats news items for the briefing. """ import json import os import re from datetime import datetime, timedelta from pathlib import Path import httpx from icarus.core.config.staging import DATA_DIR FINNHUB_BASE = "https://finnhub.io/api/v1" FINNHUB_API_KEY = os.environ.get("FINNHUB_API_KEY") CACHE_DIR = DATA_DIR / "market_cache" CACHE_DIR.mkdir(parents=True, exist_ok=True) # --------------------------------------------------------------------------- # Cache Management # --------------------------------------------------------------------------- def _cache_path(key: str) -> Path: """Get cache file path for a key.""" return CACHE_DIR / f"{key}.json" def _load_cache(key: str, max_age_minutes: int) -> dict | None: """Load cached data if fresh enough.""" cache_file = _cache_path(key) if not cache_file.exists(): return None try: with open(cache_file) as f: cached = json.load(f) cached_at = datetime.fromisoformat(cached.get("_cached_at", "2000-01-01")) age = datetime.now() - cached_at if age > timedelta(minutes=max_age_minutes): return None return cached.get("data") except (json.JSONDecodeError, KeyError, ValueError): return None def _save_cache(key: str, data: dict) -> None: """Save data to cache with timestamp.""" cache_file = _cache_path(key) with open(cache_file, "w") as f: json.dump({"_cached_at": datetime.now().isoformat(), "data": data}, f, indent=2) # --------------------------------------------------------------------------- # API Helpers # --------------------------------------------------------------------------- async def _finnhub_get(endpoint: str, params: dict | None = None) -> dict: """Make authenticated Finnhub API request.""" if not FINNHUB_API_KEY: raise RuntimeError("FINNHUB_API_KEY not set") url = f"{FINNHUB_BASE}{endpoint}" query = params or {} query["token"] = FINNHUB_API_KEY async with httpx.AsyncClient() as client: resp = await client.get(url, params=query, timeout=30.0) resp.raise_for_status() return resp.json() # --------------------------------------------------------------------------- # News Fetching # --------------------------------------------------------------------------- async def get_company_news(ticker: str, limit: int = 5, days_back: int = 3) -> list[dict]: """Get company news for a ticker. Args: ticker: Stock symbol limit: Maximum news items to return days_back: How many days back to fetch Returns: List of news items with headline, source, date, summary """ cache_key = f"news_{ticker}_{days_back}" cached = _load_cache(cache_key, max_age_minutes=120) if cached: return cached[:limit] today = datetime.now() from_date = (today - timedelta(days=days_back)).strftime("%Y-%m-%d") to_date = today.strftime("%Y-%m-%d") try: news = await _finnhub_get( "/company-news", {"symbol": ticker, "from": from_date, "to": to_date, "limit": limit * 2} ) # Normalize and filter results = [] for item in news: if not item.get("headline"): continue # Parse datetime dt_str = item.get("datetime", 0) if dt_str: dt = datetime.fromtimestamp(dt_str) time_ago = _format_time_ago(dt) else: time_ago = "recent" results.append({ "headline": item["headline"], "source": item.get("source", "Unknown"), "summary": item.get("summary", "")[:200] + "..." if len(item.get("summary", "")) > 200 else item.get("summary", ""), "url": item.get("url", ""), "datetime": dt_str, "time_ago": time_ago, "sentiment": _extract_sentiment(item["headline"], item.get("summary", "")) }) _save_cache(cache_key, results) return results[:limit] except Exception as e: return [{"error": str(e), "ticker": ticker}] def _format_time_ago(dt: datetime) -> str: """Format datetime as '2h ago', '1d ago', etc.""" delta = datetime.now() - dt if delta.days > 0: return f"{delta.days}d ago" hours = delta.seconds // 3600 if hours > 0: return f"{hours}h ago" minutes = delta.seconds // 60 return f"{minutes}m ago" # --------------------------------------------------------------------------- # Sentiment Extraction # --------------------------------------------------------------------------- SENTIMENT_POSITIVE = [ "surge", "surges", "jump", "jumps", "soar", "soars", "rally", "rallies", "gain", "gains", "rise", "rises", "up", "boost", "boosts", "beat", "beats", "outperform", "outperforms", "bullish", "buy", "upgrade", "upgrades", "raised", "strong", "positive", "growth", "profit", "profits", "revenue up", "earnings beat", "exceeds", "record high", " ATH", " all-time high", " moon", "rocket", "rip", "pump" ] SENTIMENT_NEGATIVE = [ "drop", "drops", "fall", "falls", "plunge", "plunges", "crash", "crashes", "decline", "declines", "tumble", "tumbles", "sink", "sinks", "down", "slide", "slides", "plummet", "plummets", "miss", "misses", "underperform", "underperforms", "bearish", "sell", "downgrade", "downgrades", "cut", "weak", "negative", "loss", "losses", "revenue down", "earnings miss", "layoff", "layoffs", " investigation", " probe", "lawsuit", "low", "ATW", "dump", "tank" ] SENTIMENT_NEUTRAL = [ "report", "reports", "announces", "update", "update", "review", "maintain", "maintains", "hold", "holds", "neutral", "flat" ] def _extract_sentiment(headline: str, summary: str = "") -> dict: """Extract sentiment from headline and summary.""" text = (headline + " " + summary).lower() positive_score = sum(1 for word in SENTIMENT_POSITIVE if word.lower() in text) negative_score = sum(1 for word in SENTIMENT_NEGATIVE if word.lower() in text) neutral_score = sum(1 for word in SENTIMENT_NEUTRAL if word.lower() in text) # Determine overall sentiment if positive_score > negative_score: label = "positive" emoji = "📈" elif negative_score > positive_score: label = "negative" emoji = "📉" else: label = "neutral" emoji = "➖" # Calculate confidence (0-1) total_signals = positive_score + negative_score if total_signals > 0: confidence = max(positive_score, negative_score) / (total_signals + neutral_score * 0.5) confidence = min(confidence, 1.0) else: confidence = 0.3 # Low confidence if no clear signals return { "label": label, "emoji": emoji, "confidence": round(confidence, 2), "positive_signals": positive_score, "negative_signals": negative_score } def summarize_sentiment(news_items: list[dict]) -> dict: """Summarize sentiment across multiple news items. Returns dict with overall sentiment and key highlights. """ if not news_items: return {"label": "neutral", "emoji": "➖", "confidence": 0} labels = [n["sentiment"]["label"] for n in news_items if "sentiment" in n] positive_count = labels.count("positive") negative_count = labels.count("negative") neutral_count = labels.count("neutral") total = len(labels) if total == 0: return {"label": "neutral", "emoji": "➖", "confidence": 0} # Determine overall if positive_count > negative_count and positive_count >= total * 0.4: label = "positive" emoji = "📈" elif negative_count > positive_count and negative_count >= total * 0.4: label = "negative" emoji = "📉" else: label = "neutral" emoji = "➖" confidence = max(positive_count, negative_count, neutral_count) / total return { "label": label, "emoji": emoji, "confidence": round(confidence, 2), "positive_ratio": positive_count / total, "negative_ratio": negative_count / total, "total_items": total } # --------------------------------------------------------------------------- # News Formatting # --------------------------------------------------------------------------- def format_news_item(item: dict, include_ticker: bool = True) -> str: """Format a single news item for Telegram. Format: 📰 [Ticker]: Headline (Source) [time] """ ticker_str = f"*{item.get('ticker', 'Market')}*: " if include_ticker else "" sentiment_emoji = item.get("sentiment", {}).get("emoji", "") headline = item.get("headline", "No headline") source = item.get("source", "Unknown") time_ago = item.get("time_ago", "") return f"{sentiment_emoji} {ticker_str}{headline} _({source}, {time_ago})_" def format_ticker_news(ticker: str, news_items: list[dict], max_items: int = 2) -> str: """Format news for a specific ticker. Returns formatted string or empty string if no news. """ if not news_items or len(news_items) == 0: return "" lines = [] for item in news_items[:max_items]: if "error" in item: continue sentiment_emoji = item.get("sentiment", {}).get("emoji", "") headline = item.get("headline", "") # Truncate long headlines if len(headline) > 80: headline = headline[:77] + "..." lines.append(f" {sentiment_emoji} {headline}") return "\n".join(lines) if lines else "" def select_top_stories(all_news: dict[str, list[dict]], max_stories: int = 3) -> list[dict]: """Select top stories across all tickers. Prioritizes: 1. High sentiment confidence 2. Recent news 3. Major tickers """ scored_news = [] for ticker, items in all_news.items(): for item in items: if "error" in item: continue # Calculate score sentiment = item.get("sentiment", {}) confidence = sentiment.get("confidence", 0) label = sentiment.get("label", "neutral") # Boost for strong sentiment sentiment_boost = 2.0 if label in ["positive", "negative"] else 1.0 # Recency boost (assume higher in list = more recent from Finnhub) # Major ticker boost major_tickers = ["AAPL", "MSFT", "GOOGL", "AMZN", "TSLA", "NVDA", "META"] ticker_boost = 1.5 if ticker in major_tickers else 1.0 score = confidence * sentiment_boost * ticker_boost scored_news.append({ **item, "ticker": ticker, "score": score }) # Sort by score descending scored_news.sort(key=lambda x: x["score"], reverse=True) return scored_news[:max_stories] # --------------------------------------------------------------------------- # Market News (General) # --------------------------------------------------------------------------- async def get_market_news(category: str = "general", limit: int = 5) -> list[dict]: """Get general market news. Categories: general, forex, crypto, merger """ cache_key = f"market_news_{category}" cached = _load_cache(cache_key, max_age_minutes=60) if cached: return cached[:limit] try: news = await _finnhub_get("/news", {"category": category}) results = [] for item in news[:limit * 2]: if not item.get("headline"): continue dt_str = item.get("datetime", 0) if dt_str: dt = datetime.fromtimestamp(dt_str) time_ago = _format_time_ago(dt) else: time_ago = "recent" results.append({ "headline": item["headline"], "source": item.get("source", "Unknown"), "category": item.get("category", "general"), "time_ago": time_ago, "sentiment": _extract_sentiment(item["headline"]) }) _save_cache(cache_key, results) return results[:limit] except Exception: return [] # --------------------------------------------------------------------------- # Data Freshness Check # --------------------------------------------------------------------------- def check_data_freshness(tickers: list[str]) -> dict: """Check if cached data is stale. Returns dict with: - is_fresh: bool - stale_tickers: list of tickers with stale data - oldest_cache: timestamp of oldest cached data """ stale_tickers = [] oldest_cache = datetime.now() for ticker in tickers: cache_file = _cache_path(f"quote_{ticker}") if not cache_file.exists(): stale_tickers.append(ticker) continue try: with open(cache_file) as f: cached = json.load(f) cached_at = datetime.fromisoformat(cached.get("_cached_at", "2000-01-01")) age = datetime.now() - cached_at if age > timedelta(hours=2): stale_tickers.append(ticker) if cached_at < oldest_cache: oldest_cache = cached_at except (json.JSONDecodeError, KeyError): stale_tickers.append(ticker) return { "is_fresh": len(stale_tickers) == 0, "stale_tickers": stale_tickers, "stale_count": len(stale_tickers), "oldest_cache": oldest_cache.isoformat() } def get_cache_status() -> dict: """Get overall cache status.""" status = { "quotes": {}, "news": {}, "sentiment": {} } now = datetime.now() for cache_file in CACHE_DIR.glob("*.json"): try: with open(cache_file) as f: cached = json.load(f) cached_at = datetime.fromisoformat(cached.get("_cached_at", now.isoformat())) age_minutes = (now - cached_at).total_seconds() / 60 key = cache_file.stem if key.startswith("quote_"): status["quotes"][key] = {"age_minutes": int(age_minutes)} elif key.startswith("news_"): status["news"][key] = {"age_minutes": int(age_minutes)} elif key.startswith("sentiment_"): status["sentiment"][key] = {"age_minutes": int(age_minutes)} except Exception: continue return status