From a11f452d3b90be6bd8d0a5437cada55f0bb4ef85 Mon Sep 17 00:00:00 2001 From: Joakim Date: Sat, 21 Feb 2026 01:11:04 +0100 Subject: [PATCH] fix: break sync feedback loop, respect timestamps, surface errors - Add migration v2: source column on change_log to distinguish local vs sync-originated entries, preventing the echo loop where synced tasks get re-pushed as local changes - PushChanges handler now skips save when server version is newer - Client PushChanges/pushQueuedChanges collect and report marshal errors instead of silently dropping them - De-duplicate getLocalChanges/getLastSyncTime into exported sync package functions - Fix logConflict winner detection via pointer identity instead of fragile UUID+timestamp comparison - Fix sync down to actually parse, save, and tag-sync pulled changes Co-Authored-By: Claude Opus 4.6 --- opal-task/cmd/sync.go | 113 +++++++++++------------- opal-task/internal/api/handlers/sync.go | 9 +- opal-task/internal/engine/database.go | 28 ++++++ opal-task/internal/sync/client.go | 75 ++++++++++++---- opal-task/internal/sync/strategy.go | 13 ++- 5 files changed, 151 insertions(+), 87 deletions(-) diff --git a/opal-task/cmd/sync.go b/opal-task/cmd/sync.go index a44c4d2..004474d 100644 --- a/opal-task/cmd/sync.go +++ b/opal-task/cmd/sync.go @@ -224,8 +224,8 @@ var syncUpCmd = &cobra.Command{ client := sync.NewClient(cfg.SyncURL, cfg.SyncAPIKey, cfg.SyncClientID) // Get local changes - lastSync := getLastSyncTime(cfg.SyncClientID) - localChanges, err := getLocalChanges(lastSync) + lastSync := sync.GetLastSyncTime(cfg.SyncClientID) + localChanges, err := sync.GetLocalChanges(lastSync) if err != nil { fmt.Fprintf(os.Stderr, "Error getting local changes: %v\n", err) os.Exit(1) @@ -264,7 +264,7 @@ var syncDownCmd = &cobra.Command{ } client := sync.NewClient(cfg.SyncURL, cfg.SyncAPIKey, cfg.SyncClientID) - lastSync := getLastSyncTime(cfg.SyncClientID) + lastSync := sync.GetLastSyncTime(cfg.SyncClientID) changes, err := client.PullChanges(lastSync) if err != nil { @@ -277,7 +277,55 @@ var syncDownCmd = &cobra.Command{ return } - fmt.Printf("✓ Pulled %d changes from server\n", len(changes)) + // Parse changes into tasks + tasks, err := client.ParseChanges(changes) + if err != nil { + fmt.Fprintf(os.Stderr, "Error parsing changes: %v\n", err) + os.Exit(1) + } + + // Apply each task locally + var applied int + for _, task := range tasks { + if err := task.Save(); err != nil { + fmt.Fprintf(os.Stderr, "Warning: failed to save task %s: %v\n", task.UUID, err) + continue + } + + // Mark as sync-originated to prevent feedback loop + _ = engine.MarkChangeLogAsSync(task.UUID.String()) + + // Sync tags + savedTask, err := engine.GetTask(task.UUID) + if err != nil { + fmt.Fprintf(os.Stderr, "Warning: failed to reload task %s: %v\n", task.UUID, err) + continue + } + + currentTags, _ := savedTask.GetTags() + currentSet := make(map[string]bool) + for _, tag := range currentTags { + currentSet[tag] = true + } + desiredSet := make(map[string]bool) + for _, tag := range task.Tags { + desiredSet[tag] = true + } + for tag := range currentSet { + if !desiredSet[tag] { + savedTask.RemoveTag(tag) + } + } + for tag := range desiredSet { + if !currentSet[tag] { + savedTask.AddTag(tag) + } + } + + applied++ + } + + fmt.Printf("✓ Pulled %d changes, applied %d tasks from server\n", len(changes), applied) }, } @@ -414,63 +462,6 @@ func init() { syncCmd.PersistentFlags().BoolVarP(&quietFlag, "quiet", "q", false, "Suppress progress output") } -// Helper functions - -func getLastSyncTime(clientID string) int64 { - db := engine.GetDB() - if db == nil { - return 0 - } - - var lastSync int64 - err := db.QueryRow("SELECT last_sync FROM sync_state WHERE client_id = ?", clientID).Scan(&lastSync) - if err != nil { - return 0 - } - - return lastSync -} - -func getLocalChanges(since int64) ([]*engine.Task, error) { - db := engine.GetDB() - if db == nil { - return nil, fmt.Errorf("database not initialized") - } - - rows, err := db.Query(` - SELECT DISTINCT task_uuid - FROM change_log - WHERE changed_at > ? - ORDER BY changed_at ASC - `, since) - if err != nil { - return nil, err - } - defer rows.Close() - - var tasks []*engine.Task - for rows.Next() { - var uuidStr string - if err := rows.Scan(&uuidStr); err != nil { - continue - } - - taskUUID, err := uuid.Parse(uuidStr) - if err != nil { - continue - } - - task, err := engine.GetTask(taskUUID) - if err != nil { - continue - } - - tasks = append(tasks, task) - } - - return tasks, nil -} - func formatTimestamp(ts int64) string { t := time.Unix(ts, 0) now := time.Now() diff --git a/opal-task/internal/api/handlers/sync.go b/opal-task/internal/api/handlers/sync.go index 996a4ae..bc63b00 100644 --- a/opal-task/internal/api/handlers/sync.go +++ b/opal-task/internal/api/handlers/sync.go @@ -104,6 +104,8 @@ func PushChanges(w http.ResponseWriter, r *http.Request) { if err := task.Save(); err != nil { continue } + // Mark as sync-originated to prevent feedback loop + _ = engine.MarkChangeLogAsSync(task.UUID.String()) // Add tags for _, tag := range task.Tags { _ = task.AddTag(tag) @@ -114,15 +116,18 @@ func PushChanges(w http.ResponseWriter, r *http.Request) { // Task exists - check timestamps for conflicts if existing.Modified.Unix() > task.Modified.Unix() { - // Server version is newer - conflict (but we'll apply last-write-wins) + // Server version is newer - skip this push conflicts++ + continue } - // Apply changes (last-write-wins) + // Apply changes (client is newer or equal) task.ID = existing.ID // Preserve database ID if err := task.Save(); err != nil { continue } + // Mark as sync-originated to prevent feedback loop + _ = engine.MarkChangeLogAsSync(task.UUID.String()) // Sync tags existingTags := make(map[string]bool) diff --git a/opal-task/internal/engine/database.go b/opal-task/internal/engine/database.go index 7722295..b136c53 100644 --- a/opal-task/internal/engine/database.go +++ b/opal-task/internal/engine/database.go @@ -307,6 +307,13 @@ func runMigrations() error { END; `, }, + { + version: 2, + sql: ` + ALTER TABLE change_log ADD COLUMN source TEXT NOT NULL DEFAULT 'local'; + CREATE INDEX idx_change_log_source ON change_log(source); + `, + }, } // Apply pending migrations @@ -409,3 +416,24 @@ func SetChangeLogRetentionDays(days int) error { _, err := db.Exec("INSERT OR REPLACE INTO sync_config (key, value) VALUES ('change_log_retention_days', ?)", days) return err } + +// MarkChangeLogAsSync marks the most recent change_log entry for a task UUID +// as originating from sync (not local), preventing the feedback loop where +// synced changes get re-pushed as local changes. +func MarkChangeLogAsSync(taskUUID string) error { + db := GetDB() + if db == nil { + return fmt.Errorf("database not initialized") + } + + _, err := db.Exec(` + UPDATE change_log SET source = 'sync' + WHERE id = ( + SELECT id FROM change_log + WHERE task_uuid = ? + ORDER BY id DESC + LIMIT 1 + ) + `, taskUUID) + return err +} diff --git a/opal-task/internal/sync/client.go b/opal-task/internal/sync/client.go index e03b9f0..439136f 100644 --- a/opal-task/internal/sync/client.go +++ b/opal-task/internal/sync/client.go @@ -102,14 +102,20 @@ func (c *Client) PullChanges(since int64) ([]ChangeLogEntry, error) { func (c *Client) PushChanges(tasks []*engine.Task) error { // Convert tasks to JSON var taskData []json.RawMessage + var marshalErrors []string for _, task := range tasks { data, err := json.Marshal(task) if err != nil { + marshalErrors = append(marshalErrors, fmt.Sprintf("task %s: %v", task.UUID, err)) continue } taskData = append(taskData, data) } + if len(taskData) == 0 && len(marshalErrors) > 0 { + return fmt.Errorf("all tasks failed to marshal: %s", strings.Join(marshalErrors, "; ")) + } + reqBody := map[string]interface{}{ "tasks": taskData, "client_id": c.clientID, @@ -139,6 +145,11 @@ func (c *Client) PushChanges(tasks []*engine.Task) error { return fmt.Errorf("server returned %d: %s", resp.StatusCode, string(body)) } + if len(marshalErrors) > 0 { + return fmt.Errorf("pushed %d tasks but %d failed to marshal: %s", + len(taskData), len(marshalErrors), strings.Join(marshalErrors, "; ")) + } + return nil } @@ -219,7 +230,7 @@ func (c *Client) Sync(strategy ConflictResolution, reporter ProgressReporter) (* } // Convert changes to tasks - remoteTasks, err := c.parseChanges(changes) + remoteTasks, err := c.ParseChanges(changes) if err != nil { if len(changes) > 0 { reporter.CompletePhase() @@ -283,6 +294,11 @@ func (c *Client) Sync(strategy ConflictResolution, reporter ProgressReporter) (* continue } + // Mark change_log entry as sync-originated to prevent feedback loop + if err := engine.MarkChangeLogAsSync(task.UUID.String()); err != nil { + result.Errors = append(result.Errors, fmt.Sprintf("failed to mark change as sync for %s: %v", task.UUID, err)) + } + // Reload task to ensure we have the database ID savedTask, err := engine.GetTask(task.UUID) if err != nil { @@ -347,18 +363,7 @@ func (c *Client) Sync(strategy ConflictResolution, reporter ProgressReporter) (* // getLastSyncTime retrieves the last sync timestamp from database func (c *Client) getLastSyncTime() int64 { - db := engine.GetDB() - if db == nil { - return 0 - } - - var lastSync int64 - err := db.QueryRow("SELECT last_sync FROM sync_state WHERE client_id = ?", c.clientID).Scan(&lastSync) - if err != nil { - return 0 - } - - return lastSync + return GetLastSyncTime(c.clientID) } // updateLastSyncTime updates the last sync timestamp @@ -376,6 +381,27 @@ func (c *Client) updateLastSyncTime(timestamp int64) { // getLocalChanges retrieves local changes since a timestamp func (c *Client) getLocalChanges(since int64) ([]*engine.Task, error) { + return GetLocalChanges(since) +} + +// GetLastSyncTime retrieves the last sync timestamp for a client ID from the database. +func GetLastSyncTime(clientID string) int64 { + db := engine.GetDB() + if db == nil { + return 0 + } + + var lastSync int64 + err := db.QueryRow("SELECT last_sync FROM sync_state WHERE client_id = ?", clientID).Scan(&lastSync) + if err != nil { + return 0 + } + + return lastSync +} + +// GetLocalChanges retrieves local (non-sync-originated) changes since a timestamp. +func GetLocalChanges(since int64) ([]*engine.Task, error) { db := engine.GetDB() if db == nil { return nil, fmt.Errorf("database not initialized") @@ -384,7 +410,7 @@ func (c *Client) getLocalChanges(since int64) ([]*engine.Task, error) { rows, err := db.Query(` SELECT DISTINCT task_uuid FROM change_log - WHERE changed_at > ? + WHERE changed_at > ? AND source = 'local' ORDER BY changed_at ASC `, since) if err != nil { @@ -415,8 +441,8 @@ func (c *Client) getLocalChanges(since int64) ([]*engine.Task, error) { return tasks, nil } -// parseChanges converts change log entries to tasks -func (c *Client) parseChanges(changes []ChangeLogEntry) ([]*engine.Task, error) { +// ParseChanges converts change log entries to tasks +func (c *Client) ParseChanges(changes []ChangeLogEntry) ([]*engine.Task, error) { // Sort changes by timestamp (primary) and ID (secondary) to ensure correct order // This handles same-second updates (e.g., CREATE followed by UPDATE with tags) sort.Slice(changes, func(i, j int) bool { @@ -666,16 +692,31 @@ func parseTagsFromChangeLog(s string) []string { // pushQueuedChanges sends queued changes to server func (c *Client) pushQueuedChanges(changes []QueuedChange) error { var tasks []*engine.Task + var unmarshalErrors []string for _, change := range changes { var task engine.Task if err := json.Unmarshal(change.Data, &task); err != nil { + unmarshalErrors = append(unmarshalErrors, fmt.Sprintf("queued change: %v", err)) continue } tasks = append(tasks, &task) } - return c.PushChanges(tasks) + if len(tasks) == 0 && len(unmarshalErrors) > 0 { + return fmt.Errorf("all queued changes failed to unmarshal: %s", strings.Join(unmarshalErrors, "; ")) + } + + if err := c.PushChanges(tasks); err != nil { + return err + } + + if len(unmarshalErrors) > 0 { + return fmt.Errorf("pushed %d tasks but %d queued changes failed to unmarshal: %s", + len(tasks), len(unmarshalErrors), strings.Join(unmarshalErrors, "; ")) + } + + return nil } // SyncResult represents the result of a sync operation diff --git a/opal-task/internal/sync/strategy.go b/opal-task/internal/sync/strategy.go index 2de549e..87d0b4c 100644 --- a/opal-task/internal/sync/strategy.go +++ b/opal-task/internal/sync/strategy.go @@ -57,7 +57,11 @@ func MergeTasks(local, remote []*engine.Task, strategy ConflictResolution) ([]*e if DetectConflict(task, remoteTask) { conflicts++ winner := resolveConflict(task, remoteTask, strategy) - logConflict(task, remoteTask, winner) + winnerLabel := "local" + if winner == remoteTask { + winnerLabel = "remote" + } + logConflict(task, remoteTask, winnerLabel) result = append(result, winner) } else { // No conflict - use either (same content) @@ -110,17 +114,12 @@ func resolveConflict(local, remote *engine.Task, strategy ConflictResolution) *e } // logConflict writes conflict information to log file -func logConflict(local, remote *engine.Task, winner *engine.Task) { +func logConflict(local, remote *engine.Task, winnerLabel string) { logPath, err := engine.GetSyncConflictLogPath() if err != nil { return } - winnerLabel := "local" - if winner.UUID == remote.UUID && winner.Modified.Equal(remote.Modified) { - winnerLabel = "remote" - } - entry := fmt.Sprintf( "[%s] Conflict on task %s\n"+ " Local: modified %s - %s\n"+