-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Cancel support of operations #3890
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
85371b2
cfa76ec
f79932a
f6546b6
fbe3773
ddb960d
74d2edd
8156187
95cf25d
add6558
9186621
869ee3c
9cb1c71
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,9 @@ | ||
| package graphql; | ||
|
|
||
| import graphql.execution.AbortExecutionException; | ||
| import graphql.execution.EngineRunningObserver; | ||
| import graphql.execution.ExecutionId; | ||
| import org.jspecify.annotations.NullMarked; | ||
| import org.jspecify.annotations.Nullable; | ||
|
|
||
| import java.util.concurrent.CompletableFuture; | ||
|
|
@@ -13,44 +15,32 @@ | |
| import java.util.function.Supplier; | ||
|
|
||
| import static graphql.Assert.assertTrue; | ||
| import static graphql.execution.EngineRunningObserver.RunningState.CANCELLED; | ||
| import static graphql.execution.EngineRunningObserver.RunningState.NOT_RUNNING; | ||
| import static graphql.execution.EngineRunningObserver.RunningState.NOT_RUNNING_FINISH; | ||
| import static graphql.execution.EngineRunningObserver.RunningState.RUNNING; | ||
| import static graphql.execution.EngineRunningObserver.RunningState.RUNNING_START; | ||
|
|
||
| @Internal | ||
| @NullMarked | ||
| public class EngineRunningState { | ||
|
|
||
| @Nullable | ||
| private final EngineRunningObserver engineRunningObserver; | ||
| @Nullable | ||
| private volatile ExecutionInput executionInput; | ||
| private final GraphQLContext graphQLContext; | ||
| @Nullable | ||
| private volatile ExecutionId executionId; | ||
|
|
||
| // if true the last decrementRunning() call will be ignored | ||
| private volatile boolean finished; | ||
|
|
||
| private final AtomicInteger isRunning = new AtomicInteger(0); | ||
|
|
||
| @VisibleForTesting | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. killed this - tests now create a valid one |
||
| public EngineRunningState() { | ||
| this.engineRunningObserver = null; | ||
| this.graphQLContext = null; | ||
| this.executionId = null; | ||
| } | ||
|
|
||
| public EngineRunningState(ExecutionInput executionInput) { | ||
| EngineRunningObserver engineRunningObserver = executionInput.getGraphQLContext().get(EngineRunningObserver.ENGINE_RUNNING_OBSERVER_KEY); | ||
| if (engineRunningObserver != null) { | ||
| this.engineRunningObserver = engineRunningObserver; | ||
| this.graphQLContext = executionInput.getGraphQLContext(); | ||
| this.executionId = executionInput.getExecutionId(); | ||
| } else { | ||
| this.engineRunningObserver = null; | ||
| this.graphQLContext = null; | ||
| this.executionId = null; | ||
| } | ||
| this.executionInput = executionInput; | ||
| this.graphQLContext = executionInput.getGraphQLContext(); | ||
| this.executionId = executionInput.getExecutionId(); | ||
| this.engineRunningObserver = executionInput.getGraphQLContext().get(EngineRunningObserver.ENGINE_RUNNING_OBSERVER_KEY); | ||
| } | ||
|
|
||
| public <U, T> CompletableFuture<U> handle(CompletableFuture<T> src, BiFunction<? super T, Throwable, ? extends U> fn) { | ||
|
|
@@ -64,6 +54,7 @@ public <U, T> CompletableFuture<U> handle(CompletableFuture<T> src, BiFunction<? | |
| if (throwable != null) { | ||
| throwable = throwable.getCause(); | ||
| } | ||
| //noinspection DataFlowIssue | ||
| return fn.apply(t, throwable); | ||
| }); | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. idea was giving it a yellow line |
||
| observerCompletableFutureEnd(src); | ||
|
|
@@ -171,15 +162,15 @@ private void incrementRunning() { | |
| } | ||
|
|
||
|
|
||
| public void updateExecutionId(ExecutionId executionId) { | ||
| if (engineRunningObserver == null) { | ||
| return; | ||
| } | ||
| this.executionId = executionId; | ||
| public void updateExecutionInput(ExecutionInput executionInput) { | ||
| this.executionInput = executionInput; | ||
| this.executionId = executionInput.getExecutionId(); | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we now can change the execution input |
||
| } | ||
|
|
||
| private void changeOfState(EngineRunningObserver.RunningState runningState) { | ||
| engineRunningObserver.runningStateChanged(executionId, graphQLContext, runningState); | ||
| if (engineRunningObserver != null) { | ||
| engineRunningObserver.runningStateChanged(executionId, graphQLContext, runningState); | ||
| } | ||
| } | ||
|
|
||
| private void run(Runnable runnable) { | ||
|
|
@@ -215,4 +206,46 @@ public CompletableFuture<ExecutionResult> engineRun(Supplier<CompletableFuture<E | |
| } | ||
|
|
||
|
|
||
| /** | ||
| * This will abort the execution via throwing {@link AbortExecutionException} if the {@link ExecutionInput} has been cancelled | ||
| */ | ||
| public void throwIfCancelled() throws AbortExecutionException { | ||
| AbortExecutionException abortExecutionException = ifCancelledMakeException(); | ||
| if (abortExecutionException != null) { | ||
| throw abortExecutionException; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * if the passed in {@link Throwable}is non-null then it is returned as id and if there is no exception then | ||
| * the cancellation state is checked in {@link ExecutionInput#isCancelled()} and a {@link AbortExecutionException} | ||
| * is made as the returned {@link Throwable} | ||
| * | ||
| * @param currentThrowable the current exception state | ||
| * | ||
| * @return a current throwable or a cancellation exception or null if none are in error | ||
| */ | ||
| @Internal | ||
| @Nullable | ||
| public Throwable possibleCancellation(@Nullable Throwable currentThrowable) { | ||
| // no need to check we are cancelled if we already have an exception in play | ||
| // since it can lead to an exception being thrown when an exception has already been | ||
| // thrown | ||
| if (currentThrowable == null) { | ||
| return ifCancelledMakeException(); | ||
| } | ||
| return currentThrowable; | ||
| } | ||
|
|
||
| /** | ||
| * @return a AbortExecutionException if the current operation has been cancelled via {@link ExecutionInput#cancel()} | ||
| */ | ||
| public @Nullable AbortExecutionException ifCancelledMakeException() { | ||
| if (executionInput.isCancelled()) { | ||
| changeOfState(CANCELLED); | ||
| return new AbortExecutionException("Execution has been asked to be cancelled"); | ||
| } | ||
| return null; | ||
| } | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,6 +23,7 @@ | |
| import graphql.util.FpKit; | ||
| import graphql.util.LockKit; | ||
| import org.dataloader.DataLoaderRegistry; | ||
| import org.jspecify.annotations.Nullable; | ||
|
|
||
| import java.util.HashSet; | ||
| import java.util.List; | ||
|
|
@@ -377,4 +378,15 @@ public boolean hasIncrementalSupport() { | |
| public EngineRunningState getEngineRunningState() { | ||
| return engineRunningState; | ||
| } | ||
| } | ||
|
|
||
| @Internal | ||
| @Nullable | ||
| Throwable possibleCancellation(@Nullable Throwable currentThrowable) { | ||
| return engineRunningState.possibleCancellation(currentThrowable); | ||
| } | ||
|
|
||
| @Internal | ||
| void throwIfCancelled() throws AbortExecutionException { | ||
| engineRunningState.throwIfCancelled(); | ||
| } | ||
| } | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These are just helper methods to make it nicer inside the ExecutionStrategies |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -194,6 +194,8 @@ public static String mkNameForPath(List<Field> currentField) { | |
| @SuppressWarnings("unchecked") | ||
| @DuckTyped(shape = "CompletableFuture<Map<String, Object>> | Map<String, Object>") | ||
| protected Object executeObject(ExecutionContext executionContext, ExecutionStrategyParameters parameters) throws NonNullableFieldWasNullException { | ||
| executionContext.throwIfCancelled(); | ||
|
|
||
| DataLoaderDispatchStrategy dataLoaderDispatcherStrategy = executionContext.getDataLoaderDispatcherStrategy(); | ||
| Instrumentation instrumentation = executionContext.getInstrumentation(); | ||
| InstrumentationExecutionStrategyParameters instrumentationParameters = new InstrumentationExecutionStrategyParameters(executionContext, parameters); | ||
|
|
@@ -218,6 +220,8 @@ protected Object executeObject(ExecutionContext executionContext, ExecutionStrat | |
| if (fieldValueInfosResult instanceof CompletableFuture) { | ||
| CompletableFuture<List<FieldValueInfo>> fieldValueInfos = (CompletableFuture<List<FieldValueInfo>>) fieldValueInfosResult; | ||
| fieldValueInfos.whenComplete((completeValueInfos, throwable) -> { | ||
| throwable = executionContext.possibleCancellation(throwable); | ||
|
|
||
| if (throwable != null) { | ||
| handleResultsConsumer.accept(null, throwable); | ||
| return; | ||
|
|
@@ -269,6 +273,8 @@ protected Object executeObject(ExecutionContext executionContext, ExecutionStrat | |
|
|
||
| private BiConsumer<List<Object>, Throwable> buildFieldValueMap(List<String> fieldNames, CompletableFuture<Map<String, Object>> overallResult, ExecutionContext executionContext) { | ||
| return (List<Object> results, Throwable exception) -> { | ||
| exception = executionContext.possibleCancellation(exception); | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This does a trick - if the Throwable is already set - leave it set - otherwise check if we are cancelled and make the throwable be that |
||
|
|
||
| if (exception != null) { | ||
| handleValueException(overallResult, exception, executionContext); | ||
| return; | ||
|
|
@@ -296,6 +302,8 @@ DeferredExecutionSupport createDeferredExecutionSupport(ExecutionContext executi | |
| ExecutionStrategyParameters parameters, | ||
| DeferredExecutionSupport deferredExecutionSupport | ||
| ) { | ||
| executionContext.throwIfCancelled(); | ||
|
|
||
| MergedSelectionSet fields = parameters.getFields(); | ||
|
|
||
| executionContext.getIncrementalCallState().enqueue(deferredExecutionSupport.createCalls()); | ||
|
|
@@ -305,6 +313,8 @@ DeferredExecutionSupport createDeferredExecutionSupport(ExecutionContext executi | |
| .ofExpectedSize(fields.size() - deferredExecutionSupport.deferredFieldsCount()); | ||
|
|
||
| for (String fieldName : fields.getKeys()) { | ||
| executionContext.throwIfCancelled(); | ||
|
|
||
| MergedField currentField = fields.getSubField(fieldName); | ||
|
|
||
| ResultPath fieldPath = parameters.getPath().segment(mkNameForPath(currentField)); | ||
|
|
@@ -392,6 +402,7 @@ protected Object fetchField(ExecutionContext executionContext, ExecutionStrategy | |
|
|
||
| @DuckTyped(shape = "CompletableFuture<FetchedValue> | FetchedValue") | ||
| private Object fetchField(GraphQLFieldDefinition fieldDef, ExecutionContext executionContext, ExecutionStrategyParameters parameters) { | ||
| executionContext.throwIfCancelled(); | ||
|
|
||
| if (incrementAndCheckMaxNodesExceeded(executionContext)) { | ||
| return new FetchedValue(null, Collections.emptyList(), null); | ||
|
|
@@ -465,9 +476,10 @@ private Object fetchField(GraphQLFieldDefinition fieldDef, ExecutionContext exec | |
| CompletableFuture<CompletableFuture<Object>> handleCF = engineRunningState.handle(fetchedValue, (result, exception) -> { | ||
| // because we added an artificial CF, we need to unwrap the exception | ||
| fetchCtx.onCompleted(result, exception); | ||
| exception = engineRunningState.possibleCancellation(exception); | ||
|
|
||
| if (exception != null) { | ||
| CompletableFuture<Object> handleFetchingExceptionResult = handleFetchingException(dataFetchingEnvironment.get(), parameters, exception); | ||
| return handleFetchingExceptionResult; | ||
| return handleFetchingException(dataFetchingEnvironment.get(), parameters, exception); | ||
| } else { | ||
| // we can simply return the fetched value CF and avoid a allocation | ||
| return fetchedValue; | ||
|
|
@@ -588,6 +600,8 @@ private <T> CompletableFuture<T> asyncHandleException(DataFetcherExceptionHandle | |
| * if a nonnull field resolves to a null value | ||
| */ | ||
| protected FieldValueInfo completeField(ExecutionContext executionContext, ExecutionStrategyParameters parameters, FetchedValue fetchedValue) { | ||
| executionContext.throwIfCancelled(); | ||
|
|
||
| Field field = parameters.getField().getSingleField(); | ||
| GraphQLObjectType parentType = (GraphQLObjectType) parameters.getExecutionStepInfo().getUnwrappedNonNullType(); | ||
| GraphQLFieldDefinition fieldDef = getFieldDef(executionContext.getGraphQLSchema(), parentType, field); | ||
|
|
@@ -784,6 +798,8 @@ protected FieldValueInfo completeValueForList(ExecutionContext executionContext, | |
| overallResult.whenComplete(completeListCtx::onCompleted); | ||
|
|
||
| resultsFuture.whenComplete((results, exception) -> { | ||
| exception = executionContext.possibleCancellation(exception); | ||
|
|
||
| if (exception != null) { | ||
| handleValueException(overallResult, exception, executionContext); | ||
| return; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no longer nullable.