# admin_router.py

"""Admin routes for blog module — HTML views with Jinja2 templates."""

import os
import json
from datetime import datetime, timedelta
from typing import Optional
from pathlib import Path

from fastapi import APIRouter, Request, Query, HTTPException, Depends
from fastapi.responses import HTMLResponse, RedirectResponse
from starlette.templating import Jinja2Templates as StarletteJinja2Templates
from starlette.staticfiles import StaticFiles

from shared.session_auth import require_auth, get_session, is_authenticated

from .models import CreatePostRequest
from .service import (
list_posts, get_post, create_post, update_post, preview_markdown,
PostNotFoundError, DuplicateSlugError
)
from .storage import POSTS_DIR, DATA_DIR

# Path to templates - shared across projects

# Absolute locations of the blog templates and the API's static directory.
# NOTE(review): these paths are hard-coded to one user's home directory, so
# the module only works on a machine with this exact layout.
TEMPLATES_DIR = Path("/home/hoffmann_admin/.openclaw/shared/project-docs/blog/templates")
# Admin templates live in the "admin" subdirectory of the shared templates.
ADMIN_TEMPLATES_DIR = TEMPLATES_DIR / "admin"
# Directory later mounted by setup_admin_static() to serve admin assets.
STATIC_DIR = Path("/home/hoffmann_admin/.openclaw/workspace-socrates/hoffdesk-api/static")

# Ensure static directory exists

# Runs at import time: creates STATIC_DIR (and any missing parents) and is a
# no-op when the directory already exists.
STATIC_DIR.mkdir(parents=True, exist_ok=True)

# Copy/link admin.css if it doesn't exist

# Source stylesheet in the shared docs tree and its destination inside STATIC_DIR.
# NOTE(review): nothing in this section of the file reads these two names;
# setup_admin_static() spells the source path out again — confirm whether
# they are still needed.
ADMIN_CSS_SOURCE = Path("/home/hoffmann_admin/.openclaw/shared/project-docs/blog/admin.css")
ADMIN_CSS_DEST = STATIC_DIR / "admin.css"

# Custom Jinja2 filters

def relative_time_filter(dt: Optional[datetime]) -> str:
    """Convert a datetime to a short, human-readable relative time string.

    Args:
        dt: The moment to describe, or ``None``. Timezone-aware values are
            compared against "now" in the same timezone; naive values are
            compared against the naive local time.

    Returns:
        ``"never"`` for ``None``; ``"just now"`` for anything less than ten
        seconds old (this includes timestamps in the future, whose difference
        is negative); ``"N <unit>(s) ago"`` for ages below 30 days, with the
        unit correctly singular when N is 1; otherwise the date formatted as
        ``"%b %d, %Y"``.
    """
    if dt is None:
        return "never"

    # Keep "now" and dt both aware or both naive so the subtraction is valid.
    now = datetime.now(dt.tzinfo) if dt.tzinfo else datetime.now()
    diff = now - dt

    def _ago(count: int, unit: str) -> str:
        # Singular unit for exactly one, plural otherwise ("1 day" / "2 days").
        suffix = "" if count == 1 else "s"
        return f"{count} {unit}{suffix} ago"

    if diff < timedelta(seconds=10):
        return "just now"
    if diff < timedelta(minutes=1):
        return _ago(diff.seconds, "second")
    if diff < timedelta(hours=1):
        return _ago(diff.seconds // 60, "minute")
    if diff < timedelta(days=1):
        return _ago(diff.seconds // 3600, "hour")
    if diff < timedelta(days=7):
        return _ago(diff.days, "day")
    if diff < timedelta(days=30):
        return _ago(diff.days // 7, "week")
    return dt.strftime("%b %d, %Y")

def filesize_filter(size_bytes: int) -> str:
    """Format a byte count as a one-decimal string with a B/KB/MB/GB/TB unit.

    The value is divided by 1024 until it drops below 1024 or the largest
    unit (TB) is reached; anything bigger is still reported in TB.
    """
    units = ("B", "KB", "MB", "GB", "TB")
    last_index = len(units) - 1

    value = size_bytes
    index = 0
    # Step up one unit at a time; stop at TB even if the value is still large.
    while index < last_index and not value < 1024:
        value = value / 1024
        index += 1

    return f"{value:.1f} {units[index]}"

# Create templates instance using jinja2 directly

from jinja2 import Environment, FileSystemLoader

# Shared Jinja2 environment that loads templates from ADMIN_TEMPLATES_DIR.
# NOTE(review): no `autoescape` argument is passed here; per the Jinja2 docs
# a bare Environment does not autoescape, so confirm the admin templates
# escape user-supplied values (titles, slugs, ...) themselves.
jinja_env = Environment(loader=FileSystemLoader(str(ADMIN_TEMPLATES_DIR)))
# Register the custom filters under the names templates use.
jinja_env.filters['relative_time'] = relative_time_filter
jinja_env.filters['filesize'] = filesize_filter

# Router that all admin blog routes in this module attach to.
admin_router = APIRouter()

def render_template(template_name: str, context: dict) -> HTMLResponse:
    """Render the named template from the shared Jinja2 environment.

    Args:
        template_name: Template path relative to the environment's loader root.
        context: Variables passed to the template as keyword arguments.

    Returns:
        An HTMLResponse whose body is the rendered markup.
    """
    rendered = jinja_env.get_template(template_name).render(**context)
    return HTMLResponse(content=rendered)

def _get_user_info(request: Request) -> dict:
    """Build the ``user_id`` / ``user_email`` entries for a template context.

    Values come from the session returned by ``get_session``; when there is
    no session, or a key is missing from it, the admin defaults are used.
    """
    active_session = get_session(request)
    if not active_session:
        # No session: fall back to the default admin identity.
        return {"user_id": "admin", "user_email": "admin@hoffdesk.local"}

    return {
        "user_id": active_session.get("user_id", "admin"),
        "user_email": active_session.get("user_email", "admin@hoffdesk.local"),
    }

@admin_router.get("/login", response_class=HTMLResponse)
def admin_login_page(request: Request):
    """Send the visitor to the right place instead of rendering a login form.

    Unauthenticated visitors are redirected (302) to the unified login page;
    authenticated ones are redirected to the admin blog dashboard.
    """
    if not is_authenticated(request):
        # Login itself is handled by the unified /login page.
        return RedirectResponse(url="/login?redirect=/admin/blog/", status_code=302)
    return RedirectResponse(url="/admin/blog/")

@admin_router.post("/login")
async def admin_login_post(request: Request):
    """Deprecated login form handler; use /auth/login instead.

    The submitted form is ignored and the client is redirected (302) to the
    unified login page.
    """
    unified_login_url = "/login?redirect=/admin/blog/"
    return RedirectResponse(url=unified_login_url, status_code=302)

@admin_router.get("/", response_class=HTMLResponse)
def admin_dashboard(request: Request):
    """Render the admin dashboard with post statistics and recent posts.

    Args:
        request: Incoming request; used for the auth check, for the user
            info, and passed through to the template.

    Returns:
        A 302 redirect to the unified login page when the request is not
        authenticated, otherwise the rendered ``admin_dashboard.html.j2``.
    """
    # Session middleware is expected to redirect already; double-check here.
    if not is_authenticated(request):
        return RedirectResponse(url="/login?redirect=/admin/blog/", status_code=302)

    # Fetch listings (capped at 1000 per call) for the counters below.
    all_posts = list_posts(page=1, per_page=1000, status=None, include_content=False)
    published_posts = list_posts(page=1, per_page=1000, status="published", include_content=False)

    total = all_posts.total
    published = published_posts.total
    drafts = total - published

    # Count posts created within the last seven days. "Now" is built with the
    # same tzinfo as each timestamp so aware and naive datetimes are never
    # compared with each other (that comparison raises TypeError).
    one_week = timedelta(days=7)
    new_this_week = 0
    for summary in all_posts.posts:
        created_at = getattr(summary, "created_at", None)
        if not created_at:
            continue
        if created_at > datetime.now(created_at.tzinfo) - one_week:
            new_this_week += 1

    stats = {
        "total_posts": total,
        "published": published,
        "drafts": drafts,
        "new_this_week": new_this_week,
        "published_change": 0,
    }

    # Load full details for the first 10 listed posts (summaries lack
    # updated_at/status), then keep the 5 most recently updated of those.
    recent_posts = []
    for post_summary in all_posts.posts[:10]:
        try:
            recent_posts.append(get_post(post_summary.slug, require_published=False))
        except PostNotFoundError:
            # Best effort: a post that vanished between listing and loading
            # is simply left off the dashboard.
            continue

    recent_posts = sorted(recent_posts, key=lambda p: p.updated_at, reverse=True)[:5]

    user_info = _get_user_info(request)

    return render_template(
        "admin_dashboard.html.j2",
        {
            "request": request,
            "stats": stats,
            "recent_posts": recent_posts,
            "draft_count": drafts,
            "published_count": published,
            "total_count": total,
            "last_deploy": None,
            "deploy_status": None,
            **user_info,
        }
    )

def get_post_counts() -> dict:
    """Return the total, published, and draft post counts for the sidebar.

    Drafts are computed as the total minus the published count.
    """
    total = list_posts(page=1, per_page=1000, status=None, include_content=False).total
    published = list_posts(page=1, per_page=1000, status="published", include_content=False).total
    return {
        "total_count": total,
        "published_count": published,
        "draft_count": total - published,
    }

@admin_router.get("/posts", response_class=HTMLResponse)
def admin_post_list(
    request: Request,
    page: int = Query(1, ge=1),
    status: Optional[str] = Query(None, description="Filter by status"),
):
    """Render the paginated admin post list, optionally filtered by status.

    A ``status`` of ``"all"`` means no filter. Requests carrying the header
    ``HX-Request: true`` receive only the list partial; all others receive
    the full page. Unauthenticated requests are redirected to login.
    """
    if not is_authenticated(request):
        return RedirectResponse(url="/login?redirect=/admin/blog/posts", status_code=302)

    # "all" is the UI's way of asking for every status.
    if status == "all":
        status_filter = None
    else:
        status_filter = status

    # include_content=True so each entry carries the full post details.
    listing = list_posts(
        page=page,
        per_page=20,
        status=status_filter,
        include_content=True,
    )

    sidebar_counts = get_post_counts()
    current_user = _get_user_info(request)

    context = {
        "request": request,
        "posts": listing.posts,
        "current_status": status,
        "current_page": page,
        "total_pages": listing.total_pages,
    }
    context.update(sidebar_counts)
    context.update(current_user)

    # HTMX swaps only the list content, so serve the partial for it.
    is_htmx = request.headers.get("HX-Request") == "true"
    template_name = (
        "components/post_list_content.html.j2" if is_htmx else "admin_post_list.html.j2"
    )
    return render_template(template_name, context)

@admin_router.get("/posts/new", response_class=HTMLResponse)
def admin_new_post(request: Request):
    """Render the editor with no post loaded, for creating a new post.

    Unauthenticated requests are redirected (302) to the unified login page.
    """
    if not is_authenticated(request):
        return RedirectResponse(url="/login?redirect=/admin/blog/posts/new", status_code=302)

    sidebar_counts = get_post_counts()
    current_user = _get_user_info(request)

    # post=None tells the editor template there is nothing to edit yet.
    context = {"request": request, "post": None}
    context.update(sidebar_counts)
    context.update(current_user)
    return render_template("admin_editor.html.j2", context)

@admin_router.get("/posts/{slug}/edit", response_class=HTMLResponse)
def admin_edit_post(request: Request, slug: str):
    """Render the editor pre-loaded with the post identified by ``slug``.

    Unpublished posts are loadable here. Unauthenticated requests are
    redirected to login.

    Raises:
        HTTPException: 404 when no post exists for ``slug``.
    """
    if not is_authenticated(request):
        login_url = f"/login?redirect=/admin/blog/posts/{slug}/edit"
        return RedirectResponse(url=login_url, status_code=302)

    try:
        existing_post = get_post(slug, require_published=False)
    except PostNotFoundError:
        raise HTTPException(status_code=404, detail="Post not found")

    sidebar_counts = get_post_counts()
    current_user = _get_user_info(request)

    context = {"request": request, "post": existing_post}
    context.update(sidebar_counts)
    context.update(current_user)
    return render_template("admin_editor.html.j2", context)

@admin_router.get("/images", response_class=HTMLResponse)
def admin_images(
    request: Request,
    post: Optional[str] = Query(None, description="Filter by post slug")
):
    """Render the image browser, listing files in each post's images folder.

    When ``post`` is given, only that slug's images are listed; the full post
    list is still passed to the template for the dropdown. Unauthenticated
    requests are redirected to login.
    """
    if not is_authenticated(request):
        return RedirectResponse(url="/login?redirect=/admin/blog/images", status_code=302)

    # Every post is needed for the dropdown, regardless of the filter.
    all_posts = list_posts(page=1, per_page=1000, status=None, include_content=False)

    images = []
    for summary in all_posts.posts:
        slug = summary.slug
        if post and slug != post:
            continue

        images_dir = POSTS_DIR / slug / "images"
        if not images_dir.exists():
            continue

        for entry in images_dir.iterdir():
            if not entry.is_file():
                continue
            file_info = entry.stat()
            # The same URL is used for the full image and its thumbnail.
            public_url = f"/blog/{slug}/images/{entry.name}"
            images.append({
                "name": entry.name,
                "url": public_url,
                "thumb_url": public_url,
                "size": file_info.st_size,
                "post_slug": slug,
            })

    sidebar_counts = get_post_counts()
    current_user = _get_user_info(request)

    context = {
        "request": request,
        "images": images,
        "posts": all_posts.posts,
        "current_post": post,
    }
    context.update(sidebar_counts)
    context.update(current_user)
    return render_template("admin_images.html.j2", context)

@admin_router.post("/posts/preview", response_class=HTMLResponse)
async def admin_preview(request: Request):
    """Render submitted markdown to HTML for the live preview (HTMX endpoint).

    The request body is read as UTF-8. A body that starts with ``content=``
    is treated as URL-encoded form data and only its ``content`` field is
    previewed; any other body is URL-decoded as a whole, as before.

    Returns:
        A 302 redirect to the unified login page when the request is not
        authenticated, otherwise an HTMLResponse with the rendered HTML.
    """
    if not is_authenticated(request):
        return RedirectResponse(url="/login?redirect=/admin/blog/", status_code=302)

    from urllib.parse import parse_qs, unquote_plus

    body = await request.body()
    raw = body.decode('utf-8') if body else ""

    if raw.startswith('content='):
        # Parse as a real form so other submitted fields ("&title=...") are
        # not appended to the previewed markdown. keep_blank_values ensures
        # the "content" key exists even for an empty value.
        content = parse_qs(raw, keep_blank_values=True)['content'][0]
    else:
        # Body without a field name: decode it as one URL-encoded value.
        content = unquote_plus(raw)

    result = preview_markdown(content)

    return HTMLResponse(content=result.content_html)

def setup_admin_static(app):
    """Copy the admin stylesheets into STATIC_DIR and mount it on ``app``.

    A stylesheet is copied only when its source exists and the destination
    file is not already present, so existing copies are never overwritten.
    STATIC_DIR is then served under ``/blog/admin/static``.
    """
    import shutil

    # Destination file name -> source stylesheet.
    stylesheet_sources = {
        "admin.css": Path("/home/hoffmann_admin/.openclaw/shared/project-docs/blog/admin.css"),
        "pipeline.css": Path("/home/hoffmann_admin/.openclaw/shared/project-docs/blog/static/pipeline.css"),
    }
    for filename, source_path in stylesheet_sources.items():
        target_path = STATIC_DIR / filename
        if not source_path.exists() or target_path.exists():
            continue
        shutil.copy(source_path, target_path)

    app.mount("/blog/admin/static", StaticFiles(directory=str(STATIC_DIR)), name="admin_static")