package sync import ( "bytes" "encoding/json" "fmt" "io" "net/http" "os" "sort" "strconv" "strings" "time" "git.jnss.me/joakim/opal/internal/engine" "github.com/google/uuid" ) // Client represents a sync client for communicating with the API server type Client struct { baseURL string apiKey string httpClient *http.Client clientID string } // NewClient creates a new sync client func NewClient(baseURL, apiKey, clientID string) *Client { return &Client{ baseURL: baseURL, apiKey: apiKey, clientID: clientID, httpClient: &http.Client{Timeout: 30 * time.Second}, } } // testConnection checks if the server is reachable func (c *Client) testConnection() bool { req, err := http.NewRequest("GET", c.baseURL+"/health", nil) if err != nil { return false } resp, err := c.httpClient.Do(req) if err != nil { return false } defer resp.Body.Close() return resp.StatusCode == 200 } // PullChanges retrieves changes from server since a given timestamp func (c *Client) PullChanges(since int64) ([]ChangeLogEntry, error) { reqBody := map[string]interface{}{ "since": since, "client_id": c.clientID, } bodyBytes, err := json.Marshal(reqBody) if err != nil { return nil, fmt.Errorf("failed to marshal request: %w", err) } req, err := http.NewRequest("POST", c.baseURL+"/sync/changes", bytes.NewReader(bodyBytes)) if err != nil { return nil, fmt.Errorf("failed to create request: %w", err) } req.Header.Set("Authorization", "Bearer "+c.apiKey) req.Header.Set("Content-Type", "application/json") resp, err := c.httpClient.Do(req) if err != nil { return nil, fmt.Errorf("failed to send request: %w", err) } defer resp.Body.Close() if resp.StatusCode != 200 { body, _ := io.ReadAll(resp.Body) return nil, fmt.Errorf("server returned %d: %s", resp.StatusCode, string(body)) } var result struct { Success bool `json:"success"` Data []ChangeLogEntry `json:"data"` Error string `json:"error"` } if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { return nil, fmt.Errorf("failed to decode response: %w", err) } if !result.Success { return nil, fmt.Errorf("server error: %s", result.Error) } return result.Data, nil } // PushChanges sends local changes to the server func (c *Client) PushChanges(tasks []*engine.Task) error { // Convert tasks to JSON var taskData []json.RawMessage for _, task := range tasks { data, err := json.Marshal(task) if err != nil { continue } taskData = append(taskData, data) } reqBody := map[string]interface{}{ "tasks": taskData, "client_id": c.clientID, } bodyBytes, err := json.Marshal(reqBody) if err != nil { return fmt.Errorf("failed to marshal request: %w", err) } req, err := http.NewRequest("POST", c.baseURL+"/sync/push", bytes.NewReader(bodyBytes)) if err != nil { return fmt.Errorf("failed to create request: %w", err) } req.Header.Set("Authorization", "Bearer "+c.apiKey) req.Header.Set("Content-Type", "application/json") resp, err := c.httpClient.Do(req) if err != nil { return fmt.Errorf("failed to send request: %w", err) } defer resp.Body.Close() if resp.StatusCode != 200 { body, _ := io.ReadAll(resp.Body) return fmt.Errorf("server returned %d: %s", resp.StatusCode, string(body)) } return nil } // ChangeLogEntry represents a change from the server type ChangeLogEntry struct { ID int64 `json:"id"` TaskUUID string `json:"task_uuid"` ChangeType string `json:"change_type"` ChangedAt int64 `json:"changed_at"` Data string `json:"data"` } // Sync performs a full bidirectional sync func (c *Client) Sync(strategy ConflictResolution) (*SyncResult, error) { result := &SyncResult{} // Check if we have a queue with pending changes queue, err := NewQueue() if err != nil { return nil, fmt.Errorf("failed to load queue: %w", err) } // Test connection if !c.testConnection() { // Server offline - queue changes if any cfg, _ := engine.GetConfig() if cfg != nil && cfg.SyncQueueOffline { // Get local changes and queue them localChanges, _ := c.getLocalChanges(0) if len(localChanges) > 0 { _ = QueueLocalChanges(localChanges) result.QueuedOffline = len(localChanges) } result.QueuedOffline += queue.Size() return result, nil } return nil, fmt.Errorf("server unreachable") } // Server is online - process queue first if it has items if queue.Size() > 0 { if err := c.pushQueuedChanges(queue.GetPending()); err != nil { return nil, fmt.Errorf("failed to push queued changes: %w", err) } result.Pushed = queue.Size() queue.Clear() } // Get last sync time lastSync := c.getLastSyncTime() // Pull changes from server changes, err := c.PullChanges(lastSync) if err != nil { return nil, fmt.Errorf("failed to pull changes: %w", err) } // Convert changes to tasks remoteTasks, err := c.parseChanges(changes) if err != nil { return nil, fmt.Errorf("failed to parse changes: %w", err) } // Sort: parent tasks before children (ensures parent exists when child is saved) sort.Slice(remoteTasks, func(i, j int) bool { // Tasks without ParentUUID come first iHasParent := remoteTasks[i].ParentUUID != nil jHasParent := remoteTasks[j].ParentUUID != nil if !iHasParent && jHasParent { return true // i (no parent) before j (has parent) } if iHasParent && !jHasParent { return false // j (no parent) before i (has parent) } return false // maintain original order }) result.Pulled = len(remoteTasks) // Get local changes since last sync localChanges, err := c.getLocalChanges(lastSync) if err != nil { return nil, fmt.Errorf("failed to get local changes: %w", err) } // Merge merged, conflicts, err := MergeTasks(localChanges, remoteTasks, strategy) if err != nil { return nil, fmt.Errorf("failed to merge tasks: %w", err) } result.ConflictsResolved = conflicts // Apply merged tasks locally for _, task := range merged { if err := task.Save(); err != nil { result.Errors = append(result.Errors, fmt.Sprintf("failed to save task %s: %v", task.UUID, err)) continue } // Reload task to ensure we have the database ID savedTask, err := engine.GetTask(task.UUID) if err != nil { result.Errors = append(result.Errors, fmt.Sprintf("failed to reload task %s: %v", task.UUID, err)) continue } // Sync tags: task.Tags contains the desired tag list (from parsing or local) // Get current tags from database currentTags, _ := savedTask.GetTags() // Build sets for comparison currentSet := make(map[string]bool) for _, tag := range currentTags { currentSet[tag] = true } desiredSet := make(map[string]bool) for _, tag := range task.Tags { desiredSet[tag] = true } // Remove tags no longer present for tag := range currentSet { if !desiredSet[tag] { savedTask.RemoveTag(tag) } } // Add new tags for tag := range desiredSet { if !currentSet[tag] { savedTask.AddTag(tag) } } } // Push local changes to server if len(localChanges) > 0 { if err := c.PushChanges(localChanges); err != nil { result.Errors = append(result.Errors, fmt.Sprintf("failed to push changes: %v", err)) } else { result.Pushed += len(localChanges) } } // Update last sync time c.updateLastSyncTime(time.Now().Unix()) return result, nil } // getLastSyncTime retrieves the last sync timestamp from database func (c *Client) getLastSyncTime() int64 { db := engine.GetDB() if db == nil { return 0 } var lastSync int64 err := db.QueryRow("SELECT last_sync FROM sync_state WHERE client_id = ?", c.clientID).Scan(&lastSync) if err != nil { return 0 } return lastSync } // updateLastSyncTime updates the last sync timestamp func (c *Client) updateLastSyncTime(timestamp int64) { db := engine.GetDB() if db == nil { return } _, _ = db.Exec(` INSERT OR REPLACE INTO sync_state (client_id, last_sync, last_change_id) VALUES (?, ?, 0) `, c.clientID, timestamp) } // getLocalChanges retrieves local changes since a timestamp func (c *Client) getLocalChanges(since int64) ([]*engine.Task, error) { db := engine.GetDB() if db == nil { return nil, fmt.Errorf("database not initialized") } rows, err := db.Query(` SELECT DISTINCT task_uuid FROM change_log WHERE changed_at > ? ORDER BY changed_at ASC `, since) if err != nil { return nil, err } defer rows.Close() var tasks []*engine.Task for rows.Next() { var uuidStr string if err := rows.Scan(&uuidStr); err != nil { continue } taskUUID, err := uuid.Parse(uuidStr) if err != nil { continue } task, err := engine.GetTask(taskUUID) if err != nil { continue } tasks = append(tasks, task) } return tasks, nil } // parseChanges converts change log entries to tasks func (c *Client) parseChanges(changes []ChangeLogEntry) ([]*engine.Task, error) { // Sort changes by timestamp (primary) and ID (secondary) to ensure correct order // This handles same-second updates (e.g., CREATE followed by UPDATE with tags) sort.Slice(changes, func(i, j int) bool { if changes[i].ChangedAt != changes[j].ChangedAt { return changes[i].ChangedAt < changes[j].ChangedAt } // Same timestamp: use ID as tiebreaker (higher ID = later change) return changes[i].ID < changes[j].ID }) // Group changes by UUID - last one wins (latest by timestamp+ID) taskMap := make(map[string]ChangeLogEntry) for _, change := range changes { // Since changes are sorted, we can simply overwrite // (later entries will overwrite earlier ones) taskMap[change.TaskUUID] = change } // Parse tasks from change data (key:value format) var tasks []*engine.Task for uuidStr, change := range taskMap { // Handle deletions if change.ChangeType == "delete" { taskUUID, err := uuid.Parse(uuidStr) if err != nil { continue } // Delete locally if exists if task, err := engine.GetTask(taskUUID); err == nil { task.Delete(false) // Soft delete } continue } // Parse the key:value data fields, err := engine.ParseKeyValueFormat(change.Data, false) // no comments in change log if err != nil { fmt.Fprintf(os.Stderr, "Warning: skipping task %s - failed to parse change data: %v\n", uuidStr, err) continue } // Get or create task taskUUID, err := uuid.Parse(uuidStr) if err != nil { fmt.Fprintf(os.Stderr, "Warning: skipping task - invalid UUID %s: %v\n", uuidStr, err) continue } task, err := engine.GetTask(taskUUID) if err != nil { // Task doesn't exist locally - create new task = &engine.Task{ UUID: taskUUID, Tags: []string{}, } } // Apply parsed change data to task if err := applyChangeDataToTask(task, fields); err != nil { fmt.Fprintf(os.Stderr, "Warning: skipping task %s - failed to apply changes: %v\n", uuidStr, err) continue } tasks = append(tasks, task) } return tasks, nil } // applyChangeDataToTask applies parsed change log data to a task // Expects fields from ParseKeyValueFormat with sync data (Unix timestamps, etc.) func applyChangeDataToTask(task *engine.Task, fields map[string]string) error { // UUID (required) if uuidStr, ok := fields["uuid"]; ok && uuidStr != "" { parsed, err := uuid.Parse(uuidStr) if err != nil { return fmt.Errorf("invalid uuid '%s': %w", uuidStr, err) } task.UUID = parsed } // Description (required, non-empty) if desc, ok := fields["description"]; ok { if strings.TrimSpace(desc) == "" { return fmt.Errorf("description cannot be empty") } task.Description = desc } // Status if statusStr, ok := fields["status"]; ok && statusStr != "" { task.Status = parseStatusFromChangeLog(statusStr) } // Priority if priStr, ok := fields["priority"]; ok && priStr != "" { task.Priority = parsePriorityFromChangeLog(priStr) } // Project (nullable) if proj, ok := fields["project"]; ok { if proj == "" { task.Project = nil } else { task.Project = &proj } } // Created timestamp if createdStr, ok := fields["created"]; ok && createdStr != "" { ts, err := strconv.ParseInt(createdStr, 10, 64) if err != nil { return fmt.Errorf("invalid created timestamp '%s': %w", createdStr, err) } task.Created = time.Unix(ts, 0) } // Modified timestamp if modStr, ok := fields["modified"]; ok && modStr != "" { ts, err := strconv.ParseInt(modStr, 10, 64) if err != nil { return fmt.Errorf("invalid modified timestamp '%s': %w", modStr, err) } task.Modified = time.Unix(ts, 0) } // Date fields (nullable Unix timestamps) dateFields := map[string]**time.Time{ "start": &task.Start, "end": &task.End, "due": &task.Due, "scheduled": &task.Scheduled, "wait": &task.Wait, "until": &task.Until, } for fieldName, taskField := range dateFields { if dateStr, ok := fields[fieldName]; ok { if dateStr == "" { *taskField = nil } else { ts, err := strconv.ParseInt(dateStr, 10, 64) if err != nil { return fmt.Errorf("invalid %s timestamp '%s': %w", fieldName, dateStr, err) } t := time.Unix(ts, 0) *taskField = &t } } } // Recurrence duration (nullable, int64 nanoseconds) if recurStr, ok := fields["recurrence"]; ok { if recurStr == "" { task.RecurrenceDuration = nil } else { nanos, err := strconv.ParseInt(recurStr, 10, 64) if err != nil { return fmt.Errorf("invalid recurrence '%s': %w", recurStr, err) } // Validate: not negative, not unreasonably large (max 100 years) if nanos < 0 { return fmt.Errorf("recurrence cannot be negative: %d", nanos) } maxNanos := int64(time.Hour * 24 * 365 * 100) if nanos > maxNanos { return fmt.Errorf("recurrence too large (max 100 years): %d", nanos) } duration := time.Duration(nanos) task.RecurrenceDuration = &duration } } // Parent UUID (nullable) if parentStr, ok := fields["parent_uuid"]; ok { if parentStr == "" { task.ParentUUID = nil } else { parsed, err := uuid.Parse(parentStr) if err != nil { return fmt.Errorf("invalid parent_uuid '%s': %w", parentStr, err) } task.ParentUUID = &parsed } } // Tags (comma-separated, sorted) if tagsStr, ok := fields["tags"]; ok { task.Tags = parseTagsFromChangeLog(tagsStr) } return nil } // parseStatusFromChangeLog parses status from change log string func parseStatusFromChangeLog(s string) engine.Status { switch s { case "pending": return engine.StatusPending case "completed": return engine.StatusCompleted case "deleted": return engine.StatusDeleted case "recurring": return engine.StatusRecurring default: return engine.StatusPending } } // parsePriorityFromChangeLog parses priority from change log string func parsePriorityFromChangeLog(s string) engine.Priority { switch s { case "L": return engine.PriorityLow case "D": return engine.PriorityDefault case "M": return engine.PriorityMedium case "H": return engine.PriorityHigh default: return engine.PriorityDefault } } // parseTagsFromChangeLog parses and sorts tags from change log func parseTagsFromChangeLog(s string) []string { if s == "" { return []string{} } parts := strings.Split(s, ",") tags := make([]string, 0, len(parts)) for _, tag := range parts { tag = strings.TrimSpace(tag) if tag != "" { tags = append(tags, tag) } } sort.Strings(tags) // Sort alphabetically for consistency return tags } // pushQueuedChanges sends queued changes to server func (c *Client) pushQueuedChanges(changes []QueuedChange) error { var tasks []*engine.Task for _, change := range changes { var task engine.Task if err := json.Unmarshal(change.Data, &task); err != nil { continue } tasks = append(tasks, &task) } return c.PushChanges(tasks) } // SyncResult represents the result of a sync operation type SyncResult struct { Pulled int Pushed int ConflictsResolved int QueuedOffline int Errors []string } // Display prints the sync result func (r *SyncResult) Display() { if r.QueuedOffline > 0 { fmt.Printf("⚠️ Server unreachable. %d changes queued.\n", r.QueuedOffline) return } fmt.Println("✓ Sync completed") if r.Pulled > 0 { fmt.Printf(" ↓ Pulled %d changes from server\n", r.Pulled) } if r.Pushed > 0 { fmt.Printf(" ↑ Pushed %d changes to server\n", r.Pushed) } if r.ConflictsResolved > 0 { fmt.Printf(" ⚠️ Auto-resolved %d conflicts (last-write-wins)\n", r.ConflictsResolved) fmt.Println(" Run 'opal sync log' to see details") } if len(r.Errors) > 0 { fmt.Println(" ✗ Errors:") for _, err := range r.Errors { fmt.Printf(" - %s\n", err) } } }