diff --git a/internal/strategy/git/git.go b/internal/strategy/git/git.go index bd81604..8a3524f 100644 --- a/internal/strategy/git/git.go +++ b/internal/strategy/git/git.go @@ -51,6 +51,7 @@ type Strategy struct { spoolsMu sync.Mutex spools map[string]*RepoSpools tokenManager *githubapp.TokenManager + snapshotMu sync.Map // keyed by upstream URL, values are *sync.Mutex } func New( @@ -373,47 +374,27 @@ func ExtractRepoPath(pathValue string) string { return repoPath } -func (s *Strategy) serveCachedArtifact(w http.ResponseWriter, r *http.Request, host, pathValue, urlSuffix, artifact string) { - ctx := r.Context() - logger := logging.FromContext(ctx) - - logger.DebugContext(ctx, artifact+" request", - slog.String("host", host), - slog.String("path", pathValue)) - - pathValue = strings.TrimSuffix(pathValue, "/"+urlSuffix) - repoPath := ExtractRepoPath(pathValue) - upstreamURL := "https://" + host + "/" + repoPath - cacheKey := cache.NewKey(upstreamURL + "." + artifact) - - reader, headers, err := s.cache.Open(ctx, cacheKey) - if err != nil { - if errors.Is(err, os.ErrNotExist) { - logger.DebugContext(ctx, artifact+" not found in cache", - slog.String("upstream", upstreamURL)) - http.NotFound(w, r) - return - } - logger.ErrorContext(ctx, "Failed to open "+artifact+" from cache", - slog.String("upstream", upstreamURL), - slog.String("error", err.Error())) - http.Error(w, "Internal server error", http.StatusInternalServerError) - return - } - defer reader.Close() - - for key, values := range headers { - for _, value := range values { - w.Header().Add(key, value) +// ensureCloneReady blocks until the repository mirror is ready. If the mirror +// does not exist yet (StateEmpty), it triggers a clone synchronously. If another +// goroutine is already cloning (StateCloning), it polls until completion or the +// context is cancelled. Returns an error if the clone fails or the context is done. +func (s *Strategy) ensureCloneReady(ctx context.Context, repo *gitclone.Repository) error { + if repo.State() == gitclone.StateEmpty { + s.startClone(ctx, repo) + } + for repo.State() == gitclone.StateCloning { + t := time.NewTimer(500 * time.Millisecond) + select { + case <-ctx.Done(): + t.Stop() + return errors.Wrap(ctx.Err(), "cancelled waiting for clone") + case <-t.C: } } - - _, err = io.Copy(w, reader) - if err != nil { - logger.ErrorContext(ctx, "Failed to stream "+artifact, - slog.String("upstream", upstreamURL), - slog.String("error", err.Error())) + if repo.State() != gitclone.StateReady { + return errors.New("repository unavailable after clone attempt") } + return nil } func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) { diff --git a/internal/strategy/git/snapshot.go b/internal/strategy/git/snapshot.go index 3985d35..4a84675 100644 --- a/internal/strategy/git/snapshot.go +++ b/internal/strategy/git/snapshot.go @@ -2,11 +2,14 @@ package git import ( "context" + "io" "log/slog" "net/http" "os" "os/exec" "path/filepath" + "strings" + "sync" "time" "github.com/alecthomas/errors" @@ -45,6 +48,10 @@ func (s *Strategy) generateAndUploadSnapshot(ctx context.Context, repo *gitclone logger.InfoContext(ctx, "Snapshot generation started", slog.String("upstream", upstream)) + mu := s.snapshotMutexFor(upstream) + mu.Lock() + defer mu.Unlock() + mirrorRoot := s.cloneManager.Config().MirrorRoot snapshotDir, err := snapshotDirForURL(mirrorRoot, upstream) if err != nil { @@ -104,6 +111,67 @@ func (s *Strategy) scheduleSnapshotJobs(repo *gitclone.Repository) { }) } +func (s *Strategy) snapshotMutexFor(upstreamURL string) *sync.Mutex { + mu, _ := s.snapshotMu.LoadOrStore(upstreamURL, &sync.Mutex{}) + return mu.(*sync.Mutex) +} + func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, host, pathValue string) { - s.serveCachedArtifact(w, r, host, pathValue, "snapshot.tar.zst", "snapshot") + ctx := r.Context() + logger := logging.FromContext(ctx) + + repoPath := ExtractRepoPath(strings.TrimSuffix(pathValue, "/snapshot.tar.zst")) + upstreamURL := "https://" + host + "/" + repoPath + cacheKey := cache.NewKey(upstreamURL + ".snapshot") + + reader, headers, err := s.cache.Open(ctx, cacheKey) + if errors.Is(err, os.ErrNotExist) { + repo, repoErr := s.cloneManager.GetOrCreate(ctx, upstreamURL) + if repoErr != nil { + logger.ErrorContext(ctx, "Failed to get or create clone", + slog.String("upstream", upstreamURL), + slog.String("error", repoErr.Error())) + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + if cloneErr := s.ensureCloneReady(ctx, repo); cloneErr != nil { + logger.ErrorContext(ctx, "Clone unavailable for snapshot", + slog.String("upstream", upstreamURL), + slog.String("error", cloneErr.Error())) + http.Error(w, "Repository unavailable", http.StatusServiceUnavailable) + return + } + if genErr := s.generateAndUploadSnapshot(ctx, repo); genErr != nil { + logger.ErrorContext(ctx, "On-demand snapshot generation failed", + slog.String("upstream", upstreamURL), + slog.String("error", genErr.Error())) + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + reader, headers, err = s.cache.Open(ctx, cacheKey) + } + if err != nil { + if errors.Is(err, os.ErrNotExist) { + logger.DebugContext(ctx, "snapshot not found in cache", slog.String("upstream", upstreamURL)) + http.NotFound(w, r) + return + } + logger.ErrorContext(ctx, "Failed to open snapshot from cache", + slog.String("upstream", upstreamURL), + slog.String("error", err.Error())) + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + defer reader.Close() + + for key, values := range headers { + for _, value := range values { + w.Header().Add(key, value) + } + } + if _, err = io.Copy(w, reader); err != nil { + logger.ErrorContext(ctx, "Failed to stream snapshot", + slog.String("upstream", upstreamURL), + slog.String("error", err.Error())) + } } diff --git a/internal/strategy/git/snapshot_test.go b/internal/strategy/git/snapshot_test.go index f9c5133..916dc20 100644 --- a/internal/strategy/git/snapshot_test.go +++ b/internal/strategy/git/snapshot_test.go @@ -66,16 +66,54 @@ func TestSnapshotHTTPEndpoint(t *testing.T) { assert.Equal(t, "application/zstd", w.Header().Get("Content-Type")) assert.Equal(t, snapshotData, w.Body.Bytes()) - // Test snapshot not found + // Test snapshot not found - repo has no mirror, so clone is attempted but + // fails immediately because the context is cancelled. + cancelCtx, cancel := context.WithCancel(ctx) + cancel() req = httptest.NewRequest(http.MethodGet, "/git/github.com/org/nonexistent/snapshot.tar.zst", nil) - req = req.WithContext(ctx) + req = req.WithContext(cancelCtx) req.SetPathValue("host", "github.com") req.SetPathValue("path", "org/nonexistent/snapshot.tar.zst") w = httptest.NewRecorder() handler.ServeHTTP(w, req) - assert.Equal(t, 404, w.Code) + assert.Equal(t, 503, w.Code) +} + +func TestSnapshotOnDemandGenerationViaHTTP(t *testing.T) { + if _, err := exec.LookPath("git"); err != nil { + t.Skip("git not found in PATH") + } + + _, ctx := logging.Configure(context.Background(), logging.Config{}) + tmpDir := t.TempDir() + mirrorRoot := filepath.Join(tmpDir, "mirrors") + mirrorPath := filepath.Join(mirrorRoot, "github.com", "org", "repo") + createTestMirrorRepo(t, mirrorPath) + + memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{}) + assert.NoError(t, err) + mux := newTestMux() + + cm := gitclone.NewManagerProvider(ctx, gitclone.Config{MirrorRoot: mirrorRoot}, nil) + _, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil + assert.NoError(t, err) + + handler := mux.handlers["GET /git/{host}/{path...}"] + assert.NotZero(t, handler) + + req := httptest.NewRequest(http.MethodGet, "/git/github.com/org/repo/snapshot.tar.zst", nil) + req = req.WithContext(ctx) + req.SetPathValue("host", "github.com") + req.SetPathValue("path", "org/repo/snapshot.tar.zst") + w := httptest.NewRecorder() + + handler.ServeHTTP(w, req) + + assert.Equal(t, 200, w.Code) + assert.Equal(t, "application/zstd", w.Header().Get("Content-Type")) + assert.NotZero(t, w.Body.Len()) } // createTestMirrorRepo creates a bare mirror-style repo at mirrorPath with one commit.