Files
joakim a11f452d3b fix: break sync feedback loop, respect timestamps, surface errors
- Add migration v2: source column on change_log to distinguish local
  vs sync-originated entries, preventing the echo loop where synced
  tasks get re-pushed as local changes
- PushChanges handler now skips save when server version is newer
- Client PushChanges/pushQueuedChanges collect and report marshal errors
  instead of silently dropping them
- De-duplicate getLocalChanges/getLastSyncTime into exported sync
  package functions
- Fix logConflict winner detection via pointer identity instead of
  fragile UUID+timestamp comparison
- Fix sync down to actually parse, save, and tag-sync pulled changes

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-21 01:11:04 +01:00

756 lines
20 KiB
Go

package sync
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"sort"
"strconv"
"strings"
"time"
"git.jnss.me/joakim/opal/internal/engine"
"github.com/google/uuid"
)
// Client represents a sync client for communicating with the API server
type Client struct {
baseURL string
apiKey string
httpClient *http.Client
clientID string
}
// NewClient creates a new sync client
func NewClient(baseURL, apiKey, clientID string) *Client {
return &Client{
baseURL: baseURL,
apiKey: apiKey,
clientID: clientID,
httpClient: &http.Client{Timeout: 30 * time.Second},
}
}
// testConnection checks if the server is reachable
func (c *Client) testConnection() bool {
req, err := http.NewRequest("GET", c.baseURL+"/health", nil)
if err != nil {
return false
}
resp, err := c.httpClient.Do(req)
if err != nil {
return false
}
defer resp.Body.Close()
return resp.StatusCode == 200
}
// PullChanges retrieves changes from server since a given timestamp
func (c *Client) PullChanges(since int64) ([]ChangeLogEntry, error) {
reqBody := map[string]interface{}{
"since": since,
"client_id": c.clientID,
}
bodyBytes, err := json.Marshal(reqBody)
if err != nil {
return nil, fmt.Errorf("failed to marshal request: %w", err)
}
req, err := http.NewRequest("POST", c.baseURL+"/sync/changes", bytes.NewReader(bodyBytes))
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}
req.Header.Set("Authorization", "Bearer "+c.apiKey)
req.Header.Set("Content-Type", "application/json")
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to send request: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("server returned %d: %s", resp.StatusCode, string(body))
}
var result struct {
Success bool `json:"success"`
Data []ChangeLogEntry `json:"data"`
Error string `json:"error"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("failed to decode response: %w", err)
}
if !result.Success {
return nil, fmt.Errorf("server error: %s", result.Error)
}
return result.Data, nil
}
// PushChanges sends local changes to the server
func (c *Client) PushChanges(tasks []*engine.Task) error {
// Convert tasks to JSON
var taskData []json.RawMessage
var marshalErrors []string
for _, task := range tasks {
data, err := json.Marshal(task)
if err != nil {
marshalErrors = append(marshalErrors, fmt.Sprintf("task %s: %v", task.UUID, err))
continue
}
taskData = append(taskData, data)
}
if len(taskData) == 0 && len(marshalErrors) > 0 {
return fmt.Errorf("all tasks failed to marshal: %s", strings.Join(marshalErrors, "; "))
}
reqBody := map[string]interface{}{
"tasks": taskData,
"client_id": c.clientID,
}
bodyBytes, err := json.Marshal(reqBody)
if err != nil {
return fmt.Errorf("failed to marshal request: %w", err)
}
req, err := http.NewRequest("POST", c.baseURL+"/sync/push", bytes.NewReader(bodyBytes))
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}
req.Header.Set("Authorization", "Bearer "+c.apiKey)
req.Header.Set("Content-Type", "application/json")
resp, err := c.httpClient.Do(req)
if err != nil {
return fmt.Errorf("failed to send request: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("server returned %d: %s", resp.StatusCode, string(body))
}
if len(marshalErrors) > 0 {
return fmt.Errorf("pushed %d tasks but %d failed to marshal: %s",
len(taskData), len(marshalErrors), strings.Join(marshalErrors, "; "))
}
return nil
}
// ChangeLogEntry represents a change from the server
type ChangeLogEntry struct {
ID int64 `json:"id"`
TaskUUID string `json:"task_uuid"`
ChangeType string `json:"change_type"`
ChangedAt int64 `json:"changed_at"`
Data string `json:"data"`
}
// Sync performs a full bidirectional sync
func (c *Client) Sync(strategy ConflictResolution, reporter ProgressReporter) (*SyncResult, error) {
result := &SyncResult{}
// Use NoOp progress if none provided
if reporter == nil {
reporter = &NoOpProgress{}
}
// Check if we have a queue with pending changes
queue, err := NewQueue()
if err != nil {
return nil, fmt.Errorf("failed to load queue: %w", err)
}
// Phase 1: Test connection
reporter.StartPhase("Testing connection", 1)
if !c.testConnection() {
reporter.CompletePhase()
// Server offline - queue changes if any
cfg, _ := engine.GetConfig()
if cfg != nil && cfg.SyncQueueOffline {
// Get local changes and queue them
localChanges, _ := c.getLocalChanges(0)
if len(localChanges) > 0 {
_ = QueueLocalChanges(localChanges)
result.QueuedOffline = len(localChanges)
}
result.QueuedOffline += queue.Size()
return result, nil
}
return nil, fmt.Errorf("server unreachable")
}
reporter.UpdatePhase(1, "Connected")
reporter.CompletePhase()
// Phase 2: Process queued changes if any
if queue.Size() > 0 {
reporter.StartPhase("Pushing queued changes", int64(queue.Size()))
if err := c.pushQueuedChanges(queue.GetPending()); err != nil {
reporter.CompletePhase()
return nil, fmt.Errorf("failed to push queued changes: %w", err)
}
result.Pushed = queue.Size()
reporter.UpdatePhase(int64(queue.Size()), fmt.Sprintf("%d changes pushed", queue.Size()))
reporter.CompletePhase()
queue.Clear()
}
// Get last sync time
lastSync := c.getLastSyncTime()
// Phase 3: Pull changes from server
reporter.StartPhase("Pulling from server", 1)
changes, err := c.PullChanges(lastSync)
if err != nil {
reporter.CompletePhase()
return nil, fmt.Errorf("failed to pull changes: %w", err)
}
reporter.UpdatePhase(1, fmt.Sprintf("Received %d changes", len(changes)))
reporter.CompletePhase()
// Phase 4: Parse and sort changes
if len(changes) > 0 {
reporter.StartPhase("Processing changes", int64(len(changes)))
}
// Convert changes to tasks
remoteTasks, err := c.ParseChanges(changes)
if err != nil {
if len(changes) > 0 {
reporter.CompletePhase()
}
return nil, fmt.Errorf("failed to parse changes: %w", err)
}
// Sort: parent tasks before children (ensures parent exists when child is saved)
sort.Slice(remoteTasks, func(i, j int) bool {
// Tasks without ParentUUID come first
iHasParent := remoteTasks[i].ParentUUID != nil
jHasParent := remoteTasks[j].ParentUUID != nil
if !iHasParent && jHasParent {
return true // i (no parent) before j (has parent)
}
if iHasParent && !jHasParent {
return false // j (no parent) before i (has parent)
}
return false // maintain original order
})
if len(changes) > 0 {
reporter.UpdatePhase(int64(len(changes)), fmt.Sprintf("Parsed %d tasks", len(remoteTasks)))
reporter.CompletePhase()
}
result.Pulled = len(remoteTasks)
// Get local changes since last sync
localChanges, err := c.getLocalChanges(lastSync)
if err != nil {
return nil, fmt.Errorf("failed to get local changes: %w", err)
}
// Merge
merged, conflicts, err := MergeTasks(localChanges, remoteTasks, strategy)
if err != nil {
return nil, fmt.Errorf("failed to merge tasks: %w", err)
}
result.ConflictsResolved = conflicts
// Phase 5: Apply merged tasks locally
if len(merged) > 0 {
reporter.StartPhase("Applying changes locally", int64(len(merged)))
}
for i, task := range merged {
// Update progress every task or every 10% for large syncs
if len(merged) < 20 || i%(len(merged)/10+1) == 0 {
desc := task.Description
if len(desc) > 40 {
desc = desc[:37] + "..."
}
reporter.UpdatePhase(int64(i+1), desc)
}
if err := task.Save(); err != nil {
result.Errors = append(result.Errors, fmt.Sprintf("failed to save task %s: %v", task.UUID, err))
continue
}
// Mark change_log entry as sync-originated to prevent feedback loop
if err := engine.MarkChangeLogAsSync(task.UUID.String()); err != nil {
result.Errors = append(result.Errors, fmt.Sprintf("failed to mark change as sync for %s: %v", task.UUID, err))
}
// Reload task to ensure we have the database ID
savedTask, err := engine.GetTask(task.UUID)
if err != nil {
result.Errors = append(result.Errors, fmt.Sprintf("failed to reload task %s: %v", task.UUID, err))
continue
}
// Sync tags: task.Tags contains the desired tag list (from parsing or local)
// Get current tags from database
currentTags, _ := savedTask.GetTags()
// Build sets for comparison
currentSet := make(map[string]bool)
for _, tag := range currentTags {
currentSet[tag] = true
}
desiredSet := make(map[string]bool)
for _, tag := range task.Tags {
desiredSet[tag] = true
}
// Remove tags no longer present
for tag := range currentSet {
if !desiredSet[tag] {
savedTask.RemoveTag(tag)
}
}
// Add new tags
for tag := range desiredSet {
if !currentSet[tag] {
savedTask.AddTag(tag)
}
}
}
if len(merged) > 0 {
reporter.CompletePhase()
}
// Phase 6: Push local changes to server
if len(localChanges) > 0 {
reporter.StartPhase("Pushing to server", int64(len(localChanges)))
if err := c.PushChanges(localChanges); err != nil {
reporter.CompletePhase()
result.Errors = append(result.Errors, fmt.Sprintf("failed to push changes: %v", err))
} else {
result.Pushed += len(localChanges)
reporter.UpdatePhase(int64(len(localChanges)), fmt.Sprintf("%d changes pushed", len(localChanges)))
reporter.CompletePhase()
}
}
// Update last sync time
c.updateLastSyncTime(time.Now().Unix())
// Complete progress
reporter.Done()
return result, nil
}
// getLastSyncTime retrieves the last sync timestamp from database
func (c *Client) getLastSyncTime() int64 {
return GetLastSyncTime(c.clientID)
}
// updateLastSyncTime updates the last sync timestamp
func (c *Client) updateLastSyncTime(timestamp int64) {
db := engine.GetDB()
if db == nil {
return
}
_, _ = db.Exec(`
INSERT OR REPLACE INTO sync_state (client_id, last_sync, last_change_id)
VALUES (?, ?, 0)
`, c.clientID, timestamp)
}
// getLocalChanges retrieves local changes since a timestamp
func (c *Client) getLocalChanges(since int64) ([]*engine.Task, error) {
return GetLocalChanges(since)
}
// GetLastSyncTime retrieves the last sync timestamp for a client ID from the database.
func GetLastSyncTime(clientID string) int64 {
db := engine.GetDB()
if db == nil {
return 0
}
var lastSync int64
err := db.QueryRow("SELECT last_sync FROM sync_state WHERE client_id = ?", clientID).Scan(&lastSync)
if err != nil {
return 0
}
return lastSync
}
// GetLocalChanges retrieves local (non-sync-originated) changes since a timestamp.
func GetLocalChanges(since int64) ([]*engine.Task, error) {
db := engine.GetDB()
if db == nil {
return nil, fmt.Errorf("database not initialized")
}
rows, err := db.Query(`
SELECT DISTINCT task_uuid
FROM change_log
WHERE changed_at > ? AND source = 'local'
ORDER BY changed_at ASC
`, since)
if err != nil {
return nil, err
}
defer rows.Close()
var tasks []*engine.Task
for rows.Next() {
var uuidStr string
if err := rows.Scan(&uuidStr); err != nil {
continue
}
taskUUID, err := uuid.Parse(uuidStr)
if err != nil {
continue
}
task, err := engine.GetTask(taskUUID)
if err != nil {
continue
}
tasks = append(tasks, task)
}
return tasks, nil
}
// ParseChanges converts change log entries to tasks
func (c *Client) ParseChanges(changes []ChangeLogEntry) ([]*engine.Task, error) {
// Sort changes by timestamp (primary) and ID (secondary) to ensure correct order
// This handles same-second updates (e.g., CREATE followed by UPDATE with tags)
sort.Slice(changes, func(i, j int) bool {
if changes[i].ChangedAt != changes[j].ChangedAt {
return changes[i].ChangedAt < changes[j].ChangedAt
}
// Same timestamp: use ID as tiebreaker (higher ID = later change)
return changes[i].ID < changes[j].ID
})
// Group changes by UUID - last one wins (latest by timestamp+ID)
taskMap := make(map[string]ChangeLogEntry)
for _, change := range changes {
// Since changes are sorted, we can simply overwrite
// (later entries will overwrite earlier ones)
taskMap[change.TaskUUID] = change
}
// Parse tasks from change data (key:value format)
var tasks []*engine.Task
for uuidStr, change := range taskMap {
// Handle deletions
if change.ChangeType == "delete" {
taskUUID, err := uuid.Parse(uuidStr)
if err != nil {
continue
}
// Delete locally if exists
if task, err := engine.GetTask(taskUUID); err == nil {
task.Delete(false) // Soft delete
}
continue
}
// Parse the key:value data
fields, err := engine.ParseKeyValueFormat(change.Data, false) // no comments in change log
if err != nil {
fmt.Fprintf(os.Stderr, "Warning: skipping task %s - failed to parse change data: %v\n", uuidStr, err)
continue
}
// Get or create task
taskUUID, err := uuid.Parse(uuidStr)
if err != nil {
fmt.Fprintf(os.Stderr, "Warning: skipping task - invalid UUID %s: %v\n", uuidStr, err)
continue
}
task, err := engine.GetTask(taskUUID)
if err != nil {
// Task doesn't exist locally - create new
task = &engine.Task{
UUID: taskUUID,
Tags: []string{},
}
}
// Apply parsed change data to task
if err := applyChangeDataToTask(task, fields); err != nil {
fmt.Fprintf(os.Stderr, "Warning: skipping task %s - failed to apply changes: %v\n", uuidStr, err)
continue
}
tasks = append(tasks, task)
}
return tasks, nil
}
// applyChangeDataToTask applies parsed change log data to a task
// Expects fields from ParseKeyValueFormat with sync data (Unix timestamps, etc.)
func applyChangeDataToTask(task *engine.Task, fields map[string]string) error {
// UUID (required)
if uuidStr, ok := fields["uuid"]; ok && uuidStr != "" {
parsed, err := uuid.Parse(uuidStr)
if err != nil {
return fmt.Errorf("invalid uuid '%s': %w", uuidStr, err)
}
task.UUID = parsed
}
// Description (required, non-empty)
if desc, ok := fields["description"]; ok {
if strings.TrimSpace(desc) == "" {
return fmt.Errorf("description cannot be empty")
}
task.Description = desc
}
// Status
if statusStr, ok := fields["status"]; ok && statusStr != "" {
task.Status = parseStatusFromChangeLog(statusStr)
}
// Priority
if priStr, ok := fields["priority"]; ok && priStr != "" {
task.Priority = parsePriorityFromChangeLog(priStr)
}
// Project (nullable)
if proj, ok := fields["project"]; ok {
if proj == "" {
task.Project = nil
} else {
task.Project = &proj
}
}
// Created timestamp
if createdStr, ok := fields["created"]; ok && createdStr != "" {
ts, err := strconv.ParseInt(createdStr, 10, 64)
if err != nil {
return fmt.Errorf("invalid created timestamp '%s': %w", createdStr, err)
}
task.Created = time.Unix(ts, 0)
}
// Modified timestamp
if modStr, ok := fields["modified"]; ok && modStr != "" {
ts, err := strconv.ParseInt(modStr, 10, 64)
if err != nil {
return fmt.Errorf("invalid modified timestamp '%s': %w", modStr, err)
}
task.Modified = time.Unix(ts, 0)
}
// Date fields (nullable Unix timestamps)
dateFields := map[string]**time.Time{
"start": &task.Start,
"end": &task.End,
"due": &task.Due,
"scheduled": &task.Scheduled,
"wait": &task.Wait,
"until": &task.Until,
}
for fieldName, taskField := range dateFields {
if dateStr, ok := fields[fieldName]; ok {
if dateStr == "" {
*taskField = nil
} else {
ts, err := strconv.ParseInt(dateStr, 10, 64)
if err != nil {
return fmt.Errorf("invalid %s timestamp '%s': %w", fieldName, dateStr, err)
}
t := time.Unix(ts, 0)
*taskField = &t
}
}
}
// Recurrence duration (nullable, int64 nanoseconds)
if recurStr, ok := fields["recurrence"]; ok {
if recurStr == "" {
task.RecurrenceDuration = nil
} else {
nanos, err := strconv.ParseInt(recurStr, 10, 64)
if err != nil {
return fmt.Errorf("invalid recurrence '%s': %w", recurStr, err)
}
// Validate: not negative, not unreasonably large (max 100 years)
if nanos < 0 {
return fmt.Errorf("recurrence cannot be negative: %d", nanos)
}
maxNanos := int64(time.Hour * 24 * 365 * 100)
if nanos > maxNanos {
return fmt.Errorf("recurrence too large (max 100 years): %d", nanos)
}
duration := time.Duration(nanos)
task.RecurrenceDuration = &duration
}
}
// Parent UUID (nullable)
if parentStr, ok := fields["parent_uuid"]; ok {
if parentStr == "" {
task.ParentUUID = nil
} else {
parsed, err := uuid.Parse(parentStr)
if err != nil {
return fmt.Errorf("invalid parent_uuid '%s': %w", parentStr, err)
}
task.ParentUUID = &parsed
}
}
// Tags (comma-separated, sorted)
if tagsStr, ok := fields["tags"]; ok {
task.Tags = parseTagsFromChangeLog(tagsStr)
}
return nil
}
// parseStatusFromChangeLog parses status from change log string
func parseStatusFromChangeLog(s string) engine.Status {
switch s {
case "pending":
return engine.StatusPending
case "completed":
return engine.StatusCompleted
case "deleted":
return engine.StatusDeleted
case "recurring":
return engine.StatusRecurring
default:
return engine.StatusPending
}
}
// parsePriorityFromChangeLog parses priority from change log string
func parsePriorityFromChangeLog(s string) engine.Priority {
switch s {
case "L":
return engine.PriorityLow
case "D":
return engine.PriorityDefault
case "M":
return engine.PriorityMedium
case "H":
return engine.PriorityHigh
default:
return engine.PriorityDefault
}
}
// parseTagsFromChangeLog parses and sorts tags from change log
func parseTagsFromChangeLog(s string) []string {
if s == "" {
return []string{}
}
parts := strings.Split(s, ",")
tags := make([]string, 0, len(parts))
for _, tag := range parts {
tag = strings.TrimSpace(tag)
if tag != "" {
tags = append(tags, tag)
}
}
sort.Strings(tags) // Sort alphabetically for consistency
return tags
}
// pushQueuedChanges sends queued changes to server
func (c *Client) pushQueuedChanges(changes []QueuedChange) error {
var tasks []*engine.Task
var unmarshalErrors []string
for _, change := range changes {
var task engine.Task
if err := json.Unmarshal(change.Data, &task); err != nil {
unmarshalErrors = append(unmarshalErrors, fmt.Sprintf("queued change: %v", err))
continue
}
tasks = append(tasks, &task)
}
if len(tasks) == 0 && len(unmarshalErrors) > 0 {
return fmt.Errorf("all queued changes failed to unmarshal: %s", strings.Join(unmarshalErrors, "; "))
}
if err := c.PushChanges(tasks); err != nil {
return err
}
if len(unmarshalErrors) > 0 {
return fmt.Errorf("pushed %d tasks but %d queued changes failed to unmarshal: %s",
len(tasks), len(unmarshalErrors), strings.Join(unmarshalErrors, "; "))
}
return nil
}
// SyncResult represents the result of a sync operation
type SyncResult struct {
Pulled int
Pushed int
ConflictsResolved int
QueuedOffline int
Errors []string
}
// Display prints the sync result
func (r *SyncResult) Display() {
if r.QueuedOffline > 0 {
fmt.Printf("⚠️ Server unreachable. %d changes queued.\n", r.QueuedOffline)
return
}
fmt.Println("✓ Sync completed")
if r.Pulled > 0 {
fmt.Printf(" ↓ Pulled %d changes from server\n", r.Pulled)
}
if r.Pushed > 0 {
fmt.Printf(" ↑ Pushed %d changes to server\n", r.Pushed)
}
if r.ConflictsResolved > 0 {
fmt.Printf(" ⚠️ Auto-resolved %d conflicts (last-write-wins)\n", r.ConflictsResolved)
fmt.Println(" Run 'opal sync log' to see details")
}
if len(r.Errors) > 0 {
fmt.Println(" ✗ Errors:")
for _, err := range r.Errors {
fmt.Printf(" - %s\n", err)
}
}
}