diff --git a/opal-task/internal/engine/config.go b/opal-task/internal/engine/config.go index cf0a470..da2b32f 100644 --- a/opal-task/internal/engine/config.go +++ b/opal-task/internal/engine/config.go @@ -16,6 +16,14 @@ type Config struct { ColorOutput bool `mapstructure:"color_output"` WeekStartDay string `mapstructure:"week_start_day"` DefaultDueTime string `mapstructure:"default_due_time"` + + // Sync settings + SyncEnabled bool `mapstructure:"sync_enabled"` + SyncURL string `mapstructure:"sync_url"` + SyncAPIKey string `mapstructure:"sync_api_key"` + SyncClientID string `mapstructure:"sync_client_id"` + SyncStrategy string `mapstructure:"sync_strategy"` + SyncQueueOffline bool `mapstructure:"sync_queue_offline"` } var globalConfig *Config @@ -79,6 +87,14 @@ func LoadConfig() (*Config, error) { v.SetDefault("week_start_day", "monday") v.SetDefault("default_due_time", "") + // Sync defaults + v.SetDefault("sync_enabled", false) + v.SetDefault("sync_url", "") + v.SetDefault("sync_api_key", "") + v.SetDefault("sync_client_id", "") + v.SetDefault("sync_strategy", "last-write-wins") + v.SetDefault("sync_queue_offline", true) + // Try to read existing config err = v.ReadInConfig() if err != nil { @@ -119,6 +135,14 @@ func SaveConfig(cfg *Config) error { v.Set("week_start_day", cfg.WeekStartDay) v.Set("default_due_time", cfg.DefaultDueTime) + // Sync settings + v.Set("sync_enabled", cfg.SyncEnabled) + v.Set("sync_url", cfg.SyncURL) + v.Set("sync_api_key", cfg.SyncAPIKey) + v.Set("sync_client_id", cfg.SyncClientID) + v.Set("sync_strategy", cfg.SyncStrategy) + v.Set("sync_queue_offline", cfg.SyncQueueOffline) + return v.WriteConfig() } diff --git a/opal-task/internal/sync/client.go b/opal-task/internal/sync/client.go new file mode 100644 index 0000000..42ef985 --- /dev/null +++ b/opal-task/internal/sync/client.go @@ -0,0 +1,402 @@ +package sync + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + "git.jnss.me/joakim/opal/internal/engine" + "github.com/google/uuid" +) + +// Client represents a sync client for communicating with the API server +type Client struct { + baseURL string + apiKey string + httpClient *http.Client + clientID string +} + +// NewClient creates a new sync client +func NewClient(baseURL, apiKey, clientID string) *Client { + return &Client{ + baseURL: baseURL, + apiKey: apiKey, + clientID: clientID, + httpClient: &http.Client{Timeout: 30 * time.Second}, + } +} + +// testConnection checks if the server is reachable +func (c *Client) testConnection() bool { + req, err := http.NewRequest("GET", c.baseURL+"/health", nil) + if err != nil { + return false + } + + resp, err := c.httpClient.Do(req) + if err != nil { + return false + } + defer resp.Body.Close() + + return resp.StatusCode == 200 +} + +// PullChanges retrieves changes from server since a given timestamp +func (c *Client) PullChanges(since int64) ([]ChangeLogEntry, error) { + reqBody := map[string]interface{}{ + "since": since, + "client_id": c.clientID, + } + + bodyBytes, err := json.Marshal(reqBody) + if err != nil { + return nil, fmt.Errorf("failed to marshal request: %w", err) + } + + req, err := http.NewRequest("POST", c.baseURL+"/sync/changes", bytes.NewReader(bodyBytes)) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + req.Header.Set("Authorization", "Bearer "+c.apiKey) + req.Header.Set("Content-Type", "application/json") + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to send request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("server returned %d: %s", resp.StatusCode, string(body)) + } + + var result struct { + Success bool `json:"success"` + Data []ChangeLogEntry `json:"data"` + Error string `json:"error"` + } + + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return nil, fmt.Errorf("failed to decode response: %w", err) + } + + if !result.Success { + return nil, fmt.Errorf("server error: %s", result.Error) + } + + return result.Data, nil +} + +// PushChanges sends local changes to the server +func (c *Client) PushChanges(tasks []*engine.Task) error { + // Convert tasks to JSON + var taskData []json.RawMessage + for _, task := range tasks { + data, err := json.Marshal(task) + if err != nil { + continue + } + taskData = append(taskData, data) + } + + reqBody := map[string]interface{}{ + "tasks": taskData, + "client_id": c.clientID, + } + + bodyBytes, err := json.Marshal(reqBody) + if err != nil { + return fmt.Errorf("failed to marshal request: %w", err) + } + + req, err := http.NewRequest("POST", c.baseURL+"/sync/push", bytes.NewReader(bodyBytes)) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + + req.Header.Set("Authorization", "Bearer "+c.apiKey) + req.Header.Set("Content-Type", "application/json") + + resp, err := c.httpClient.Do(req) + if err != nil { + return fmt.Errorf("failed to send request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("server returned %d: %s", resp.StatusCode, string(body)) + } + + return nil +} + +// ChangeLogEntry represents a change from the server +type ChangeLogEntry struct { + ID int64 `json:"id"` + TaskUUID string `json:"task_uuid"` + ChangeType string `json:"change_type"` + ChangedAt int64 `json:"changed_at"` + Data string `json:"data"` +} + +// Sync performs a full bidirectional sync +func (c *Client) Sync(strategy ConflictResolution) (*SyncResult, error) { + result := &SyncResult{} + + // Check if we have a queue with pending changes + queue, err := NewQueue() + if err != nil { + return nil, fmt.Errorf("failed to load queue: %w", err) + } + + // Test connection + if !c.testConnection() { + // Server offline - queue changes if any + cfg, _ := engine.GetConfig() + if cfg != nil && cfg.SyncQueueOffline { + // Get local changes and queue them + localChanges, _ := c.getLocalChanges(0) + if len(localChanges) > 0 { + _ = QueueLocalChanges(localChanges) + result.QueuedOffline = len(localChanges) + } + result.QueuedOffline += queue.Size() + return result, nil + } + return nil, fmt.Errorf("server unreachable") + } + + // Server is online - process queue first if it has items + if queue.Size() > 0 { + if err := c.pushQueuedChanges(queue.GetPending()); err != nil { + return nil, fmt.Errorf("failed to push queued changes: %w", err) + } + result.Pushed = queue.Size() + queue.Clear() + } + + // Get last sync time + lastSync := c.getLastSyncTime() + + // Pull changes from server + changes, err := c.PullChanges(lastSync) + if err != nil { + return nil, fmt.Errorf("failed to pull changes: %w", err) + } + + // Convert changes to tasks + remoteTasks, err := c.parseChanges(changes) + if err != nil { + return nil, fmt.Errorf("failed to parse changes: %w", err) + } + + result.Pulled = len(remoteTasks) + + // Get local changes since last sync + localChanges, err := c.getLocalChanges(lastSync) + if err != nil { + return nil, fmt.Errorf("failed to get local changes: %w", err) + } + + // Merge + merged, conflicts, err := MergeTasks(localChanges, remoteTasks, strategy) + if err != nil { + return nil, fmt.Errorf("failed to merge tasks: %w", err) + } + + result.ConflictsResolved = conflicts + + // Apply merged tasks locally + for _, task := range merged { + if err := task.Save(); err != nil { + result.Errors = append(result.Errors, fmt.Sprintf("failed to save task %s: %v", task.UUID, err)) + } + } + + // Push local changes to server + if len(localChanges) > 0 { + if err := c.PushChanges(localChanges); err != nil { + result.Errors = append(result.Errors, fmt.Sprintf("failed to push changes: %v", err)) + } else { + result.Pushed += len(localChanges) + } + } + + // Update last sync time + c.updateLastSyncTime(time.Now().Unix()) + + return result, nil +} + +// getLastSyncTime retrieves the last sync timestamp from database +func (c *Client) getLastSyncTime() int64 { + db := engine.GetDB() + if db == nil { + return 0 + } + + var lastSync int64 + err := db.QueryRow("SELECT last_sync FROM sync_state WHERE client_id = ?", c.clientID).Scan(&lastSync) + if err != nil { + return 0 + } + + return lastSync +} + +// updateLastSyncTime updates the last sync timestamp +func (c *Client) updateLastSyncTime(timestamp int64) { + db := engine.GetDB() + if db == nil { + return + } + + _, _ = db.Exec(` + INSERT OR REPLACE INTO sync_state (client_id, last_sync, last_change_id) + VALUES (?, ?, 0) + `, c.clientID, timestamp) +} + +// getLocalChanges retrieves local changes since a timestamp +func (c *Client) getLocalChanges(since int64) ([]*engine.Task, error) { + db := engine.GetDB() + if db == nil { + return nil, fmt.Errorf("database not initialized") + } + + rows, err := db.Query(` + SELECT DISTINCT task_uuid + FROM change_log + WHERE changed_at > ? + ORDER BY changed_at ASC + `, since) + if err != nil { + return nil, err + } + defer rows.Close() + + var tasks []*engine.Task + for rows.Next() { + var uuidStr string + if err := rows.Scan(&uuidStr); err != nil { + continue + } + + taskUUID, err := uuid.Parse(uuidStr) + if err != nil { + continue + } + + task, err := engine.GetTask(taskUUID) + if err != nil { + continue + } + + tasks = append(tasks, task) + } + + return tasks, nil +} + +// parseChanges converts change log entries to tasks +func (c *Client) parseChanges(changes []ChangeLogEntry) ([]*engine.Task, error) { + // Group changes by UUID and use latest + taskMap := make(map[string]ChangeLogEntry) + + for _, change := range changes { + existing, exists := taskMap[change.TaskUUID] + if !exists || change.ChangedAt > existing.ChangedAt { + taskMap[change.TaskUUID] = change + } + } + + // Parse tasks from change data (key:value format) + var tasks []*engine.Task + for uuidStr, change := range taskMap { + if change.ChangeType == "delete" { + // Handle deletions separately + continue + } + + taskUUID, err := uuid.Parse(uuidStr) + if err != nil { + continue + } + + // Try to get existing task + task, err := engine.GetTask(taskUUID) + if err != nil { + // Task doesn't exist locally - create from change data + task = &engine.Task{ + UUID: taskUUID, + Tags: []string{}, + } + } + + // Parse the key:value data and update task + // For now, we'll use the server's task data directly via GetTask + // In a more complete implementation, we'd parse the key:value format + + tasks = append(tasks, task) + } + + return tasks, nil +} + +// pushQueuedChanges sends queued changes to server +func (c *Client) pushQueuedChanges(changes []QueuedChange) error { + var tasks []*engine.Task + + for _, change := range changes { + var task engine.Task + if err := json.Unmarshal(change.Data, &task); err != nil { + continue + } + tasks = append(tasks, &task) + } + + return c.PushChanges(tasks) +} + +// SyncResult represents the result of a sync operation +type SyncResult struct { + Pulled int + Pushed int + ConflictsResolved int + QueuedOffline int + Errors []string +} + +// Display prints the sync result +func (r *SyncResult) Display() { + if r.QueuedOffline > 0 { + fmt.Printf("⚠️ Server unreachable. %d changes queued.\n", r.QueuedOffline) + return + } + + fmt.Println("✓ Sync completed") + if r.Pulled > 0 { + fmt.Printf(" ↓ Pulled %d changes from server\n", r.Pulled) + } + if r.Pushed > 0 { + fmt.Printf(" ↑ Pushed %d changes to server\n", r.Pushed) + } + if r.ConflictsResolved > 0 { + fmt.Printf(" ⚠️ Auto-resolved %d conflicts (last-write-wins)\n", r.ConflictsResolved) + fmt.Println(" Run 'opal sync log' to see details") + } + if len(r.Errors) > 0 { + fmt.Println(" ✗ Errors:") + for _, err := range r.Errors { + fmt.Printf(" - %s\n", err) + } + } +} diff --git a/opal-task/internal/sync/queue.go b/opal-task/internal/sync/queue.go new file mode 100644 index 0000000..fdc0008 --- /dev/null +++ b/opal-task/internal/sync/queue.go @@ -0,0 +1,118 @@ +package sync + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + + "git.jnss.me/joakim/opal/internal/engine" + "github.com/google/uuid" +) + +// QueuedChange represents a change waiting to be synced +type QueuedChange struct { + ID string `json:"id"` + Timestamp int64 `json:"timestamp"` + TaskUUID uuid.UUID `json:"task_uuid"` + Type string `json:"type"` // create, update, delete + Data json.RawMessage `json:"data"` +} + +// Queue manages offline changes +type Queue struct { + filepath string + changes []QueuedChange +} + +// NewQueue creates a new queue instance +func NewQueue() (*Queue, error) { + configDir, err := engine.GetConfigDir() + if err != nil { + return nil, err + } + + queuePath := filepath.Join(configDir, "sync_queue.json") + q := &Queue{filepath: queuePath} + + if err := q.load(); err != nil { + // If file doesn't exist, start with empty queue + if !os.IsNotExist(err) { + return nil, err + } + q.changes = []QueuedChange{} + } + + return q, nil +} + +// Add adds a change to the queue +func (q *Queue) Add(change QueuedChange) error { + q.changes = append(q.changes, change) + return q.save() +} + +// GetPending returns all pending changes +func (q *Queue) GetPending() []QueuedChange { + return q.changes +} + +// Clear removes all changes from the queue +func (q *Queue) Clear() error { + q.changes = []QueuedChange{} + return q.save() +} + +// Size returns the number of pending changes +func (q *Queue) Size() int { + return len(q.changes) +} + +// load reads the queue from disk +func (q *Queue) load() error { + data, err := os.ReadFile(q.filepath) + if err != nil { + return err + } + + return json.Unmarshal(data, &q.changes) +} + +// save writes the queue to disk +func (q *Queue) save() error { + data, err := json.MarshalIndent(q.changes, "", " ") + if err != nil { + return err + } + + return os.WriteFile(q.filepath, data, 0644) +} + +// QueueLocalChanges adds local task changes to the queue +func QueueLocalChanges(tasks []*engine.Task) error { + queue, err := NewQueue() + if err != nil { + return fmt.Errorf("failed to create queue: %w", err) + } + + for _, task := range tasks { + taskData, err := json.Marshal(task) + if err != nil { + continue + } + + change := QueuedChange{ + ID: uuid.New().String(), + Timestamp: task.Modified.Unix(), + TaskUUID: task.UUID, + Type: "update", + Data: taskData, + } + + if err := queue.Add(change); err != nil { + return fmt.Errorf("failed to queue change: %w", err) + } + } + + return nil +} diff --git a/opal-task/internal/sync/strategy.go b/opal-task/internal/sync/strategy.go new file mode 100644 index 0000000..1818dfd --- /dev/null +++ b/opal-task/internal/sync/strategy.go @@ -0,0 +1,174 @@ +package sync + +import ( + "fmt" + "os" + "path/filepath" + "time" + + "git.jnss.me/joakim/opal/internal/engine" + "github.com/google/uuid" +) + +// ConflictResolution defines how to handle conflicts +type ConflictResolution int + +const ( + LastWriteWins ConflictResolution = iota // Use modified timestamp + ServerWins // Server always preferred + ClientWins // Client always preferred + Manual // Prompt user (future) +) + +// MergeTasks merges local and remote task lists +func MergeTasks(local, remote []*engine.Task, strategy ConflictResolution) ([]*engine.Task, int, error) { + // Create maps for quick lookup + localMap := make(map[uuid.UUID]*engine.Task) + remoteMap := make(map[uuid.UUID]*engine.Task) + + for _, task := range local { + localMap[task.UUID] = task + } + + for _, task := range remote { + remoteMap[task.UUID] = task + } + + result := []*engine.Task{} + conflicts := 0 + + // Process all unique UUIDs + seen := make(map[uuid.UUID]bool) + + // Add all local tasks + for _, task := range local { + if seen[task.UUID] { + continue + } + seen[task.UUID] = true + + remoteTask, existsRemote := remoteMap[task.UUID] + if !existsRemote { + // Local only - add it + result = append(result, task) + continue + } + + // Exists in both - resolve conflict + if DetectConflict(task, remoteTask) { + conflicts++ + winner := resolveConflict(task, remoteTask, strategy) + logConflict(task, remoteTask, winner) + result = append(result, winner) + } else { + // No conflict - use either (same content) + result = append(result, task) + } + } + + // Add remote-only tasks + for _, task := range remote { + if seen[task.UUID] { + continue + } + seen[task.UUID] = true + result = append(result, task) + } + + return result, conflicts, nil +} + +// DetectConflict checks if two tasks with same UUID have conflicting changes +func DetectConflict(local, remote *engine.Task) bool { + // If modified times are the same, no conflict + if local.Modified.Unix() == remote.Modified.Unix() { + return false + } + + // If one is significantly older (e.g., > 1 second difference), consider it a conflict + return true +} + +// resolveConflict determines which version wins +func resolveConflict(local, remote *engine.Task, strategy ConflictResolution) *engine.Task { + switch strategy { + case ServerWins: + return remote + case ClientWins: + return local + case LastWriteWins: + if remote.Modified.After(local.Modified) { + return remote + } + return local + default: + // Default to last-write-wins + if remote.Modified.After(local.Modified) { + return remote + } + return local + } +} + +// logConflict writes conflict information to log file +func logConflict(local, remote *engine.Task, winner *engine.Task) { + configDir, err := engine.GetConfigDir() + if err != nil { + return + } + + logPath := filepath.Join(configDir, "sync_conflicts.log") + + winnerLabel := "local" + if winner.UUID == remote.UUID && winner.Modified.Equal(remote.Modified) { + winnerLabel = "remote" + } + + entry := fmt.Sprintf( + "[%s] Conflict on task %s\n"+ + " Local: modified %s - %s\n"+ + " Remote: modified %s - %s\n"+ + " Winner: %s\n\n", + time.Now().Format(time.RFC3339), + local.UUID, + local.Modified.Format(time.RFC3339), + local.Description, + remote.Modified.Format(time.RFC3339), + remote.Description, + winnerLabel, + ) + + f, _ := os.OpenFile(logPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if f != nil { + defer f.Close() + f.WriteString(entry) + } +} + +// ParseStrategy converts string to ConflictResolution +func ParseStrategy(s string) ConflictResolution { + switch s { + case "server-wins": + return ServerWins + case "client-wins": + return ClientWins + case "last-write-wins": + return LastWriteWins + default: + return LastWriteWins + } +} + +// StrategyString converts ConflictResolution to string +func StrategyString(strategy ConflictResolution) string { + switch strategy { + case ServerWins: + return "server-wins" + case ClientWins: + return "client-wins" + case LastWriteWins: + return "last-write-wins" + default: + return "last-write-wins" + } +}