a11f452d3b
- Add migration v2: source column on change_log to distinguish local vs sync-originated entries, preventing the echo loop where synced tasks get re-pushed as local changes - PushChanges handler now skips save when server version is newer - Client PushChanges/pushQueuedChanges collect and report marshal errors instead of silently dropping them - De-duplicate getLocalChanges/getLastSyncTime into exported sync package functions - Fix logConflict winner detection via pointer identity instead of fragile UUID+timestamp comparison - Fix sync down to actually parse, save, and tag-sync pulled changes Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
173 lines
4.1 KiB
Go
173 lines
4.1 KiB
Go
package handlers
|
|
|
|
import (
	"encoding/json"
	"log"
	"net/http"

	"git.jnss.me/joakim/opal/internal/engine"
)
|
|
|
|
// GetChangesRequest is the JSON request body decoded by GetChanges.
type GetChangesRequest struct {
	// Since is the exclusive lower bound compared against
	// change_log.changed_at; only entries with a larger value are returned.
	// NOTE(review): presumably a Unix timestamp in the same unit the engine
	// writes to changed_at — confirm against the engine package.
	Since int64 `json:"since"`

	// ClientID is the key under which the caller's sync_state row is stored.
	ClientID string `json:"client_id"`
}
|
|
|
|
// GetChanges returns tasks that have changed since a given timestamp
|
|
func GetChanges(w http.ResponseWriter, r *http.Request) {
|
|
var req GetChangesRequest
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
errorResponse(w, http.StatusBadRequest, "invalid request body")
|
|
return
|
|
}
|
|
|
|
// Query change_log for entries since timestamp
|
|
db := engine.GetDB()
|
|
if db == nil {
|
|
errorResponse(w, http.StatusInternalServerError, "database not initialized")
|
|
return
|
|
}
|
|
|
|
rows, err := db.Query(`
|
|
SELECT id, task_uuid, change_type, changed_at, data
|
|
FROM change_log
|
|
WHERE changed_at > ?
|
|
ORDER BY id ASC
|
|
`, req.Since)
|
|
if err != nil {
|
|
errorResponse(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
defer rows.Close()
|
|
|
|
type Change struct {
|
|
ID int64 `json:"id"`
|
|
TaskUUID string `json:"task_uuid"`
|
|
ChangeType string `json:"change_type"`
|
|
ChangedAt int64 `json:"changed_at"`
|
|
Data string `json:"data"`
|
|
}
|
|
|
|
var changes []Change
|
|
for rows.Next() {
|
|
var change Change
|
|
if err := rows.Scan(&change.ID, &change.TaskUUID, &change.ChangeType, &change.ChangedAt, &change.Data); err != nil {
|
|
errorResponse(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
changes = append(changes, change)
|
|
}
|
|
|
|
// Update sync_state for client
|
|
_, _ = db.Exec(`
|
|
INSERT OR REPLACE INTO sync_state (client_id, last_sync, last_change_id)
|
|
VALUES (?, ?, ?)
|
|
`, req.ClientID, engine.GetCurrentTimestamp(), 0)
|
|
|
|
jsonResponse(w, http.StatusOK, changes)
|
|
}
|
|
|
|
// PushChangesRequest is the JSON request body decoded by PushChanges.
type PushChangesRequest struct {
	// Tasks holds one raw JSON document per pushed task. Decoding is deferred
	// so that PushChanges can unmarshal each entry on its own and skip an
	// entry that fails to decode without rejecting the whole request.
	Tasks []json.RawMessage `json:"tasks"`

	// ClientID is the key under which the caller's sync_state row is stored.
	ClientID string `json:"client_id"`
}
|
|
|
|
// PushChanges accepts task changes from clients
|
|
func PushChanges(w http.ResponseWriter, r *http.Request) {
|
|
var req PushChangesRequest
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
errorResponse(w, http.StatusBadRequest, "invalid request body")
|
|
return
|
|
}
|
|
|
|
db := engine.GetDB()
|
|
if db == nil {
|
|
errorResponse(w, http.StatusInternalServerError, "database not initialized")
|
|
return
|
|
}
|
|
|
|
// For each task, parse and apply (last-write-wins for now)
|
|
var processed int
|
|
var conflicts int
|
|
|
|
for _, taskData := range req.Tasks {
|
|
var task engine.Task
|
|
if err := json.Unmarshal(taskData, &task); err != nil {
|
|
continue
|
|
}
|
|
|
|
// Check if task exists
|
|
existing, err := engine.GetTask(task.UUID)
|
|
if err != nil {
|
|
// Task doesn't exist - create it
|
|
if err := task.Save(); err != nil {
|
|
continue
|
|
}
|
|
// Mark as sync-originated to prevent feedback loop
|
|
_ = engine.MarkChangeLogAsSync(task.UUID.String())
|
|
// Add tags
|
|
for _, tag := range task.Tags {
|
|
_ = task.AddTag(tag)
|
|
}
|
|
processed++
|
|
continue
|
|
}
|
|
|
|
// Task exists - check timestamps for conflicts
|
|
if existing.Modified.Unix() > task.Modified.Unix() {
|
|
// Server version is newer - skip this push
|
|
conflicts++
|
|
continue
|
|
}
|
|
|
|
// Apply changes (client is newer or equal)
|
|
task.ID = existing.ID // Preserve database ID
|
|
if err := task.Save(); err != nil {
|
|
continue
|
|
}
|
|
// Mark as sync-originated to prevent feedback loop
|
|
_ = engine.MarkChangeLogAsSync(task.UUID.String())
|
|
|
|
// Sync tags
|
|
existingTags := make(map[string]bool)
|
|
for _, tag := range existing.Tags {
|
|
existingTags[tag] = true
|
|
}
|
|
|
|
// Add new tags
|
|
for _, tag := range task.Tags {
|
|
if !existingTags[tag] {
|
|
_ = task.AddTag(tag)
|
|
}
|
|
}
|
|
|
|
// Remove missing tags
|
|
for _, tag := range existing.Tags {
|
|
found := false
|
|
for _, newTag := range task.Tags {
|
|
if tag == newTag {
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
if !found {
|
|
_ = task.RemoveTag(tag)
|
|
}
|
|
}
|
|
|
|
processed++
|
|
}
|
|
|
|
// Update sync_state
|
|
_, _ = db.Exec(`
|
|
INSERT OR REPLACE INTO sync_state (client_id, last_sync, last_change_id)
|
|
VALUES (?, ?, ?)
|
|
`, req.ClientID, engine.GetCurrentTimestamp(), 0)
|
|
|
|
jsonResponse(w, http.StatusOK, map[string]interface{}{
|
|
"processed": processed,
|
|
"conflicts": conflicts,
|
|
})
|
|
}
|