diff --git a/downstreamadapter/sink/kafka/sink.go b/downstreamadapter/sink/kafka/sink.go index 56580fc029..577db6ec5d 100644 --- a/downstreamadapter/sink/kafka/sink.go +++ b/downstreamadapter/sink/kafka/sink.go @@ -442,9 +442,14 @@ func (s *sink) sendDDLEvent(event *commonEvent.DDLEvent) error { return err } } + + finishedAt := time.UnixMilli(int64(event.GetCommitTs() >> 18)) + duration := time.Since(finishedAt) log.Info("kafka sink send DDL event", zap.String("keyspace", s.changefeedID.Keyspace()), zap.String("changefeed", s.changefeedID.Name()), - zap.Any("startTs", event.GetStartTs()), zap.Any("commitTs", event.GetCommitTs()), zap.Any("event", event.GetDDLQuery()), + zap.Any("startTs", event.GetStartTs()), zap.Any("commitTs", event.GetCommitTs()), + zap.Any("tidbFinishedAt", finishedAt), zap.Duration("duration", duration), + zap.Any("DDL", event.GetDDLQuery()), zap.String("schema", event.GetSchemaName()), zap.String("table", event.GetTableName())) return nil } diff --git a/pkg/config/debug.go b/pkg/config/debug.go index c8e40c5ea5..0b92705113 100644 --- a/pkg/config/debug.go +++ b/pkg/config/debug.go @@ -78,7 +78,7 @@ func NewDefaultPullerConfig() *PullerConfig { EnableResolvedTsStuckDetection: false, ResolvedTsStuckInterval: TomlDuration(5 * time.Minute), LogRegionDetails: false, - PendingRegionRequestQueueSize: 32, // This value is chosen to reduce the impact of new changefeeds on existing ones. + PendingRegionRequestQueueSize: 256, // This value is chosen to reduce the impact of new changefeeds on existing ones. } } diff --git a/tests/integration_tests/debezium01/src/test_cases.go b/tests/integration_tests/debezium01/src/test_cases.go index 50d6373427..2c6de7a637 100644 --- a/tests/integration_tests/debezium01/src/test_cases.go +++ b/tests/integration_tests/debezium01/src/test_cases.go @@ -36,7 +36,7 @@ import ( "go.uber.org/zap" ) -const timeout = time.Second * 30 +const timeout = time.Second * 20 var ( nFailed = 0