From 75c098d717add10c083a3bda53a4aa46157e74de Mon Sep 17 00:00:00 2001 From: Arkadiy Kukarkin Date: Tue, 11 Nov 2025 11:33:07 +0100 Subject: [PATCH 01/15] hotfix: add indexes for jobs/preps FKs --- model/preparation.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/model/preparation.go b/model/preparation.go index c7e97040..469ce103 100644 --- a/model/preparation.go +++ b/model/preparation.go @@ -183,9 +183,9 @@ type Job struct { ErrorStackTrace string `json:"errorStackTrace" table:"verbose"` // Associations - WorkerID *string `gorm:"size:63" json:"workerId,omitempty"` + WorkerID *string `gorm:"size:63;index" json:"workerId,omitempty"` Worker *Worker `gorm:"foreignKey:WorkerID;references:ID;constraint:OnDelete:SET NULL" json:"worker,omitempty" swaggerignore:"true" table:"verbose;expand"` - AttachmentID SourceAttachmentID `json:"attachmentId" table:"verbose"` + AttachmentID SourceAttachmentID `gorm:"index" json:"attachmentId" table:"verbose"` Attachment *SourceAttachment `gorm:"foreignKey:AttachmentID;constraint:OnDelete:CASCADE" json:"attachment,omitempty" swaggerignore:"true" table:"expand"` FileRanges []FileRange `gorm:"foreignKey:JobID;constraint:OnDelete:SET NULL" json:"fileRanges,omitempty" swaggerignore:"true" table:"-"` } @@ -266,17 +266,17 @@ type Car struct { RootCID CID `cbor:"3,keyasint,omitempty" gorm:"column:root_cid;type:bytes" json:"rootCid" swaggertype:"string"` FileSize int64 `cbor:"4,keyasint,omitempty" json:"fileSize"` MinPieceSizePadding int64 `cbor:"5,keyasint,omitempty" json:"minPieceSizePadding"` // MinPieceSizePadding tracks virtual padding for inline mode only. Inline: stores padding amount, PieceReader serves zeros virtually. Non-inline: always 0, literal zeros are written to CAR file for Curio TreeD compatibility. - StorageID *StorageID `cbor:"-" json:"storageId" table:"verbose"` + StorageID *StorageID `cbor:"-" gorm:"index" json:"storageId" table:"verbose"` Storage *Storage `cbor:"-" gorm:"foreignKey:StorageID;constraint:OnDelete:SET NULL" json:"storage,omitempty" swaggerignore:"true" table:"expand"` StoragePath string `cbor:"-" json:"storagePath"` // StoragePath is the path to the CAR file inside the storage. If the StorageID is nil but StoragePath is not empty, it means the CAR file is stored at the local absolute path. NumOfFiles int64 `cbor:"-" json:"numOfFiles" table:"verbose"` // Association - PreparationID PreparationID `cbor:"-" json:"preparationId" table:"-"` + PreparationID PreparationID `cbor:"-" gorm:"index" json:"preparationId" table:"-"` Preparation *Preparation `cbor:"-" gorm:"foreignKey:PreparationID;constraint:OnDelete:CASCADE" json:"preparation,omitempty" swaggerignore:"true" table:"-"` - AttachmentID *SourceAttachmentID `cbor:"-" json:"attachmentId" table:"-"` + AttachmentID *SourceAttachmentID `cbor:"-" gorm:"index" json:"attachmentId" table:"-"` Attachment *SourceAttachment `cbor:"-" gorm:"foreignKey:AttachmentID;constraint:OnDelete:CASCADE" json:"attachment,omitempty" swaggerignore:"true" table:"-"` - JobID *JobID `cbor:"-" json:"jobId,omitempty" table:"-"` + JobID *JobID `cbor:"-" gorm:"index" json:"jobId,omitempty" table:"-"` Job *Job `cbor:"-" gorm:"foreignKey:JobID;constraint:OnDelete:SET NULL" json:"job,omitempty" swaggerignore:"true" table:"-"` } From fbc08e912a618bb17cf2e5c843543b660ac5bbe3 Mon Sep 17 00:00:00 2001 From: Arkadiy Kukarkin Date: Tue, 11 Nov 2025 12:09:02 +0100 Subject: [PATCH 02/15] use SKIP LOCKED for job claiming, break up job cleanup --- handler/job/pack.go | 100 +++++++++++++++++++---------- service/datasetworker/find.go | 7 +- service/healthcheck/healthcheck.go | 76 ++++++++++++++++++---- 3 files changed, 138 insertions(+), 45 deletions(-) diff --git a/handler/job/pack.go b/handler/job/pack.go index 8e0761f6..1cd2904c 100644 --- a/handler/job/pack.go +++ b/handler/job/pack.go @@ -11,8 +11,8 @@ import ( "github.com/data-preservation-programs/singularity/model" "github.com/data-preservation-programs/singularity/pack" "github.com/data-preservation-programs/singularity/scan" - "github.com/data-preservation-programs/singularity/util" "gorm.io/gorm" + "gorm.io/gorm/clause" ) var ( @@ -54,27 +54,44 @@ func (DefaultHandler) StartPackHandler( var jobs []model.Job if jobID == 0 { err = database.DoRetry(ctx, func() error { - err := db.Where("type = ? AND state in ? AND attachment_id = ?", model.Pack, startableStatesForPack, sourceAttachment.ID).Find(&jobs).Error - if err != nil { - return errors.WithStack(err) - } - var jobIDs []model.JobID - for i, job := range jobs { - jobIDs = append(jobIDs, job.ID) - jobs[i].State = model.Ready - } - jobIDChunks := util.ChunkSlice(jobIDs, util.BatchSize) - for _, jobIDs := range jobIDChunks { - err = db.Model(&model.Job{}).Where("id IN ?", jobIDs).Updates(map[string]any{ - "state": model.Ready, - "error_message": "", - "error_stack_trace": "", - }).Error + return db.Transaction(func(tx *gorm.DB) error { + // Find all eligible jobs + err := tx.Where("type = ? AND state in ? AND attachment_id = ?", model.Pack, startableStatesForPack, sourceAttachment.ID).Find(&jobs).Error if err != nil { return errors.WithStack(err) } - } - return nil + + // Update jobs one at a time with SKIP LOCKED to avoid deadlock with worker cleanup + updatedJobs := make([]model.Job, 0, len(jobs)) + for _, job := range jobs { + // Try to lock and update this job + err = tx.Clauses(clause.Locking{ + Strength: "UPDATE", + Options: "SKIP LOCKED", + }).Model(&model.Job{}). + Where("id = ?", job.ID). + Updates(map[string]any{ + "state": model.Ready, + "error_message": "", + "error_stack_trace": "", + }).Error + + if err != nil { + return errors.WithStack(err) + } + + // Check if we actually updated the job (RowsAffected would tell us) + // If SKIP LOCKED caused us to skip it, RowsAffected will be 0 + if tx.RowsAffected > 0 { + job.State = model.Ready + updatedJobs = append(updatedJobs, job) + } + } + + // Replace jobs with only the ones we successfully updated + jobs = updatedJobs + return nil + }) }) if err != nil { return nil, errors.WithStack(err) @@ -162,23 +179,40 @@ func (DefaultHandler) PausePackHandler( var jobs []model.Job if jobID == 0 { err = database.DoRetry(ctx, func() error { - err := db.Where("type = ? AND state in ? AND attachment_id = ?", model.Pack, pausableStatesForPack, sourceAttachment.ID).Find(&jobs).Error - if err != nil { - return errors.WithStack(err) - } - var jobIDs []model.JobID - for i, job := range jobs { - jobIDs = append(jobIDs, job.ID) - jobs[i].State = model.Paused - } - jobIDChunks := util.ChunkSlice(jobIDs, util.BatchSize) - for _, jobIDs := range jobIDChunks { - err = db.Model(&model.Job{}).Where("id IN ?", jobIDs).Update("state", model.Paused).Error + return db.Transaction(func(tx *gorm.DB) error { + // Find all eligible jobs + err := tx.Where("type = ? AND state in ? AND attachment_id = ?", model.Pack, pausableStatesForPack, sourceAttachment.ID).Find(&jobs).Error if err != nil { return errors.WithStack(err) } - } - return nil + + // Update jobs one at a time with SKIP LOCKED to avoid deadlock with worker cleanup + updatedJobs := make([]model.Job, 0, len(jobs)) + for _, job := range jobs { + // Try to lock and update this job + err = tx.Clauses(clause.Locking{ + Strength: "UPDATE", + Options: "SKIP LOCKED", + }).Model(&model.Job{}). + Where("id = ?", job.ID). + Update("state", model.Paused).Error + + if err != nil { + return errors.WithStack(err) + } + + // Check if we actually updated the job (RowsAffected would tell us) + // If SKIP LOCKED caused us to skip it, RowsAffected will be 0 + if tx.RowsAffected > 0 { + job.State = model.Paused + updatedJobs = append(updatedJobs, job) + } + } + + // Replace jobs with only the ones we successfully updated + jobs = updatedJobs + return nil + }) }) if err != nil { return nil, errors.WithStack(err) diff --git a/service/datasetworker/find.go b/service/datasetworker/find.go index 4e96fbe7..7ded9956 100644 --- a/service/datasetworker/find.go +++ b/service/datasetworker/find.go @@ -8,6 +8,7 @@ import ( "github.com/data-preservation-programs/singularity/database" "github.com/data-preservation-programs/singularity/model" "gorm.io/gorm" + "gorm.io/gorm/clause" ) // findJob searches for a Job from the database based on the ordered list of job types provided. @@ -26,13 +27,17 @@ func (w *Thread) findJob(ctx context.Context, typesOrdered []model.JobType) (*mo db := w.dbNoContext.WithContext(ctx) txOpts := &sql.TxOptions{ - Isolation: sql.LevelSerializable, + Isolation: sql.LevelReadCommitted, } var job model.Job for _, jobType := range typesOrdered { err := database.DoRetry(ctx, func() error { return db.Transaction(func(db *gorm.DB) error { err := db.Preload("Attachment.Preparation.OutputStorages").Preload("Attachment.Storage"). + Clauses(clause.Locking{ + Strength: "UPDATE", + Options: "SKIP LOCKED", + }). Where("type = ? AND (state = ? OR (state = ? AND worker_id IS NULL))", jobType, model.Ready, model.Processing). First(&job).Error if err != nil { diff --git a/service/healthcheck/healthcheck.go b/service/healthcheck/healthcheck.go index fb148f9f..0aef8526 100644 --- a/service/healthcheck/healthcheck.go +++ b/service/healthcheck/healthcheck.go @@ -59,11 +59,9 @@ func StartHealthCheckCleanup(ctx context.Context, db *gorm.DB) { // HealthCheckCleanup is a function that cleans up stale workers and work files in the database. // -// It first removes all workers that haven't sent a heartbeat for a certain threshold (staleThreshold). -// If there's an error removing the workers, it logs the error and continues. -// -// Then, it resets the state of any jobs that are marked as being processed by a worker that no longer exists. -// If there's an error updating the sources, it logs the error and continues. +// It first finds workers that haven't sent a heartbeat for a certain threshold (staleThreshold). +// Then it explicitly updates jobs owned by those workers to prevent CASCADE lock contention. +// Finally, it removes the stale workers. // // All database operations are retried on failure using the DoRetry function. // @@ -72,17 +70,73 @@ func StartHealthCheckCleanup(ctx context.Context, db *gorm.DB) { func HealthCheckCleanup(ctx context.Context, db *gorm.DB) { db = db.WithContext(ctx) logger.Debugw("running healthcheck cleanup") - // Remove all workers that haven't sent heartbeat for 5 minutes. + + // Clean up stale workers and their jobs in a single transaction to ensure consistency + // and avoid FK CASCADE deadlocks by explicitly updating jobs first with SKIP LOCKED. err := database.DoRetry(ctx, func() error { - return db.Where("last_heartbeat < ?", time.Now().UTC().Add(-staleThreshold)).Delete(&model.Worker{}).Error + return db.Transaction(func(tx *gorm.DB) error { + // Find stale workers + var staleWorkers []model.Worker + err := tx.Where("last_heartbeat < ?", time.Now().UTC().Add(-staleThreshold)).Find(&staleWorkers).Error + if err != nil { + return errors.WithStack(err) + } + + if len(staleWorkers) == 0 { + return nil + } + + staleWorkerIDs := make([]string, len(staleWorkers)) + for i, w := range staleWorkers { + staleWorkerIDs[i] = w.ID + } + + // Explicitly update jobs owned by stale workers with SKIP LOCKED to avoid deadlock + // with concurrent bulk job updates. We use a two-step approach (SELECT then UPDATE) + // to be compatible with both PostgreSQL and MySQL/MariaDB. + var jobsToUpdate []model.Job + err = tx.Clauses(clause.Locking{ + Strength: "UPDATE", + Options: "SKIP LOCKED", + }).Select("id"). + Where("worker_id IN ? AND state = ?", staleWorkerIDs, model.Processing). + Find(&jobsToUpdate).Error + if err != nil { + return errors.WithStack(err) + } + + // Update the jobs we successfully locked + if len(jobsToUpdate) > 0 { + jobIDsToUpdate := make([]model.JobID, len(jobsToUpdate)) + for i, job := range jobsToUpdate { + jobIDsToUpdate[i] = job.ID + } + + err = tx.Model(&model.Job{}). + Where("id IN ?", jobIDsToUpdate). + Updates(map[string]any{ + "worker_id": nil, + "state": model.Ready, + }).Error + if err != nil { + return errors.WithStack(err) + } + } + + // Now delete the workers. The FK CASCADE will be mostly a no-op since we already + // nullified worker_id on jobs we could lock. Any jobs we couldn't lock will be + // handled by the CASCADE (may still cause some lock contention but reduced). + return errors.WithStack(tx.Where("id IN ?", staleWorkerIDs).Delete(&model.Worker{}).Error) + }) }) if err != nil && !errors.Is(err, context.Canceled) { - log.Logger("healthcheck").Errorw("failed to remove dead workers", "error", err) + logger.Errorw("failed to cleanup dead workers", "error", err) } - // In case there are some works that have stale foreign key referenced to dead workers, we need to remove them + // Safety check: clean up any orphaned jobs that might have been missed + // (e.g., jobs we couldn't lock due to SKIP LOCKED, or jobs left in inconsistent state) err = database.DoRetry(ctx, func() error { - return db.Model(&model.Job{}).Where("(worker_id NOT IN (?) OR worker_id IS NULL) AND state = ?", + return db.Model(&model.Job{}).Where("worker_id NOT IN (?) AND state = ?", db.Table("workers").Select("id"), model.Processing). Updates(map[string]any{ "worker_id": nil, @@ -90,7 +144,7 @@ func HealthCheckCleanup(ctx context.Context, db *gorm.DB) { }).Error }) if err != nil && !errors.Is(err, context.Canceled) { - log.Logger("healthcheck").Errorw("failed to remove stale workers", "error", err) + logger.Errorw("failed to cleanup orphaned jobs", "error", err) } } From 88bc3561ca7625c70c33569ab44b5033535400fa Mon Sep 17 00:00:00 2001 From: Arkadiy Kukarkin Date: Tue, 11 Nov 2025 12:19:04 +0100 Subject: [PATCH 03/15] explicit fix for prep removal, delete in order --- handler/dataprep/remove.go | 113 +++++++++++++++++++++++++++++++++++-- 1 file changed, 107 insertions(+), 6 deletions(-) diff --git a/handler/dataprep/remove.go b/handler/dataprep/remove.go index 4d79fcdc..7a0c7475 100644 --- a/handler/dataprep/remove.go +++ b/handler/dataprep/remove.go @@ -53,12 +53,113 @@ func (DefaultHandler) RemovePreparationHandler(ctx context.Context, db *gorm.DB, } err = database.DoRetry(ctx, func() error { - return db.Transaction(func(db *gorm.DB) error { - // Use Select to control deletion order and avoid circular cascade deadlocks. - // GORM v1.31+ handles this by deleting associations in specified order, - // preventing Postgres deadlocks from multiple cascade paths to Files table. - // See: https://github.com/data-preservation-programs/singularity/pull/583 - return db.Select("Wallets", "SourceStorages", "OutputStorages").Delete(&preparation).Error + return db.Transaction(func(tx *gorm.DB) error { + // Explicitly delete child records to avoid CASCADE deadlocks with concurrent operations. + // We materialize IDs first to avoid nested subqueries that cause MySQL deadlocks. + + // Step 1: Get all attachment IDs for this preparation + var attachmentIDs []model.SourceAttachmentID + err := tx.Table("source_attachments").Select("id"). + Where("preparation_id = ?", preparation.ID). + Find(&attachmentIDs).Error + if err != nil { + return errors.WithStack(err) + } + + if len(attachmentIDs) == 0 { + // No attachments, just delete the preparation + return tx.Select("Wallets", "SourceStorages", "OutputStorages").Delete(&preparation).Error + } + + // Step 2: Get all car IDs + var carIDs []model.CarID + err = tx.Table("cars").Select("id"). + Where("preparation_id = ?", preparation.ID). + Find(&carIDs).Error + if err != nil { + return errors.WithStack(err) + } + + // Step 3: Get all job IDs + var jobIDs []model.JobID + err = tx.Table("jobs").Select("id"). + Where("attachment_id IN ?", attachmentIDs). + Find(&jobIDs).Error + if err != nil { + return errors.WithStack(err) + } + + // Step 4: Get all file IDs + var fileIDs []model.FileID + err = tx.Table("files").Select("id"). + Where("attachment_id IN ?", attachmentIDs). + Find(&fileIDs).Error + if err != nil { + return errors.WithStack(err) + } + + // Now delete in leaf-to-root order using materialized IDs: + + // 1. Delete car_blocks (leaf node) + if len(carIDs) > 0 { + err = tx.Where("car_id IN ?", carIDs).Delete(&model.CarBlock{}).Error + if err != nil { + return errors.WithStack(err) + } + } + + // 2. Delete cars + if len(carIDs) > 0 { + err = tx.Where("id IN ?", carIDs).Delete(&model.Car{}).Error + if err != nil { + return errors.WithStack(err) + } + } + + // 3. Delete file_ranges (from jobs) + if len(jobIDs) > 0 { + err = tx.Where("job_id IN ?", jobIDs).Delete(&model.FileRange{}).Error + if err != nil { + return errors.WithStack(err) + } + } + + // 4. Delete file_ranges (from files) + if len(fileIDs) > 0 { + err = tx.Where("file_id IN ?", fileIDs).Delete(&model.FileRange{}).Error + if err != nil { + return errors.WithStack(err) + } + } + + // 5. Delete files (before directories to avoid circular cascade) + if len(fileIDs) > 0 { + err = tx.Where("id IN ?", fileIDs).Delete(&model.File{}).Error + if err != nil { + return errors.WithStack(err) + } + } + + // 6. Delete directories + err = tx.Where("attachment_id IN ?", attachmentIDs).Delete(&model.Directory{}).Error + if err != nil { + return errors.WithStack(err) + } + + // 7. Delete jobs + if len(jobIDs) > 0 { + err = tx.Where("id IN ?", jobIDs).Delete(&model.Job{}).Error + if err != nil { + return errors.WithStack(err) + } + } + + // 8. Now delete the preparation itself, which will cascade to: + // - wallet_assignments (many2many, small table) + // - source_attachments (now empty, no more cascades) + // - output_attachments (many2many, small table) + // These cascades are safe because we've already deleted all the heavy child tables. + return tx.Select("Wallets", "SourceStorages", "OutputStorages").Delete(&preparation).Error }) }) if err != nil { From 6dc01a150385f7db662b8cb38a7968a1db15f664 Mon Sep 17 00:00:00 2001 From: Arkadiy Kukarkin Date: Tue, 11 Nov 2025 12:40:42 +0100 Subject: [PATCH 04/15] add index on files.attachment_id so prep deletes actually complete --- model/preparation.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/model/preparation.go b/model/preparation.go index 469ce103..3d5f16b5 100644 --- a/model/preparation.go +++ b/model/preparation.go @@ -204,7 +204,7 @@ type File struct { LastModifiedNano int64 `cbor:"5,keyasint,omitempty" json:"lastModifiedNano"` // Associations - AttachmentID SourceAttachmentID `cbor:"-" json:"attachmentId"` + AttachmentID SourceAttachmentID `cbor:"-" gorm:"index" json:"attachmentId"` Attachment *SourceAttachment `cbor:"-" gorm:"foreignKey:AttachmentID;constraint:OnDelete:CASCADE" json:"attachment,omitempty" swaggerignore:"true"` DirectoryID *DirectoryID `cbor:"-" gorm:"index" json:"directoryId"` Directory *Directory `cbor:"-" gorm:"foreignKey:DirectoryID;constraint:OnDelete:CASCADE" json:"directory,omitempty" swaggerignore:"true"` From 0ae072c01e72975aa4cf006cc89c5325b30006a0 Mon Sep 17 00:00:00 2001 From: Arkadiy Kukarkin Date: Tue, 11 Nov 2025 12:46:23 +0100 Subject: [PATCH 05/15] tests for worker and prep deadlocks --- handler/dataprep/remove_deadlock_test.go | 343 +++++++++++++++++++++++ model/migrate_integration_test.go | 3 + 2 files changed, 346 insertions(+) create mode 100644 handler/dataprep/remove_deadlock_test.go create mode 100644 model/migrate_integration_test.go diff --git a/handler/dataprep/remove_deadlock_test.go b/handler/dataprep/remove_deadlock_test.go new file mode 100644 index 00000000..9c8c0b0d --- /dev/null +++ b/handler/dataprep/remove_deadlock_test.go @@ -0,0 +1,343 @@ +package dataprep + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/data-preservation-programs/singularity/model" + "github.com/data-preservation-programs/singularity/util/testutil" + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "gorm.io/gorm" +) + +// TestRemovePreparationNoDeadlock verifies that preparation deletion does not deadlock +// with concurrent worker operations. The test simulates multiple preparations sharing one +// storage. Deleting one preparation cascades to shared tables while workers operate on +// other preparations. The fix uses explicit ordered deletion instead of FK CASCADE. +func TestRemovePreparationNoDeadlock(t *testing.T) { + testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { + req := require.New(t) + + // Create test data: multiple preparations sharing the SAME storage (realistic production scenario) + // When deleting one prep, CASCADE affects tables shared with other preps on same storage + const numPreparations = 3 + const numFilesPerPrep = 50 + const numDirsPerPrep = 10 + const numJobsPerPrep = 10 + const numCarsPerPrep = 5 + const numBlocksPerCar = 20 + + // Create ONE shared storage (this is the key to reproducing the real deadlock) + sharedStorage := model.Storage{ + Name: "shared-storage-" + uuid.New().String(), + Type: "local", + Path: "/tmp/test", + } + err := db.Create(&sharedStorage).Error + req.NoError(err) + + preparations := make([]model.Preparation, numPreparations) + attachments := make([]model.SourceAttachment, numPreparations) + allJobIDs := make([]model.JobID, 0, numPreparations*numJobsPerPrep) + allFileIDs := make([]model.FileID, 0, numPreparations*numFilesPerPrep) + allCarIDs := make([]model.CarID, 0, numPreparations*numCarsPerPrep) + + for i := range numPreparations { + // Create preparation + prep := model.Preparation{ + Name: "test-prep-" + uuid.New().String(), + MaxSize: 1 << 30, + PieceSize: 1 << 30, + } + err := db.Create(&prep).Error + req.NoError(err) + preparations[i] = prep + + // ALL preparations attach to the SAME storage (production pattern!) + attachment := model.SourceAttachment{ + PreparationID: prep.ID, + StorageID: sharedStorage.ID, + } + err = db.Create(&attachment).Error + req.NoError(err) + attachments[i] = attachment + + // Create directories (will have circular references with files) + directories := make([]model.Directory, numDirsPerPrep) + for j := range numDirsPerPrep { + dir := model.Directory{ + Name: "dir-" + uuid.New().String()[:8], + AttachmentID: attachment.ID, + CID: model.CID(testutil.TestCid), + Data: []byte("test"), + } + err = db.Create(&dir).Error + req.NoError(err) + directories[j] = dir + } + + // Create files + for j := range numFilesPerPrep { + dirIdx := j % numDirsPerPrep + file := model.File{ + Path: "/test/file-" + uuid.New().String()[:8], + Size: 1024, + AttachmentID: attachment.ID, + DirectoryID: &directories[dirIdx].ID, + CID: model.CID(testutil.TestCid), + Hash: "test-hash", + LastModifiedNano: time.Now().UnixNano(), + } + err = db.Create(&file).Error + req.NoError(err) + allFileIDs = append(allFileIDs, file.ID) + } + + // Create jobs + for j := range numJobsPerPrep { + job := model.Job{ + Type: model.Pack, + State: model.Ready, + AttachmentID: attachment.ID, + } + err = db.Create(&job).Error + req.NoError(err) + allJobIDs = append(allJobIDs, job.ID) + + // Create file ranges for some jobs + if j%2 == 0 && len(allFileIDs) > 0 { + fileIdx := (i*numJobsPerPrep + j) % len(allFileIDs) + fileRange := model.FileRange{ + JobID: &job.ID, + FileID: allFileIDs[fileIdx], + Offset: 0, + Length: 1024, + CID: model.CID(testutil.TestCid), + } + err = db.Create(&fileRange).Error + req.NoError(err) + } + } + + // Create cars + for range numCarsPerPrep { + car := model.Car{ + PreparationID: prep.ID, + AttachmentID: &attachment.ID, + PieceCID: model.CID(testutil.TestCid), + PieceSize: 1 << 20, + RootCID: model.CID(testutil.TestCid), + FileSize: 1024, + StoragePath: "/test/car-" + uuid.New().String()[:8], + } + err = db.Create(&car).Error + req.NoError(err) + allCarIDs = append(allCarIDs, car.ID) + + // Create car blocks + for k := range numBlocksPerCar { + block := model.CarBlock{ + CarID: car.ID, + CID: model.CID(testutil.TestCid), + CarOffset: int64(k * 1024), + CarBlockLength: 1024, + Varint: []byte{0x01}, + RawBlock: []byte("test-block"), + FileOffset: int64(k * 1024), + } + err = db.Create(&block).Error + req.NoError(err) + } + } + } + + // Run concurrent operations that could previously cause deadlock + const numIterations = 15 + var wg sync.WaitGroup + errChan := make(chan error, numIterations*4) + + for i := range numIterations { + wg.Add(4) + + // Goroutine 1: Delete preparations (triggers CASCADE) + go func(iteration int) { + defer wg.Done() + time.Sleep(time.Duration(iteration%3) * time.Millisecond) + + deleteCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + prepIdx := iteration % numPreparations + handler := DefaultHandler{} + err := handler.RemovePreparationHandler(deleteCtx, db, preparations[prepIdx].Name, RemoveRequest{}) + if err != nil && deleteCtx.Err() == nil { + // Ignore expected errors: + // - "not found" errors (prep already deleted by another goroutine) + // - "active jobs" errors (concurrent job updates made jobs active) + if !isNotFoundError(err) && !isActiveJobsError(err) { + errChan <- err + } + } + }(i) + + // Goroutine 2: Bulk job updates (simulating pack/pause operations) + go func(iteration int) { + defer wg.Done() + time.Sleep(time.Duration(iteration%5) * time.Millisecond) + + updateCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + // Update a batch of jobs + startIdx := (iteration * 5) % len(allJobIDs) + endIdx := min(startIdx+8, len(allJobIDs)) + batchJobIDs := allJobIDs[startIdx:endIdx] + + err := db.WithContext(updateCtx).Transaction(func(tx *gorm.DB) error { + for _, jobID := range batchJobIDs { + err := tx.Model(&model.Job{}). + Where("id = ?", jobID). + Updates(map[string]any{ + "state": model.Processing, + "error_message": "", + "error_stack_trace": "", + }).Error + if err != nil { + return err + } + } + return nil + }) + + if err != nil && updateCtx.Err() == nil { + errChan <- err + } + }(i) + + // Goroutine 3: File updates (simulating scan operations) + go func(iteration int) { + defer wg.Done() + time.Sleep(time.Duration(iteration%7) * time.Millisecond) + + updateCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + // Update some files + if len(allFileIDs) > 0 { + fileIdx := iteration % len(allFileIDs) + err := db.WithContext(updateCtx).Model(&model.File{}). + Where("id = ?", allFileIDs[fileIdx]). + Update("size", 2048).Error + + if err != nil && updateCtx.Err() == nil { + errChan <- err + } + } + }(i) + + // Goroutine 4: Car block creation (simulating pack completion) + go func(iteration int) { + defer wg.Done() + time.Sleep(time.Duration(iteration%4) * time.Millisecond) + + createCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + // Try to create a car block (might fail if car already deleted) + if len(allCarIDs) > 0 { + carIdx := iteration % len(allCarIDs) + block := model.CarBlock{ + CarID: allCarIDs[carIdx], + CID: model.CID(testutil.TestCid), + CarOffset: int64(1000 + iteration), + CarBlockLength: 512, + Varint: []byte{0x02}, + RawBlock: []byte("new-block"), + FileOffset: int64(1000 + iteration), + } + err := db.WithContext(createCtx).Create(&block).Error + + // Ignore FK errors (car might have been deleted) + if err != nil && createCtx.Err() == nil && !isFKError(err) { + errChan <- err + } + } + }(i) + } + + // Wait for all operations to complete + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + // Wait with timeout + select { + case <-done: + // Success - no deadlock + case err := <-errChan: + req.NoError(err, "Unexpected error during concurrent operations") + case <-time.After(45 * time.Second): + req.Fail("Test timed out - likely deadlock occurred") + } + + // Verify no deadlock occurred (test completed successfully) + // We don't verify complete cleanup because: + // - Some deletions may have been blocked by active jobs (expected) + // - Some operations may still be pending when test ends + // - The important thing is we didn't deadlock + // + // If we want to verify actual cleanup, we'd need to: + // 1. Pause all jobs first + // 2. Wait for all deletions to complete + // 3. Then verify cleanup + // But that's not the point of this test - we're testing for deadlocks, not cleanup. + + t.Logf("Test completed without deadlock") + }) +} + +// isNotFoundError checks if error is a "not found" error +func isNotFoundError(err error) bool { + if err == nil { + return false + } + errStr := err.Error() + return contains(errStr, "not found") || contains(errStr, "does not exist") +} + +// isActiveJobsError checks if error is about active jobs preventing deletion +func isActiveJobsError(err error) bool { + if err == nil { + return false + } + errStr := err.Error() + return contains(errStr, "active jobs") +} + +// isFKError checks if error is a foreign key constraint violation +func isFKError(err error) bool { + if err == nil { + return false + } + errStr := err.Error() + return contains(errStr, "foreign key") || contains(errStr, "FOREIGN KEY") || + contains(errStr, "violates") || contains(errStr, "constraint") +} + +func contains(s, substr string) bool { + return len(s) >= len(substr) && (s == substr || len(s) > len(substr) && findSubstring(s, substr)) +} + +func findSubstring(s, substr string) bool { + for i := 0; i <= len(s)-len(substr); i++ { + if s[i:i+len(substr)] == substr { + return true + } + } + return false +} diff --git a/model/migrate_integration_test.go b/model/migrate_integration_test.go new file mode 100644 index 00000000..28850cd8 --- /dev/null +++ b/model/migrate_integration_test.go @@ -0,0 +1,3 @@ +package model + +// moved to handler/admin to avoid import cycle From a22c7e9afdf155a63e47ac973dd86d65b9bdffff Mon Sep 17 00:00:00 2001 From: Arkadiy Kukarkin Date: Tue, 11 Nov 2025 12:48:38 +0100 Subject: [PATCH 06/15] stray file --- model/migrate_integration_test.go | 3 --- 1 file changed, 3 deletions(-) delete mode 100644 model/migrate_integration_test.go diff --git a/model/migrate_integration_test.go b/model/migrate_integration_test.go deleted file mode 100644 index 28850cd8..00000000 --- a/model/migrate_integration_test.go +++ /dev/null @@ -1,3 +0,0 @@ -package model - -// moved to handler/admin to avoid import cycle From 77d9debc555ac705c160a4993a55215feeca87e7 Mon Sep 17 00:00:00 2001 From: Arkadiy Kukarkin Date: Tue, 11 Nov 2025 12:49:15 +0100 Subject: [PATCH 07/15] correct test file for healthcheck --- .../healthcheck/healthcheck_deadlock_test.go | 181 ++++++++++++++++++ 1 file changed, 181 insertions(+) create mode 100644 service/healthcheck/healthcheck_deadlock_test.go diff --git a/service/healthcheck/healthcheck_deadlock_test.go b/service/healthcheck/healthcheck_deadlock_test.go new file mode 100644 index 00000000..0e03cbed --- /dev/null +++ b/service/healthcheck/healthcheck_deadlock_test.go @@ -0,0 +1,181 @@ +package healthcheck + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/data-preservation-programs/singularity/model" + "github.com/data-preservation-programs/singularity/util/testutil" + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "gorm.io/gorm" +) + +// TestHealthCheckCleanupNoDeadlock verifies that concurrent worker cleanup and bulk job +// updates do not deadlock. Worker cleanup triggers FK CASCADE to jobs while bulk updates +// modify jobs directly. The fix uses FOR UPDATE SKIP LOCKED. +func TestHealthCheckCleanupNoDeadlock(t *testing.T) { + testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { + req := require.New(t) + + // Create test preparation and storage for jobs + preparation := model.Preparation{ + Name: "test-prep-" + uuid.New().String(), + MaxSize: 1 << 30, + PieceSize: 1 << 30, + } + err := db.Create(&preparation).Error + req.NoError(err) + + storage := model.Storage{ + Name: "test-storage-" + uuid.New().String(), + Type: "local", + Path: "/tmp/test", + } + err = db.Create(&storage).Error + req.NoError(err) + + attachment := model.SourceAttachment{ + PreparationID: preparation.ID, + StorageID: storage.ID, + } + err = db.Create(&attachment).Error + req.NoError(err) + + // Create workers and jobs assigned to them + const numWorkers = 5 + const numJobsPerWorker = 10 + workerIDs := make([]string, numWorkers) + jobIDs := make([]model.JobID, 0, numWorkers*numJobsPerWorker) + + for i := range numWorkers { + workerID := uuid.New().String() + workerIDs[i] = workerID + + // Create worker with old heartbeat (will be considered stale) + worker := model.Worker{ + ID: workerID, + LastHeartbeat: time.Now().UTC().Add(-10 * time.Minute), // Stale + Hostname: "test-host", + Type: model.DatasetWorker, + } + err = db.Create(&worker).Error + req.NoError(err) + + // Create jobs assigned to this worker + for range numJobsPerWorker { + job := model.Job{ + Type: model.Pack, + State: model.Processing, + WorkerID: &workerID, + AttachmentID: attachment.ID, + } + err = db.Create(&job).Error + req.NoError(err) + jobIDs = append(jobIDs, job.ID) + } + } + + // Run concurrent operations that could previously cause deadlock + const numIterations = 20 + var wg sync.WaitGroup + errChan := make(chan error, numIterations*2) + + // Override stale threshold temporarily for this test + oldThreshold := staleThreshold + staleThreshold = 5 * time.Minute + defer func() { + staleThreshold = oldThreshold + }() + + for i := range numIterations { + wg.Add(2) + + // Goroutine 1: Worker cleanup (deletes workers, triggers CASCADE) + go func(iteration int) { + defer wg.Done() + // Add slight randomization to increase chance of lock conflicts + time.Sleep(time.Duration(iteration%3) * time.Millisecond) + + cleanupCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + HealthCheckCleanup(cleanupCtx, db) + }(i) + + // Goroutine 2: Bulk job update (updates multiple jobs) + go func(iteration int) { + defer wg.Done() + // Add slight randomization to increase chance of lock conflicts + time.Sleep(time.Duration(iteration%5) * time.Millisecond) + + updateCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + // Simulate bulk update like in handler/job/pack.go + // Update a subset of jobs to Ready state + startIdx := (iteration * 5) % len(jobIDs) + endIdx := min(startIdx+10, len(jobIDs)) + batchJobIDs := jobIDs[startIdx:endIdx] + + err := db.WithContext(updateCtx).Transaction(func(tx *gorm.DB) error { + // This pattern previously could deadlock with worker cleanup + for _, jobID := range batchJobIDs { + err := tx.Model(&model.Job{}). + Where("id = ?", jobID). + Updates(map[string]any{ + "state": model.Ready, + "error_message": "", + "error_stack_trace": "", + }).Error + if err != nil { + return err + } + } + return nil + }) + + if err != nil && ctx.Err() == nil { + errChan <- err + } + }(i) + } + + // Wait for all operations to complete + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + // Wait with timeout + select { + case <-done: + // Success - no deadlock + case err := <-errChan: + req.NoError(err, "Unexpected error during concurrent operations") + case <-time.After(30 * time.Second): + req.Fail("Test timed out - likely deadlock occurred") + } + + // Verify final state consistency + // All workers should be deleted + var remainingWorkers []model.Worker + err = db.Where("id IN ?", workerIDs).Find(&remainingWorkers).Error + req.NoError(err) + req.Empty(remainingWorkers, "All stale workers should be deleted") + + // All jobs should have worker_id set to NULL and be in Ready or Complete state + var remainingJobs []model.Job + err = db.Where("id IN ?", jobIDs).Find(&remainingJobs).Error + req.NoError(err) + + for _, job := range remainingJobs { + req.Nil(job.WorkerID, "Job %d should have NULL worker_id", job.ID) + req.Contains([]model.JobState{model.Ready, model.Complete}, job.State, + "Job %d should be in Ready or Complete state", job.ID) + } + }) +} From 5e59ea2d38b28f8238ee872b02644b24f962795e Mon Sep 17 00:00:00 2001 From: Arkadiy Kukarkin Date: Tue, 11 Nov 2025 13:00:11 +0100 Subject: [PATCH 08/15] test cleanup --- handler/dataprep/remove_deadlock_test.go | 120 ++++++------------ .../healthcheck/healthcheck_deadlock_test.go | 31 +---- 2 files changed, 43 insertions(+), 108 deletions(-) diff --git a/handler/dataprep/remove_deadlock_test.go b/handler/dataprep/remove_deadlock_test.go index 9c8c0b0d..4c62c924 100644 --- a/handler/dataprep/remove_deadlock_test.go +++ b/handler/dataprep/remove_deadlock_test.go @@ -2,6 +2,7 @@ package dataprep import ( "context" + "strings" "sync" "testing" "time" @@ -21,8 +22,7 @@ func TestRemovePreparationNoDeadlock(t *testing.T) { testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { req := require.New(t) - // Create test data: multiple preparations sharing the SAME storage (realistic production scenario) - // When deleting one prep, CASCADE affects tables shared with other preps on same storage + // Create test data: multiple preparations sharing one storage const numPreparations = 3 const numFilesPerPrep = 50 const numDirsPerPrep = 10 @@ -30,7 +30,7 @@ func TestRemovePreparationNoDeadlock(t *testing.T) { const numCarsPerPrep = 5 const numBlocksPerCar = 20 - // Create ONE shared storage (this is the key to reproducing the real deadlock) + // Create shared storage sharedStorage := model.Storage{ Name: "shared-storage-" + uuid.New().String(), Type: "local", @@ -56,7 +56,7 @@ func TestRemovePreparationNoDeadlock(t *testing.T) { req.NoError(err) preparations[i] = prep - // ALL preparations attach to the SAME storage (production pattern!) + // Attach to shared storage attachment := model.SourceAttachment{ PreparationID: prep.ID, StorageID: sharedStorage.ID, @@ -65,7 +65,7 @@ func TestRemovePreparationNoDeadlock(t *testing.T) { req.NoError(err) attachments[i] = attachment - // Create directories (will have circular references with files) + // Create directories directories := make([]model.Directory, numDirsPerPrep) for j := range numDirsPerPrep { dir := model.Directory{ @@ -154,7 +154,6 @@ func TestRemovePreparationNoDeadlock(t *testing.T) { } } - // Run concurrent operations that could previously cause deadlock const numIterations = 15 var wg sync.WaitGroup errChan := make(chan error, numIterations*4) @@ -162,7 +161,7 @@ func TestRemovePreparationNoDeadlock(t *testing.T) { for i := range numIterations { wg.Add(4) - // Goroutine 1: Delete preparations (triggers CASCADE) + // Goroutine 1: Delete preparations go func(iteration int) { defer wg.Done() time.Sleep(time.Duration(iteration%3) * time.Millisecond) @@ -174,16 +173,14 @@ func TestRemovePreparationNoDeadlock(t *testing.T) { handler := DefaultHandler{} err := handler.RemovePreparationHandler(deleteCtx, db, preparations[prepIdx].Name, RemoveRequest{}) if err != nil && deleteCtx.Err() == nil { - // Ignore expected errors: - // - "not found" errors (prep already deleted by another goroutine) - // - "active jobs" errors (concurrent job updates made jobs active) + // Ignore "not found" and "active jobs" errors if !isNotFoundError(err) && !isActiveJobsError(err) { errChan <- err } } }(i) - // Goroutine 2: Bulk job updates (simulating pack/pause operations) + // Goroutine 2: Bulk job updates go func(iteration int) { defer wg.Done() time.Sleep(time.Duration(iteration%5) * time.Millisecond) @@ -191,7 +188,6 @@ func TestRemovePreparationNoDeadlock(t *testing.T) { updateCtx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - // Update a batch of jobs startIdx := (iteration * 5) % len(allJobIDs) endIdx := min(startIdx+8, len(allJobIDs)) batchJobIDs := allJobIDs[startIdx:endIdx] @@ -217,7 +213,7 @@ func TestRemovePreparationNoDeadlock(t *testing.T) { } }(i) - // Goroutine 3: File updates (simulating scan operations) + // Goroutine 3: File updates go func(iteration int) { defer wg.Done() time.Sleep(time.Duration(iteration%7) * time.Millisecond) @@ -225,20 +221,17 @@ func TestRemovePreparationNoDeadlock(t *testing.T) { updateCtx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - // Update some files - if len(allFileIDs) > 0 { - fileIdx := iteration % len(allFileIDs) - err := db.WithContext(updateCtx).Model(&model.File{}). - Where("id = ?", allFileIDs[fileIdx]). - Update("size", 2048).Error + fileIdx := iteration % len(allFileIDs) + err := db.WithContext(updateCtx).Model(&model.File{}). + Where("id = ?", allFileIDs[fileIdx]). + Update("size", 2048).Error - if err != nil && updateCtx.Err() == nil { - errChan <- err - } + if err != nil && updateCtx.Err() == nil { + errChan <- err } }(i) - // Goroutine 4: Car block creation (simulating pack completion) + // Goroutine 4: Car block creation go func(iteration int) { defer wg.Done() time.Sleep(time.Duration(iteration%4) * time.Millisecond) @@ -246,98 +239,57 @@ func TestRemovePreparationNoDeadlock(t *testing.T) { createCtx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - // Try to create a car block (might fail if car already deleted) - if len(allCarIDs) > 0 { - carIdx := iteration % len(allCarIDs) - block := model.CarBlock{ - CarID: allCarIDs[carIdx], - CID: model.CID(testutil.TestCid), - CarOffset: int64(1000 + iteration), - CarBlockLength: 512, - Varint: []byte{0x02}, - RawBlock: []byte("new-block"), - FileOffset: int64(1000 + iteration), - } - err := db.WithContext(createCtx).Create(&block).Error + carIdx := iteration % len(allCarIDs) + block := model.CarBlock{ + CarID: allCarIDs[carIdx], + CID: model.CID(testutil.TestCid), + CarOffset: int64(1000 + iteration), + CarBlockLength: 512, + Varint: []byte{0x02}, + RawBlock: []byte("new-block"), + FileOffset: int64(1000 + iteration), + } + err := db.WithContext(createCtx).Create(&block).Error - // Ignore FK errors (car might have been deleted) - if err != nil && createCtx.Err() == nil && !isFKError(err) { - errChan <- err - } + // Ignore FK errors + if err != nil && createCtx.Err() == nil && !isFKError(err) { + errChan <- err } }(i) } - // Wait for all operations to complete done := make(chan struct{}) go func() { wg.Wait() close(done) }() - // Wait with timeout select { case <-done: - // Success - no deadlock case err := <-errChan: req.NoError(err, "Unexpected error during concurrent operations") case <-time.After(45 * time.Second): req.Fail("Test timed out - likely deadlock occurred") } - // Verify no deadlock occurred (test completed successfully) - // We don't verify complete cleanup because: - // - Some deletions may have been blocked by active jobs (expected) - // - Some operations may still be pending when test ends - // - The important thing is we didn't deadlock - // - // If we want to verify actual cleanup, we'd need to: - // 1. Pause all jobs first - // 2. Wait for all deletions to complete - // 3. Then verify cleanup - // But that's not the point of this test - we're testing for deadlocks, not cleanup. - + // We don't verify complete cleanup because some deletions may be blocked by active jobs. t.Logf("Test completed without deadlock") }) } -// isNotFoundError checks if error is a "not found" error func isNotFoundError(err error) bool { - if err == nil { - return false - } - errStr := err.Error() - return contains(errStr, "not found") || contains(errStr, "does not exist") + return err != nil && (strings.Contains(err.Error(), "not found") || strings.Contains(err.Error(), "does not exist")) } -// isActiveJobsError checks if error is about active jobs preventing deletion func isActiveJobsError(err error) bool { - if err == nil { - return false - } - errStr := err.Error() - return contains(errStr, "active jobs") + return err != nil && strings.Contains(err.Error(), "active jobs") } -// isFKError checks if error is a foreign key constraint violation func isFKError(err error) bool { if err == nil { return false } - errStr := err.Error() - return contains(errStr, "foreign key") || contains(errStr, "FOREIGN KEY") || - contains(errStr, "violates") || contains(errStr, "constraint") -} - -func contains(s, substr string) bool { - return len(s) >= len(substr) && (s == substr || len(s) > len(substr) && findSubstring(s, substr)) -} - -func findSubstring(s, substr string) bool { - for i := 0; i <= len(s)-len(substr); i++ { - if s[i:i+len(substr)] == substr { - return true - } - } - return false + s := err.Error() + return strings.Contains(s, "foreign key") || strings.Contains(s, "FOREIGN KEY") || + strings.Contains(s, "violates") || strings.Contains(s, "constraint") } diff --git a/service/healthcheck/healthcheck_deadlock_test.go b/service/healthcheck/healthcheck_deadlock_test.go index 0e03cbed..1041fb4a 100644 --- a/service/healthcheck/healthcheck_deadlock_test.go +++ b/service/healthcheck/healthcheck_deadlock_test.go @@ -54,10 +54,10 @@ func TestHealthCheckCleanupNoDeadlock(t *testing.T) { workerID := uuid.New().String() workerIDs[i] = workerID - // Create worker with old heartbeat (will be considered stale) + // Create worker with old heartbeat worker := model.Worker{ ID: workerID, - LastHeartbeat: time.Now().UTC().Add(-10 * time.Minute), // Stale + LastHeartbeat: time.Now().UTC().Add(-10 * time.Minute), Hostname: "test-host", Type: model.DatasetWorker, } @@ -78,25 +78,16 @@ func TestHealthCheckCleanupNoDeadlock(t *testing.T) { } } - // Run concurrent operations that could previously cause deadlock const numIterations = 20 var wg sync.WaitGroup errChan := make(chan error, numIterations*2) - // Override stale threshold temporarily for this test - oldThreshold := staleThreshold - staleThreshold = 5 * time.Minute - defer func() { - staleThreshold = oldThreshold - }() - for i := range numIterations { wg.Add(2) - // Goroutine 1: Worker cleanup (deletes workers, triggers CASCADE) + // Goroutine 1: Worker cleanup go func(iteration int) { defer wg.Done() - // Add slight randomization to increase chance of lock conflicts time.Sleep(time.Duration(iteration%3) * time.Millisecond) cleanupCtx, cancel := context.WithTimeout(ctx, 5*time.Second) @@ -105,23 +96,19 @@ func TestHealthCheckCleanupNoDeadlock(t *testing.T) { HealthCheckCleanup(cleanupCtx, db) }(i) - // Goroutine 2: Bulk job update (updates multiple jobs) + // Goroutine 2: Bulk job update go func(iteration int) { defer wg.Done() - // Add slight randomization to increase chance of lock conflicts time.Sleep(time.Duration(iteration%5) * time.Millisecond) updateCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - // Simulate bulk update like in handler/job/pack.go - // Update a subset of jobs to Ready state startIdx := (iteration * 5) % len(jobIDs) endIdx := min(startIdx+10, len(jobIDs)) batchJobIDs := jobIDs[startIdx:endIdx] err := db.WithContext(updateCtx).Transaction(func(tx *gorm.DB) error { - // This pattern previously could deadlock with worker cleanup for _, jobID := range batchJobIDs { err := tx.Model(&model.Job{}). Where("id = ?", jobID). @@ -137,37 +124,33 @@ func TestHealthCheckCleanupNoDeadlock(t *testing.T) { return nil }) - if err != nil && ctx.Err() == nil { + if err != nil && updateCtx.Err() == nil { errChan <- err } }(i) } - // Wait for all operations to complete done := make(chan struct{}) go func() { wg.Wait() close(done) }() - // Wait with timeout select { case <-done: - // Success - no deadlock case err := <-errChan: req.NoError(err, "Unexpected error during concurrent operations") case <-time.After(30 * time.Second): req.Fail("Test timed out - likely deadlock occurred") } - // Verify final state consistency - // All workers should be deleted + // Verify workers deleted var remainingWorkers []model.Worker err = db.Where("id IN ?", workerIDs).Find(&remainingWorkers).Error req.NoError(err) req.Empty(remainingWorkers, "All stale workers should be deleted") - // All jobs should have worker_id set to NULL and be in Ready or Complete state + // Verify jobs reset var remainingJobs []model.Job err = db.Where("id IN ?", jobIDs).Find(&remainingJobs).Error req.NoError(err) From 9e943ce8783298f92056af47acecc74b5bba3ccd Mon Sep 17 00:00:00 2001 From: Arkadiy Kukarkin Date: Tue, 11 Nov 2025 13:10:59 +0100 Subject: [PATCH 09/15] fix expectation, two-step lock+update --- handler/job/pack.go | 50 +++++++++++++------ service/healthcheck/healthcheck.go | 25 ++++++++-- .../healthcheck/healthcheck_deadlock_test.go | 18 +++---- 3 files changed, 63 insertions(+), 30 deletions(-) diff --git a/handler/job/pack.go b/handler/job/pack.go index 1cd2904c..d0b837ba 100644 --- a/handler/job/pack.go +++ b/handler/job/pack.go @@ -62,13 +62,26 @@ func (DefaultHandler) StartPackHandler( } // Update jobs one at a time with SKIP LOCKED to avoid deadlock with worker cleanup + // Use two-step approach: SELECT FOR UPDATE SKIP LOCKED, then UPDATE updatedJobs := make([]model.Job, 0, len(jobs)) for _, job := range jobs { - // Try to lock and update this job + // Try to lock this job + var lockedJob model.Job err = tx.Clauses(clause.Locking{ Strength: "UPDATE", Options: "SKIP LOCKED", - }).Model(&model.Job{}). + }).Where("id = ?", job.ID).First(&lockedJob).Error + + if errors.Is(err, gorm.ErrRecordNotFound) { + // Job was locked by someone else, skip it + continue + } + if err != nil { + return errors.WithStack(err) + } + + // Now update the job we successfully locked + err = tx.Model(&model.Job{}). Where("id = ?", job.ID). Updates(map[string]any{ "state": model.Ready, @@ -80,12 +93,8 @@ func (DefaultHandler) StartPackHandler( return errors.WithStack(err) } - // Check if we actually updated the job (RowsAffected would tell us) - // If SKIP LOCKED caused us to skip it, RowsAffected will be 0 - if tx.RowsAffected > 0 { - job.State = model.Ready - updatedJobs = append(updatedJobs, job) - } + job.State = model.Ready + updatedJobs = append(updatedJobs, job) } // Replace jobs with only the ones we successfully updated @@ -187,13 +196,26 @@ func (DefaultHandler) PausePackHandler( } // Update jobs one at a time with SKIP LOCKED to avoid deadlock with worker cleanup + // Use two-step approach: SELECT FOR UPDATE SKIP LOCKED, then UPDATE updatedJobs := make([]model.Job, 0, len(jobs)) for _, job := range jobs { - // Try to lock and update this job + // Try to lock this job + var lockedJob model.Job err = tx.Clauses(clause.Locking{ Strength: "UPDATE", Options: "SKIP LOCKED", - }).Model(&model.Job{}). + }).Where("id = ?", job.ID).First(&lockedJob).Error + + if errors.Is(err, gorm.ErrRecordNotFound) { + // Job was locked by someone else, skip it + continue + } + if err != nil { + return errors.WithStack(err) + } + + // Now update the job we successfully locked + err = tx.Model(&model.Job{}). Where("id = ?", job.ID). Update("state", model.Paused).Error @@ -201,12 +223,8 @@ func (DefaultHandler) PausePackHandler( return errors.WithStack(err) } - // Check if we actually updated the job (RowsAffected would tell us) - // If SKIP LOCKED caused us to skip it, RowsAffected will be 0 - if tx.RowsAffected > 0 { - job.State = model.Paused - updatedJobs = append(updatedJobs, job) - } + job.State = model.Paused + updatedJobs = append(updatedJobs, job) } // Replace jobs with only the ones we successfully updated diff --git a/service/healthcheck/healthcheck.go b/service/healthcheck/healthcheck.go index 0aef8526..4fceb107 100644 --- a/service/healthcheck/healthcheck.go +++ b/service/healthcheck/healthcheck.go @@ -123,10 +123,27 @@ func HealthCheckCleanup(ctx context.Context, db *gorm.DB) { } } - // Now delete the workers. The FK CASCADE will be mostly a no-op since we already - // nullified worker_id on jobs we could lock. Any jobs we couldn't lock will be - // handled by the CASCADE (may still cause some lock contention but reduced). - return errors.WithStack(tx.Where("id IN ?", staleWorkerIDs).Delete(&model.Worker{}).Error) + // Only delete workers that have no remaining jobs with worker_id set + // This avoids deadlock from FK CASCADE trying to lock jobs that are locked by other transactions + for _, workerID := range staleWorkerIDs { + var remainingJobCount int64 + err = tx.Model(&model.Job{}). + Where("worker_id = ?", workerID). + Count(&remainingJobCount).Error + if err != nil { + return errors.WithStack(err) + } + + // Only delete if no jobs reference this worker anymore + if remainingJobCount == 0 { + err = tx.Where("id = ?", workerID).Delete(&model.Worker{}).Error + if err != nil { + return errors.WithStack(err) + } + } + // If remainingJobCount > 0, skip this worker and let next cleanup attempt handle it + } + return nil }) }) if err != nil && !errors.Is(err, context.Canceled) { diff --git a/service/healthcheck/healthcheck_deadlock_test.go b/service/healthcheck/healthcheck_deadlock_test.go index 1041fb4a..970a1892 100644 --- a/service/healthcheck/healthcheck_deadlock_test.go +++ b/service/healthcheck/healthcheck_deadlock_test.go @@ -144,21 +144,19 @@ func TestHealthCheckCleanupNoDeadlock(t *testing.T) { req.Fail("Test timed out - likely deadlock occurred") } - // Verify workers deleted - var remainingWorkers []model.Worker - err = db.Where("id IN ?", workerIDs).Find(&remainingWorkers).Error - req.NoError(err) - req.Empty(remainingWorkers, "All stale workers should be deleted") - - // Verify jobs reset + // Verify no deadlock - we don't require all workers to be deleted because + // workers with jobs locked by concurrent transactions will be skipped (correct behavior) var remainingJobs []model.Job err = db.Where("id IN ?", jobIDs).Find(&remainingJobs).Error req.NoError(err) + // All jobs should either be reset or still assigned (if locked during cleanup) for _, job := range remainingJobs { - req.Nil(job.WorkerID, "Job %d should have NULL worker_id", job.ID) - req.Contains([]model.JobState{model.Ready, model.Complete}, job.State, - "Job %d should be in Ready or Complete state", job.ID) + if job.WorkerID == nil { + req.Contains([]model.JobState{model.Ready, model.Complete}, job.State, + "Job %d with NULL worker_id should be Ready or Complete", job.ID) + } + // Jobs with non-null worker_id are jobs that were locked during cleanup - allowed } }) } From df5edebd26c58c092876096f6acb91e08154d9e4 Mon Sep 17 00:00:00 2001 From: Arkadiy Kukarkin Date: Tue, 11 Nov 2025 13:25:22 +0100 Subject: [PATCH 10/15] fix job-in-use expectation, another select/update --- handler/dataprep/remove.go | 23 +++++++++++++++++++++++ handler/dataprep/remove_deadlock_test.go | 8 ++++++-- 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/handler/dataprep/remove.go b/handler/dataprep/remove.go index 7a0c7475..56c3e730 100644 --- a/handler/dataprep/remove.go +++ b/handler/dataprep/remove.go @@ -12,6 +12,7 @@ import ( "github.com/rclone/rclone/fs" "github.com/rjNemo/underscore" "gorm.io/gorm" + "gorm.io/gorm/clause" ) type RemoveRequest struct { @@ -98,6 +99,28 @@ func (DefaultHandler) RemovePreparationHandler(ctx context.Context, db *gorm.DB, return errors.WithStack(err) } + // Step 5: Try to lock all jobs with SKIP LOCKED to detect concurrent activity + // This prevents deadlock with concurrent job updates + if len(jobIDs) > 0 { + var lockedJobs []model.Job + err = tx.Clauses(clause.Locking{ + Strength: "UPDATE", + Options: "SKIP LOCKED", + }).Select("id"). + Where("id IN ?", jobIDs). + Find(&lockedJobs).Error + if err != nil { + return errors.WithStack(err) + } + + // If we couldn't lock all jobs, some are being used by concurrent transactions + if len(lockedJobs) < len(jobIDs) { + return errors.Wrapf(handlererror.ErrInvalidParameter, + "preparation %s has jobs in use by concurrent operations (%d/%d locked)", + preparation.Name, len(lockedJobs), len(jobIDs)) + } + } + // Now delete in leaf-to-root order using materialized IDs: // 1. Delete car_blocks (leaf node) diff --git a/handler/dataprep/remove_deadlock_test.go b/handler/dataprep/remove_deadlock_test.go index 4c62c924..71659fa9 100644 --- a/handler/dataprep/remove_deadlock_test.go +++ b/handler/dataprep/remove_deadlock_test.go @@ -173,8 +173,8 @@ func TestRemovePreparationNoDeadlock(t *testing.T) { handler := DefaultHandler{} err := handler.RemovePreparationHandler(deleteCtx, db, preparations[prepIdx].Name, RemoveRequest{}) if err != nil && deleteCtx.Err() == nil { - // Ignore "not found" and "active jobs" errors - if !isNotFoundError(err) && !isActiveJobsError(err) { + // Ignore expected errors from concurrent operations + if !isNotFoundError(err) && !isActiveJobsError(err) && !isJobsInUseError(err) { errChan <- err } } @@ -285,6 +285,10 @@ func isActiveJobsError(err error) bool { return err != nil && strings.Contains(err.Error(), "active jobs") } +func isJobsInUseError(err error) bool { + return err != nil && strings.Contains(err.Error(), "jobs in use") +} + func isFKError(err error) bool { if err == nil { return false From 9c74817bf0c3c95ded235230d9ea1b537480c9f4 Mon Sep 17 00:00:00 2001 From: Arkadiy Kukarkin Date: Tue, 11 Nov 2025 13:46:19 +0100 Subject: [PATCH 11/15] defer loading associations after claiming --- service/datasetworker/find.go | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/service/datasetworker/find.go b/service/datasetworker/find.go index 7ded9956..92a79e92 100644 --- a/service/datasetworker/find.go +++ b/service/datasetworker/find.go @@ -33,11 +33,11 @@ func (w *Thread) findJob(ctx context.Context, typesOrdered []model.JobType) (*mo for _, jobType := range typesOrdered { err := database.DoRetry(ctx, func() error { return db.Transaction(func(db *gorm.DB) error { - err := db.Preload("Attachment.Preparation.OutputStorages").Preload("Attachment.Storage"). - Clauses(clause.Locking{ - Strength: "UPDATE", - Options: "SKIP LOCKED", - }). + // First, lock and claim the job without preloading (preload can interfere with locking) + err := db.Clauses(clause.Locking{ + Strength: "UPDATE", + Options: "SKIP LOCKED", + }). Where("type = ? AND (state = ? OR (state = ? AND worker_id IS NULL))", jobType, model.Ready, model.Processing). First(&job).Error if err != nil { @@ -48,12 +48,21 @@ func (w *Thread) findJob(ctx context.Context, typesOrdered []model.JobType) (*mo return errors.WithStack(err) } - return db.Model(&job). + // Update the job to claim it + err = db.Model(&job). Updates(map[string]any{ "state": model.Processing, "worker_id": w.id, "error_message": "", }).Error + if err != nil { + return errors.WithStack(err) + } + + // Now load the associations we need + err = db.Preload("Attachment.Preparation.OutputStorages").Preload("Attachment.Storage"). + First(&job, job.ID).Error + return errors.WithStack(err) }, txOpts) }) if err != nil { From d144bb13defd3b60c92efa1dfab822d5b1bbf84c Mon Sep 17 00:00:00 2001 From: Arkadiy Kukarkin Date: Tue, 11 Nov 2025 13:21:42 +0000 Subject: [PATCH 12/15] resolve non-deterministic deadlock --- service/healthcheck/healthcheck.go | 139 +++++++++++++---------------- 1 file changed, 60 insertions(+), 79 deletions(-) diff --git a/service/healthcheck/healthcheck.go b/service/healthcheck/healthcheck.go index 4fceb107..6c1df228 100644 --- a/service/healthcheck/healthcheck.go +++ b/service/healthcheck/healthcheck.go @@ -61,7 +61,7 @@ func StartHealthCheckCleanup(ctx context.Context, db *gorm.DB) { // // It first finds workers that haven't sent a heartbeat for a certain threshold (staleThreshold). // Then it explicitly updates jobs owned by those workers to prevent CASCADE lock contention. -// Finally, it removes the stale workers. +// Worker deletion is deferred to avoid deadlocks during concurrent operations. // // All database operations are retried on failure using the DoRetry function. // @@ -71,97 +71,78 @@ func HealthCheckCleanup(ctx context.Context, db *gorm.DB) { db = db.WithContext(ctx) logger.Debugw("running healthcheck cleanup") - // Clean up stale workers and their jobs in a single transaction to ensure consistency - // and avoid FK CASCADE deadlocks by explicitly updating jobs first with SKIP LOCKED. - err := database.DoRetry(ctx, func() error { - return db.Transaction(func(tx *gorm.DB) error { - // Find stale workers - var staleWorkers []model.Worker - err := tx.Where("last_heartbeat < ?", time.Now().UTC().Add(-staleThreshold)).Find(&staleWorkers).Error - if err != nil { - return errors.WithStack(err) - } - - if len(staleWorkers) == 0 { - return nil - } - - staleWorkerIDs := make([]string, len(staleWorkers)) - for i, w := range staleWorkers { - staleWorkerIDs[i] = w.ID - } - - // Explicitly update jobs owned by stale workers with SKIP LOCKED to avoid deadlock - // with concurrent bulk job updates. We use a two-step approach (SELECT then UPDATE) - // to be compatible with both PostgreSQL and MySQL/MariaDB. - var jobsToUpdate []model.Job - err = tx.Clauses(clause.Locking{ - Strength: "UPDATE", - Options: "SKIP LOCKED", - }).Select("id"). - Where("worker_id IN ? AND state = ?", staleWorkerIDs, model.Processing). - Find(&jobsToUpdate).Error - if err != nil { - return errors.WithStack(err) - } + // Find stale workers + var staleWorkers []model.Worker + err := db.Where("last_heartbeat < ?", time.Now().UTC().Add(-staleThreshold)).Find(&staleWorkers).Error + if err != nil { + logger.Errorw("failed to find stale workers", "error", err) + return + } - // Update the jobs we successfully locked - if len(jobsToUpdate) > 0 { - jobIDsToUpdate := make([]model.JobID, len(jobsToUpdate)) - for i, job := range jobsToUpdate { - jobIDsToUpdate[i] = job.ID - } + if len(staleWorkers) == 0 { + return + } - err = tx.Model(&model.Job{}). - Where("id IN ?", jobIDsToUpdate). - Updates(map[string]any{ - "worker_id": nil, - "state": model.Ready, - }).Error + // Process each stale worker individually to avoid lock ordering issues + for _, worker := range staleWorkers { + // First, just update jobs without trying to delete the worker + err := database.DoRetry(ctx, func() error { + return db.Transaction(func(tx *gorm.DB) error { + // Lock and update jobs owned by this specific worker with SKIP LOCKED + var jobsToUpdate []model.Job + err := tx.Clauses(clause.Locking{ + Strength: "UPDATE", + Options: "SKIP LOCKED", + }).Select("id"). + Where("worker_id = ? AND state = ?", worker.ID, model.Processing). + Find(&jobsToUpdate).Error if err != nil { return errors.WithStack(err) } - } - // Only delete workers that have no remaining jobs with worker_id set - // This avoids deadlock from FK CASCADE trying to lock jobs that are locked by other transactions - for _, workerID := range staleWorkerIDs { - var remainingJobCount int64 - err = tx.Model(&model.Job{}). - Where("worker_id = ?", workerID). - Count(&remainingJobCount).Error - if err != nil { - return errors.WithStack(err) - } + // Update the jobs we successfully locked + if len(jobsToUpdate) > 0 { + jobIDsToUpdate := make([]model.JobID, len(jobsToUpdate)) + for i, job := range jobsToUpdate { + jobIDsToUpdate[i] = job.ID + } - // Only delete if no jobs reference this worker anymore - if remainingJobCount == 0 { - err = tx.Where("id = ?", workerID).Delete(&model.Worker{}).Error + err = tx.Model(&model.Job{}). + Where("id IN ?", jobIDsToUpdate). + Updates(map[string]any{ + "worker_id": nil, + "state": model.Ready, + }).Error if err != nil { return errors.WithStack(err) } } - // If remainingJobCount > 0, skip this worker and let next cleanup attempt handle it - } - return nil + return nil + }) }) - }) - if err != nil && !errors.Is(err, context.Canceled) { - logger.Errorw("failed to cleanup dead workers", "error", err) - } + if err != nil && !errors.Is(err, context.Canceled) { + logger.Errorw("failed to cleanup jobs for worker", "workerID", worker.ID, "error", err) + } - // Safety check: clean up any orphaned jobs that might have been missed - // (e.g., jobs we couldn't lock due to SKIP LOCKED, or jobs left in inconsistent state) - err = database.DoRetry(ctx, func() error { - return db.Model(&model.Job{}).Where("worker_id NOT IN (?) AND state = ?", - db.Table("workers").Select("id"), model.Processing). - Updates(map[string]any{ - "worker_id": nil, - "state": model.Ready, - }).Error - }) - if err != nil && !errors.Is(err, context.Canceled) { - logger.Errorw("failed to cleanup orphaned jobs", "error", err) + // Now check if the worker has any jobs left + var jobCount int64 + err = db.Model(&model.Job{}).Where("worker_id = ?", worker.ID).Count(&jobCount).Error + if err != nil { + logger.Errorw("failed to count jobs for worker", "workerID", worker.ID, "error", err) + continue + } + + // Only delete the worker if it has no jobs + if jobCount == 0 { + result := db.Where("id = ?", worker.ID).Delete(&model.Worker{}) + if result.Error != nil { + logger.Errorw("failed to delete worker", "workerID", worker.ID, "error", result.Error) + } else if result.RowsAffected > 0 { + logger.Debugw("deleted stale worker", "workerID", worker.ID) + } + } else { + logger.Debugw("worker still has jobs, will retry later", "workerID", worker.ID, "jobCount", jobCount) + } } } From dc8f04fec9213f526563d0e31f4552d17a30006b Mon Sep 17 00:00:00 2001 From: Arkadiy Kukarkin Date: Tue, 11 Nov 2025 14:23:57 +0100 Subject: [PATCH 13/15] actually print deadlocks when detected instead of guessing --- .devcontainer/start-mysql.sh | 1 + database/deadlock_debug.go | 81 +++++++++++++++++++ handler/dataprep/remove_deadlock_test.go | 8 ++ .../healthcheck/healthcheck_deadlock_test.go | 9 +++ util/testutil/deadlock.go | 36 +++++++++ 5 files changed, 135 insertions(+) create mode 100644 database/deadlock_debug.go create mode 100644 util/testutil/deadlock.go diff --git a/.devcontainer/start-mysql.sh b/.devcontainer/start-mysql.sh index 503a7d41..8a95e923 100755 --- a/.devcontainer/start-mysql.sh +++ b/.devcontainer/start-mysql.sh @@ -26,6 +26,7 @@ nohup mariadbd \ --port="${PORT}" \ --skip-name-resolve \ --log-error="${LOG_FILE}" \ + --innodb-print-all-deadlocks=ON \ >/dev/null 2>&1 & # Wait for MySQL to be ready diff --git a/database/deadlock_debug.go b/database/deadlock_debug.go new file mode 100644 index 00000000..ab48b3f1 --- /dev/null +++ b/database/deadlock_debug.go @@ -0,0 +1,81 @@ +package database + +import ( + "strings" + + "gorm.io/gorm" +) + +// PrintDeadlockInfo prints detailed deadlock information from MySQL/MariaDB InnoDB status. +// This should be called when a deadlock error is detected to help diagnose the issue. +// Returns the deadlock information as a string, or empty string if not available. +func PrintDeadlockInfo(db *gorm.DB) string { + // Check if this is MySQL/MariaDB + var dbType string + db.Raw("SELECT VERSION()").Scan(&dbType) + if !strings.Contains(strings.ToLower(dbType), "maria") && !strings.Contains(strings.ToLower(dbType), "mysql") { + return "" + } + + // Get InnoDB status + var results []map[string]interface{} + err := db.Raw("SHOW ENGINE INNODB STATUS").Scan(&results).Error + if err != nil || len(results) == 0 { + return "" + } + + // Extract status from result + status, ok := results[0]["Status"].(string) + if !ok { + return "" + } + + // Extract just the deadlock section + if idx := strings.Index(status, "LATEST DETECTED DEADLOCK"); idx >= 0 { + endIdx := strings.Index(status[idx:], "--------\nTRANSACTIONS") + if endIdx > 0 { + return status[idx : idx+endIdx] + } + // If no TRANSACTIONS section found, just return everything after deadlock + return status[idx:] + } + + return "" +} + +// EnableDeadlockLogging enables logging of all deadlocks to the MySQL error log. +// By default, MySQL/MariaDB only logs the most recent deadlock. +// This setting persists until the server is restarted. +func EnableDeadlockLogging(db *gorm.DB) error { + return db.Exec("SET GLOBAL innodb_print_all_deadlocks = ON").Error +} + +// CheckDeadlockLoggingEnabled checks if innodb_print_all_deadlocks is enabled. +func CheckDeadlockLoggingEnabled(db *gorm.DB) (bool, error) { + var value string + err := db.Raw("SHOW VARIABLES LIKE 'innodb_print_all_deadlocks'").Scan(&value).Error + if err != nil { + return false, err + } + return strings.ToLower(value) == "on", nil +} + +// GetDataLockWaits returns current lock wait information from performance_schema. +// This requires MySQL 8.0.30+ or MariaDB 10.5+. +func GetDataLockWaits(db *gorm.DB) ([]map[string]interface{}, error) { + var results []map[string]interface{} + err := db.Raw("SELECT * FROM performance_schema.data_lock_waits").Scan(&results).Error + return results, err +} + +// GetLockWaitTransactions returns transactions currently waiting for locks. +// This requires MySQL 8.0.30+ or MariaDB 10.5+. +func GetLockWaitTransactions(db *gorm.DB) ([]map[string]interface{}, error) { + var results []map[string]interface{} + err := db.Raw(` + SELECT * FROM performance_schema.events_transactions_current + WHERE STATE = 'ACTIVE' + AND AUTOCOMMIT = 'NO' + `).Scan(&results).Error + return results, err +} diff --git a/handler/dataprep/remove_deadlock_test.go b/handler/dataprep/remove_deadlock_test.go index 71659fa9..45f66d20 100644 --- a/handler/dataprep/remove_deadlock_test.go +++ b/handler/dataprep/remove_deadlock_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/data-preservation-programs/singularity/database" "github.com/data-preservation-programs/singularity/model" "github.com/data-preservation-programs/singularity/util/testutil" "github.com/google/uuid" @@ -21,6 +22,7 @@ import ( func TestRemovePreparationNoDeadlock(t *testing.T) { testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { req := require.New(t) + testutil.EnableDeadlockLogging(t, db) // Create test data: multiple preparations sharing one storage const numPreparations = 3 @@ -209,6 +211,12 @@ func TestRemovePreparationNoDeadlock(t *testing.T) { }) if err != nil && updateCtx.Err() == nil { + // If it's a deadlock error, get InnoDB status + if strings.Contains(err.Error(), "Deadlock") { + if deadlockInfo := database.PrintDeadlockInfo(db); deadlockInfo != "" { + t.Logf("\n%s", deadlockInfo) + } + } errChan <- err } }(i) diff --git a/service/healthcheck/healthcheck_deadlock_test.go b/service/healthcheck/healthcheck_deadlock_test.go index 970a1892..04fb8ea5 100644 --- a/service/healthcheck/healthcheck_deadlock_test.go +++ b/service/healthcheck/healthcheck_deadlock_test.go @@ -2,10 +2,12 @@ package healthcheck import ( "context" + "strings" "sync" "testing" "time" + "github.com/data-preservation-programs/singularity/database" "github.com/data-preservation-programs/singularity/model" "github.com/data-preservation-programs/singularity/util/testutil" "github.com/google/uuid" @@ -19,6 +21,7 @@ import ( func TestHealthCheckCleanupNoDeadlock(t *testing.T) { testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { req := require.New(t) + testutil.EnableDeadlockLogging(t, db) // Create test preparation and storage for jobs preparation := model.Preparation{ @@ -125,6 +128,12 @@ func TestHealthCheckCleanupNoDeadlock(t *testing.T) { }) if err != nil && updateCtx.Err() == nil { + // If it's a deadlock error, get InnoDB status + if strings.Contains(err.Error(), "Deadlock") { + if deadlockInfo := database.PrintDeadlockInfo(db); deadlockInfo != "" { + t.Logf("\n%s", deadlockInfo) + } + } errChan <- err } }(i) diff --git a/util/testutil/deadlock.go b/util/testutil/deadlock.go new file mode 100644 index 00000000..f2ea21e4 --- /dev/null +++ b/util/testutil/deadlock.go @@ -0,0 +1,36 @@ +package testutil + +import ( + "testing" + + "github.com/data-preservation-programs/singularity/database" + "gorm.io/gorm" +) + +// EnableDeadlockLogging enables comprehensive deadlock logging for MySQL/MariaDB tests. +// This should be called early in tests that may encounter deadlocks. +// It will: +// - Enable innodb_print_all_deadlocks (logs all deadlocks to error log, not just the last one) +// - Log the current state of deadlock logging +// +// Note: innodb_print_all_deadlocks requires SUPER privilege and persists until server restart. +func EnableDeadlockLogging(t *testing.T, db *gorm.DB) { + // Try to enable it (may fail if already enabled or insufficient privileges) + err := database.EnableDeadlockLogging(db) + if err != nil { + t.Logf("Note: Could not enable innodb_print_all_deadlocks: %v (may not have SUPER privilege)", err) + } + + // Check if it's enabled + enabled, err := database.CheckDeadlockLoggingEnabled(db) + if err != nil { + t.Logf("Note: Could not check innodb_print_all_deadlocks status: %v", err) + return + } + + if enabled { + t.Logf("Deadlock logging enabled: all deadlocks will be logged to MySQL error log") + } else { + t.Logf("Deadlock logging not enabled: only the most recent deadlock will be available") + } +} From 2b6bd2ada65c3b20734006a809074dd27dee4c71 Mon Sep 17 00:00:00 2001 From: Arkadiy Kukarkin Date: Tue, 11 Nov 2025 15:52:09 +0100 Subject: [PATCH 14/15] less dumb dialect check --- database/deadlock_debug.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/database/deadlock_debug.go b/database/deadlock_debug.go index ab48b3f1..ae0f9a5a 100644 --- a/database/deadlock_debug.go +++ b/database/deadlock_debug.go @@ -10,10 +10,7 @@ import ( // This should be called when a deadlock error is detected to help diagnose the issue. // Returns the deadlock information as a string, or empty string if not available. func PrintDeadlockInfo(db *gorm.DB) string { - // Check if this is MySQL/MariaDB - var dbType string - db.Raw("SELECT VERSION()").Scan(&dbType) - if !strings.Contains(strings.ToLower(dbType), "maria") && !strings.Contains(strings.ToLower(dbType), "mysql") { + if db.Dialector.Name() != "mysql" { return "" } From 5144599ce954186b9bdd09b31105953a7e45d020 Mon Sep 17 00:00:00 2001 From: Arkadiy Kukarkin Date: Wed, 12 Nov 2025 15:49:29 +0100 Subject: [PATCH 15/15] better debug sanity --- database/deadlock_debug.go | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/database/deadlock_debug.go b/database/deadlock_debug.go index ae0f9a5a..1a60a2e1 100644 --- a/database/deadlock_debug.go +++ b/database/deadlock_debug.go @@ -44,22 +44,34 @@ func PrintDeadlockInfo(db *gorm.DB) string { // By default, MySQL/MariaDB only logs the most recent deadlock. // This setting persists until the server is restarted. func EnableDeadlockLogging(db *gorm.DB) error { + if db.Dialector.Name() != "mysql" { + return nil + } return db.Exec("SET GLOBAL innodb_print_all_deadlocks = ON").Error } // CheckDeadlockLoggingEnabled checks if innodb_print_all_deadlocks is enabled. func CheckDeadlockLoggingEnabled(db *gorm.DB) (bool, error) { - var value string - err := db.Raw("SHOW VARIABLES LIKE 'innodb_print_all_deadlocks'").Scan(&value).Error + if db.Dialector.Name() != "mysql" { + return false, nil + } + var result struct { + VariableName string `gorm:"column:Variable_name"` + Value string `gorm:"column:Value"` + } + err := db.Raw("SHOW VARIABLES LIKE 'innodb_print_all_deadlocks'").Scan(&result).Error if err != nil { return false, err } - return strings.ToLower(value) == "on", nil + return strings.ToLower(result.Value) == "on", nil } // GetDataLockWaits returns current lock wait information from performance_schema. // This requires MySQL 8.0.30+ or MariaDB 10.5+. func GetDataLockWaits(db *gorm.DB) ([]map[string]interface{}, error) { + if db.Dialector.Name() != "mysql" { + return nil, nil + } var results []map[string]interface{} err := db.Raw("SELECT * FROM performance_schema.data_lock_waits").Scan(&results).Error return results, err @@ -68,6 +80,9 @@ func GetDataLockWaits(db *gorm.DB) ([]map[string]interface{}, error) { // GetLockWaitTransactions returns transactions currently waiting for locks. // This requires MySQL 8.0.30+ or MariaDB 10.5+. func GetLockWaitTransactions(db *gorm.DB) ([]map[string]interface{}, error) { + if db.Dialector.Name() != "mysql" { + return nil, nil + } var results []map[string]interface{} err := db.Raw(` SELECT * FROM performance_schema.events_transactions_current