"""News fetching and sentiment extraction for market briefing.
Fetches company news from Finnhub, extracts sentiment from headlines,
and formats news items for the briefing.
"""
import json
import os
import re
from datetime import datetime, timedelta
from pathlib import Path
import httpx
from icarus.core.config.staging import DATA_DIR
# Finnhub REST API root; endpoint paths below are appended to this base.
FINNHUB_BASE = "https://finnhub.io/api/v1"
# Read once at import time; _finnhub_get raises RuntimeError when unset.
FINNHUB_API_KEY = os.environ.get("FINNHUB_API_KEY")
# On-disk JSON cache directory, created eagerly at import so later
# cache writes never race on a missing directory.
CACHE_DIR = DATA_DIR / "market_cache"
CACHE_DIR.mkdir(parents=True, exist_ok=True)
# ---------------------------------------------------------------------------
# Cache Management
# ---------------------------------------------------------------------------
def _cache_path(key: str) -> Path:
    """Return the on-disk JSON file backing cache entry *key*."""
    return CACHE_DIR / (key + ".json")
def _load_cache(key: str, max_age_minutes: int) -> dict | None:
    """Load the cached payload for *key* if it is fresh enough.

    Args:
        key: Cache key (file stem under CACHE_DIR).
        max_age_minutes: Maximum acceptable age of the entry.

    Returns:
        The stored ``data`` value, or ``None`` when the cache file is
        missing, unreadable, corrupt, or older than *max_age_minutes*.
    """
    cache_file = _cache_path(key)
    if not cache_file.exists():
        return None
    try:
        with open(cache_file) as f:
            cached = json.load(f)
        # A missing timestamp falls back to a date old enough to always expire.
        cached_at = datetime.fromisoformat(cached.get("_cached_at", "2000-01-01"))
        if datetime.now() - cached_at > timedelta(minutes=max_age_minutes):
            return None
        return cached.get("data")
    except (OSError, json.JSONDecodeError, KeyError, ValueError):
        # OSError covers the file vanishing between exists() and open();
        # ValueError covers a malformed _cached_at timestamp.
        return None
def _save_cache(key: str, data: dict) -> None:
    """Persist *data* under *key* together with a freshness timestamp."""
    payload = {"_cached_at": datetime.now().isoformat(), "data": data}
    with open(_cache_path(key), "w") as f:
        json.dump(payload, f, indent=2)
# ---------------------------------------------------------------------------
# API Helpers
# ---------------------------------------------------------------------------
async def _finnhub_get(endpoint: str, params: dict | None = None) -> dict:
    """Make an authenticated GET request to the Finnhub API.

    Args:
        endpoint: Path beginning with "/", appended to FINNHUB_BASE.
        params: Optional query parameters; not mutated.

    Returns:
        The decoded JSON response body.

    Raises:
        RuntimeError: If FINNHUB_API_KEY is not configured.
        httpx.HTTPStatusError: On non-2xx responses.
    """
    if not FINNHUB_API_KEY:
        raise RuntimeError("FINNHUB_API_KEY not set")
    url = f"{FINNHUB_BASE}{endpoint}"
    # Copy so the caller's dict is not mutated with the API token
    # (the original `params or {}` aliased and modified it).
    query = dict(params) if params else {}
    query["token"] = FINNHUB_API_KEY
    async with httpx.AsyncClient() as client:
        resp = await client.get(url, params=query, timeout=30.0)
        resp.raise_for_status()
        return resp.json()
# ---------------------------------------------------------------------------
# News Fetching
# ---------------------------------------------------------------------------
async def get_company_news(ticker: str, limit: int = 5, days_back: int = 3) -> list[dict]:
    """Get company news for a ticker.

    Args:
        ticker: Stock symbol
        limit: Maximum news items to return
        days_back: How many days back to fetch

    Returns:
        List of news items with headline, source, date, summary; on failure
        a single-element list containing an ``error`` dict.
    """
    cache_key = f"news_{ticker}_{days_back}"
    cached = _load_cache(cache_key, max_age_minutes=120)
    # `is not None`: a cached empty list is a valid hit — without this we
    # would re-hit the API on every call for tickers that have no news.
    if cached is not None:
        return cached[:limit]
    today = datetime.now()
    from_date = (today - timedelta(days=days_back)).strftime("%Y-%m-%d")
    to_date = today.strftime("%Y-%m-%d")
    try:
        news = await _finnhub_get(
            "/company-news",
            {"symbol": ticker, "from": from_date, "to": to_date, "limit": limit * 2}
        )
        # Normalize and filter items that lack a headline.
        results = []
        for item in news:
            if not item.get("headline"):
                continue
            # Finnhub timestamps are unix epoch seconds; 0/missing -> "recent".
            ts = item.get("datetime", 0)
            time_ago = _format_time_ago(datetime.fromtimestamp(ts)) if ts else "recent"
            summary = item.get("summary", "")
            if len(summary) > 200:
                summary = summary[:200] + "..."
            results.append({
                "headline": item["headline"],
                "source": item.get("source", "Unknown"),
                "summary": summary,
                "url": item.get("url", ""),
                "datetime": ts,
                "time_ago": time_ago,
                # Sentiment uses the full (untruncated) summary.
                "sentiment": _extract_sentiment(item["headline"], item.get("summary", ""))
            })
        _save_cache(cache_key, results)
        return results[:limit]
    except Exception as e:
        # Best-effort: the briefing should still render when the API is down.
        return [{"error": str(e), "ticker": ticker}]
def _format_time_ago(dt: datetime) -> str:
"""Format datetime as '2h ago', '1d ago', etc."""
delta = datetime.now() - dt
if delta.days > 0:
return f"{delta.days}d ago"
hours = delta.seconds // 3600
if hours > 0:
return f"{hours}h ago"
minutes = delta.seconds // 60
return f"{minutes}m ago"
# ---------------------------------------------------------------------------
# Sentiment Extraction
# ---------------------------------------------------------------------------
# Keyword lists for the crude substring-based scorer in _extract_sentiment.
# Matching is `keyword.lower() in lowercased_text`, so multi-word phrases
# work, and a leading space (e.g. " ATH") acts as a poor man's word
# boundary.  Each list entry found in the text counts once, so lists must
# not contain duplicates.
SENTIMENT_POSITIVE = [
    "surge", "surges", "jump", "jumps", "soar", "soars", "rally", "rallies",
    "gain", "gains", "rise", "rises", "up", "boost", "boosts", "beat",
    "beats", "outperform", "outperforms", "bullish", "buy", "upgrade",
    "upgrades", "raised", "strong", "positive", "growth", "profit",
    "profits", "revenue up", "earnings beat", "exceeds", "record high",
    " ATH", " all-time high", " moon", "rocket", "rip", "pump"
]
SENTIMENT_NEGATIVE = [
    "drop", "drops", "fall", "falls", "plunge", "plunges", "crash",
    "crashes", "decline", "declines", "tumble", "tumbles", "sink",
    "sinks", "down", "slide", "slides", "plummet", "plummets",
    "miss", "misses", "underperform", "underperforms", "bearish",
    "sell", "downgrade", "downgrades", "cut", "weak", "negative",
    "loss", "losses", "revenue down", "earnings miss", "layoff",
    # NOTE(review): "ATW" looks like a typo (ATL / all-time low?) — confirm.
    "layoffs", " investigation", " probe", "lawsuit", "low", "ATW",
    "dump", "tank"
]
SENTIMENT_NEUTRAL = [
    # "update" appeared twice upstream; deduplicated so it is not
    # double-counted in the neutral score.
    "report", "reports", "announces", "update", "review",
    "maintain", "maintains", "hold", "holds", "neutral", "flat"
]
def _extract_sentiment(headline: str, summary: str = "") -> dict:
    """Score *headline* + *summary* against the module keyword lists.

    Returns a dict with label/emoji/confidence and raw signal counts.
    """
    text = f"{headline} {summary}".lower()

    def _hits(keywords: list[str]) -> int:
        # Case-insensitive substring match, one count per list entry.
        return sum(kw.lower() in text for kw in keywords)

    pos = _hits(SENTIMENT_POSITIVE)
    neg = _hits(SENTIMENT_NEGATIVE)
    neu = _hits(SENTIMENT_NEUTRAL)

    # Overall direction: whichever side has strictly more signals wins.
    if pos > neg:
        label, emoji = "positive", "📈"
    elif neg > pos:
        label, emoji = "negative", "📉"
    else:
        label, emoji = "neutral", "➖"

    # Confidence in [0, 1]; neutral hits dilute it at half weight.
    directional = pos + neg
    if directional > 0:
        confidence = min(max(pos, neg) / (directional + neu * 0.5), 1.0)
    else:
        confidence = 0.3  # Low confidence when no directional keywords hit.

    return {
        "label": label,
        "emoji": emoji,
        "confidence": round(confidence, 2),
        "positive_signals": pos,
        "negative_signals": neg
    }
def summarize_sentiment(news_items: list[dict]) -> dict:
    """Summarize sentiment across multiple news items.

    Args:
        news_items: Items as produced by get_company_news; each may carry a
            "sentiment" dict with a "label" key.

    Returns:
        Dict with overall label/emoji/confidence plus positive_ratio,
        negative_ratio and total_items.  All keys are always present
        (ratios are 0 for empty input) so callers can read them
        unconditionally — the original early returns omitted the ratio
        keys and could crash callers on empty input.
    """
    empty_summary = {
        "label": "neutral", "emoji": "➖", "confidence": 0,
        "positive_ratio": 0, "negative_ratio": 0, "total_items": 0,
    }
    if not news_items:
        return empty_summary
    labels = [n["sentiment"]["label"] for n in news_items if "sentiment" in n]
    total = len(labels)
    if total == 0:
        return empty_summary
    positive_count = labels.count("positive")
    negative_count = labels.count("negative")
    neutral_count = labels.count("neutral")
    # A direction wins when it beats the other AND covers >= 40% of items;
    # otherwise the mix is reported as neutral.
    if positive_count > negative_count and positive_count >= total * 0.4:
        label, emoji = "positive", "📈"
    elif negative_count > positive_count and negative_count >= total * 0.4:
        label, emoji = "negative", "📉"
    else:
        label, emoji = "neutral", "➖"
    confidence = max(positive_count, negative_count, neutral_count) / total
    return {
        "label": label,
        "emoji": emoji,
        "confidence": round(confidence, 2),
        "positive_ratio": positive_count / total,
        "negative_ratio": negative_count / total,
        "total_items": total
    }
# ---------------------------------------------------------------------------
# News Formatting
# ---------------------------------------------------------------------------
def format_news_item(item: dict, include_ticker: bool = True) -> str:
    """Render one news item as a Telegram-markdown line.

    Format: <sentiment emoji> *Ticker*: Headline _(Source, time ago)_
    """
    if include_ticker:
        prefix = f"*{item.get('ticker', 'Market')}*: "
    else:
        prefix = ""
    emoji = item.get("sentiment", {}).get("emoji", "")
    parts = [
        emoji,
        " ",
        prefix,
        item.get("headline", "No headline"),
        " _(",
        item.get("source", "Unknown"),
        ", ",
        item.get("time_ago", ""),
        ")_",
    ]
    return "".join(parts)
def format_ticker_news(ticker: str, news_items: list[dict], max_items: int = 2) -> str:
    """Format up to *max_items* headlines for a ticker as indented lines.

    Error entries are skipped; returns "" when nothing usable remains.
    (*ticker* is part of the public signature but unused here.)
    """
    if not news_items:
        return ""
    formatted = []
    for entry in news_items[:max_items]:
        if "error" in entry:
            continue
        emoji = entry.get("sentiment", {}).get("emoji", "")
        title = entry.get("headline", "")
        # Keep lines short enough for a mobile Telegram client.
        if len(title) > 80:
            title = title[:77] + "..."
        formatted.append(f"  {emoji} {title}")
    return "\n".join(formatted)
def select_top_stories(all_news: dict[str, list[dict]], max_stories: int = 3) -> list[dict]:
    """Select the top stories across all tickers.

    Scoring: sentiment confidence, doubled for directional sentiment
    (positive/negative), times 1.5 for major tickers.

    Args:
        all_news: Mapping of ticker -> news items (as from get_company_news).
        max_stories: Maximum number of stories to return.

    Returns:
        Up to *max_stories* items, each annotated with "ticker" and
        "score", sorted by score descending.  Error entries are skipped.
    """
    # Hoisted out of the loops (was rebuilt per item); frozenset gives
    # O(1) membership tests.
    major_tickers = frozenset(
        {"AAPL", "MSFT", "GOOGL", "AMZN", "TSLA", "NVDA", "META"}
    )
    scored_news = []
    for ticker, items in all_news.items():
        ticker_boost = 1.5 if ticker in major_tickers else 1.0
        for item in items:
            if "error" in item:
                continue
            sentiment = item.get("sentiment", {})
            confidence = sentiment.get("confidence", 0)
            label = sentiment.get("label", "neutral")
            # Directional stories are twice as interesting as neutral ones.
            sentiment_boost = 2.0 if label in ("positive", "negative") else 1.0
            scored_news.append({
                **item,
                "ticker": ticker,
                "score": confidence * sentiment_boost * ticker_boost
            })
    # Sort by score descending and keep the best few.
    scored_news.sort(key=lambda entry: entry["score"], reverse=True)
    return scored_news[:max_stories]
# ---------------------------------------------------------------------------
# Market News (General)
# ---------------------------------------------------------------------------
async def get_market_news(category: str = "general", limit: int = 5) -> list[dict]:
    """Get general market news.

    Args:
        category: One of "general", "forex", "crypto", "merger".
        limit: Maximum items to return.

    Returns:
        Normalized news items; empty list on any failure.
    """
    cache_key = f"market_news_{category}"
    cached = _load_cache(cache_key, max_age_minutes=60)
    # `is not None`: a cached empty list is a valid hit, not a miss.
    if cached is not None:
        return cached[:limit]
    try:
        news = await _finnhub_get("/news", {"category": category})
        results = []
        for item in news[:limit * 2]:
            if not item.get("headline"):
                continue
            # Unix epoch seconds; 0/missing -> "recent".
            ts = item.get("datetime", 0)
            time_ago = _format_time_ago(datetime.fromtimestamp(ts)) if ts else "recent"
            results.append({
                "headline": item["headline"],
                "source": item.get("source", "Unknown"),
                "category": item.get("category", "general"),
                "time_ago": time_ago,
                "sentiment": _extract_sentiment(item["headline"])
            })
        _save_cache(cache_key, results)
        return results[:limit]
    except Exception:
        # Best-effort: general news is optional in the briefing.
        return []
# ---------------------------------------------------------------------------
# Data Freshness Check
# ---------------------------------------------------------------------------
def check_data_freshness(tickers: list[str]) -> dict:
    """Check whether cached quote data for *tickers* is stale.

    A ticker counts as stale when its quote cache file is missing,
    unreadable, corrupt, or older than 2 hours.

    Args:
        tickers: Symbols whose "quote_<ticker>" cache entries to inspect.

    Returns:
        Dict with is_fresh, stale_tickers, stale_count and oldest_cache
        (ISO timestamp; equals "now" when no readable cache was found).
    """
    stale_tickers = []
    oldest_cache = datetime.now()
    for ticker in tickers:
        cache_file = _cache_path(f"quote_{ticker}")
        if not cache_file.exists():
            stale_tickers.append(ticker)
            continue
        try:
            with open(cache_file) as f:
                cached = json.load(f)
            cached_at = datetime.fromisoformat(cached.get("_cached_at", "2000-01-01"))
            if datetime.now() - cached_at > timedelta(hours=2):
                stale_tickers.append(ticker)
            if cached_at < oldest_cache:
                oldest_cache = cached_at
        except (OSError, json.JSONDecodeError, KeyError, ValueError):
            # ValueError: malformed _cached_at (fromisoformat raises it and
            # previously crashed the whole check); OSError: unreadable file.
            stale_tickers.append(ticker)
    return {
        "is_fresh": len(stale_tickers) == 0,
        "stale_tickers": stale_tickers,
        "stale_count": len(stale_tickers),
        "oldest_cache": oldest_cache.isoformat()
    }
def get_cache_status() -> dict:
    """Report the age in minutes of every cache file, grouped by kind."""
    status = {
        "quotes": {},
        "news": {},
        "sentiment": {}
    }
    # Map each cache-key prefix to its bucket in the status dict.
    buckets = {"quote_": "quotes", "news_": "news", "sentiment_": "sentiment"}
    now = datetime.now()
    for cache_file in CACHE_DIR.glob("*.json"):
        try:
            with open(cache_file) as f:
                cached = json.load(f)
            # Missing timestamp defaults to "now", i.e. age 0.
            cached_at = datetime.fromisoformat(cached.get("_cached_at", now.isoformat()))
        except Exception:
            # Best-effort report: skip unreadable or corrupt cache files.
            continue
        age_minutes = int((now - cached_at).total_seconds() / 60)
        key = cache_file.stem
        for prefix, bucket in buckets.items():
            if key.startswith(prefix):
                status[bucket][key] = {"age_minutes": age_minutes}
                break
    return status