diff --git a/.hermit/go/bin/bbolt b/.hermit/go/bin/bbolt new file mode 100755 index 0000000..f013c00 Binary files /dev/null and b/.hermit/go/bin/bbolt differ diff --git a/cmd/cachew/main.go b/cmd/cachew/main.go index b454b2f..8cc0dfb 100644 --- a/cmd/cachew/main.go +++ b/cmd/cachew/main.go @@ -160,17 +160,18 @@ func (c *NamespacesCmd) Run(ctx context.Context, cache cache.Cache) error { } type SnapshotCmd struct { - Namespace string `arg:"" help:"Namespace for organizing cache objects."` - Key PlatformKey `arg:"" help:"Object key (hex or string)."` - Directory string `arg:"" help:"Directory to archive." type:"path"` - TTL time.Duration `help:"Time to live for the object."` - Exclude []string `help:"Patterns to exclude (tar --exclude syntax)."` + Namespace string `arg:"" help:"Namespace for organizing cache objects."` + Key PlatformKey `arg:"" help:"Object key (hex or string)."` + Directory string `arg:"" help:"Directory to archive." type:"path"` + TTL time.Duration `help:"Time to live for the object."` + Exclude []string `help:"Patterns to exclude (tar --exclude syntax)."` + ZstdThreads int `help:"Threads for zstd compression (0 = all CPU cores)." default:"0"` } func (c *SnapshotCmd) Run(ctx context.Context, cache cache.Cache) error { fmt.Fprintf(os.Stderr, "Archiving %s...\n", c.Directory) //nolint:forbidigo namespacedCache := cache.Namespace(c.Namespace) - if err := snapshot.Create(ctx, namespacedCache, c.Key.Key(), c.Directory, c.TTL, c.Exclude); err != nil { + if err := snapshot.Create(ctx, namespacedCache, c.Key.Key(), c.Directory, c.TTL, c.Exclude, c.ZstdThreads); err != nil { return errors.Wrap(err, "failed to create snapshot") } @@ -179,15 +180,16 @@ func (c *SnapshotCmd) Run(ctx context.Context, cache cache.Cache) error { } type RestoreCmd struct { - Namespace string `arg:"" help:"Namespace for organizing cache objects."` - Key PlatformKey `arg:"" help:"Object key (hex or string)."` - Directory string `arg:"" help:"Target directory for extraction." type:"path"` + Namespace string `arg:"" help:"Namespace for organizing cache objects."` + Key PlatformKey `arg:"" help:"Object key (hex or string)."` + Directory string `arg:"" help:"Target directory for extraction." type:"path"` + ZstdThreads int `help:"Threads for zstd decompression (0 = all CPU cores)." default:"0"` } func (c *RestoreCmd) Run(ctx context.Context, cache cache.Cache) error { fmt.Fprintf(os.Stderr, "Restoring to %s...\n", c.Directory) //nolint:forbidigo namespacedCache := cache.Namespace(c.Namespace) - if err := snapshot.Restore(ctx, namespacedCache, c.Key.Key(), c.Directory); err != nil { + if err := snapshot.Restore(ctx, namespacedCache, c.Key.Key(), c.Directory, c.ZstdThreads); err != nil { return errors.Wrap(err, "failed to restore snapshot") } diff --git a/internal/gitclone/manager.go b/internal/gitclone/manager.go index 790e5e3..18bec45 100644 --- a/internal/gitclone/manager.go +++ b/internal/gitclone/manager.go @@ -7,6 +7,7 @@ import ( "os" "os/exec" "path/filepath" + "runtime" "strconv" "strings" "sync" @@ -57,6 +58,7 @@ type Config struct { FetchInterval time.Duration `hcl:"fetch-interval,optional" help:"How often to fetch from upstream in minutes." default:"15m"` RefCheckInterval time.Duration `hcl:"ref-check-interval,optional" help:"How long to cache ref checks." default:"10s"` Maintenance bool `hcl:"maintenance,optional" help:"Enable git maintenance scheduling for mirror repos." default:"false"` + PackThreads int `hcl:"pack-threads,optional" help:"Threads for git pack operations (0 = all CPU cores)." default:"0"` } // CredentialProvider provides credentials for git operations. @@ -117,6 +119,10 @@ func NewManager(ctx context.Context, config Config, credentialProvider Credentia config.RefCheckInterval = 10 * time.Second } + if config.PackThreads <= 0 { + config.PackThreads = runtime.NumCPU() + } + if err := os.MkdirAll(config.MirrorRoot, 0o750); err != nil { return nil, errors.Wrap(err, "create root directory") } @@ -236,7 +242,7 @@ func (m *Manager) DiscoverExisting(ctx context.Context) ([]*Repository, error) { } repo.fetchSem <- struct{}{} - if err := configureMirror(ctx, path); err != nil { + if err := configureMirror(ctx, path, m.config.PackThreads); err != nil { return errors.Wrapf(err, "configure mirror for %s", upstreamURL) } @@ -344,7 +350,7 @@ func (r *Repository) Clone(ctx context.Context) error { // mirrorConfigSettings returns git config key-value pairs applied to mirror // clones to optimise upload-pack serving performance. -func mirrorConfigSettings() [][2]string { +func mirrorConfigSettings(packThreads int) [][2]string { return [][2]string{ // Protocol {"protocol.version", "2"}, @@ -366,7 +372,7 @@ func mirrorConfigSettings() [][2]string { // Disable auto GC {"gc.auto", "0"}, // Pack performance - {"pack.threads", "4"}, + {"pack.threads", strconv.Itoa(packThreads)}, {"pack.deltaCacheSize", "512m"}, {"pack.windowMemory", "1g"}, } @@ -394,8 +400,8 @@ func startMaintenance(ctx context.Context) error { return nil } -func configureMirror(ctx context.Context, repoPath string) error { - for _, kv := range mirrorConfigSettings() { +func configureMirror(ctx context.Context, repoPath string, packThreads int) error { + for _, kv := range mirrorConfigSettings(packThreads) { // #nosec G204 - repoPath and config values are controlled by us cmd := exec.CommandContext(ctx, "git", "-C", repoPath, "config", kv[0], kv[1]) output, err := cmd.CombinedOutput() @@ -430,7 +436,7 @@ func (r *Repository) executeClone(ctx context.Context) error { return errors.Wrapf(err, "git clone --mirror: %s", string(output)) } - if err := configureMirror(ctx, r.path); err != nil { + if err := configureMirror(ctx, r.path, r.config.PackThreads); err != nil { return errors.Wrap(err, "configure mirror") } diff --git a/internal/gitclone/manager_test.go b/internal/gitclone/manager_test.go index 2578e99..04777c8 100644 --- a/internal/gitclone/manager_test.go +++ b/internal/gitclone/manager_test.go @@ -188,7 +188,7 @@ func TestManager_DiscoverExisting(t *testing.T) { // Verify mirror config was applied to discovered repos. for _, repoPath := range repoPaths { - for _, kv := range mirrorConfigSettings() { + for _, kv := range mirrorConfigSettings(manager.Config().PackThreads) { cmd := exec.Command("git", "-C", repoPath, "config", "--get", kv[0]) output, err := cmd.Output() assert.NoError(t, err, "config key %s in %s", kv[0], repoPath) @@ -307,6 +307,7 @@ func TestRepository_CloneSetsMirrorConfig(t *testing.T) { clonePath := filepath.Join(tmpDir, "clone") repo := &Repository{ state: StateEmpty, + config: Config{PackThreads: 4}, path: clonePath, upstreamURL: upstreamPath, fetchSem: make(chan struct{}, 1), @@ -316,7 +317,7 @@ func TestRepository_CloneSetsMirrorConfig(t *testing.T) { assert.NoError(t, repo.Clone(ctx)) assert.Equal(t, StateReady, repo.State()) - for _, kv := range mirrorConfigSettings() { + for _, kv := range mirrorConfigSettings(4) { cmd := exec.Command("git", "-C", clonePath, "config", "--get", kv[0]) output, err := cmd.Output() assert.NoError(t, err, "config key %s", kv[0]) diff --git a/internal/snapshot/snapshot.go b/internal/snapshot/snapshot.go index 58fa8da..6ebfedd 100644 --- a/internal/snapshot/snapshot.go +++ b/internal/snapshot/snapshot.go @@ -9,6 +9,7 @@ import ( "os" "os/exec" "path/filepath" + "runtime" "time" "github.com/alecthomas/errors" @@ -21,7 +22,12 @@ import ( // The archive preserves all file permissions, ownership, and symlinks. // The operation is fully streaming - no temporary files are created. // Exclude patterns use tar's --exclude syntax. -func Create(ctx context.Context, remote cache.Cache, key cache.Key, directory string, ttl time.Duration, excludePatterns []string) error { +// threads controls zstd parallelism; 0 uses all available CPU cores. +func Create(ctx context.Context, remote cache.Cache, key cache.Key, directory string, ttl time.Duration, excludePatterns []string, threads int) error { + if threads <= 0 { + threads = runtime.NumCPU() + } + // Verify directory exists if info, err := os.Stat(directory); err != nil { return errors.Wrap(err, "failed to stat directory") @@ -45,7 +51,7 @@ func Create(ctx context.Context, remote cache.Cache, key cache.Key, directory st tarArgs = append(tarArgs, ".") tarCmd := exec.CommandContext(ctx, "tar", tarArgs...) - zstdCmd := exec.CommandContext(ctx, "zstd", "-c", "-T4") + zstdCmd := exec.CommandContext(ctx, "zstd", "-c", fmt.Sprintf("-T%d", threads)) //nolint:gosec // threads is a validated integer, not user input tarStdout, err := tarCmd.StdoutPipe() if err != nil { @@ -90,7 +96,12 @@ func Create(ctx context.Context, remote cache.Cache, key cache.Key, directory st // The archive is decompressed with zstd and extracted with tar, preserving // all file permissions, ownership, and symlinks. // The operation is fully streaming - no temporary files are created. -func Restore(ctx context.Context, remote cache.Cache, key cache.Key, directory string) error { +// threads controls zstd parallelism; 0 uses all available CPU cores. +func Restore(ctx context.Context, remote cache.Cache, key cache.Key, directory string, threads int) error { + if threads <= 0 { + threads = runtime.NumCPU() + } + rc, _, err := remote.Open(ctx, key) if err != nil { return errors.Wrap(err, "failed to open object") @@ -102,7 +113,7 @@ func Restore(ctx context.Context, remote cache.Cache, key cache.Key, directory s return errors.Wrap(err, "failed to create target directory") } - zstdCmd := exec.CommandContext(ctx, "zstd", "-dc", "-T4") + zstdCmd := exec.CommandContext(ctx, "zstd", "-dc", fmt.Sprintf("-T%d", threads)) //nolint:gosec // threads is a validated integer, not user input tarCmd := exec.CommandContext(ctx, "tar", "-xpf", "-", "-C", directory) zstdCmd.Stdin = rc diff --git a/internal/snapshot/snapshot_test.go b/internal/snapshot/snapshot_test.go index b691947..9a34756 100644 --- a/internal/snapshot/snapshot_test.go +++ b/internal/snapshot/snapshot_test.go @@ -30,7 +30,7 @@ func TestCreateAndRestoreRoundTrip(t *testing.T) { assert.NoError(t, os.Mkdir(filepath.Join(srcDir, "subdir"), 0o755)) assert.NoError(t, os.WriteFile(filepath.Join(srcDir, "subdir", "file3.txt"), []byte("content3"), 0o644)) - err = snapshot.Create(ctx, mem, key, srcDir, time.Hour, nil) + err = snapshot.Create(ctx, mem, key, srcDir, time.Hour, nil, 0) assert.NoError(t, err) headers, err := mem.Stat(ctx, key) @@ -38,7 +38,7 @@ func TestCreateAndRestoreRoundTrip(t *testing.T) { assert.Equal(t, "application/zstd", headers.Get("Content-Type")) dstDir := t.TempDir() - err = snapshot.Restore(ctx, mem, key, dstDir) + err = snapshot.Restore(ctx, mem, key, dstDir, 0) assert.NoError(t, err) content1, err := os.ReadFile(filepath.Join(dstDir, "file1.txt")) @@ -71,11 +71,11 @@ func TestCreateWithExcludePatterns(t *testing.T) { assert.NoError(t, os.Mkdir(filepath.Join(srcDir, "logs"), 0o755)) assert.NoError(t, os.WriteFile(filepath.Join(srcDir, "logs", "app.log"), []byte("excluded"), 0o644)) - err = snapshot.Create(ctx, mem, key, srcDir, time.Hour, []string{"*.log", "logs"}) + err = snapshot.Create(ctx, mem, key, srcDir, time.Hour, []string{"*.log", "logs"}, 0) assert.NoError(t, err) dstDir := t.TempDir() - err = snapshot.Restore(ctx, mem, key, dstDir) + err = snapshot.Restore(ctx, mem, key, dstDir, 0) assert.NoError(t, err) _, err = os.Stat(filepath.Join(dstDir, "include.txt")) @@ -99,11 +99,11 @@ func TestCreatePreservesSymlinks(t *testing.T) { assert.NoError(t, os.WriteFile(filepath.Join(srcDir, "target.txt"), []byte("target"), 0o644)) assert.NoError(t, os.Symlink("target.txt", filepath.Join(srcDir, "link.txt"))) - err = snapshot.Create(ctx, mem, key, srcDir, time.Hour, nil) + err = snapshot.Create(ctx, mem, key, srcDir, time.Hour, nil, 0) assert.NoError(t, err) dstDir := t.TempDir() - err = snapshot.Restore(ctx, mem, key, dstDir) + err = snapshot.Restore(ctx, mem, key, dstDir, 0) assert.NoError(t, err) info, err := os.Lstat(filepath.Join(dstDir, "link.txt")) @@ -122,7 +122,7 @@ func TestCreateNonexistentDirectory(t *testing.T) { defer mem.Close() key := cache.Key{1, 2, 3} - err = snapshot.Create(ctx, mem, key, "/nonexistent/directory", time.Hour, nil) + err = snapshot.Create(ctx, mem, key, "/nonexistent/directory", time.Hour, nil, 0) assert.Error(t, err) } @@ -136,7 +136,7 @@ func TestCreateNotADirectory(t *testing.T) { tmpFile := filepath.Join(t.TempDir(), "file.txt") assert.NoError(t, os.WriteFile(tmpFile, []byte("content"), 0o644)) - err = snapshot.Create(ctx, mem, key, tmpFile, time.Hour, nil) + err = snapshot.Create(ctx, mem, key, tmpFile, time.Hour, nil, 0) assert.Error(t, err) assert.Contains(t, err.Error(), "not a directory") } @@ -158,7 +158,7 @@ func TestCreateContextCancellation(t *testing.T) { cancelCtx, cancel := context.WithCancel(context.Background()) cancel() - err = snapshot.Create(cancelCtx, mem, key, srcDir, time.Hour, nil) + err = snapshot.Create(cancelCtx, mem, key, srcDir, time.Hour, nil, 0) assert.Error(t, err) } @@ -170,7 +170,7 @@ func TestRestoreNonexistentKey(t *testing.T) { key := cache.Key{1, 2, 3} dstDir := t.TempDir() - err = snapshot.Restore(ctx, mem, key, dstDir) + err = snapshot.Restore(ctx, mem, key, dstDir, 0) assert.Error(t, err) } @@ -184,11 +184,11 @@ func TestRestoreCreatesTargetDirectory(t *testing.T) { srcDir := t.TempDir() assert.NoError(t, os.WriteFile(filepath.Join(srcDir, "file.txt"), []byte("content"), 0o644)) - err = snapshot.Create(ctx, mem, key, srcDir, time.Hour, nil) + err = snapshot.Create(ctx, mem, key, srcDir, time.Hour, nil, 0) assert.NoError(t, err) dstDir := filepath.Join(t.TempDir(), "nested", "target") - err = snapshot.Restore(ctx, mem, key, dstDir) + err = snapshot.Restore(ctx, mem, key, dstDir, 0) assert.NoError(t, err) content, err := os.ReadFile(filepath.Join(dstDir, "file.txt")) @@ -210,14 +210,14 @@ func TestRestoreContextCancellation(t *testing.T) { assert.NoError(t, os.WriteFile(filename, content, 0o644)) } - err = snapshot.Create(ctx, mem, key, srcDir, time.Hour, nil) + err = snapshot.Create(ctx, mem, key, srcDir, time.Hour, nil, 0) assert.NoError(t, err) cancelCtx, cancel := context.WithCancel(context.Background()) cancel() dstDir := t.TempDir() - err = snapshot.Restore(cancelCtx, mem, key, dstDir) + err = snapshot.Restore(cancelCtx, mem, key, dstDir, 0) assert.Error(t, err) } @@ -230,11 +230,11 @@ func TestCreateEmptyDirectory(t *testing.T) { srcDir := t.TempDir() - err = snapshot.Create(ctx, mem, key, srcDir, time.Hour, nil) + err = snapshot.Create(ctx, mem, key, srcDir, time.Hour, nil, 0) assert.NoError(t, err) dstDir := t.TempDir() - err = snapshot.Restore(ctx, mem, key, dstDir) + err = snapshot.Restore(ctx, mem, key, dstDir, 0) assert.NoError(t, err) entries, err := os.ReadDir(dstDir) @@ -254,11 +254,11 @@ func TestCreateWithNestedDirectories(t *testing.T) { assert.NoError(t, os.MkdirAll(deepPath, 0o755)) assert.NoError(t, os.WriteFile(filepath.Join(deepPath, "deep.txt"), []byte("deep content"), 0o644)) - err = snapshot.Create(ctx, mem, key, srcDir, time.Hour, nil) + err = snapshot.Create(ctx, mem, key, srcDir, time.Hour, nil, 0) assert.NoError(t, err) dstDir := t.TempDir() - err = snapshot.Restore(ctx, mem, key, dstDir) + err = snapshot.Restore(ctx, mem, key, dstDir, 0) assert.NoError(t, err) content, err := os.ReadFile(filepath.Join(dstDir, "a", "b", "c", "d", "e", "deep.txt")) @@ -276,7 +276,7 @@ func TestCreateSetsCorrectHeaders(t *testing.T) { srcDir := t.TempDir() assert.NoError(t, os.WriteFile(filepath.Join(srcDir, "file.txt"), []byte("content"), 0o644)) - err = snapshot.Create(ctx, mem, key, srcDir, time.Hour, nil) + err = snapshot.Create(ctx, mem, key, srcDir, time.Hour, nil, 0) assert.NoError(t, err) headers, err := mem.Stat(ctx, key) diff --git a/internal/strategy/git/git.go b/internal/strategy/git/git.go index bd81604..231d9a8 100644 --- a/internal/strategy/git/git.go +++ b/internal/strategy/git/git.go @@ -37,7 +37,8 @@ type Config struct { SnapshotInterval time.Duration `hcl:"snapshot-interval,optional" help:"How often to generate tar.zstd snapshots. 0 disables snapshots." default:"0"` RepackInterval time.Duration `hcl:"repack-interval,optional" help:"How often to run full repack. 0 disables." default:"0"` // ServerURL is embedded as remote.origin.url in snapshots so git pull goes through cachew. - ServerURL string `hcl:"server-url,optional" help:"Base URL of this cachew instance, embedded in snapshot remote URLs." default:"${CACHEW_URL}"` + ServerURL string `hcl:"server-url,optional" help:"Base URL of this cachew instance, embedded in snapshot remote URLs." default:"${CACHEW_URL}"` + ZstdThreads int `hcl:"zstd-threads,optional" help:"Threads for zstd compression/decompression (0 = all CPU cores)." default:"0"` } type Strategy struct { diff --git a/internal/strategy/git/snapshot.go b/internal/strategy/git/snapshot.go index 3985d35..b7ebc80 100644 --- a/internal/strategy/git/snapshot.go +++ b/internal/strategy/git/snapshot.go @@ -83,7 +83,7 @@ func (s *Strategy) generateAndUploadSnapshot(ctx context.Context, repo *gitclone ttl := 7 * 24 * time.Hour excludePatterns := []string{"*.lock"} - err = snapshot.Create(ctx, s.cache, cacheKey, snapshotDir, ttl, excludePatterns) + err = snapshot.Create(ctx, s.cache, cacheKey, snapshotDir, ttl, excludePatterns, s.config.ZstdThreads) // Always clean up the snapshot working directory. if rmErr := os.RemoveAll(snapshotDir); rmErr != nil { diff --git a/internal/strategy/git/snapshot_test.go b/internal/strategy/git/snapshot_test.go index f9c5133..38f4470 100644 --- a/internal/strategy/git/snapshot_test.go +++ b/internal/strategy/git/snapshot_test.go @@ -147,7 +147,7 @@ func TestSnapshotGenerationViaLocalClone(t *testing.T) { // Restore the snapshot and verify it is a working (non-bare) checkout. restoreDir := filepath.Join(tmpDir, "restored") - err = snapshot.Restore(ctx, memCache, cacheKey, restoreDir) + err = snapshot.Restore(ctx, memCache, cacheKey, restoreDir, 0) assert.NoError(t, err) // A non-bare clone has a .git directory (not a bare repo). @@ -203,7 +203,7 @@ func TestSnapshotRemoteURLUsesServerURL(t *testing.T) { cacheKey := cache.NewKey(upstreamURL + ".snapshot") restoreDir := filepath.Join(tmpDir, "restored") - err = snapshot.Restore(ctx, memCache, cacheKey, restoreDir) + err = snapshot.Restore(ctx, memCache, cacheKey, restoreDir, 0) assert.NoError(t, err) cmd := exec.Command("git", "-C", restoreDir, "remote", "get-url", "origin")