"""Business logic layer for blog module."""
import sqlite3
import markdown
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from .storage import (
get_db, POSTS_DIR, parse_frontmatter, write_frontmatter,
slugify, calculate_reading_time, count_words,
PostNotFoundError, DuplicateSlugError
)
from .models import (
PostSummary, PostDetail, PostListResponse, CategoryListResponse,
TagListResponse, SearchResponse, SearchResult,
PreviewResponse, CreatePostRequest, UpdatePostRequest
)
# Markdown converter shared by _row_to_detail() and preview_markdown().
# Both callers reset it after every convert() so state from one document
# does not carry over into the next render.
md = markdown.Markdown(extensions=['extra', 'codehilite', 'toc'])
def _row_to_summary(row: dict) -> PostSummary:
    """Build a PostSummary from a ``posts`` row plus its tag names.

    The tag names are loaded through a separate connection obtained from
    get_db(); that connection is closed before the summary is built.
    """
    tag_sql = """
        SELECT t.name FROM tags t
        JOIN post_tags pt ON t.id = pt.tag_id
        WHERE pt.post_id = ?
        ORDER BY t.name
    """
    db = get_db()
    try:
        tag_cursor = db.cursor()
        tag_cursor.execute(tag_sql, (row['id'],))
        tag_names = []
        for tag_row in tag_cursor.fetchall():
            tag_names.append(tag_row['name'])
    finally:
        db.close()

    # A falsy stored value (e.g. NULL) is passed through unchanged rather
    # than parsed as an ISO timestamp.
    raw_published = row['published_at']
    if raw_published:
        published = datetime.fromisoformat(raw_published)
    else:
        published = raw_published

    fields = {
        "slug": row['slug'],
        "title": row['title'],
        "category": row['category'],
        "author": row['author'],
        "status": row['status'],
        "published_at": published,
        "excerpt": row['excerpt'] or '',
        "cover_image": row['cover_image'],
        "reading_time": row['reading_time'] or 1,
        "featured": bool(row['featured']),
        "tags": tag_names,
    }
    return PostSummary(**fields)
def _row_to_detail(row: dict, content_md: str) -> PostDetail:
    """Convert a database row and its Markdown body to a PostDetail.

    Args:
        row: A ``posts`` row; must also provide ``created_at`` and
            ``updated_at`` as ISO-8601 strings.
        content_md: Markdown body of the post (frontmatter already removed
            by the caller).

    Returns:
        A PostDetail carrying every PostSummary field plus the rendered
        HTML, the raw Markdown and the creation/update timestamps.
    """
    summary = _row_to_summary(row)
    try:
        content_html = md.convert(content_md)
    finally:
        # The converter is shared module-wide: reset it even when convert()
        # raises so a failed render cannot leak state into the next one.
        md.reset()
    # NOTE(review): .dict() is the pydantic v1 spelling - confirm the
    # project's pydantic version before switching to model_dump().
    return PostDetail(
        **summary.dict(),
        content_html=content_html,
        content_md=content_md,
        created_at=datetime.fromisoformat(row['created_at']),
        updated_at=datetime.fromisoformat(row['updated_at']),
    )
def _upsert_tags(conn: sqlite3.Connection, tag_names: list[str]) -> list[int]:
    """Insert tags if they don't exist and return their IDs.

    Each name is stripped and lower-cased before use; names that are blank
    after stripping are skipped. Names that normalise to the same tag
    (e.g. "Python" and " python ") are collapsed, so every ID is returned
    exactly once, in first-seen order. This keeps callers from inserting
    the same (post_id, tag_id) link twice.

    Args:
        conn: Open connection whose rows support ``row['id']`` access.
            Nothing is committed here; the caller owns the transaction.
        tag_names: Raw tag names as supplied by the user.

    Returns:
        IDs of the matching ``tags`` rows, one per distinct normalised name.
    """
    # dict.fromkeys de-duplicates while preserving first-seen order.
    normalized = dict.fromkeys(name.strip().lower() for name in tag_names)
    normalized.pop("", None)

    cursor = conn.cursor()
    tag_ids = []
    for name in normalized:
        cursor.execute(
            "INSERT OR IGNORE INTO tags (name) VALUES (?)",
            (name,)
        )
        # Look the ID up explicitly: after an ignored insert, lastrowid
        # does not refer to the already-existing row.
        cursor.execute("SELECT id FROM tags WHERE name = ?", (name,))
        tag_ids.append(cursor.fetchone()['id'])
    return tag_ids
def list_posts(
    page: int = 1,
    per_page: int = 10,
    category: Optional[str] = None,
    tag: Optional[str] = None,
    featured: bool = False,
    status: Optional[str] = "published",
    include_content: bool = False
) -> PostListResponse:
    """List blog posts with pagination and filtering.

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        per_page: Page size; values below 1 are treated as 1.
        category: Only return posts in this category when given.
        tag: Only return posts carrying this tag when given. The value is
            lower-cased before matching, as tag names are stored lower-cased.
        featured: When True, only return featured posts.
        status: Only return posts with this status; a falsy value disables
            the status filter.
        include_content: When True, each post's ``index.md`` is read and
            rendered so the entries are full details instead of summaries.

    Returns:
        A PostListResponse with the requested page and the paging totals.
    """
    # Clamp paging input: per_page=0 would divide by zero when computing
    # total_pages, and page<1 would produce a negative OFFSET.
    page = max(page, 1)
    per_page = max(per_page, 1)

    where_clauses = []
    params = []
    if status:
        where_clauses.append("p.status = ?")
        params.append(status)
    if category:
        where_clauses.append("p.category = ?")
        params.append(category)
    if featured:
        where_clauses.append("p.featured = 1")
    if tag:
        where_clauses.append(
            "EXISTS (SELECT 1 FROM post_tags pt JOIN tags t ON pt.tag_id = t.id "
            "WHERE pt.post_id = p.id AND t.name = ?)"
        )
        params.append(tag.lower())
    # Only fixed SQL fragments are interpolated; every value is bound.
    where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""

    conn = get_db()
    try:
        cursor = conn.cursor()

        cursor.execute(f"SELECT COUNT(*) as count FROM posts p {where_sql}", params)
        total = cursor.fetchone()['count']

        query = f"""
            SELECT p.* FROM posts p
            {where_sql}
            ORDER BY p.published_at DESC, p.created_at DESC
            LIMIT ? OFFSET ?
        """
        cursor.execute(query, params + [per_page, (page - 1) * per_page])
        rows = cursor.fetchall()

        posts = []
        for row in rows:
            if include_content:
                md_path = POSTS_DIR / row['slug'] / "index.md"
                # A missing file yields an empty body rather than an error.
                content_md = md_path.read_text() if md_path.exists() else ""
                _, body = parse_frontmatter(content_md)
                posts.append(_row_to_detail(row, body))
            else:
                posts.append(_row_to_summary(row))

        # Ceiling division without floats.
        total_pages = (total + per_page - 1) // per_page
        return PostListResponse(
            posts=posts,
            total=total,
            page=page,
            per_page=per_page,
            total_pages=total_pages
        )
    finally:
        conn.close()
def get_post(slug: str, require_published: bool = True) -> PostDetail:
    """Fetch one post by slug together with its rendered content.

    When ``require_published`` is True only a post whose status is
    'published' matches. The Markdown body comes from
    ``POSTS_DIR/<slug>/index.md``; a missing file is treated as empty.

    Raises:
        PostNotFoundError: if no matching row exists.
    """
    sql = "SELECT * FROM posts WHERE slug = ?"
    if require_published:
        sql += " AND status = 'published'"

    db = get_db()
    try:
        cur = db.cursor()
        cur.execute(sql, (slug,))
        record = cur.fetchone()
        if not record:
            raise PostNotFoundError(f"Post not found: {slug}")

        source_file = POSTS_DIR / slug / "index.md"
        if source_file.exists():
            raw_text = source_file.read_text()
        else:
            raw_text = ""
        # Only the body is needed here; the frontmatter part is discarded.
        _, markdown_body = parse_frontmatter(raw_text)
        return _row_to_detail(record, markdown_body)
    finally:
        db.close()
def get_related_posts(slug: str, limit: int = 3) -> PostListResponse:
    """Get related posts based on shared categories and tags.

    Algorithm:
    1. Posts in the same category score +2
    2. Each shared tag scores +1
    3. Sort by score (desc), then by published_at (desc)
    4. Return top N posts

    Only published posts are considered, and posts scoring 0 are dropped.

    Args:
        slug: Slug of the source post.
        limit: Maximum number of related posts to return; negative values
            are treated as 0.

    Returns:
        A single-page PostListResponse. It is empty when the source post
        does not exist, is not published, or has no related posts.
    """
    conn = get_db()
    try:
        cursor = conn.cursor()

        # Get the source post's category and tags
        cursor.execute(
            "SELECT category FROM posts WHERE slug = ? AND status = 'published'",
            (slug,)
        )
        source_row = cursor.fetchone()
        if not source_row:
            return PostListResponse(posts=[], total=0, page=1, per_page=limit, total_pages=0)
        source_category = source_row['category']

        cursor.execute("""
            SELECT t.name FROM tags t
            JOIN post_tags pt ON t.id = pt.tag_id
            JOIN posts p ON pt.post_id = p.id
            WHERE p.slug = ? AND p.status = 'published'
        """, (slug,))
        source_tags = {r['name'] for r in cursor.fetchall()}

        # Candidate posts with their tag names, newest first.
        # post_tags must be joined on post_id: joining on tag_id pairs each
        # post with unrelated tag rows.
        cursor.execute("""
            SELECT
                p.*,
                GROUP_CONCAT(t.name) as post_tags
            FROM posts p
            LEFT JOIN post_tags pt ON p.id = pt.post_id
            LEFT JOIN tags t ON pt.tag_id = t.id
            WHERE p.status = 'published' AND p.slug != ?
            GROUP BY p.id
            ORDER BY p.published_at DESC
        """, (slug,))

        scored_posts = []
        for row in cursor.fetchall():
            # Category match: +2 points
            score = 2 if row['category'] == source_category else 0
            # Tag matches: +1 point each.
            # NOTE(review): GROUP_CONCAT joins with ',', so a tag name that
            # itself contains a comma would be split here.
            if row['post_tags']:
                score += len(source_tags.intersection(row['post_tags'].split(',')))
            if score > 0:
                scored_posts.append((score, row))

        # Rows arrive newest-first and list.sort is stable (also with
        # reverse=True), so sorting on score alone leaves ties ordered by
        # published_at descending.
        scored_posts.sort(key=lambda item: item[0], reverse=True)

        top_rows = [row for _, row in scored_posts[:max(limit, 0)]]
        posts = [_row_to_summary(row) for row in top_rows]
        return PostListResponse(
            posts=posts,
            total=len(posts),
            page=1,
            per_page=limit,
            # Matches the empty response above: no posts means no pages.
            total_pages=1 if posts else 0
        )
    finally:
        conn.close()
def search_posts(query: str, limit: int = 20) -> SearchResponse:
    """Full-text search over published posts through the ``posts_fts`` table.

    The user's text is matched as one quoted phrase, so FTS5 operator
    characters such as "-" are not interpreted as query syntax. A blank
    query returns an empty response without touching the database.
    """
    if not (query and query.strip()):
        return SearchResponse(query="", results=[], total=0)

    # Double any embedded quotes, then wrap the whole text in quotes to
    # form a single FTS5 phrase.
    phrase = '"' + query.replace('"', '""') + '"'

    # Rows are ordered by FTS5's rank column, best match first.
    search_sql = """
        SELECT
            p.slug,
            p.title,
            p.excerpt,
            p.category,
            p.published_at,
            rank
        FROM posts_fts fts
        JOIN posts p ON fts.rowid = p.id
        WHERE posts_fts MATCH ? AND p.status = 'published'
        ORDER BY rank
        LIMIT ?
    """

    db = get_db()
    try:
        cur = db.cursor()
        cur.execute(search_sql, (phrase, limit))

        hits = []
        for record in cur.fetchall():
            # The stored rank is sign-flipped for display; a falsy rank
            # is reported as 0.0.
            raw_rank = record['rank']
            display_rank = -raw_rank if raw_rank else 0.0

            published_raw = record['published_at']
            if published_raw:
                published = datetime.fromisoformat(published_raw)
            else:
                published = published_raw

            hit = SearchResult(
                slug=record['slug'],
                title=record['title'],
                excerpt=record['excerpt'] or '',
                category=record['category'],
                published_at=published,
                rank=round(display_rank, 4),
            )
            hits.append(hit)

        return SearchResponse(query=query, results=hits, total=len(hits))
    finally:
        db.close()
def list_categories() -> CategoryListResponse:
    """Return every category used by a published post, with its post count.

    Categories are ordered by post count, largest first. The display name
    is the title-cased category value; no description is supplied.
    """
    sql = """
        SELECT
            category as slug,
            category as name,
            COUNT(*) as post_count
        FROM posts
        WHERE status = 'published'
        GROUP BY category
        ORDER BY post_count DESC
    """
    db = get_db()
    try:
        cur = db.cursor()
        cur.execute(sql)

        categories = []
        for record in cur.fetchall():
            entry = {
                "slug": record['slug'],
                "name": record['name'].title(),
                "description": None,
                "post_count": record['post_count'],
            }
            categories.append(entry)

        return CategoryListResponse(categories=categories)
    finally:
        db.close()
def list_tags() -> TagListResponse:
    """Return every tag attached to a published post, with its post count.

    Each post is counted once per tag, and tags are ordered by post count,
    largest first.
    """
    sql = """
        SELECT
            t.name,
            COUNT(DISTINCT pt.post_id) as post_count
        FROM tags t
        JOIN post_tags pt ON t.id = pt.tag_id
        JOIN posts p ON pt.post_id = p.id
        WHERE p.status = 'published'
        GROUP BY t.id
        ORDER BY post_count DESC
    """
    db = get_db()
    try:
        cur = db.cursor()
        cur.execute(sql)

        tag_entries = []
        for record in cur.fetchall():
            tag_entries.append({
                "name": record['name'],
                "post_count": record['post_count'],
            })

        return TagListResponse(tags=tag_entries)
    finally:
        db.close()
def create_post(data: CreatePostRequest) -> PostDetail:
    """Create a new blog post.

    A post consists of a ``posts`` row, its tag links, and a Markdown file
    at ``POSTS_DIR/<slug>/index.md`` holding the frontmatter and body. The
    database statements run first and the file is written just before the
    commit, so a failing insert no longer leaves an orphan file on disk.

    Args:
        data: Post fields. When ``data.slug`` is empty the slug is derived
            from the title with slugify().

    Returns:
        The stored post, re-read through get_post().

    Raises:
        DuplicateSlugError: if a post with the resulting slug already exists.
    """
    conn = get_db()
    try:
        cursor = conn.cursor()

        # Generate slug if not provided
        slug = data.slug or slugify(data.title)
        # NOTE(review): slug is used as a directory name under POSTS_DIR;
        # presumably the request model restricts its characters - confirm.

        # Check for duplicate slug
        cursor.execute("SELECT id FROM posts WHERE slug = ?", (slug,))
        if cursor.fetchone():
            raise DuplicateSlugError(f"Slug already exists: {slug}")

        # One timestamp for the whole operation keeps created_at,
        # updated_at and published_at consistent with each other.
        now = datetime.now(timezone.utc).isoformat()
        published_at = now if data.status == "published" else None

        frontmatter = {
            "title": data.title,
            "slug": slug,
            "category": data.category,
            "tags": data.tags,
            "author": data.author,
            "status": data.status,
            "featured": data.featured,
        }
        if data.excerpt:
            frontmatter["excerpt"] = data.excerpt
        if data.cover_image:
            frontmatter["cover_image"] = data.cover_image
        if published_at is not None:
            frontmatter["published_at"] = published_at

        post_dir = POSTS_DIR / slug
        md_path = post_dir / "index.md"
        content_path = str(md_path)

        # Calculate reading time
        word_count = count_words(data.content_md)
        reading_time = calculate_reading_time(word_count)

        # Insert into database (not committed yet)
        cursor.execute("""
            INSERT INTO posts (
                slug, title, category, author, status, excerpt, cover_image,
                featured, published_at, created_at, updated_at, content_path,
                word_count, reading_time
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            slug, data.title, data.category, data.author, data.status,
            data.excerpt, data.cover_image, int(data.featured), published_at,
            now, now, content_path, word_count, reading_time
        ))
        post_id = cursor.lastrowid

        # Insert tags
        if data.tags:
            for tag_id in _upsert_tags(conn, data.tags):
                cursor.execute(
                    "INSERT INTO post_tags (post_id, tag_id) VALUES (?, ?)",
                    (post_id, tag_id)
                )

        # Write the Markdown file last, then commit. If either step fails
        # the uncommitted rows are discarded when the connection closes,
        # and a file this call started writing is removed again.
        file_touched = False
        try:
            md_content = write_frontmatter(frontmatter, data.content_md)
            post_dir.mkdir(parents=True, exist_ok=True)
            file_touched = True
            md_path.write_text(md_content)
            conn.commit()
        except Exception:
            if file_touched:
                md_path.unlink(missing_ok=True)
            raise

        return get_post(slug, require_published=False)
    finally:
        conn.close()
def update_post(slug: str, data: UpdatePostRequest) -> PostDetail:
    """Update an existing blog post.

    Only fields of ``data`` that are not None are changed. Each change is
    applied both to the ``posts`` row and to the frontmatter of
    ``POSTS_DIR/<slug>/index.md``. New tags are written to the frontmatter
    as well as the ``post_tags`` links, so file and database stay in step.
    The file is written after the database statements succeed and just
    before the commit.

    Args:
        slug: Slug of the post to update.
        data: Partial update; ``tags=[]`` removes all tags, ``tags=None``
            leaves them untouched.

    Returns:
        The updated post, re-read through get_post().

    Raises:
        PostNotFoundError: if no post with this slug exists.
    """
    # Columns that map 1:1 onto both a frontmatter key and a posts column.
    scalar_fields = ("title", "category", "author", "excerpt", "cover_image", "featured")

    conn = get_db()
    try:
        cursor = conn.cursor()

        # Get existing post
        cursor.execute("SELECT * FROM posts WHERE slug = ?", (slug,))
        row = cursor.fetchone()
        if not row:
            raise PostNotFoundError(f"Post not found: {slug}")
        post_id = row['id']

        # Read existing content; a missing file is treated as empty.
        md_path = POSTS_DIR / slug / "index.md"
        content_md = md_path.read_text() if md_path.exists() else ""
        fm, body = parse_frontmatter(content_md)

        updates = []
        params = []
        for field in scalar_fields:
            value = getattr(data, field)
            if value is None:
                continue
            fm[field] = value
            # Column names come from the fixed tuple above, never from input.
            updates.append(f"{field} = ?")
            # The featured flag is stored as an integer in the database.
            params.append(int(value) if field == "featured" else value)

        # Recalculate reading time if content changed
        if data.content_md is not None:
            body = data.content_md
            word_count = count_words(data.content_md)
            updates.append("word_count = ?")
            params.append(word_count)
            updates.append("reading_time = ?")
            params.append(calculate_reading_time(word_count))

        # Keep the frontmatter tag list in step with the post_tags links.
        if data.tags is not None:
            fm['tags'] = data.tags

        # Always update updated_at
        updates.append("updated_at = ?")
        params.append(datetime.now(timezone.utc).isoformat())
        params.append(slug)
        cursor.execute(
            f"UPDATE posts SET {', '.join(updates)} WHERE slug = ?",
            params
        )

        # Replace the tag links when a new tag list was supplied
        if data.tags is not None:
            cursor.execute("DELETE FROM post_tags WHERE post_id = ?", (post_id,))
            if data.tags:
                for tag_id in _upsert_tags(conn, data.tags):
                    cursor.execute(
                        "INSERT INTO post_tags (post_id, tag_id) VALUES (?, ?)",
                        (post_id, tag_id)
                    )

        # Write the file only after every statement succeeded, so a failing
        # statement cannot leave the file ahead of the database.
        md_content = write_frontmatter(fm, body)
        md_path.parent.mkdir(parents=True, exist_ok=True)
        md_path.write_text(md_content)

        conn.commit()
        return get_post(slug, require_published=False)
    finally:
        conn.close()
def publish_post(slug: str, publish: bool = True) -> PostDetail:
    """Switch a post between 'published' and 'draft'.

    Publishing stamps ``published_at`` with the current UTC time;
    unpublishing clears it. The frontmatter in
    ``POSTS_DIR/<slug>/index.md`` is rewritten first, then the ``posts``
    row is updated and committed.

    Raises:
        PostNotFoundError: if no post with this slug exists.
        ValueError: if the post already has the requested status.
    """
    db = get_db()
    try:
        cur = db.cursor()
        cur.execute("SELECT * FROM posts WHERE slug = ?", (slug,))
        record = cur.fetchone()
        if not record:
            raise PostNotFoundError(f"Post not found: {slug}")

        target_status = "published" if publish else "draft"
        if record['status'] == target_status:
            raise ValueError(f"Post is already {target_status}")
        record_id = record['id']

        # Rewrite the Markdown file with the new status and publish date.
        source_file = POSTS_DIR / slug / "index.md"
        if source_file.exists():
            raw_text = source_file.read_text()
        else:
            raw_text = ""
        meta, markdown_body = parse_frontmatter(raw_text)
        meta['status'] = target_status
        published_at = datetime.now(timezone.utc).isoformat() if publish else None
        meta['published_at'] = published_at
        source_file.write_text(write_frontmatter(meta, markdown_body))

        # Mirror the change in the database.
        touched_at = datetime.now(timezone.utc).isoformat()
        cur.execute(
            "UPDATE posts SET status = ?, published_at = ?, updated_at = ? WHERE id = ?",
            (target_status, published_at, touched_at, record_id)
        )
        db.commit()
        return get_post(slug, require_published=False)
    finally:
        db.close()
def delete_post(slug: str) -> dict:
    """Archive a post instead of removing it.

    The row's status becomes 'archived' and ``updated_at`` is refreshed;
    the Markdown file is left untouched.

    Raises:
        PostNotFoundError: if no post with this slug exists.
    """
    # NOTE(review): the frontmatter status in index.md is not changed here,
    # unlike publish_post() - confirm whether that is intentional.
    db = get_db()
    try:
        cur = db.cursor()
        cur.execute("SELECT id FROM posts WHERE slug = ?", (slug,))
        existing = cur.fetchone()
        if not existing:
            raise PostNotFoundError(f"Post not found: {slug}")

        archived_at = datetime.now(timezone.utc).isoformat()
        cur.execute(
            "UPDATE posts SET status = 'archived', updated_at = ? WHERE slug = ?",
            (archived_at, slug)
        )
        db.commit()

        outcome = {"message": "Post archived successfully", "slug": slug}
        return outcome
    finally:
        db.close()
def preview_markdown(content_md: str) -> PreviewResponse:
    """Render Markdown to HTML for preview.

    Args:
        content_md: Markdown source to render.

    Returns:
        A PreviewResponse holding the rendered HTML.
    """
    try:
        content_html = md.convert(content_md)
    finally:
        # The converter is shared module-wide: reset it even when convert()
        # raises so a failed preview cannot leak state into the next render.
        md.reset()
    return PreviewResponse(content_html=content_html)