diff --git a/src/main/java/graphql/execution/reactive/ReactiveSupport.java b/src/main/java/graphql/execution/reactive/ReactiveSupport.java index 3e02f11592..80deb38be0 100644 --- a/src/main/java/graphql/execution/reactive/ReactiveSupport.java +++ b/src/main/java/graphql/execution/reactive/ReactiveSupport.java @@ -2,9 +2,8 @@ import graphql.DuckTyped; import graphql.Internal; +import org.reactivestreams.FlowAdapters; import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; import java.util.Objects; import java.util.concurrent.CompletableFuture; @@ -26,17 +25,11 @@ public static Object fetchedObject(Object fetchedObject) { return flowPublisherToCF((Flow.Publisher) fetchedObject); } if (fetchedObject instanceof Publisher) { - return reactivePublisherToCF((Publisher) fetchedObject); + return flowPublisherToCF(FlowAdapters.toFlowPublisher((Publisher) fetchedObject)); } return fetchedObject; } - private static CompletableFuture reactivePublisherToCF(Publisher publisher) { - ReactivePublisherToCompletableFuture cf = new ReactivePublisherToCompletableFuture<>(); - publisher.subscribe(cf); - return cf; - } - private static CompletableFuture flowPublisherToCF(Flow.Publisher publisher) { FlowPublisherToCompletableFuture cf = new FlowPublisherToCompletableFuture<>(); publisher.subscribe(cf); @@ -116,39 +109,6 @@ void onCompleteImpl() { } } - private static class ReactivePublisherToCompletableFuture extends PublisherToCompletableFuture implements Subscriber { - - @Override - void doSubscriptionCancel(Subscription subscription) { - subscription.cancel(); - } - - @Override - void doSubscriptionRequest(Subscription subscription, long n) { - subscription.request(n); - } - - @Override - public void onSubscribe(Subscription s) { - onSubscribeImpl(s); - } - - @Override - public void onNext(T t) { - onNextImpl(t); - } - - @Override - public void onError(Throwable t) { - onErrorImpl(t); - } - - @Override - public void onComplete() { - onCompleteImpl(); - } - } - private static class FlowPublisherToCompletableFuture extends PublisherToCompletableFuture implements Flow.Subscriber { @Override