📄 storage.py 7,268 bytes Apr 22, 2026 📋 Raw

"""Storage layer for blog module — SQLite + Markdown I/O."""

import os
import re
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

import yaml

Base paths

DATA_DIR = Path(os.environ.get("BLOG_DATA_DIR", "/home/hoffmann_admin/.openclaw/data/blog"))
POSTS_DIR = DATA_DIR / "posts"
DB_PATH = DATA_DIR / "blog.db"

Ensure directories exist

POSTS_DIR.mkdir(parents=True, exist_ok=True)
DATA_DIR.mkdir(parents=True, exist_ok=True)

def get_db() -> sqlite3.Connection:
"""Get a database connection with row factory."""
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
# Enable FTS5
conn.enable_load_extension(True)
return conn

def init_db():
"""Initialize the database schema."""
conn = get_db()
try:
cursor = conn.cursor()

    # Content briefs v2 table  Phase 0 of Content Pipeline v2
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS content_briefs_v2 (
            id TEXT PRIMARY KEY,
            title TEXT NOT NULL,
            struggle_angle TEXT NOT NULL,
            origin_story TEXT NOT NULL,
            attempts_json TEXT NOT NULL DEFAULT '[]',
            the_moment TEXT NOT NULL,
            the_fix TEXT NOT NULL,
            reflection TEXT NOT NULL,
            voice_checklist_json TEXT NOT NULL DEFAULT '{}',
            status TEXT NOT NULL DEFAULT 'draft'
                CHECK(status IN ('draft', 'pending', 'approved', 'rejected', 'generating', 'completed')),
            style_reference TEXT,
            target_length INTEGER DEFAULT 1500,
            created_by TEXT NOT NULL DEFAULT 'unknown',
            created_at TEXT NOT NULL DEFAULT (datetime('now')),
            updated_at TEXT NOT NULL DEFAULT (datetime('now')),
            approved_at TEXT,
            approved_by TEXT,
            generation_job_id TEXT,
            struggle_score REAL,
            content_output TEXT,
            content_html TEXT,
            published_post_id INTEGER REFERENCES posts(id) ON DELETE SET NULL
        )
    """)
    cursor.execute("CREATE INDEX IF NOT EXISTS idx_briefs_status ON content_briefs_v2(status)")
    cursor.execute("CREATE INDEX IF NOT EXISTS idx_briefs_created ON content_briefs_v2(created_at DESC)")
    cursor.execute("CREATE INDEX IF NOT EXISTS idx_briefs_created_by ON content_briefs_v2(created_by)")

    # Posts table
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS posts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            slug TEXT NOT NULL UNIQUE,
            title TEXT NOT NULL,
            category TEXT NOT NULL DEFAULT 'general',
            author TEXT NOT NULL DEFAULT 'HoffDesk Team',
            status TEXT NOT NULL DEFAULT 'draft' CHECK(status IN ('draft','published','archived')),
            excerpt TEXT,
            cover_image TEXT,
            featured INTEGER NOT NULL DEFAULT 0,
            published_at TEXT,
            created_at TEXT NOT NULL DEFAULT (datetime('now')),
            updated_at TEXT NOT NULL DEFAULT (datetime('now')),
            content_path TEXT NOT NULL,
            word_count INTEGER,
            reading_time INTEGER
        )
    """)

    # Tags table
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS tags (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL UNIQUE
        )
    """)

    # Post tags junction table
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS post_tags (
            post_id INTEGER NOT NULL REFERENCES posts(id) ON DELETE CASCADE,
            tag_id INTEGER NOT NULL REFERENCES tags(id) ON DELETE CASCADE,
            PRIMARY KEY (post_id, tag_id)
        )
    """)

    # Indexes
    cursor.execute("CREATE INDEX IF NOT EXISTS idx_posts_status ON posts(status)")
    cursor.execute("CREATE INDEX IF NOT EXISTS idx_posts_category ON posts(category)")
    cursor.execute("CREATE INDEX IF NOT EXISTS idx_posts_published ON posts(published_at DESC)")
    cursor.execute("CREATE INDEX IF NOT EXISTS idx_posts_featured ON posts(featured) WHERE featured = 1")

    # FTS5 virtual table for full-text search
    cursor.execute("""
        CREATE VIRTUAL TABLE IF NOT EXISTS posts_fts USING fts5(
            title,
            excerpt,
            content,
            content='posts',
            content_rowid='id'
        )
    """)

    # FTS triggers to keep index in sync
    cursor.execute("""
        CREATE TRIGGER IF NOT EXISTS posts_fts_insert AFTER INSERT ON posts BEGIN
            INSERT INTO posts_fts(rowid, title, excerpt, content)
            SELECT NEW.id, NEW.title, NEW.excerpt, '';
        END
    """)

    cursor.execute("""
        CREATE TRIGGER IF NOT EXISTS posts_fts_delete AFTER DELETE ON posts BEGIN
            INSERT INTO posts_fts(posts_fts, rowid, title, excerpt, content)
            VALUES ('delete', OLD.id, OLD.title, OLD.excerpt, '');
        END
    """)

    cursor.execute("""
        CREATE TRIGGER IF NOT EXISTS posts_fts_update AFTER UPDATE ON posts BEGIN
            INSERT INTO posts_fts(posts_fts, rowid, title, excerpt, content)
            VALUES ('delete', OLD.id, OLD.title, OLD.excerpt, '');
            INSERT INTO posts_fts(rowid, title, excerpt, content)
            SELECT NEW.id, NEW.title, NEW.excerpt, '';
        END
    """)

    conn.commit()
finally:
    conn.close()

def parse_frontmatter(content: str) -> tuple[dict, str]:
"""Parse YAML frontmatter from Markdown content.

Returns (frontmatter_dict, markdown_body).
"""
if content.startswith("---"):
    parts = content.split("---", 2)
    if len(parts) >= 3:
        try:
            fm = yaml.safe_load(parts[1]) or {}
            return fm, parts[2].strip()
        except yaml.YAMLError:
            pass
return {}, content

def write_frontmatter(frontmatter: dict, body: str) -> str:
"""Write frontmatter and body to Markdown format."""
fm_yaml = yaml.dump(frontmatter, default_flow_style=False, allow_unicode=True)
return f"---\n{fm_yaml}---\n\n{body}\n"

def slugify(text: str) -> str:
"""Convert text to URL-friendly slug."""
text = text.lower()
text = re.sub(r'[^\w\s-]', '', text)
text = re.sub(r'[-\s]+', '-', text)
return text.strip('-')[:255]

def calculate_reading_time(word_count: int) -> int:
"""Calculate reading time in minutes (200 wpm average)."""
return max(1, round(word_count / 200))

def count_words(text: str) -> int:
"""Count words in text."""
return len(text.split())

class PostNotFoundError(Exception):
"""Raised when a post is not found."""
pass

class DuplicateSlugError(Exception):
"""Raised when a slug already exists."""
pass

Initialize database on module load

init_db()