"""
Dynamic watchlist generation with rotation.

The full watchlist is a fixed, user-maintained list of tickers plus a small
set of "dynamic" tickers picked from the trending candidates returned by
``get_top_gainers_losers``. A history file of previously picked dynamic
tickers is used to avoid repeating the same symbols on consecutive runs.
"""

import json
import os
import random
from datetime import datetime, timedelta
from typing import List, Dict, Any, Iterable, Optional, Set

from .sentiment import get_quote, get_top_gainers_losers, _is_stale

USER_WATCHLIST_PATH = "/home/hoffmann_admin/.openclaw/workspace/data/watchlist_user.json"
DYNAMIC_WATCHLIST_PATH = "/home/hoffmann_admin/.openclaw/workspace/data/watchlist_dynamic.json"
HISTORY_PATH = "/home/hoffmann_admin/.openclaw/workspace/data/watchlist_history.json"

MIN_MARKET_CAP_BILLIONS = 1  # $1B minimum (not enforced directly; see filter_by_market_cap)
DYNAMIC_COUNT = 4  # Number of dynamic tickers to add

# Number of history entries kept on disk and treated as "recent".
# With DYNAMIC_COUNT = 4 symbols appended per generation this is 7 rotations.
HISTORY_MAX_ENTRIES = 28

# Price above which a candidate passes the penny-stock filter.
_MIN_PRICE = 5

# Returned by load_user_watchlist when the user file is missing or unusable.
_DEFAULT_USER_WATCHLIST = ("AAPL", "MSFT", "GOOGL", "AMZN", "TSLA", "NVDA", "META", "JPM")

# Generic explanations used when the quote shows no notable move.
_GENERIC_REASONS = (
    "High volume today",
    "Social media trending",
    "Sector momentum",
    "Analyst coverage",
)


def _load_json(path: str) -> Any:
    """Return the parsed JSON stored at *path*, or None if it is missing or invalid."""
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return None


def _save_json(path: str, payload: Any) -> None:
    """Write *payload* to *path* as indented JSON."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2)


def load_user_watchlist() -> List[str]:
    """Load the base user watchlist.

    The file may hold either a bare JSON list or an object with a
    ``"tickers"`` key. A missing/invalid file, or any other JSON shape,
    yields the built-in default list.
    """
    data = _load_json(USER_WATCHLIST_PATH)
    if isinstance(data, dict):
        return data.get("tickers", [])
    if isinstance(data, list):
        return data
    # Fallback default (fresh list so callers may mutate it safely)
    return list(_DEFAULT_USER_WATCHLIST)


def load_dynamic_watchlist() -> List[str]:
    """Load the current dynamic watchlist, or ``[]`` if none is stored."""
    data = _load_json(DYNAMIC_WATCHLIST_PATH)
    return data if isinstance(data, list) else []


def save_dynamic_watchlist(tickers: List[str]):
    """Save the dynamic watchlist."""
    _save_json(DYNAMIC_WATCHLIST_PATH, tickers)


def load_history() -> List[str]:
    """Load the most recent ``HISTORY_MAX_ENTRIES`` dynamic tickers, oldest first."""
    data = _load_json(HISTORY_PATH)
    if isinstance(data, list):
        return data[-HISTORY_MAX_ENTRIES:]
    return []


def save_history(history: List[str]):
    """Save the ticker history, truncated to the last ``HISTORY_MAX_ENTRIES`` entries."""
    _save_json(HISTORY_PATH, history[-HISTORY_MAX_ENTRIES:])


def filter_by_market_cap(candidates: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Filter candidates using price as a rough stand-in for market cap.

    No market-cap figure is available here, so this does NOT enforce
    ``MIN_MARKET_CAP_BILLIONS``. It only drops candidates whose current
    price (quote field ``"c"``) is at or below ``_MIN_PRICE``.

    Candidates without a ``"symbol"`` are dropped. Candidates for which
    ``get_quote`` returns nothing are kept, since they cannot be checked.
    """
    filtered = []
    for candidate in candidates:
        symbol = candidate.get("symbol")
        if not symbol:
            continue

        quote = get_quote(symbol)
        if not quote:
            # If we can't check, include it (will be validated later)
            filtered.append(candidate)
            continue

        # `or 0` also covers a quote whose "c" is present but null.
        current_price = quote.get("c") or 0
        if current_price > _MIN_PRICE:  # Filter out penny stocks
            filtered.append(candidate)

    return filtered


def _take_symbols(
    pool: Iterable[Dict[str, Any]],
    selected: List[str],
    seen: Set[str],
    excluded: Set[str],
    count: int,
) -> None:
    """Append symbols from *pool* to *selected* until it holds *count* items.

    Entries with no symbol, symbols already in *seen*, and symbols in
    *excluded* are skipped. *seen* is updated as symbols are taken, so a
    symbol can never be selected twice.
    """
    for candidate in pool:
        if len(selected) >= count:
            return
        symbol = candidate.get("symbol")
        if not symbol or symbol in seen or symbol in excluded:
            continue
        seen.add(symbol)
        selected.append(symbol)


def select_dynamic_tickers(
    candidates: List[Dict[str, Any]],
    history: List[str],
    count: int = DYNAMIC_COUNT,
) -> List[str]:
    """Select up to *count* distinct tickers, avoiding recent history.

    Selection happens in three passes:

    1. Candidates absent from the last ``HISTORY_MAX_ENTRIES`` history
       entries, in random order.
    2. Candidates absent from the last *count* history entries (the
       previous selection), in their original order.
    3. Any remaining candidate, but only when there are more than *count*
       candidates in total.

    Args:
        candidates: Dicts carrying a ``"symbol"`` key; entries without one
            are ignored. The list itself is not modified.
        history: Previously selected symbols, oldest first.
        count: Maximum number of symbols to return.

    Returns:
        A list of at most *count* unique symbols; may be shorter or empty.
    """
    if count <= 0:
        # Also avoids history[-0:], which would be the whole list.
        return []

    # Yesterday's tickers (last `count` items from history)
    yesterday_tickers = set(history[-count:])
    # Everything still inside the retained history window
    recent_tickers = set(history[-HISTORY_MAX_ENTRIES:])

    selected: List[str] = []
    seen: Set[str] = set()

    # First priority: tickers not in recent history, in random order
    fresh_candidates = [c for c in candidates if c.get("symbol") not in recent_tickers]
    random.shuffle(fresh_candidates)
    _take_symbols(fresh_candidates, selected, seen, recent_tickers | yesterday_tickers, count)

    # Second priority: tickers not from yesterday but possibly in older history
    _take_symbols(candidates, selected, seen, yesterday_tickers, count)

    # Last resort: allow yesterday's tickers. The `len(candidates) > count`
    # condition is kept from the original implementation; with a smaller
    # pool the result is simply left short.
    if len(selected) < count and len(candidates) > count:
        _take_symbols(candidates, selected, seen, set(), count)

    return selected


def _reason_from_quote(quote: Optional[Dict[str, Any]]) -> str:
    """Build a trending reason from a quote's percent change (field ``"dp"``).

    Moves beyond +/-2% and +/-5% get a specific message; otherwise, or when
    *quote* is empty, a random generic reason is returned.
    """
    if quote:
        # `or 0` also covers a quote whose "dp" is present but null.
        change_pct = quote.get("dp") or 0  # Percent change
        if change_pct > 5:
            return f"🔥 Up {change_pct:.1f}% today"
        if change_pct > 2:
            return f"📈 Up {change_pct:.1f}%"
        # "Down" already conveys the sign, so show the magnitude only.
        if change_pct < -5:
            return f"🔻 Down {abs(change_pct):.1f}% (oversold bounce potential)"
        if change_pct < -2:
            return f"📉 Down {abs(change_pct):.1f}%"

    return random.choice(_GENERIC_REASONS)


def get_trending_reason(symbol: str) -> str:
    """Get a reason why a ticker is trending, based on its current quote."""
    return _reason_from_quote(get_quote(symbol))


def generate_dynamic_watchlist(force: bool = False) -> List[str]:
    """
    Generate a new dynamic watchlist for today.

    The stored list is reused unless *force* is set, the stored list is
    empty, or ``_is_stale("watchlist", 22 * 60 * 60)`` reports it stale
    (presumably a 22-hour max age in seconds -- confirm against
    ``sentiment._is_stale``).

    Args:
        force: If True, regenerate even if today's list exists

    Returns:
        List of dynamic ticker symbols
    """
    current_dynamic = load_dynamic_watchlist()

    # Check if we need to regenerate (only once per day)
    if not force and current_dynamic and not _is_stale("watchlist", 22 * 60 * 60):
        print(f"Using existing dynamic watchlist: {current_dynamic}")
        return current_dynamic

    print("Generating new dynamic watchlist...")

    # Get trending candidates; tolerate an empty or malformed payload
    trending_data = get_top_gainers_losers() or {}
    candidates = trending_data.get("trending") or []

    # Filter by market cap proxy
    filtered = filter_by_market_cap(candidates)

    # Load history to avoid repeats
    history = load_history()

    # Select new dynamic tickers
    new_dynamic = select_dynamic_tickers(filtered, history)

    if new_dynamic:
        # Update history
        history.extend(new_dynamic)
        save_history(history)

        # Save new dynamic watchlist
        save_dynamic_watchlist(new_dynamic)
        print(f"Generated dynamic watchlist: {new_dynamic}")
    else:
        # Fallback to some known tickers if API fails.
        # NOTE: as in the original, fallback picks are not added to history.
        fallback = ["AMD", "CRM", "NFLX", "DIS"]
        # Filter out recently used (last 7 history entries, not 7 days)
        recently_used = set(history[-7:])
        available_fallback = [f for f in fallback if f not in recently_used]
        if available_fallback:
            new_dynamic = available_fallback[:DYNAMIC_COUNT]
        else:
            new_dynamic = fallback[:DYNAMIC_COUNT]
        save_dynamic_watchlist(new_dynamic)
        print(f"Using fallback dynamic watchlist: {new_dynamic}")

    return new_dynamic


def get_full_watchlist() -> List[str]:
    """Get the combined user + dynamic watchlist.

    User tickers come first, followed by dynamic tickers not already listed.
    """
    user = load_user_watchlist()
    dynamic = generate_dynamic_watchlist()

    # Combine and remove duplicates
    full = user.copy()
    for ticker in dynamic:
        if ticker not in full:
            full.append(ticker)

    return full


def _quote_entry(symbol: str, quote: Dict[str, Any], kind: str) -> Dict[str, Any]:
    """Build the metadata record for one ticker from its quote."""
    return {
        "symbol": symbol,
        "price": quote.get("c", 0),
        "change": quote.get("d", 0),
        "change_pct": quote.get("dp", 0),
        "type": kind,
    }


def get_watchlist_with_metadata() -> Dict[str, Any]:
    """Get watchlist with metadata for each ticker.

    Returns:
        A dict with keys ``"user"`` and ``"dynamic"`` (lists of per-ticker
        records with symbol, price, change, change_pct and type; dynamic
        records also carry a ``"reason"``) and ``"all_symbols"`` (user
        tickers followed by dynamic tickers not already present). Tickers
        for which ``get_quote`` returns nothing are omitted from the record
        lists but still appear in ``"all_symbols"``.
    """
    user = load_user_watchlist()
    dynamic = generate_dynamic_watchlist()

    user_with_data = []
    for ticker in user:
        quote = get_quote(ticker)
        if quote:
            user_with_data.append(_quote_entry(ticker, quote, "user"))

    dynamic_with_data = []
    for ticker in dynamic:
        quote = get_quote(ticker)
        if quote:
            # Derive the reason from the quote already fetched, so price and
            # reason agree and no second quote request is needed.
            entry = _quote_entry(ticker, quote, "dynamic")
            entry["reason"] = _reason_from_quote(quote)
            dynamic_with_data.append(entry)

    return {
        "user": user_with_data,
        "dynamic": dynamic_with_data,
        "all_symbols": user + [d for d in dynamic if d not in user],
    }


if __name__ == "__main__":
    # Test
    print("User watchlist:", load_user_watchlist())
    print("Full watchlist:", get_full_watchlist())
    print("With metadata:", get_watchlist_with_metadata())