Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 58 additions & 25 deletions src/main/java/graphql/EngineRunningState.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package graphql;

import graphql.execution.AbortExecutionException;
import graphql.execution.EngineRunningObserver;
import graphql.execution.ExecutionId;
import org.jspecify.annotations.NullMarked;
import org.jspecify.annotations.Nullable;

import java.util.concurrent.CompletableFuture;
Expand All @@ -13,44 +15,32 @@
import java.util.function.Supplier;

import static graphql.Assert.assertTrue;
import static graphql.execution.EngineRunningObserver.RunningState.CANCELLED;
import static graphql.execution.EngineRunningObserver.RunningState.NOT_RUNNING;
import static graphql.execution.EngineRunningObserver.RunningState.NOT_RUNNING_FINISH;
import static graphql.execution.EngineRunningObserver.RunningState.RUNNING;
import static graphql.execution.EngineRunningObserver.RunningState.RUNNING_START;

@Internal
@NullMarked
public class EngineRunningState {

@Nullable
private final EngineRunningObserver engineRunningObserver;
@Nullable
private volatile ExecutionInput executionInput;
private final GraphQLContext graphQLContext;
@Nullable
private volatile ExecutionId executionId;

// if true the last decrementRunning() call will be ignored
private volatile boolean finished;

private final AtomicInteger isRunning = new AtomicInteger(0);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no longer nullable.


@VisibleForTesting
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

killed this - tests now create a valid one

public EngineRunningState() {
this.engineRunningObserver = null;
this.graphQLContext = null;
this.executionId = null;
}

public EngineRunningState(ExecutionInput executionInput) {
EngineRunningObserver engineRunningObserver = executionInput.getGraphQLContext().get(EngineRunningObserver.ENGINE_RUNNING_OBSERVER_KEY);
if (engineRunningObserver != null) {
this.engineRunningObserver = engineRunningObserver;
this.graphQLContext = executionInput.getGraphQLContext();
this.executionId = executionInput.getExecutionId();
} else {
this.engineRunningObserver = null;
this.graphQLContext = null;
this.executionId = null;
}
this.executionInput = executionInput;
this.graphQLContext = executionInput.getGraphQLContext();
this.executionId = executionInput.getExecutionId();
this.engineRunningObserver = executionInput.getGraphQLContext().get(EngineRunningObserver.ENGINE_RUNNING_OBSERVER_KEY);
}

public <U, T> CompletableFuture<U> handle(CompletableFuture<T> src, BiFunction<? super T, Throwable, ? extends U> fn) {
Expand All @@ -64,6 +54,7 @@ public <U, T> CompletableFuture<U> handle(CompletableFuture<T> src, BiFunction<?
if (throwable != null) {
throwable = throwable.getCause();
}
//noinspection DataFlowIssue
return fn.apply(t, throwable);
});
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

idea was giving it a yellow line

observerCompletableFutureEnd(src);
Expand Down Expand Up @@ -171,15 +162,15 @@ private void incrementRunning() {
}


public void updateExecutionId(ExecutionId executionId) {
if (engineRunningObserver == null) {
return;
}
this.executionId = executionId;
public void updateExecutionInput(ExecutionInput executionInput) {
this.executionInput = executionInput;
this.executionId = executionInput.getExecutionId();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we now can change the execution input

}

private void changeOfState(EngineRunningObserver.RunningState runningState) {
engineRunningObserver.runningStateChanged(executionId, graphQLContext, runningState);
if (engineRunningObserver != null) {
engineRunningObserver.runningStateChanged(executionId, graphQLContext, runningState);
}
}

private void run(Runnable runnable) {
Expand Down Expand Up @@ -215,4 +206,46 @@ public CompletableFuture<ExecutionResult> engineRun(Supplier<CompletableFuture<E
}


/**
* This will abort the execution via throwing {@link AbortExecutionException} if the {@link ExecutionInput} has been cancelled
*/
public void throwIfCancelled() throws AbortExecutionException {
AbortExecutionException abortExecutionException = ifCancelledMakeException();
if (abortExecutionException != null) {
throw abortExecutionException;
}
}

/**
* if the passed in {@link Throwable}is non-null then it is returned as id and if there is no exception then
* the cancellation state is checked in {@link ExecutionInput#isCancelled()} and a {@link AbortExecutionException}
* is made as the returned {@link Throwable}
*
* @param currentThrowable the current exception state
*
* @return a current throwable or a cancellation exception or null if none are in error
*/
@Internal
@Nullable
public Throwable possibleCancellation(@Nullable Throwable currentThrowable) {
// no need to check we are cancelled if we already have an exception in play
// since it can lead to an exception being thrown when an exception has already been
// thrown
if (currentThrowable == null) {
return ifCancelledMakeException();
}
return currentThrowable;
}

/**
* @return a AbortExecutionException if the current operation has been cancelled via {@link ExecutionInput#cancel()}
*/
public @Nullable AbortExecutionException ifCancelledMakeException() {
if (executionInput.isCancelled()) {
changeOfState(CANCELLED);
return new AbortExecutionException("Execution has been asked to be cancelled");
}
return null;
}

}
23 changes: 22 additions & 1 deletion src/main/java/graphql/ExecutionInput.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ public class ExecutionInput {
private final DataLoaderRegistry dataLoaderRegistry;
private final ExecutionId executionId;
private final Locale locale;
// this is currently not used but we want it back soon after the v23 release
private final AtomicBoolean cancelled;


Expand Down Expand Up @@ -142,6 +141,28 @@ public Map<String, Object> getExtensions() {
return extensions;
}


/**
* The graphql engine will check this frequently and if that is true, it will
* throw a {@link graphql.execution.AbortExecutionException} to cancel the execution.
* <p>
* This is a cooperative cancellation. Some asynchronous data fetching code may still continue to
* run but there will be no more efforts run future field fetches say.
*
* @return true if the execution should be cancelled
*/
public boolean isCancelled() {
return cancelled.get();
}

/**
* This can be called to cancel the graphql execution. Remember this is a cooperative cancellation
* and the graphql engine needs to be running on a thread to allow is to respect this flag.
*/
public void cancel() {
cancelled.set(true);
}

/**
* This helps you transform the current ExecutionInput object into another one by starting a builder with all
* the current values and allows you to transform it how you want.
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/graphql/GraphQL.java
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ public CompletableFuture<ExecutionResult> executeAsync(ExecutionInput executionI
EngineRunningState engineRunningState = new EngineRunningState(executionInput);
return engineRunningState.engineRun(() -> {
ExecutionInput executionInputWithId = ensureInputHasId(executionInput);
engineRunningState.updateExecutionId(executionInputWithId.getExecutionId());
engineRunningState.updateExecutionInput(executionInputWithId);

CompletableFuture<InstrumentationState> instrumentationStateCF = instrumentation.createStateAsync(new InstrumentationCreateStateParameters(this.graphQLSchema, executionInputWithId));
instrumentationStateCF = Async.orNullCompletedFuture(instrumentationStateCF);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ public AbstractAsyncExecutionStrategy(DataFetcherExceptionHandler dataFetcherExc

protected BiConsumer<List<Object>, Throwable> handleResults(ExecutionContext executionContext, List<String> fieldNames, CompletableFuture<ExecutionResult> overallResult) {
return (List<Object> results, Throwable exception) -> {
exception = executionContext.possibleCancellation(exception);

if (exception != null) {
handleNonNullException(executionContext, overallResult, exception);
return;
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/graphql/execution/AsyncExecutionStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public CompletableFuture<ExecutionResult> execute(ExecutionContext executionCont
List<String> fieldsExecutedOnInitialResult = deferredExecutionSupport.getNonDeferredFieldNames(fieldNames);

BiConsumer<List<Object>, Throwable> handleResultsConsumer = handleResults(executionContext, fieldsExecutedOnInitialResult, overallResult);
throwable = executionContext.possibleCancellation(throwable);

if (throwable != null) {
handleResultsConsumer.accept(null, throwable.getCause());
return;
Expand Down
9 changes: 8 additions & 1 deletion src/main/java/graphql/execution/EngineRunningObserver.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package graphql.execution;

import graphql.ExecutionInput;
import graphql.ExperimentalApi;
import graphql.GraphQLContext;
import org.jspecify.annotations.NullMarked;
Expand All @@ -8,6 +9,8 @@
* This class lets you observe the running state of the graphql-java engine. As it processes and dispatches graphql fields,
* the engine moves in and out of a running and not running state. As it does this, the callback is called with information telling you the current
* state.
* <p>
* If the engine is cancelled via {@link ExecutionInput#cancel()} then the observer will also be called to indicate that.
*/
@ExperimentalApi
@NullMarked
Expand All @@ -29,7 +32,11 @@ enum RunningState {
/**
* Represents that the engine is finished
*/
NOT_RUNNING_FINISH
NOT_RUNNING_FINISH,
/**
* Represents that the engine code has been cancelled via {@link ExecutionInput#cancel()}
*/
CANCELLED
}


Expand Down
6 changes: 6 additions & 0 deletions src/main/java/graphql/execution/Execution.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ public CompletableFuture<ExecutionResult> execute(Document document, GraphQLSche
throw rte;
}

// before we get started - did they ask us to cancel?
AbortExecutionException abortExecutionException = engineRunningState.ifCancelledMakeException();
if (abortExecutionException != null) {
return completedFuture(abortExecutionException.toExecutionResult());
}

boolean propagateErrorsOnNonNullContractFailure = propagateErrorsOnNonNullContractFailure(getOperationResult.operationDefinition.getDirectives());

ResponseMapFactory responseMapFactory = GraphQL.unusualConfiguration(executionInput.getGraphQLContext())
Expand Down
14 changes: 13 additions & 1 deletion src/main/java/graphql/execution/ExecutionContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import graphql.util.FpKit;
import graphql.util.LockKit;
import org.dataloader.DataLoaderRegistry;
import org.jspecify.annotations.Nullable;

import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -377,4 +378,15 @@ public boolean hasIncrementalSupport() {
public EngineRunningState getEngineRunningState() {
return engineRunningState;
}
}

@Internal
@Nullable
Throwable possibleCancellation(@Nullable Throwable currentThrowable) {
return engineRunningState.possibleCancellation(currentThrowable);
}

@Internal
void throwIfCancelled() throws AbortExecutionException {
engineRunningState.throwIfCancelled();
}
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are just helper methods to make it nicer inside the ExecutionStrategies

20 changes: 18 additions & 2 deletions src/main/java/graphql/execution/ExecutionStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ public static String mkNameForPath(List<Field> currentField) {
@SuppressWarnings("unchecked")
@DuckTyped(shape = "CompletableFuture<Map<String, Object>> | Map<String, Object>")
protected Object executeObject(ExecutionContext executionContext, ExecutionStrategyParameters parameters) throws NonNullableFieldWasNullException {
executionContext.throwIfCancelled();

DataLoaderDispatchStrategy dataLoaderDispatcherStrategy = executionContext.getDataLoaderDispatcherStrategy();
Instrumentation instrumentation = executionContext.getInstrumentation();
InstrumentationExecutionStrategyParameters instrumentationParameters = new InstrumentationExecutionStrategyParameters(executionContext, parameters);
Expand All @@ -218,6 +220,8 @@ protected Object executeObject(ExecutionContext executionContext, ExecutionStrat
if (fieldValueInfosResult instanceof CompletableFuture) {
CompletableFuture<List<FieldValueInfo>> fieldValueInfos = (CompletableFuture<List<FieldValueInfo>>) fieldValueInfosResult;
fieldValueInfos.whenComplete((completeValueInfos, throwable) -> {
throwable = executionContext.possibleCancellation(throwable);

if (throwable != null) {
handleResultsConsumer.accept(null, throwable);
return;
Expand Down Expand Up @@ -269,6 +273,8 @@ protected Object executeObject(ExecutionContext executionContext, ExecutionStrat

private BiConsumer<List<Object>, Throwable> buildFieldValueMap(List<String> fieldNames, CompletableFuture<Map<String, Object>> overallResult, ExecutionContext executionContext) {
return (List<Object> results, Throwable exception) -> {
exception = executionContext.possibleCancellation(exception);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does a trick - if the Throwable is already set - leave it set - otherwise check if we are cancelled and make the throwable be that


if (exception != null) {
handleValueException(overallResult, exception, executionContext);
return;
Expand Down Expand Up @@ -296,6 +302,8 @@ DeferredExecutionSupport createDeferredExecutionSupport(ExecutionContext executi
ExecutionStrategyParameters parameters,
DeferredExecutionSupport deferredExecutionSupport
) {
executionContext.throwIfCancelled();

MergedSelectionSet fields = parameters.getFields();

executionContext.getIncrementalCallState().enqueue(deferredExecutionSupport.createCalls());
Expand All @@ -305,6 +313,8 @@ DeferredExecutionSupport createDeferredExecutionSupport(ExecutionContext executi
.ofExpectedSize(fields.size() - deferredExecutionSupport.deferredFieldsCount());

for (String fieldName : fields.getKeys()) {
executionContext.throwIfCancelled();

MergedField currentField = fields.getSubField(fieldName);

ResultPath fieldPath = parameters.getPath().segment(mkNameForPath(currentField));
Expand Down Expand Up @@ -392,6 +402,7 @@ protected Object fetchField(ExecutionContext executionContext, ExecutionStrategy

@DuckTyped(shape = "CompletableFuture<FetchedValue> | FetchedValue")
private Object fetchField(GraphQLFieldDefinition fieldDef, ExecutionContext executionContext, ExecutionStrategyParameters parameters) {
executionContext.throwIfCancelled();

if (incrementAndCheckMaxNodesExceeded(executionContext)) {
return new FetchedValue(null, Collections.emptyList(), null);
Expand Down Expand Up @@ -465,9 +476,10 @@ private Object fetchField(GraphQLFieldDefinition fieldDef, ExecutionContext exec
CompletableFuture<CompletableFuture<Object>> handleCF = engineRunningState.handle(fetchedValue, (result, exception) -> {
// because we added an artificial CF, we need to unwrap the exception
fetchCtx.onCompleted(result, exception);
exception = engineRunningState.possibleCancellation(exception);

if (exception != null) {
CompletableFuture<Object> handleFetchingExceptionResult = handleFetchingException(dataFetchingEnvironment.get(), parameters, exception);
return handleFetchingExceptionResult;
return handleFetchingException(dataFetchingEnvironment.get(), parameters, exception);
} else {
// we can simply return the fetched value CF and avoid a allocation
return fetchedValue;
Expand Down Expand Up @@ -588,6 +600,8 @@ private <T> CompletableFuture<T> asyncHandleException(DataFetcherExceptionHandle
* if a nonnull field resolves to a null value
*/
protected FieldValueInfo completeField(ExecutionContext executionContext, ExecutionStrategyParameters parameters, FetchedValue fetchedValue) {
executionContext.throwIfCancelled();

Field field = parameters.getField().getSingleField();
GraphQLObjectType parentType = (GraphQLObjectType) parameters.getExecutionStepInfo().getUnwrappedNonNullType();
GraphQLFieldDefinition fieldDef = getFieldDef(executionContext.getGraphQLSchema(), parentType, field);
Expand Down Expand Up @@ -784,6 +798,8 @@ protected FieldValueInfo completeValueForList(ExecutionContext executionContext,
overallResult.whenComplete(completeListCtx::onCompleted);

resultsFuture.whenComplete((results, exception) -> {
exception = executionContext.possibleCancellation(exception);

if (exception != null) {
handleValueException(overallResult, exception, executionContext);
return;
Expand Down
Loading