diff --git a/pack/pack.go b/pack/pack.go index e0e2d390..cb1e5d34 100644 --- a/pack/pack.go +++ b/pack/pack.go @@ -21,6 +21,7 @@ import ( "github.com/ipfs/go-cid" format "github.com/ipfs/go-ipld-format" "github.com/ipfs/go-log/v2" + "github.com/rclone/rclone/fs" "github.com/rjNemo/underscore" "gorm.io/gorm" "gorm.io/gorm/clause" @@ -113,73 +114,113 @@ func Pack( var minPieceSizePadding int64 if storageWriter != nil { var carGenerated bool - reader := io.TeeReader(assembler, calc) filename = uuid.NewString() + ".car" - obj, err := storageWriter.Write(ctx, filename, reader) - defer func() { - if !carGenerated && obj != nil { - removeCtx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) - err := storageWriter.Remove(removeCtx, obj) - if err != nil { - logger.Errorf("failed to remove temporary CAR file %s: %v", filename, err) - } - cancel() - } - }() - if err != nil { - return nil, errors.WithStack(err) - } - fileSize = obj.Size() - if assembler.carOffset <= 65 { - return nil, errors.WithStack(ErrNoContent) - } - pieceCid, finalPieceSize, err = GetCommp(calc, uint64(pieceSize)) - if err != nil { - return nil, errors.WithStack(err) + // Calculate total input size + var totalInputSize int64 + for _, fr := range job.FileRanges { + totalInputSize += fr.Length } - // Check if minPieceSize constraint forced larger piece size - naturalPieceSize := util.NextPowerOfTwo(uint64(fileSize)) - if finalPieceSize > naturalPieceSize { - // Need to pad to (127/128) × piece_size due to Fr32 padding overhead - targetCarSize := (int64(finalPieceSize) * 127) / 128 - paddingNeeded := targetCarSize - fileSize + // Use temp file if input size suggests padding may be needed (< minPieceSize/2) + useTempFile := pieceSize > 0 && totalInputSize < int64(pieceSize)/2 + var obj fs.Object - // Find the output storage by ID - var outputStorage *model.Storage - for i := range job.Attachment.Preparation.OutputStorages { - if job.Attachment.Preparation.OutputStorages[i].ID == *storageID { - outputStorage = &job.Attachment.Preparation.OutputStorages[i] - break - } + if useTempFile { + // Write to temp file first to enable padding + tempFile, err := os.CreateTemp("", "car-*.car") + if err != nil { + return nil, errors.Wrap(err, "failed to create temp file for CAR") + } + tempPath := tempFile.Name() + defer os.Remove(tempPath) + + reader := io.TeeReader(assembler, calc) + _, err = io.Copy(tempFile, reader) + if err != nil { + tempFile.Close() + return nil, errors.Wrap(err, "failed to write CAR to temp file") + } + tempFile.Close() + + if assembler.carOffset <= 65 { + return nil, errors.WithStack(ErrNoContent) + } + + stat, err := os.Stat(tempPath) + if err != nil { + return nil, errors.WithStack(err) + } + fileSize = stat.Size() + + pieceCid, finalPieceSize, err = GetCommp(calc, uint64(pieceSize)) + if err != nil { + return nil, errors.WithStack(err) } - // Append zeros to file - if outputStorage != nil && obj != nil { - // Build full path to CAR file - carPath := outputStorage.Path + "/" + filename + // Check if padding needed + naturalPieceSize := util.NextPowerOfTwo(uint64(fileSize)) + if finalPieceSize > naturalPieceSize { + targetCarSize := (int64(finalPieceSize) * 127) / 128 + paddingNeeded := targetCarSize - fileSize - // Reopen file and append zeros (like dd does) - f, err := os.OpenFile(carPath, os.O_APPEND|os.O_WRONLY, 0644) + f, err := os.OpenFile(tempPath, os.O_APPEND|os.O_WRONLY, 0644) if err != nil { - return nil, errors.Wrap(err, "failed to open CAR file for padding") + return nil, errors.Wrap(err, "failed to open temp CAR file for padding") } zeros := make([]byte, paddingNeeded) _, err = f.Write(zeros) f.Close() if err != nil { - return nil, errors.Wrap(err, "failed to write padding to CAR file") + return nil, errors.Wrap(err, "failed to write padding to temp CAR file") } fileSize = targetCarSize - // minPieceSizePadding stays 0 for non-inline (zeros are in file) - logger.Infow("padded CAR file for minPieceSize", "original", fileSize-paddingNeeded, "padded", fileSize, "padding", paddingNeeded, "piece_size", finalPieceSize) } + + // Upload complete file + f, err := os.Open(tempPath) + if err != nil { + return nil, errors.Wrap(err, "failed to open temp file for upload") + } + defer f.Close() + + obj, err = storageWriter.Write(ctx, filename, f) + if err != nil { + return nil, errors.WithStack(err) + } + } else { + // Stream directly without temp file (input too large for padding to be likely) + reader := io.TeeReader(assembler, calc) + obj, err = storageWriter.Write(ctx, filename, reader) + if err != nil { + return nil, errors.WithStack(err) + } + fileSize = obj.Size() + + if assembler.carOffset <= 65 { + return nil, errors.WithStack(ErrNoContent) + } + + pieceCid, finalPieceSize, err = GetCommp(calc, uint64(pieceSize)) + if err != nil { + return nil, errors.WithStack(err) + } } + defer func() { + if !carGenerated && obj != nil { + removeCtx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + err := storageWriter.Remove(removeCtx, obj) + if err != nil { + logger.Errorf("failed to remove temporary CAR file %s: %v", filename, err) + } + cancel() + } + }() + _, err = storageWriter.Move(ctx, obj, pieceCid.String()+".car") if err != nil && !errors.Is(err, storagesystem.ErrMoveNotSupported) { logger.Errorf("failed to move car file from %s to %s: %s", filename, pieceCid.String()+".car", err) @@ -189,6 +230,7 @@ func Pack( } carGenerated = true } else { + // Inline preparation - no physical CAR file fileSize, err = io.Copy(calc, assembler) if err != nil { return nil, errors.WithStack(err) diff --git a/pack/pack_test.go b/pack/pack_test.go index c9672a57..e8c587ea 100644 --- a/pack/pack_test.go +++ b/pack/pack_test.go @@ -333,6 +333,7 @@ func TestLastPiecePadding(t *testing.T) { // Create and execute the packing job err := db.Create(&job).Error require.NoError(t, err) + car, err := Pack(ctx, db, job) require.NoError(t, err) @@ -457,6 +458,7 @@ func TestMultiplePiecesWithLastPiece(t *testing.T) { // Create and execute the packing job err := db.Create(&job).Error require.NoError(t, err) + car, err := Pack(ctx, db, job) require.NoError(t, err) diff --git a/service/datasetworker/daggen.go b/service/datasetworker/daggen.go index 1edf5057..73d9b258 100644 --- a/service/datasetworker/daggen.go +++ b/service/datasetworker/daggen.go @@ -207,73 +207,112 @@ func (w *Thread) ExportDag(ctx context.Context, job model.Job) error { var fileSize int64 var minPieceSizePadding int64 if storageWriter != nil { - reader := io.TeeReader(dagGenerator, calc) filename = uuid.NewString() + ".car" - obj, err := storageWriter.Write(ctx, filename, reader) - if err != nil { - return errors.WithStack(err) - } - fileSize = obj.Size() - if dagGenerator.offset <= 59 { - logger.Info("Nothing to export to dag. Skipping.") - return nil - } + // DAG size is unknown until generated, use temp file when minPieceSize is set + if pieceSize > 0 { + // Write to temp file to enable padding + tempFile, err := os.CreateTemp("", "dagcar-*.car") + if err != nil { + return errors.Wrap(err, "failed to create temp file for DAG CAR") + } + tempPath := tempFile.Name() + defer os.Remove(tempPath) - pieceCid, finalPieceSize, err = pack.GetCommp(calc, uint64(pieceSize)) - if err != nil { - return errors.WithStack(err) - } + reader := io.TeeReader(dagGenerator, calc) + _, err = io.Copy(tempFile, reader) + if err != nil { + tempFile.Close() + return errors.Wrap(err, "failed to write DAG CAR to temp file") + } + tempFile.Close() - // Check if minPieceSize constraint forced larger piece size - naturalPieceSize := util.NextPowerOfTwo(uint64(fileSize)) - if finalPieceSize > naturalPieceSize { - // Need to pad to (127/128) × piece_size due to Fr32 padding overhead - targetCarSize := (int64(finalPieceSize) * 127) / 128 - paddingNeeded := targetCarSize - fileSize + if dagGenerator.offset <= 59 { + logger.Info("Nothing to export to dag. Skipping.") + return nil + } - // Find the output storage by ID - var outputStorage *model.Storage - for i := range job.Attachment.Preparation.OutputStorages { - if job.Attachment.Preparation.OutputStorages[i].ID == *storageID { - outputStorage = &job.Attachment.Preparation.OutputStorages[i] - break - } + stat, err := os.Stat(tempPath) + if err != nil { + return errors.WithStack(err) + } + fileSize = stat.Size() + + pieceCid, finalPieceSize, err = pack.GetCommp(calc, uint64(pieceSize)) + if err != nil { + return errors.WithStack(err) } - // Append zeros to file - if outputStorage != nil && obj != nil { - // Build full path to CAR file - carPath := outputStorage.Path + "/" + filename + // Check if padding needed + naturalPieceSize := util.NextPowerOfTwo(uint64(fileSize)) + if finalPieceSize > naturalPieceSize { + targetCarSize := (int64(finalPieceSize) * 127) / 128 + paddingNeeded := targetCarSize - fileSize - // Reopen file and append zeros - f, err := os.OpenFile(carPath, os.O_APPEND|os.O_WRONLY, 0644) + f, err := os.OpenFile(tempPath, os.O_APPEND|os.O_WRONLY, 0644) if err != nil { - return errors.Wrap(err, "failed to open DAG CAR file for padding") + return errors.Wrap(err, "failed to open temp DAG CAR file for padding") } zeros := make([]byte, paddingNeeded) _, err = f.Write(zeros) f.Close() if err != nil { - return errors.Wrap(err, "failed to write padding to DAG CAR file") + return errors.Wrap(err, "failed to write padding to temp DAG CAR file") } fileSize = targetCarSize - // minPieceSizePadding stays 0 for non-inline (zeros are in file) - logger.Infow("padded DAG CAR file for minPieceSize", "original", fileSize-paddingNeeded, "padded", fileSize, "padding", paddingNeeded, "piece_size", finalPieceSize) } - } - _, err = storageWriter.Move(ctx, obj, pieceCid.String()+".car") - if err != nil && !errors.Is(err, storagesystem.ErrMoveNotSupported) { - logger.Errorf("failed to move car file from %s to %s: %s", filename, pieceCid.String()+".car", err) - } - if err == nil { - filename = pieceCid.String() + ".car" + // Upload complete file + f, err := os.Open(tempPath) + if err != nil { + return errors.Wrap(err, "failed to open temp file for upload") + } + defer f.Close() + + obj, err := storageWriter.Write(ctx, filename, f) + if err != nil { + return errors.WithStack(err) + } + + _, err = storageWriter.Move(ctx, obj, pieceCid.String()+".car") + if err != nil && !errors.Is(err, storagesystem.ErrMoveNotSupported) { + logger.Errorf("failed to move car file from %s to %s: %s", filename, pieceCid.String()+".car", err) + } + if err == nil { + filename = pieceCid.String() + ".car" + } + } else { + // No minPieceSize constraint, stream directly + reader := io.TeeReader(dagGenerator, calc) + obj, err := storageWriter.Write(ctx, filename, reader) + if err != nil { + return errors.WithStack(err) + } + + if dagGenerator.offset <= 59 { + logger.Info("Nothing to export to dag. Skipping.") + return nil + } + + fileSize = obj.Size() + pieceCid, finalPieceSize, err = pack.GetCommp(calc, uint64(pieceSize)) + if err != nil { + return errors.WithStack(err) + } + + _, err = storageWriter.Move(ctx, obj, pieceCid.String()+".car") + if err != nil && !errors.Is(err, storagesystem.ErrMoveNotSupported) { + logger.Errorf("failed to move car file from %s to %s: %s", filename, pieceCid.String()+".car", err) + } + if err == nil { + filename = pieceCid.String() + ".car" + } } } else { + // Inline DAG - no physical CAR file fileSize, err = io.Copy(calc, dagGenerator) if err != nil { return errors.WithStack(err) diff --git a/service/datasetworker/daggen_test.go b/service/datasetworker/daggen_test.go index b55f4ee3..55acce7b 100644 --- a/service/datasetworker/daggen_test.go +++ b/service/datasetworker/daggen_test.go @@ -2,6 +2,10 @@ package datasetworker import ( "context" + "fmt" + "os" + "path/filepath" + "strings" "testing" "github.com/data-preservation-programs/singularity/model" @@ -14,6 +18,8 @@ import ( blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" "github.com/ipfs/go-log/v2" + "github.com/orlangure/gnomock" + "github.com/orlangure/gnomock/preset/localstack" "github.com/stretchr/testify/require" "gorm.io/gorm" ) @@ -170,3 +176,159 @@ func TestExportDag_WithOutputStorage_NoInline(t *testing.T) { } }) } + +func TestExportDag_WithMinPieceSize_LocalStorage(t *testing.T) { + tmp := t.TempDir() + testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { + thread := &Thread{ + id: uuid.New(), + dbNoContext: db, + logger: log.Logger("test").With("test", true), + } + job := model.Job{ + Type: model.DagGen, + State: model.Ready, + Attachment: &model.SourceAttachment{ + Preparation: &model.Preparation{ + OutputStorages: []model.Storage{ + { + Type: "local", + Path: tmp, + }, + }, + NoInline: true, + MinPieceSize: 512 * 1024, // 512 KiB - will force padding for small DAG + }, + StorageID: 1, + }, + } + err := thread.dbNoContext.Create(&job).Error + require.NoError(t, err) + + // Create a small DAG that will need padding + dir1 := daggen.NewDirectoryData() + dir1.AddBlocks(ctx, []blocks.Block{ + blocks.NewBlock([]byte("small data")), + }) + dir1Data, err := dir1.MarshalBinary(ctx) + require.NoError(t, err) + + dirs := []model.Directory{ + {AttachmentID: 1, Data: dir1Data, CID: model.CID(packutil.EmptyFileCid)}, + } + err = thread.dbNoContext.Create(&dirs).Error + require.NoError(t, err) + + err = thread.ExportDag(ctx, job) + require.NoError(t, err) + + var cars []model.Car + err = db.Find(&cars).Error + require.NoError(t, err) + require.Len(t, cars, 1) + + // Verify the piece size is at least the minPieceSize + require.GreaterOrEqual(t, cars[0].PieceSize, int64(512*1024)) + + // Verify the file size matches the expected padded size (127/128 of piece size) + expectedFileSize := (cars[0].PieceSize * 127) / 128 + require.Equal(t, expectedFileSize, cars[0].FileSize) + + // Verify MinPieceSizePadding is 0 (padding is physical, not virtual) + require.Equal(t, int64(0), cars[0].MinPieceSizePadding) + }) +} + +func TestExportDag_WithMinPieceSize_RemoteStorage(t *testing.T) { + + // Set up localstack S3 + bucketName := "testbucket" + tempDir := t.TempDir() + + // Create bucket directory for localstack + err := os.MkdirAll(filepath.Join(tempDir, bucketName), 0777) + require.NoError(t, err) + + p := localstack.Preset( + localstack.WithServices(localstack.S3), + localstack.WithS3Files(tempDir), + ) + localS3, err := gnomock.Start(p) + if err != nil && strings.HasPrefix(err.Error(), "can't start container") { + t.Skip("Docker required for S3 tests") + } + require.NoError(t, err) + defer func() { _ = gnomock.Stop(localS3) }() + + testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { + thread := &Thread{ + id: uuid.New(), + dbNoContext: db, + logger: log.Logger("test").With("test", true), + } + job := model.Job{ + Type: model.DagGen, + State: model.Ready, + Attachment: &model.SourceAttachment{ + Preparation: &model.Preparation{ + OutputStorages: []model.Storage{ + { + Type: "s3", + Path: bucketName, + Config: map[string]string{ + "region": "us-east-1", + "provider": "Other", + "force_path_style": "true", + "chunk_size": "5Mi", + "copy_cutoff": "5Mi", + "upload_cutoff": "5Mi", + "list_chunk": "1000", + "endpoint": fmt.Sprint("http://", localS3.Address(localstack.APIPort)), + "env_auth": "false", + "access_key_id": "test", + "secret_access_key": "test", + }, + }, + }, + NoInline: true, + MinPieceSize: 512 * 1024, // 512 KiB - will force padding + }, + StorageID: 1, + }, + } + err := thread.dbNoContext.Create(&job).Error + require.NoError(t, err) + + // Create a small DAG that will need padding + dir1 := daggen.NewDirectoryData() + dir1.AddBlocks(ctx, []blocks.Block{ + blocks.NewBlock([]byte("small data for remote")), + }) + dir1Data, err := dir1.MarshalBinary(ctx) + require.NoError(t, err) + + dirs := []model.Directory{ + {AttachmentID: 1, Data: dir1Data, CID: model.CID(packutil.EmptyFileCid)}, + } + err = thread.dbNoContext.Create(&dirs).Error + require.NoError(t, err) + + err = thread.ExportDag(ctx, job) + require.NoError(t, err) + + var cars []model.Car + err = db.Find(&cars).Error + require.NoError(t, err) + require.Len(t, cars, 1) + + // Verify the piece size is at least the minPieceSize + require.GreaterOrEqual(t, cars[0].PieceSize, int64(512*1024)) + + // Verify the file size matches the expected padded size (127/128 of piece size) + expectedFileSize := (cars[0].PieceSize * 127) / 128 + require.Equal(t, expectedFileSize, cars[0].FileSize) + + // Verify MinPieceSizePadding is 0 (padding is physical, not virtual) + require.Equal(t, int64(0), cars[0].MinPieceSizePadding) + }) +}