Skip to content
59 changes: 59 additions & 0 deletions services/libs/tinybird/pipes/cdp_dashboard_metrics_total_sink.pipe
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
DESCRIPTION >
Global metrics for activities, organizations, and members used for CDP dashboard.
Uses cdp_member_segment_aggregates_MV and cdp_organization_segment_aggregates_MV.
"Last30Days" is based on lastActive >= now() - 30 days.

NODE cdpDashboardMetricsTotal
SQL >
SELECT
ms.activitiesTotal,
ms.activitiesLast30Days,
ms.membersTotal,
ms.membersLast30Days,
os.organizationsTotal,
os.organizationsLast30Days
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing date column in dashboard metrics sink

Medium Severity

The sink output is missing a date column that other Kafka sink pipes in the codebase include (using toStartOfDay(now()) as date). Both health_score_sink.pipe and insights_projects_populated_sink.pipe follow this pattern. Without a date column, downstream consumers cannot determine when the metrics snapshot was generated, making historical analysis, deduplication, and data tracking difficult.

Fix in Cursor Fix in Web

FROM
(
-- member-based global metrics (single scan over cdp_member_segment_aggregates_MV)
SELECT
sum(activityCount) AS activitiesTotal,
sumIf(activityCount, lastActive >= now() - INTERVAL 30 DAY) AS activitiesLast30Days,
uniqCombined(memberId) AS membersTotal,
uniqCombinedIf(memberId, lastActive >= now() - INTERVAL 30 DAY) AS membersLast30Days
FROM
(
-- finalize AggregateFunction states per member
SELECT
memberId,
countMerge(activityCountState) AS activityCount,
maxMerge(lastActiveState) AS lastActive
FROM cdp_member_segment_aggregates_MV
GROUP BY
memberId
) AS m
) AS ms
CROSS JOIN
(
-- organization-based global metrics (single scan over cdp_organization_segment_aggregates_MV)
SELECT
uniqCombined(organizationId) AS organizationsTotal,
uniqCombinedIf(organizationId, lastActive >= now() - INTERVAL 30 DAY) AS organizationsLast30Days
FROM
(
-- finalize AggregateFunction states per organization
SELECT
organizationId,
maxMerge(lastActiveState) AS lastActive
FROM cdp_organization_segment_aggregates_MV
GROUP BY
organizationId
) AS o
) AS os

TYPE SINK
EXPORT_SERVICE kafka
EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming
EXPORT_SCHEDULE 0 9 * * *
EXPORT_FORMAT json
EXPORT_STRATEGY @new
EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_total_sink
Loading