"""
Dynamic watchlist generation with rotation.
"""
import json
import os
import random
from datetime import datetime, timedelta
from typing import List, Dict, Any
from .sentiment import get_quote, get_top_gainers_losers, _is_stale
# Locations of the JSON files this module reads and writes.
# NOTE(review): these are absolute paths under one user's home directory —
# confirm whether they should come from configuration instead.
USER_WATCHLIST_PATH = "/home/hoffmann_admin/.openclaw/workspace/data/watchlist_user.json"
DYNAMIC_WATCHLIST_PATH = "/home/hoffmann_admin/.openclaw/workspace/data/watchlist_dynamic.json"
HISTORY_PATH = "/home/hoffmann_admin/.openclaw/workspace/data/watchlist_history.json"
MIN_MARKET_CAP_BILLIONS = 1 # $1B minimum; NOTE(review): not referenced in the visible part of this file
DYNAMIC_COUNT = 4 # Number of dynamic tickers to add per generated watchlist
def load_user_watchlist() -> List[str]:
    """Load the base user watchlist.

    The file at ``USER_WATCHLIST_PATH`` may contain either a bare JSON list
    of tickers or a JSON object with a ``"tickers"`` list. An object without
    a ``"tickers"`` key yields an empty list.

    Returns:
        The stored tickers, or the built-in default list when the file is
        missing, cannot be decoded, or does not hold a list.
    """
    default_tickers = ["AAPL", "MSFT", "GOOGL", "AMZN", "TSLA", "NVDA", "META", "JPM"]
    try:
        with open(USER_WATCHLIST_PATH, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError, UnicodeDecodeError):
        return default_tickers

    tickers = data.get("tickers", []) if isinstance(data, dict) else data
    # Callers copy and concatenate the result, so anything other than a list
    # (e.g. {"tickers": null} or a bare string) is treated like a corrupt file.
    if not isinstance(tickers, list):
        return default_tickers
    return tickers
def load_dynamic_watchlist() -> List[str]:
    """Read the currently stored dynamic watchlist.

    Returns:
        The JSON list stored at ``DYNAMIC_WATCHLIST_PATH``; an empty list
        when the file is missing, is not valid JSON, or holds a non-list.
    """
    try:
        with open(DYNAMIC_WATCHLIST_PATH, 'r') as handle:
            payload = json.load(handle)
    except (FileNotFoundError, json.JSONDecodeError):
        return []
    return payload if isinstance(payload, list) else []
def save_dynamic_watchlist(tickers: List[str]):
    """Save the dynamic watchlist to ``DYNAMIC_WATCHLIST_PATH`` as JSON.

    Args:
        tickers: Ticker symbols to persist.

    Raises:
        TypeError: If ``tickers`` is not JSON-serializable. The existing
            file is left untouched in that case.
    """
    # Serialize before opening: mode 'w' truncates the file immediately, so
    # a serialization error must surface before the old content is lost.
    payload = json.dumps(tickers, indent=2)
    with open(DYNAMIC_WATCHLIST_PATH, 'w') as f:
        f.write(payload)
def load_history() -> List[str]:
    """Read the record of recently used dynamic tickers.

    Returns:
        At most the last 28 entries of the JSON list stored at
        ``HISTORY_PATH``; an empty list when the file is missing, is not
        valid JSON, or holds a non-list.
    """
    try:
        with open(HISTORY_PATH, 'r') as handle:
            entries = json.load(handle)
    except (FileNotFoundError, json.JSONDecodeError):
        return []
    if not isinstance(entries, list):
        return []
    # Only the 28 most recent entries are kept (at DYNAMIC_COUNT = 4 tickers
    # per generated watchlist that is seven generations).
    if len(entries) <= 28:
        return entries
    return entries[-28:]
def save_history(history: List[str]):
    """Save the ticker history to ``HISTORY_PATH``, keeping the last 28 entries.

    Args:
        history: Previously selected dynamic tickers, oldest first.

    Raises:
        TypeError: If ``history`` is not JSON-serializable. The existing
            file is left untouched in that case.
    """
    # 28 entries is seven generations at DYNAMIC_COUNT = 4 tickers each.
    # Serialize before opening: mode 'w' truncates the file immediately, so
    # a serialization error must surface before the old content is lost.
    payload = json.dumps(history[-28:], indent=2)
    with open(HISTORY_PATH, 'w') as f:
        f.write(payload)
def filter_by_market_cap(candidates: List[Dict[str, Any]],
                         min_price: float = 5) -> List[Dict[str, Any]]:
    """Drop candidates that look like penny stocks.

    Despite the name, no market-cap figure is consulted here: the current
    price from ``get_quote`` (key ``"c"``) is used as a rough proxy, on the
    assumption that a price above ``min_price`` usually indicates a larger
    company.

    Args:
        candidates: Dicts with a ``"symbol"`` key; entries without a symbol
            are dropped.
        min_price: Exclusive lower bound on the current price. Defaults to 5.

    Returns:
        The candidates, in input order, whose price exceeds ``min_price``,
        plus any candidate for which no quote could be obtained (those are
        kept because they cannot be checked here).
    """
    filtered = []
    for candidate in candidates:
        symbol = candidate.get("symbol")
        if not symbol:
            continue
        quote = get_quote(symbol)
        if not quote:
            # If we can't check, include it rather than silently dropping it.
            filtered.append(candidate)
            continue
        # A null price would make the comparison raise TypeError; treat a
        # missing or null "c" as 0 so the candidate is filtered out instead.
        current_price = quote.get("c") or 0
        if current_price > min_price:
            filtered.append(candidate)
    return filtered
def select_dynamic_tickers(candidates: List[Dict[str, Any]],
                           history: List[str],
                           count: int = DYNAMIC_COUNT) -> List[str]:
    """Select up to ``count`` distinct tickers, avoiding recent history.

    Selection runs in three passes, each filling only the slots still open:

    1. Candidates absent from the last 28 history entries, in random order.
    2. Candidates absent from the previous selection (the last ``count``
       history entries), in candidate order.
    3. Any remaining candidate, in candidate order. This pass runs only
       when there are more than ``count`` candidates.

    Args:
        candidates: Dicts with a ``"symbol"`` key; entries without a symbol
            are ignored.
        history: Previously selected tickers, oldest first.
        count: Maximum number of tickers to return.

    Returns:
        A list of unique ticker symbols, possibly shorter than ``count``
        (or empty) when too few candidates qualify.
    """
    if count <= 0:
        return []

    # The most recent `count` entries are the previous selection.
    yesterday_tickers = set(history[-count:])
    # Wider window used to prefer tickers that have not appeared recently.
    recent_tickers = set(history[-28:])

    selected: List[str] = []
    chosen = set()

    def take(pool) -> None:
        """Append unseen symbols from ``pool`` until ``count`` are selected."""
        for candidate in pool:
            if len(selected) >= count:
                return
            symbol = candidate.get("symbol")
            # Tracking `chosen` as we go prevents duplicates when the same
            # symbol appears more than once among the candidates.
            if symbol and symbol not in chosen:
                chosen.add(symbol)
                selected.append(symbol)

    fresh_candidates = [
        c for c in candidates
        if c.get("symbol") not in recent_tickers
        and c.get("symbol") not in yesterday_tickers
    ]
    random.shuffle(fresh_candidates)
    take(fresh_candidates)

    take(c for c in candidates if c.get("symbol") not in yesterday_tickers)

    # Last resort: allow repeats from the previous selection.
    if len(candidates) > count:
        take(candidates)

    return selected
def get_trending_reason(symbol: str) -> str:
    """Return a short human-readable reason why a ticker is trending.

    When a quote is available and its percent change (key ``"dp"``) is
    beyond +/-2%, the reason describes that move. Otherwise a generic
    reason is picked at random from a fixed list; that text is a
    placeholder and is not derived from any data.

    Args:
        symbol: Ticker symbol to look up via ``get_quote``.

    Returns:
        A display string describing the move or a generic reason.
    """
    quote = get_quote(symbol)
    if quote:
        # "dp" may be present but null; treat that as no change instead of
        # letting the comparisons raise TypeError.
        change_pct = quote.get("dp") or 0
        if change_pct > 5:
            return f"🔥 Up {change_pct:.1f}% today"
        if change_pct > 2:
            return f"📈 Up {change_pct:.1f}%"
        # "Down" already conveys the sign, so show the magnitude only
        # (avoids rendering "Down -6.0%").
        if change_pct < -5:
            return f"🔻 Down {abs(change_pct):.1f}% (oversold bounce potential)"
        if change_pct < -2:
            return f"📉 Down {abs(change_pct):.1f}%"
    reasons = [
        "High volume today",
        "Social media trending",
        "Sector momentum",
        "Analyst coverage",
    ]
    return random.choice(reasons)
def generate_dynamic_watchlist(force: bool = False) -> List[str]:
    """
    Generate a new dynamic watchlist for today.

    The stored list is reused when it is non-empty and
    ``_is_stale("watchlist", 22 * 60 * 60)`` reports it as not stale.
    Otherwise trending candidates are fetched, filtered, and rotated against
    the saved history, and both the history and the new list are saved.
    If no ticker can be selected, a hard-coded fallback list is saved
    instead; fallback tickers are not added to the history.

    Args:
        force: If True, regenerate even if today's list exists

    Returns:
        List of dynamic ticker symbols
    """
    current_dynamic = load_dynamic_watchlist()
    # Check if we need to regenerate (only once per day)
    if not force and current_dynamic and not _is_stale("watchlist", 22 * 60 * 60):
        print(f"Using existing dynamic watchlist: {current_dynamic}")
        return current_dynamic

    print("Generating new dynamic watchlist...")
    # Guard against an empty/None response or a null "trending" entry so a
    # failed lookup reaches the fallback below instead of raising.
    trending_data = get_top_gainers_losers() or {}
    candidates = trending_data.get("trending") or []
    # Filter by the price-based market-cap proxy
    filtered = filter_by_market_cap(candidates)
    # Load history to avoid repeats
    history = load_history()
    new_dynamic = select_dynamic_tickers(filtered, history)

    if new_dynamic:
        history.extend(new_dynamic)
        save_history(history)
        save_dynamic_watchlist(new_dynamic)
        print(f"Generated dynamic watchlist: {new_dynamic}")
        return new_dynamic

    # Fallback to some known tickers if nothing could be selected
    fallback = ["AMD", "CRM", "NFLX", "DIS"]
    # Prefer fallback tickers that are not among the last 7 history entries;
    # the set is built once rather than per ticker.
    recently_used = set(history[-7:])
    available_fallback = [ticker for ticker in fallback if ticker not in recently_used]
    new_dynamic = (available_fallback or fallback)[:DYNAMIC_COUNT]
    save_dynamic_watchlist(new_dynamic)
    print(f"Using fallback dynamic watchlist: {new_dynamic}")
    return new_dynamic
def get_full_watchlist() -> List[str]:
    """Return the user watchlist followed by any dynamic tickers not already in it.

    The dynamic list is obtained via ``generate_dynamic_watchlist()``, which
    may regenerate and save it.
    """
    base = load_user_watchlist()
    extras = generate_dynamic_watchlist()
    combined = base.copy()
    for symbol in extras:
        if symbol in combined:
            # Already present (from the user list or an earlier extra).
            continue
        combined.append(symbol)
    return combined
def get_watchlist_with_metadata() -> Dict[str, Any]:
    """Build quote metadata for every ticker on the user and dynamic watchlists.

    Returns:
        A dict with three keys: ``"user"`` and ``"dynamic"`` hold one entry
        per ticker for which ``get_quote`` returned data (dynamic entries
        also carry a ``"reason"``); ``"all_symbols"`` lists the user tickers
        followed by the dynamic tickers not in the user list, regardless of
        whether a quote was found.
    """
    def describe(symbol, quote, kind):
        # Fields shared by user and dynamic entries.
        return {
            "symbol": symbol,
            "price": quote.get("c", 0),
            "change": quote.get("d", 0),
            "change_pct": quote.get("dp", 0),
            "type": kind,
        }

    user_symbols = load_user_watchlist()
    dynamic_symbols = generate_dynamic_watchlist()

    user_rows = []
    for symbol in user_symbols:
        quote = get_quote(symbol)
        if not quote:
            continue
        user_rows.append(describe(symbol, quote, "user"))

    dynamic_rows = []
    for symbol in dynamic_symbols:
        quote = get_quote(symbol)
        # The reason is looked up for every dynamic ticker, even one that
        # ends up skipped because it has no quote.
        reason = get_trending_reason(symbol)
        if not quote:
            continue
        row = describe(symbol, quote, "dynamic")
        row["reason"] = reason
        dynamic_rows.append(row)

    extra_symbols = [symbol for symbol in dynamic_symbols if symbol not in user_symbols]
    return {
        "user": user_rows,
        "dynamic": dynamic_rows,
        "all_symbols": user_symbols + extra_symbols,
    }
if __name__ == "__main__":
    # Manual smoke test: print each view of the watchlist when the module is
    # executed as a script. Nothing runs on import.
    print("User watchlist:", load_user_watchlist())
    print("Full watchlist:", get_full_watchlist())
    print("With metadata:", get_watchlist_with_metadata())