@@ -8,23 +8,22 @@ import (
88 "os"
99 "os/signal"
1010 "syscall"
11-
12- "google.golang.org/grpc"
13-
14- "github.com/feast-dev/feast/go/internal/feast/server"
15- "github.com/feast-dev/feast/go/internal/feast/server/logging"
16- "github.com/feast-dev/feast/go/protos/feast/serving"
11+ "time"
1712
1813 "github.com/apache/arrow/go/v8/arrow"
1914 "github.com/apache/arrow/go/v8/arrow/array"
2015 "github.com/apache/arrow/go/v8/arrow/cdata"
2116 "github.com/apache/arrow/go/v8/arrow/memory"
17+ "google.golang.org/grpc"
2218
2319 "github.com/feast-dev/feast/go/internal/feast"
2420 "github.com/feast-dev/feast/go/internal/feast/model"
2521 "github.com/feast-dev/feast/go/internal/feast/onlineserving"
2622 "github.com/feast-dev/feast/go/internal/feast/registry"
23+ "github.com/feast-dev/feast/go/internal/feast/server"
24+ "github.com/feast-dev/feast/go/internal/feast/server/logging"
2725 "github.com/feast-dev/feast/go/internal/feast/transformation"
26+ "github.com/feast-dev/feast/go/protos/feast/serving"
2827 prototypes "github.com/feast-dev/feast/go/protos/feast/types"
2928 "github.com/feast-dev/feast/go/types"
3029)
@@ -44,6 +43,15 @@ type DataTable struct {
4443 SchemaPtr uintptr
4544}
4645
46+ // LoggingOptions is a public (embedded) copy of logging.LoggingOptions struct.
47+ // See logging.LoggingOptions for properties description
48+ type LoggingOptions struct {
49+ ChannelCapacity int
50+ EmitTimeout time.Duration
51+ WriteInterval time.Duration
52+ FlushInterval time.Duration
53+ }
54+
4755func NewOnlineFeatureService (conf * OnlineFeatureServiceConfig , transformationCallback transformation.TransformationCallback ) * OnlineFeatureService {
4856 repoConfig , err := registry .NewRepoConfigFromJSON (conf .RepoPath , conf .RepoConfig )
4957 if err != nil {
@@ -214,17 +222,50 @@ func (s *OnlineFeatureService) GetOnlineFeatures(
214222 return nil
215223}
216224
225+ // StartGprcServer starts gRPC server with disabled feature logging and blocks the thread
217226func (s * OnlineFeatureService ) StartGprcServer (host string , port int ) error {
218- // TODO(oleksii): enable logging
219- // Disable logging for now
227+ return s .StartGprcServerWithLogging (host , port , nil , LoggingOptions {})
228+ }
229+
230+ // StartGprcServerWithLoggingDefaultOpts starts gRPC server with enabled feature logging but default configuration for logging
231+ // Caller of this function must provide Python callback to flush buffered logs
232+ func (s * OnlineFeatureService ) StartGprcServerWithLoggingDefaultOpts (host string , port int , writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback ) error {
233+ defaultOpts := LoggingOptions {
234+ ChannelCapacity : logging .DefaultOptions .ChannelCapacity ,
235+ EmitTimeout : logging .DefaultOptions .EmitTimeout ,
236+ WriteInterval : logging .DefaultOptions .WriteInterval ,
237+ FlushInterval : logging .DefaultOptions .FlushInterval ,
238+ }
239+ return s .StartGprcServerWithLogging (host , port , writeLoggedFeaturesCallback , defaultOpts )
240+ }
241+
242+ // StartGprcServerWithLogging starts gRPC server with enabled feature logging
243+ // Caller of this function must provide Python callback to flush buffered logs as well as logging configuration (loggingOpts)
244+ func (s * OnlineFeatureService ) StartGprcServerWithLogging (host string , port int , writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback , loggingOpts LoggingOptions ) error {
220245 var loggingService * logging.LoggingService = nil
246+ var err error
247+ if writeLoggedFeaturesCallback != nil {
248+ sink , err := logging .NewOfflineStoreSink (writeLoggedFeaturesCallback )
249+ if err != nil {
250+ return err
251+ }
252+
253+ loggingService , err = logging .NewLoggingService (s .fs , sink , logging.LoggingOptions {
254+ ChannelCapacity : loggingOpts .ChannelCapacity ,
255+ EmitTimeout : loggingOpts .EmitTimeout ,
256+ WriteInterval : loggingOpts .WriteInterval ,
257+ FlushInterval : loggingOpts .FlushInterval ,
258+ })
259+ if err != nil {
260+ return err
261+ }
262+ }
221263 ser := server .NewGrpcServingServiceServer (s .fs , loggingService )
222264 log .Printf ("Starting a gRPC server on host %s port %d\n " , host , port )
223265 lis , err := net .Listen ("tcp" , fmt .Sprintf ("%s:%d" , host , port ))
224266 if err != nil {
225267 return err
226268 }
227- log .Printf ("Listening a gRPC server on host %s port %d\n " , host , port )
228269
229270 grpcServer := grpc .NewServer ()
230271 serving .RegisterServingServiceServer (grpcServer , ser )
@@ -234,6 +275,10 @@ func (s *OnlineFeatureService) StartGprcServer(host string, port int) error {
234275 <- s .grpcStopCh
235276 fmt .Println ("Stopping the gRPC server..." )
236277 grpcServer .GracefulStop ()
278+ if loggingService != nil {
279+ loggingService .Stop ()
280+ }
281+ fmt .Println ("gRPC server terminated" )
237282 }()
238283
239284 err = grpcServer .Serve (lis )
0 commit comments