Files
gems/opal-task/internal/api/handlers/sync.go
T
joakim a11f452d3b fix: break sync feedback loop, respect timestamps, surface errors
- Add migration v2: source column on change_log to distinguish local
  vs sync-originated entries, preventing the echo loop where synced
  tasks get re-pushed as local changes
- PushChanges handler now skips save when server version is newer
- Client PushChanges/pushQueuedChanges collect and report marshal errors
  instead of silently dropping them
- De-duplicate getLocalChanges/getLastSyncTime into exported sync
  package functions
- Fix logConflict winner detection via pointer identity instead of
  fragile UUID+timestamp comparison
- Fix sync down to actually parse, save, and tag-sync pulled changes

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-21 01:11:04 +01:00

173 lines
4.1 KiB
Go

package handlers
import (
"encoding/json"
"net/http"
"git.jnss.me/joakim/opal/internal/engine"
)
// GetChangesRequest is the JSON body decoded by the GetChanges handler.
type GetChangesRequest struct {
	// Since is the lower bound (exclusive) compared against
	// change_log.changed_at; JSON key "since".
	Since int64 `json:"since"`
	// ClientID names the client whose sync_state row is updated;
	// JSON key "client_id".
	ClientID string `json:"client_id"`
}
// GetChanges returns the change_log entries recorded after the timestamp
// supplied in the request body.
//
// The body is decoded as a GetChangesRequest. On success the handler replies
// with http.StatusOK and the matching changes, ordered by ascending id. The
// result slice is always non-nil, so an empty result is handed to
// jsonResponse as an empty list rather than a nil one.
//
// Failures are reported through errorResponse:
//   - http.StatusBadRequest when the body cannot be decoded;
//   - http.StatusInternalServerError when the database handle is nil, or the
//     query, a row scan, or the row iteration itself fails.
//
// As a side effect the sync_state row for req.ClientID is replaced with the
// current timestamp.
func GetChanges(w http.ResponseWriter, r *http.Request) {
	var req GetChangesRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		errorResponse(w, http.StatusBadRequest, "invalid request body")
		return
	}

	db := engine.GetDB()
	if db == nil {
		errorResponse(w, http.StatusInternalServerError, "database not initialized")
		return
	}

	// Query change_log for entries strictly newer than the client's timestamp.
	rows, err := db.Query(`
		SELECT id, task_uuid, change_type, changed_at, data
		FROM change_log
		WHERE changed_at > ?
		ORDER BY id ASC
	`, req.Since)
	if err != nil {
		errorResponse(w, http.StatusInternalServerError, err.Error())
		return
	}
	defer rows.Close()

	// Change is the wire representation of one change_log row.
	type Change struct {
		ID         int64  `json:"id"`
		TaskUUID   string `json:"task_uuid"`
		ChangeType string `json:"change_type"`
		ChangedAt  int64  `json:"changed_at"`
		Data       string `json:"data"`
	}

	// Start from an empty, non-nil slice: a nil slice would be marshalled by
	// encoding/json as null instead of [].
	changes := []Change{}
	for rows.Next() {
		var change Change
		if err := rows.Scan(&change.ID, &change.TaskUUID, &change.ChangeType, &change.ChangedAt, &change.Data); err != nil {
			errorResponse(w, http.StatusInternalServerError, err.Error())
			return
		}
		changes = append(changes, change)
	}
	// rows.Next returning false can mean "done" or "failed"; without this
	// check a mid-iteration error would yield a truncated list with a 200.
	if err := rows.Err(); err != nil {
		errorResponse(w, http.StatusInternalServerError, err.Error())
		return
	}

	// Update sync_state for the client. This is bookkeeping only: the error is
	// deliberately ignored so that a failed update does not discard the
	// changes that were already read successfully.
	// NOTE(review): last_change_id is always written as 0 — confirm whether it
	// should carry the id of the last change returned.
	_, _ = db.Exec(`
		INSERT OR REPLACE INTO sync_state (client_id, last_sync, last_change_id)
		VALUES (?, ?, ?)
	`, req.ClientID, engine.GetCurrentTimestamp(), 0)

	jsonResponse(w, http.StatusOK, changes)
}
// PushChangesRequest is the JSON body decoded by the PushChanges handler.
type PushChangesRequest struct {
	// Tasks holds one raw JSON document per pushed task; each element is
	// unmarshalled separately by the handler. JSON key "tasks".
	Tasks []json.RawMessage `json:"tasks"`
	// ClientID names the client whose sync_state row is updated;
	// JSON key "client_id".
	ClientID string `json:"client_id"`
}
// PushChanges applies task changes pushed by a client.
//
// The body is decoded as a PushChangesRequest. Every element of Tasks is
// unmarshalled into an engine.Task and handled independently, so one bad task
// does not abort the batch:
//
//   - If engine.GetTask returns an error for the task's UUID, the task is
//     saved as new and all of its tags are added.
//   - If the stored task's Modified time (compared at one-second resolution)
//     is later than the pushed one, the push is skipped and counted as a
//     conflict.
//   - Otherwise the pushed task is saved over the stored one, keeping the
//     stored database ID, and its tags are reconciled: tags only on the
//     pushed task are added, tags only on the stored task are removed.
//
// After each successful save the change-log entry is marked as
// sync-originated to prevent a feedback loop.
//
// The handler replies with http.StatusOK and a JSON object with three
// counters: "processed" (tasks saved), "conflicts" (tasks skipped because the
// server copy is newer) and "failed" (tasks that could not be unmarshalled or
// saved). An undecodable body is reported with http.StatusBadRequest and a nil
// database handle with http.StatusInternalServerError, both via errorResponse.
//
// As a side effect the sync_state row for req.ClientID is replaced with the
// current timestamp.
func PushChanges(w http.ResponseWriter, r *http.Request) {
	var req PushChangesRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		errorResponse(w, http.StatusBadRequest, "invalid request body")
		return
	}

	db := engine.GetDB()
	if db == nil {
		errorResponse(w, http.StatusInternalServerError, "database not initialized")
		return
	}

	// For each task, parse and apply (last-write-wins for now).
	var processed, conflicts, failed int
	for _, taskData := range req.Tasks {
		var task engine.Task
		if err := json.Unmarshal(taskData, &task); err != nil {
			// Count instead of silently dropping, so the client can tell
			// that part of its push was not applied.
			failed++
			continue
		}

		existing, err := engine.GetTask(task.UUID)
		if err != nil {
			// NOTE(review): every lookup error is treated as "task does not
			// exist"; confirm engine.GetTask cannot fail for other reasons.
			if err := task.Save(); err != nil {
				failed++
				continue
			}

			// Mark as sync-originated to prevent feedback loop. Best-effort:
			// the task itself is already stored.
			_ = engine.MarkChangeLogAsSync(task.UUID.String())

			// Tag errors are ignored on purpose; the task has been saved and
			// a tag failure should not turn it into a failed push.
			for _, tag := range task.Tags {
				_ = task.AddTag(tag)
			}
			processed++
			continue
		}

		// Task exists - the server copy wins when it is strictly newer.
		if existing.Modified.Unix() > task.Modified.Unix() {
			conflicts++
			continue
		}

		// Apply changes (client is newer or equal).
		task.ID = existing.ID // Preserve database ID
		if err := task.Save(); err != nil {
			failed++
			continue
		}

		// Mark as sync-originated to prevent feedback loop. Best-effort:
		// the task itself is already stored.
		_ = engine.MarkChangeLogAsSync(task.UUID.String())

		// Reconcile tags using set lookups in both directions. Tag errors are
		// ignored on purpose, as in the create path above.
		existingTags := make(map[string]bool, len(existing.Tags))
		for _, tag := range existing.Tags {
			existingTags[tag] = true
		}
		incomingTags := make(map[string]bool, len(task.Tags))
		for _, tag := range task.Tags {
			incomingTags[tag] = true
			if !existingTags[tag] {
				_ = task.AddTag(tag)
			}
		}
		for _, tag := range existing.Tags {
			if !incomingTags[tag] {
				_ = task.RemoveTag(tag)
			}
		}
		processed++
	}

	// Update sync_state. This is bookkeeping only: the error is deliberately
	// ignored so that a failed update does not hide the outcome of the tasks
	// that were already applied.
	_, _ = db.Exec(`
		INSERT OR REPLACE INTO sync_state (client_id, last_sync, last_change_id)
		VALUES (?, ?, ?)
	`, req.ClientID, engine.GetCurrentTimestamp(), 0)

	jsonResponse(w, http.StatusOK, map[string]interface{}{
		"processed": processed,
		"conflicts": conflicts,
		"failed":    failed,
	})
}