"""Admin routes for blog module — HTML views with Jinja2 templates.""" import os import json from datetime import datetime, timedelta from typing import Optional from pathlib import Path from fastapi import APIRouter, Request, Query, HTTPException, Depends from fastapi.responses import HTMLResponse, RedirectResponse from starlette.templating import Jinja2Templates as StarletteJinja2Templates from starlette.staticfiles import StaticFiles from shared.session_auth import require_auth, get_session, is_authenticated from .models import CreatePostRequest from .service import ( list_posts, get_post, create_post, update_post, preview_markdown, PostNotFoundError, DuplicateSlugError ) from .storage import POSTS_DIR, DATA_DIR # Path to templates - shared across projects TEMPLATES_DIR = Path("/home/hoffmann_admin/.openclaw/shared/project-docs/blog/templates") ADMIN_TEMPLATES_DIR = TEMPLATES_DIR / "admin" STATIC_DIR = Path("/home/hoffmann_admin/.openclaw/workspace-socrates/hoffdesk-api/static") # Ensure static directory exists STATIC_DIR.mkdir(parents=True, exist_ok=True) # Copy/link admin.css if it doesn't exist ADMIN_CSS_SOURCE = Path("/home/hoffmann_admin/.openclaw/shared/project-docs/blog/admin.css") ADMIN_CSS_DEST = STATIC_DIR / "admin.css" # Custom Jinja2 filters def relative_time_filter(dt: Optional[datetime]) -> str: """Convert datetime to relative time string.""" if dt is None: return "never" now = datetime.now(dt.tzinfo) if dt.tzinfo else datetime.now() diff = now - dt if diff < timedelta(seconds=10): return "just now" elif diff < timedelta(minutes=1): return f"{int(diff.seconds)} seconds ago" elif diff < timedelta(hours=1): return f"{int(diff.seconds / 60)} minutes ago" elif diff < timedelta(days=1): return f"{int(diff.seconds / 3600)} hours ago" elif diff < timedelta(days=7): return f"{diff.days} days ago" elif diff < timedelta(days=30): return f"{int(diff.days / 7)} weeks ago" else: return dt.strftime("%b %d, %Y") def filesize_filter(size_bytes: int) -> str: """Convert bytes to human readable format.""" for unit in ['B', 'KB', 'MB', 'GB']: if size_bytes < 1024: return f"{size_bytes:.1f} {unit}" size_bytes /= 1024 return f"{size_bytes:.1f} TB" # Create templates instance using jinja2 directly from jinja2 import Environment, FileSystemLoader jinja_env = Environment(loader=FileSystemLoader(str(ADMIN_TEMPLATES_DIR))) jinja_env.filters['relative_time'] = relative_time_filter jinja_env.filters['filesize'] = filesize_filter admin_router = APIRouter() def render_template(template_name: str, context: dict) -> HTMLResponse: """Render a Jinja2 template to HTML response.""" template = jinja_env.get_template(template_name) html = template.render(**context) return HTMLResponse(content=html) def _get_user_info(request: Request) -> dict: """Get current user info for template context.""" session = get_session(request) if session: return { "user_id": session.get("user_id", "admin"), "user_email": session.get("user_email", "admin@hoffdesk.local"), } return {"user_id": "admin", "user_email": "admin@hoffdesk.local"} @admin_router.get("/login", response_class=HTMLResponse) def admin_login_page(request: Request): """Show login page if not authenticated.""" if is_authenticated(request): return RedirectResponse(url="/admin/blog/") # Redirect to unified login return RedirectResponse(url="/login?redirect=/admin/blog/", status_code=302) @admin_router.post("/login") async def admin_login_post(request: Request): """Handle login form submission - deprecated, use /auth/login.""" return RedirectResponse(url="/login?redirect=/admin/blog/", status_code=302) @admin_router.get("/", response_class=HTMLResponse) def admin_dashboard(request: Request): """Admin dashboard with stats and recent posts.""" # Check auth (session middleware handles redirect, but double-check here) if not is_authenticated(request): return RedirectResponse(url="/login?redirect=/admin/blog/", status_code=302) # Get stats all_posts = list_posts(page=1, per_page=1000, status=None, include_content=False) published_posts = list_posts(page=1, per_page=1000, status="published", include_content=False) # Calculate stats total = all_posts.total published = published_posts.total drafts = total - published # Get new this week one_week_ago = datetime.now() - timedelta(days=7) new_this_week = sum( 1 for p in all_posts.posts if hasattr(p, 'created_at') and p.created_at and p.created_at > one_week_ago ) stats = { "total_posts": total, "published": published, "drafts": drafts, "new_this_week": new_this_week, "published_change": 0, } # Get recent posts (last 5) - need full PostDetail for updated_at and status recent_posts = [] for post_summary in all_posts.posts[:10]: # Get first 10 then sort try: full_post = get_post(post_summary.slug, require_published=False) recent_posts.append(full_post) except PostNotFoundError: pass recent_posts = sorted(recent_posts, key=lambda p: p.updated_at, reverse=True)[:5] user_info = _get_user_info(request) return render_template( "admin_dashboard.html.j2", { "request": request, "stats": stats, "recent_posts": recent_posts, "draft_count": drafts, "published_count": published, "total_count": total, "last_deploy": None, "deploy_status": None, **user_info, } ) def get_post_counts() -> dict: """Get draft, published, and total post counts for sidebar.""" all_posts = list_posts(page=1, per_page=1000, status=None, include_content=False) published_posts = list_posts(page=1, per_page=1000, status="published", include_content=False) return { "total_count": all_posts.total, "published_count": published_posts.total, "draft_count": all_posts.total - published_posts.total, } @admin_router.get("/posts", response_class=HTMLResponse) def admin_post_list( request: Request, page: int = Query(1, ge=1), status: Optional[str] = Query(None, description="Filter by status"), ): """Admin post list with filtering.""" if not is_authenticated(request): return RedirectResponse(url="/login?redirect=/admin/blog/posts", status_code=302) # Determine status filter status_filter = None if status == "all" else status # Get posts with full details for template result = list_posts( page=page, per_page=20, status=status_filter, include_content=True # Need full PostDetail for updated_at ) counts = get_post_counts() user_info = _get_user_info(request) context = { "request": request, "posts": result.posts, "current_status": status, "current_page": page, "total_pages": result.total_pages, **counts, **user_info, } # If HTMX request, return just the content partial if request.headers.get("HX-Request") == "true": return render_template("components/post_list_content.html.j2", context) return render_template("admin_post_list.html.j2", context) @admin_router.get("/posts/new", response_class=HTMLResponse) def admin_new_post(request: Request): """New post editor page.""" if not is_authenticated(request): return RedirectResponse(url="/login?redirect=/admin/blog/posts/new", status_code=302) counts = get_post_counts() user_info = _get_user_info(request) return render_template( "admin_editor.html.j2", { "request": request, "post": None, **counts, **user_info, } ) @admin_router.get("/posts/{slug}/edit", response_class=HTMLResponse) def admin_edit_post(request: Request, slug: str): """Edit post page.""" if not is_authenticated(request): return RedirectResponse(url=f"/login?redirect=/admin/blog/posts/{slug}/edit", status_code=302) try: post = get_post(slug, require_published=False) except PostNotFoundError: raise HTTPException(status_code=404, detail="Post not found") counts = get_post_counts() user_info = _get_user_info(request) return render_template( "admin_editor.html.j2", { "request": request, "post": post, **counts, **user_info, } ) @admin_router.get("/images", response_class=HTMLResponse) def admin_images( request: Request, post: Optional[str] = Query(None, description="Filter by post slug") ): """Image browser page.""" if not is_authenticated(request): return RedirectResponse(url="/login?redirect=/admin/blog/images", status_code=302) # Get all posts for the dropdown all_posts = list_posts(page=1, per_page=1000, status=None, include_content=False) # Scan for images in post directories images = [] for post_summary in all_posts.posts: if post and post_summary.slug != post: continue post_dir = POSTS_DIR / post_summary.slug / "images" if post_dir.exists(): for img_path in post_dir.iterdir(): if img_path.is_file(): stat = img_path.stat() images.append({ "name": img_path.name, "url": f"/blog/{post_summary.slug}/images/{img_path.name}", "thumb_url": f"/blog/{post_summary.slug}/images/{img_path.name}", "size": stat.st_size, "post_slug": post_summary.slug, }) counts = get_post_counts() user_info = _get_user_info(request) return render_template( "admin_images.html.j2", { "request": request, "images": images, "posts": all_posts.posts, "current_post": post, **counts, **user_info, } ) @admin_router.post("/posts/preview", response_class=HTMLResponse) async def admin_preview(request: Request): """Preview markdown content (HTMX endpoint).""" if not is_authenticated(request): return RedirectResponse(url="/login?redirect=/admin/blog/", status_code=302) from urllib.parse import unquote_plus # Get content from request body body = await request.body() content = body.decode('utf-8') if body else "" # Strip 'content=' prefix if present (form encoded) if content.startswith('content='): content = content[8:] # Remove 'content=' # URL decode content = unquote_plus(content) # Render markdown result = preview_markdown(content) return HTMLResponse(content=result.content_html) def setup_admin_static(app): """Set up static file serving for admin assets.""" # Ensure CSS files are linked css_sources = [ (Path("/home/hoffmann_admin/.openclaw/shared/project-docs/blog/admin.css"), "admin.css"), (Path("/home/hoffmann_admin/.openclaw/shared/project-docs/blog/static/pipeline.css"), "pipeline.css"), ] for src, dest_name in css_sources: dest = STATIC_DIR / dest_name if src.exists() and not dest.exists(): import shutil shutil.copy(src, dest) # Mount static files app.mount("/blog/admin/static", StaticFiles(directory=str(STATIC_DIR)), name="admin_static")