# service.py

"""Business logic layer for blog module."""

import sqlite3
import markdown
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

from .storage import (
get_db, POSTS_DIR, parse_frontmatter, write_frontmatter,
slugify, calculate_reading_time, count_words,
PostNotFoundError, DuplicateSlugError
)
from .models import (
PostSummary, PostDetail, PostListResponse, CategoryListResponse,
TagListResponse, SearchResponse, SearchResult,
PreviewResponse, CreatePostRequest, UpdatePostRequest
)

# Markdown converter (shared, stateful — must be reset after each convert)

md = markdown.Markdown(extensions=['extra', 'codehilite', 'toc'])  # stateful: callers invoke md.reset() after each convert()

def _row_to_summary(row: dict) -> PostSummary:
    """Build a PostSummary from a posts-table row, loading its tag names.

    NOTE(review): this opens a fresh DB connection per row, so listing N
    posts costs N+1 connections — acceptable for SQLite, worth confirming.
    """
    conn = get_db()
    try:
        tag_rows = conn.cursor().execute(
            """
            SELECT t.name FROM tags t
            JOIN post_tags pt ON t.id = pt.tag_id
            WHERE pt.post_id = ?
            ORDER BY t.name
            """,
            (row['id'],),
        ).fetchall()
        tag_names = [tag_row['name'] for tag_row in tag_rows]
    finally:
        conn.close()

    published_raw = row['published_at']
    return PostSummary(
        slug=row['slug'],
        title=row['title'],
        category=row['category'],
        author=row['author'],
        status=row['status'],
        # Falsy published_at (NULL) passes through unchanged.
        published_at=published_raw and datetime.fromisoformat(published_raw),
        excerpt=row['excerpt'] or '',
        cover_image=row['cover_image'],
        reading_time=row['reading_time'] or 1,
        featured=bool(row['featured']),
        tags=tag_names,
    )

def _row_to_detail(row: dict, content_md: str) -> PostDetail:
    """Build a PostDetail from a posts-table row plus its Markdown body."""
    base = _row_to_summary(row)
    # The module-level converter accumulates state; reset after rendering.
    html = md.convert(content_md)
    md.reset()
    return PostDetail(
        **base.dict(),
        content_html=html,
        content_md=content_md,
        created_at=datetime.fromisoformat(row['created_at']),
        updated_at=datetime.fromisoformat(row['updated_at']),
    )

def _upsert_tags(conn: sqlite3.Connection, tag_names: list[str]) -> list[int]:
"""Insert tags if they don't exist and return their IDs."""
cursor = conn.cursor()
tag_ids = []
for name in tag_names:
name = name.strip().lower()
if not name:
continue
cursor.execute(
"INSERT OR IGNORE INTO tags (name) VALUES (?)",
(name,)
)
cursor.execute("SELECT id FROM tags WHERE name = ?", (name,))
tag_id = cursor.fetchone()['id']
tag_ids.append(tag_id)
return tag_ids

def list_posts(
    page: int = 1,
    per_page: int = 10,
    category: Optional[str] = None,
    tag: Optional[str] = None,
    featured: bool = False,
    status: Optional[str] = "published",
    include_content: bool = False
) -> PostListResponse:
    """List blog posts with pagination and optional filtering.

    Filters (status, category, featured flag, tag) are ANDed together.
    When include_content is True each post's Markdown file is read and a
    PostDetail is returned instead of a PostSummary.
    """
    conn = get_db()
    try:
        cursor = conn.cursor()

        # Assemble the WHERE clause from whichever filters are active.
        filters = []
        params = []
        if status:
            filters.append("p.status = ?")
            params.append(status)
        if category:
            filters.append("p.category = ?")
            params.append(category)
        if featured:
            filters.append("p.featured = 1")
        if tag:
            filters.append(
                "EXISTS (SELECT 1 FROM post_tags pt JOIN tags t ON pt.tag_id = t.id "
                "WHERE pt.post_id = p.id AND t.name = ?)"
            )
            params.append(tag.lower())

        where_sql = (" WHERE " + " AND ".join(filters)) if filters else ""

        # Total matching rows (for pagination metadata).
        cursor.execute(f"SELECT COUNT(*) as count FROM posts p {where_sql}", params)
        total = cursor.fetchone()['count']

        # Page of rows, newest first.
        cursor.execute(
            f"""
            SELECT p.* FROM posts p
            {where_sql}
            ORDER BY p.published_at DESC, p.created_at DESC
            LIMIT ? OFFSET ?
            """,
            params + [per_page, (page - 1) * per_page],
        )

        posts = []
        for row in cursor.fetchall():
            if not include_content:
                posts.append(_row_to_summary(row))
                continue
            md_path = POSTS_DIR / row['slug'] / "index.md"
            raw = md_path.read_text() if md_path.exists() else ""
            _, body = parse_frontmatter(raw)
            posts.append(_row_to_detail(row, body))

        return PostListResponse(
            posts=posts,
            total=total,
            page=page,
            per_page=per_page,
            total_pages=(total + per_page - 1) // per_page,  # ceiling division
        )
    finally:
        conn.close()

def get_post(slug: str, require_published: bool = True) -> PostDetail:
    """Fetch a single post by slug.

    Raises:
        PostNotFoundError: no matching row exists (or it is not published
            while require_published is True).
    """
    conn = get_db()
    try:
        sql = "SELECT * FROM posts WHERE slug = ?"
        if require_published:
            sql += " AND status = 'published'"
        row = conn.cursor().execute(sql, (slug,)).fetchone()
        if row is None:
            raise PostNotFoundError(f"Post not found: {slug}")

        md_path = POSTS_DIR / slug / "index.md"
        raw = md_path.read_text() if md_path.exists() else ""
        _, body = parse_frontmatter(raw)
        return _row_to_detail(row, body)
    finally:
        conn.close()

def get_related_posts(slug: str, limit: int = 3) -> PostListResponse:
    """Get related posts based on shared category and tags.

    Scoring:
      * same category as the source post: +2
      * each shared tag: +1
    Posts with score > 0 are ordered by score descending, then by
    published_at descending, and the top `limit` are returned.  An unknown
    or unpublished slug yields an empty response rather than an error.
    """
    conn = get_db()
    try:
        cursor = conn.cursor()

        # Source post's category (published posts only).
        cursor.execute(
            "SELECT category FROM posts WHERE slug = ? AND status = 'published'",
            (slug,)
        )
        source_row = cursor.fetchone()
        if not source_row:
            return PostListResponse(posts=[], total=0, page=1, per_page=limit, total_pages=0)

        source_category = source_row['category']

        # Source post's tag set.
        cursor.execute("""
            SELECT t.name FROM tags t
            JOIN post_tags pt ON t.id = pt.tag_id
            JOIN posts p ON pt.post_id = p.id
            WHERE p.slug = ? AND p.status = 'published'
        """, (slug,))
        source_tags = {r['name'] for r in cursor.fetchall()}

        # BUG FIX: the join condition was `p.id = pt.tag_id`, matching post
        # IDs against tag IDs and producing bogus per-post tag lists.
        cursor.execute("""
            SELECT
                p.*,
                GROUP_CONCAT(t.name) as post_tags
            FROM posts p
            LEFT JOIN post_tags pt ON p.id = pt.post_id
            LEFT JOIN tags t ON pt.tag_id = t.id
            WHERE p.status = 'published' AND p.slug != ?
            GROUP BY p.id
            ORDER BY p.published_at DESC
        """, (slug,))

        scored_posts = []
        for row in cursor.fetchall():
            score = 0

            # Category match: +2 points.
            if row['category'] == source_category:
                score += 2

            # Shared tags: +1 point each.
            post_tags = set(row['post_tags'].split(',')) if row['post_tags'] else set()
            score += len(post_tags & source_tags)

            if score > 0:
                scored_posts.append((score, row))

        # BUG FIX: sort both score AND published_at descending.  The old key
        # `(-score, date)` with reverse=False left dates ascending, contrary
        # to the documented order.  reverse sort is stable, so ties keep the
        # query's published_at ordering.
        scored_posts.sort(
            key=lambda item: (item[0], item[1]['published_at'] or ''),
            reverse=True,
        )

        posts = [_row_to_summary(row) for _, row in scored_posts[:limit]]

        return PostListResponse(
            posts=posts,
            total=len(posts),
            page=1,
            per_page=limit,
            total_pages=1
        )
    finally:
        conn.close()

def search_posts(query: str, limit: int = 20) -> SearchResponse:
    """Search published posts via the FTS5 index, ranked by relevance.

    A blank query returns an empty response.  The query is run as a quoted
    phrase so FTS5 operator characters (e.g. "-") are treated literally.
    """
    if not query or not query.strip():
        return SearchResponse(query="", results=[], total=0)

    # Escape embedded quotes, then wrap the whole query in double quotes:
    # FTS5 treats a quoted string as a literal phrase, neutralizing special
    # operators such as "-" (NOT).
    phrase = '"{}"'.format(query.replace('"', '""'))

    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT
                p.slug,
                p.title,
                p.excerpt,
                p.category,
                p.published_at,
                rank
            FROM posts_fts fts
            JOIN posts p ON fts.rowid = p.id
            WHERE posts_fts MATCH ? AND p.status = 'published'
            ORDER BY rank
            LIMIT ?
        """, (phrase, limit))

        results = []
        for row in cursor.fetchall():
            # bm25 scores are negative (more negative = better match);
            # flip the sign so larger numbers read as more relevant.
            score = -row['rank'] if row['rank'] else 0.0
            published_raw = row['published_at']
            results.append(SearchResult(
                slug=row['slug'],
                title=row['title'],
                excerpt=row['excerpt'] or '',
                category=row['category'],
                published_at=published_raw and datetime.fromisoformat(published_raw),
                rank=round(score, 4),
            ))

        return SearchResponse(query=query, results=results, total=len(results))
    finally:
        conn.close()

def list_categories() -> CategoryListResponse:
    """List every category with a published post, with post counts."""
    conn = get_db()
    try:
        rows = conn.cursor().execute("""
            SELECT
                category as slug,
                category as name,
                COUNT(*) as post_count
            FROM posts
            WHERE status = 'published'
            GROUP BY category
            ORDER BY post_count DESC
        """).fetchall()

        categories = []
        for row in rows:
            categories.append({
                "slug": row['slug'],
                "name": row['name'].title(),  # display form, e.g. "devops" -> "Devops"
                "description": None,
                "post_count": row['post_count'],
            })

        return CategoryListResponse(categories=categories)
    finally:
        conn.close()

def list_tags() -> TagListResponse:
    """List all tags attached to published posts, with post counts."""
    conn = get_db()
    try:
        rows = conn.cursor().execute("""
            SELECT
                t.name,
                COUNT(DISTINCT pt.post_id) as post_count
            FROM tags t
            JOIN post_tags pt ON t.id = pt.tag_id
            JOIN posts p ON pt.post_id = p.id
            WHERE p.status = 'published'
            GROUP BY t.id
            ORDER BY post_count DESC
        """).fetchall()

        return TagListResponse(
            tags=[{"name": row['name'], "post_count": row['post_count']} for row in rows]
        )
    finally:
        conn.close()

def create_post(data: CreatePostRequest) -> PostDetail:
    """Create a new blog post: Markdown file on disk plus database row.

    Raises:
        DuplicateSlugError: the (explicit or generated) slug is taken.
    """
    conn = get_db()
    try:
        cursor = conn.cursor()

        # Generate slug from the title if the caller did not supply one.
        slug = data.slug or slugify(data.title)

        cursor.execute("SELECT id FROM posts WHERE slug = ?", (slug,))
        if cursor.fetchone():
            raise DuplicateSlugError(f"Slug already exists: {slug}")

        # Create the post directory and write the Markdown file.
        post_dir = POSTS_DIR / slug
        post_dir.mkdir(parents=True, exist_ok=True)

        frontmatter = {
            "title": data.title,
            "slug": slug,
            "category": data.category,
            "tags": data.tags,
            "author": data.author,
            "status": data.status,
            "featured": data.featured,
        }
        if data.excerpt:
            frontmatter["excerpt"] = data.excerpt
        if data.cover_image:
            frontmatter["cover_image"] = data.cover_image
        if data.status == "published":
            # Posts created directly as published get a timestamp now.
            frontmatter["published_at"] = datetime.now(timezone.utc).isoformat()

        content_path = str(post_dir / "index.md")
        md_content = write_frontmatter(frontmatter, data.content_md)
        Path(content_path).write_text(md_content)

        word_count = count_words(data.content_md)
        reading_time = calculate_reading_time(word_count)

        now = datetime.now(timezone.utc).isoformat()
        published_at = frontmatter.get("published_at")

        cursor.execute("""
            INSERT INTO posts (
                slug, title, category, author, status, excerpt, cover_image,
                featured, published_at, created_at, updated_at, content_path,
                word_count, reading_time
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            slug, data.title, data.category, data.author, data.status,
            data.excerpt, data.cover_image, int(data.featured), published_at,
            now, now, content_path, word_count, reading_time
        ))

        post_id = cursor.lastrowid

        if data.tags:
            # BUG FIX: OR IGNORE — the request may repeat a tag (e.g.
            # "Python" and "python"), yielding the same tag ID twice; a
            # plain INSERT would abort on the post_tags primary key.
            for tag_id in _upsert_tags(conn, data.tags):
                cursor.execute(
                    "INSERT OR IGNORE INTO post_tags (post_id, tag_id) VALUES (?, ?)",
                    (post_id, tag_id)
                )

        conn.commit()

        return get_post(slug, require_published=False)
    finally:
        conn.close()

def update_post(slug: str, data: UpdatePostRequest) -> PostDetail:
    """Update an existing blog post; only fields present on the request change.

    Both the Markdown frontmatter and the database row are updated so they
    stay in sync.

    Raises:
        PostNotFoundError: no post has this slug.
    """
    conn = get_db()
    try:
        cursor = conn.cursor()

        cursor.execute("SELECT * FROM posts WHERE slug = ?", (slug,))
        row = cursor.fetchone()
        if not row:
            raise PostNotFoundError(f"Post not found: {slug}")

        post_id = row['id']
        post_dir = POSTS_DIR / slug
        md_path = post_dir / "index.md"

        content_md = md_path.read_text() if md_path.exists() else ""
        fm, body = parse_frontmatter(content_md)

        # Mirror every provided field into the frontmatter.
        if data.title is not None:
            fm['title'] = data.title
        if data.category is not None:
            fm['category'] = data.category
        if data.author is not None:
            fm['author'] = data.author
        if data.excerpt is not None:
            fm['excerpt'] = data.excerpt
        if data.cover_image is not None:
            fm['cover_image'] = data.cover_image
        if data.featured is not None:
            fm['featured'] = data.featured
        # BUG FIX: tags were written to the database but never to the
        # frontmatter, leaving the file stale after a tag update.
        if data.tags is not None:
            fm['tags'] = data.tags

        if data.content_md is not None:
            body = data.content_md

        md_path.write_text(write_frontmatter(fm, body))

        # Build the SQL SET clause from the provided fields.
        updates = []
        params = []
        for column, value in (
            ("title", data.title),
            ("category", data.category),
            ("author", data.author),
            ("excerpt", data.excerpt),
            ("cover_image", data.cover_image),
        ):
            if value is not None:
                updates.append(f"{column} = ?")
                params.append(value)
        if data.featured is not None:
            updates.append("featured = ?")
            params.append(int(data.featured))

        # Recalculate word count / reading time when content changed.
        if data.content_md is not None:
            word_count = count_words(data.content_md)
            updates.append("word_count = ?")
            params.append(word_count)
            updates.append("reading_time = ?")
            params.append(calculate_reading_time(word_count))

        # updated_at always changes, so `updates` is never empty.
        updates.append("updated_at = ?")
        params.append(datetime.now(timezone.utc).isoformat())
        params.append(slug)

        cursor.execute(
            f"UPDATE posts SET {', '.join(updates)} WHERE slug = ?",
            params
        )

        # Replace the tag links wholesale when tags were provided.
        if data.tags is not None:
            cursor.execute("DELETE FROM post_tags WHERE post_id = ?", (post_id,))
            # OR IGNORE guards against duplicate tag IDs from repeated tags.
            for tag_id in _upsert_tags(conn, data.tags):
                cursor.execute(
                    "INSERT OR IGNORE INTO post_tags (post_id, tag_id) VALUES (?, ?)",
                    (post_id, tag_id)
                )

        conn.commit()

        return get_post(slug, require_published=False)
    finally:
        conn.close()

def publish_post(slug: str, publish: bool = True) -> PostDetail:
    """Toggle a post between 'published' and 'draft'.

    Raises:
        PostNotFoundError: unknown slug.
        ValueError: the post is already in the requested state.
    """
    conn = get_db()
    try:
        cursor = conn.cursor()

        cursor.execute("SELECT * FROM posts WHERE slug = ?", (slug,))
        row = cursor.fetchone()
        if row is None:
            raise PostNotFoundError(f"Post not found: {slug}")

        new_status = "published" if publish else "draft"
        if row['status'] == new_status:
            raise ValueError(f"Post is already {new_status}")

        # Keep the Markdown frontmatter in sync with the database.
        md_path = POSTS_DIR / slug / "index.md"
        raw = md_path.read_text() if md_path.exists() else ""
        fm, body = parse_frontmatter(raw)

        published_at = datetime.now(timezone.utc).isoformat() if publish else None
        fm['status'] = new_status
        fm['published_at'] = published_at

        md_path.write_text(write_frontmatter(fm, body))

        cursor.execute(
            "UPDATE posts SET status = ?, published_at = ?, updated_at = ? WHERE id = ?",
            (new_status, published_at, datetime.now(timezone.utc).isoformat(), row['id'])
        )
        conn.commit()

        return get_post(slug, require_published=False)
    finally:
        conn.close()

def delete_post(slug: str) -> dict:
    """Soft-delete a post by marking it archived (no files are removed).

    Raises:
        PostNotFoundError: unknown slug.
    """
    conn = get_db()
    try:
        cursor = conn.cursor()

        cursor.execute("SELECT id FROM posts WHERE slug = ?", (slug,))
        if cursor.fetchone() is None:
            raise PostNotFoundError(f"Post not found: {slug}")

        cursor.execute(
            "UPDATE posts SET status = 'archived', updated_at = ? WHERE slug = ?",
            (datetime.now(timezone.utc).isoformat(), slug)
        )
        conn.commit()

        return {"message": "Post archived successfully", "slug": slug}
    finally:
        conn.close()

def preview_markdown(content_md: str) -> PreviewResponse:
    """Render a Markdown string to HTML for live preview."""
    # The shared converter carries per-document state; reset after use.
    html = md.convert(content_md)
    md.reset()
    return PreviewResponse(content_html=html)