From 3bc3bf532f26cf8c05db7f0baaf527bb6d130b05 Mon Sep 17 00:00:00 2001 From: Hantsy Bai Date: Tue, 8 Apr 2025 10:58:29 +0800 Subject: [PATCH] Clean up ReactiveSupport.java --- .../execution/reactive/ReactiveSupport.java | 44 +------------------ 1 file changed, 2 insertions(+), 42 deletions(-) diff --git a/src/main/java/graphql/execution/reactive/ReactiveSupport.java b/src/main/java/graphql/execution/reactive/ReactiveSupport.java index 3e02f1159..80deb38be 100644 --- a/src/main/java/graphql/execution/reactive/ReactiveSupport.java +++ b/src/main/java/graphql/execution/reactive/ReactiveSupport.java @@ -2,9 +2,8 @@ import graphql.DuckTyped; import graphql.Internal; +import org.reactivestreams.FlowAdapters; import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; import java.util.Objects; import java.util.concurrent.CompletableFuture; @@ -26,17 +25,11 @@ public static Object fetchedObject(Object fetchedObject) { return flowPublisherToCF((Flow.Publisher) fetchedObject); } if (fetchedObject instanceof Publisher) { - return reactivePublisherToCF((Publisher) fetchedObject); + return flowPublisherToCF(FlowAdapters.toFlowPublisher((Publisher) fetchedObject)); } return fetchedObject; } - private static CompletableFuture reactivePublisherToCF(Publisher publisher) { - ReactivePublisherToCompletableFuture cf = new ReactivePublisherToCompletableFuture<>(); - publisher.subscribe(cf); - return cf; - } - private static CompletableFuture flowPublisherToCF(Flow.Publisher publisher) { FlowPublisherToCompletableFuture cf = new FlowPublisherToCompletableFuture<>(); publisher.subscribe(cf); @@ -116,39 +109,6 @@ void onCompleteImpl() { } } - private static class ReactivePublisherToCompletableFuture extends PublisherToCompletableFuture implements Subscriber { - - @Override - void doSubscriptionCancel(Subscription subscription) { - subscription.cancel(); - } - - @Override - void doSubscriptionRequest(Subscription subscription, long n) { - subscription.request(n); - } - - @Override - public void onSubscribe(Subscription s) { - onSubscribeImpl(s); - } - - @Override - public void onNext(T t) { - onNextImpl(t); - } - - @Override - public void onError(Throwable t) { - onErrorImpl(t); - } - - @Override - public void onComplete() { - onCompleteImpl(); - } - } - private static class FlowPublisherToCompletableFuture extends PublisherToCompletableFuture implements Flow.Subscriber { @Override