# news.py

"""News fetching and sentiment extraction for market briefing.

Fetches company news from Finnhub, extracts sentiment from headlines,
and formats news items for the briefing.
"""

import json
import os
import re
from datetime import datetime, timedelta
from pathlib import Path

import httpx

from icarus.core.config.staging import DATA_DIR

FINNHUB_BASE = "https://finnhub.io/api/v1"
FINNHUB_API_KEY = os.environ.get("FINNHUB_API_KEY")

CACHE_DIR = DATA_DIR / "market_cache"
CACHE_DIR.mkdir(parents=True, exist_ok=True)

# ---------------------------------------------------------------------------
# Cache Management
# ---------------------------------------------------------------------------

def _cache_path(key: str) -> Path:
    """Return the on-disk JSON cache file for *key*."""
    return CACHE_DIR / (key + ".json")

def _load_cache(key: str, max_age_minutes: int) -> dict | None:
    """Return the cached payload for *key*, or None if missing, stale, or corrupt."""
    path = _cache_path(key)
    if not path.exists():
        return None

    try:
        envelope = json.loads(path.read_text())
        stamped = datetime.fromisoformat(envelope.get("_cached_at", "2000-01-01"))
        # Entries older than the caller's TTL are treated as misses.
        if datetime.now() - stamped > timedelta(minutes=max_age_minutes):
            return None
        return envelope.get("data")
    except (json.JSONDecodeError, KeyError, ValueError):
        # A corrupt cache file is equivalent to a cache miss.
        return None

def _save_cache(key: str, data: dict) -> None:
    """Write *data* to the cache under *key*, stamped with the current time."""
    envelope = {"_cached_at": datetime.now().isoformat(), "data": data}
    with open(_cache_path(key), "w") as fh:
        json.dump(envelope, fh, indent=2)

# ---------------------------------------------------------------------------
# API Helpers
# ---------------------------------------------------------------------------

async def _finnhub_get(endpoint: str, params: dict | None = None) -> dict:
    """Make an authenticated GET request to the Finnhub API.

    Args:
        endpoint: API path beginning with "/" (e.g. "/company-news").
        params: Optional query parameters; never mutated.

    Returns:
        Parsed JSON response body.

    Raises:
        RuntimeError: If FINNHUB_API_KEY is not configured.
        httpx.HTTPStatusError: On non-2xx responses (via raise_for_status).
    """
    if not FINNHUB_API_KEY:
        raise RuntimeError("FINNHUB_API_KEY not set")

    url = f"{FINNHUB_BASE}{endpoint}"
    # Copy so the caller's dict is not polluted with the API token
    # (the original mutated `params` in place when one was passed).
    query = dict(params or {})
    query["token"] = FINNHUB_API_KEY

    async with httpx.AsyncClient() as client:
        resp = await client.get(url, params=query, timeout=30.0)
        resp.raise_for_status()
        return resp.json()

# ---------------------------------------------------------------------------
# News Fetching
# ---------------------------------------------------------------------------

async def get_company_news(ticker: str, limit: int = 5, days_back: int = 3) -> list[dict]:
    """Get company news for a ticker.

    Args:
        ticker: Stock symbol
        limit: Maximum news items to return
        days_back: How many days back to fetch

    Returns:
        List of news items with headline, source, date, summary.
        On failure returns a single-element list [{"error": ..., "ticker": ...}].
    """
    cache_key = f"news_{ticker}_{days_back}"
    cached = _load_cache(cache_key, max_age_minutes=120)
    # "is not None": a cached empty result (ticker with no news) is a valid
    # hit and should not trigger a refetch until the TTL expires.
    if cached is not None:
        return cached[:limit]

    today = datetime.now()
    from_date = (today - timedelta(days=days_back)).strftime("%Y-%m-%d")
    to_date = today.strftime("%Y-%m-%d")

    try:
        news = await _finnhub_get(
            "/company-news",
            {"symbol": ticker, "from": from_date, "to": to_date, "limit": limit * 2}
        )

        # Normalize and filter items that lack a headline.
        results = []
        for item in news:
            if not item.get("headline"):
                continue

            # Finnhub reports publication time as a unix timestamp (seconds).
            ts = item.get("datetime", 0)
            time_ago = _format_time_ago(datetime.fromtimestamp(ts)) if ts else "recent"

            summary = item.get("summary", "")
            if len(summary) > 200:
                summary = summary[:200] + "..."

            results.append({
                "headline": item["headline"],
                "source": item.get("source", "Unknown"),
                "summary": summary,
                "url": item.get("url", ""),
                "datetime": ts,
                "time_ago": time_ago,
                "sentiment": _extract_sentiment(item["headline"], item.get("summary", ""))
            })

        _save_cache(cache_key, results)
        return results[:limit]
    except Exception as e:
        # Best-effort: surface the failure as data rather than raising,
        # so one bad ticker doesn't break the whole briefing.
        return [{"error": str(e), "ticker": ticker}]

def _format_time_ago(dt: datetime) -> str:
"""Format datetime as '2h ago', '1d ago', etc."""
delta = datetime.now() - dt
if delta.days > 0:
return f"{delta.days}d ago"
hours = delta.seconds // 3600
if hours > 0:
return f"{hours}h ago"
minutes = delta.seconds // 60
return f"{minutes}m ago"

# ---------------------------------------------------------------------------
# Sentiment Extraction
# ---------------------------------------------------------------------------

# Keyword lists for naive substring sentiment scoring (used by
# _extract_sentiment). Entries with a leading space (" ATH", " probe", ...)
# rely on that space as a crude word boundary so short tokens don't match
# inside unrelated words. Fix: "update" was listed twice in the neutral
# list, double-counting that signal in the confidence denominator.
SENTIMENT_POSITIVE = [
    "surge", "surges", "jump", "jumps", "soar", "soars", "rally", "rallies",
    "gain", "gains", "rise", "rises", "up", "boost", "boosts", "beat",
    "beats", "outperform", "outperforms", "bullish", "buy", "upgrade",
    "upgrades", "raised", "strong", "positive", "growth", "profit",
    "profits", "revenue up", "earnings beat", "exceeds", "record high",
    " ATH", " all-time high", " moon", "rocket", "rip", "pump"
]

SENTIMENT_NEGATIVE = [
    "drop", "drops", "fall", "falls", "plunge", "plunges", "crash",
    "crashes", "decline", "declines", "tumble", "tumbles", "sink",
    "sinks", "down", "slide", "slides", "plummet", "plummets",
    "miss", "misses", "underperform", "underperforms", "bearish",
    "sell", "downgrade", "downgrades", "cut", "weak", "negative",
    "loss", "losses", "revenue down", "earnings miss", "layoff",
    "layoffs", " investigation", " probe", "lawsuit", "low", "ATW",
    "dump", "tank"
]

SENTIMENT_NEUTRAL = [
    "report", "reports", "announces", "update", "review",
    "maintain", "maintains", "hold", "holds", "neutral", "flat"
]

def _extract_sentiment(headline: str, summary: str = "") -> dict:
    """Score the sentiment of a headline (plus optional summary) by keyword hits.

    Returns a dict with label, emoji, confidence (0-1), and raw signal counts.
    """
    text = f"{headline} {summary}".lower()

    def hits(words: list[str]) -> int:
        # Naive substring match against the lowercased text.
        return sum(word.lower() in text for word in words)

    pos = hits(SENTIMENT_POSITIVE)
    neg = hits(SENTIMENT_NEGATIVE)
    neu = hits(SENTIMENT_NEUTRAL)

    if pos > neg:
        label, emoji = "positive", "📈"
    elif neg > pos:
        label, emoji = "negative", "📉"
    else:
        label, emoji = "neutral", "➖"

    directional = pos + neg
    if directional > 0:
        # Neutral hits dilute confidence at half weight.
        confidence = min(max(pos, neg) / (directional + neu * 0.5), 1.0)
    else:
        confidence = 0.3  # Low confidence if no clear signals

    return {
        "label": label,
        "emoji": emoji,
        "confidence": round(confidence, 2),
        "positive_signals": pos,
        "negative_signals": neg
    }

def summarize_sentiment(news_items: list[dict]) -> dict:
    """Aggregate per-item sentiment labels into one overall reading.

    Returns a dict with overall label/emoji/confidence plus positive and
    negative ratios and the number of items considered.
    """
    if not news_items:
        return {"label": "neutral", "emoji": "➖", "confidence": 0}

    labels = [entry["sentiment"]["label"] for entry in news_items if "sentiment" in entry]
    total = len(labels)
    if total == 0:
        return {"label": "neutral", "emoji": "➖", "confidence": 0}

    pos = labels.count("positive")
    neg = labels.count("negative")
    neu = labels.count("neutral")

    # A direction wins when it beats the other AND covers >= 40% of items.
    threshold = total * 0.4
    if pos > neg and pos >= threshold:
        label, emoji = "positive", "📈"
    elif neg > pos and neg >= threshold:
        label, emoji = "negative", "📉"
    else:
        label, emoji = "neutral", "➖"

    return {
        "label": label,
        "emoji": emoji,
        "confidence": round(max(pos, neg, neu) / total, 2),
        "positive_ratio": pos / total,
        "negative_ratio": neg / total,
        "total_items": total
    }

# ---------------------------------------------------------------------------
# News Formatting
# ---------------------------------------------------------------------------

def format_news_item(item: dict, include_ticker: bool = True) -> str:
    """Render one news item as a Telegram Markdown line.

    Format: <sentiment emoji> *Ticker*: Headline _(Source, time)_
    """
    prefix = f"*{item.get('ticker', 'Market')}*: " if include_ticker else ""
    emoji = item.get("sentiment", {}).get("emoji", "")
    return (
        f"{emoji} {prefix}{item.get('headline', 'No headline')}"
        f" _({item.get('source', 'Unknown')}, {item.get('time_ago', '')})_"
    )

def format_ticker_news(ticker: str, news_items: list[dict], max_items: int = 2) -> str:
    """Format up to *max_items* headlines for one ticker, one per line.

    Returns "" when there is nothing usable to show. (*ticker* is accepted
    for interface symmetry but is not rendered.)
    """
    if not news_items:
        return ""

    rendered = []
    for entry in news_items[:max_items]:
        if "error" in entry:
            continue
        emoji = entry.get("sentiment", {}).get("emoji", "")
        title = entry.get("headline", "")
        if len(title) > 80:
            title = title[:77] + "..."  # keep lines short for chat display
        rendered.append(f"  {emoji} {title}")

    return "\n".join(rendered)

# Tickers whose stories get a relevance boost when ranking top stories.
_MAJOR_TICKERS = frozenset({"AAPL", "MSFT", "GOOGL", "AMZN", "TSLA", "NVDA", "META"})


def select_top_stories(all_news: dict[str, list[dict]], max_stories: int = 3) -> list[dict]:
    """Select top stories across all tickers.

    Score = sentiment confidence x directional boost x major-ticker boost:
    1. High sentiment confidence
    2. Strong (non-neutral) sentiment
    3. Major tickers

    Args:
        all_news: Mapping of ticker -> list of news items (error items skipped).
        max_stories: Maximum number of stories to return.

    Returns:
        Up to *max_stories* items, each annotated with "ticker" and "score",
        sorted by score descending.
    """
    scored_news = []

    for ticker, items in all_news.items():
        # Hoisted out of the inner loop: the boost depends only on the ticker
        # (the original rebuilt the major-ticker list on every item).
        ticker_boost = 1.5 if ticker in _MAJOR_TICKERS else 1.0

        for item in items:
            if "error" in item:
                continue

            sentiment = item.get("sentiment", {})
            confidence = sentiment.get("confidence", 0)
            label = sentiment.get("label", "neutral")

            # Directional (non-neutral) sentiment is twice as interesting.
            sentiment_boost = 2.0 if label in ("positive", "negative") else 1.0

            scored_news.append({
                **item,
                "ticker": ticker,
                "score": confidence * sentiment_boost * ticker_boost
            })

    # Sort by score descending (sort is stable, so ties keep input order).
    scored_news.sort(key=lambda entry: entry["score"], reverse=True)

    return scored_news[:max_stories]

# ---------------------------------------------------------------------------
# Market News (General)
# ---------------------------------------------------------------------------

async def get_market_news(category: str = "general", limit: int = 5) -> list[dict]:
    """Get general market news.

    Categories: general, forex, crypto, merger

    Returns up to *limit* normalized items; [] on any fetch/parse failure.
    """
    cache_key = f"market_news_{category}"
    cached = _load_cache(cache_key, max_age_minutes=60)
    if cached:
        return cached[:limit]

    try:
        raw = await _finnhub_get("/news", {"category": category})

        items = []
        for entry in raw[:limit * 2]:
            headline = entry.get("headline")
            if not headline:
                continue

            ts = entry.get("datetime", 0)
            time_ago = _format_time_ago(datetime.fromtimestamp(ts)) if ts else "recent"

            items.append({
                "headline": headline,
                "source": entry.get("source", "Unknown"),
                "category": entry.get("category", "general"),
                "time_ago": time_ago,
                "sentiment": _extract_sentiment(headline)
            })

        _save_cache(cache_key, items)
        return items[:limit]
    except Exception:
        # Best-effort: general news is optional garnish for the briefing.
        return []

# ---------------------------------------------------------------------------
# Data Freshness Check
# ---------------------------------------------------------------------------

def check_data_freshness(tickers: list[str]) -> dict:
    """Check whether cached quote data for *tickers* is stale.

    A ticker counts as stale when its quote cache is missing, unreadable,
    or more than 2 hours old.

    Returns dict with:
    - is_fresh: bool — True when no ticker is stale
    - stale_tickers: list of tickers with stale data
    - stale_count: number of stale tickers
    - oldest_cache: ISO timestamp of the oldest cache seen (now if none seen)
    """
    stale_tickers = []
    oldest_cache = datetime.now()

    for ticker in tickers:
        cache_file = _cache_path(f"quote_{ticker}")
        if not cache_file.exists():
            stale_tickers.append(ticker)
            continue

        try:
            with open(cache_file) as f:
                cached = json.load(f)
            cached_at = datetime.fromisoformat(cached.get("_cached_at", "2000-01-01"))
            age = datetime.now() - cached_at

            if age > timedelta(hours=2):
                stale_tickers.append(ticker)

            if cached_at < oldest_cache:
                oldest_cache = cached_at
        # ValueError added: fromisoformat raises it on a corrupt "_cached_at"
        # string, which previously crashed this function (matches _load_cache).
        except (json.JSONDecodeError, KeyError, ValueError):
            stale_tickers.append(ticker)

    return {
        "is_fresh": len(stale_tickers) == 0,
        "stale_tickers": stale_tickers,
        "stale_count": len(stale_tickers),
        "oldest_cache": oldest_cache.isoformat()
    }

def get_cache_status() -> dict:
    """Report cache-entry ages in minutes, bucketed by entry type."""
    status: dict = {"quotes": {}, "news": {}, "sentiment": {}}
    # Filename-prefix -> bucket; order defines match priority.
    buckets = {"quote_": "quotes", "news_": "news", "sentiment_": "sentiment"}
    now = datetime.now()

    for cache_file in CACHE_DIR.glob("*.json"):
        try:
            cached = json.loads(cache_file.read_text())
            stamped = datetime.fromisoformat(cached.get("_cached_at", now.isoformat()))
            age = int((now - stamped).total_seconds() / 60)

            stem = cache_file.stem
            for prefix, bucket in buckets.items():
                if stem.startswith(prefix):
                    status[bucket][stem] = {"age_minutes": age}
                    break
        except Exception:
            # Skip unreadable entries; this is a diagnostic view only.
            continue

    return status