Files
gems/opal-task/cmd/sync.go
T
joakim 1c3186a342 feat: Add visual progress indicators to sync operations
- Implement ProgressReporter interface with InteractiveProgress and NoOpProgress
- Add real-time progress bars using go-pretty/progress library
- Track 6 sync phases: connection test, queue push, pull, parse, apply, and server push
- Add --quiet flag to suppress progress output
- Auto-detect TTY to disable progress when piped/redirected
- Show task-level progress during apply phase with descriptions
- Display percentage complete and elapsed time for each phase
2026-01-05 23:20:27 +01:00

477 lines
12 KiB
Go

package cmd
import (
	"bufio"
	"fmt"
	"os"
	"path/filepath"
	"strings"

	"git.jnss.me/joakim/opal/internal/engine"
	"git.jnss.me/joakim/opal/internal/sync"
	"github.com/google/uuid"
	"github.com/spf13/cobra"
)
// quietFlag holds the value of the sync command's persistent --quiet/-q flag.
// The sync subcommands pass it to sync.ShouldShowProgress when choosing a
// progress reporter.
var quietFlag bool
// syncCmd is the parent "sync" command. It defines no Run function of its
// own; it is the attachment point for the sync subcommands and for the
// persistent --quiet flag.
var syncCmd = &cobra.Command{
	Use:   "sync",
	Short: "Sync tasks with remote server",
	Long:  "Manage synchronization with the opal API server",
}
// syncInitCmd implements "opal sync init". It takes the server URL and API
// key from the --url/--key flags, prompting on stdin for whichever is missing,
// writes the sync settings to the opal config, and then offers to run a first
// sync immediately.
var syncInitCmd = &cobra.Command{
	Use:   "init",
	Short: "Configure sync with server",
	Long: `Initialize sync configuration with the opal server.
This will prompt for server URL and API key, then save the configuration.
Examples:
opal sync init
opal sync init --url https://opal.example.com --key oak_xxx`,
	Run: func(cmd *cobra.Command, args []string) {
		// Lookup errors are ignored on purpose: an unreadable flag yields the
		// empty string, which falls through to the interactive prompt below.
		url, _ := cmd.Flags().GetString("url")
		key, _ := cmd.Flags().GetString("key")

		// Load existing config
		cfg, err := engine.GetConfig()
		if err != nil {
			fmt.Fprintf(os.Stderr, "Error loading config: %v\n", err)
			os.Exit(1)
		}

		// All prompts share ONE buffered reader. A bufio.Reader may read past
		// the first newline into its buffer, so creating a fresh reader per
		// prompt drops the remaining answers when stdin is piped.
		stdin := bufio.NewReader(os.Stdin)
		ask := func(prompt string) string {
			fmt.Print(prompt)
			// A read error (typically io.EOF) still returns whatever was read
			// before it; callers decide what an empty answer means.
			line, _ := stdin.ReadString('\n')
			return strings.TrimSpace(line)
		}

		// Prompt for URL if not provided
		if url == "" {
			url = ask("Enter server URL: ")
		}
		// Prompt for API key if not provided
		if key == "" {
			key = ask("Enter API key: ")
		}
		// Refuse to persist a configuration that is enabled but cannot work.
		if url == "" || key == "" {
			fmt.Fprintln(os.Stderr, "Error: server URL and API key are required")
			os.Exit(1)
		}

		// Keep an already stored client ID; generate one only on first setup.
		clientID := cfg.SyncClientID
		if clientID == "" {
			clientID = uuid.New().String()
		}

		// Update config
		cfg.SyncEnabled = true
		cfg.SyncURL = url
		cfg.SyncAPIKey = key
		cfg.SyncClientID = clientID
		cfg.SyncStrategy = "last-write-wins"
		cfg.SyncQueueOffline = true

		// Save config
		if err := engine.SaveConfig(cfg); err != nil {
			fmt.Fprintf(os.Stderr, "Error saving config: %v\n", err)
			os.Exit(1)
		}

		// The client is only used for the optional initial sync below; this
		// command performs no separate connection test.
		client := sync.NewClient(url, key, clientID)

		fmt.Println("\n✓ Sync configuration saved")
		fmt.Printf(" URL: %s\n", url)
		fmt.Printf(" Client ID: %s\n", clientID)
		fmt.Printf(" Strategy: %s\n", cfg.SyncStrategy)

		// Offer to sync now; anything other than y/yes (including EOF) means no.
		response := strings.ToLower(ask("\nRun initial sync now? [y/N]: "))
		if response != "y" && response != "yes" {
			return
		}

		strategy := sync.ParseStrategy(cfg.SyncStrategy)

		// Create progress reporter
		var reporter sync.ProgressReporter
		if sync.ShouldShowProgress(quietFlag) {
			reporter = sync.NewInteractiveProgress(os.Stdout)
		} else {
			reporter = &sync.NoOpProgress{}
		}

		result, err := client.Sync(strategy, reporter)
		if err != nil {
			fmt.Fprintf(os.Stderr, "Error syncing: %v\n", err)
			os.Exit(1)
		}
		result.Display()
	},
}
// syncStatusCmd implements "opal sync status". It prints the stored sync
// settings, the number of queued offline changes (when any), and the last
// sync value recorded for this client. It does not contact the server.
var syncStatusCmd = &cobra.Command{
	Use:   "status",
	Short: "Show sync status",
	Long:  `Display current sync configuration and status`,
	Run: func(cmd *cobra.Command, args []string) {
		cfg, err := engine.GetConfig()
		if err != nil {
			fmt.Fprintf(os.Stderr, "Error loading config: %v\n", err)
			os.Exit(1)
		}
		if !cfg.SyncEnabled {
			fmt.Println("Sync is not configured.")
			fmt.Println("Run 'opal sync init' to configure sync.")
			return
		}

		fmt.Println("Sync Configuration")
		fmt.Println("━━━━━━━━━━━━━━━━━━")
		fmt.Printf("Enabled: %v\n", cfg.SyncEnabled)
		fmt.Printf("URL: %s\n", cfg.SyncURL)
		fmt.Printf("Client ID: %s\n", cfg.SyncClientID)
		fmt.Printf("Strategy: %s\n", cfg.SyncStrategy)
		fmt.Printf("Queue offline: %v\n", cfg.SyncQueueOffline)

		// Queue status is best-effort: if the queue cannot be opened the
		// section is simply omitted.
		if queue, err := sync.NewQueue(); err == nil {
			if pending := queue.Size(); pending > 0 {
				fmt.Printf("\n⚠️ %d changes queued for sync\n", pending)
			}
		}

		// getLastSyncTime returns 0 both when nothing is recorded and when the
		// lookup fails, so a single "> 0" check covers every no-data case.
		if lastSync := getLastSyncTime(cfg.SyncClientID); lastSync > 0 {
			fmt.Printf("\nLast sync: %s\n", formatTimestamp(lastSync))
		}

		// No connectivity test is performed here; the user is pointed at
		// 'opal sync now' instead.
		fmt.Print("\nTesting connection... ")
		fmt.Println("(run 'opal sync now' to test)")
	},
}
// syncNowCmd implements "opal sync now": it runs client.Sync with the
// strategy stored in the config and displays the result.
var syncNowCmd = &cobra.Command{
	Use:   "now",
	Short: "Sync with server now",
	Long:  "Perform bidirectional sync with the server",
	Run: func(cmd *cobra.Command, args []string) {
		cfg, err := engine.GetConfig()
		switch {
		case err != nil:
			fmt.Fprintf(os.Stderr, "Error loading config: %v\n", err)
			os.Exit(1)
		case !cfg.SyncEnabled:
			fmt.Println("Sync is not configured.")
			fmt.Println("Run 'opal sync init' to configure sync.")
			os.Exit(1)
		}

		remote := sync.NewClient(cfg.SyncURL, cfg.SyncAPIKey, cfg.SyncClientID)
		mode := sync.ParseStrategy(cfg.SyncStrategy)

		// Start from the silent reporter and switch to the interactive one
		// only when progress output is wanted.
		progress := sync.ProgressReporter(&sync.NoOpProgress{})
		if sync.ShouldShowProgress(quietFlag) {
			progress = sync.NewInteractiveProgress(os.Stdout)
		}

		outcome, err := remote.Sync(mode, progress)
		if err != nil {
			fmt.Fprintf(os.Stderr, "Error: %v\n", err)
			os.Exit(1)
		}
		outcome.Display()
	},
}
// syncUpCmd implements "opal sync up": it gathers the tasks changed since the
// last recorded sync and hands them to client.PushChanges.
var syncUpCmd = &cobra.Command{
	Use:   "up",
	Short: "Push local changes to server",
	Long:  "Upload local changes to the server",
	Run: func(cmd *cobra.Command, args []string) {
		// fail reports a formatted error on stderr and terminates with status 1.
		fail := func(format string, err error) {
			fmt.Fprintf(os.Stderr, format, err)
			os.Exit(1)
		}

		cfg, err := engine.GetConfig()
		if err != nil {
			fail("Error loading config: %v\n", err)
		}
		if !cfg.SyncEnabled {
			fmt.Println("Sync is not configured.")
			os.Exit(1)
		}

		remote := sync.NewClient(cfg.SyncURL, cfg.SyncAPIKey, cfg.SyncClientID)

		// Everything logged after the last recorded sync counts as pending.
		pending, err := getLocalChanges(getLastSyncTime(cfg.SyncClientID))
		if err != nil {
			fail("Error getting local changes: %v\n", err)
		}

		count := len(pending)
		if count == 0 {
			fmt.Println("No local changes to push")
			return
		}

		fmt.Printf("Pushing %d changes...\n", count)
		if err := remote.PushChanges(pending); err != nil {
			fail("Error pushing changes: %v\n", err)
		}
		fmt.Println("✓ Changes pushed successfully")
	},
}
// syncDownCmd implements "opal sync down": it asks the server for changes
// newer than the last recorded sync and reports how many were returned.
//
// NOTE(review): the pulled changes are only counted in this block; nothing
// here applies them locally. Confirm whether client.PullChanges persists them.
var syncDownCmd = &cobra.Command{
	Use:   "down",
	Short: "Pull server changes to local",
	Long:  "Download changes from the server",
	Run: func(cmd *cobra.Command, args []string) {
		cfg, loadErr := engine.GetConfig()
		if loadErr != nil {
			fmt.Fprintf(os.Stderr, "Error loading config: %v\n", loadErr)
			os.Exit(1)
		}
		if !cfg.SyncEnabled {
			fmt.Println("Sync is not configured.")
			os.Exit(1)
		}

		remote := sync.NewClient(cfg.SyncURL, cfg.SyncAPIKey, cfg.SyncClientID)
		since := getLastSyncTime(cfg.SyncClientID)

		incoming, pullErr := remote.PullChanges(since)
		if pullErr != nil {
			fmt.Fprintf(os.Stderr, "Error pulling changes: %v\n", pullErr)
			os.Exit(1)
		}

		if n := len(incoming); n > 0 {
			fmt.Printf("✓ Pulled %d changes from server\n", n)
			return
		}
		fmt.Println("No changes from server")
	},
}
// syncLogCmd implements "opal sync log": it prints the contents of
// sync_conflicts.log from the opal config directory, or a short notice when
// that file does not exist yet.
var syncLogCmd = &cobra.Command{
	Use:   "log",
	Short: "Show conflict resolution log",
	Long:  `Display the log of sync conflicts and how they were resolved`,
	Run: func(cmd *cobra.Command, args []string) {
		configDir, err := engine.GetConfigDir()
		if err != nil {
			fmt.Fprintf(os.Stderr, "Error: %v\n", err)
			os.Exit(1)
		}

		// filepath.Join uses the platform's path separator and cleans the
		// result, unlike gluing the parts together with a literal "/".
		logPath := filepath.Join(configDir, "sync_conflicts.log")

		data, err := os.ReadFile(logPath)
		if err != nil {
			// A missing log file just means no conflict has been recorded.
			if os.IsNotExist(err) {
				fmt.Println("No conflicts logged yet.")
				return
			}
			fmt.Fprintf(os.Stderr, "Error reading log: %v\n", err)
			os.Exit(1)
		}
		fmt.Print(string(data))
	},
}
// syncMergeCmd implements "opal sync merge", intended for connecting an
// existing local database to a server for the first time. It selects a
// conflict strategy from the --prefer-local / --prefer-server flags
// (last-write-wins when neither is given), reports how many local tasks and
// server changes exist, and then runs client.Sync with that strategy.
//
// NOTE(review): the local tasks and the server changes fetched here are only
// used for the printed counts; the merge itself is whatever client.Sync does.
var syncMergeCmd = &cobra.Command{
	Use:   "merge",
	Short: "Initial database merge",
	Long: `Merge local database with server database for first-time sync.
This is used when connecting an existing local database to a server for the first time.
Strategies:
--prefer-local: Upload all local tasks, merge server tasks
--prefer-server: Download all server tasks, merge local tasks
(no flag): Smart merge by UUID, add unique tasks from both sides (default)
Examples:
opal sync merge
opal sync merge --prefer-local
opal sync merge --prefer-server`,
	Run: func(cmd *cobra.Command, args []string) {
		cfg, err := engine.GetConfig()
		if err != nil {
			fmt.Fprintf(os.Stderr, "Error loading config: %v\n", err)
			os.Exit(1)
		}
		if !cfg.SyncEnabled {
			fmt.Println("Sync is not configured.")
			fmt.Println("Run 'opal sync init' to configure sync.")
			os.Exit(1)
		}

		// Lookup errors are ignored on purpose: an unreadable flag reads as
		// false, which selects the default strategy.
		preferLocal, _ := cmd.Flags().GetBool("prefer-local")
		preferServer, _ := cmd.Flags().GetBool("prefer-server")

		// The two preferences contradict each other; refuse rather than
		// silently letting one of them win.
		if preferLocal && preferServer {
			fmt.Fprintln(os.Stderr, "Error: --prefer-local and --prefer-server cannot be combined")
			os.Exit(1)
		}

		strategy := sync.LastWriteWins // Default: smart merge
		switch {
		case preferLocal:
			strategy = sync.ClientWins
		case preferServer:
			strategy = sync.ServerWins
		}

		fmt.Println("Performing initial merge...")
		fmt.Printf("Strategy: %s\n\n", sync.StrategyString(strategy))

		// Get all local tasks (not just recent changes)
		filter := engine.DefaultFilter()
		localTasks, err := engine.GetTasks(filter)
		if err != nil {
			fmt.Fprintf(os.Stderr, "Error getting local tasks: %v\n", err)
			os.Exit(1)
		}
		fmt.Printf("Found %d local tasks\n", len(localTasks))

		// Pull all changes from server
		client := sync.NewClient(cfg.SyncURL, cfg.SyncAPIKey, cfg.SyncClientID)
		changes, err := client.PullChanges(0) // Get all changes (since epoch)
		if err != nil {
			fmt.Fprintf(os.Stderr, "Error pulling from server: %v\n", err)
			os.Exit(1)
		}
		fmt.Printf("Found %d server changes\n", len(changes))

		// For the initial merge a regular full sync is run with the chosen
		// strategy.
		var reporter sync.ProgressReporter
		if sync.ShouldShowProgress(quietFlag) {
			reporter = sync.NewInteractiveProgress(os.Stdout)
		} else {
			reporter = &sync.NoOpProgress{}
		}

		result, err := client.Sync(strategy, reporter)
		if err != nil {
			fmt.Fprintf(os.Stderr, "Error during merge: %v\n", err)
			os.Exit(1)
		}
		fmt.Println("\n✓ Initial merge completed")
		result.Display()
	},
}
// init attaches the sync command tree to rootCmd and registers the flags of
// the sync subcommands.
func init() {
	rootCmd.AddCommand(syncCmd)
	syncCmd.AddCommand(
		syncInitCmd,
		syncStatusCmd,
		syncNowCmd,
		syncUpCmd,
		syncDownCmd,
		syncLogCmd,
		syncMergeCmd,
	)

	// Flags for "sync init".
	initFlags := syncInitCmd.Flags()
	initFlags.StringP("url", "u", "", "Server URL")
	initFlags.StringP("key", "k", "", "API key")

	// Flags for "sync merge".
	mergeFlags := syncMergeCmd.Flags()
	mergeFlags.Bool("prefer-local", false, "Prefer local database")
	mergeFlags.Bool("prefer-server", false, "Prefer server database")

	// Persistent flag on the parent command, bound to quietFlag.
	syncCmd.PersistentFlags().BoolVarP(&quietFlag, "quiet", "q", false, "Suppress progress output")
}
// Helper functions
// getLastSyncTime returns the last_sync value stored in sync_state for
// clientID. It returns 0 when the database handle is nil or when the lookup
// fails for any reason (including no matching row).
func getLastSyncTime(clientID string) int64 {
	const lastSyncQuery = "SELECT last_sync FROM sync_state WHERE client_id = ?"

	conn := engine.GetDB()
	if conn == nil {
		return 0
	}

	var stamp int64
	if conn.QueryRow(lastSyncQuery, clientID).Scan(&stamp) != nil {
		return 0
	}
	return stamp
}
// getLocalChanges returns the tasks referenced by change_log rows whose
// changed_at is greater than since, in the order the query yields them.
//
// It returns an error when the database handle is nil, when the query fails,
// or when iterating the result set fails. Individual rows that cannot be
// scanned, whose UUID does not parse, or whose task cannot be loaded are
// skipped (best-effort).
func getLocalChanges(since int64) ([]*engine.Task, error) {
	db := engine.GetDB()
	if db == nil {
		return nil, fmt.Errorf("database not initialized")
	}

	rows, err := db.Query(`
		SELECT DISTINCT task_uuid
		FROM change_log
		WHERE changed_at > ?
		ORDER BY changed_at ASC
	`, since)
	if err != nil {
		return nil, fmt.Errorf("querying change log: %w", err)
	}
	defer rows.Close()

	// Phase 1: drain the cursor, so that no result set is held open while
	// engine.GetTask runs below (presumably issuing its own queries — confirm).
	var ids []string
	for rows.Next() {
		var id string
		if err := rows.Scan(&id); err != nil {
			continue // best-effort: skip unreadable rows
		}
		ids = append(ids, id)
	}
	// Next returning false can mean "done" or "failed"; without this check an
	// iteration error would look like a short but successful result.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("reading change log: %w", err)
	}

	// Phase 2: resolve each UUID to its task.
	var tasks []*engine.Task
	for _, id := range ids {
		taskUUID, err := uuid.Parse(id)
		if err != nil {
			continue // best-effort: skip malformed UUIDs
		}
		task, err := engine.GetTask(taskUUID)
		if err != nil {
			continue // best-effort: skip tasks that cannot be loaded
		}
		tasks = append(tasks, task)
	}
	return tasks, nil
}
// formatTimestamp renders ts as its plain base-10 integer text. No date or
// time formatting is applied.
func formatTimestamp(ts int64) string {
	return fmt.Sprint(ts)
}