Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Add metadata assertions to rivertest. [PR #1137](https://github.com/riverqueue/river/pull/1137).

## [0.30.2] - 2026-01-26

### Fixed
Expand Down
106 changes: 106 additions & 0 deletions rivertest/rivertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"encoding/json"
"fmt"
"reflect"
"slices"
"strings"
"testing"
Expand Down Expand Up @@ -50,6 +51,13 @@ type RequireInsertedOpts struct {
// No assertion is made if left the zero value.
MaxAttempts int

// Metadata is a subset of job metadata to assert against. Only the keys and
// values provided are compared, and any extra metadata on the job is
// ignored.
//
// No assertion is made if left nil or empty.
Metadata map[string]any

// Priority is the expected priority for the inserted job.
//
// No assertion is made if left the zero value.
Expand Down Expand Up @@ -501,6 +509,16 @@ func compareJobToInsertOpts(t testingT, jobRow *rivertype.JobRow, expectedOpts *
}
}

if len(expectedOpts.Metadata) > 0 {
metadataMatches, metadataFailures := compareMetadataSubset(t, jobRow.Metadata, expectedOpts.Metadata, requireNotInserted)

if !metadataMatches && requireNotInserted {
return true
}

failures = append(failures, metadataFailures...)
}

if expectedOpts.Priority != 0 {
if jobRow.Priority == expectedOpts.Priority {
if requireNotInserted {
Expand Down Expand Up @@ -594,6 +612,94 @@ func compareJobToInsertOpts(t testingT, jobRow *rivertype.JobRow, expectedOpts *
return false
}

func compareMetadataSubset(t testingT, jobMetadataBytes []byte, expectedMetadata map[string]any, requireNotInserted bool) (bool, []string) {
t.Helper()

jobMetadata := map[string]any{}
if len(jobMetadataBytes) > 0 {
if err := json.Unmarshal(jobMetadataBytes, &jobMetadata); err != nil {
failuref(t, "Internal failure: error unmarshaling job metadata: %s", err)
}
}

keys := make([]string, 0, len(expectedMetadata))
for key := range expectedMetadata {
keys = append(keys, key)
}
slices.Sort(keys)

failures := make([]string, 0, len(keys))
allMatch := true
for _, key := range keys {
expectedValue := expectedMetadata[key]

actualValue, ok := jobMetadata[key]
if !ok {
allMatch = false
if requireNotInserted {
return false, nil
}
failures = append(failures, fmt.Sprintf("metadata missing key '%s'", key))
continue
}

if expectedValue == nil {
if actualValue == nil {
if requireNotInserted {
failures = append(failures, fmt.Sprintf("metadata[%s] equal to excluded null", key))
}
} else {
allMatch = false
if requireNotInserted {
return false, nil
}
failures = append(failures, fmt.Sprintf("metadata[%s] %s not equal to expected null", key, formatMetadataValue(actualValue)))
}
continue
}

normalizedExpected, err := normalizeMetadataValue(expectedValue)
if err != nil {
failuref(t, "Internal failure: error normalizing metadata for key '%s': %s", key, err)
}

if reflect.DeepEqual(actualValue, normalizedExpected) {
if requireNotInserted {
failures = append(failures, fmt.Sprintf("metadata[%s] equal to excluded %s", key, formatMetadataValue(normalizedExpected)))
}
} else {
allMatch = false
if requireNotInserted {
return false, nil
}
failures = append(failures, fmt.Sprintf("metadata[%s] %s not equal to expected %s", key, formatMetadataValue(actualValue), formatMetadataValue(normalizedExpected)))
}
}

return allMatch, failures
}

func formatMetadataValue(value any) string {
encoded, err := json.Marshal(value)
if err != nil {
return fmt.Sprintf("%v", value)
}
return string(encoded)
}

func normalizeMetadataValue(value any) (any, error) {
encoded, err := json.Marshal(value)
if err != nil {
return nil, err
}

var normalized any
if err := json.Unmarshal(encoded, &normalized); err != nil {
return nil, err
}
return normalized, nil
}

// failuref takes a printf-style directive and is a shortcut for failing an
// assertion.
func failuref(t testingT, format string, a ...any) {
Expand Down
65 changes: 65 additions & 0 deletions rivertest/rivertest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,41 @@ func TestRequireInsertedTx(t *testing.T) {
mockT.LogOutput())
})

t.Run("Metadata", func(t *testing.T) {
t.Parallel()

riverClient, bundle := setup(t)

_, err := riverClient.InsertTx(ctx, bundle.tx, Job2Args{Int: 123}, &river.InsertOpts{
Metadata: []byte(`{"key":"value","list":[1,2],"nested":{"enabled":true},"num":1}`),
})
require.NoError(t, err)

mockT := testutil.NewMockT(t)
opts := &RequireInsertedOpts{
Metadata: map[string]any{
"key": "value",
"nested": map[string]any{"enabled": true},
"num": int64(1),
},
}
_ = requireInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, opts)
require.False(t, mockT.Failed, "Should have succeeded, but failed with: "+mockT.LogOutput())

mockT = testutil.NewMockT(t)
opts = &RequireInsertedOpts{
Metadata: map[string]any{
"key": "wrong",
"missing": "value",
},
}
_ = requireInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, opts)
require.True(t, mockT.Failed)
require.Equal(t,
failureString("Job with kind 'job2' metadata[key] \"value\" not equal to expected \"wrong\", metadata missing key 'missing'")+"\n",
mockT.LogOutput())
})

t.Run("Priority", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -587,6 +622,36 @@ func TestRequireNotInsertedTx(t *testing.T) {
mockT.LogOutput())
})

t.Run("Metadata", func(t *testing.T) {
t.Parallel()

riverClient, bundle := setup(t)

_, err := riverClient.InsertTx(ctx, bundle.tx, Job2Args{Int: 123}, &river.InsertOpts{
Metadata: []byte(`{"key":"value"}`),
})
require.NoError(t, err)

mockT := testutil.NewMockT(t)
opts := emptyOpts()
opts.Metadata = map[string]any{
"key": "value",
}
requireNotInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, opts)
require.True(t, mockT.Failed)
require.Equal(t,
failureString("Job with kind 'job2' metadata[key] equal to excluded \"value\"")+"\n",
mockT.LogOutput())

mockT = testutil.NewMockT(t)
opts = emptyOpts()
opts.Metadata = map[string]any{
"key": "other",
}
requireNotInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, opts)
require.False(t, mockT.Failed)
})

t.Run("Priority", func(t *testing.T) {
t.Parallel()

Expand Down
Loading