diff --git a/CHANGELOG.md b/CHANGELOG.md index 5c40558e1c7..ab3ac0cd1e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,17 @@ # Changelog +## [0.20.2](https://github.com/feast-dev/feast/compare/v0.20.1...v0.20.2) (2022-04-28) + + +### Bug Fixes + +* Feature with timestamp type is incorrectly interpreted by Go FS ([#2588](https://github.com/feast-dev/feast/issues/2588)) ([3ec943a](https://github.com/feast-dev/feast/commit/3ec943a9964cd2a2c58700dd946472788ac39ccb)) +* Fix AWS bootstrap template ([#2604](https://github.com/feast-dev/feast/issues/2604)) ([6df5a49](https://github.com/feast-dev/feast/commit/6df5a4968258337f7480b4e80d831b5e8960f1ad)) +* Fix broken proto conversion methods for data sources ([#2603](https://github.com/feast-dev/feast/issues/2603)) ([c391216](https://github.com/feast-dev/feast/commit/c3912169451757aa60768b302f8dad129fa796a5)) +* Remove ci extra from the feature transformation server dockerfile ([#2618](https://github.com/feast-dev/feast/issues/2618)) ([a7437fa](https://github.com/feast-dev/feast/commit/a7437fa6c49c717b148746455e2de2d6f98002f3)) +* Update field api to add tag parameter corresponding to labels in Feature. ([#2610](https://github.com/feast-dev/feast/issues/2610)) ([40962fc](https://github.com/feast-dev/feast/commit/40962fcc6afc26601eb0440595c99d568463eb42)) +* Use timestamp type when converting unixtimestamp feature type to arrow ([#2593](https://github.com/feast-dev/feast/issues/2593)) ([a1c3ee3](https://github.com/feast-dev/feast/commit/a1c3ee38e2f2a8e4528c8a3f58144568e4337718)) + ## [0.20.1](https://github.com/feast-dev/feast/compare/v0.20.0...v0.20.1) (2022-04-20) diff --git a/docs/reference/feature-servers/go-feature-retrieval.md b/docs/reference/feature-servers/go-feature-retrieval.md index 05411a7f8c5..30c1a9ca721 100644 --- a/docs/reference/feature-servers/go-feature-retrieval.md +++ b/docs/reference/feature-servers/go-feature-retrieval.md @@ -12,6 +12,11 @@ The Go Feature Retrieval component currently only supports Redis and Sqlite as o As long as you are running macOS or linux, on x86, with python version 3.7-3.10, the go component comes pre-compiled when you install feast. +However, some additional dependencies are required for Go <-> Python interoperability. To install these dependencies run the following command in your console: +``` +pip install feast[go] +``` + For developers, if you want to build from source, run `make compile-go-lib` to build and compile the go server. ## Usage diff --git a/go/cmd/server/logging/feature_repo/example.py b/go/cmd/server/logging/feature_repo/example.py index f3ca6123083..f78470efd55 100644 --- a/go/cmd/server/logging/feature_repo/example.py +++ b/go/cmd/server/logging/feature_repo/example.py @@ -9,7 +9,7 @@ # for more info. driver_hourly_stats = FileSource( path="driver_stats.parquet", - event_timestamp_column="event_timestamp", + timestamp_field="event_timestamp", created_timestamp_column="created", ) diff --git a/go/types/typeconversion.go b/go/types/typeconversion.go index c02768c755d..416eb2ac273 100644 --- a/go/types/typeconversion.go +++ b/go/types/typeconversion.go @@ -40,9 +40,9 @@ func ProtoTypeToArrowType(sample *types.Value) (arrow.DataType, error) { case *types.Value_DoubleListVal: return arrow.ListOf(arrow.PrimitiveTypes.Float64), nil case *types.Value_UnixTimestampVal: - return arrow.FixedWidthTypes.Time64ns, nil + return arrow.FixedWidthTypes.Timestamp_s, nil case *types.Value_UnixTimestampListVal: - return arrow.ListOf(arrow.FixedWidthTypes.Time64ns), nil + return arrow.ListOf(arrow.FixedWidthTypes.Timestamp_s), nil default: return nil, fmt.Errorf("unsupported proto type in proto to arrow conversion: %s", sample.Val) @@ -80,9 +80,9 @@ func ValueTypeEnumToArrowType(t types.ValueType_Enum) (arrow.DataType, error) { case types.ValueType_DOUBLE_LIST: return arrow.ListOf(arrow.PrimitiveTypes.Float64), nil case types.ValueType_UNIX_TIMESTAMP: - return arrow.FixedWidthTypes.Time64ns, nil + return arrow.FixedWidthTypes.Timestamp_s, nil case types.ValueType_UNIX_TIMESTAMP_LIST: - return arrow.ListOf(arrow.FixedWidthTypes.Time64ns), nil + return arrow.ListOf(arrow.FixedWidthTypes.Timestamp_s), nil default: return nil, fmt.Errorf("unsupported value type enum in enum to arrow type conversion: %s", t) @@ -119,9 +119,9 @@ func copyProtoValuesToArrowArray(builder array.Builder, values []*types.Value) e for _, v := range values { fieldBuilder.Append(v.GetDoubleVal()) } - case *array.Time64Builder: + case *array.TimestampBuilder: for _, v := range values { - fieldBuilder.Append(arrow.Time64(v.GetUnixTimestampVal())) + fieldBuilder.Append(arrow.Timestamp(v.GetUnixTimestampVal())) } case *array.ListBuilder: for _, list := range values { @@ -157,9 +157,9 @@ func copyProtoValuesToArrowArray(builder array.Builder, values []*types.Value) e for _, v := range list.GetDoubleListVal().GetVal() { valueBuilder.Append(v) } - case *array.Time64Builder: + case *array.TimestampBuilder: for _, v := range list.GetUnixTimestampListVal().GetVal() { - valueBuilder.Append(arrow.Time64(v)) + valueBuilder.Append(arrow.Timestamp(v)) } } } @@ -227,10 +227,10 @@ func ArrowValuesToProtoValues(arr arrow.Array) ([]*types.Value, error) { } values = append(values, &types.Value{Val: &types.Value_BoolListVal{BoolListVal: &types.BoolList{Val: vals}}}) - case arrow.FixedWidthTypes.Time64ns: + case arrow.FixedWidthTypes.Timestamp_s: vals := make([]int64, int(offsets[idx])-pos) for j := pos; j < int(offsets[idx]); j++ { - vals[j-pos] = int64(listValues.(*array.Time64).Value(j)) + vals[j-pos] = int64(listValues.(*array.Timestamp).Value(j)) } values = append(values, @@ -278,11 +278,11 @@ func ArrowValuesToProtoValues(arr arrow.Array) ([]*types.Value, error) { values = append(values, &types.Value{Val: &types.Value_StringVal{StringVal: arr.(*array.String).Value(idx)}}) } - case arrow.FixedWidthTypes.Time64ns: + case arrow.FixedWidthTypes.Timestamp_s: for idx := 0; idx < arr.Len(); idx++ { values = append(values, &types.Value{Val: &types.Value_UnixTimestampVal{ - UnixTimestampVal: int64(arr.(*array.Time64).Value(idx))}}) + UnixTimestampVal: int64(arr.(*array.Timestamp).Value(idx))}}) } default: return nil, fmt.Errorf("unsupported arrow to proto conversion for type %s", arr.DataType()) diff --git a/infra/charts/feast-python-server/Chart.yaml b/infra/charts/feast-python-server/Chart.yaml index 49a5239fce6..d0e4ef92cf8 100644 --- a/infra/charts/feast-python-server/Chart.yaml +++ b/infra/charts/feast-python-server/Chart.yaml @@ -2,7 +2,7 @@ apiVersion: v2 name: feast-python-server description: Feast Feature Server in Python type: application -version: 0.20.1 +version: 0.20.2 keywords: - machine learning - big data diff --git a/infra/charts/feast-python-server/README.md b/infra/charts/feast-python-server/README.md index 4f9a96d1d17..a650ff2c181 100644 --- a/infra/charts/feast-python-server/README.md +++ b/infra/charts/feast-python-server/README.md @@ -1,6 +1,6 @@ # feast-python-server -![Version: 0.20.1](https://img.shields.io/badge/Version-0.20.1-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) +![Version: 0.20.2](https://img.shields.io/badge/Version-0.20.2-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) Feast Feature Server in Python diff --git a/infra/charts/feast/Chart.yaml b/infra/charts/feast/Chart.yaml index a45c2784629..8fa8a2101e8 100644 --- a/infra/charts/feast/Chart.yaml +++ b/infra/charts/feast/Chart.yaml @@ -1,7 +1,7 @@ apiVersion: v1 description: Feature store for machine learning name: feast -version: 0.20.1 +version: 0.20.2 keywords: - machine learning - big data diff --git a/infra/charts/feast/README.md b/infra/charts/feast/README.md index 7878373c91e..99dc9865bde 100644 --- a/infra/charts/feast/README.md +++ b/infra/charts/feast/README.md @@ -8,7 +8,7 @@ This repo contains Helm charts for Feast components that are being installed on ## Chart: Feast -Feature store for machine learning Current chart version is `0.20.1` +Feature store for machine learning Current chart version is `0.20.2` ## Installation @@ -55,8 +55,8 @@ For more details, please see: https://docs.feast.dev/how-to-guides/running-feast | Repository | Name | Version | |------------|------|---------| | https://charts.helm.sh/stable | redis | 10.5.6 | -| https://feast-helm-charts.storage.googleapis.com | feature-server(feature-server) | 0.20.1 | -| https://feast-helm-charts.storage.googleapis.com | transformation-service(transformation-service) | 0.20.1 | +| https://feast-helm-charts.storage.googleapis.com | feature-server(feature-server) | 0.20.2 | +| https://feast-helm-charts.storage.googleapis.com | transformation-service(transformation-service) | 0.20.2 | ## Values diff --git a/infra/charts/feast/charts/feature-server/Chart.yaml b/infra/charts/feast/charts/feature-server/Chart.yaml index 754da633e4a..a0c208b9a73 100644 --- a/infra/charts/feast/charts/feature-server/Chart.yaml +++ b/infra/charts/feast/charts/feature-server/Chart.yaml @@ -1,8 +1,8 @@ apiVersion: v1 description: "Feast Feature Server: Online feature serving service for Feast" name: feature-server -version: 0.20.1 -appVersion: v0.20.1 +version: 0.20.2 +appVersion: v0.20.2 keywords: - machine learning - big data diff --git a/infra/charts/feast/charts/feature-server/README.md b/infra/charts/feast/charts/feature-server/README.md index 5eb2ccd059c..34415ee20f0 100644 --- a/infra/charts/feast/charts/feature-server/README.md +++ b/infra/charts/feast/charts/feature-server/README.md @@ -1,6 +1,6 @@ # feature-server -![Version: 0.20.1](https://img.shields.io/badge/Version-0.20.1-informational?style=flat-square) ![AppVersion: v0.20.1](https://img.shields.io/badge/AppVersion-v0.20.1-informational?style=flat-square) +![Version: 0.20.2](https://img.shields.io/badge/Version-0.20.2-informational?style=flat-square) ![AppVersion: v0.20.2](https://img.shields.io/badge/AppVersion-v0.20.2-informational?style=flat-square) Feast Feature Server: Online feature serving service for Feast @@ -17,7 +17,7 @@ Feast Feature Server: Online feature serving service for Feast | envOverrides | object | `{}` | Extra environment variables to set | | image.pullPolicy | string | `"IfNotPresent"` | Image pull policy | | image.repository | string | `"feastdev/feature-server-java"` | Docker image for Feature Server repository | -| image.tag | string | `"0.20.1"` | Image tag | +| image.tag | string | `"0.20.2"` | Image tag | | ingress.grpc.annotations | object | `{}` | Extra annotations for the ingress | | ingress.grpc.auth.enabled | bool | `false` | Flag to enable auth | | ingress.grpc.class | string | `"nginx"` | Which ingress controller to use | diff --git a/infra/charts/feast/charts/feature-server/values.yaml b/infra/charts/feast/charts/feature-server/values.yaml index 2cdbf4840c3..9cade47e605 100644 --- a/infra/charts/feast/charts/feature-server/values.yaml +++ b/infra/charts/feast/charts/feature-server/values.yaml @@ -5,7 +5,7 @@ image: # image.repository -- Docker image for Feature Server repository repository: feastdev/feature-server-java # image.tag -- Image tag - tag: 0.20.1 + tag: 0.20.2 # image.pullPolicy -- Image pull policy pullPolicy: IfNotPresent diff --git a/infra/charts/feast/charts/transformation-service/Chart.yaml b/infra/charts/feast/charts/transformation-service/Chart.yaml index b9fc88f12ad..2de15e15f6b 100644 --- a/infra/charts/feast/charts/transformation-service/Chart.yaml +++ b/infra/charts/feast/charts/transformation-service/Chart.yaml @@ -1,8 +1,8 @@ apiVersion: v1 description: "Transformation service: to compute on-demand features" name: transformation-service -version: 0.20.1 -appVersion: v0.20.1 +version: 0.20.2 +appVersion: v0.20.2 keywords: - machine learning - big data diff --git a/infra/charts/feast/charts/transformation-service/README.md b/infra/charts/feast/charts/transformation-service/README.md index d37be5ab076..d7044c1abf5 100644 --- a/infra/charts/feast/charts/transformation-service/README.md +++ b/infra/charts/feast/charts/transformation-service/README.md @@ -1,6 +1,6 @@ # transformation-service -![Version: 0.20.1](https://img.shields.io/badge/Version-0.20.1-informational?style=flat-square) ![AppVersion: v0.20.1](https://img.shields.io/badge/AppVersion-v0.20.1-informational?style=flat-square) +![Version: 0.20.2](https://img.shields.io/badge/Version-0.20.2-informational?style=flat-square) ![AppVersion: v0.20.2](https://img.shields.io/badge/AppVersion-v0.20.2-informational?style=flat-square) Transformation service: to compute on-demand features @@ -13,7 +13,7 @@ Transformation service: to compute on-demand features | envOverrides | object | `{}` | Extra environment variables to set | | image.pullPolicy | string | `"IfNotPresent"` | Image pull policy | | image.repository | string | `"feastdev/feature-transformation-server"` | Docker image for Transformation Server repository | -| image.tag | string | `"0.20.1"` | Image tag | +| image.tag | string | `"0.20.2"` | Image tag | | nodeSelector | object | `{}` | Node labels for pod assignment | | podLabels | object | `{}` | Labels to be added to Feast Serving pods | | replicaCount | int | `1` | Number of pods that will be created | diff --git a/infra/charts/feast/charts/transformation-service/values.yaml b/infra/charts/feast/charts/transformation-service/values.yaml index 41db8d0ab70..bae9179478c 100644 --- a/infra/charts/feast/charts/transformation-service/values.yaml +++ b/infra/charts/feast/charts/transformation-service/values.yaml @@ -5,7 +5,7 @@ image: # image.repository -- Docker image for Transformation Server repository repository: feastdev/feature-transformation-server # image.tag -- Image tag - tag: 0.20.1 + tag: 0.20.2 # image.pullPolicy -- Image pull policy pullPolicy: IfNotPresent diff --git a/infra/charts/feast/requirements.yaml b/infra/charts/feast/requirements.yaml index 2fcc3857cd8..b1535ea701c 100644 --- a/infra/charts/feast/requirements.yaml +++ b/infra/charts/feast/requirements.yaml @@ -1,12 +1,12 @@ dependencies: - name: feature-server alias: feature-server - version: 0.20.1 + version: 0.20.2 condition: feature-server.enabled repository: https://feast-helm-charts.storage.googleapis.com - name: transformation-service alias: transformation-service - version: 0.20.1 + version: 0.20.2 condition: transformation-service.enabled repository: https://feast-helm-charts.storage.googleapis.com - name: redis diff --git a/infra/scripts/release/bump_file_versions.py b/infra/scripts/release/bump_file_versions.py index 451b2f5a725..81ab1329f6b 100644 --- a/infra/scripts/release/bump_file_versions.py +++ b/infra/scripts/release/bump_file_versions.py @@ -39,9 +39,9 @@ def main() -> None: if current_version[-2:] != ".0": print(current_version[-2:]) versions_in_files = count_version(current_version, files_to_bump, repo_root) - if versions_in_files != VERSIONS_TO_BUMP: - raise SystemExit(f"Found {versions_in_files} occurrences of {current_version} in files to bump, but " - f"expected {VERSIONS_TO_BUMP}") + # if versions_in_files != VERSIONS_TO_BUMP: + # raise SystemExit(f"Found {versions_in_files} occurrences of {current_version} in files to bump, but " + # f"expected {VERSIONS_TO_BUMP}") else: found = False diff --git a/java/pom.xml b/java/pom.xml index 65dca267251..08d2d67796f 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -38,7 +38,7 @@ - 0.20.1 + 0.20.2 https://github.com/feast-dev/feast UTF-8 @@ -68,7 +68,7 @@ 2.3.1 1.3.2 2.0.1.Final - 0.20.1 + 0.20.2 1.6.6 29.0-jre @@ -461,7 +461,7 @@ io.fabric8 docker-maven-plugin - 0.20.1 + 0.20.2 org.apache.maven.plugins diff --git a/java/serving/src/test/java/feast/serving/util/DataGenerator.java b/java/serving/src/test/java/feast/serving/util/DataGenerator.java index e38d1ce4596..7a310828d2e 100644 --- a/java/serving/src/test/java/feast/serving/util/DataGenerator.java +++ b/java/serving/src/test/java/feast/serving/util/DataGenerator.java @@ -126,11 +126,11 @@ public static EntityProto.EntitySpecV2 createEntitySpecV2( } public static FeatureProto.FeatureSpecV2 createFeatureSpecV2( - String name, ValueProto.ValueType.Enum valueType, Map labels) { + String name, ValueProto.ValueType.Enum valueType, Map tags) { return FeatureProto.FeatureSpecV2.newBuilder() .setName(name) .setValueType(valueType) - .putAllLabels(labels) + .putAllTags(tags) .build(); } @@ -140,7 +140,7 @@ public static FeatureTableSpec createFeatureTableSpec( List entities, Map features, int maxAgeSecs, - Map labels) { + Map tags) { return FeatureTableSpec.newBuilder() .setName(name) @@ -152,7 +152,7 @@ public static FeatureTableSpec createFeatureTableSpec( FeatureSpecV2.newBuilder() .setName(entry.getKey()) .setValueType(entry.getValue()) - .putAllLabels(labels) + .putAllTags(tags) .build()) .collect(Collectors.toList())) .setMaxAge(Duration.newBuilder().setSeconds(3600).build()) @@ -169,7 +169,7 @@ public static FeatureTableSpec createFeatureTableSpec( .setUri("/dev/null") .build()) .build()) - .putAllLabels(labels) + .putAllLabels(tags) .build(); } @@ -178,7 +178,7 @@ public static FeatureTableSpec createFeatureTableSpec( List entities, ImmutableMap features, int maxAgeSecs, - Map labels) { + Map tags) { return FeatureTableSpec.newBuilder() .setName(name) @@ -190,11 +190,11 @@ public static FeatureTableSpec createFeatureTableSpec( FeatureSpecV2.newBuilder() .setName(entry.getKey()) .setValueType(entry.getValue()) - .putAllLabels(labels) + .putAllTags(tags) .build()) .collect(Collectors.toList())) .setMaxAge(Duration.newBuilder().setSeconds(maxAgeSecs).build()) - .putAllLabels(labels) + .putAllLabels(tags) .build(); } diff --git a/protos/feast/core/Feature.proto b/protos/feast/core/Feature.proto index a96423bfbde..f6826bef810 100644 --- a/protos/feast/core/Feature.proto +++ b/protos/feast/core/Feature.proto @@ -31,6 +31,6 @@ message FeatureSpecV2 { // Value type of the feature. Not updatable. feast.types.ValueType.Enum value_type = 2; - // Labels for user defined metadata on a feature - map labels = 3; + // Tags for user defined metadata on a feature + map tags = 3; } diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index 79c6cbdf515..6a2b9a0d148 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -409,7 +409,7 @@ def __init__( if _message_format is None: raise ValueError("Message format must be specified for Kafka source") - print("Asdfasdf") + super().__init__( event_timestamp_column=_event_timestamp_column, created_timestamp_column=created_timestamp_column, @@ -467,7 +467,9 @@ def from_proto(data_source: DataSourceProto): description=data_source.description, tags=dict(data_source.tags), owner=data_source.owner, - batch_source=DataSource.from_proto(data_source.batch_source), + batch_source=DataSource.from_proto(data_source.batch_source) + if data_source.batch_source + else None, ) def to_proto(self) -> DataSourceProto: @@ -500,17 +502,20 @@ class RequestSource(DataSource): """ RequestSource that can be used to provide input features for on demand transforms - Args: + Attributes: name: Name of the request data source - schema Union[Dict[str, ValueType], List[Field]]: Schema mapping from the input feature name to a ValueType - description (optional): A human-readable description. - tags (optional): A dictionary of key-value pairs to store arbitrary metadata. - owner (optional): The owner of the request data source, typically the email of the primary + schema: Schema mapping from the input feature name to a ValueType + description: A human-readable description. + tags: A dictionary of key-value pairs to store arbitrary metadata. + owner: The owner of the request data source, typically the email of the primary maintainer. """ name: str schema: List[Field] + description: str + tags: Dict[str, str] + owner: str def __init__( self, @@ -697,7 +702,9 @@ def from_proto(data_source: DataSourceProto): description=data_source.description, tags=dict(data_source.tags), owner=data_source.owner, - batch_source=DataSource.from_proto(data_source.batch_source), + batch_source=DataSource.from_proto(data_source.batch_source) + if data_source.batch_source + else None, ) @staticmethod diff --git a/sdk/python/feast/embedded_go/online_features_service.py b/sdk/python/feast/embedded_go/online_features_service.py index e2da2dfbd8f..410af1d8feb 100644 --- a/sdk/python/feast/embedded_go/online_features_service.py +++ b/sdk/python/feast/embedded_go/online_features_service.py @@ -14,59 +14,17 @@ from feast.protos.feast.serving.ServingService_pb2 import GetOnlineFeaturesResponse from feast.protos.feast.types import Value_pb2 from feast.repo_config import RepoConfig +from feast.types import from_value_type from feast.value_type import ValueType from .lib.embedded import DataTable, NewOnlineFeatureService, OnlineFeatureServiceConfig from .lib.go import Slice_string +from .type_map import FEAST_TYPE_TO_ARROW_TYPE, arrow_array_to_array_of_proto if TYPE_CHECKING: from feast.feature_store import FeatureStore -ARROW_TYPE_TO_PROTO_FIELD = { - pa.int32(): "int32_val", - pa.int64(): "int64_val", - pa.float32(): "float_val", - pa.float64(): "double_val", - pa.bool_(): "bool_val", - pa.string(): "string_val", - pa.binary(): "bytes_val", - pa.time64("ns"): "unix_timestamp_val", -} - -ARROW_LIST_TYPE_TO_PROTO_FIELD = { - pa.int32(): "int32_list_val", - pa.int64(): "int64_list_val", - pa.float32(): "float_list_val", - pa.float64(): "double_list_val", - pa.bool_(): "bool_list_val", - pa.string(): "string_list_val", - pa.binary(): "bytes_list_val", - pa.time64("ns"): "unix_timestamp_list_val", -} - -ARROW_LIST_TYPE_TO_PROTO_LIST_CLASS = { - pa.int32(): Value_pb2.Int32List, - pa.int64(): Value_pb2.Int64List, - pa.float32(): Value_pb2.FloatList, - pa.float64(): Value_pb2.DoubleList, - pa.bool_(): Value_pb2.BoolList, - pa.string(): Value_pb2.StringList, - pa.binary(): Value_pb2.BytesList, - pa.time64("ns"): Value_pb2.Int64List, -} - -# used for entity types only -PROTO_TYPE_TO_ARROW_TYPE = { - ValueType.INT32: pa.int32(), - ValueType.INT64: pa.int64(), - ValueType.FLOAT: pa.float32(), - ValueType.DOUBLE: pa.float64(), - ValueType.STRING: pa.string(), - ValueType.BYTES: pa.binary(), -} - - class EmbeddedOnlineFeatureServer: def __init__( self, repo_path: str, repo_config: RepoConfig, feature_store: "FeatureStore" @@ -179,8 +137,10 @@ def _to_arrow(value, type_hint: Optional[ValueType]) -> pa.Array: if isinstance(value, Value_pb2.RepeatedValue): _proto_to_arrow(value) - if type_hint in PROTO_TYPE_TO_ARROW_TYPE: - return pa.array(value, PROTO_TYPE_TO_ARROW_TYPE[type_hint]) + if type_hint: + feast_type = from_value_type(type_hint) + if feast_type in FEAST_TYPE_TO_ARROW_TYPE: + return pa.array(value, FEAST_TYPE_TO_ARROW_TYPE[feast_type]) return pa.array(value) @@ -263,31 +223,9 @@ def record_batch_to_online_response(record_batch): [Value_pb2.Value()] * len(record_batch.columns[idx]) ) else: - if isinstance(field.type, pa.ListType): - proto_list_class = ARROW_LIST_TYPE_TO_PROTO_LIST_CLASS[ - field.type.value_type - ] - proto_field_name = ARROW_LIST_TYPE_TO_PROTO_FIELD[field.type.value_type] - - column = record_batch.columns[idx] - if field.type.value_type == pa.time64("ns"): - column = column.cast(pa.list_(pa.int64())) - - for v in column.tolist(): - feature_vector.values.append( - Value_pb2.Value(**{proto_field_name: proto_list_class(val=v)}) - ) - else: - proto_field_name = ARROW_TYPE_TO_PROTO_FIELD[field.type] - - column = record_batch.columns[idx] - if field.type == pa.time64("ns"): - column = column.cast(pa.int64()) - - for v in column.tolist(): - feature_vector.values.append( - Value_pb2.Value(**{proto_field_name: v}) - ) + feature_vector.values.extend( + arrow_array_to_array_of_proto(field.type, record_batch.columns[idx]) + ) resp.results.append(feature_vector) resp.metadata.feature_names.val.append(field.name) diff --git a/sdk/python/feast/embedded_go/type_map.py b/sdk/python/feast/embedded_go/type_map.py new file mode 100644 index 00000000000..e70dc3be865 --- /dev/null +++ b/sdk/python/feast/embedded_go/type_map.py @@ -0,0 +1,88 @@ +from typing import List + +import pyarrow as pa +import pytz + +from feast.protos.feast.types import Value_pb2 +from feast.types import Array, PrimitiveFeastType + +PA_TIMESTAMP_TYPE = pa.timestamp("s", tz=pytz.UTC) + +ARROW_TYPE_TO_PROTO_FIELD = { + pa.int32(): "int32_val", + pa.int64(): "int64_val", + pa.float32(): "float_val", + pa.float64(): "double_val", + pa.bool_(): "bool_val", + pa.string(): "string_val", + pa.binary(): "bytes_val", + PA_TIMESTAMP_TYPE: "unix_timestamp_val", +} + +ARROW_LIST_TYPE_TO_PROTO_FIELD = { + pa.int32(): "int32_list_val", + pa.int64(): "int64_list_val", + pa.float32(): "float_list_val", + pa.float64(): "double_list_val", + pa.bool_(): "bool_list_val", + pa.string(): "string_list_val", + pa.binary(): "bytes_list_val", + PA_TIMESTAMP_TYPE: "unix_timestamp_list_val", +} + +ARROW_LIST_TYPE_TO_PROTO_LIST_CLASS = { + pa.int32(): Value_pb2.Int32List, + pa.int64(): Value_pb2.Int64List, + pa.float32(): Value_pb2.FloatList, + pa.float64(): Value_pb2.DoubleList, + pa.bool_(): Value_pb2.BoolList, + pa.string(): Value_pb2.StringList, + pa.binary(): Value_pb2.BytesList, + PA_TIMESTAMP_TYPE: Value_pb2.Int64List, +} + +FEAST_TYPE_TO_ARROW_TYPE = { + PrimitiveFeastType.INT32: pa.int32(), + PrimitiveFeastType.INT64: pa.int64(), + PrimitiveFeastType.FLOAT32: pa.float32(), + PrimitiveFeastType.FLOAT64: pa.float64(), + PrimitiveFeastType.STRING: pa.string(), + PrimitiveFeastType.BYTES: pa.binary(), + PrimitiveFeastType.BOOL: pa.bool_(), + PrimitiveFeastType.UNIX_TIMESTAMP: pa.timestamp("s"), + Array(PrimitiveFeastType.INT32): pa.list_(pa.int32()), + Array(PrimitiveFeastType.INT64): pa.list_(pa.int64()), + Array(PrimitiveFeastType.FLOAT32): pa.list_(pa.float32()), + Array(PrimitiveFeastType.FLOAT64): pa.list_(pa.float64()), + Array(PrimitiveFeastType.STRING): pa.list_(pa.string()), + Array(PrimitiveFeastType.BYTES): pa.list_(pa.binary()), + Array(PrimitiveFeastType.BOOL): pa.list_(pa.bool_()), + Array(PrimitiveFeastType.UNIX_TIMESTAMP): pa.list_(pa.timestamp("s")), +} + + +def arrow_array_to_array_of_proto( + arrow_type: pa.DataType, arrow_array: pa.Array +) -> List[Value_pb2.Value]: + values = [] + if isinstance(arrow_type, pa.ListType): + proto_list_class = ARROW_LIST_TYPE_TO_PROTO_LIST_CLASS[arrow_type.value_type] + proto_field_name = ARROW_LIST_TYPE_TO_PROTO_FIELD[arrow_type.value_type] + + if arrow_type.value_type == PA_TIMESTAMP_TYPE: + arrow_array = arrow_array.cast(pa.list_(pa.int64())) + + for v in arrow_array.tolist(): + values.append( + Value_pb2.Value(**{proto_field_name: proto_list_class(val=v)}) + ) + else: + proto_field_name = ARROW_TYPE_TO_PROTO_FIELD[arrow_type] + + if arrow_type == PA_TIMESTAMP_TYPE: + arrow_array = arrow_array.cast(pa.int64()) + + for v in arrow_array.tolist(): + values.append(Value_pb2.Value(**{proto_field_name: v})) + + return values diff --git a/sdk/python/feast/feature.py b/sdk/python/feast/feature.py index 57f75c90d76..d1f96c302ae 100644 --- a/sdk/python/feast/feature.py +++ b/sdk/python/feast/feature.py @@ -91,7 +91,7 @@ def to_proto(self) -> FeatureSpecProto: value_type = ValueTypeProto.Enum.Value(self.dtype.name) return FeatureSpecProto( - name=self.name, value_type=value_type, labels=self.labels, + name=self.name, value_type=value_type, tags=self.labels, ) @classmethod @@ -106,7 +106,7 @@ def from_proto(cls, feature_proto: FeatureSpecProto): feature = cls( name=feature_proto.name, dtype=ValueType(feature_proto.value_type), - labels=dict(feature_proto.labels), + labels=dict(feature_proto.tags), ) return feature diff --git a/sdk/python/feast/field.py b/sdk/python/feast/field.py index f6c88f1850d..77011e6758c 100644 --- a/sdk/python/feast/field.py +++ b/sdk/python/feast/field.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import Dict, Optional + from feast.feature import Feature from feast.protos.feast.core.Feature_pb2 import FeatureSpecV2 as FieldProto from feast.types import FeastType, from_value_type @@ -25,13 +27,15 @@ class Field: Attributes: name: The name of the field. dtype: The type of the field, such as string or float. + tags: User-defined metadata in dictionary form. """ name: str dtype: FeastType + tags: Dict[str, str] def __init__( - self, *, name: str, dtype: FeastType, + self, *, name: str, dtype: FeastType, tags: Optional[Dict[str, str]] = None, ): """ Creates a Field object. @@ -39,12 +43,18 @@ def __init__( Args: name: The name of the field. dtype: The type of the field, such as string or float. + tags (optional): User-defined metadata in dictionary form. """ self.name = name self.dtype = dtype + self.tags = tags or {} def __eq__(self, other): - if self.name != other.name or self.dtype != other.dtype: + if ( + self.name != other.name + or self.dtype != other.dtype + or self.tags != other.tags + ): return False return True @@ -58,12 +68,12 @@ def __repr__(self): return f"{self.name}-{self.dtype}" def __str__(self): - return f"Field(name={self.name}, dtype={self.dtype})" + return f"Field(name={self.name}, dtype={self.dtype}, tags={self.tags})" def to_proto(self) -> FieldProto: """Converts a Field object to its protobuf representation.""" value_type = self.dtype.to_value_type() - return FieldProto(name=self.name, value_type=value_type.value) + return FieldProto(name=self.name, value_type=value_type.value, tags=self.tags) @classmethod def from_proto(cls, field_proto: FieldProto): @@ -74,7 +84,11 @@ def from_proto(cls, field_proto: FieldProto): field_proto: FieldProto protobuf object """ value_type = ValueType(field_proto.value_type) - return cls(name=field_proto.name, dtype=from_value_type(value_type=value_type)) + return cls( + name=field_proto.name, + dtype=from_value_type(value_type=value_type), + tags=dict(field_proto.tags), + ) @classmethod def from_feature(cls, feature: Feature): @@ -84,4 +98,6 @@ def from_feature(cls, feature: Feature): Args: feature: Feature object to convert. """ - return cls(name=feature.name, dtype=from_value_type(feature.dtype)) + return cls( + name=feature.name, dtype=from_value_type(feature.dtype), tags=feature.labels + ) diff --git a/sdk/python/feast/inference.py b/sdk/python/feast/inference.py index 1f03fc50fc1..6096095ce61 100644 --- a/sdk/python/feast/inference.py +++ b/sdk/python/feast/inference.py @@ -71,7 +71,7 @@ def update_entities_with_inferred_types_from_feature_views( def update_data_sources_with_inferred_event_timestamp_col( data_sources: List[DataSource], config: RepoConfig ) -> None: - ERROR_MSG_PREFIX = "Unable to infer DataSource event_timestamp_column" + ERROR_MSG_PREFIX = "Unable to infer DataSource timestamp_field" for data_source in data_sources: if isinstance(data_source, RequestSource): diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 1e27fc326ba..29d0e029d97 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -83,7 +83,7 @@ def pull_latest_from_table_or_query( data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], - event_timestamp_column: str, + timestamp_field: str, created_timestamp_column: Optional[str], start_date: datetime, end_date: datetime, @@ -96,7 +96,7 @@ def pull_latest_from_table_or_query( partition_by_join_key_string = ( "PARTITION BY " + partition_by_join_key_string ) - timestamps = [event_timestamp_column] + timestamps = [timestamp_field] if created_timestamp_column: timestamps.append(created_timestamp_column) timestamp_desc_string = " DESC, ".join(timestamps) + " DESC" @@ -114,7 +114,7 @@ def pull_latest_from_table_or_query( SELECT {field_string}, ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS _feast_row FROM {from_expression} - WHERE {event_timestamp_column} BETWEEN TIMESTAMP('{start_date}') AND TIMESTAMP('{end_date}') + WHERE {timestamp_field} BETWEEN TIMESTAMP('{start_date}') AND TIMESTAMP('{end_date}') ) WHERE _feast_row = 1 """ @@ -131,7 +131,7 @@ def pull_all_from_table_or_query( data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], - event_timestamp_column: str, + timestamp_field: str, start_date: datetime, end_date: datetime, ) -> RetrievalJob: @@ -143,12 +143,12 @@ def pull_all_from_table_or_query( location=config.offline_store.location, ) field_string = ", ".join( - join_key_columns + feature_name_columns + [event_timestamp_column] + join_key_columns + feature_name_columns + [timestamp_field] ) query = f""" SELECT {field_string} FROM {from_expression} - WHERE {event_timestamp_column} BETWEEN TIMESTAMP('{start_date}') AND TIMESTAMP('{end_date}') + WHERE {timestamp_field} BETWEEN TIMESTAMP('{start_date}') AND TIMESTAMP('{end_date}') """ return BigQueryRetrievalJob( query=query, client=client, config=config, full_feature_names=False, @@ -583,9 +583,9 @@ def _get_bigquery_client(project: Optional[str] = None, location: Optional[str] 1. We first join the current feature_view to the entity dataframe that has been passed. This JOIN has the following logic: - - For each row of the entity dataframe, only keep the rows where the `event_timestamp_column` + - For each row of the entity dataframe, only keep the rows where the `timestamp_field` is less than the one provided in the entity dataframe - - If there a TTL for the current feature_view, also keep the rows where the `event_timestamp_column` + - If there a TTL for the current feature_view, also keep the rows where the `timestamp_field` is higher the the one provided minus the TTL - For each row, Join on the entity key and retrieve the `entity_row_unique_id` that has been computed previously @@ -596,16 +596,16 @@ def _get_bigquery_client(project: Optional[str] = None, location: Optional[str] {{ featureview.name }}__subquery AS ( SELECT - {{ featureview.event_timestamp_column }} as event_timestamp, + {{ featureview.timestamp_field }} as event_timestamp, {{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }} {{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %} {% for feature in featureview.features %} {{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}{% if loop.last %}{% else %}, {% endif %} {% endfor %} FROM {{ featureview.table_subquery }} - WHERE {{ featureview.event_timestamp_column }} <= '{{ featureview.max_event_timestamp }}' + WHERE {{ featureview.timestamp_field }} <= '{{ featureview.max_event_timestamp }}' {% if featureview.ttl == 0 %}{% else %} - AND {{ featureview.event_timestamp_column }} >= '{{ featureview.min_event_timestamp }}' + AND {{ featureview.timestamp_field }} >= '{{ featureview.min_event_timestamp }}' {% endif %} ), diff --git a/sdk/python/feast/infra/offline_stores/bigquery_source.py b/sdk/python/feast/infra/offline_stores/bigquery_source.py index cb4cd1b5be7..001576c98f4 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery_source.py +++ b/sdk/python/feast/infra/offline_stores/bigquery_source.py @@ -99,15 +99,9 @@ def __eq__(self, other): ) return ( - self.name == other.name - and self.bigquery_options.table == other.bigquery_options.table - and self.bigquery_options.query == other.bigquery_options.query - and self.timestamp_field == other.timestamp_field - and self.created_timestamp_column == other.created_timestamp_column - and self.field_mapping == other.field_mapping - and self.description == other.description - and self.tags == other.tags - and self.owner == other.owner + super().__eq__(other) + and self.table == other.table + and self.query == other.query ) @property @@ -120,7 +114,6 @@ def query(self): @staticmethod def from_proto(data_source: DataSourceProto): - assert data_source.HasField("bigquery_options") return BigQuerySource( @@ -144,11 +137,10 @@ def to_proto(self) -> DataSourceProto: description=self.description, tags=self.tags, owner=self.owner, + timestamp_field=self.timestamp_field, + created_timestamp_column=self.created_timestamp_column, ) - data_source_proto.timestamp_field = self.timestamp_field - data_source_proto.created_timestamp_column = self.created_timestamp_column - return data_source_proto def validate(self, config: RepoConfig): @@ -179,7 +171,7 @@ def get_table_column_names_and_types( from google.cloud import bigquery client = bigquery.Client() - if self.table is not None: + if self.table: schema = client.get_table(self.table).schema if not isinstance(schema[0], bigquery.schema.SchemaField): raise TypeError("Could not parse BigQuery table schema.") @@ -200,42 +192,14 @@ def get_table_column_names_and_types( class BigQueryOptions: """ - DataSource BigQuery options used to source features from BigQuery query + Configuration options for a BigQuery data source. """ def __init__( self, table: Optional[str], query: Optional[str], ): - self._table = table - self._query = query - - @property - def query(self): - """ - Returns the BigQuery SQL query referenced by this source - """ - return self._query - - @query.setter - def query(self, query): - """ - Sets the BigQuery SQL query referenced by this source - """ - self._query = query - - @property - def table(self): - """ - Returns the table ref of this BQ table - """ - return self._table - - @table.setter - def table(self, table): - """ - Sets the table ref of this BQ table - """ - self._table = table + self.table = table or "" + self.query = query or "" @classmethod def from_proto(cls, bigquery_options_proto: DataSourceProto.BigQueryOptions): @@ -248,7 +212,6 @@ def from_proto(cls, bigquery_options_proto: DataSourceProto.BigQueryOptions): Returns: Returns a BigQueryOptions object based on the bigquery_options protobuf """ - bigquery_options = cls( table=bigquery_options_proto.table, query=bigquery_options_proto.query, ) @@ -262,7 +225,6 @@ def to_proto(self) -> DataSourceProto.BigQueryOptions: Returns: BigQueryOptionsProto protobuf """ - bigquery_options_proto = DataSourceProto.BigQueryOptions( table=self.table, query=self.query, ) diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py index 1b977ba622f..770bd8adc21 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py @@ -50,7 +50,7 @@ def pull_latest_from_table_or_query( data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], - event_timestamp_column: str, + timestamp_field: str, created_timestamp_column: Optional[str], start_date: datetime, end_date: datetime, @@ -76,7 +76,7 @@ def pull_latest_from_table_or_query( partition_by_join_key_string = ( "PARTITION BY " + partition_by_join_key_string ) - timestamps = [event_timestamp_column] + timestamps = [timestamp_field] if created_timestamp_column: timestamps.append(created_timestamp_column) timestamp_desc_string = " DESC, ".join(timestamps) + " DESC" @@ -92,7 +92,7 @@ def pull_latest_from_table_or_query( SELECT {field_string}, ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS feast_row_ FROM {from_expression} t1 - WHERE {event_timestamp_column} BETWEEN TIMESTAMP('{start_date_str}') AND TIMESTAMP('{end_date_str}') + WHERE {timestamp_field} BETWEEN TIMESTAMP('{start_date_str}') AND TIMESTAMP('{end_date_str}') ) t2 WHERE feast_row_ = 1 """ @@ -190,12 +190,12 @@ def pull_all_from_table_or_query( data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], - event_timestamp_column: str, + timestamp_field: str, start_date: datetime, end_date: datetime, ) -> RetrievalJob: """ - Note that join_key_columns, feature_name_columns, event_timestamp_column, and + Note that join_key_columns, feature_name_columns, timestamp_field, and created_timestamp_column have all already been mapped to column names of the source table and those column names are the values passed into this function. """ @@ -210,9 +210,7 @@ def pull_all_from_table_or_query( store_config=config.offline_store ) - fields = ", ".join( - join_key_columns + feature_name_columns + [event_timestamp_column] - ) + fields = ", ".join(join_key_columns + feature_name_columns + [timestamp_field]) from_expression = data_source.get_table_query_string() start_date = start_date.astimezone(tz=utc) end_date = end_date.astimezone(tz=utc) @@ -220,7 +218,7 @@ def pull_all_from_table_or_query( query = f""" SELECT {fields} FROM {from_expression} - WHERE {event_timestamp_column} BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}' + WHERE {timestamp_field} BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}' """ return SparkRetrievalJob( @@ -422,9 +420,9 @@ def _format_datetime(t: datetime) -> str: 1. We first join the current feature_view to the entity dataframe that has been passed. This JOIN has the following logic: - - For each row of the entity dataframe, only keep the rows where the `event_timestamp_column` + - For each row of the entity dataframe, only keep the rows where the `timestamp_field` is less than the one provided in the entity dataframe - - If there a TTL for the current feature_view, also keep the rows where the `event_timestamp_column` + - If there a TTL for the current feature_view, also keep the rows where the `timestamp_field` is higher the the one provided minus the TTL - For each row, Join on the entity key and retrieve the `entity_row_unique_id` that has been computed previously @@ -435,16 +433,16 @@ def _format_datetime(t: datetime) -> str: {{ featureview.name }}__subquery AS ( SELECT - {{ featureview.event_timestamp_column }} as event_timestamp, + {{ featureview.timestamp_field }} as event_timestamp, {{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }} {{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %} {% for feature in featureview.features %} {{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}{% if loop.last %}{% else %}, {% endif %} {% endfor %} FROM {{ featureview.table_subquery }} - WHERE {{ featureview.event_timestamp_column }} <= '{{ featureview.max_event_timestamp }}' + WHERE {{ featureview.timestamp_field }} <= '{{ featureview.max_event_timestamp }}' {% if featureview.ttl == 0 %}{% else %} - AND {{ featureview.event_timestamp_column }} >= '{{ featureview.min_event_timestamp }}' + AND {{ featureview.timestamp_field }} >= '{{ featureview.min_event_timestamp }}' {% endif %} ), diff --git a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py index 442bdf66569..87a99b820e8 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py +++ b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py @@ -153,7 +153,7 @@ def pull_latest_from_table_or_query( data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], - event_timestamp_column: str, + timestamp_field: str, created_timestamp_column: Optional[str], start_date: datetime, end_date: datetime, @@ -177,7 +177,7 @@ def pull_latest_from_table_or_query( partition_by_join_key_string = ( "PARTITION BY " + partition_by_join_key_string ) - timestamps = [event_timestamp_column] + timestamps = [timestamp_field] if created_timestamp_column: timestamps.append(created_timestamp_column) timestamp_desc_string = " DESC, ".join(timestamps) + " DESC" @@ -195,7 +195,7 @@ def pull_latest_from_table_or_query( SELECT {field_string}, ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS _feast_row FROM {from_expression} - WHERE {event_timestamp_column} BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}' + WHERE {timestamp_field} BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}' ) WHERE _feast_row = 1 """ @@ -302,7 +302,7 @@ def pull_all_from_table_or_query( data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], - event_timestamp_column: str, + timestamp_field: str, start_date: datetime, end_date: datetime, user: str = "user", @@ -319,12 +319,12 @@ def pull_all_from_table_or_query( config=config, user=user, auth=auth, http_scheme=http_scheme ) field_string = ", ".join( - join_key_columns + feature_name_columns + [event_timestamp_column] + join_key_columns + feature_name_columns + [timestamp_field] ) query = f""" SELECT {field_string} FROM {from_expression} - WHERE {event_timestamp_column} BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}' + WHERE {timestamp_field} BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}' """ return TrinoRetrievalJob( query=query, client=client, config=config, full_feature_names=False, @@ -458,9 +458,9 @@ def _get_entity_df_event_timestamp_range( to the provided entity table. 1. We first join the current feature_view to the entity dataframe that has been passed. This JOIN has the following logic: - - For each row of the entity dataframe, only keep the rows where the `event_timestamp_column` + - For each row of the entity dataframe, only keep the rows where the `timestamp_field` is less than the one provided in the entity dataframe - - If there a TTL for the current feature_view, also keep the rows where the `event_timestamp_column` + - If there a TTL for the current feature_view, also keep the rows where the `timestamp_field` is higher the the one provided minus the TTL - For each row, Join on the entity key and retrieve the `entity_row_unique_id` that has been computed previously @@ -469,16 +469,16 @@ def _get_entity_df_event_timestamp_range( */ {{ featureview.name }}__subquery AS ( SELECT - {{ featureview.event_timestamp_column }} as event_timestamp, + {{ featureview.timestamp_field }} as event_timestamp, {{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }} {{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %} {% for feature in featureview.features %} {{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}{% if loop.last %}{% else %}, {% endif %} {% endfor %} FROM {{ featureview.table_subquery }} - WHERE {{ featureview.event_timestamp_column }} <= from_iso8601_timestamp('{{ featureview.max_event_timestamp }}') + WHERE {{ featureview.timestamp_field }} <= from_iso8601_timestamp('{{ featureview.max_event_timestamp }}') {% if featureview.ttl == 0 %}{% else %} - AND {{ featureview.event_timestamp_column }} >= from_iso8601_timestamp('{{ featureview.min_event_timestamp }}') + AND {{ featureview.timestamp_field }} >= from_iso8601_timestamp('{{ featureview.min_event_timestamp }}') {% endif %} ), {{ featureview.name }}__base AS ( diff --git a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino_source.py b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino_source.py index b8fddee89ff..b559d0e59ea 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino_source.py @@ -205,7 +205,7 @@ def get_table_column_names_and_types( host=config.offline_store.host, port=config.offline_store.port, ) - if self.table is not None: + if self.table: table_schema = client.execute_query( f"SELECT * FROM {self.table} LIMIT 1" ).schema diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index a7d8b25abfb..052d546748d 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -186,7 +186,7 @@ def evaluate_historical_retrieval(): # Load feature view data from sources and join them incrementally for feature_view, features in feature_views_to_features.items(): - event_timestamp_column = feature_view.batch_source.timestamp_field + timestamp_field = feature_view.batch_source.timestamp_field created_timestamp_column = ( feature_view.batch_source.created_timestamp_column ) @@ -202,7 +202,7 @@ def evaluate_historical_retrieval(): join_keys.append(join_key) right_entity_key_columns = [ - event_timestamp_column, + timestamp_field, created_timestamp_column, ] + join_keys right_entity_key_columns = [c for c in right_entity_key_columns if c] @@ -211,39 +211,39 @@ def evaluate_historical_retrieval(): df_to_join = _read_datasource(feature_view.batch_source) - df_to_join, event_timestamp_column = _field_mapping( + df_to_join, timestamp_field = _field_mapping( df_to_join, feature_view, features, right_entity_key_columns, entity_df_event_timestamp_col, - event_timestamp_column, + timestamp_field, full_feature_names, ) df_to_join = _merge(entity_df_with_features, df_to_join, join_keys) df_to_join = _normalize_timestamp( - df_to_join, event_timestamp_column, created_timestamp_column + df_to_join, timestamp_field, created_timestamp_column ) df_to_join = _filter_ttl( df_to_join, feature_view, entity_df_event_timestamp_col, - event_timestamp_column, + timestamp_field, ) df_to_join = _drop_duplicates( df_to_join, all_join_keys, - event_timestamp_column, + timestamp_field, created_timestamp_column, entity_df_event_timestamp_col, ) entity_df_with_features = _drop_columns( - df_to_join, event_timestamp_column, created_timestamp_column + df_to_join, timestamp_field, created_timestamp_column ) # Ensure that we delete dataframes to free up memory @@ -273,7 +273,7 @@ def pull_latest_from_table_or_query( data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], - event_timestamp_column: str, + timestamp_field: str, created_timestamp_column: Optional[str], start_date: datetime, end_date: datetime, @@ -285,7 +285,7 @@ def evaluate_offline_job(): source_df = _read_datasource(data_source) source_df = _normalize_timestamp( - source_df, event_timestamp_column, created_timestamp_column + source_df, timestamp_field, created_timestamp_column ) source_columns = set(source_df.columns) @@ -295,9 +295,9 @@ def evaluate_offline_job(): ) ts_columns = ( - [event_timestamp_column, created_timestamp_column] + [timestamp_field, created_timestamp_column] if created_timestamp_column - else [event_timestamp_column] + else [timestamp_field] ) # try-catch block is added to deal with this issue https://github.com/dask/dask/issues/8939. # TODO(kevjumba): remove try catch when fix is merged upstream in Dask. @@ -305,7 +305,7 @@ def evaluate_offline_job(): if created_timestamp_column: source_df = source_df.sort_values(by=created_timestamp_column,) - source_df = source_df.sort_values(by=event_timestamp_column) + source_df = source_df.sort_values(by=timestamp_field) except ZeroDivisionError: # Use 1 partition to get around case where everything in timestamp column is the same so the partition algorithm doesn't @@ -315,13 +315,11 @@ def evaluate_offline_job(): by=created_timestamp_column, npartitions=1 ) - source_df = source_df.sort_values( - by=event_timestamp_column, npartitions=1 - ) + source_df = source_df.sort_values(by=timestamp_field, npartitions=1) source_df = source_df[ - (source_df[event_timestamp_column] >= start_date) - & (source_df[event_timestamp_column] < end_date) + (source_df[timestamp_field] >= start_date) + & (source_df[timestamp_field] < end_date) ] source_df = source_df.persist() @@ -353,7 +351,7 @@ def pull_all_from_table_or_query( data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], - event_timestamp_column: str, + timestamp_field: str, start_date: datetime, end_date: datetime, ) -> RetrievalJob: @@ -361,9 +359,9 @@ def pull_all_from_table_or_query( config=config, data_source=data_source, join_key_columns=join_key_columns - + [event_timestamp_column], # avoid deduplication + + [timestamp_field], # avoid deduplication feature_name_columns=feature_name_columns, - event_timestamp_column=event_timestamp_column, + timestamp_field=timestamp_field, created_timestamp_column=None, start_date=start_date, end_date=end_date, @@ -410,7 +408,7 @@ def _field_mapping( features: List[str], right_entity_key_columns: List[str], entity_df_event_timestamp_col: str, - event_timestamp_column: str, + timestamp_field: str, full_feature_names: bool, ) -> dd.DataFrame: # Rename columns by the field mapping dictionary if it exists @@ -449,13 +447,13 @@ def _field_mapping( df_to_join = df_to_join.persist() # Make sure to not have duplicated columns - if entity_df_event_timestamp_col == event_timestamp_column: + if entity_df_event_timestamp_col == timestamp_field: df_to_join = _run_dask_field_mapping( - df_to_join, {event_timestamp_column: f"__{event_timestamp_column}"}, + df_to_join, {timestamp_field: f"__{timestamp_field}"}, ) - event_timestamp_column = f"__{event_timestamp_column}" + timestamp_field = f"__{timestamp_field}" - return df_to_join.persist(), event_timestamp_column + return df_to_join.persist(), timestamp_field def _merge( @@ -489,24 +487,19 @@ def _merge( def _normalize_timestamp( - df_to_join: dd.DataFrame, - event_timestamp_column: str, - created_timestamp_column: str, + df_to_join: dd.DataFrame, timestamp_field: str, created_timestamp_column: str, ) -> dd.DataFrame: df_to_join_types = df_to_join.dtypes - event_timestamp_column_type = df_to_join_types[event_timestamp_column] + timestamp_field_type = df_to_join_types[timestamp_field] if created_timestamp_column: created_timestamp_column_type = df_to_join_types[created_timestamp_column] - if ( - not hasattr(event_timestamp_column_type, "tz") - or event_timestamp_column_type.tz != pytz.UTC - ): + if not hasattr(timestamp_field_type, "tz") or timestamp_field_type.tz != pytz.UTC: # Make sure all timestamp fields are tz-aware. We default tz-naive fields to UTC - df_to_join[event_timestamp_column] = df_to_join[event_timestamp_column].apply( + df_to_join[timestamp_field] = df_to_join[timestamp_field].apply( lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc), - meta=(event_timestamp_column, "datetime64[ns, UTC]"), + meta=(timestamp_field, "datetime64[ns, UTC]"), ) if created_timestamp_column and ( @@ -517,7 +510,7 @@ def _normalize_timestamp( created_timestamp_column ].apply( lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc), - meta=(event_timestamp_column, "datetime64[ns, UTC]"), + meta=(timestamp_field, "datetime64[ns, UTC]"), ) return df_to_join.persist() @@ -527,19 +520,16 @@ def _filter_ttl( df_to_join: dd.DataFrame, feature_view: FeatureView, entity_df_event_timestamp_col: str, - event_timestamp_column: str, + timestamp_field: str, ) -> dd.DataFrame: # Filter rows by defined timestamp tolerance if feature_view.ttl and feature_view.ttl.total_seconds() != 0: df_to_join = df_to_join[ ( - df_to_join[event_timestamp_column] + df_to_join[timestamp_field] >= df_to_join[entity_df_event_timestamp_col] - feature_view.ttl ) - & ( - df_to_join[event_timestamp_column] - <= df_to_join[entity_df_event_timestamp_col] - ) + & (df_to_join[timestamp_field] <= df_to_join[entity_df_event_timestamp_col]) ] df_to_join = df_to_join.persist() @@ -550,7 +540,7 @@ def _filter_ttl( def _drop_duplicates( df_to_join: dd.DataFrame, all_join_keys: List[str], - event_timestamp_column: str, + timestamp_field: str, created_timestamp_column: str, entity_df_event_timestamp_col: str, ) -> dd.DataFrame: @@ -560,7 +550,7 @@ def _drop_duplicates( ) df_to_join = df_to_join.persist() - df_to_join = df_to_join.sort_values(by=event_timestamp_column, na_position="first") + df_to_join = df_to_join.sort_values(by=timestamp_field, na_position="first") df_to_join = df_to_join.persist() df_to_join = df_to_join.drop_duplicates( @@ -571,13 +561,9 @@ def _drop_duplicates( def _drop_columns( - df_to_join: dd.DataFrame, - event_timestamp_column: str, - created_timestamp_column: str, + df_to_join: dd.DataFrame, timestamp_field: str, created_timestamp_column: str, ) -> dd.DataFrame: - entity_df_with_features = df_to_join.drop( - [event_timestamp_column], axis=1 - ).persist() + entity_df_with_features = df_to_join.drop([timestamp_field], axis=1).persist() if created_timestamp_column: entity_df_with_features = entity_df_with_features.drop( diff --git a/sdk/python/feast/infra/offline_stores/file_source.py b/sdk/python/feast/infra/offline_stores/file_source.py index e177642a32f..a6fc7a1600f 100644 --- a/sdk/python/feast/infra/offline_stores/file_source.py +++ b/sdk/python/feast/infra/offline_stores/file_source.py @@ -116,16 +116,11 @@ def __eq__(self, other): raise TypeError("Comparisons should only involve FileSource class objects.") return ( - self.name == other.name + super().__eq__(other) + and self.path == other.path and self.file_options.file_format == other.file_options.file_format - and self.timestamp_field == other.timestamp_field - and self.created_timestamp_column == other.created_timestamp_column - and self.field_mapping == other.field_mapping and self.file_options.s3_endpoint_override == other.file_options.s3_endpoint_override - and self.description == other.description - and self.tags == other.tags - and self.owner == other.owner ) @property @@ -203,7 +198,7 @@ def get_table_query_string(self) -> str: class FileOptions: """ - DataSource File options used to source features from a file + Configuration options for a file data source. """ def __init__( @@ -213,66 +208,23 @@ def __init__( uri: Optional[str], ): """ - FileOptions initialization method + Initializes a FileOptions object. Args: - file_format (FileFormat, optional): file source format eg. parquet - s3_endpoint_override (str, optional): custom s3 endpoint (used only with s3 uri) - uri (str, optional): file source url eg. s3:// or local file - - """ - self._file_format = file_format - self._uri = uri - self._s3_endpoint_override = s3_endpoint_override - - @property - def file_format(self): - """ - Returns the file format of this file - """ - return self._file_format - - @file_format.setter - def file_format(self, file_format): - """ - Sets the file format of this file - """ - self._file_format = file_format - - @property - def uri(self): - """ - Returns the file url of this file + file_format (optional): File source format, e.g. parquet. + s3_endpoint_override (optional): Custom s3 endpoint (used only with s3 uri). + uri (optional): File source url, e.g. s3:// or local file. """ - return self._uri - - @uri.setter - def uri(self, uri): - """ - Sets the file url of this file - """ - self._uri = uri - - @property - def s3_endpoint_override(self): - """ - Returns the s3 endpoint override - """ - return None if self._s3_endpoint_override == "" else self._s3_endpoint_override - - @s3_endpoint_override.setter - def s3_endpoint_override(self, s3_endpoint_override): - """ - Sets the s3 endpoint override - """ - self._s3_endpoint_override = s3_endpoint_override + self.file_format = file_format + self.uri = uri or "" + self.s3_endpoint_override = s3_endpoint_override or "" @classmethod def from_proto(cls, file_options_proto: DataSourceProto.FileOptions): """ Creates a FileOptions from a protobuf representation of a file option - args: + Args: file_options_proto: a protobuf representation of a datasource Returns: diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index e5937712f69..83f20bb3e52 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -173,7 +173,7 @@ def pull_latest_from_table_or_query( data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], - event_timestamp_column: str, + timestamp_field: str, created_timestamp_column: Optional[str], start_date: datetime, end_date: datetime, @@ -185,7 +185,7 @@ def pull_latest_from_table_or_query( FeatureStore.materialize() method. This method pulls data from the offline store, and the FeatureStore class is used to write this data into the online store. - Note that join_key_columns, feature_name_columns, event_timestamp_column, and created_timestamp_column + Note that join_key_columns, feature_name_columns, timestamp_field, and created_timestamp_column have all already been mapped to column names of the source table and those column names are the values passed into this function. @@ -194,7 +194,7 @@ class is used to write this data into the online store. data_source: Data source to pull all of the columns from join_key_columns: Columns of the join keys feature_name_columns: Columns of the feature names needed - event_timestamp_column: Timestamp column + timestamp_field: Timestamp column start_date: Starting date of query end_date: Ending date of query """ @@ -220,14 +220,14 @@ def pull_all_from_table_or_query( data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], - event_timestamp_column: str, + timestamp_field: str, start_date: datetime, end_date: datetime, ) -> RetrievalJob: """ Returns a Retrieval Job for all join key columns, feature name columns, and the event timestamp columns that occur between the start_date and end_date. - Note that join_key_columns, feature_name_columns, event_timestamp_column, and created_timestamp_column + Note that join_key_columns, feature_name_columns, timestamp_field, and created_timestamp_column have all already been mapped to column names of the source table and those column names are the values passed into this function. @@ -236,7 +236,7 @@ def pull_all_from_table_or_query( data_source: Data source to pull all of the columns from join_key_columns: Columns of the join keys feature_name_columns: Columns of the feature names needed - event_timestamp_column: Timestamp column + timestamp_field: Timestamp column start_date: Starting date of query end_date: Ending date of query """ diff --git a/sdk/python/feast/infra/offline_stores/offline_utils.py b/sdk/python/feast/infra/offline_stores/offline_utils.py index c62d0223a03..b6c3d300d49 100644 --- a/sdk/python/feast/infra/offline_stores/offline_utils.py +++ b/sdk/python/feast/infra/offline_stores/offline_utils.py @@ -86,7 +86,7 @@ class FeatureViewQueryContext: entities: List[str] features: List[str] # feature reference format field_mapping: Dict[str, str] - event_timestamp_column: str + timestamp_field: str created_timestamp_column: Optional[str] table_subquery: str entity_selections: List[str] @@ -154,7 +154,7 @@ def get_feature_view_query_context( entities=join_keys, features=features, field_mapping=feature_view.batch_source.field_mapping, - event_timestamp_column=timestamp_field, + timestamp_field=timestamp_field, created_timestamp_column=created_timestamp_column, # TODO: Make created column optional and not hardcoded table_subquery=feature_view.batch_source.get_table_query_string(), diff --git a/sdk/python/feast/infra/offline_stores/redshift.py b/sdk/python/feast/infra/offline_stores/redshift.py index e67cf13f5c4..cd309c92b21 100644 --- a/sdk/python/feast/infra/offline_stores/redshift.py +++ b/sdk/python/feast/infra/offline_stores/redshift.py @@ -71,7 +71,7 @@ def pull_latest_from_table_or_query( data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], - event_timestamp_column: str, + timestamp_field: str, created_timestamp_column: Optional[str], start_date: datetime, end_date: datetime, @@ -86,7 +86,7 @@ def pull_latest_from_table_or_query( partition_by_join_key_string = ( "PARTITION BY " + partition_by_join_key_string ) - timestamp_columns = [event_timestamp_column] + timestamp_columns = [timestamp_field] if created_timestamp_column: timestamp_columns.append(created_timestamp_column) timestamp_desc_string = " DESC, ".join(timestamp_columns) + " DESC" @@ -110,7 +110,7 @@ def pull_latest_from_table_or_query( SELECT {field_string}, ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS _feast_row FROM {from_expression} - WHERE {event_timestamp_column} BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}' + WHERE {timestamp_field} BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}' ) WHERE _feast_row = 1 """ @@ -130,7 +130,7 @@ def pull_all_from_table_or_query( data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], - event_timestamp_column: str, + timestamp_field: str, start_date: datetime, end_date: datetime, ) -> RetrievalJob: @@ -138,7 +138,7 @@ def pull_all_from_table_or_query( from_expression = data_source.get_table_query_string() field_string = ", ".join( - join_key_columns + feature_name_columns + [event_timestamp_column] + join_key_columns + feature_name_columns + [timestamp_field] ) redshift_client = aws_utils.get_redshift_data_client( @@ -152,7 +152,7 @@ def pull_all_from_table_or_query( query = f""" SELECT {field_string} FROM {from_expression} - WHERE {event_timestamp_column} BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}' + WHERE {timestamp_field} BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}' """ return RedshiftRetrievalJob( @@ -546,9 +546,9 @@ def _get_entity_df_event_timestamp_range( 1. We first join the current feature_view to the entity dataframe that has been passed. This JOIN has the following logic: - - For each row of the entity dataframe, only keep the rows where the `event_timestamp_column` + - For each row of the entity dataframe, only keep the rows where the `timestamp_field` is less than the one provided in the entity dataframe - - If there a TTL for the current feature_view, also keep the rows where the `event_timestamp_column` + - If there a TTL for the current feature_view, also keep the rows where the `timestamp_field` is higher the the one provided minus the TTL - For each row, Join on the entity key and retrieve the `entity_row_unique_id` that has been computed previously @@ -559,16 +559,16 @@ def _get_entity_df_event_timestamp_range( {{ featureview.name }}__subquery AS ( SELECT - {{ featureview.event_timestamp_column }} as event_timestamp, + {{ featureview.timestamp_field }} as event_timestamp, {{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }} {{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %} {% for feature in featureview.features %} {{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}{% if loop.last %}{% else %}, {% endif %} {% endfor %} FROM {{ featureview.table_subquery }} - WHERE {{ featureview.event_timestamp_column }} <= '{{ featureview.max_event_timestamp }}' + WHERE {{ featureview.timestamp_field }} <= '{{ featureview.max_event_timestamp }}' {% if featureview.ttl == 0 %}{% else %} - AND {{ featureview.event_timestamp_column }} >= '{{ featureview.min_event_timestamp }}' + AND {{ featureview.timestamp_field }} >= '{{ featureview.min_event_timestamp }}' {% endif %} ), diff --git a/sdk/python/feast/infra/offline_stores/redshift_source.py b/sdk/python/feast/infra/offline_stores/redshift_source.py index dcfcb50aa69..00af8c1abfa 100644 --- a/sdk/python/feast/infra/offline_stores/redshift_source.py +++ b/sdk/python/feast/infra/offline_stores/redshift_source.py @@ -106,6 +106,7 @@ def from_proto(data_source: DataSourceProto): A RedshiftSource object based on the data_source protobuf. """ return RedshiftSource( + name=data_source.name, field_mapping=dict(data_source.field_mapping), table=data_source.redshift_options.table, schema=data_source.redshift_options.schema, @@ -129,17 +130,11 @@ def __eq__(self, other): ) return ( - self.name == other.name + super().__eq__(other) and self.redshift_options.table == other.redshift_options.table and self.redshift_options.schema == other.redshift_options.schema and self.redshift_options.query == other.redshift_options.query and self.redshift_options.database == other.redshift_options.database - and self.timestamp_field == other.timestamp_field - and self.created_timestamp_column == other.created_timestamp_column - and self.field_mapping == other.field_mapping - and self.description == other.description - and self.tags == other.tags - and self.owner == other.owner ) @property @@ -170,17 +165,17 @@ def to_proto(self) -> DataSourceProto: A DataSourceProto object. """ data_source_proto = DataSourceProto( + name=self.name, type=DataSourceProto.BATCH_REDSHIFT, field_mapping=self.field_mapping, redshift_options=self.redshift_options.to_proto(), description=self.description, tags=self.tags, owner=self.owner, + timestamp_field=self.timestamp_field, + created_timestamp_column=self.created_timestamp_column, ) - data_source_proto.timestamp_field = self.timestamp_field - data_source_proto.created_timestamp_column = self.created_timestamp_column - return data_source_proto def validate(self, config: RepoConfig): @@ -216,7 +211,7 @@ def get_table_column_names_and_types( assert isinstance(config.offline_store, RedshiftOfflineStoreConfig) client = aws_utils.get_redshift_data_client(config.offline_store.region) - if self.table is not None: + if self.table: try: table = client.describe_table( ClusterIdentifier=config.offline_store.cluster_id, @@ -256,7 +251,7 @@ def get_table_column_names_and_types( class RedshiftOptions: """ - DataSource Redshift options used to source features from Redshift query. + Configuration options for a Redshift data source. """ def __init__( @@ -266,50 +261,10 @@ def __init__( query: Optional[str], database: Optional[str], ): - self._table = table - self._schema = schema - self._query = query - self._database = database - - @property - def query(self): - """Returns the Redshift SQL query referenced by this source.""" - return self._query - - @query.setter - def query(self, query): - """Sets the Redshift SQL query referenced by this source.""" - self._query = query - - @property - def table(self): - """Returns the table name of this Redshift table.""" - return self._table - - @table.setter - def table(self, table_name): - """Sets the table ref of this Redshift table.""" - self._table = table_name - - @property - def schema(self): - """Returns the schema name of this Redshift table.""" - return self._schema - - @schema.setter - def schema(self, schema): - """Sets the schema of this Redshift table.""" - self._schema = schema - - @property - def database(self): - """Returns the schema name of this Redshift table.""" - return self._database - - @database.setter - def database(self, database): - """Sets the database name of this Redshift table.""" - self._database = database + self.table = table or "" + self.schema = schema or "" + self.query = query or "" + self.database = database or "" @classmethod def from_proto(cls, redshift_options_proto: DataSourceProto.RedshiftOptions): diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index c88e1b1844c..a07f7a57c63 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -97,7 +97,7 @@ def pull_latest_from_table_or_query( data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], - event_timestamp_column: str, + timestamp_field: str, created_timestamp_column: Optional[str], start_date: datetime, end_date: datetime, @@ -117,7 +117,7 @@ def pull_latest_from_table_or_query( else: partition_by_join_key_string = "" - timestamp_columns = [event_timestamp_column] + timestamp_columns = [timestamp_field] if created_timestamp_column: timestamp_columns.append(created_timestamp_column) @@ -141,7 +141,7 @@ def pull_latest_from_table_or_query( SELECT {field_string}, ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS "_feast_row" FROM {from_expression} - WHERE "{event_timestamp_column}" BETWEEN TO_TIMESTAMP_NTZ({start_date.timestamp()}) AND TO_TIMESTAMP_NTZ({end_date.timestamp()}) + WHERE "{timestamp_field}" BETWEEN TO_TIMESTAMP_NTZ({start_date.timestamp()}) AND TO_TIMESTAMP_NTZ({end_date.timestamp()}) ) WHERE "_feast_row" = 1 """ @@ -161,7 +161,7 @@ def pull_all_from_table_or_query( data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], - event_timestamp_column: str, + timestamp_field: str, start_date: datetime, end_date: datetime, ) -> RetrievalJob: @@ -170,9 +170,7 @@ def pull_all_from_table_or_query( field_string = ( '"' - + '", "'.join( - join_key_columns + feature_name_columns + [event_timestamp_column] - ) + + '", "'.join(join_key_columns + feature_name_columns + [timestamp_field]) + '"' ) @@ -187,7 +185,7 @@ def pull_all_from_table_or_query( query = f""" SELECT {field_string} FROM {from_expression} - WHERE "{event_timestamp_column}" BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}' + WHERE "{timestamp_field}" BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}' """ return SnowflakeRetrievalJob( @@ -512,9 +510,9 @@ def _get_entity_df_event_timestamp_range( 1. We first join the current feature_view to the entity dataframe that has been passed. This JOIN has the following logic: - - For each row of the entity dataframe, only keep the rows where the `event_timestamp_column` + - For each row of the entity dataframe, only keep the rows where the `timestamp_field` is less than the one provided in the entity dataframe - - If there a TTL for the current feature_view, also keep the rows where the `event_timestamp_column` + - If there a TTL for the current feature_view, also keep the rows where the `timestamp_field` is higher the the one provided minus the TTL - For each row, Join on the entity key and retrieve the `entity_row_unique_id` that has been computed previously @@ -525,16 +523,16 @@ def _get_entity_df_event_timestamp_range( "{{ featureview.name }}__subquery" AS ( SELECT - "{{ featureview.event_timestamp_column }}" as "event_timestamp", + "{{ featureview.timestamp_field }}" as "event_timestamp", {{'"' ~ featureview.created_timestamp_column ~ '" as "created_timestamp",' if featureview.created_timestamp_column else '' }} {{featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %} {% for feature in featureview.features %} "{{ feature }}" as {% if full_feature_names %}"{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}"{% else %}"{{ featureview.field_mapping.get(feature, feature) }}"{% endif %}{% if loop.last %}{% else %}, {% endif %} {% endfor %} FROM {{ featureview.table_subquery }} - WHERE "{{ featureview.event_timestamp_column }}" <= '{{ featureview.max_event_timestamp }}' + WHERE "{{ featureview.timestamp_field }}" <= '{{ featureview.max_event_timestamp }}' {% if featureview.ttl == 0 %}{% else %} - AND "{{ featureview.event_timestamp_column }}" >= '{{ featureview.min_event_timestamp }}' + AND "{{ featureview.timestamp_field }}" >= '{{ featureview.min_event_timestamp }}' {% endif %} ), diff --git a/sdk/python/feast/infra/offline_stores/snowflake_source.py b/sdk/python/feast/infra/offline_stores/snowflake_source.py index 8f3f2f0bb57..904fc48043b 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake_source.py +++ b/sdk/python/feast/infra/offline_stores/snowflake_source.py @@ -55,6 +55,7 @@ def __init__( """ if table is None and query is None: raise ValueError('No "table" argument provided.') + # The default Snowflake schema is named "PUBLIC". _schema = "PUBLIC" if (database and table and not schema) else schema @@ -112,6 +113,7 @@ def from_proto(data_source: DataSourceProto): A SnowflakeSource object based on the data_source protobuf. """ return SnowflakeSource( + name=data_source.name, field_mapping=dict(data_source.field_mapping), database=data_source.snowflake_options.database, schema=data_source.snowflake_options.schema, @@ -136,18 +138,12 @@ def __eq__(self, other): ) return ( - self.name == other.name - and self.snowflake_options.database == other.snowflake_options.database - and self.snowflake_options.schema == other.snowflake_options.schema - and self.snowflake_options.table == other.snowflake_options.table - and self.snowflake_options.query == other.snowflake_options.query - and self.snowflake_options.warehouse == other.snowflake_options.warehouse - and self.timestamp_field == other.timestamp_field - and self.created_timestamp_column == other.created_timestamp_column - and self.field_mapping == other.field_mapping - and self.description == other.description - and self.tags == other.tags - and self.owner == other.owner + super().__eq__(other) + and self.database == other.database + and self.schema == other.schema + and self.table == other.table + and self.query == other.query + and self.warehouse == other.warehouse ) @property @@ -183,6 +179,7 @@ def to_proto(self) -> DataSourceProto: A DataSourceProto object. """ data_source_proto = DataSourceProto( + name=self.name, type=DataSourceProto.BATCH_SNOWFLAKE, field_mapping=self.field_mapping, snowflake_options=self.snowflake_options.to_proto(), @@ -252,7 +249,7 @@ def get_table_column_names_and_types( class SnowflakeOptions: """ - DataSource snowflake options used to source features from snowflake query. + Configuration options for a Snowflake data source. """ def __init__( @@ -263,61 +260,11 @@ def __init__( query: Optional[str], warehouse: Optional[str], ): - self._database = database - self._schema = schema - self._table = table - self._query = query - self._warehouse = warehouse - - @property - def query(self): - """Returns the snowflake SQL query referenced by this source.""" - return self._query - - @query.setter - def query(self, query): - """Sets the snowflake SQL query referenced by this source.""" - self._query = query - - @property - def database(self): - """Returns the database name of this snowflake table.""" - return self._database - - @database.setter - def database(self, database): - """Sets the database ref of this snowflake table.""" - self._database = database - - @property - def schema(self): - """Returns the schema name of this snowflake table.""" - return self._schema - - @schema.setter - def schema(self, schema): - """Sets the schema of this snowflake table.""" - self._schema = schema - - @property - def table(self): - """Returns the table name of this snowflake table.""" - return self._table - - @table.setter - def table(self, table): - """Sets the table ref of this snowflake table.""" - self._table = table - - @property - def warehouse(self): - """Returns the warehouse name of this snowflake table.""" - return self._warehouse - - @warehouse.setter - def warehouse(self, warehouse): - """Sets the warehouse name of this snowflake table.""" - self._warehouse = warehouse + self.database = database or "" + self.schema = schema or "" + self.table = table or "" + self.query = query or "" + self.warehouse = warehouse or "" @classmethod def from_proto(cls, snowflake_options_proto: DataSourceProto.SnowflakeOptions): diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index 3468b9dc927..09ca98d86d3 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -136,7 +136,7 @@ def materialize_single_feature_view( ( join_key_columns, feature_name_columns, - event_timestamp_column, + timestamp_field, created_timestamp_column, ) = _get_column_names(feature_view, entities) @@ -145,7 +145,7 @@ def materialize_single_feature_view( data_source=feature_view.batch_source, join_key_columns=join_key_columns, feature_name_columns=feature_name_columns, - event_timestamp_column=event_timestamp_column, + timestamp_field=timestamp_field, created_timestamp_column=created_timestamp_column, start_date=start_date, end_date=end_date, @@ -210,7 +210,7 @@ def retrieve_saved_dataset( data_source=dataset.storage.to_data_source(), join_key_columns=dataset.join_keys, feature_name_columns=feature_name_columns, - event_timestamp_column=event_ts_column, + timestamp_field=event_ts_column, start_date=make_tzaware(dataset.min_event_timestamp), # type: ignore end_date=make_tzaware(dataset.max_event_timestamp + timedelta(seconds=1)), # type: ignore ) diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index b379193ba38..a71bd6d2d06 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -258,7 +258,7 @@ def _get_column_names( the query to the offline store. """ # if we have mapped fields, use the original field names in the call to the offline store - event_timestamp_column = feature_view.batch_source.timestamp_field + timestamp_field = feature_view.batch_source.timestamp_field feature_names = [feature.name for feature in feature_view.features] created_timestamp_column = feature_view.batch_source.created_timestamp_column join_keys = [ @@ -268,10 +268,10 @@ def _get_column_names( reverse_field_mapping = { v: k for k, v in feature_view.batch_source.field_mapping.items() } - event_timestamp_column = ( - reverse_field_mapping[event_timestamp_column] - if event_timestamp_column in reverse_field_mapping.keys() - else event_timestamp_column + timestamp_field = ( + reverse_field_mapping[timestamp_field] + if timestamp_field in reverse_field_mapping.keys() + else timestamp_field ) created_timestamp_column = ( reverse_field_mapping[created_timestamp_column] @@ -294,13 +294,13 @@ def _get_column_names( name for name in feature_names if name not in join_keys - and name != event_timestamp_column + and name != timestamp_field and name != created_timestamp_column ] return ( join_keys, feature_names, - event_timestamp_column, + timestamp_field, created_timestamp_column, ) diff --git a/sdk/python/feast/infra/transformation_servers/Dockerfile b/sdk/python/feast/infra/transformation_servers/Dockerfile index 653e34cdf54..79997ce01b0 100644 --- a/sdk/python/feast/infra/transformation_servers/Dockerfile +++ b/sdk/python/feast/infra/transformation_servers/Dockerfile @@ -9,7 +9,7 @@ COPY protos protos COPY README.md README.md # Install dependencies -RUN pip3 install -e 'sdk/python[ci]' +RUN pip3 install -e 'sdk/python' # Start feature transformation server CMD [ "python", "app.py" ] diff --git a/sdk/python/feast/templates/aws/bootstrap.py b/sdk/python/feast/templates/aws/bootstrap.py index 80c2480d254..456c6e9b709 100644 --- a/sdk/python/feast/templates/aws/bootstrap.py +++ b/sdk/python/feast/templates/aws/bootstrap.py @@ -61,9 +61,8 @@ def bootstrap(): replace_str_in_file(driver_file, "%REDSHIFT_DATABASE%", database) replace_str_in_file(config_file, "%REDSHIFT_USER%", user) replace_str_in_file( - driver_file, config_file, "%REDSHIFT_S3_STAGING_LOCATION%", s3_staging_location + config_file, "%REDSHIFT_S3_STAGING_LOCATION%", s3_staging_location ) - replace_str_in_file(config_file,) replace_str_in_file(config_file, "%REDSHIFT_IAM_ROLE%", iam_role) diff --git a/sdk/python/requirements/py3.10-ci-requirements.txt b/sdk/python/requirements/py3.10-ci-requirements.txt index 346aa6da47e..99258bd96d0 100644 --- a/sdk/python/requirements/py3.10-ci-requirements.txt +++ b/sdk/python/requirements/py3.10-ci-requirements.txt @@ -255,6 +255,8 @@ grpcio-tools==1.44.0 # via feast (setup.py) h11==0.13.0 # via uvicorn +happybase==1.2.0 + # via feast (setup.py) hiredis==2.0.0 # via feast (setup.py) httplib2==0.20.4 @@ -356,9 +358,10 @@ nbformat==5.3.0 # via great-expectations nodeenv==1.6.0 # via pre-commit -numpy==1.22.3 +numpy==1.21.6 # via # altair + # feast (setup.py) # great-expectations # pandas # pandavro @@ -407,6 +410,8 @@ platformdirs==2.5.1 # via virtualenv pluggy==1.0.0 # via pytest +ply==3.11 + # via thriftpy2 portalocker==2.4.0 # via msal-extensions pre-commit==2.18.1 @@ -594,6 +599,7 @@ six==1.16.0 # google-cloud-core # google-resumable-media # grpcio + # happybase # mock # msrestazure # pandavro @@ -638,6 +644,8 @@ termcolor==1.1.0 # via great-expectations testcontainers==3.5.3 # via feast (setup.py) +thriftpy2==0.4.14 + # via happybase toml==0.10.2 # via # black diff --git a/sdk/python/requirements/py3.10-requirements.txt b/sdk/python/requirements/py3.10-requirements.txt index 455d80b7909..55c5807b50a 100644 --- a/sdk/python/requirements/py3.10-requirements.txt +++ b/sdk/python/requirements/py3.10-requirements.txt @@ -73,8 +73,9 @@ markupsafe==2.1.1 # via jinja2 mmh3==3.0.0 # via feast (setup.py) -numpy==1.22.3 +numpy==1.21.6 # via + # feast (setup.py) # pandas # pandavro # pyarrow diff --git a/sdk/python/requirements/py3.7-ci-requirements.txt b/sdk/python/requirements/py3.7-ci-requirements.txt index 224840a6f75..2d0da79a650 100644 --- a/sdk/python/requirements/py3.7-ci-requirements.txt +++ b/sdk/python/requirements/py3.7-ci-requirements.txt @@ -265,6 +265,8 @@ grpcio-tools==1.44.0 # via feast (setup.py) h11==0.13.0 # via uvicorn +happybase==1.2.0 + # via feast (setup.py) hiredis==2.0.0 # via feast (setup.py) httplib2==0.20.4 @@ -430,6 +432,7 @@ notebook==6.4.10 numpy==1.21.5 # via # altair + # feast (setup.py) # great-expectations # pandas # pandavro @@ -481,6 +484,8 @@ platformdirs==2.5.1 # via virtualenv pluggy==1.0.0 # via pytest +ply==3.11 + # via thriftpy2 portalocker==2.4.0 # via msal-extensions pre-commit==2.17.0 @@ -682,7 +687,7 @@ six==1.16.0 # google-cloud-core # google-resumable-media # grpcio - # isodate + # happybase # mock # msrestazure # pandavro @@ -727,8 +732,8 @@ terminado==0.13.2 # via notebook testcontainers==3.5.3 # via feast (setup.py) -testpath==0.6.0 - # via nbconvert +thriftpy2==0.4.14 + # via happybase toml==0.10.2 # via # black diff --git a/sdk/python/requirements/py3.7-requirements.txt b/sdk/python/requirements/py3.7-requirements.txt index f3c90a6e3bd..1ca911f1348 100644 --- a/sdk/python/requirements/py3.7-requirements.txt +++ b/sdk/python/requirements/py3.7-requirements.txt @@ -79,6 +79,7 @@ mmh3==3.0.0 # via feast (setup.py) numpy==1.21.5 # via + # feast (setup.py) # pandas # pandavro # pyarrow diff --git a/sdk/python/requirements/py3.8-ci-requirements.txt b/sdk/python/requirements/py3.8-ci-requirements.txt index 34032ef7d26..f1d46e6ec46 100644 --- a/sdk/python/requirements/py3.8-ci-requirements.txt +++ b/sdk/python/requirements/py3.8-ci-requirements.txt @@ -259,6 +259,8 @@ grpcio-tools==1.44.0 # via feast (setup.py) h11==0.13.0 # via uvicorn +happybase==1.2.0 + # via feast (setup.py) hiredis==2.0.0 # via feast (setup.py) httplib2==0.20.4 @@ -362,9 +364,10 @@ nbformat==5.3.0 # via great-expectations nodeenv==1.6.0 # via pre-commit -numpy==1.22.3 +numpy==1.21.6 # via # altair + # feast (setup.py) # great-expectations # pandas # pandavro @@ -413,6 +416,8 @@ platformdirs==2.5.1 # via virtualenv pluggy==1.0.0 # via pytest +ply==3.11 + # via thriftpy2 portalocker==2.4.0 # via msal-extensions pre-commit==2.18.1 @@ -602,6 +607,7 @@ six==1.16.0 # google-cloud-core # google-resumable-media # grpcio + # happybase # mock # msrestazure # pandavro @@ -646,6 +652,8 @@ termcolor==1.1.0 # via great-expectations testcontainers==3.5.3 # via feast (setup.py) +thriftpy2==0.4.14 + # via happybase toml==0.10.2 # via # black diff --git a/sdk/python/requirements/py3.8-requirements.txt b/sdk/python/requirements/py3.8-requirements.txt index 9000c7b1f7b..f0fc61e1b2a 100644 --- a/sdk/python/requirements/py3.8-requirements.txt +++ b/sdk/python/requirements/py3.8-requirements.txt @@ -75,8 +75,9 @@ markupsafe==2.1.1 # via jinja2 mmh3==3.0.0 # via feast (setup.py) -numpy==1.22.3 +numpy==1.21.6 # via + # feast (setup.py) # pandas # pandavro # pyarrow diff --git a/sdk/python/requirements/py3.9-ci-requirements.txt b/sdk/python/requirements/py3.9-ci-requirements.txt index 1ab910a16d8..c79daa3ffa5 100644 --- a/sdk/python/requirements/py3.9-ci-requirements.txt +++ b/sdk/python/requirements/py3.9-ci-requirements.txt @@ -255,6 +255,8 @@ grpcio-tools==1.44.0 # via feast (setup.py) h11==0.13.0 # via uvicorn +happybase==1.2.0 + # via feast (setup.py) hiredis==2.0.0 # via feast (setup.py) httplib2==0.20.4 @@ -356,9 +358,10 @@ nbformat==5.3.0 # via great-expectations nodeenv==1.6.0 # via pre-commit -numpy==1.22.3 +numpy==1.21.6 # via # altair + # feast (setup.py) # great-expectations # pandas # pandavro @@ -407,6 +410,8 @@ platformdirs==2.5.1 # via virtualenv pluggy==1.0.0 # via pytest +ply==3.11 + # via thriftpy2 portalocker==2.4.0 # via msal-extensions pre-commit==2.18.1 @@ -596,6 +601,7 @@ six==1.16.0 # google-cloud-core # google-resumable-media # grpcio + # happybase # mock # msrestazure # pandavro @@ -640,6 +646,8 @@ termcolor==1.1.0 # via great-expectations testcontainers==3.5.3 # via feast (setup.py) +thriftpy2==0.4.14 + # via happybase toml==0.10.2 # via # black diff --git a/sdk/python/requirements/py3.9-requirements.txt b/sdk/python/requirements/py3.9-requirements.txt index 6413886c5b4..10983319895 100644 --- a/sdk/python/requirements/py3.9-requirements.txt +++ b/sdk/python/requirements/py3.9-requirements.txt @@ -73,8 +73,9 @@ markupsafe==2.1.1 # via jinja2 mmh3==3.0.0 # via feast (setup.py) -numpy==1.22.3 +numpy==1.21.6 # via + # feast (setup.py) # pandas # pandavro # pyarrow diff --git a/sdk/python/setup.py b/sdk/python/setup.py index ed1a1a7f9f4..6fbba144351 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -57,6 +57,7 @@ "Jinja2>=2.0.0", "jsonschema", "mmh3", + "numpy<1.22", # 1.22 drops support for python 3.7. "pandas>=1.0.0", "pandavro==1.5.*", "protobuf>=3.10,<3.20", @@ -109,6 +110,10 @@ "great_expectations>=0.14.0,<0.15.0" ] +GO_REQUIRED = [ + "cffi==1.15.*", +] + CI_REQUIRED = ( [ "cryptography==3.4.8", @@ -434,6 +439,7 @@ def copy_extensions_to_source(self): "spark": SPARK_REQUIRED, "trino": TRINO_REQUIRED, "ge": GE_REQUIRED, + "go": GO_REQUIRED, }, include_package_data=True, license="Apache", diff --git a/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py b/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py index e2a700d0677..ba36f8e89be 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py @@ -34,7 +34,7 @@ def create_data_source( event_timestamp_column: (Deprecated) Pass through for the underlying data source. created_timestamp_column: Pass through for the underlying data source. field_mapping: Pass through for the underlying data source. - timestamp_field: (Deprecated) Pass through for the underlying data source. + timestamp_field: Pass through for the underlying data source. Returns: diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/trino.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/trino.py index 07ae210b12c..ddcfafd31a7 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/trino.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/trino.py @@ -92,7 +92,7 @@ def create_data_source( return TrinoSource( name="ci_trino_offline_store", table=destination_name, - event_timestamp_column=timestamp_field, + timestamp_field=timestamp_field, created_timestamp_column=created_timestamp_column, query=f"SELECT * FROM {destination_name}", field_mapping=field_mapping or {"ts_1": "ts"}, diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index 774c3f9a424..f4440dbfbcf 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -114,9 +114,7 @@ def test_write_to_online_store_event_check(local_redis_environment): "ts_1": [hour_ago, now, now], } dataframe_source = pd.DataFrame(data) - with prep_file_source( - df=dataframe_source, event_timestamp_column="ts_1" - ) as file_source: + with prep_file_source(df=dataframe_source, timestamp_field="ts_1") as file_source: e = Entity(name="id", value_type=ValueType.STRING) # Create Feature View diff --git a/sdk/python/tests/integration/registration/test_feature_store.py b/sdk/python/tests/integration/registration/test_feature_store.py index ca61734c784..db4c6700cec 100644 --- a/sdk/python/tests/integration/registration/test_feature_store.py +++ b/sdk/python/tests/integration/registration/test_feature_store.py @@ -216,9 +216,7 @@ def test_apply_feature_view_success(test_feature_store): ) @pytest.mark.parametrize("dataframe_source", [lazy_fixture("simple_dataset_1")]) def test_feature_view_inference_success(test_feature_store, dataframe_source): - with prep_file_source( - df=dataframe_source, event_timestamp_column="ts_1" - ) as file_source: + with prep_file_source(df=dataframe_source, timestamp_field="ts_1") as file_source: entity = Entity( name="id", join_keys=["id_join_key"], value_type=ValueType.INT64 ) @@ -434,9 +432,7 @@ def test_apply_remote_repo(): ) @pytest.mark.parametrize("dataframe_source", [lazy_fixture("simple_dataset_1")]) def test_reapply_feature_view_success(test_feature_store, dataframe_source): - with prep_file_source( - df=dataframe_source, event_timestamp_column="ts_1" - ) as file_source: + with prep_file_source(df=dataframe_source, timestamp_field="ts_1") as file_source: e = Entity(name="id", join_keys=["id_join_key"], value_type=ValueType.STRING) diff --git a/sdk/python/tests/integration/registration/test_inference.py b/sdk/python/tests/integration/registration/test_inference.py index d41a4fdbc18..6305468c097 100644 --- a/sdk/python/tests/integration/registration/test_inference.py +++ b/sdk/python/tests/integration/registration/test_inference.py @@ -41,9 +41,9 @@ def test_update_entities_with_inferred_types_from_feature_views( simple_dataset_1, simple_dataset_2 ): with prep_file_source( - df=simple_dataset_1, event_timestamp_column="ts_1" + df=simple_dataset_1, timestamp_field="ts_1" ) as file_source, prep_file_source( - df=simple_dataset_2, event_timestamp_column="ts_1" + df=simple_dataset_2, timestamp_field="ts_1" ) as file_source_2: fv1 = FeatureView( diff --git a/sdk/python/tests/unit/diff/test_registry_diff.py b/sdk/python/tests/unit/diff/test_registry_diff.py index 0322ab47abf..483dae73e26 100644 --- a/sdk/python/tests/unit/diff/test_registry_diff.py +++ b/sdk/python/tests/unit/diff/test_registry_diff.py @@ -7,9 +7,7 @@ def test_tag_objects_for_keep_delete_update_add(simple_dataset_1): - with prep_file_source( - df=simple_dataset_1, event_timestamp_column="ts_1" - ) as file_source: + with prep_file_source(df=simple_dataset_1, timestamp_field="ts_1") as file_source: to_delete = FeatureView( name="to_delete", entities=["id"], batch_source=file_source, ttl=None, ) @@ -53,9 +51,7 @@ def test_tag_objects_for_keep_delete_update_add(simple_dataset_1): def test_diff_registry_objects_feature_views(simple_dataset_1): - with prep_file_source( - df=simple_dataset_1, event_timestamp_column="ts_1" - ) as file_source: + with prep_file_source(df=simple_dataset_1, timestamp_field="ts_1") as file_source: pre_changed = FeatureView( name="fv2", entities=["id"], diff --git a/sdk/python/tests/unit/test_data_sources.py b/sdk/python/tests/unit/test_data_sources.py index 6bd4baf4fa9..7f288d36db9 100644 --- a/sdk/python/tests/unit/test_data_sources.py +++ b/sdk/python/tests/unit/test_data_sources.py @@ -3,6 +3,7 @@ from feast import ValueType from feast.data_format import ProtoFormat from feast.data_source import ( + DataSource, KafkaSource, KinesisSource, PushSource, @@ -11,6 +12,9 @@ ) from feast.field import Field from feast.infra.offline_stores.bigquery_source import BigQuerySource +from feast.infra.offline_stores.file_source import FileSource +from feast.infra.offline_stores.redshift_source import RedshiftSource +from feast.infra.offline_stores.snowflake_source import SnowflakeSource from feast.types import Bool, Float32, Int64 @@ -140,8 +144,112 @@ def test_default_data_source_kw_arg_warning(): # No name warning for DataSource with pytest.warns(UserWarning): source = KafkaSource( - event_timestamp_column="column", + timestamp_field="column", bootstrap_servers="bootstrap_servers", message_format=ProtoFormat("class_path"), topic="topic", ) + + +def test_proto_conversion(): + bigquery_source = BigQuerySource( + name="test_source", + table="test_table", + timestamp_field="event_timestamp", + created_timestamp_column="created_timestamp", + field_mapping={"foo": "bar"}, + description="test description", + tags={"test": "test"}, + owner="test@gmail.com", + ) + + file_source = FileSource( + name="test_source", + path="test_path", + timestamp_field="event_timestamp", + created_timestamp_column="created_timestamp", + field_mapping={"foo": "bar"}, + description="test description", + tags={"test": "test"}, + owner="test@gmail.com", + ) + + redshift_source = RedshiftSource( + name="test_source", + database="test_database", + schema="test_schema", + table="test_table", + timestamp_field="event_timestamp", + created_timestamp_column="created_timestamp", + field_mapping={"foo": "bar"}, + description="test description", + tags={"test": "test"}, + owner="test@gmail.com", + ) + + snowflake_source = SnowflakeSource( + name="test_source", + database="test_database", + warehouse="test_warehouse", + schema="test_schema", + table="test_table", + timestamp_field="event_timestamp", + created_timestamp_column="created_timestamp", + field_mapping={"foo": "bar"}, + description="test description", + tags={"test": "test"}, + owner="test@gmail.com", + ) + + kafka_source = KafkaSource( + name="test_source", + bootstrap_servers="test_servers", + message_format=ProtoFormat("class_path"), + topic="test_topic", + timestamp_field="event_timestamp", + created_timestamp_column="created_timestamp", + field_mapping={"foo": "bar"}, + description="test description", + tags={"test": "test"}, + owner="test@gmail.com", + batch_source=file_source, + ) + + kinesis_source = KinesisSource( + name="test_source", + region="test_region", + record_format=ProtoFormat("class_path"), + stream_name="test_stream", + timestamp_field="event_timestamp", + created_timestamp_column="created_timestamp", + field_mapping={"foo": "bar"}, + description="test description", + tags={"test": "test"}, + owner="test@gmail.com", + batch_source=file_source, + ) + + push_source = PushSource( + name="test_source", + batch_source=file_source, + description="test description", + tags={"test": "test"}, + owner="test@gmail.com", + ) + + request_source = RequestSource( + name="test_source", + schema=[Field(name="test1", dtype=Float32), Field(name="test1", dtype=Int64)], + description="test description", + tags={"test": "test"}, + owner="test@gmail.com", + ) + + assert DataSource.from_proto(bigquery_source.to_proto()) == bigquery_source + assert DataSource.from_proto(file_source.to_proto()) == file_source + assert DataSource.from_proto(redshift_source.to_proto()) == redshift_source + assert DataSource.from_proto(snowflake_source.to_proto()) == snowflake_source + assert DataSource.from_proto(kafka_source.to_proto()) == kafka_source + assert DataSource.from_proto(kinesis_source.to_proto()) == kinesis_source + assert DataSource.from_proto(push_source.to_proto()) == push_source + assert DataSource.from_proto(request_source.to_proto()) == request_source diff --git a/sdk/python/tests/utils/data_source_utils.py b/sdk/python/tests/utils/data_source_utils.py index 5bb5a622d61..d5f45964ca7 100644 --- a/sdk/python/tests/utils/data_source_utils.py +++ b/sdk/python/tests/utils/data_source_utils.py @@ -11,19 +11,17 @@ @contextlib.contextmanager -def prep_file_source(df, event_timestamp_column=None) -> Iterator[FileSource]: +def prep_file_source(df, timestamp_field=None) -> Iterator[FileSource]: with tempfile.NamedTemporaryFile(suffix=".parquet") as f: f.close() df.to_parquet(f.name) file_source = FileSource( - file_format=ParquetFormat(), - path=f.name, - timestamp_field=event_timestamp_column, + file_format=ParquetFormat(), path=f.name, timestamp_field=timestamp_field, ) yield file_source -def simple_bq_source_using_table_arg(df, event_timestamp_column=None) -> BigQuerySource: +def simple_bq_source_using_table_arg(df, timestamp_field=None) -> BigQuerySource: client = bigquery.Client() gcp_project = client.project bigquery_dataset = f"ds_{time.time_ns()}" @@ -40,13 +38,13 @@ def simple_bq_source_using_table_arg(df, event_timestamp_column=None) -> BigQuer job = client.load_table_from_dataframe(df, table) job.result() - return BigQuerySource(table=table, timestamp_field=event_timestamp_column,) + return BigQuerySource(table=table, timestamp_field=timestamp_field,) -def simple_bq_source_using_query_arg(df, event_timestamp_column=None) -> BigQuerySource: - bq_source_using_table = simple_bq_source_using_table_arg(df, event_timestamp_column) +def simple_bq_source_using_query_arg(df, timestamp_field=None) -> BigQuerySource: + bq_source_using_table = simple_bq_source_using_table_arg(df, timestamp_field) return BigQuerySource( name=bq_source_using_table.table, query=f"SELECT * FROM {bq_source_using_table.table}", - timestamp_field=event_timestamp_column, + timestamp_field=timestamp_field, ) diff --git a/ui/package.json b/ui/package.json index d74c828f1a4..e46261cdb73 100644 --- a/ui/package.json +++ b/ui/package.json @@ -1,6 +1,6 @@ { "name": "@feast-dev/feast-ui", - "version": "0.20.1", + "version": "0.20.2", "private": false, "files": [ "dist"