|
10 | 10 | import pytest |
11 | 11 | import pytz |
12 | 12 |
|
13 | | -from feast import FeatureService, ValueType |
| 13 | +from feast import FeatureService, FeatureView, ValueType |
14 | 14 | from feast.embedded_go.lib.embedded import LoggingOptions |
15 | 15 | from feast.embedded_go.online_features_service import EmbeddedOnlineFeatureServer |
16 | 16 | from feast.feast_object import FeastObject |
@@ -162,13 +162,14 @@ def test_feature_logging( |
162 | 162 |
|
163 | 163 | _, datasets, _ = universal_data_sources |
164 | 164 | latest_rows = get_latest_rows(datasets.driver_df, "driver_id", driver_ids) |
| 165 | + feature_view = fs.get_feature_view("driver_stats") |
165 | 166 | features = [ |
166 | 167 | feature.name |
167 | 168 | for proj in feature_service.feature_view_projections |
168 | 169 | for feature in proj.features |
169 | 170 | ] |
170 | 171 | expected_logs = generate_expected_logs( |
171 | | - latest_rows, "driver_stats", features, ["driver_id"], "event_timestamp" |
| 172 | + latest_rows, feature_view, features, ["driver_id"], "event_timestamp" |
172 | 173 | ) |
173 | 174 |
|
174 | 175 | def retrieve(): |
@@ -213,15 +214,26 @@ def get_latest_rows(df, join_key, entity_values): |
213 | 214 |
|
214 | 215 |
|
215 | 216 | def generate_expected_logs( |
216 | | - df, feature_view_name, features, join_keys, timestamp_column |
| 217 | + df: pd.DataFrame, |
| 218 | + feature_view: FeatureView, |
| 219 | + features: List[str], |
| 220 | + join_keys: List[str], |
| 221 | + timestamp_column: str, |
217 | 222 | ): |
218 | 223 | logs = pd.DataFrame() |
219 | 224 | for join_key in join_keys: |
220 | 225 | logs[join_key] = df[join_key] |
221 | 226 |
|
222 | 227 | for feature in features: |
223 | | - logs[f"{feature_view_name}__{feature}"] = df[feature] |
224 | | - logs[f"{feature_view_name}__{feature}__timestamp"] = df[timestamp_column] |
225 | | - logs[f"{feature_view_name}__{feature}__status"] = FieldStatus.PRESENT |
| 228 | + col = f"{feature_view.name}__{feature}" |
| 229 | + logs[col] = df[feature] |
| 230 | + logs[f"{col}__timestamp"] = df[timestamp_column] |
| 231 | + logs[f"{col}__status"] = FieldStatus.PRESENT |
| 232 | + if feature_view.ttl: |
| 233 | + logs[f"{col}__status"] = logs[f"{col}__status"].mask( |
| 234 | + df[timestamp_column] |
| 235 | + < datetime.utcnow().replace(tzinfo=pytz.UTC) - feature_view.ttl, |
| 236 | + FieldStatus.OUTSIDE_MAX_AGE, |
| 237 | + ) |
226 | 238 |
|
227 | 239 | return logs.sort_values(by=join_keys).reset_index(drop=True) |
0 commit comments