"""Business logic layer for blog module."""

import sqlite3
import markdown
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

from .storage import (
    get_db,
    POSTS_DIR,
    parse_frontmatter,
    write_frontmatter,
    slugify,
    calculate_reading_time,
    count_words,
    PostNotFoundError,
    DuplicateSlugError
)
from .models import (
    PostSummary,
    PostDetail,
    PostListResponse,
    CategoryListResponse,
    TagListResponse,
    SearchResponse,
    SearchResult,
    PreviewResponse,
    CreatePostRequest,
    UpdatePostRequest
)

# Markdown converter
md = markdown.Markdown(extensions=['extra', 'codehilite', 'toc'])


def _render_markdown(content_md: str) -> str:
    """Render Markdown to HTML with the shared converter.

    The converter is reset in ``finally`` so that a failed conversion does
    not leave state behind for the next document.
    """
    try:
        return md.convert(content_md)
    finally:
        md.reset()


def _parse_timestamp(value: Optional[str]) -> Optional[datetime]:
    """Parse an ISO-8601 string, returning None for NULL or empty values."""
    return datetime.fromisoformat(value) if value else None


def _utc_now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def _load_post_file(slug: str):
    """Return ``(path, frontmatter, body)`` for a post's ``index.md``.

    A missing file is treated as empty content and handed to
    ``parse_frontmatter`` unchanged.
    """
    md_path = POSTS_DIR / slug / "index.md"
    raw = md_path.read_text() if md_path.exists() else ""
    fm, body = parse_frontmatter(raw)
    return md_path, fm, body


def _write_post_file(md_path: Path, fm, body: str) -> None:
    """Write frontmatter and body to ``md_path``, creating its directory."""
    md_path.parent.mkdir(parents=True, exist_ok=True)
    md_path.write_text(write_frontmatter(fm, body))


def _fetch_tag_names(conn: sqlite3.Connection, post_id: int) -> list[str]:
    """Return the tag names attached to ``post_id``, sorted by name."""
    cursor = conn.cursor()
    cursor.execute(
        """
        SELECT t.name
        FROM tags t
        JOIN post_tags pt ON t.id = pt.tag_id
        WHERE pt.post_id = ?
        ORDER BY t.name
        """,
        (post_id,),
    )
    return [r['name'] for r in cursor.fetchall()]


def _row_to_summary(
    row: dict,
    conn: Optional[sqlite3.Connection] = None
) -> PostSummary:
    """Convert a database row to PostSummary.

    Args:
        row: A row from the ``posts`` table, indexable by column name.
        conn: Optional open connection to reuse for the tag lookup. When
            omitted, a connection is opened and closed for this call.

    Returns:
        The summary, including the post's tag names.
    """
    if conn is not None:
        tags = _fetch_tag_names(conn, row['id'])
    else:
        own_conn = get_db()
        try:
            tags = _fetch_tag_names(own_conn, row['id'])
        finally:
            own_conn.close()

    return PostSummary(
        slug=row['slug'],
        title=row['title'],
        category=row['category'],
        author=row['author'],
        status=row['status'],
        published_at=_parse_timestamp(row['published_at']),
        excerpt=row['excerpt'] or '',
        cover_image=row['cover_image'],
        reading_time=row['reading_time'] or 1,
        featured=bool(row['featured']),
        tags=tags
    )


def _row_to_detail(
    row: dict,
    content_md: str,
    conn: Optional[sqlite3.Connection] = None
) -> PostDetail:
    """Convert a database row to PostDetail.

    Args:
        row: A row from the ``posts`` table, indexable by column name.
        content_md: Markdown body of the post (frontmatter already removed).
        conn: Optional open connection to reuse for the tag lookup.

    Returns:
        The detail model with both the Markdown source and rendered HTML.
    """
    summary = _row_to_summary(row, conn)
    return PostDetail(
        **summary.dict(),
        content_html=_render_markdown(content_md),
        content_md=content_md,
        created_at=datetime.fromisoformat(row['created_at']),
        updated_at=datetime.fromisoformat(row['updated_at'])
    )


def _upsert_tags(conn: sqlite3.Connection, tag_names: list[str]) -> list[int]:
    """Insert tags if they don't exist and return their IDs.

    Names are stripped and lower-cased. Blank names are skipped, and names
    that normalise to the same value yield a single ID, so callers never
    insert the same (post, tag) pair twice.

    Args:
        conn: Open connection; the caller is responsible for committing.
        tag_names: Raw tag names.

    Returns:
        Tag IDs in first-seen order, without duplicates.
    """
    cursor = conn.cursor()
    tag_ids = []
    seen = set()
    for raw_name in tag_names:
        name = raw_name.strip().lower()
        if not name or name in seen:
            continue
        seen.add(name)
        cursor.execute("INSERT OR IGNORE INTO tags (name) VALUES (?)", (name,))
        cursor.execute("SELECT id FROM tags WHERE name = ?", (name,))
        tag_ids.append(cursor.fetchone()['id'])
    return tag_ids


def _attach_tags(
    conn: sqlite3.Connection,
    post_id: int,
    tag_names: list[str]
) -> None:
    """Upsert ``tag_names`` and link each resulting tag to ``post_id``."""
    tag_ids = _upsert_tags(conn, tag_names)
    conn.cursor().executemany(
        "INSERT INTO post_tags (post_id, tag_id) VALUES (?, ?)",
        [(post_id, tag_id) for tag_id in tag_ids]
    )


def list_posts(
    page: int = 1,
    per_page: int = 10,
    category: Optional[str] = None,
    tag: Optional[str] = None,
    featured: bool = False,
    status: Optional[str] = "published",
    include_content: bool = False
) -> PostListResponse:
    """List blog posts with pagination and filtering.

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        per_page: Page size; values below 1 are treated as 1.
        category: Restrict to this category when given.
        tag: Restrict to posts carrying this tag (matched lower-cased).
        featured: When True, restrict to featured posts.
        status: Restrict to this status; a falsy value disables the filter.
        include_content: When True, load each post's Markdown file and
            return detail objects instead of summaries.

    Returns:
        The requested page together with total and page counts.
    """
    # Clamp paging input: per_page == 0 would divide by zero below, and a
    # negative LIMIT means "no limit" in SQLite.
    page = max(page, 1)
    per_page = max(per_page, 1)

    conn = get_db()
    try:
        cursor = conn.cursor()

        # Build query
        where_clauses = []
        params = []

        if status:
            where_clauses.append("p.status = ?")
            params.append(status)
        if category:
            where_clauses.append("p.category = ?")
            params.append(category)
        if featured:
            where_clauses.append("p.featured = 1")
        if tag:
            where_clauses.append(
                "EXISTS (SELECT 1 FROM post_tags pt "
                "JOIN tags t ON pt.tag_id = t.id "
                "WHERE pt.post_id = p.id AND t.name = ?)"
            )
            params.append(tag.lower())

        # Only fixed SQL fragments are interpolated; values stay bound.
        where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""

        # Count total
        cursor.execute(f"SELECT COUNT(*) as count FROM posts p {where_sql}", params)
        total = cursor.fetchone()['count']

        # Get posts
        query = f"""
            SELECT p.*
            FROM posts p
            {where_sql}
            ORDER BY p.published_at DESC, p.created_at DESC
            LIMIT ? OFFSET ?
        """
        cursor.execute(query, params + [per_page, (page - 1) * per_page])
        rows = cursor.fetchall()

        posts = []
        for row in rows:
            if include_content:
                _, _, body = _load_post_file(row['slug'])
                posts.append(_row_to_detail(row, body, conn))
            else:
                posts.append(_row_to_summary(row, conn))

        total_pages = (total + per_page - 1) // per_page

        return PostListResponse(
            posts=posts,
            total=total,
            page=page,
            per_page=per_page,
            total_pages=total_pages
        )
    finally:
        conn.close()


def get_post(slug: str, require_published: bool = True) -> PostDetail:
    """Get a single post by slug.

    Args:
        slug: Slug of the post.
        require_published: When True, only a post with status
            ``'published'`` is returned.

    Returns:
        The post detail with rendered HTML.

    Raises:
        PostNotFoundError: If no matching row exists.
    """
    conn = get_db()
    try:
        cursor = conn.cursor()

        if require_published:
            cursor.execute(
                "SELECT * FROM posts WHERE slug = ? AND status = 'published'",
                (slug,)
            )
        else:
            cursor.execute("SELECT * FROM posts WHERE slug = ?", (slug,))

        row = cursor.fetchone()
        if not row:
            raise PostNotFoundError(f"Post not found: {slug}")

        _, _, body = _load_post_file(slug)
        return _row_to_detail(row, body, conn)
    finally:
        conn.close()


def get_related_posts(slug: str, limit: int = 3) -> PostListResponse:
    """Get related posts based on shared categories and tags.

    Algorithm:
    1. Posts in the same category score +2
    2. Each shared tag scores +1
    3. Sort by score (desc), then by published_at (desc)
    4. Return top N posts

    Args:
        slug: Slug of the source post; it must be published.
        limit: Maximum number of related posts; values below 0 yield none.

    Returns:
        A single-page response. It is empty when the source post is not
        found or not published.
    """
    conn = get_db()
    try:
        cursor = conn.cursor()

        # Get the source post's category and tags
        cursor.execute(
            "SELECT category FROM posts WHERE slug = ? AND status = 'published'",
            (slug,)
        )
        source_row = cursor.fetchone()
        if not source_row:
            return PostListResponse(
                posts=[], total=0, page=1, per_page=limit, total_pages=0
            )

        source_category = source_row['category']

        cursor.execute(
            """
            SELECT t.name
            FROM tags t
            JOIN post_tags pt ON t.id = pt.tag_id
            JOIN posts p ON pt.post_id = p.id
            WHERE p.slug = ? AND p.status = 'published'
            """,
            (slug,)
        )
        source_tags = {r['name'] for r in cursor.fetchall()}

        # Candidate posts with their tag names. post_tags is joined on
        # post_id so each post is paired with its own tags.
        cursor.execute(
            """
            SELECT p.*, GROUP_CONCAT(t.name) as post_tags
            FROM posts p
            LEFT JOIN post_tags pt ON p.id = pt.post_id
            LEFT JOIN tags t ON pt.tag_id = t.id
            WHERE p.status = 'published' AND p.slug != ?
            GROUP BY p.id
            ORDER BY p.published_at DESC
            """,
            (slug,)
        )

        scored_posts = []
        for row in cursor.fetchall():
            score = 0

            # Category match: +2 points
            if row['category'] == source_category:
                score += 2

            # Tag matches: +1 point each
            post_tags = set(row['post_tags'].split(',')) if row['post_tags'] else set()
            score += len(post_tags & source_tags)

            if score > 0:
                scored_posts.append((score, row))

        # Rows arrive ordered by published_at DESC and list.sort is stable,
        # so sorting on the score alone keeps newest-first among ties.
        scored_posts.sort(key=lambda item: -item[0])

        # Take top N; a negative limit must not slice from the end.
        top_rows = [row for _, row in scored_posts[:max(limit, 0)]]
        posts = [_row_to_summary(row, conn) for row in top_rows]

        return PostListResponse(
            posts=posts,
            total=len(posts),
            page=1,
            per_page=limit,
            total_pages=1
        )
    finally:
        conn.close()


def search_posts(query: str, limit: int = 20) -> SearchResponse:
    """Search posts using FTS5.

    Searches in title, excerpt, and content.
    Returns results ranked by relevance.

    Args:
        query: User search text; it is matched as one quoted phrase.
        limit: Maximum number of results.

    Returns:
        Matching published posts, best match first. A blank query returns
        an empty response.
    """
    if not query or not query.strip():
        return SearchResponse(query="", results=[], total=0)

    # FTS5 gives characters such as "-" operator meaning. Wrapping the
    # whole query in double quotes (with embedded quotes doubled) makes it
    # a phrase search instead.
    escaped_query = '"' + query.replace('"', '""') + '"'

    conn = get_db()
    try:
        cursor = conn.cursor()

        # Ordering by the FTS5 rank column puts the best matches first.
        search_sql = """
            SELECT p.slug, p.title, p.excerpt, p.category, p.published_at, rank
            FROM posts_fts fts
            JOIN posts p ON fts.rowid = p.id
            WHERE posts_fts MATCH ? AND p.status = 'published'
            ORDER BY rank
            LIMIT ?
        """
        cursor.execute(search_sql, (escaped_query, limit))
        rows = cursor.fetchall()

        results = []
        for row in rows:
            # bm25 returns negative values (lower is better), invert for display
            rank = -row['rank'] if row['rank'] else 0.0
            results.append(SearchResult(
                slug=row['slug'],
                title=row['title'],
                excerpt=row['excerpt'] or '',
                category=row['category'],
                published_at=_parse_timestamp(row['published_at']),
                rank=round(rank, 4)
            ))

        return SearchResponse(
            query=query,
            results=results,
            total=len(results)
        )
    finally:
        conn.close()


def list_categories() -> CategoryListResponse:
    """List all categories with post counts.

    Only published posts are counted. Categories are ordered by post
    count, highest first.
    """
    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute(
            """
            SELECT category as slug, category as name, COUNT(*) as post_count
            FROM posts
            WHERE status = 'published'
            GROUP BY category
            ORDER BY post_count DESC
            """
        )
        rows = cursor.fetchall()

        categories = [
            {
                "slug": row['slug'],
                "name": row['name'].title(),
                "description": None,
                "post_count": row['post_count']
            }
            for row in rows
        ]

        return CategoryListResponse(categories=categories)
    finally:
        conn.close()


def list_tags() -> TagListResponse:
    """List all tags with post counts.

    Only tags attached to at least one published post are returned,
    ordered by post count, highest first.
    """
    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute(
            """
            SELECT t.name, COUNT(DISTINCT pt.post_id) as post_count
            FROM tags t
            JOIN post_tags pt ON t.id = pt.tag_id
            JOIN posts p ON pt.post_id = p.id
            WHERE p.status = 'published'
            GROUP BY t.id
            ORDER BY post_count DESC
            """
        )
        rows = cursor.fetchall()

        tags = [{"name": row['name'], "post_count": row['post_count']} for row in rows]

        return TagListResponse(tags=tags)
    finally:
        conn.close()


def create_post(data: CreatePostRequest) -> PostDetail:
    """Create a new blog post.

    Inserts the database row and tag links, writes ``index.md`` under the
    post's directory, then commits. The file is written only after the SQL
    statements succeed, so a failed insert leaves no file behind.

    Args:
        data: The new post's fields. The slug is derived from the title
            when not supplied.

    Returns:
        The created post, re-read from storage.

    Raises:
        DuplicateSlugError: If a post with the same slug already exists.
    """
    conn = get_db()
    try:
        cursor = conn.cursor()

        # Generate slug if not provided
        slug = data.slug or slugify(data.title)

        # Check for duplicate slug
        cursor.execute("SELECT id FROM posts WHERE slug = ?", (slug,))
        if cursor.fetchone():
            raise DuplicateSlugError(f"Slug already exists: {slug}")

        now = _utc_now_iso()

        frontmatter = {
            "title": data.title,
            "slug": slug,
            "category": data.category,
            "tags": data.tags,
            "author": data.author,
            "status": data.status,
            "featured": data.featured,
        }
        if data.excerpt:
            frontmatter["excerpt"] = data.excerpt
        if data.cover_image:
            frontmatter["cover_image"] = data.cover_image
        if data.status == "published":
            frontmatter["published_at"] = now
        published_at = frontmatter.get("published_at")

        md_path = POSTS_DIR / slug / "index.md"
        content_path = str(md_path)

        # Calculate reading time
        word_count = count_words(data.content_md)
        reading_time = calculate_reading_time(word_count)

        # Insert into database
        cursor.execute(
            """
            INSERT INTO posts (
                slug, title, category, author, status, excerpt, cover_image,
                featured, published_at, created_at, updated_at, content_path,
                word_count, reading_time
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                slug, data.title, data.category, data.author, data.status,
                data.excerpt, data.cover_image, int(data.featured),
                published_at, now, now, content_path,
                word_count, reading_time
            )
        )
        post_id = cursor.lastrowid

        # Insert tags
        if data.tags:
            _attach_tags(conn, post_id, data.tags)

        # Write the Markdown file last, just before committing.
        _write_post_file(md_path, frontmatter, data.content_md)

        conn.commit()

        return get_post(slug, require_published=False)
    finally:
        conn.close()


def update_post(slug: str, data: UpdatePostRequest) -> PostDetail:
    """Update an existing blog post.

    Only fields that are not None on ``data`` are changed. Changes are
    applied to both the database row and the frontmatter of ``index.md``;
    ``updated_at`` is always refreshed.

    Args:
        slug: Slug of the post to update.
        data: Partial update; None means "leave unchanged". For ``tags``,
            an empty list removes all tags.

    Returns:
        The updated post, re-read from storage.

    Raises:
        PostNotFoundError: If no post with this slug exists.
    """
    conn = get_db()
    try:
        cursor = conn.cursor()

        # Get existing post
        cursor.execute("SELECT * FROM posts WHERE slug = ?", (slug,))
        row = cursor.fetchone()
        if not row:
            raise PostNotFoundError(f"Post not found: {slug}")

        post_id = row['id']

        # Read existing content
        md_path, fm, body = _load_post_file(slug)

        updates = []
        params = []

        # Fields stored under the same name in frontmatter and database.
        # Column names come from this fixed mapping, never from input.
        plain_fields = {
            "title": data.title,
            "category": data.category,
            "author": data.author,
            "excerpt": data.excerpt,
            "cover_image": data.cover_image,
        }
        for column, value in plain_fields.items():
            if value is not None:
                fm[column] = value
                updates.append(f"{column} = ?")
                params.append(value)

        if data.featured is not None:
            fm['featured'] = data.featured
            updates.append("featured = ?")
            params.append(int(data.featured))

        # Keep frontmatter tags in step with the database, as on create.
        if data.tags is not None:
            fm['tags'] = data.tags

        # Update content and recalculate reading time if content changed
        if data.content_md is not None:
            body = data.content_md
            word_count = count_words(data.content_md)
            reading_time = calculate_reading_time(word_count)
            updates.append("word_count = ?")
            params.append(word_count)
            updates.append("reading_time = ?")
            params.append(reading_time)

        # Always update updated_at
        updates.append("updated_at = ?")
        params.append(_utc_now_iso())

        params.append(slug)
        cursor.execute(
            f"UPDATE posts SET {', '.join(updates)} WHERE slug = ?",
            params
        )

        # Update tags
        if data.tags is not None:
            cursor.execute("DELETE FROM post_tags WHERE post_id = ?", (post_id,))
            if data.tags:
                _attach_tags(conn, post_id, data.tags)

        # Write the file after the SQL statements and before the commit.
        _write_post_file(md_path, fm, body)

        conn.commit()

        return get_post(slug, require_published=False)
    finally:
        conn.close()


def publish_post(slug: str, publish: bool = True) -> PostDetail:
    """Publish or unpublish a post.

    Publishing sets the status to ``'published'`` and ``published_at`` to
    the current UTC time. Unpublishing sets the status to ``'draft'`` and
    clears ``published_at``. Both the database row and the frontmatter of
    ``index.md`` are updated.

    Args:
        slug: Slug of the post.
        publish: True to publish, False to revert to draft.

    Returns:
        The updated post, re-read from storage.

    Raises:
        PostNotFoundError: If no post with this slug exists.
        ValueError: If the post already has the requested status.
    """
    conn = get_db()
    try:
        cursor = conn.cursor()

        cursor.execute("SELECT * FROM posts WHERE slug = ?", (slug,))
        row = cursor.fetchone()
        if not row:
            raise PostNotFoundError(f"Post not found: {slug}")

        post_id = row['id']
        new_status = "published" if publish else "draft"

        if row['status'] == new_status:
            raise ValueError(f"Post is already {new_status}")

        now = _utc_now_iso()
        published_at = now if publish else None

        md_path, fm, body = _load_post_file(slug)
        fm['status'] = new_status
        fm['published_at'] = published_at

        # Update database
        cursor.execute(
            "UPDATE posts SET status = ?, published_at = ?, updated_at = ? WHERE id = ?",
            (new_status, published_at, now, post_id)
        )

        # Update Markdown file after the SQL statement, before the commit.
        _write_post_file(md_path, fm, body)

        conn.commit()

        return get_post(slug, require_published=False)
    finally:
        conn.close()


def delete_post(slug: str) -> dict:
    """Soft-delete a post by archiving it.

    Sets the row's status to ``'archived'`` and refreshes ``updated_at``.

    Args:
        slug: Slug of the post to archive.

    Returns:
        A dict with a confirmation ``message`` and the ``slug``.

    Raises:
        PostNotFoundError: If no post with this slug exists.
    """
    conn = get_db()
    try:
        cursor = conn.cursor()

        cursor.execute("SELECT id FROM posts WHERE slug = ?", (slug,))
        row = cursor.fetchone()
        if not row:
            raise PostNotFoundError(f"Post not found: {slug}")

        # Update to archived status
        # NOTE(review): only the database row changes here; the status in
        # the post's index.md frontmatter is left as-is. Confirm intended.
        cursor.execute(
            "UPDATE posts SET status = 'archived', updated_at = ? WHERE slug = ?",
            (_utc_now_iso(), slug)
        )

        conn.commit()

        return {"message": "Post archived successfully", "slug": slug}
    finally:
        conn.close()


def preview_markdown(content_md: str) -> PreviewResponse:
    """Render Markdown to HTML for preview.

    Args:
        content_md: Markdown source text.

    Returns:
        A response carrying the rendered HTML.
    """
    return PreviewResponse(content_html=_render_markdown(content_md))