Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import graphql.ExecutionResultImpl;
import graphql.PublicSpi;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand All @@ -23,7 +22,7 @@ public AbstractAsyncExecutionStrategy(DataFetcherExceptionHandler dataFetcherExc
}

protected BiConsumer<List<Object>, Throwable> handleResults(ExecutionContext executionContext, List<String> fieldNames, CompletableFuture<ExecutionResult> overallResult) {
return (List<Object> results, Throwable exception) -> {
return (List<Object> results, Throwable exception) -> executionContext.run(() -> {
if (exception != null) {
handleNonNullException(executionContext, overallResult, exception);
return;
Expand All @@ -35,6 +34,6 @@ protected BiConsumer<List<Object>, Throwable> handleResults(ExecutionContext exe
resolvedValuesByField.put(fieldName, result);
}
overallResult.complete(new ExecutionResultImpl(resolvedValuesByField, executionContext.getErrors()));
};
});
}
}
84 changes: 44 additions & 40 deletions src/main/java/graphql/execution/AsyncExecutionStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,54 +38,58 @@ public AsyncExecutionStrategy(DataFetcherExceptionHandler exceptionHandler) {
@Override
@SuppressWarnings("FutureReturnValueIgnored")
public CompletableFuture<ExecutionResult> execute(ExecutionContext executionContext, ExecutionStrategyParameters parameters) throws NonNullableFieldWasNullException {
DataLoaderDispatchStrategy dataLoaderDispatcherStrategy = executionContext.getDataLoaderDispatcherStrategy();
dataLoaderDispatcherStrategy.executionStrategy(executionContext, parameters);
Instrumentation instrumentation = executionContext.getInstrumentation();
InstrumentationExecutionStrategyParameters instrumentationParameters = new InstrumentationExecutionStrategyParameters(executionContext, parameters);
return executionContext.call(() -> {
DataLoaderDispatchStrategy dataLoaderDispatcherStrategy = executionContext.getDataLoaderDispatcherStrategy();
dataLoaderDispatcherStrategy.executionStrategy(executionContext, parameters);
Instrumentation instrumentation = executionContext.getInstrumentation();
InstrumentationExecutionStrategyParameters instrumentationParameters = new InstrumentationExecutionStrategyParameters(executionContext, parameters);

ExecutionStrategyInstrumentationContext executionStrategyCtx = ExecutionStrategyInstrumentationContext.nonNullCtx(instrumentation.beginExecutionStrategy(instrumentationParameters, executionContext.getInstrumentationState()));
ExecutionStrategyInstrumentationContext executionStrategyCtx = ExecutionStrategyInstrumentationContext.nonNullCtx(instrumentation.beginExecutionStrategy(instrumentationParameters, executionContext.getInstrumentationState()));

MergedSelectionSet fields = parameters.getFields();
List<String> fieldNames = fields.getKeys();
MergedSelectionSet fields = parameters.getFields();
List<String> fieldNames = fields.getKeys();

Optional<ExecutionResult> isNotSensible = Introspection.isIntrospectionSensible(fields, executionContext);
if (isNotSensible.isPresent()) {
return CompletableFuture.completedFuture(isNotSensible.get());
}
Optional<ExecutionResult> isNotSensible = Introspection.isIntrospectionSensible(fields, executionContext);
if (isNotSensible.isPresent()) {
return CompletableFuture.completedFuture(isNotSensible.get());
}

DeferredExecutionSupport deferredExecutionSupport = createDeferredExecutionSupport(executionContext, parameters);
Async.CombinedBuilder<FieldValueInfo> futures = getAsyncFieldValueInfo(executionContext, parameters, deferredExecutionSupport);
DeferredExecutionSupport deferredExecutionSupport = createDeferredExecutionSupport(executionContext, parameters);
Async.CombinedBuilder<FieldValueInfo> futures = getAsyncFieldValueInfo(executionContext, parameters, deferredExecutionSupport);

CompletableFuture<ExecutionResult> overallResult = new CompletableFuture<>();
executionStrategyCtx.onDispatched();
CompletableFuture<ExecutionResult> overallResult = new CompletableFuture<>();
executionStrategyCtx.onDispatched();

futures.await().whenComplete((completeValueInfos, throwable) -> {
List<String> fieldsExecutedOnInitialResult = deferredExecutionSupport.getNonDeferredFieldNames(fieldNames);
futures.await().whenComplete((completeValueInfos, throwable) -> {
executionContext.run(() -> {
List<String> fieldsExecutedOnInitialResult = deferredExecutionSupport.getNonDeferredFieldNames(fieldNames);

BiConsumer<List<Object>, Throwable> handleResultsConsumer = handleResults(executionContext, fieldsExecutedOnInitialResult, overallResult);
if (throwable != null) {
handleResultsConsumer.accept(null, throwable.getCause());
return;
}
BiConsumer<List<Object>, Throwable> handleResultsConsumer = handleResults(executionContext, fieldsExecutedOnInitialResult, overallResult);
if (throwable != null) {
handleResultsConsumer.accept(null, throwable.getCause());
return;
}

Async.CombinedBuilder<Object> fieldValuesFutures = Async.ofExpectedSize(completeValueInfos.size());
for (FieldValueInfo completeValueInfo : completeValueInfos) {
fieldValuesFutures.addObject(completeValueInfo.getFieldValueObject());
}
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesInfo(completeValueInfos);
executionStrategyCtx.onFieldValuesInfo(completeValueInfos);
fieldValuesFutures.await().whenComplete(handleResultsConsumer);
}).exceptionally((ex) -> {
// if there are any issues with combining/handling the field results,
// complete the future at all costs and bubble up any thrown exception so
// the execution does not hang.
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesException(ex);
executionStrategyCtx.onFieldValuesException();
overallResult.completeExceptionally(ex);
return null;
});
Async.CombinedBuilder<Object> fieldValuesFutures = Async.ofExpectedSize(completeValueInfos.size());
for (FieldValueInfo completeValueInfo : completeValueInfos) {
fieldValuesFutures.addObject(completeValueInfo.getFieldValueObject());
}
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesInfo(completeValueInfos);
executionStrategyCtx.onFieldValuesInfo(completeValueInfos);
fieldValuesFutures.await().whenComplete(handleResultsConsumer);
});
}).exceptionally((ex) -> executionContext.call(() -> {
// if there are any issues with combining/handling the field results,
// complete the future at all costs and bubble up any thrown exception so
// the execution does not hang.
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesException(ex);
executionStrategyCtx.onFieldValuesException();
overallResult.completeExceptionally(ex);
return null;
}));

overallResult.whenComplete(executionStrategyCtx::onCompleted);
return overallResult;
overallResult.whenComplete(executionStrategyCtx::onCompleted);
return overallResult;
});
}
}
62 changes: 33 additions & 29 deletions src/main/java/graphql/execution/AsyncSerialExecutionStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,38 +32,41 @@ public AsyncSerialExecutionStrategy(DataFetcherExceptionHandler exceptionHandler
@Override
@SuppressWarnings({"TypeParameterUnusedInFormals", "FutureReturnValueIgnored"})
public CompletableFuture<ExecutionResult> execute(ExecutionContext executionContext, ExecutionStrategyParameters parameters) throws NonNullableFieldWasNullException {
DataLoaderDispatchStrategy dataLoaderDispatcherStrategy = executionContext.getDataLoaderDispatcherStrategy();
return executionContext.call(() -> {
DataLoaderDispatchStrategy dataLoaderDispatcherStrategy = executionContext.getDataLoaderDispatcherStrategy();

Instrumentation instrumentation = executionContext.getInstrumentation();
InstrumentationExecutionStrategyParameters instrumentationParameters = new InstrumentationExecutionStrategyParameters(executionContext, parameters);
InstrumentationContext<ExecutionResult> executionStrategyCtx = nonNullCtx(instrumentation.beginExecutionStrategy(instrumentationParameters,
executionContext.getInstrumentationState())
);
MergedSelectionSet fields = parameters.getFields();
ImmutableList<String> fieldNames = ImmutableList.copyOf(fields.keySet());
Instrumentation instrumentation = executionContext.getInstrumentation();
InstrumentationExecutionStrategyParameters instrumentationParameters = new InstrumentationExecutionStrategyParameters(executionContext, parameters);
InstrumentationContext<ExecutionResult> executionStrategyCtx = nonNullCtx(instrumentation.beginExecutionStrategy(instrumentationParameters,
executionContext.getInstrumentationState())
);
MergedSelectionSet fields = parameters.getFields();
ImmutableList<String> fieldNames = ImmutableList.copyOf(fields.keySet());

// this is highly unlikely since Mutations cant do introspection BUT in theory someone could make the query strategy this code
// so belts and braces
Optional<ExecutionResult> isNotSensible = Introspection.isIntrospectionSensible(fields, executionContext);
if (isNotSensible.isPresent()) {
return CompletableFuture.completedFuture(isNotSensible.get());
}
// this is highly unlikely since Mutations cant do introspection BUT in theory someone could make the query strategy this code
// so belts and braces
Optional<ExecutionResult> isNotSensible = Introspection.isIntrospectionSensible(fields, executionContext);
if (isNotSensible.isPresent()) {
return CompletableFuture.completedFuture(isNotSensible.get());
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

executionContext.finished() before the Async.eachSequentially ?

CompletableFuture<List<Object>> resultsFuture = Async.eachSequentially(fieldNames, (fieldName, prevResults) -> {
MergedField currentField = fields.getSubField(fieldName);
ResultPath fieldPath = parameters.getPath().segment(mkNameForPath(currentField));
ExecutionStrategyParameters newParameters = parameters
.transform(builder -> builder.field(currentField).path(fieldPath));
CompletableFuture<List<Object>> resultsFuture = Async.eachSequentially(fieldNames, (fieldName, prevResults) -> executionContext.call(() -> {
MergedField currentField = fields.getSubField(fieldName);
ResultPath fieldPath = parameters.getPath().segment(mkNameForPath(currentField));
ExecutionStrategyParameters newParameters = parameters
.transform(builder -> builder.field(currentField).path(fieldPath));

return resolveSerialField(executionContext, dataLoaderDispatcherStrategy, newParameters);
});
Object resolveSerialField = resolveSerialField(executionContext, dataLoaderDispatcherStrategy, newParameters);
return resolveSerialField;
}));

CompletableFuture<ExecutionResult> overallResult = new CompletableFuture<>();
executionStrategyCtx.onDispatched();
CompletableFuture<ExecutionResult> overallResult = new CompletableFuture<>();
executionStrategyCtx.onDispatched();

resultsFuture.whenComplete(handleResults(executionContext, fieldNames, overallResult));
overallResult.whenComplete(executionStrategyCtx::onCompleted);
return overallResult;
resultsFuture.whenComplete(handleResults(executionContext, fieldNames, overallResult));
overallResult.whenComplete(executionStrategyCtx::onCompleted);
return overallResult;
});
}

private Object resolveSerialField(ExecutionContext executionContext,
Expand All @@ -74,10 +77,11 @@ private Object resolveSerialField(ExecutionContext executionContext,
Object fieldWithInfo = resolveFieldWithInfo(executionContext, newParameters);
if (fieldWithInfo instanceof CompletableFuture) {
//noinspection unchecked
return ((CompletableFuture<FieldValueInfo>) fieldWithInfo).thenCompose(fvi -> {
return ((CompletableFuture<FieldValueInfo>) fieldWithInfo).thenCompose(fvi -> executionContext.call(() -> {
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesInfo(List.of(fvi));
return fvi.getFieldValueFuture();
});
CompletableFuture<Object> fieldValueFuture = fvi.getFieldValueFuture();
return fieldValueFuture;
}));
} else {
FieldValueInfo fvi = (FieldValueInfo) fieldWithInfo;
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesInfo(List.of(fvi));
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/graphql/execution/EngineRunningObserver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package graphql.execution;

import graphql.ExperimentalApi;
import graphql.GraphQLContext;
import org.jspecify.annotations.NullMarked;

@ExperimentalApi
@NullMarked
public interface EngineRunningObserver {


String ENGINE_RUNNING_OBSERVER_KEY = "__ENGINE_RUNNING_OBSERVER";

void runningStateChanged(ExecutionId executionId, GraphQLContext graphQLContext, boolean runningState);
}
6 changes: 5 additions & 1 deletion src/main/java/graphql/execution/Execution.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import graphql.schema.GraphQLObjectType;
import graphql.schema.GraphQLSchema;
import graphql.schema.impl.SchemaUtil;
import org.jspecify.annotations.NonNull;
import graphql.util.FpKit;
import org.jspecify.annotations.NonNull;
import org.reactivestreams.Publisher;

import java.util.Collections;
Expand Down Expand Up @@ -89,6 +89,9 @@ public CompletableFuture<ExecutionResult> execute(Document document, GraphQLSche

boolean propagateErrorsOnNonNullContractFailure = propagateErrorsOnNonNullContractFailure(getOperationResult.operationDefinition.getDirectives());

// can be null
EngineRunningObserver engineRunningObserver = executionInput.getGraphQLContext().get(EngineRunningObserver.ENGINE_RUNNING_OBSERVER_KEY);

ExecutionContext executionContext = newExecutionContextBuilder()
.instrumentation(instrumentation)
.instrumentationState(instrumentationState)
Expand All @@ -111,6 +114,7 @@ public CompletableFuture<ExecutionResult> execute(Document document, GraphQLSche
.valueUnboxer(valueUnboxer)
.executionInput(executionInput)
.propagapropagateErrorsOnNonNullContractFailureeErrors(propagateErrorsOnNonNullContractFailure)
.engineRunningObserver(engineRunningObserver)
.build();

executionContext.getGraphQLContext().put(ResultNodesInfo.RESULT_NODES_INFO, executionContext.getResultNodesInfo());
Expand Down
66 changes: 66 additions & 0 deletions src/main/java/graphql/execution/ExecutionContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@
import graphql.util.FpKit;
import graphql.util.LockKit;
import org.dataloader.DataLoaderRegistry;
import org.jspecify.annotations.Nullable;

import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
Expand Down Expand Up @@ -63,10 +66,13 @@ public class ExecutionContext {
private final Supplier<ExecutableNormalizedOperation> queryTree;
private final boolean propagateErrorsOnNonNullContractFailure;

private final AtomicInteger isRunning = new AtomicInteger(0);

// this is modified after creation so it needs to be volatile to ensure visibility across Threads
private volatile DataLoaderDispatchStrategy dataLoaderDispatcherStrategy = DataLoaderDispatchStrategy.NO_OP;

private final ResultNodesInfo resultNodesInfo = new ResultNodesInfo();
private final EngineRunningObserver engineRunningObserver;

ExecutionContext(ExecutionContextBuilder builder) {
this.graphQLSchema = builder.graphQLSchema;
Expand All @@ -93,6 +99,7 @@ public class ExecutionContext {
this.dataLoaderDispatcherStrategy = builder.dataLoaderDispatcherStrategy;
this.queryTree = FpKit.interThreadMemoize(() -> ExecutableNormalizedOperationFactory.createExecutableNormalizedOperation(graphQLSchema, operationDefinition, fragmentsByName, coercedVariables));
this.propagateErrorsOnNonNullContractFailure = builder.propagateErrorsOnNonNullContractFailure;
this.engineRunningObserver = builder.engineRunningObserver;
}


Expand Down Expand Up @@ -141,7 +148,9 @@ public Supplier<NormalizedVariables> getNormalizedVariables() {

/**
* @param <T> for two
*
* @return the legacy context
*
* @deprecated use {@link #getGraphQLContext()} instead
*/
@Deprecated(since = "2021-07-05")
Expand Down Expand Up @@ -184,6 +193,7 @@ public ValueUnboxer getValueUnboxer() {
* @return true if the current operation should propagate errors in non-null positions
* Propagating errors is the default. Error aware clients may opt in returning null in non-null positions
* by using the `@experimental_disableErrorPropagation` directive.
*
* @see graphql.Directives#setExperimentalDisableErrorPropagationEnabled(boolean) to change the JVM wide default
*/
@ExperimentalApi
Expand Down Expand Up @@ -338,6 +348,7 @@ public DataLoaderDispatchStrategy getDataLoaderDispatcherStrategy() {
* the current values and allows you to transform it how you want.
*
* @param builderConsumer the consumer code that will be given a builder to transform
*
* @return a new ExecutionContext object based on calling build on that builder
*/
public ExecutionContext transform(Consumer<ExecutionContextBuilder> builderConsumer) {
Expand All @@ -349,4 +360,59 @@ public ExecutionContext transform(Consumer<ExecutionContextBuilder> builderConsu
public ResultNodesInfo getResultNodesInfo() {
return resultNodesInfo;
}

@Nullable
EngineRunningObserver getEngineRunningObserver() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Internal

return engineRunningObserver;
}

@Internal
public boolean isRunning() {
return isRunning.get() > 0;
}

public void incrementRunning() {
if (isRunning.incrementAndGet() == 1 && engineRunningObserver != null) {
engineRunningObserver.runningStateChanged(executionId, graphQLContext, true);
}
}

public void decrementRunning() {
if (isRunning.decrementAndGet() == 0 && engineRunningObserver != null) {
engineRunningObserver.runningStateChanged(executionId, graphQLContext, false);
}
}

public void incrementRunning(CompletableFuture<?> cf) {
cf.whenComplete((result, throwable) -> {
incrementRunning();
});
}

public void decrementRunning(CompletableFuture<?> cf) {
cf.whenComplete((result, throwable) -> {
decrementRunning();
});

}

public <T> T call(Supplier<T> callable) {
incrementRunning();
try {
return callable.get();
} finally {
decrementRunning();
}
}

public void run(Runnable runnable) {
incrementRunning();
try {
runnable.run();
} finally {
decrementRunning();
}
}


}
Loading
Loading