fix: break sync feedback loop, respect timestamps, surface errors
- Add migration v2: source column on change_log to distinguish local vs sync-originated entries, preventing the echo loop where synced tasks get re-pushed as local changes - PushChanges handler now skips save when server version is newer - Client PushChanges/pushQueuedChanges collect and report marshal errors instead of silently dropping them - De-duplicate getLocalChanges/getLastSyncTime into exported sync package functions - Fix logConflict winner detection via pointer identity instead of fragile UUID+timestamp comparison - Fix sync down to actually parse, save, and tag-sync pulled changes Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
+52
-61
@@ -224,8 +224,8 @@ var syncUpCmd = &cobra.Command{
|
|||||||
client := sync.NewClient(cfg.SyncURL, cfg.SyncAPIKey, cfg.SyncClientID)
|
client := sync.NewClient(cfg.SyncURL, cfg.SyncAPIKey, cfg.SyncClientID)
|
||||||
|
|
||||||
// Get local changes
|
// Get local changes
|
||||||
lastSync := getLastSyncTime(cfg.SyncClientID)
|
lastSync := sync.GetLastSyncTime(cfg.SyncClientID)
|
||||||
localChanges, err := getLocalChanges(lastSync)
|
localChanges, err := sync.GetLocalChanges(lastSync)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Fprintf(os.Stderr, "Error getting local changes: %v\n", err)
|
fmt.Fprintf(os.Stderr, "Error getting local changes: %v\n", err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
@@ -264,7 +264,7 @@ var syncDownCmd = &cobra.Command{
|
|||||||
}
|
}
|
||||||
|
|
||||||
client := sync.NewClient(cfg.SyncURL, cfg.SyncAPIKey, cfg.SyncClientID)
|
client := sync.NewClient(cfg.SyncURL, cfg.SyncAPIKey, cfg.SyncClientID)
|
||||||
lastSync := getLastSyncTime(cfg.SyncClientID)
|
lastSync := sync.GetLastSyncTime(cfg.SyncClientID)
|
||||||
|
|
||||||
changes, err := client.PullChanges(lastSync)
|
changes, err := client.PullChanges(lastSync)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -277,7 +277,55 @@ var syncDownCmd = &cobra.Command{
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Printf("✓ Pulled %d changes from server\n", len(changes))
|
// Parse changes into tasks
|
||||||
|
tasks, err := client.ParseChanges(changes)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Fprintf(os.Stderr, "Error parsing changes: %v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Apply each task locally
|
||||||
|
var applied int
|
||||||
|
for _, task := range tasks {
|
||||||
|
if err := task.Save(); err != nil {
|
||||||
|
fmt.Fprintf(os.Stderr, "Warning: failed to save task %s: %v\n", task.UUID, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mark as sync-originated to prevent feedback loop
|
||||||
|
_ = engine.MarkChangeLogAsSync(task.UUID.String())
|
||||||
|
|
||||||
|
// Sync tags
|
||||||
|
savedTask, err := engine.GetTask(task.UUID)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Fprintf(os.Stderr, "Warning: failed to reload task %s: %v\n", task.UUID, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
currentTags, _ := savedTask.GetTags()
|
||||||
|
currentSet := make(map[string]bool)
|
||||||
|
for _, tag := range currentTags {
|
||||||
|
currentSet[tag] = true
|
||||||
|
}
|
||||||
|
desiredSet := make(map[string]bool)
|
||||||
|
for _, tag := range task.Tags {
|
||||||
|
desiredSet[tag] = true
|
||||||
|
}
|
||||||
|
for tag := range currentSet {
|
||||||
|
if !desiredSet[tag] {
|
||||||
|
savedTask.RemoveTag(tag)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for tag := range desiredSet {
|
||||||
|
if !currentSet[tag] {
|
||||||
|
savedTask.AddTag(tag)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
applied++
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("✓ Pulled %d changes, applied %d tasks from server\n", len(changes), applied)
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -414,63 +462,6 @@ func init() {
|
|||||||
syncCmd.PersistentFlags().BoolVarP(&quietFlag, "quiet", "q", false, "Suppress progress output")
|
syncCmd.PersistentFlags().BoolVarP(&quietFlag, "quiet", "q", false, "Suppress progress output")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Helper functions
|
|
||||||
|
|
||||||
func getLastSyncTime(clientID string) int64 {
|
|
||||||
db := engine.GetDB()
|
|
||||||
if db == nil {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
var lastSync int64
|
|
||||||
err := db.QueryRow("SELECT last_sync FROM sync_state WHERE client_id = ?", clientID).Scan(&lastSync)
|
|
||||||
if err != nil {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
return lastSync
|
|
||||||
}
|
|
||||||
|
|
||||||
func getLocalChanges(since int64) ([]*engine.Task, error) {
|
|
||||||
db := engine.GetDB()
|
|
||||||
if db == nil {
|
|
||||||
return nil, fmt.Errorf("database not initialized")
|
|
||||||
}
|
|
||||||
|
|
||||||
rows, err := db.Query(`
|
|
||||||
SELECT DISTINCT task_uuid
|
|
||||||
FROM change_log
|
|
||||||
WHERE changed_at > ?
|
|
||||||
ORDER BY changed_at ASC
|
|
||||||
`, since)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer rows.Close()
|
|
||||||
|
|
||||||
var tasks []*engine.Task
|
|
||||||
for rows.Next() {
|
|
||||||
var uuidStr string
|
|
||||||
if err := rows.Scan(&uuidStr); err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
taskUUID, err := uuid.Parse(uuidStr)
|
|
||||||
if err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
task, err := engine.GetTask(taskUUID)
|
|
||||||
if err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
tasks = append(tasks, task)
|
|
||||||
}
|
|
||||||
|
|
||||||
return tasks, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func formatTimestamp(ts int64) string {
|
func formatTimestamp(ts int64) string {
|
||||||
t := time.Unix(ts, 0)
|
t := time.Unix(ts, 0)
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|||||||
@@ -104,6 +104,8 @@ func PushChanges(w http.ResponseWriter, r *http.Request) {
|
|||||||
if err := task.Save(); err != nil {
|
if err := task.Save(); err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
// Mark as sync-originated to prevent feedback loop
|
||||||
|
_ = engine.MarkChangeLogAsSync(task.UUID.String())
|
||||||
// Add tags
|
// Add tags
|
||||||
for _, tag := range task.Tags {
|
for _, tag := range task.Tags {
|
||||||
_ = task.AddTag(tag)
|
_ = task.AddTag(tag)
|
||||||
@@ -114,15 +116,18 @@ func PushChanges(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
// Task exists - check timestamps for conflicts
|
// Task exists - check timestamps for conflicts
|
||||||
if existing.Modified.Unix() > task.Modified.Unix() {
|
if existing.Modified.Unix() > task.Modified.Unix() {
|
||||||
// Server version is newer - conflict (but we'll apply last-write-wins)
|
// Server version is newer - skip this push
|
||||||
conflicts++
|
conflicts++
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Apply changes (last-write-wins)
|
// Apply changes (client is newer or equal)
|
||||||
task.ID = existing.ID // Preserve database ID
|
task.ID = existing.ID // Preserve database ID
|
||||||
if err := task.Save(); err != nil {
|
if err := task.Save(); err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
// Mark as sync-originated to prevent feedback loop
|
||||||
|
_ = engine.MarkChangeLogAsSync(task.UUID.String())
|
||||||
|
|
||||||
// Sync tags
|
// Sync tags
|
||||||
existingTags := make(map[string]bool)
|
existingTags := make(map[string]bool)
|
||||||
|
|||||||
@@ -307,6 +307,13 @@ func runMigrations() error {
|
|||||||
END;
|
END;
|
||||||
`,
|
`,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
version: 2,
|
||||||
|
sql: `
|
||||||
|
ALTER TABLE change_log ADD COLUMN source TEXT NOT NULL DEFAULT 'local';
|
||||||
|
CREATE INDEX idx_change_log_source ON change_log(source);
|
||||||
|
`,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// Apply pending migrations
|
// Apply pending migrations
|
||||||
@@ -409,3 +416,24 @@ func SetChangeLogRetentionDays(days int) error {
|
|||||||
_, err := db.Exec("INSERT OR REPLACE INTO sync_config (key, value) VALUES ('change_log_retention_days', ?)", days)
|
_, err := db.Exec("INSERT OR REPLACE INTO sync_config (key, value) VALUES ('change_log_retention_days', ?)", days)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MarkChangeLogAsSync marks the most recent change_log entry for a task UUID
|
||||||
|
// as originating from sync (not local), preventing the feedback loop where
|
||||||
|
// synced changes get re-pushed as local changes.
|
||||||
|
func MarkChangeLogAsSync(taskUUID string) error {
|
||||||
|
db := GetDB()
|
||||||
|
if db == nil {
|
||||||
|
return fmt.Errorf("database not initialized")
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := db.Exec(`
|
||||||
|
UPDATE change_log SET source = 'sync'
|
||||||
|
WHERE id = (
|
||||||
|
SELECT id FROM change_log
|
||||||
|
WHERE task_uuid = ?
|
||||||
|
ORDER BY id DESC
|
||||||
|
LIMIT 1
|
||||||
|
)
|
||||||
|
`, taskUUID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|||||||
@@ -102,14 +102,20 @@ func (c *Client) PullChanges(since int64) ([]ChangeLogEntry, error) {
|
|||||||
func (c *Client) PushChanges(tasks []*engine.Task) error {
|
func (c *Client) PushChanges(tasks []*engine.Task) error {
|
||||||
// Convert tasks to JSON
|
// Convert tasks to JSON
|
||||||
var taskData []json.RawMessage
|
var taskData []json.RawMessage
|
||||||
|
var marshalErrors []string
|
||||||
for _, task := range tasks {
|
for _, task := range tasks {
|
||||||
data, err := json.Marshal(task)
|
data, err := json.Marshal(task)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
marshalErrors = append(marshalErrors, fmt.Sprintf("task %s: %v", task.UUID, err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
taskData = append(taskData, data)
|
taskData = append(taskData, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(taskData) == 0 && len(marshalErrors) > 0 {
|
||||||
|
return fmt.Errorf("all tasks failed to marshal: %s", strings.Join(marshalErrors, "; "))
|
||||||
|
}
|
||||||
|
|
||||||
reqBody := map[string]interface{}{
|
reqBody := map[string]interface{}{
|
||||||
"tasks": taskData,
|
"tasks": taskData,
|
||||||
"client_id": c.clientID,
|
"client_id": c.clientID,
|
||||||
@@ -139,6 +145,11 @@ func (c *Client) PushChanges(tasks []*engine.Task) error {
|
|||||||
return fmt.Errorf("server returned %d: %s", resp.StatusCode, string(body))
|
return fmt.Errorf("server returned %d: %s", resp.StatusCode, string(body))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(marshalErrors) > 0 {
|
||||||
|
return fmt.Errorf("pushed %d tasks but %d failed to marshal: %s",
|
||||||
|
len(taskData), len(marshalErrors), strings.Join(marshalErrors, "; "))
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -219,7 +230,7 @@ func (c *Client) Sync(strategy ConflictResolution, reporter ProgressReporter) (*
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Convert changes to tasks
|
// Convert changes to tasks
|
||||||
remoteTasks, err := c.parseChanges(changes)
|
remoteTasks, err := c.ParseChanges(changes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if len(changes) > 0 {
|
if len(changes) > 0 {
|
||||||
reporter.CompletePhase()
|
reporter.CompletePhase()
|
||||||
@@ -283,6 +294,11 @@ func (c *Client) Sync(strategy ConflictResolution, reporter ProgressReporter) (*
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Mark change_log entry as sync-originated to prevent feedback loop
|
||||||
|
if err := engine.MarkChangeLogAsSync(task.UUID.String()); err != nil {
|
||||||
|
result.Errors = append(result.Errors, fmt.Sprintf("failed to mark change as sync for %s: %v", task.UUID, err))
|
||||||
|
}
|
||||||
|
|
||||||
// Reload task to ensure we have the database ID
|
// Reload task to ensure we have the database ID
|
||||||
savedTask, err := engine.GetTask(task.UUID)
|
savedTask, err := engine.GetTask(task.UUID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -347,18 +363,7 @@ func (c *Client) Sync(strategy ConflictResolution, reporter ProgressReporter) (*
|
|||||||
|
|
||||||
// getLastSyncTime retrieves the last sync timestamp from database
|
// getLastSyncTime retrieves the last sync timestamp from database
|
||||||
func (c *Client) getLastSyncTime() int64 {
|
func (c *Client) getLastSyncTime() int64 {
|
||||||
db := engine.GetDB()
|
return GetLastSyncTime(c.clientID)
|
||||||
if db == nil {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
var lastSync int64
|
|
||||||
err := db.QueryRow("SELECT last_sync FROM sync_state WHERE client_id = ?", c.clientID).Scan(&lastSync)
|
|
||||||
if err != nil {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
return lastSync
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateLastSyncTime updates the last sync timestamp
|
// updateLastSyncTime updates the last sync timestamp
|
||||||
@@ -376,6 +381,27 @@ func (c *Client) updateLastSyncTime(timestamp int64) {
|
|||||||
|
|
||||||
// getLocalChanges retrieves local changes since a timestamp
|
// getLocalChanges retrieves local changes since a timestamp
|
||||||
func (c *Client) getLocalChanges(since int64) ([]*engine.Task, error) {
|
func (c *Client) getLocalChanges(since int64) ([]*engine.Task, error) {
|
||||||
|
return GetLocalChanges(since)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetLastSyncTime retrieves the last sync timestamp for a client ID from the database.
|
||||||
|
func GetLastSyncTime(clientID string) int64 {
|
||||||
|
db := engine.GetDB()
|
||||||
|
if db == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
var lastSync int64
|
||||||
|
err := db.QueryRow("SELECT last_sync FROM sync_state WHERE client_id = ?", clientID).Scan(&lastSync)
|
||||||
|
if err != nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
return lastSync
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetLocalChanges retrieves local (non-sync-originated) changes since a timestamp.
|
||||||
|
func GetLocalChanges(since int64) ([]*engine.Task, error) {
|
||||||
db := engine.GetDB()
|
db := engine.GetDB()
|
||||||
if db == nil {
|
if db == nil {
|
||||||
return nil, fmt.Errorf("database not initialized")
|
return nil, fmt.Errorf("database not initialized")
|
||||||
@@ -384,7 +410,7 @@ func (c *Client) getLocalChanges(since int64) ([]*engine.Task, error) {
|
|||||||
rows, err := db.Query(`
|
rows, err := db.Query(`
|
||||||
SELECT DISTINCT task_uuid
|
SELECT DISTINCT task_uuid
|
||||||
FROM change_log
|
FROM change_log
|
||||||
WHERE changed_at > ?
|
WHERE changed_at > ? AND source = 'local'
|
||||||
ORDER BY changed_at ASC
|
ORDER BY changed_at ASC
|
||||||
`, since)
|
`, since)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -415,8 +441,8 @@ func (c *Client) getLocalChanges(since int64) ([]*engine.Task, error) {
|
|||||||
return tasks, nil
|
return tasks, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// parseChanges converts change log entries to tasks
|
// ParseChanges converts change log entries to tasks
|
||||||
func (c *Client) parseChanges(changes []ChangeLogEntry) ([]*engine.Task, error) {
|
func (c *Client) ParseChanges(changes []ChangeLogEntry) ([]*engine.Task, error) {
|
||||||
// Sort changes by timestamp (primary) and ID (secondary) to ensure correct order
|
// Sort changes by timestamp (primary) and ID (secondary) to ensure correct order
|
||||||
// This handles same-second updates (e.g., CREATE followed by UPDATE with tags)
|
// This handles same-second updates (e.g., CREATE followed by UPDATE with tags)
|
||||||
sort.Slice(changes, func(i, j int) bool {
|
sort.Slice(changes, func(i, j int) bool {
|
||||||
@@ -666,16 +692,31 @@ func parseTagsFromChangeLog(s string) []string {
|
|||||||
// pushQueuedChanges sends queued changes to server
|
// pushQueuedChanges sends queued changes to server
|
||||||
func (c *Client) pushQueuedChanges(changes []QueuedChange) error {
|
func (c *Client) pushQueuedChanges(changes []QueuedChange) error {
|
||||||
var tasks []*engine.Task
|
var tasks []*engine.Task
|
||||||
|
var unmarshalErrors []string
|
||||||
|
|
||||||
for _, change := range changes {
|
for _, change := range changes {
|
||||||
var task engine.Task
|
var task engine.Task
|
||||||
if err := json.Unmarshal(change.Data, &task); err != nil {
|
if err := json.Unmarshal(change.Data, &task); err != nil {
|
||||||
|
unmarshalErrors = append(unmarshalErrors, fmt.Sprintf("queued change: %v", err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
tasks = append(tasks, &task)
|
tasks = append(tasks, &task)
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.PushChanges(tasks)
|
if len(tasks) == 0 && len(unmarshalErrors) > 0 {
|
||||||
|
return fmt.Errorf("all queued changes failed to unmarshal: %s", strings.Join(unmarshalErrors, "; "))
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := c.PushChanges(tasks); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(unmarshalErrors) > 0 {
|
||||||
|
return fmt.Errorf("pushed %d tasks but %d queued changes failed to unmarshal: %s",
|
||||||
|
len(tasks), len(unmarshalErrors), strings.Join(unmarshalErrors, "; "))
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SyncResult represents the result of a sync operation
|
// SyncResult represents the result of a sync operation
|
||||||
|
|||||||
@@ -57,7 +57,11 @@ func MergeTasks(local, remote []*engine.Task, strategy ConflictResolution) ([]*e
|
|||||||
if DetectConflict(task, remoteTask) {
|
if DetectConflict(task, remoteTask) {
|
||||||
conflicts++
|
conflicts++
|
||||||
winner := resolveConflict(task, remoteTask, strategy)
|
winner := resolveConflict(task, remoteTask, strategy)
|
||||||
logConflict(task, remoteTask, winner)
|
winnerLabel := "local"
|
||||||
|
if winner == remoteTask {
|
||||||
|
winnerLabel = "remote"
|
||||||
|
}
|
||||||
|
logConflict(task, remoteTask, winnerLabel)
|
||||||
result = append(result, winner)
|
result = append(result, winner)
|
||||||
} else {
|
} else {
|
||||||
// No conflict - use either (same content)
|
// No conflict - use either (same content)
|
||||||
@@ -110,17 +114,12 @@ func resolveConflict(local, remote *engine.Task, strategy ConflictResolution) *e
|
|||||||
}
|
}
|
||||||
|
|
||||||
// logConflict writes conflict information to log file
|
// logConflict writes conflict information to log file
|
||||||
func logConflict(local, remote *engine.Task, winner *engine.Task) {
|
func logConflict(local, remote *engine.Task, winnerLabel string) {
|
||||||
logPath, err := engine.GetSyncConflictLogPath()
|
logPath, err := engine.GetSyncConflictLogPath()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
winnerLabel := "local"
|
|
||||||
if winner.UUID == remote.UUID && winner.Modified.Equal(remote.Modified) {
|
|
||||||
winnerLabel = "remote"
|
|
||||||
}
|
|
||||||
|
|
||||||
entry := fmt.Sprintf(
|
entry := fmt.Sprintf(
|
||||||
"[%s] Conflict on task %s\n"+
|
"[%s] Conflict on task %s\n"+
|
||||||
" Local: modified %s - %s\n"+
|
" Local: modified %s - %s\n"+
|
||||||
|
|||||||
Reference in New Issue
Block a user