Added support for reactive Publishers to be returned from data fetchers#3731
Added support for reactive Publishers to be returned from data fetchers#3731
Conversation
| * @param <T> for two | ||
| * @param <S> for subscription | ||
| */ | ||
| private static abstract class PublisherToCompletableFuture<T, S> extends CompletableFuture<T> { |
There was a problem hiding this comment.
This class is heavily inspired from the Reactor reactor.core.publisher.MonoToCompletableFuture class. Its uses pretty much the same pattern to turn a Publisher into a CF
There was a problem hiding this comment.
Note it "IS-A" CompletableFuture class
| @SuppressWarnings("unchecked") | ||
| protected Object /* CompletableFuture<Map<String, Object>> | Map<String, Object> */ | ||
| executeObject(ExecutionContext executionContext, ExecutionStrategyParameters parameters) throws NonNullableFieldWasNullException { | ||
| @DuckTyped(shape = "CompletableFuture<Map<String, Object>> | Map<String, Object>") |
There was a problem hiding this comment.
I think this annotation reads better in the IDEA rather than a comment. Maybe we can use it later in some way but honestly its just a more type safe way of saying the same as we try to say in the comment
There was a problem hiding this comment.
Nice idea. Makes it more official than a comment
| public class ReactiveSupport { | ||
|
|
||
| @DuckTyped(shape = "CompletableFuture | Object") | ||
| public static Object fetchedObject(Object fetchedObject) { |
…rs - review tweaks
| val = ReactiveSupport.fetchedObject(cf) | ||
| then: | ||
| val === cf | ||
| } |
There was a problem hiding this comment.
wont touch CFs or plain old objects
| !cf.isDone() | ||
|
|
||
| when: | ||
| def cfCancelled = cf.cancel(true) |
There was a problem hiding this comment.
cancelled before it produce a value
| return counter + 1; | ||
| }) | ||
| } | ||
| } |
| reactiveObject || _ | ||
| Mono.just("X") || _ | ||
| toFlow(Mono.just("X")) || _ | ||
| } |
There was a problem hiding this comment.
tests with both Reactor and Java Flows
| } | ||
|
|
||
|
|
||
| def "can get a reactive Flux and only take one value and make a CF from it"() { |
There was a problem hiding this comment.
even a Flux of N values only has 1 value taken from it because that is what the graphql contact is - a Query field is 1 value not a stream of values - at least not yet (aka @stream future work perhaps)
| cfField : "cf", | ||
| materialisedField: "materialised" | ||
| ] | ||
| } |
There was a problem hiding this comment.
full integration tests showing it working.
We know CF error handling works already so there is no need for integration tests showing all that
| fetchCtx.onDispatched(); | ||
| fetchCtx.onFetchedValue(fetchedObject); | ||
| // possible convert reactive objects into CompletableFutures | ||
| fetchedObject = ReactiveSupport.fetchedObject(fetchedObject); |
There was a problem hiding this comment.
This is the main chance. One liner. Turns a reactive publisher into a CF
…rs - never on subscriptions
…rs - never on subscriptions with tests working
| */ | ||
| public boolean isSubscriptionOperation() { | ||
| return isOpType(OperationDefinition.Operation.SUBSCRIPTION); | ||
| } |
There was a problem hiding this comment.
I hav long wanted this. Checking operationDefinition.getOperation() is a PITA
| if (!executionContext.isSubscriptionOperation()) { | ||
| // possible convert reactive objects into CompletableFutures | ||
| fetchedObject = ReactiveSupport.fetchedObject(fetchedObject); | ||
| } |
There was a problem hiding this comment.
This is the main code - if its NOT a Subscription, we turns the Pubisher into a CF
| @Target(value = {METHOD, PARAMETER}) | ||
| public @interface DuckTyped { | ||
| String shape(); | ||
| } |
There was a problem hiding this comment.
I think this is better than a comment
@DuckedTyped(shape="Object | CompletableFuture<Object>")
versus
/* Object | CompletableFuture<Object> */
| return reactivePublisherToCF((Publisher<?>) fetchedObject); | ||
| } | ||
| return fetchedObject; | ||
| } |
There was a problem hiding this comment.
Leaves other things alone = only acts on Flow.Publisher and Pubisher
This adds support for Reactive Publishers, both reactive streams Publishers and JDK Flow Publishers, to be returned from a
DataFetcheras a value.They will be turned into
CompletableFutureby using a subscriber to ask for one value from them.This is a lot like how reactor
Monohas a.toFuture()method except now they can just return the Mono itself, since a Mono is a reactive streams Publisher.The cost of this is 2 more
instanceofchecks like we already for forCompleteableFutures