package sync import ( "bytes" "encoding/json" "fmt" "io" "net/http" "time" "git.jnss.me/joakim/opal/internal/engine" "github.com/google/uuid" ) // Client represents a sync client for communicating with the API server type Client struct { baseURL string apiKey string httpClient *http.Client clientID string } // NewClient creates a new sync client func NewClient(baseURL, apiKey, clientID string) *Client { return &Client{ baseURL: baseURL, apiKey: apiKey, clientID: clientID, httpClient: &http.Client{Timeout: 30 * time.Second}, } } // testConnection checks if the server is reachable func (c *Client) testConnection() bool { req, err := http.NewRequest("GET", c.baseURL+"/health", nil) if err != nil { return false } resp, err := c.httpClient.Do(req) if err != nil { return false } defer resp.Body.Close() return resp.StatusCode == 200 } // PullChanges retrieves changes from server since a given timestamp func (c *Client) PullChanges(since int64) ([]ChangeLogEntry, error) { reqBody := map[string]interface{}{ "since": since, "client_id": c.clientID, } bodyBytes, err := json.Marshal(reqBody) if err != nil { return nil, fmt.Errorf("failed to marshal request: %w", err) } req, err := http.NewRequest("POST", c.baseURL+"/sync/changes", bytes.NewReader(bodyBytes)) if err != nil { return nil, fmt.Errorf("failed to create request: %w", err) } req.Header.Set("Authorization", "Bearer "+c.apiKey) req.Header.Set("Content-Type", "application/json") resp, err := c.httpClient.Do(req) if err != nil { return nil, fmt.Errorf("failed to send request: %w", err) } defer resp.Body.Close() if resp.StatusCode != 200 { body, _ := io.ReadAll(resp.Body) return nil, fmt.Errorf("server returned %d: %s", resp.StatusCode, string(body)) } var result struct { Success bool `json:"success"` Data []ChangeLogEntry `json:"data"` Error string `json:"error"` } if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { return nil, fmt.Errorf("failed to decode response: %w", err) } if !result.Success { return nil, fmt.Errorf("server error: %s", result.Error) } return result.Data, nil } // PushChanges sends local changes to the server func (c *Client) PushChanges(tasks []*engine.Task) error { // Convert tasks to JSON var taskData []json.RawMessage for _, task := range tasks { data, err := json.Marshal(task) if err != nil { continue } taskData = append(taskData, data) } reqBody := map[string]interface{}{ "tasks": taskData, "client_id": c.clientID, } bodyBytes, err := json.Marshal(reqBody) if err != nil { return fmt.Errorf("failed to marshal request: %w", err) } req, err := http.NewRequest("POST", c.baseURL+"/sync/push", bytes.NewReader(bodyBytes)) if err != nil { return fmt.Errorf("failed to create request: %w", err) } req.Header.Set("Authorization", "Bearer "+c.apiKey) req.Header.Set("Content-Type", "application/json") resp, err := c.httpClient.Do(req) if err != nil { return fmt.Errorf("failed to send request: %w", err) } defer resp.Body.Close() if resp.StatusCode != 200 { body, _ := io.ReadAll(resp.Body) return fmt.Errorf("server returned %d: %s", resp.StatusCode, string(body)) } return nil } // ChangeLogEntry represents a change from the server type ChangeLogEntry struct { ID int64 `json:"id"` TaskUUID string `json:"task_uuid"` ChangeType string `json:"change_type"` ChangedAt int64 `json:"changed_at"` Data string `json:"data"` } // Sync performs a full bidirectional sync func (c *Client) Sync(strategy ConflictResolution) (*SyncResult, error) { result := &SyncResult{} // Check if we have a queue with pending changes queue, err := NewQueue() if err != nil { return nil, fmt.Errorf("failed to load queue: %w", err) } // Test connection if !c.testConnection() { // Server offline - queue changes if any cfg, _ := engine.GetConfig() if cfg != nil && cfg.SyncQueueOffline { // Get local changes and queue them localChanges, _ := c.getLocalChanges(0) if len(localChanges) > 0 { _ = QueueLocalChanges(localChanges) result.QueuedOffline = len(localChanges) } result.QueuedOffline += queue.Size() return result, nil } return nil, fmt.Errorf("server unreachable") } // Server is online - process queue first if it has items if queue.Size() > 0 { if err := c.pushQueuedChanges(queue.GetPending()); err != nil { return nil, fmt.Errorf("failed to push queued changes: %w", err) } result.Pushed = queue.Size() queue.Clear() } // Get last sync time lastSync := c.getLastSyncTime() // Pull changes from server changes, err := c.PullChanges(lastSync) if err != nil { return nil, fmt.Errorf("failed to pull changes: %w", err) } // Convert changes to tasks remoteTasks, err := c.parseChanges(changes) if err != nil { return nil, fmt.Errorf("failed to parse changes: %w", err) } result.Pulled = len(remoteTasks) // Get local changes since last sync localChanges, err := c.getLocalChanges(lastSync) if err != nil { return nil, fmt.Errorf("failed to get local changes: %w", err) } // Merge merged, conflicts, err := MergeTasks(localChanges, remoteTasks, strategy) if err != nil { return nil, fmt.Errorf("failed to merge tasks: %w", err) } result.ConflictsResolved = conflicts // Apply merged tasks locally for _, task := range merged { if err := task.Save(); err != nil { result.Errors = append(result.Errors, fmt.Sprintf("failed to save task %s: %v", task.UUID, err)) } } // Push local changes to server if len(localChanges) > 0 { if err := c.PushChanges(localChanges); err != nil { result.Errors = append(result.Errors, fmt.Sprintf("failed to push changes: %v", err)) } else { result.Pushed += len(localChanges) } } // Update last sync time c.updateLastSyncTime(time.Now().Unix()) return result, nil } // getLastSyncTime retrieves the last sync timestamp from database func (c *Client) getLastSyncTime() int64 { db := engine.GetDB() if db == nil { return 0 } var lastSync int64 err := db.QueryRow("SELECT last_sync FROM sync_state WHERE client_id = ?", c.clientID).Scan(&lastSync) if err != nil { return 0 } return lastSync } // updateLastSyncTime updates the last sync timestamp func (c *Client) updateLastSyncTime(timestamp int64) { db := engine.GetDB() if db == nil { return } _, _ = db.Exec(` INSERT OR REPLACE INTO sync_state (client_id, last_sync, last_change_id) VALUES (?, ?, 0) `, c.clientID, timestamp) } // getLocalChanges retrieves local changes since a timestamp func (c *Client) getLocalChanges(since int64) ([]*engine.Task, error) { db := engine.GetDB() if db == nil { return nil, fmt.Errorf("database not initialized") } rows, err := db.Query(` SELECT DISTINCT task_uuid FROM change_log WHERE changed_at > ? ORDER BY changed_at ASC `, since) if err != nil { return nil, err } defer rows.Close() var tasks []*engine.Task for rows.Next() { var uuidStr string if err := rows.Scan(&uuidStr); err != nil { continue } taskUUID, err := uuid.Parse(uuidStr) if err != nil { continue } task, err := engine.GetTask(taskUUID) if err != nil { continue } tasks = append(tasks, task) } return tasks, nil } // parseChanges converts change log entries to tasks func (c *Client) parseChanges(changes []ChangeLogEntry) ([]*engine.Task, error) { // Group changes by UUID and use latest taskMap := make(map[string]ChangeLogEntry) for _, change := range changes { existing, exists := taskMap[change.TaskUUID] if !exists || change.ChangedAt > existing.ChangedAt { taskMap[change.TaskUUID] = change } } // Parse tasks from change data (key:value format) var tasks []*engine.Task for uuidStr, change := range taskMap { if change.ChangeType == "delete" { // Handle deletions separately continue } taskUUID, err := uuid.Parse(uuidStr) if err != nil { continue } // Try to get existing task task, err := engine.GetTask(taskUUID) if err != nil { // Task doesn't exist locally - create from change data task = &engine.Task{ UUID: taskUUID, Tags: []string{}, } } // Parse the key:value data and update task // For now, we'll use the server's task data directly via GetTask // In a more complete implementation, we'd parse the key:value format tasks = append(tasks, task) } return tasks, nil } // pushQueuedChanges sends queued changes to server func (c *Client) pushQueuedChanges(changes []QueuedChange) error { var tasks []*engine.Task for _, change := range changes { var task engine.Task if err := json.Unmarshal(change.Data, &task); err != nil { continue } tasks = append(tasks, &task) } return c.PushChanges(tasks) } // SyncResult represents the result of a sync operation type SyncResult struct { Pulled int Pushed int ConflictsResolved int QueuedOffline int Errors []string } // Display prints the sync result func (r *SyncResult) Display() { if r.QueuedOffline > 0 { fmt.Printf("⚠️ Server unreachable. %d changes queued.\n", r.QueuedOffline) return } fmt.Println("✓ Sync completed") if r.Pulled > 0 { fmt.Printf(" ↓ Pulled %d changes from server\n", r.Pulled) } if r.Pushed > 0 { fmt.Printf(" ↑ Pushed %d changes to server\n", r.Pushed) } if r.ConflictsResolved > 0 { fmt.Printf(" ⚠️ Auto-resolved %d conflicts (last-write-wins)\n", r.ConflictsResolved) fmt.Println(" Run 'opal sync log' to see details") } if len(r.Errors) > 0 { fmt.Println(" ✗ Errors:") for _, err := range r.Errors { fmt.Printf(" - %s\n", err) } } }