Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 19 additions & 38 deletions internal/strategy/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type Strategy struct {
spoolsMu sync.Mutex
spools map[string]*RepoSpools
tokenManager *githubapp.TokenManager
snapshotMu sync.Map // keyed by upstream URL, values are *sync.Mutex
}

func New(
Expand Down Expand Up @@ -373,47 +374,27 @@ func ExtractRepoPath(pathValue string) string {
return repoPath
}

func (s *Strategy) serveCachedArtifact(w http.ResponseWriter, r *http.Request, host, pathValue, urlSuffix, artifact string) {
ctx := r.Context()
logger := logging.FromContext(ctx)

logger.DebugContext(ctx, artifact+" request",
slog.String("host", host),
slog.String("path", pathValue))

pathValue = strings.TrimSuffix(pathValue, "/"+urlSuffix)
repoPath := ExtractRepoPath(pathValue)
upstreamURL := "https://" + host + "/" + repoPath
cacheKey := cache.NewKey(upstreamURL + "." + artifact)

reader, headers, err := s.cache.Open(ctx, cacheKey)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
logger.DebugContext(ctx, artifact+" not found in cache",
slog.String("upstream", upstreamURL))
http.NotFound(w, r)
return
}
logger.ErrorContext(ctx, "Failed to open "+artifact+" from cache",
slog.String("upstream", upstreamURL),
slog.String("error", err.Error()))
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}
defer reader.Close()

for key, values := range headers {
for _, value := range values {
w.Header().Add(key, value)
// ensureCloneReady blocks until the repository mirror is ready. If the mirror
// does not exist yet (StateEmpty), it triggers a clone synchronously. If another
// goroutine is already cloning (StateCloning), it polls until completion or the
// context is cancelled. Returns an error if the clone fails or the context is done.
func (s *Strategy) ensureCloneReady(ctx context.Context, repo *gitclone.Repository) error {
if repo.State() == gitclone.StateEmpty {
s.startClone(ctx, repo)
}
for repo.State() == gitclone.StateCloning {
t := time.NewTimer(500 * time.Millisecond)
select {
case <-ctx.Done():
t.Stop()
return errors.Wrap(ctx.Err(), "cancelled waiting for clone")
case <-t.C:
}
}

_, err = io.Copy(w, reader)
if err != nil {
logger.ErrorContext(ctx, "Failed to stream "+artifact,
slog.String("upstream", upstreamURL),
slog.String("error", err.Error()))
if repo.State() != gitclone.StateReady {
return errors.New("repository unavailable after clone attempt")
}
return nil
}

func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) {
Expand Down
70 changes: 69 additions & 1 deletion internal/strategy/git/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package git

import (
"context"
"io"
"log/slog"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"time"

"github.com/alecthomas/errors"
Expand Down Expand Up @@ -45,6 +48,10 @@ func (s *Strategy) generateAndUploadSnapshot(ctx context.Context, repo *gitclone

logger.InfoContext(ctx, "Snapshot generation started", slog.String("upstream", upstream))

mu := s.snapshotMutexFor(upstream)
mu.Lock()
defer mu.Unlock()

mirrorRoot := s.cloneManager.Config().MirrorRoot
snapshotDir, err := snapshotDirForURL(mirrorRoot, upstream)
if err != nil {
Expand Down Expand Up @@ -104,6 +111,67 @@ func (s *Strategy) scheduleSnapshotJobs(repo *gitclone.Repository) {
})
}

func (s *Strategy) snapshotMutexFor(upstreamURL string) *sync.Mutex {
mu, _ := s.snapshotMu.LoadOrStore(upstreamURL, &sync.Mutex{})
return mu.(*sync.Mutex)
}

func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, host, pathValue string) {
s.serveCachedArtifact(w, r, host, pathValue, "snapshot.tar.zst", "snapshot")
ctx := r.Context()
logger := logging.FromContext(ctx)

repoPath := ExtractRepoPath(strings.TrimSuffix(pathValue, "/snapshot.tar.zst"))
upstreamURL := "https://" + host + "/" + repoPath
cacheKey := cache.NewKey(upstreamURL + ".snapshot")

reader, headers, err := s.cache.Open(ctx, cacheKey)
if errors.Is(err, os.ErrNotExist) {
repo, repoErr := s.cloneManager.GetOrCreate(ctx, upstreamURL)
if repoErr != nil {
logger.ErrorContext(ctx, "Failed to get or create clone",
slog.String("upstream", upstreamURL),
slog.String("error", repoErr.Error()))
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}
if cloneErr := s.ensureCloneReady(ctx, repo); cloneErr != nil {
logger.ErrorContext(ctx, "Clone unavailable for snapshot",
slog.String("upstream", upstreamURL),
slog.String("error", cloneErr.Error()))
http.Error(w, "Repository unavailable", http.StatusServiceUnavailable)
return
}
if genErr := s.generateAndUploadSnapshot(ctx, repo); genErr != nil {
logger.ErrorContext(ctx, "On-demand snapshot generation failed",
slog.String("upstream", upstreamURL),
slog.String("error", genErr.Error()))
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}
reader, headers, err = s.cache.Open(ctx, cacheKey)
}
if err != nil {
if errors.Is(err, os.ErrNotExist) {
logger.DebugContext(ctx, "snapshot not found in cache", slog.String("upstream", upstreamURL))
http.NotFound(w, r)
return
}
logger.ErrorContext(ctx, "Failed to open snapshot from cache",
slog.String("upstream", upstreamURL),
slog.String("error", err.Error()))
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}
defer reader.Close()

for key, values := range headers {
for _, value := range values {
w.Header().Add(key, value)
}
}
if _, err = io.Copy(w, reader); err != nil {
logger.ErrorContext(ctx, "Failed to stream snapshot",
slog.String("upstream", upstreamURL),
slog.String("error", err.Error()))
}
}
44 changes: 41 additions & 3 deletions internal/strategy/git/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,54 @@ func TestSnapshotHTTPEndpoint(t *testing.T) {
assert.Equal(t, "application/zstd", w.Header().Get("Content-Type"))
assert.Equal(t, snapshotData, w.Body.Bytes())

// Test snapshot not found
// Test snapshot not found - repo has no mirror, so clone is attempted but
// fails immediately because the context is cancelled.
cancelCtx, cancel := context.WithCancel(ctx)
cancel()
req = httptest.NewRequest(http.MethodGet, "/git/github.com/org/nonexistent/snapshot.tar.zst", nil)
req = req.WithContext(ctx)
req = req.WithContext(cancelCtx)
req.SetPathValue("host", "github.com")
req.SetPathValue("path", "org/nonexistent/snapshot.tar.zst")
w = httptest.NewRecorder()

handler.ServeHTTP(w, req)

assert.Equal(t, 404, w.Code)
assert.Equal(t, 503, w.Code)
}

func TestSnapshotOnDemandGenerationViaHTTP(t *testing.T) {
if _, err := exec.LookPath("git"); err != nil {
t.Skip("git not found in PATH")
}

_, ctx := logging.Configure(context.Background(), logging.Config{})
tmpDir := t.TempDir()
mirrorRoot := filepath.Join(tmpDir, "mirrors")
mirrorPath := filepath.Join(mirrorRoot, "github.com", "org", "repo")
createTestMirrorRepo(t, mirrorPath)

memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{})
assert.NoError(t, err)
mux := newTestMux()

cm := gitclone.NewManagerProvider(ctx, gitclone.Config{MirrorRoot: mirrorRoot}, nil)
_, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
assert.NoError(t, err)

handler := mux.handlers["GET /git/{host}/{path...}"]
assert.NotZero(t, handler)

req := httptest.NewRequest(http.MethodGet, "/git/github.com/org/repo/snapshot.tar.zst", nil)
req = req.WithContext(ctx)
req.SetPathValue("host", "github.com")
req.SetPathValue("path", "org/repo/snapshot.tar.zst")
w := httptest.NewRecorder()

handler.ServeHTTP(w, req)

assert.Equal(t, 200, w.Code)
assert.Equal(t, "application/zstd", w.Header().Get("Content-Type"))
assert.NotZero(t, w.Body.Len())
}

// createTestMirrorRepo creates a bare mirror-style repo at mirrorPath with one commit.
Expand Down