diff --git a/.devcontainer/start-mysql.sh b/.devcontainer/start-mysql.sh index 503a7d41..8a95e923 100755 --- a/.devcontainer/start-mysql.sh +++ b/.devcontainer/start-mysql.sh @@ -26,6 +26,7 @@ nohup mariadbd \ --port="${PORT}" \ --skip-name-resolve \ --log-error="${LOG_FILE}" \ + --innodb-print-all-deadlocks=ON \ >/dev/null 2>&1 & # Wait for MySQL to be ready diff --git a/database/deadlock_debug.go b/database/deadlock_debug.go new file mode 100644 index 00000000..1a60a2e1 --- /dev/null +++ b/database/deadlock_debug.go @@ -0,0 +1,93 @@ +package database + +import ( + "strings" + + "gorm.io/gorm" +) + +// PrintDeadlockInfo prints detailed deadlock information from MySQL/MariaDB InnoDB status. +// This should be called when a deadlock error is detected to help diagnose the issue. +// Returns the deadlock information as a string, or empty string if not available. +func PrintDeadlockInfo(db *gorm.DB) string { + if db.Dialector.Name() != "mysql" { + return "" + } + + // Get InnoDB status + var results []map[string]interface{} + err := db.Raw("SHOW ENGINE INNODB STATUS").Scan(&results).Error + if err != nil || len(results) == 0 { + return "" + } + + // Extract status from result + status, ok := results[0]["Status"].(string) + if !ok { + return "" + } + + // Extract just the deadlock section + if idx := strings.Index(status, "LATEST DETECTED DEADLOCK"); idx >= 0 { + endIdx := strings.Index(status[idx:], "--------\nTRANSACTIONS") + if endIdx > 0 { + return status[idx : idx+endIdx] + } + // If no TRANSACTIONS section found, just return everything after deadlock + return status[idx:] + } + + return "" +} + +// EnableDeadlockLogging enables logging of all deadlocks to the MySQL error log. +// By default, MySQL/MariaDB only logs the most recent deadlock. +// This setting persists until the server is restarted. +func EnableDeadlockLogging(db *gorm.DB) error { + if db.Dialector.Name() != "mysql" { + return nil + } + return db.Exec("SET GLOBAL innodb_print_all_deadlocks = ON").Error +} + +// CheckDeadlockLoggingEnabled checks if innodb_print_all_deadlocks is enabled. +func CheckDeadlockLoggingEnabled(db *gorm.DB) (bool, error) { + if db.Dialector.Name() != "mysql" { + return false, nil + } + var result struct { + VariableName string `gorm:"column:Variable_name"` + Value string `gorm:"column:Value"` + } + err := db.Raw("SHOW VARIABLES LIKE 'innodb_print_all_deadlocks'").Scan(&result).Error + if err != nil { + return false, err + } + return strings.ToLower(result.Value) == "on", nil +} + +// GetDataLockWaits returns current lock wait information from performance_schema. +// This requires MySQL 8.0.30+ or MariaDB 10.5+. +func GetDataLockWaits(db *gorm.DB) ([]map[string]interface{}, error) { + if db.Dialector.Name() != "mysql" { + return nil, nil + } + var results []map[string]interface{} + err := db.Raw("SELECT * FROM performance_schema.data_lock_waits").Scan(&results).Error + return results, err +} + +// GetLockWaitTransactions returns transactions currently waiting for locks. +// This requires MySQL 8.0.30+ or MariaDB 10.5+. 
+func GetLockWaitTransactions(db *gorm.DB) ([]map[string]interface{}, error) { + if db.Dialector.Name() != "mysql" { + return nil, nil + } + var results []map[string]interface{} + err := db.Raw(` + SELECT * FROM performance_schema.events_transactions_current + WHERE STATE = 'ACTIVE' + AND AUTOCOMMIT = 'NO' + `).Scan(&results).Error + return results, err +} diff --git a/handler/dataprep/remove.go b/handler/dataprep/remove.go index 4d79fcdc..56c3e730 100644 --- a/handler/dataprep/remove.go +++ b/handler/dataprep/remove.go @@ -12,6 +12,7 @@ import ( "github.com/rclone/rclone/fs" "github.com/rjNemo/underscore" "gorm.io/gorm" + "gorm.io/gorm/clause" ) type RemoveRequest struct { @@ -53,12 +54,135 @@ func (DefaultHandler) RemovePreparationHandler(ctx context.Context, db *gorm.DB, } err = database.DoRetry(ctx, func() error { - return db.Transaction(func(db *gorm.DB) error { - // Use Select to control deletion order and avoid circular cascade deadlocks. - // GORM v1.31+ handles this by deleting associations in specified order, - // preventing Postgres deadlocks from multiple cascade paths to Files table. - // See: https://github.com/data-preservation-programs/singularity/pull/583 - return db.Select("Wallets", "SourceStorages", "OutputStorages").Delete(&preparation).Error + return db.Transaction(func(tx *gorm.DB) error { + // Explicitly delete child records to avoid CASCADE deadlocks with concurrent operations. + // We materialize IDs first to avoid nested subqueries that cause MySQL deadlocks. + + // Step 1: Get all attachment IDs for this preparation + var attachmentIDs []model.SourceAttachmentID + err := tx.Table("source_attachments").Select("id"). + Where("preparation_id = ?", preparation.ID). + Find(&attachmentIDs).Error + if err != nil { + return errors.WithStack(err) + } + + if len(attachmentIDs) == 0 { + // No attachments, just delete the preparation + return tx.Select("Wallets", "SourceStorages", "OutputStorages").Delete(&preparation).Error + } + + // Step 2: Get all car IDs + var carIDs []model.CarID + err = tx.Table("cars").Select("id"). + Where("preparation_id = ?", preparation.ID). + Find(&carIDs).Error + if err != nil { + return errors.WithStack(err) + } + + // Step 3: Get all job IDs + var jobIDs []model.JobID + err = tx.Table("jobs").Select("id"). + Where("attachment_id IN ?", attachmentIDs). + Find(&jobIDs).Error + if err != nil { + return errors.WithStack(err) + } + + // Step 4: Get all file IDs + var fileIDs []model.FileID + err = tx.Table("files").Select("id"). + Where("attachment_id IN ?", attachmentIDs). + Find(&fileIDs).Error + if err != nil { + return errors.WithStack(err) + } + + // Step 5: Try to lock all jobs with SKIP LOCKED to detect concurrent activity + // This prevents deadlock with concurrent job updates + if len(jobIDs) > 0 { + var lockedJobs []model.Job + err = tx.Clauses(clause.Locking{ + Strength: "UPDATE", + Options: "SKIP LOCKED", + }).Select("id"). + Where("id IN ?", jobIDs). + Find(&lockedJobs).Error + if err != nil { + return errors.WithStack(err) + } + + // If we couldn't lock all jobs, some are being used by concurrent transactions + if len(lockedJobs) < len(jobIDs) { + return errors.Wrapf(handlererror.ErrInvalidParameter, + "preparation %s has jobs in use by concurrent operations (%d/%d locked)", + preparation.Name, len(lockedJobs), len(jobIDs)) + } + } + + // Now delete in leaf-to-root order using materialized IDs: + + // 1. 
Delete car_blocks (leaf node) + if len(carIDs) > 0 { + err = tx.Where("car_id IN ?", carIDs).Delete(&model.CarBlock{}).Error + if err != nil { + return errors.WithStack(err) + } + } + + // 2. Delete cars + if len(carIDs) > 0 { + err = tx.Where("id IN ?", carIDs).Delete(&model.Car{}).Error + if err != nil { + return errors.WithStack(err) + } + } + + // 3. Delete file_ranges (from jobs) + if len(jobIDs) > 0 { + err = tx.Where("job_id IN ?", jobIDs).Delete(&model.FileRange{}).Error + if err != nil { + return errors.WithStack(err) + } + } + + // 4. Delete file_ranges (from files) + if len(fileIDs) > 0 { + err = tx.Where("file_id IN ?", fileIDs).Delete(&model.FileRange{}).Error + if err != nil { + return errors.WithStack(err) + } + } + + // 5. Delete files (before directories to avoid circular cascade) + if len(fileIDs) > 0 { + err = tx.Where("id IN ?", fileIDs).Delete(&model.File{}).Error + if err != nil { + return errors.WithStack(err) + } + } + + // 6. Delete directories + err = tx.Where("attachment_id IN ?", attachmentIDs).Delete(&model.Directory{}).Error + if err != nil { + return errors.WithStack(err) + } + + // 7. Delete jobs + if len(jobIDs) > 0 { + err = tx.Where("id IN ?", jobIDs).Delete(&model.Job{}).Error + if err != nil { + return errors.WithStack(err) + } + } + + // 8. Now delete the preparation itself, which will cascade to: + // - wallet_assignments (many2many, small table) + // - source_attachments (now empty, no more cascades) + // - output_attachments (many2many, small table) + // These cascades are safe because we've already deleted all the heavy child tables. + return tx.Select("Wallets", "SourceStorages", "OutputStorages").Delete(&preparation).Error }) }) if err != nil { diff --git a/handler/dataprep/remove_deadlock_test.go b/handler/dataprep/remove_deadlock_test.go new file mode 100644 index 00000000..45f66d20 --- /dev/null +++ b/handler/dataprep/remove_deadlock_test.go @@ -0,0 +1,307 @@ +package dataprep + +import ( + "context" + "strings" + "sync" + "testing" + "time" + + "github.com/data-preservation-programs/singularity/database" + "github.com/data-preservation-programs/singularity/model" + "github.com/data-preservation-programs/singularity/util/testutil" + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "gorm.io/gorm" +) + +// TestRemovePreparationNoDeadlock verifies that preparation deletion does not deadlock +// with concurrent worker operations. The test simulates multiple preparations sharing one +// storage. Deleting one preparation cascades to shared tables while workers operate on +// other preparations. The fix uses explicit ordered deletion instead of FK CASCADE. 
+func TestRemovePreparationNoDeadlock(t *testing.T) { + testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { + req := require.New(t) + testutil.EnableDeadlockLogging(t, db) + + // Create test data: multiple preparations sharing one storage + const numPreparations = 3 + const numFilesPerPrep = 50 + const numDirsPerPrep = 10 + const numJobsPerPrep = 10 + const numCarsPerPrep = 5 + const numBlocksPerCar = 20 + + // Create shared storage + sharedStorage := model.Storage{ + Name: "shared-storage-" + uuid.New().String(), + Type: "local", + Path: "/tmp/test", + } + err := db.Create(&sharedStorage).Error + req.NoError(err) + + preparations := make([]model.Preparation, numPreparations) + attachments := make([]model.SourceAttachment, numPreparations) + allJobIDs := make([]model.JobID, 0, numPreparations*numJobsPerPrep) + allFileIDs := make([]model.FileID, 0, numPreparations*numFilesPerPrep) + allCarIDs := make([]model.CarID, 0, numPreparations*numCarsPerPrep) + + for i := range numPreparations { + // Create preparation + prep := model.Preparation{ + Name: "test-prep-" + uuid.New().String(), + MaxSize: 1 << 30, + PieceSize: 1 << 30, + } + err := db.Create(&prep).Error + req.NoError(err) + preparations[i] = prep + + // Attach to shared storage + attachment := model.SourceAttachment{ + PreparationID: prep.ID, + StorageID: sharedStorage.ID, + } + err = db.Create(&attachment).Error + req.NoError(err) + attachments[i] = attachment + + // Create directories + directories := make([]model.Directory, numDirsPerPrep) + for j := range numDirsPerPrep { + dir := model.Directory{ + Name: "dir-" + uuid.New().String()[:8], + AttachmentID: attachment.ID, + CID: model.CID(testutil.TestCid), + Data: []byte("test"), + } + err = db.Create(&dir).Error + req.NoError(err) + directories[j] = dir + } + + // Create files + for j := range numFilesPerPrep { + dirIdx := j % numDirsPerPrep + file := model.File{ + Path: "/test/file-" + uuid.New().String()[:8], + Size: 1024, + AttachmentID: attachment.ID, + DirectoryID: &directories[dirIdx].ID, + CID: model.CID(testutil.TestCid), + Hash: "test-hash", + LastModifiedNano: time.Now().UnixNano(), + } + err = db.Create(&file).Error + req.NoError(err) + allFileIDs = append(allFileIDs, file.ID) + } + + // Create jobs + for j := range numJobsPerPrep { + job := model.Job{ + Type: model.Pack, + State: model.Ready, + AttachmentID: attachment.ID, + } + err = db.Create(&job).Error + req.NoError(err) + allJobIDs = append(allJobIDs, job.ID) + + // Create file ranges for some jobs + if j%2 == 0 && len(allFileIDs) > 0 { + fileIdx := (i*numJobsPerPrep + j) % len(allFileIDs) + fileRange := model.FileRange{ + JobID: &job.ID, + FileID: allFileIDs[fileIdx], + Offset: 0, + Length: 1024, + CID: model.CID(testutil.TestCid), + } + err = db.Create(&fileRange).Error + req.NoError(err) + } + } + + // Create cars + for range numCarsPerPrep { + car := model.Car{ + PreparationID: prep.ID, + AttachmentID: &attachment.ID, + PieceCID: model.CID(testutil.TestCid), + PieceSize: 1 << 20, + RootCID: model.CID(testutil.TestCid), + FileSize: 1024, + StoragePath: "/test/car-" + uuid.New().String()[:8], + } + err = db.Create(&car).Error + req.NoError(err) + allCarIDs = append(allCarIDs, car.ID) + + // Create car blocks + for k := range numBlocksPerCar { + block := model.CarBlock{ + CarID: car.ID, + CID: model.CID(testutil.TestCid), + CarOffset: int64(k * 1024), + CarBlockLength: 1024, + Varint: []byte{0x01}, + RawBlock: []byte("test-block"), + FileOffset: int64(k * 1024), + } + err = db.Create(&block).Error 
+ req.NoError(err) + } + } + } + + const numIterations = 15 + var wg sync.WaitGroup + errChan := make(chan error, numIterations*4) + + for i := range numIterations { + wg.Add(4) + + // Goroutine 1: Delete preparations + go func(iteration int) { + defer wg.Done() + time.Sleep(time.Duration(iteration%3) * time.Millisecond) + + deleteCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + prepIdx := iteration % numPreparations + handler := DefaultHandler{} + err := handler.RemovePreparationHandler(deleteCtx, db, preparations[prepIdx].Name, RemoveRequest{}) + if err != nil && deleteCtx.Err() == nil { + // Ignore expected errors from concurrent operations + if !isNotFoundError(err) && !isActiveJobsError(err) && !isJobsInUseError(err) { + errChan <- err + } + } + }(i) + + // Goroutine 2: Bulk job updates + go func(iteration int) { + defer wg.Done() + time.Sleep(time.Duration(iteration%5) * time.Millisecond) + + updateCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + startIdx := (iteration * 5) % len(allJobIDs) + endIdx := min(startIdx+8, len(allJobIDs)) + batchJobIDs := allJobIDs[startIdx:endIdx] + + err := db.WithContext(updateCtx).Transaction(func(tx *gorm.DB) error { + for _, jobID := range batchJobIDs { + err := tx.Model(&model.Job{}). + Where("id = ?", jobID). + Updates(map[string]any{ + "state": model.Processing, + "error_message": "", + "error_stack_trace": "", + }).Error + if err != nil { + return err + } + } + return nil + }) + + if err != nil && updateCtx.Err() == nil { + // If it's a deadlock error, get InnoDB status + if strings.Contains(err.Error(), "Deadlock") { + if deadlockInfo := database.PrintDeadlockInfo(db); deadlockInfo != "" { + t.Logf("\n%s", deadlockInfo) + } + } + errChan <- err + } + }(i) + + // Goroutine 3: File updates + go func(iteration int) { + defer wg.Done() + time.Sleep(time.Duration(iteration%7) * time.Millisecond) + + updateCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + fileIdx := iteration % len(allFileIDs) + err := db.WithContext(updateCtx).Model(&model.File{}). + Where("id = ?", allFileIDs[fileIdx]). + Update("size", 2048).Error + + if err != nil && updateCtx.Err() == nil { + errChan <- err + } + }(i) + + // Goroutine 4: Car block creation + go func(iteration int) { + defer wg.Done() + time.Sleep(time.Duration(iteration%4) * time.Millisecond) + + createCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + carIdx := iteration % len(allCarIDs) + block := model.CarBlock{ + CarID: allCarIDs[carIdx], + CID: model.CID(testutil.TestCid), + CarOffset: int64(1000 + iteration), + CarBlockLength: 512, + Varint: []byte{0x02}, + RawBlock: []byte("new-block"), + FileOffset: int64(1000 + iteration), + } + err := db.WithContext(createCtx).Create(&block).Error + + // Ignore FK errors + if err != nil && createCtx.Err() == nil && !isFKError(err) { + errChan <- err + } + }(i) + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + case err := <-errChan: + req.NoError(err, "Unexpected error during concurrent operations") + case <-time.After(45 * time.Second): + req.Fail("Test timed out - likely deadlock occurred") + } + + // We don't verify complete cleanup because some deletions may be blocked by active jobs. 
+ t.Logf("Test completed without deadlock") + }) +} + +func isNotFoundError(err error) bool { + return err != nil && (strings.Contains(err.Error(), "not found") || strings.Contains(err.Error(), "does not exist")) +} + +func isActiveJobsError(err error) bool { + return err != nil && strings.Contains(err.Error(), "active jobs") +} + +func isJobsInUseError(err error) bool { + return err != nil && strings.Contains(err.Error(), "jobs in use") +} + +func isFKError(err error) bool { + if err == nil { + return false + } + s := err.Error() + return strings.Contains(s, "foreign key") || strings.Contains(s, "FOREIGN KEY") || + strings.Contains(s, "violates") || strings.Contains(s, "constraint") +} diff --git a/handler/job/pack.go b/handler/job/pack.go index 8e0761f6..d0b837ba 100644 --- a/handler/job/pack.go +++ b/handler/job/pack.go @@ -11,8 +11,8 @@ import ( "github.com/data-preservation-programs/singularity/model" "github.com/data-preservation-programs/singularity/pack" "github.com/data-preservation-programs/singularity/scan" - "github.com/data-preservation-programs/singularity/util" "gorm.io/gorm" + "gorm.io/gorm/clause" ) var ( @@ -54,27 +54,53 @@ func (DefaultHandler) StartPackHandler( var jobs []model.Job if jobID == 0 { err = database.DoRetry(ctx, func() error { - err := db.Where("type = ? AND state in ? AND attachment_id = ?", model.Pack, startableStatesForPack, sourceAttachment.ID).Find(&jobs).Error - if err != nil { - return errors.WithStack(err) - } - var jobIDs []model.JobID - for i, job := range jobs { - jobIDs = append(jobIDs, job.ID) - jobs[i].State = model.Ready - } - jobIDChunks := util.ChunkSlice(jobIDs, util.BatchSize) - for _, jobIDs := range jobIDChunks { - err = db.Model(&model.Job{}).Where("id IN ?", jobIDs).Updates(map[string]any{ - "state": model.Ready, - "error_message": "", - "error_stack_trace": "", - }).Error + return db.Transaction(func(tx *gorm.DB) error { + // Find all eligible jobs + err := tx.Where("type = ? AND state in ? AND attachment_id = ?", model.Pack, startableStatesForPack, sourceAttachment.ID).Find(&jobs).Error if err != nil { return errors.WithStack(err) } - } - return nil + + // Update jobs one at a time with SKIP LOCKED to avoid deadlock with worker cleanup + // Use two-step approach: SELECT FOR UPDATE SKIP LOCKED, then UPDATE + updatedJobs := make([]model.Job, 0, len(jobs)) + for _, job := range jobs { + // Try to lock this job + var lockedJob model.Job + err = tx.Clauses(clause.Locking{ + Strength: "UPDATE", + Options: "SKIP LOCKED", + }).Where("id = ?", job.ID).First(&lockedJob).Error + + if errors.Is(err, gorm.ErrRecordNotFound) { + // Job was locked by someone else, skip it + continue + } + if err != nil { + return errors.WithStack(err) + } + + // Now update the job we successfully locked + err = tx.Model(&model.Job{}). + Where("id = ?", job.ID). + Updates(map[string]any{ + "state": model.Ready, + "error_message": "", + "error_stack_trace": "", + }).Error + + if err != nil { + return errors.WithStack(err) + } + + job.State = model.Ready + updatedJobs = append(updatedJobs, job) + } + + // Replace jobs with only the ones we successfully updated + jobs = updatedJobs + return nil + }) }) if err != nil { return nil, errors.WithStack(err) @@ -162,23 +188,49 @@ func (DefaultHandler) PausePackHandler( var jobs []model.Job if jobID == 0 { err = database.DoRetry(ctx, func() error { - err := db.Where("type = ? AND state in ? 
AND attachment_id = ?", model.Pack, pausableStatesForPack, sourceAttachment.ID).Find(&jobs).Error - if err != nil { - return errors.WithStack(err) - } - var jobIDs []model.JobID - for i, job := range jobs { - jobIDs = append(jobIDs, job.ID) - jobs[i].State = model.Paused - } - jobIDChunks := util.ChunkSlice(jobIDs, util.BatchSize) - for _, jobIDs := range jobIDChunks { - err = db.Model(&model.Job{}).Where("id IN ?", jobIDs).Update("state", model.Paused).Error + return db.Transaction(func(tx *gorm.DB) error { + // Find all eligible jobs + err := tx.Where("type = ? AND state in ? AND attachment_id = ?", model.Pack, pausableStatesForPack, sourceAttachment.ID).Find(&jobs).Error if err != nil { return errors.WithStack(err) } - } - return nil + + // Update jobs one at a time with SKIP LOCKED to avoid deadlock with worker cleanup + // Use two-step approach: SELECT FOR UPDATE SKIP LOCKED, then UPDATE + updatedJobs := make([]model.Job, 0, len(jobs)) + for _, job := range jobs { + // Try to lock this job + var lockedJob model.Job + err = tx.Clauses(clause.Locking{ + Strength: "UPDATE", + Options: "SKIP LOCKED", + }).Where("id = ?", job.ID).First(&lockedJob).Error + + if errors.Is(err, gorm.ErrRecordNotFound) { + // Job was locked by someone else, skip it + continue + } + if err != nil { + return errors.WithStack(err) + } + + // Now update the job we successfully locked + err = tx.Model(&model.Job{}). + Where("id = ?", job.ID). + Update("state", model.Paused).Error + + if err != nil { + return errors.WithStack(err) + } + + job.State = model.Paused + updatedJobs = append(updatedJobs, job) + } + + // Replace jobs with only the ones we successfully updated + jobs = updatedJobs + return nil + }) }) if err != nil { return nil, errors.WithStack(err) diff --git a/model/preparation.go b/model/preparation.go index c7e97040..3d5f16b5 100644 --- a/model/preparation.go +++ b/model/preparation.go @@ -183,9 +183,9 @@ type Job struct { ErrorStackTrace string `json:"errorStackTrace" table:"verbose"` // Associations - WorkerID *string `gorm:"size:63" json:"workerId,omitempty"` + WorkerID *string `gorm:"size:63;index" json:"workerId,omitempty"` Worker *Worker `gorm:"foreignKey:WorkerID;references:ID;constraint:OnDelete:SET NULL" json:"worker,omitempty" swaggerignore:"true" table:"verbose;expand"` - AttachmentID SourceAttachmentID `json:"attachmentId" table:"verbose"` + AttachmentID SourceAttachmentID `gorm:"index" json:"attachmentId" table:"verbose"` Attachment *SourceAttachment `gorm:"foreignKey:AttachmentID;constraint:OnDelete:CASCADE" json:"attachment,omitempty" swaggerignore:"true" table:"expand"` FileRanges []FileRange `gorm:"foreignKey:JobID;constraint:OnDelete:SET NULL" json:"fileRanges,omitempty" swaggerignore:"true" table:"-"` } @@ -204,7 +204,7 @@ type File struct { LastModifiedNano int64 `cbor:"5,keyasint,omitempty" json:"lastModifiedNano"` // Associations - AttachmentID SourceAttachmentID `cbor:"-" json:"attachmentId"` + AttachmentID SourceAttachmentID `cbor:"-" gorm:"index" json:"attachmentId"` Attachment *SourceAttachment `cbor:"-" gorm:"foreignKey:AttachmentID;constraint:OnDelete:CASCADE" json:"attachment,omitempty" swaggerignore:"true"` DirectoryID *DirectoryID `cbor:"-" gorm:"index" json:"directoryId"` Directory *Directory `cbor:"-" gorm:"foreignKey:DirectoryID;constraint:OnDelete:CASCADE" json:"directory,omitempty" swaggerignore:"true"` @@ -266,17 +266,17 @@ type Car struct { RootCID CID `cbor:"3,keyasint,omitempty" gorm:"column:root_cid;type:bytes" json:"rootCid" swaggertype:"string"` FileSize int64 
`cbor:"4,keyasint,omitempty" json:"fileSize"` MinPieceSizePadding int64 `cbor:"5,keyasint,omitempty" json:"minPieceSizePadding"` // MinPieceSizePadding tracks virtual padding for inline mode only. Inline: stores padding amount, PieceReader serves zeros virtually. Non-inline: always 0, literal zeros are written to CAR file for Curio TreeD compatibility. - StorageID *StorageID `cbor:"-" json:"storageId" table:"verbose"` + StorageID *StorageID `cbor:"-" gorm:"index" json:"storageId" table:"verbose"` Storage *Storage `cbor:"-" gorm:"foreignKey:StorageID;constraint:OnDelete:SET NULL" json:"storage,omitempty" swaggerignore:"true" table:"expand"` StoragePath string `cbor:"-" json:"storagePath"` // StoragePath is the path to the CAR file inside the storage. If the StorageID is nil but StoragePath is not empty, it means the CAR file is stored at the local absolute path. NumOfFiles int64 `cbor:"-" json:"numOfFiles" table:"verbose"` // Association - PreparationID PreparationID `cbor:"-" json:"preparationId" table:"-"` + PreparationID PreparationID `cbor:"-" gorm:"index" json:"preparationId" table:"-"` Preparation *Preparation `cbor:"-" gorm:"foreignKey:PreparationID;constraint:OnDelete:CASCADE" json:"preparation,omitempty" swaggerignore:"true" table:"-"` - AttachmentID *SourceAttachmentID `cbor:"-" json:"attachmentId" table:"-"` + AttachmentID *SourceAttachmentID `cbor:"-" gorm:"index" json:"attachmentId" table:"-"` Attachment *SourceAttachment `cbor:"-" gorm:"foreignKey:AttachmentID;constraint:OnDelete:CASCADE" json:"attachment,omitempty" swaggerignore:"true" table:"-"` - JobID *JobID `cbor:"-" json:"jobId,omitempty" table:"-"` + JobID *JobID `cbor:"-" gorm:"index" json:"jobId,omitempty" table:"-"` Job *Job `cbor:"-" gorm:"foreignKey:JobID;constraint:OnDelete:SET NULL" json:"job,omitempty" swaggerignore:"true" table:"-"` } diff --git a/service/datasetworker/find.go b/service/datasetworker/find.go index 4e96fbe7..92a79e92 100644 --- a/service/datasetworker/find.go +++ b/service/datasetworker/find.go @@ -8,6 +8,7 @@ import ( "github.com/data-preservation-programs/singularity/database" "github.com/data-preservation-programs/singularity/model" "gorm.io/gorm" + "gorm.io/gorm/clause" ) // findJob searches for a Job from the database based on the ordered list of job types provided. @@ -26,13 +27,17 @@ func (w *Thread) findJob(ctx context.Context, typesOrdered []model.JobType) (*mo db := w.dbNoContext.WithContext(ctx) txOpts := &sql.TxOptions{ - Isolation: sql.LevelSerializable, + Isolation: sql.LevelReadCommitted, } var job model.Job for _, jobType := range typesOrdered { err := database.DoRetry(ctx, func() error { return db.Transaction(func(db *gorm.DB) error { - err := db.Preload("Attachment.Preparation.OutputStorages").Preload("Attachment.Storage"). + // First, lock and claim the job without preloading (preload can interfere with locking) + err := db.Clauses(clause.Locking{ + Strength: "UPDATE", + Options: "SKIP LOCKED", + }). Where("type = ? AND (state = ? OR (state = ? AND worker_id IS NULL))", jobType, model.Ready, model.Processing). First(&job).Error if err != nil { @@ -43,12 +48,21 @@ func (w *Thread) findJob(ctx context.Context, typesOrdered []model.JobType) (*mo return errors.WithStack(err) } - return db.Model(&job). + // Update the job to claim it + err = db.Model(&job). 
Updates(map[string]any{ "state": model.Processing, "worker_id": w.id, "error_message": "", }).Error + if err != nil { + return errors.WithStack(err) + } + + // Now load the associations we need + err = db.Preload("Attachment.Preparation.OutputStorages").Preload("Attachment.Storage"). + First(&job, job.ID).Error + return errors.WithStack(err) }, txOpts) }) if err != nil { diff --git a/service/healthcheck/healthcheck.go b/service/healthcheck/healthcheck.go index fb148f9f..6c1df228 100644 --- a/service/healthcheck/healthcheck.go +++ b/service/healthcheck/healthcheck.go @@ -59,11 +59,9 @@ func StartHealthCheckCleanup(ctx context.Context, db *gorm.DB) { // HealthCheckCleanup is a function that cleans up stale workers and work files in the database. // -// It first removes all workers that haven't sent a heartbeat for a certain threshold (staleThreshold). -// If there's an error removing the workers, it logs the error and continues. -// -// Then, it resets the state of any jobs that are marked as being processed by a worker that no longer exists. -// If there's an error updating the sources, it logs the error and continues. +// It first finds workers that haven't sent a heartbeat for a certain threshold (staleThreshold). +// Then it explicitly updates jobs owned by those workers to prevent CASCADE lock contention. +// Worker deletion is deferred to avoid deadlocks during concurrent operations. // // All database operations are retried on failure using the DoRetry function. // @@ -72,25 +70,79 @@ func StartHealthCheckCleanup(ctx context.Context, db *gorm.DB) { func HealthCheckCleanup(ctx context.Context, db *gorm.DB) { db = db.WithContext(ctx) logger.Debugw("running healthcheck cleanup") - // Remove all workers that haven't sent heartbeat for 5 minutes. - err := database.DoRetry(ctx, func() error { - return db.Where("last_heartbeat < ?", time.Now().UTC().Add(-staleThreshold)).Delete(&model.Worker{}).Error - }) - if err != nil && !errors.Is(err, context.Canceled) { - log.Logger("healthcheck").Errorw("failed to remove dead workers", "error", err) + + // Find stale workers + var staleWorkers []model.Worker + err := db.Where("last_heartbeat < ?", time.Now().UTC().Add(-staleThreshold)).Find(&staleWorkers).Error + if err != nil { + logger.Errorw("failed to find stale workers", "error", err) + return } - // In case there are some works that have stale foreign key referenced to dead workers, we need to remove them - err = database.DoRetry(ctx, func() error { - return db.Model(&model.Job{}).Where("(worker_id NOT IN (?) OR worker_id IS NULL) AND state = ?", - db.Table("workers").Select("id"), model.Processing). - Updates(map[string]any{ - "worker_id": nil, - "state": model.Ready, - }).Error - }) - if err != nil && !errors.Is(err, context.Canceled) { - log.Logger("healthcheck").Errorw("failed to remove stale workers", "error", err) + if len(staleWorkers) == 0 { + return + } + + // Process each stale worker individually to avoid lock ordering issues + for _, worker := range staleWorkers { + // First, just update jobs without trying to delete the worker + err := database.DoRetry(ctx, func() error { + return db.Transaction(func(tx *gorm.DB) error { + // Lock and update jobs owned by this specific worker with SKIP LOCKED + var jobsToUpdate []model.Job + err := tx.Clauses(clause.Locking{ + Strength: "UPDATE", + Options: "SKIP LOCKED", + }).Select("id"). + Where("worker_id = ? AND state = ?", worker.ID, model.Processing). 
+ Find(&jobsToUpdate).Error + if err != nil { + return errors.WithStack(err) + } + + // Update the jobs we successfully locked + if len(jobsToUpdate) > 0 { + jobIDsToUpdate := make([]model.JobID, len(jobsToUpdate)) + for i, job := range jobsToUpdate { + jobIDsToUpdate[i] = job.ID + } + + err = tx.Model(&model.Job{}). + Where("id IN ?", jobIDsToUpdate). + Updates(map[string]any{ + "worker_id": nil, + "state": model.Ready, + }).Error + if err != nil { + return errors.WithStack(err) + } + } + return nil + }) + }) + if err != nil && !errors.Is(err, context.Canceled) { + logger.Errorw("failed to cleanup jobs for worker", "workerID", worker.ID, "error", err) + } + + // Now check if the worker has any jobs left + var jobCount int64 + err = db.Model(&model.Job{}).Where("worker_id = ?", worker.ID).Count(&jobCount).Error + if err != nil { + logger.Errorw("failed to count jobs for worker", "workerID", worker.ID, "error", err) + continue + } + + // Only delete the worker if it has no jobs + if jobCount == 0 { + result := db.Where("id = ?", worker.ID).Delete(&model.Worker{}) + if result.Error != nil { + logger.Errorw("failed to delete worker", "workerID", worker.ID, "error", result.Error) + } else if result.RowsAffected > 0 { + logger.Debugw("deleted stale worker", "workerID", worker.ID) + } + } else { + logger.Debugw("worker still has jobs, will retry later", "workerID", worker.ID, "jobCount", jobCount) + } } } diff --git a/service/healthcheck/healthcheck_deadlock_test.go b/service/healthcheck/healthcheck_deadlock_test.go new file mode 100644 index 00000000..04fb8ea5 --- /dev/null +++ b/service/healthcheck/healthcheck_deadlock_test.go @@ -0,0 +1,171 @@ +package healthcheck + +import ( + "context" + "strings" + "sync" + "testing" + "time" + + "github.com/data-preservation-programs/singularity/database" + "github.com/data-preservation-programs/singularity/model" + "github.com/data-preservation-programs/singularity/util/testutil" + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "gorm.io/gorm" +) + +// TestHealthCheckCleanupNoDeadlock verifies that concurrent worker cleanup and bulk job +// updates do not deadlock. Worker cleanup triggers FK CASCADE to jobs while bulk updates +// modify jobs directly. The fix uses FOR UPDATE SKIP LOCKED. 
+func TestHealthCheckCleanupNoDeadlock(t *testing.T) { + testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { + req := require.New(t) + testutil.EnableDeadlockLogging(t, db) + + // Create test preparation and storage for jobs + preparation := model.Preparation{ + Name: "test-prep-" + uuid.New().String(), + MaxSize: 1 << 30, + PieceSize: 1 << 30, + } + err := db.Create(&preparation).Error + req.NoError(err) + + storage := model.Storage{ + Name: "test-storage-" + uuid.New().String(), + Type: "local", + Path: "/tmp/test", + } + err = db.Create(&storage).Error + req.NoError(err) + + attachment := model.SourceAttachment{ + PreparationID: preparation.ID, + StorageID: storage.ID, + } + err = db.Create(&attachment).Error + req.NoError(err) + + // Create workers and jobs assigned to them + const numWorkers = 5 + const numJobsPerWorker = 10 + workerIDs := make([]string, numWorkers) + jobIDs := make([]model.JobID, 0, numWorkers*numJobsPerWorker) + + for i := range numWorkers { + workerID := uuid.New().String() + workerIDs[i] = workerID + + // Create worker with old heartbeat + worker := model.Worker{ + ID: workerID, + LastHeartbeat: time.Now().UTC().Add(-10 * time.Minute), + Hostname: "test-host", + Type: model.DatasetWorker, + } + err = db.Create(&worker).Error + req.NoError(err) + + // Create jobs assigned to this worker + for range numJobsPerWorker { + job := model.Job{ + Type: model.Pack, + State: model.Processing, + WorkerID: &workerID, + AttachmentID: attachment.ID, + } + err = db.Create(&job).Error + req.NoError(err) + jobIDs = append(jobIDs, job.ID) + } + } + + const numIterations = 20 + var wg sync.WaitGroup + errChan := make(chan error, numIterations*2) + + for i := range numIterations { + wg.Add(2) + + // Goroutine 1: Worker cleanup + go func(iteration int) { + defer wg.Done() + time.Sleep(time.Duration(iteration%3) * time.Millisecond) + + cleanupCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + HealthCheckCleanup(cleanupCtx, db) + }(i) + + // Goroutine 2: Bulk job update + go func(iteration int) { + defer wg.Done() + time.Sleep(time.Duration(iteration%5) * time.Millisecond) + + updateCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + startIdx := (iteration * 5) % len(jobIDs) + endIdx := min(startIdx+10, len(jobIDs)) + batchJobIDs := jobIDs[startIdx:endIdx] + + err := db.WithContext(updateCtx).Transaction(func(tx *gorm.DB) error { + for _, jobID := range batchJobIDs { + err := tx.Model(&model.Job{}). + Where("id = ?", jobID). 
+ Updates(map[string]any{ + "state": model.Ready, + "error_message": "", + "error_stack_trace": "", + }).Error + if err != nil { + return err + } + } + return nil + }) + + if err != nil && updateCtx.Err() == nil { + // If it's a deadlock error, get InnoDB status + if strings.Contains(err.Error(), "Deadlock") { + if deadlockInfo := database.PrintDeadlockInfo(db); deadlockInfo != "" { + t.Logf("\n%s", deadlockInfo) + } + } + errChan <- err + } + }(i) + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + case err := <-errChan: + req.NoError(err, "Unexpected error during concurrent operations") + case <-time.After(30 * time.Second): + req.Fail("Test timed out - likely deadlock occurred") + } + + // Verify no deadlock - we don't require all workers to be deleted because + // workers with jobs locked by concurrent transactions will be skipped (correct behavior) + var remainingJobs []model.Job + err = db.Where("id IN ?", jobIDs).Find(&remainingJobs).Error + req.NoError(err) + + // All jobs should either be reset or still assigned (if locked during cleanup) + for _, job := range remainingJobs { + if job.WorkerID == nil { + req.Contains([]model.JobState{model.Ready, model.Complete}, job.State, + "Job %d with NULL worker_id should be Ready or Complete", job.ID) + } + // Jobs with non-null worker_id are jobs that were locked during cleanup - allowed + } + }) +} diff --git a/util/testutil/deadlock.go b/util/testutil/deadlock.go new file mode 100644 index 00000000..f2ea21e4 --- /dev/null +++ b/util/testutil/deadlock.go @@ -0,0 +1,36 @@ +package testutil + +import ( + "testing" + + "github.com/data-preservation-programs/singularity/database" + "gorm.io/gorm" +) + +// EnableDeadlockLogging enables comprehensive deadlock logging for MySQL/MariaDB tests. +// This should be called early in tests that may encounter deadlocks. +// It will: +// - Enable innodb_print_all_deadlocks (logs all deadlocks to error log, not just the last one) +// - Log the current state of deadlock logging +// +// Note: innodb_print_all_deadlocks requires SUPER privilege and persists until server restart. +func EnableDeadlockLogging(t *testing.T, db *gorm.DB) { + // Try to enable it (may fail if already enabled or insufficient privileges) + err := database.EnableDeadlockLogging(db) + if err != nil { + t.Logf("Note: Could not enable innodb_print_all_deadlocks: %v (may not have SUPER privilege)", err) + } + + // Check if it's enabled + enabled, err := database.CheckDeadlockLoggingEnabled(db) + if err != nil { + t.Logf("Note: Could not check innodb_print_all_deadlocks status: %v", err) + return + } + + if enabled { + t.Logf("Deadlock logging enabled: all deadlocks will be logged to MySQL error log") + } else { + t.Logf("Deadlock logging not enabled: only the most recent deadlock will be available") + } +}
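
For reference, a minimal sketch (not part of the diff) of how the new helpers might be combined with database.DoRetry so that a detected deadlock dumps the LATEST DETECTED DEADLOCK section before the next retry overwrites it. The wrapper name runWithDeadlockDump and the logf parameter are illustrative assumptions; database.DoRetry, database.PrintDeadlockInfo, and the "Deadlock" substring check mirror what the diff already uses.

// Sketch only, under the assumptions stated above.
package example

import (
	"context"
	"strings"

	"github.com/data-preservation-programs/singularity/database"
	"gorm.io/gorm"
)

// runWithDeadlockDump runs fn inside a transaction via database.DoRetry and,
// when a deadlock error surfaces, captures the InnoDB deadlock section so the
// diagnostics are logged before the retry replaces them.
func runWithDeadlockDump(ctx context.Context, db *gorm.DB, logf func(string, ...any), fn func(tx *gorm.DB) error) error {
	return database.DoRetry(ctx, func() error {
		err := db.Transaction(fn)
		if err != nil && strings.Contains(err.Error(), "Deadlock") {
			// PrintDeadlockInfo returns "" on non-MySQL dialects or when no
			// deadlock section is available, so logging is best-effort.
			if info := database.PrintDeadlockInfo(db); info != "" {
				logf("deadlock detected:\n%s", info)
			}
		}
		return err
	})
}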