guard timestamp_column in LocalDedupNode against missing DataFrame column #5985
faustaround wants to merge 1 commit into feast-dev:master from …
Conversation
…t in feature schema timestamp_column is unconditionally added to sort_keys even when the column doesn't exist in the DataFrame (e.g. when the timestamp_field isn't declared in the feature view schema and gets projected away by the DAG pipeline). The adjacent created_timestamp_column already has an in df.columns guard — timestamp_column needs the same treatment.
if sort_keys:
    df = self.backend.drop_duplicates(
        df, keys=dedup_keys, sort_by=sort_keys, ascending=False
    )
🔴 Deduplication silently skipped when no timestamp columns are present in DataFrame
When neither timestamp_column nor created_timestamp_column is present in the DataFrame columns, sort_keys will be empty and the if sort_keys: guard at line 204 causes the entire drop_duplicates call to be skipped. This means duplicate rows (by join key) pass through undetected.
Root Cause and Impact
The PR description states the intent is to "add a fallback to deduplicate by key only when no sort columns survive (rather than crashing)." However, the implementation at lines 204-207 simply skips deduplication entirely when sort_keys is empty:
if sort_keys:
    df = self.backend.drop_duplicates(
        df, keys=dedup_keys, sort_by=sort_keys, ascending=False
    )

When sort_keys is empty (falsy), no deduplication happens at all. The correct behavior should be to still deduplicate by dedup_keys alone, just without a deterministic sort order: for example, using pandas' df.drop_duplicates(subset=dedup_keys) or equivalent.
Impact: Any feature view where timestamp_field is an internal bookkeeping column not exposed in the feature schema will have its timestamp column projected away before the dedup node runs. In this case, duplicate entity rows will silently remain in the output, leading to incorrect feature values (e.g., duplicated rows in training datasets or multiple values written to the online store for the same entity key).
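At the pandas level, the difference is easy to demonstrate before looking at the suggested change below. This sketch uses made-up column names (driver_id, conv_rate) and plain pandas rather than the backend abstraction:

import pandas as pd

# Two rows for the same entity key; no timestamp columns survived projection.
df = pd.DataFrame({"driver_id": [1, 1, 2], "conv_rate": [0.1, 0.9, 0.5]})
dedup_keys, sort_keys = ["driver_id"], []

# Current behavior: the guard is falsy, so deduplication is skipped entirely
# and both rows for driver 1 pass through.
if sort_keys:
    df = df.sort_values(sort_keys, ascending=False).drop_duplicates(subset=dedup_keys)
assert len(df) == 3

# Key-only fallback: one row per entity survives, just without a
# deterministic "latest wins" ordering.
df = df.drop_duplicates(subset=dedup_keys)
assert len(df) == 2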
Suggested change:

  if sort_keys:
      df = self.backend.drop_duplicates(
          df, keys=dedup_keys, sort_by=sort_keys, ascending=False
      )
+ else:
+     df = self.backend.drop_duplicates(
+         df, keys=dedup_keys, sort_by=dedup_keys, ascending=True
+     )
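One note on the suggested else branch: when sorting by dedup_keys themselves, all rows sharing a key are tied on every sort column, so which duplicate survives is arbitrary. The point of the fallback is not ordering but coverage: at most one row per entity key remains instead of duplicates passing through untouched.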
LocalDedupNode.execute unconditionally appends timestamp_column to sort_keys, but created_timestamp_column (added immediately after) already guards against this with an in df.columns check.

When the feature view's timestamp_field column is not declared in the feature schema, the DAG pipeline projects it away before the dedup node runs. The column is present in the raw Redshift result but absent from the DataFrame by the time drop_duplicates is called, causing a crash. This affects any feature view where timestamp_field is an internal bookkeeping column not exposed as a feature.

Apply the same guard to timestamp_column for consistency, and add a fallback to deduplicate by key only when no sort columns survive (rather than crashing).
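For concreteness, a minimal sketch of the guarded construction described above, as it might look inside LocalDedupNode.execute (the exact attribute names and surrounding code are assumptions based on this description, not the actual patch):

# Only sort by timestamp columns that actually survived projection.
sort_keys = []
if self.timestamp_column and self.timestamp_column in df.columns:
    sort_keys.append(self.timestamp_column)
if self.created_timestamp_column and self.created_timestamp_column in df.columns:
    sort_keys.append(self.created_timestamp_column)

if sort_keys:
    # Normal path: latest row per entity key wins.
    df = self.backend.drop_duplicates(
        df, keys=dedup_keys, sort_by=sort_keys, ascending=False
    )
else:
    # Fallback: no sort columns survived, so deduplicate by key alone
    # rather than crashing on a missing column.
    df = self.backend.drop_duplicates(
        df, keys=dedup_keys, sort_by=dedup_keys, ascending=True
    )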