# watchlist.py (9,572 bytes, May 01, 2026)

"""
Dynamic watchlist generation with rotation.
"""
import json
import os
import random
from datetime import datetime, timedelta
from typing import List, Dict, Any

from .sentiment import get_quote, get_top_gainers_losers, _is_stale

# JSON state files used by this module; all three live in the same data directory.
# Base ticker list, read by load_user_watchlist().
USER_WATCHLIST_PATH = "/home/hoffmann_admin/.openclaw/workspace/data/watchlist_user.json"
# Current dynamic picks, read/written by load_dynamic_watchlist()/save_dynamic_watchlist().
DYNAMIC_WATCHLIST_PATH = "/home/hoffmann_admin/.openclaw/workspace/data/watchlist_dynamic.json"
# Previously selected dynamic tickers, read/written by load_history()/save_history().
HISTORY_PATH = "/home/hoffmann_admin/.openclaw/workspace/data/watchlist_history.json"

# NOTE(review): not referenced by any code visible in this file; filter_by_market_cap()
# applies a fixed share-price threshold instead. Confirm whether it is used elsewhere.
MIN_MARKET_CAP_BILLIONS = 1 # $1B minimum
# Default `count` for select_dynamic_tickers() and the cap on the hard-coded fallback list.
DYNAMIC_COUNT = 4 # Number of dynamic tickers to add

def load_user_watchlist() -> List[str]:
    """Load the base user watchlist.

    The file at ``USER_WATCHLIST_PATH`` may contain either a bare JSON list of
    tickers or a JSON object with a ``"tickers"`` list.

    Returns:
        The ticker list from the file. A built-in default list is returned
        when the file is missing, is not valid JSON, or does not yield a list
        (for example a bare string, a number, or ``"tickers": null``).
    """
    default = ["AAPL", "MSFT", "GOOGL", "AMZN", "TSLA", "NVDA", "META", "JPM"]
    try:
        with open(USER_WATCHLIST_PATH, 'r') as f:
            data = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        # Fallback default
        return default

    tickers = data.get("tickers", []) if isinstance(data, dict) else data
    # Callers copy and iterate the result, so anything but a list is treated
    # like a corrupt file rather than handed back as-is.
    return tickers if isinstance(tickers, list) else default

def load_dynamic_watchlist() -> List[str]:
    """Read the stored dynamic watchlist.

    Returns:
        The JSON list stored at ``DYNAMIC_WATCHLIST_PATH``, or an empty list
        when the file is absent, cannot be parsed, or holds something other
        than a list.
    """
    try:
        with open(DYNAMIC_WATCHLIST_PATH, 'r') as handle:
            stored = json.load(handle)
    except (FileNotFoundError, json.JSONDecodeError):
        return []
    return stored if isinstance(stored, list) else []

def save_dynamic_watchlist(tickers: List[str]):
    """Overwrite ``DYNAMIC_WATCHLIST_PATH`` with ``tickers`` as indented JSON."""
    with open(DYNAMIC_WATCHLIST_PATH, 'w') as out:
        out.write(json.dumps(tickers, indent=2))

def load_history() -> List[str]:
    """Return the newest entries of the dynamic-ticker history.

    Returns:
        At most the trailing 28 entries of the JSON list stored at
        ``HISTORY_PATH`` (seven batches when every run adds
        ``DYNAMIC_COUNT = 4`` tickers). An empty list is returned when the
        file is absent, cannot be parsed, or does not hold a list.
    """
    try:
        with open(HISTORY_PATH, 'r') as handle:
            entries = json.load(handle)
    except (FileNotFoundError, json.JSONDecodeError):
        return []
    if not isinstance(entries, list):
        return []
    # The slice already yields every entry when there are 28 or fewer.
    return entries[-28:]

def save_history(history: List[str]):
    """Overwrite ``HISTORY_PATH`` with the newest 28 entries of ``history``.

    The passed-in list itself is not modified; only the written copy is
    trimmed.
    """
    with open(HISTORY_PATH, 'w') as out:
        newest = history[-28:]
        json.dump(newest, out, indent=2)

def filter_by_market_cap(candidates: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Drop likely small-cap candidates, using share price as a stand-in.

    No market-cap figure is computed here: a candidate is kept when the
    current price in its quote (field ``"c"``) is above $5, on the assumption
    that penny stocks are the ones most likely to fall below the cap target.

    Args:
        candidates: Dicts carrying a ``"symbol"`` key. Entries without a
            symbol are skipped. ``None`` is treated as an empty list.

    Returns:
        The kept candidates, in their original order. Candidates whose quote
        could not be fetched are kept unverified.
    """
    filtered = []
    for candidate in candidates or []:
        symbol = candidate.get("symbol")
        if not symbol:
            continue

        quote = get_quote(symbol)
        if not quote:
            # Without a quote there is nothing to check against, so the
            # candidate is passed through rather than silently dropped.
            filtered.append(candidate)
            continue

        # A missing or null price counts as 0 instead of raising on `None > 5`.
        current_price = quote.get("c") or 0
        if current_price > 5:  # Filter out penny stocks
            filtered.append(candidate)

    return filtered

def select_dynamic_tickers(candidates: List[Dict[str, Any]],
                           history: List[str],
                           count: int = DYNAMIC_COUNT) -> List[str]:
    """Select up to ``count`` distinct tickers, avoiding recent history.

    Symbols are picked in three passes until ``count`` are found:

    1. symbols absent from the last 28 history entries, in random order;
    2. symbols absent from the last ``count`` history entries (the previous
       batch), in candidate order;
    3. any remaining symbol, but only when there are more candidates than
       slots.

    Args:
        candidates: Dicts carrying a ``"symbol"`` key; entries without a
            symbol are ignored.
        history: Previously selected tickers, oldest first.
        count: Maximum number of tickers to return.

    Returns:
        A list of at most ``count`` unique symbols. It may be shorter, or
        empty, when the candidates do not allow more.
    """
    if count <= 0:
        # history[-0:] would be the whole history, so bail out explicitly.
        return []

    # The previous batch is the last `count` entries of the history.
    yesterday_tickers = set(history[-count:])
    # The wider window to avoid repeats: the last 28 entries.
    recent_tickers = set(history[-28:])

    # Unique, non-empty symbols in candidate order. Duplicated candidates must
    # not take two slots.
    symbols = list(dict.fromkeys(
        c.get("symbol") for c in candidates if c.get("symbol")
    ))

    selected: List[str] = []
    chosen = set()

    def take(pool):
        """Append not-yet-chosen symbols from ``pool`` until ``count`` is reached."""
        for symbol in pool:
            if len(selected) >= count:
                return
            if symbol not in chosen:
                chosen.add(symbol)
                selected.append(symbol)

    # First priority: symbols not seen anywhere in recent history.
    fresh = [
        s for s in symbols
        if s not in recent_tickers and s not in yesterday_tickers
    ]
    random.shuffle(fresh)
    take(fresh)

    # Second priority: symbols that were merely not in the previous batch.
    take(s for s in symbols if s not in yesterday_tickers)

    # Last resort: allow repeats from the previous batch, but only when the
    # candidate pool is larger than the number of slots.
    if len(candidates) > count:
        take(symbols)

    return selected

def get_trending_reason(symbol: str) -> str:
    """Return a short label explaining why a ticker is on the dynamic list.

    When the quote shows a move of more than 2% in either direction, the
    label reports that move. Otherwise a generic label is picked at random
    from a fixed list; those generic labels are placeholders and are not
    derived from any data.

    Args:
        symbol: Ticker symbol to look up via ``get_quote``.

    Returns:
        A human-readable reason string.
    """
    quote = get_quote(symbol)
    # "dp" (percent change) may be absent or null; treat both as "no move"
    # instead of raising on a None comparison.
    change_pct = (quote.get("dp") or 0) if quote else 0

    if change_pct > 5:
        return f"🔥 Up {change_pct:.1f}% today"
    if change_pct > 2:
        return f"📈 Up {change_pct:.1f}%"
    # The word "Down" already carries the sign, so print the magnitude only
    # (previously rendered as e.g. "Down -6.0%").
    if change_pct < -5:
        return f"🔻 Down {abs(change_pct):.1f}% (oversold bounce potential)"
    if change_pct < -2:
        return f"📉 Down {abs(change_pct):.1f}%"

    reasons = [
        "High volume today",
        "Social media trending",
        "Sector momentum",
        "Analyst coverage",
    ]
    return random.choice(reasons)

def generate_dynamic_watchlist(force: bool = False) -> List[str]:
    """
    Generate a new dynamic watchlist for today.

    The stored list is reused when it is non-empty and
    ``_is_stale("watchlist", 22 * 60 * 60)`` is false. Otherwise trending
    candidates are fetched, filtered, and rotated against the history. When
    no ticker can be selected, a hard-coded fallback list is stored instead;
    the fallback is not added to the history.

    Args:
        force: If True, regenerate even if today's list exists

    Returns:
        List of dynamic ticker symbols
    """
    current_dynamic = load_dynamic_watchlist()

    # Check if we need to regenerate (only once per day)
    # NOTE(review): presumably _is_stale reports whether the "watchlist" entry
    # is older than the given number of seconds (22 h); confirm in .sentiment.
    if not force and current_dynamic and not _is_stale("watchlist", 22 * 60 * 60):
        print(f"Using existing dynamic watchlist: {current_dynamic}")
        return current_dynamic

    print("Generating new dynamic watchlist...")

    # Get trending candidates. A None result or a null "trending" entry is
    # treated as "no candidates" so the fallback below is reached instead of
    # crashing on the lookup.
    trending_data = get_top_gainers_losers() or {}
    candidates = trending_data.get("trending") or []

    # Filter by market cap proxy
    filtered = filter_by_market_cap(candidates)

    # Load history to avoid repeats
    history = load_history()

    # Select new dynamic tickers
    new_dynamic = select_dynamic_tickers(filtered, history)

    if new_dynamic:
        # Record the picks first so the next run rotates away from them.
        history.extend(new_dynamic)
        save_history(history)

        save_dynamic_watchlist(new_dynamic)
        print(f"Generated dynamic watchlist: {new_dynamic}")
        return new_dynamic

    # Fallback to some known tickers if nothing could be selected
    fallback = ["AMD", "CRM", "NFLX", "DIS"]
    # Prefer fallback tickers that are not among the 7 newest history entries;
    # if that leaves nothing, use the full fallback list.
    recently_used = set(history[-7:])
    available_fallback = [t for t in fallback if t not in recently_used]
    new_dynamic = (available_fallback or fallback)[:DYNAMIC_COUNT]

    save_dynamic_watchlist(new_dynamic)
    print(f"Using fallback dynamic watchlist: {new_dynamic}")

    return new_dynamic

def get_full_watchlist() -> List[str]:
    """Return the user tickers followed by dynamic tickers not already listed.

    The user list is loaded first, then the dynamic list is generated (or
    reused). The user entries keep their order; each dynamic ticker is
    appended once.
    """
    combined = load_user_watchlist().copy()
    for symbol in generate_dynamic_watchlist():
        if symbol in combined:
            continue
        combined.append(symbol)
    return combined

def _build_quote_entry(ticker: str, kind: str, quote: Dict[str, Any]) -> Dict[str, Any]:
    """Map one quote payload onto the metadata record for ``ticker``."""
    return {
        "symbol": ticker,
        "price": quote.get("c", 0),
        "change": quote.get("d", 0),
        "change_pct": quote.get("dp", 0),
        "type": kind,
    }


def get_watchlist_with_metadata() -> Dict[str, Any]:
    """Get the watchlist with quote metadata for each ticker.

    Returns:
        A dict with three keys:

        * ``"user"``: records (symbol, price, change, change_pct, type) for
          user tickers whose quote could be fetched;
        * ``"dynamic"``: the same records for dynamic tickers, plus a
          ``"reason"`` string;
        * ``"all_symbols"``: the user tickers followed by each dynamic ticker
          not already listed, built the same way as ``get_full_watchlist``.

        Tickers without a quote are omitted from ``"user"`` and ``"dynamic"``
        but still appear in ``"all_symbols"``.
    """
    user = load_user_watchlist()
    dynamic = generate_dynamic_watchlist()

    user_with_data = []
    for ticker in user:
        quote = get_quote(ticker)
        if quote:
            user_with_data.append(_build_quote_entry(ticker, "user", quote))

    dynamic_with_data = []
    for ticker in dynamic:
        quote = get_quote(ticker)
        if not quote:
            continue
        entry = _build_quote_entry(ticker, "dynamic", quote)
        # The reason lookup fetches the quote again, so it is only done for
        # tickers that are actually reported.
        entry["reason"] = get_trending_reason(ticker)
        dynamic_with_data.append(entry)

    # Mirror get_full_watchlist(): a dynamic ticker is added once, and never
    # when the list already contains it.
    all_symbols = list(user)
    for ticker in dynamic:
        if ticker not in all_symbols:
            all_symbols.append(ticker)

    return {
        "user": user_with_data,
        "dynamic": dynamic_with_data,
        "all_symbols": all_symbols,
    }

if __name__ == "__main__":
    # Manual smoke test. Because this module uses a relative import, run it
    # as a module of its package (python -m <package>.watchlist).
    print("User watchlist:", load_user_watchlist())
    print("Full watchlist:", get_full_watchlist())
    print("With metadata:", get_watchlist_with_metadata())