Files
gems/opal-task/internal/engine/database.go
T
joakim 4eb18388db feat(backend): add OAuth2/JWT authentication support
- Add OAuth2 client for Authentik integration
- Implement JWT token generation and validation
- Add refresh token support with database storage
- Update database schema with oauth_subject, oauth_provider, and refresh_tokens table
- Create auth package with config, jwt, oauth, and token management
- Add OAuth endpoints: /auth/login, /auth/callback, /auth/refresh, /auth/logout
- Update AuthMiddleware to support both JWT and API key authentication
- Add user helper functions for OAuth user creation and retrieval
- Add .env.example with OAuth configuration template

API keys still work for CLI compatibility while JWT tokens support web/mobile clients.
2026-01-06 15:42:03 +01:00

390 lines
12 KiB
Go

package engine
import (
"database/sql"
"fmt"
_ "github.com/mattn/go-sqlite3"
)
var db *sql.DB
// InitDB initializes the package-level database connection and applies any
// pending schema migrations. It is idempotent: calling it again after a
// successful initialization is a no-op.
//
// NOTE(review): PRAGMA foreign_keys is per-connection in SQLite, and
// database/sql may open additional pooled connections that will not have it
// set. Consider encoding it in the DSN (e.g. "?_foreign_keys=on") instead —
// confirm against the go-sqlite3 driver version in use.
func InitDB() error {
	if db != nil {
		return nil // Already initialized
	}
	dbPath, err := GetDBPath()
	if err != nil {
		return fmt.Errorf("failed to get database path: %w", err)
	}
	// Open database connection. sql.Open does not connect eagerly; the
	// first Exec below surfaces any connection/open error.
	database, err := sql.Open("sqlite3", dbPath)
	if err != nil {
		return fmt.Errorf("failed to open database: %w", err)
	}
	// Enable foreign keys on this connection.
	if _, err := database.Exec("PRAGMA foreign_keys = ON"); err != nil {
		database.Close() // don't leak the handle on a failed init
		return fmt.Errorf("failed to enable foreign keys: %w", err)
	}
	db = database
	// Run migrations. On failure, tear the connection back down so a later
	// InitDB call retries instead of seeing a half-initialized database.
	if err := runMigrations(); err != nil {
		db.Close()
		db = nil
		return fmt.Errorf("failed to run migrations: %w", err)
	}
	return nil
}
// GetDB returns the shared database handle, or nil when InitDB has not yet
// succeeded (or the connection has been closed).
func GetDB() *sql.DB {
	conn := db
	return conn
}
// CloseDB closes the shared database connection and clears the package-level
// handle so a subsequent InitDB call can re-initialize cleanly. Calling it
// when no connection is open is a no-op.
func CloseDB() error {
	if db == nil {
		return nil
	}
	err := db.Close()
	// Reset the handle: otherwise GetDB would keep returning a closed
	// connection and InitDB would refuse to re-initialize.
	db = nil
	return err
}
// runMigrations applies every schema migration whose version is greater than
// the maximum recorded in schema_version. Each migration's script and its
// bookkeeping insert run inside one transaction, so a failed migration
// leaves the recorded version untouched.
//
// NOTE(review): relies on the go-sqlite3 driver executing multiple
// semicolon-separated statements in a single Exec call — confirm this holds
// for the driver version pinned in go.mod.
func runMigrations() error {
	// Create schema_version table if it doesn't exist
	_, err := db.Exec(`
CREATE TABLE IF NOT EXISTS schema_version (
version INTEGER PRIMARY KEY,
applied_at INTEGER NOT NULL
)
`)
	if err != nil {
		return fmt.Errorf("failed to create schema_version table: %w", err)
	}
	// Get current schema version (COALESCE yields 0 on a fresh database).
	var currentVersion int
	err = db.QueryRow("SELECT COALESCE(MAX(version), 0) FROM schema_version").Scan(&currentVersion)
	if err != nil {
		return fmt.Errorf("failed to get current schema version: %w", err)
	}
	// Define migrations, ordered by ascending version. Each sql field is a
	// multi-statement script executed in a single Exec call.
	migrations := []struct {
		version int
		sql     string
	}{
		{
			version: 1,
			// Initial schema: tasks/tags/working_set, users + api_keys,
			// sync bookkeeping (sync_state, change_log, sync_config),
			// refresh_tokens for OAuth, and triggers that serialize every
			// task mutation into a key:value text payload in change_log.
			sql: `
CREATE TABLE tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
uuid TEXT UNIQUE NOT NULL,
status INTEGER NOT NULL DEFAULT 80,
description TEXT NOT NULL,
project TEXT,
priority INTEGER DEFAULT 1,
created INTEGER NOT NULL,
modified INTEGER NOT NULL,
start INTEGER,
end INTEGER,
due INTEGER,
scheduled INTEGER,
wait INTEGER,
until_date INTEGER,
recurrence_duration INTEGER,
parent_uuid TEXT,
FOREIGN KEY (parent_uuid) REFERENCES tasks(uuid) ON DELETE CASCADE
);
CREATE INDEX idx_tasks_status ON tasks(status);
CREATE INDEX idx_tasks_uuid ON tasks(uuid);
CREATE INDEX idx_tasks_parent ON tasks(parent_uuid);
CREATE INDEX idx_tasks_due ON tasks(due);
CREATE INDEX idx_tasks_project ON tasks(project);
CREATE TABLE tags (
task_id INTEGER NOT NULL,
tag TEXT NOT NULL,
FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE,
PRIMARY KEY (task_id, tag)
);
CREATE INDEX idx_tags_tag ON tags(tag);
CREATE TABLE working_set (
display_id INTEGER PRIMARY KEY,
task_uuid TEXT NOT NULL,
FOREIGN KEY (task_uuid) REFERENCES tasks(uuid) ON DELETE CASCADE
);
-- Users table (minimal for now, expandable later)
CREATE TABLE users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
email TEXT,
oauth_subject TEXT UNIQUE,
oauth_provider TEXT DEFAULT 'authentik',
created_at INTEGER NOT NULL
);
-- Default shared user for household
INSERT INTO users (id, username, created_at) VALUES (1, 'shared', unixepoch());
-- API Keys for authentication
CREATE TABLE api_keys (
id INTEGER PRIMARY KEY AUTOINCREMENT,
key TEXT UNIQUE NOT NULL,
name TEXT NOT NULL,
user_id INTEGER NOT NULL DEFAULT 1,
created_at INTEGER NOT NULL,
last_used INTEGER,
revoked INTEGER DEFAULT 0,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
);
CREATE INDEX idx_api_keys_key ON api_keys(key);
CREATE INDEX idx_api_keys_user ON api_keys(user_id);
-- Sync state (per client device)
CREATE TABLE sync_state (
client_id TEXT PRIMARY KEY,
last_sync INTEGER NOT NULL,
last_change_id INTEGER DEFAULT 0
);
-- Change log (key:value format like edit command)
CREATE TABLE change_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_uuid TEXT NOT NULL,
change_type TEXT NOT NULL,
changed_at INTEGER NOT NULL,
data TEXT NOT NULL,
FOREIGN KEY (task_uuid) REFERENCES tasks(uuid) ON DELETE CASCADE
);
CREATE INDEX idx_change_log_timestamp ON change_log(changed_at);
CREATE INDEX idx_change_log_task ON change_log(task_uuid);
CREATE INDEX idx_change_log_id ON change_log(id);
-- Sync configuration
CREATE TABLE sync_config (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);
-- Default: keep change log for 30 days
INSERT INTO sync_config (key, value) VALUES ('change_log_retention_days', '30');
-- Refresh tokens for OAuth
CREATE TABLE refresh_tokens (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
token_hash TEXT UNIQUE NOT NULL,
expires_at INTEGER NOT NULL,
created_at INTEGER NOT NULL,
revoked INTEGER DEFAULT 0,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
);
CREATE INDEX idx_refresh_tokens_hash ON refresh_tokens(token_hash);
CREATE INDEX idx_refresh_tokens_user ON refresh_tokens(user_id);
-- Triggers to populate change_log
CREATE TRIGGER track_task_create AFTER INSERT ON tasks
BEGIN
INSERT INTO change_log (task_uuid, change_type, changed_at, data)
VALUES (NEW.uuid, 'create', NEW.modified,
'uuid: ' || NEW.uuid || CHAR(10) ||
'description: ' || NEW.description || CHAR(10) ||
'status: ' || CASE NEW.status
WHEN 80 THEN 'pending'
WHEN 67 THEN 'completed'
WHEN 68 THEN 'deleted'
WHEN 82 THEN 'recurring'
ELSE 'pending'
END || CHAR(10) ||
'priority: ' || CASE NEW.priority
WHEN 0 THEN 'L'
WHEN 1 THEN 'D'
WHEN 2 THEN 'M'
WHEN 3 THEN 'H'
ELSE 'D'
END || CHAR(10) ||
CASE WHEN NEW.project IS NOT NULL THEN 'project: ' || NEW.project || CHAR(10) ELSE '' END ||
'created: ' || NEW.created || CHAR(10) ||
'modified: ' || NEW.modified || CHAR(10) ||
CASE WHEN NEW.start IS NOT NULL THEN 'start: ' || NEW.start || CHAR(10) ELSE '' END ||
CASE WHEN NEW.end IS NOT NULL THEN 'end: ' || NEW.end || CHAR(10) ELSE '' END ||
CASE WHEN NEW.due IS NOT NULL THEN 'due: ' || NEW.due || CHAR(10) ELSE '' END ||
CASE WHEN NEW.scheduled IS NOT NULL THEN 'scheduled: ' || NEW.scheduled || CHAR(10) ELSE '' END ||
CASE WHEN NEW.wait IS NOT NULL THEN 'wait: ' || NEW.wait || CHAR(10) ELSE '' END ||
CASE WHEN NEW.until_date IS NOT NULL THEN 'until: ' || NEW.until_date || CHAR(10) ELSE '' END ||
CASE WHEN NEW.recurrence_duration IS NOT NULL THEN 'recurrence: ' || NEW.recurrence_duration || CHAR(10) ELSE '' END ||
CASE WHEN NEW.parent_uuid IS NOT NULL THEN 'parent_uuid: ' || NEW.parent_uuid || CHAR(10) ELSE '' END ||
(SELECT CASE WHEN COUNT(*) > 0
THEN 'tags: ' || GROUP_CONCAT(tag, ',') || CHAR(10)
ELSE ''
END
FROM (SELECT tag FROM tags WHERE task_id = NEW.id ORDER BY tag))
);
END;
CREATE TRIGGER track_task_update AFTER UPDATE ON tasks
BEGIN
INSERT INTO change_log (task_uuid, change_type, changed_at, data)
VALUES (NEW.uuid, 'update', NEW.modified,
'uuid: ' || NEW.uuid || CHAR(10) ||
'description: ' || NEW.description || CHAR(10) ||
'status: ' || CASE NEW.status
WHEN 80 THEN 'pending'
WHEN 67 THEN 'completed'
WHEN 68 THEN 'deleted'
WHEN 82 THEN 'recurring'
ELSE 'pending'
END || CHAR(10) ||
'priority: ' || CASE NEW.priority
WHEN 0 THEN 'L'
WHEN 1 THEN 'D'
WHEN 2 THEN 'M'
WHEN 3 THEN 'H'
ELSE 'D'
END || CHAR(10) ||
CASE WHEN NEW.project IS NOT NULL THEN 'project: ' || NEW.project || CHAR(10) ELSE '' END ||
'created: ' || NEW.created || CHAR(10) ||
'modified: ' || NEW.modified || CHAR(10) ||
CASE WHEN NEW.start IS NOT NULL THEN 'start: ' || NEW.start || CHAR(10) ELSE '' END ||
CASE WHEN NEW.end IS NOT NULL THEN 'end: ' || NEW.end || CHAR(10) ELSE '' END ||
CASE WHEN NEW.due IS NOT NULL THEN 'due: ' || NEW.due || CHAR(10) ELSE '' END ||
CASE WHEN NEW.scheduled IS NOT NULL THEN 'scheduled: ' || NEW.scheduled || CHAR(10) ELSE '' END ||
CASE WHEN NEW.wait IS NOT NULL THEN 'wait: ' || NEW.wait || CHAR(10) ELSE '' END ||
CASE WHEN NEW.until_date IS NOT NULL THEN 'until: ' || NEW.until_date || CHAR(10) ELSE '' END ||
CASE WHEN NEW.recurrence_duration IS NOT NULL THEN 'recurrence: ' || NEW.recurrence_duration || CHAR(10) ELSE '' END ||
CASE WHEN NEW.parent_uuid IS NOT NULL THEN 'parent_uuid: ' || NEW.parent_uuid || CHAR(10) ELSE '' END ||
(SELECT CASE WHEN COUNT(*) > 0
THEN 'tags: ' || GROUP_CONCAT(tag, ',') || CHAR(10)
ELSE ''
END
FROM (SELECT tag FROM tags WHERE task_id = NEW.id ORDER BY tag))
);
END;
CREATE TRIGGER track_task_delete AFTER DELETE ON tasks
BEGIN
INSERT INTO change_log (task_uuid, change_type, changed_at, data)
VALUES (OLD.uuid, 'delete', unixepoch(), 'uuid: ' || OLD.uuid);
END;
`,
		},
	}
	// Apply pending migrations
	for _, migration := range migrations {
		if migration.version > currentVersion {
			tx, err := db.Begin()
			if err != nil {
				return fmt.Errorf("failed to begin transaction for migration %d: %w", migration.version, err)
			}
			// Execute migration SQL
			if _, err := tx.Exec(migration.sql); err != nil {
				tx.Rollback()
				return fmt.Errorf("failed to execute migration %d: %w", migration.version, err)
			}
			// Record migration in the same transaction so the version row
			// and the schema change commit (or roll back) together.
			if _, err := tx.Exec(
				"INSERT INTO schema_version (version, applied_at) VALUES (?, ?)",
				migration.version,
				getCurrentTimestamp(),
			); err != nil {
				tx.Rollback()
				return fmt.Errorf("failed to record migration %d: %w", migration.version, err)
			}
			if err := tx.Commit(); err != nil {
				return fmt.Errorf("failed to commit migration %d: %w", migration.version, err)
			}
		}
	}
	return nil
}
// getCurrentTimestamp reports the current time as a Unix timestamp in
// seconds, going through the timeNow hook so tests can inject a clock.
func getCurrentTimestamp() int64 {
	now := timeNow()
	return now.Unix()
}
// GetCurrentTimestamp returns the current Unix timestamp in seconds.
// Exported thin wrapper around getCurrentTimestamp for use by other packages
// (e.g. the API layer).
func GetCurrentTimestamp() int64 {
	return getCurrentTimestamp()
}
// CleanupChangeLog removes change_log entries older than the configured
// retention period (change_log_retention_days in sync_config, default 30).
// Returns an error if the database is not initialized or the delete fails.
func CleanupChangeLog() error {
	db := GetDB()
	if db == nil {
		return fmt.Errorf("database not initialized")
	}
	// Reuse the single accessor for the retention setting so the
	// default/fallback logic lives in exactly one place.
	retentionDays, err := GetChangeLogRetentionDays()
	if err != nil {
		return err
	}
	if retentionDays < 1 {
		// A non-positive value would push the cutoff into the future and
		// wipe the entire log; fall back to the documented default.
		retentionDays = 30
	}
	// Calculate cutoff timestamp
	cutoffTime := timeNow().AddDate(0, 0, -retentionDays).Unix()
	// Delete old entries
	result, err := db.Exec("DELETE FROM change_log WHERE changed_at < ?", cutoffTime)
	if err != nil {
		return fmt.Errorf("failed to cleanup change log: %w", err)
	}
	rows, _ := result.RowsAffected() // best-effort: count is informational only
	if rows > 0 {
		fmt.Printf("Cleaned up %d old change log entries\n", rows)
	}
	return nil
}
// GetChangeLogRetentionDays reports how many days of change_log history are
// kept, falling back to the 30-day default when the setting is missing or
// unreadable. The only error returned is an uninitialized database.
func GetChangeLogRetentionDays() (int, error) {
	database := GetDB()
	if database == nil {
		return 0, fmt.Errorf("database not initialized")
	}
	const query = "SELECT value FROM sync_config WHERE key = 'change_log_retention_days'"
	var days int
	if err := database.QueryRow(query).Scan(&days); err != nil {
		// Setting absent or not parseable as an integer: use the default.
		return 30, nil
	}
	return days, nil
}
// SetChangeLogRetentionDays stores the change_log retention period (in days)
// in sync_config, replacing any existing value. Rejects non-positive values,
// which would otherwise cause CleanupChangeLog to delete the entire log.
func SetChangeLogRetentionDays(days int) error {
	if days < 1 {
		return fmt.Errorf("retention days must be positive, got %d", days)
	}
	db := GetDB()
	if db == nil {
		return fmt.Errorf("database not initialized")
	}
	if _, err := db.Exec(
		"INSERT OR REPLACE INTO sync_config (key, value) VALUES ('change_log_retention_days', ?)",
		days,
	); err != nil {
		return fmt.Errorf("failed to set change log retention: %w", err)
	}
	return nil
}