Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 209 additions & 0 deletions src/main/java/graphql/EngineRunningState.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
package graphql;

import graphql.execution.EngineRunningObserver;
import graphql.execution.ExecutionId;
import org.jspecify.annotations.Nullable;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

import static graphql.Assert.assertTrue;
import static graphql.execution.EngineRunningObserver.RunningState.NOT_RUNNING;
import static graphql.execution.EngineRunningObserver.RunningState.RUNNING;

@Internal
public class EngineRunningState {

@Nullable
private final EngineRunningObserver engineRunningObserver;
@Nullable
private final GraphQLContext graphQLContext;
@Nullable
private volatile ExecutionId executionId;

private final AtomicInteger isRunning = new AtomicInteger(0);

@VisibleForTesting
public EngineRunningState() {
this.engineRunningObserver = null;
this.graphQLContext = null;
this.executionId = null;
}

public EngineRunningState(ExecutionInput executionInput) {
EngineRunningObserver engineRunningObserver = executionInput.getGraphQLContext().get(EngineRunningObserver.ENGINE_RUNNING_OBSERVER_KEY);
if (engineRunningObserver != null) {
this.engineRunningObserver = engineRunningObserver;
this.graphQLContext = executionInput.getGraphQLContext();
this.executionId = executionInput.getExecutionId();
} else {
this.engineRunningObserver = null;
this.graphQLContext = null;
this.executionId = null;
}
}

public <U, T> CompletableFuture<U> handle(CompletableFuture<T> src, BiFunction<? super T, Throwable, ? extends U> fn) {
if (engineRunningObserver == null) {
return src.handle(fn);
}
src = observeCompletableFutureStart(src);
CompletableFuture<U> result = src.handle((t, throwable) -> {
// because we added an artificial dependent CF on src (in observeCompletableFutureStart) , a throwable is a CompletionException
// that needs to be unwrapped
if (throwable != null) {
throwable = throwable.getCause();
}
return fn.apply(t, throwable);
});
observerCompletableFutureEnd(src);
return result;
}

public <T> CompletableFuture<T> whenComplete(CompletableFuture<T> src, BiConsumer<? super T, ? super Throwable> fn) {
if (engineRunningObserver == null) {
return src.whenComplete(fn);
}
src = observeCompletableFutureStart(src);
CompletableFuture<T> result = src.whenComplete((t, throwable) -> {
// because we added an artificial dependent CF on src (in observeCompletableFutureStart) , a throwable is a CompletionException
// that needs to be unwrapped
if (throwable != null) {
throwable = throwable.getCause();
}
fn.accept(t, throwable);
});
observerCompletableFutureEnd(src);
return result;
}

public <U, T> CompletableFuture<U> compose(CompletableFuture<T> src, Function<? super T, ? extends CompletionStage<U>> fn) {
if (engineRunningObserver == null) {
return src.thenCompose(fn);
}
CompletableFuture<U> result = new CompletableFuture<>();
src = observeCompletableFutureStart(src);
src.whenComplete((u, t) -> {
CompletionStage<U> innerCF;
try {
innerCF = fn.apply(u).toCompletableFuture();
} catch (Throwable e) {
innerCF = CompletableFuture.failedFuture(e);
}
// this run is needed to wrap around the result.complete()/result.completeExceptionally() call
innerCF.whenComplete((u1, t1) -> run(() -> {
if (t1 != null) {
result.completeExceptionally(t1);
} else {
result.complete(u1);
}
}));
});
observerCompletableFutureEnd(src);
return result;
}


private <T> CompletableFuture<T> observeCompletableFutureStart(CompletableFuture<T> future) {
if (engineRunningObserver == null) {
return future;
}
// the completion order of dependent CFs is in stack order for
// directly dependent CFs, but in reverse stack order for indirect dependent ones
// By creating one dependent CF on originalFetchValue, we make sure the order it is always
// in reverse stack order
future = future.thenApply(Function.identity());
incrementRunningWhenCompleted(future);
return future;
}

private void observerCompletableFutureEnd(CompletableFuture<?> future) {
if (engineRunningObserver == null) {
return;
}
decrementRunningWhenCompleted(future);
}


private void incrementRunningWhenCompleted(CompletableFuture<?> cf) {
cf.whenComplete((result, throwable) -> {
incrementRunning();
});
}

private void decrementRunningWhenCompleted(CompletableFuture<?> cf) {
cf.whenComplete((result, throwable) -> {
decrementRunning();
});

}

private void decrementRunning() {
if (engineRunningObserver == null) {
return;
}
assertTrue(isRunning.get() > 0);
if (isRunning.decrementAndGet() == 0) {
changeOfState(NOT_RUNNING);
}
}


private void incrementRunning() {
if (engineRunningObserver == null) {
return;
}
assertTrue(isRunning.get() >= 0);
if (isRunning.incrementAndGet() == 1) {
changeOfState(RUNNING);
}

}


public void updateExecutionId(ExecutionId executionId) {
if (engineRunningObserver == null) {
return;
}
this.executionId = executionId;
}

private void changeOfState(EngineRunningObserver.RunningState runningState) {
engineRunningObserver.runningStateChanged(executionId, graphQLContext, runningState);
}

private void run(Runnable runnable) {
if (engineRunningObserver == null) {
runnable.run();
return;
}
incrementRunning();
try {
runnable.run();
} finally {
decrementRunning();
}
}

/**
* Only used once outside of this class: when the execution starts
*/
public <T> T call(Supplier<T> supplier) {
if (engineRunningObserver == null) {
return supplier.get();
}
incrementRunning();
try {
return supplier.get();
} finally {
decrementRunning();
}
}


}
76 changes: 40 additions & 36 deletions src/main/java/graphql/GraphQL.java
Original file line number Diff line number Diff line change
Expand Up @@ -412,41 +412,44 @@ public CompletableFuture<ExecutionResult> executeAsync(UnaryOperator<ExecutionIn
* @return a promise to an {@link ExecutionResult} which can include errors
*/
public CompletableFuture<ExecutionResult> executeAsync(ExecutionInput executionInput) {
ExecutionInput executionInputWithId = ensureInputHasId(executionInput);

CompletableFuture<InstrumentationState> instrumentationStateCF = instrumentation.createStateAsync(new InstrumentationCreateStateParameters(this.graphQLSchema, executionInputWithId));
return Async.orNullCompletedFuture(instrumentationStateCF).thenCompose(instrumentationState -> {
try {
InstrumentationExecutionParameters inputInstrumentationParameters = new InstrumentationExecutionParameters(executionInputWithId, this.graphQLSchema);
ExecutionInput instrumentedExecutionInput = instrumentation.instrumentExecutionInput(executionInputWithId, inputInstrumentationParameters, instrumentationState);

InstrumentationExecutionParameters instrumentationParameters = new InstrumentationExecutionParameters(instrumentedExecutionInput, this.graphQLSchema);
InstrumentationContext<ExecutionResult> executionInstrumentation = nonNullCtx(instrumentation.beginExecution(instrumentationParameters, instrumentationState));
executionInstrumentation.onDispatched();

GraphQLSchema graphQLSchema = instrumentation.instrumentSchema(this.graphQLSchema, instrumentationParameters, instrumentationState);

CompletableFuture<ExecutionResult> executionResult = parseValidateAndExecute(instrumentedExecutionInput, graphQLSchema, instrumentationState);
//
// finish up instrumentation
executionResult = executionResult.whenComplete(completeInstrumentationCtxCF(executionInstrumentation));
//
// allow instrumentation to tweak the result
executionResult = executionResult.thenCompose(result -> instrumentation.instrumentExecutionResult(result, instrumentationParameters, instrumentationState));
return executionResult;
} catch (AbortExecutionException abortException) {
return handleAbortException(executionInput, instrumentationState, abortException);
}
EngineRunningState engineRunningState = new EngineRunningState(executionInput);
return engineRunningState.call(() -> {
ExecutionInput executionInputWithId = ensureInputHasId(executionInput);
engineRunningState.updateExecutionId(executionInputWithId.getExecutionId());

CompletableFuture<InstrumentationState> instrumentationStateCF = instrumentation.createStateAsync(new InstrumentationCreateStateParameters(this.graphQLSchema, executionInputWithId));
instrumentationStateCF = Async.orNullCompletedFuture(instrumentationStateCF);

return engineRunningState.compose(instrumentationStateCF, (instrumentationState -> {
try {
InstrumentationExecutionParameters inputInstrumentationParameters = new InstrumentationExecutionParameters(executionInputWithId, this.graphQLSchema);
ExecutionInput instrumentedExecutionInput = instrumentation.instrumentExecutionInput(executionInputWithId, inputInstrumentationParameters, instrumentationState);

InstrumentationExecutionParameters instrumentationParameters = new InstrumentationExecutionParameters(instrumentedExecutionInput, this.graphQLSchema);
InstrumentationContext<ExecutionResult> executionInstrumentation = nonNullCtx(instrumentation.beginExecution(instrumentationParameters, instrumentationState));
executionInstrumentation.onDispatched();

GraphQLSchema graphQLSchema = instrumentation.instrumentSchema(this.graphQLSchema, instrumentationParameters, instrumentationState);

CompletableFuture<ExecutionResult> executionResult = parseValidateAndExecute(instrumentedExecutionInput, graphQLSchema, instrumentationState, engineRunningState);
//
// finish up instrumentation
executionResult = executionResult.whenComplete(completeInstrumentationCtxCF(executionInstrumentation));
//
// allow instrumentation to tweak the result
executionResult = engineRunningState.compose(executionResult, (result -> instrumentation.instrumentExecutionResult(result, instrumentationParameters, instrumentationState)));
return executionResult;
} catch (AbortExecutionException abortException) {
return handleAbortException(executionInput, instrumentationState, abortException);
}
}));
});
}


private CompletableFuture<ExecutionResult> handleAbortException(ExecutionInput executionInput, InstrumentationState instrumentationState, AbortExecutionException abortException) {
CompletableFuture<ExecutionResult> executionResult = CompletableFuture.completedFuture(abortException.toExecutionResult());
InstrumentationExecutionParameters instrumentationParameters = new InstrumentationExecutionParameters(executionInput, this.graphQLSchema);
//
// allow instrumentation to tweak the result
executionResult = executionResult.thenCompose(result -> instrumentation.instrumentExecutionResult(result, instrumentationParameters, instrumentationState));
return executionResult;
return instrumentation.instrumentExecutionResult(abortException.toExecutionResult(), instrumentationParameters, instrumentationState);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah that was weird code

}

private ExecutionInput ensureInputHasId(ExecutionInput executionInput) {
Expand All @@ -460,24 +463,24 @@ private ExecutionInput ensureInputHasId(ExecutionInput executionInput) {
}


private CompletableFuture<ExecutionResult> parseValidateAndExecute(ExecutionInput executionInput, GraphQLSchema graphQLSchema, InstrumentationState instrumentationState) {
private CompletableFuture<ExecutionResult> parseValidateAndExecute(ExecutionInput executionInput, GraphQLSchema graphQLSchema, InstrumentationState instrumentationState, EngineRunningState engineRunningState) {
AtomicReference<ExecutionInput> executionInputRef = new AtomicReference<>(executionInput);
Function<ExecutionInput, PreparsedDocumentEntry> computeFunction = transformedInput -> {
// if they change the original query in the pre-parser, then we want to see it downstream from then on
executionInputRef.set(transformedInput);
return parseAndValidate(executionInputRef, graphQLSchema, instrumentationState);
};
CompletableFuture<PreparsedDocumentEntry> preparsedDoc = preparsedDocumentProvider.getDocumentAsync(executionInput, computeFunction);
return preparsedDoc.thenCompose(preparsedDocumentEntry -> {
return engineRunningState.compose(preparsedDoc, (preparsedDocumentEntry -> {
if (preparsedDocumentEntry.hasErrors()) {
return CompletableFuture.completedFuture(new ExecutionResultImpl(preparsedDocumentEntry.getErrors()));
}
try {
return execute(executionInputRef.get(), preparsedDocumentEntry.getDocument(), graphQLSchema, instrumentationState);
return execute(executionInputRef.get(), preparsedDocumentEntry.getDocument(), graphQLSchema, instrumentationState, engineRunningState);
} catch (AbortExecutionException e) {
return CompletableFuture.completedFuture(e.toExecutionResult());
}
});
}));
}

private PreparsedDocumentEntry parseAndValidate(AtomicReference<ExecutionInput> executionInputRef, GraphQLSchema graphQLSchema, InstrumentationState instrumentationState) {
Expand Down Expand Up @@ -536,13 +539,14 @@ private List<ValidationError> validate(ExecutionInput executionInput, Document d
private CompletableFuture<ExecutionResult> execute(ExecutionInput executionInput,
Document document,
GraphQLSchema graphQLSchema,
InstrumentationState instrumentationState
InstrumentationState instrumentationState,
EngineRunningState engineRunningState
) {

Execution execution = new Execution(queryStrategy, mutationStrategy, subscriptionStrategy, instrumentation, valueUnboxer, doNotAutomaticallyDispatchDataLoader);
ExecutionId executionId = executionInput.getExecutionId();

return execution.execute(document, graphQLSchema, executionId, executionInput, instrumentationState);
return execution.execute(document, graphQLSchema, executionId, executionInput, instrumentationState, engineRunningState);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public AbstractAsyncExecutionStrategy(DataFetcherExceptionHandler dataFetcherExc
}

protected BiConsumer<List<Object>, Throwable> handleResults(ExecutionContext executionContext, List<String> fieldNames, CompletableFuture<ExecutionResult> overallResult) {
return (List<Object> results, Throwable exception) -> executionContext.run(exception, () -> {
return (List<Object> results, Throwable exception) -> {
if (exception != null) {
handleNonNullException(executionContext, overallResult, exception);
return;
Expand All @@ -35,6 +35,6 @@ protected BiConsumer<List<Object>, Throwable> handleResults(ExecutionContext exe
resolvedValuesByField.put(fieldName, result);
}
overallResult.complete(new ExecutionResultImpl(resolvedValuesByField, executionContext.getErrors()));
});
};
}
}
Loading
Loading