-
Notifications
You must be signed in to change notification settings - Fork 1.1k
refactor engine tracking and simplify it #3893
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
a590cfa
refactor engine tracking and simplify it
andimarek 2e63072
tests
andimarek 950ae47
cleanup
andimarek d8fc613
cleanup
andimarek e6ac56a
fix test
andimarek 6102a24
test
andimarek 88246e9
catch exceptions and avoid failing without completion
andimarek acf780e
tests
andimarek 4ae0897
Merge branch 'master' into tracking-refactor
andimarek 59195c1
cleanup
andimarek File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,209 @@ | ||
| package graphql; | ||
|
|
||
| import graphql.execution.EngineRunningObserver; | ||
| import graphql.execution.ExecutionId; | ||
| import org.jspecify.annotations.Nullable; | ||
|
|
||
| import java.util.concurrent.CompletableFuture; | ||
| import java.util.concurrent.CompletionStage; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.function.BiConsumer; | ||
| import java.util.function.BiFunction; | ||
| import java.util.function.Function; | ||
| import java.util.function.Supplier; | ||
|
|
||
| import static graphql.Assert.assertTrue; | ||
| import static graphql.execution.EngineRunningObserver.RunningState.NOT_RUNNING; | ||
| import static graphql.execution.EngineRunningObserver.RunningState.RUNNING; | ||
|
|
||
| @Internal | ||
| public class EngineRunningState { | ||
|
|
||
| @Nullable | ||
| private final EngineRunningObserver engineRunningObserver; | ||
| @Nullable | ||
| private final GraphQLContext graphQLContext; | ||
| @Nullable | ||
| private volatile ExecutionId executionId; | ||
|
|
||
| private final AtomicInteger isRunning = new AtomicInteger(0); | ||
|
|
||
| @VisibleForTesting | ||
| public EngineRunningState() { | ||
| this.engineRunningObserver = null; | ||
| this.graphQLContext = null; | ||
| this.executionId = null; | ||
| } | ||
|
|
||
| public EngineRunningState(ExecutionInput executionInput) { | ||
| EngineRunningObserver engineRunningObserver = executionInput.getGraphQLContext().get(EngineRunningObserver.ENGINE_RUNNING_OBSERVER_KEY); | ||
| if (engineRunningObserver != null) { | ||
| this.engineRunningObserver = engineRunningObserver; | ||
| this.graphQLContext = executionInput.getGraphQLContext(); | ||
| this.executionId = executionInput.getExecutionId(); | ||
| } else { | ||
| this.engineRunningObserver = null; | ||
| this.graphQLContext = null; | ||
| this.executionId = null; | ||
| } | ||
| } | ||
|
|
||
| public <U, T> CompletableFuture<U> handle(CompletableFuture<T> src, BiFunction<? super T, Throwable, ? extends U> fn) { | ||
| if (engineRunningObserver == null) { | ||
| return src.handle(fn); | ||
| } | ||
| src = observeCompletableFutureStart(src); | ||
| CompletableFuture<U> result = src.handle((t, throwable) -> { | ||
| // because we added an artificial dependent CF on src (in observeCompletableFutureStart) , a throwable is a CompletionException | ||
| // that needs to be unwrapped | ||
| if (throwable != null) { | ||
| throwable = throwable.getCause(); | ||
| } | ||
| return fn.apply(t, throwable); | ||
| }); | ||
| observerCompletableFutureEnd(src); | ||
| return result; | ||
| } | ||
|
|
||
| public <T> CompletableFuture<T> whenComplete(CompletableFuture<T> src, BiConsumer<? super T, ? super Throwable> fn) { | ||
| if (engineRunningObserver == null) { | ||
| return src.whenComplete(fn); | ||
| } | ||
| src = observeCompletableFutureStart(src); | ||
| CompletableFuture<T> result = src.whenComplete((t, throwable) -> { | ||
| // because we added an artificial dependent CF on src (in observeCompletableFutureStart) , a throwable is a CompletionException | ||
| // that needs to be unwrapped | ||
| if (throwable != null) { | ||
| throwable = throwable.getCause(); | ||
| } | ||
| fn.accept(t, throwable); | ||
| }); | ||
| observerCompletableFutureEnd(src); | ||
| return result; | ||
| } | ||
|
|
||
| public <U, T> CompletableFuture<U> compose(CompletableFuture<T> src, Function<? super T, ? extends CompletionStage<U>> fn) { | ||
| if (engineRunningObserver == null) { | ||
| return src.thenCompose(fn); | ||
| } | ||
| CompletableFuture<U> result = new CompletableFuture<>(); | ||
| src = observeCompletableFutureStart(src); | ||
| src.whenComplete((u, t) -> { | ||
| CompletionStage<U> innerCF; | ||
| try { | ||
| innerCF = fn.apply(u).toCompletableFuture(); | ||
| } catch (Throwable e) { | ||
| innerCF = CompletableFuture.failedFuture(e); | ||
| } | ||
| // this run is needed to wrap around the result.complete()/result.completeExceptionally() call | ||
| innerCF.whenComplete((u1, t1) -> run(() -> { | ||
| if (t1 != null) { | ||
| result.completeExceptionally(t1); | ||
| } else { | ||
| result.complete(u1); | ||
| } | ||
| })); | ||
| }); | ||
| observerCompletableFutureEnd(src); | ||
| return result; | ||
| } | ||
|
|
||
|
|
||
| private <T> CompletableFuture<T> observeCompletableFutureStart(CompletableFuture<T> future) { | ||
| if (engineRunningObserver == null) { | ||
| return future; | ||
| } | ||
| // the completion order of dependent CFs is in stack order for | ||
| // directly dependent CFs, but in reverse stack order for indirect dependent ones | ||
| // By creating one dependent CF on originalFetchValue, we make sure the order it is always | ||
| // in reverse stack order | ||
| future = future.thenApply(Function.identity()); | ||
| incrementRunningWhenCompleted(future); | ||
| return future; | ||
| } | ||
|
|
||
| private void observerCompletableFutureEnd(CompletableFuture<?> future) { | ||
| if (engineRunningObserver == null) { | ||
| return; | ||
| } | ||
| decrementRunningWhenCompleted(future); | ||
| } | ||
|
|
||
|
|
||
| private void incrementRunningWhenCompleted(CompletableFuture<?> cf) { | ||
| cf.whenComplete((result, throwable) -> { | ||
| incrementRunning(); | ||
| }); | ||
| } | ||
|
|
||
| private void decrementRunningWhenCompleted(CompletableFuture<?> cf) { | ||
| cf.whenComplete((result, throwable) -> { | ||
| decrementRunning(); | ||
| }); | ||
|
|
||
| } | ||
|
|
||
| private void decrementRunning() { | ||
| if (engineRunningObserver == null) { | ||
| return; | ||
| } | ||
| assertTrue(isRunning.get() > 0); | ||
| if (isRunning.decrementAndGet() == 0) { | ||
| changeOfState(NOT_RUNNING); | ||
| } | ||
| } | ||
|
|
||
|
|
||
| private void incrementRunning() { | ||
| if (engineRunningObserver == null) { | ||
| return; | ||
| } | ||
| assertTrue(isRunning.get() >= 0); | ||
| if (isRunning.incrementAndGet() == 1) { | ||
| changeOfState(RUNNING); | ||
| } | ||
|
|
||
| } | ||
|
|
||
|
|
||
| public void updateExecutionId(ExecutionId executionId) { | ||
| if (engineRunningObserver == null) { | ||
| return; | ||
| } | ||
| this.executionId = executionId; | ||
| } | ||
|
|
||
| private void changeOfState(EngineRunningObserver.RunningState runningState) { | ||
| engineRunningObserver.runningStateChanged(executionId, graphQLContext, runningState); | ||
| } | ||
|
|
||
| private void run(Runnable runnable) { | ||
| if (engineRunningObserver == null) { | ||
| runnable.run(); | ||
| return; | ||
| } | ||
| incrementRunning(); | ||
| try { | ||
| runnable.run(); | ||
| } finally { | ||
| decrementRunning(); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Only used once outside of this class: when the execution starts | ||
| */ | ||
| public <T> T call(Supplier<T> supplier) { | ||
| if (engineRunningObserver == null) { | ||
| return supplier.get(); | ||
| } | ||
| incrementRunning(); | ||
| try { | ||
| return supplier.get(); | ||
| } finally { | ||
| decrementRunning(); | ||
| } | ||
| } | ||
|
|
||
|
|
||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -412,41 +412,44 @@ public CompletableFuture<ExecutionResult> executeAsync(UnaryOperator<ExecutionIn | |
| * @return a promise to an {@link ExecutionResult} which can include errors | ||
| */ | ||
| public CompletableFuture<ExecutionResult> executeAsync(ExecutionInput executionInput) { | ||
| ExecutionInput executionInputWithId = ensureInputHasId(executionInput); | ||
|
|
||
| CompletableFuture<InstrumentationState> instrumentationStateCF = instrumentation.createStateAsync(new InstrumentationCreateStateParameters(this.graphQLSchema, executionInputWithId)); | ||
| return Async.orNullCompletedFuture(instrumentationStateCF).thenCompose(instrumentationState -> { | ||
| try { | ||
| InstrumentationExecutionParameters inputInstrumentationParameters = new InstrumentationExecutionParameters(executionInputWithId, this.graphQLSchema); | ||
| ExecutionInput instrumentedExecutionInput = instrumentation.instrumentExecutionInput(executionInputWithId, inputInstrumentationParameters, instrumentationState); | ||
|
|
||
| InstrumentationExecutionParameters instrumentationParameters = new InstrumentationExecutionParameters(instrumentedExecutionInput, this.graphQLSchema); | ||
| InstrumentationContext<ExecutionResult> executionInstrumentation = nonNullCtx(instrumentation.beginExecution(instrumentationParameters, instrumentationState)); | ||
| executionInstrumentation.onDispatched(); | ||
|
|
||
| GraphQLSchema graphQLSchema = instrumentation.instrumentSchema(this.graphQLSchema, instrumentationParameters, instrumentationState); | ||
|
|
||
| CompletableFuture<ExecutionResult> executionResult = parseValidateAndExecute(instrumentedExecutionInput, graphQLSchema, instrumentationState); | ||
| // | ||
| // finish up instrumentation | ||
| executionResult = executionResult.whenComplete(completeInstrumentationCtxCF(executionInstrumentation)); | ||
| // | ||
| // allow instrumentation to tweak the result | ||
| executionResult = executionResult.thenCompose(result -> instrumentation.instrumentExecutionResult(result, instrumentationParameters, instrumentationState)); | ||
| return executionResult; | ||
| } catch (AbortExecutionException abortException) { | ||
| return handleAbortException(executionInput, instrumentationState, abortException); | ||
| } | ||
| EngineRunningState engineRunningState = new EngineRunningState(executionInput); | ||
| return engineRunningState.call(() -> { | ||
| ExecutionInput executionInputWithId = ensureInputHasId(executionInput); | ||
| engineRunningState.updateExecutionId(executionInputWithId.getExecutionId()); | ||
|
|
||
| CompletableFuture<InstrumentationState> instrumentationStateCF = instrumentation.createStateAsync(new InstrumentationCreateStateParameters(this.graphQLSchema, executionInputWithId)); | ||
| instrumentationStateCF = Async.orNullCompletedFuture(instrumentationStateCF); | ||
|
|
||
| return engineRunningState.compose(instrumentationStateCF, (instrumentationState -> { | ||
| try { | ||
| InstrumentationExecutionParameters inputInstrumentationParameters = new InstrumentationExecutionParameters(executionInputWithId, this.graphQLSchema); | ||
| ExecutionInput instrumentedExecutionInput = instrumentation.instrumentExecutionInput(executionInputWithId, inputInstrumentationParameters, instrumentationState); | ||
|
|
||
| InstrumentationExecutionParameters instrumentationParameters = new InstrumentationExecutionParameters(instrumentedExecutionInput, this.graphQLSchema); | ||
| InstrumentationContext<ExecutionResult> executionInstrumentation = nonNullCtx(instrumentation.beginExecution(instrumentationParameters, instrumentationState)); | ||
| executionInstrumentation.onDispatched(); | ||
|
|
||
| GraphQLSchema graphQLSchema = instrumentation.instrumentSchema(this.graphQLSchema, instrumentationParameters, instrumentationState); | ||
|
|
||
| CompletableFuture<ExecutionResult> executionResult = parseValidateAndExecute(instrumentedExecutionInput, graphQLSchema, instrumentationState, engineRunningState); | ||
| // | ||
| // finish up instrumentation | ||
| executionResult = executionResult.whenComplete(completeInstrumentationCtxCF(executionInstrumentation)); | ||
| // | ||
| // allow instrumentation to tweak the result | ||
| executionResult = engineRunningState.compose(executionResult, (result -> instrumentation.instrumentExecutionResult(result, instrumentationParameters, instrumentationState))); | ||
| return executionResult; | ||
| } catch (AbortExecutionException abortException) { | ||
| return handleAbortException(executionInput, instrumentationState, abortException); | ||
| } | ||
| })); | ||
| }); | ||
| } | ||
|
|
||
|
|
||
| private CompletableFuture<ExecutionResult> handleAbortException(ExecutionInput executionInput, InstrumentationState instrumentationState, AbortExecutionException abortException) { | ||
| CompletableFuture<ExecutionResult> executionResult = CompletableFuture.completedFuture(abortException.toExecutionResult()); | ||
| InstrumentationExecutionParameters instrumentationParameters = new InstrumentationExecutionParameters(executionInput, this.graphQLSchema); | ||
| // | ||
| // allow instrumentation to tweak the result | ||
| executionResult = executionResult.thenCompose(result -> instrumentation.instrumentExecutionResult(result, instrumentationParameters, instrumentationState)); | ||
| return executionResult; | ||
| return instrumentation.instrumentExecutionResult(abortException.toExecutionResult(), instrumentationParameters, instrumentationState); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah that was weird code |
||
| } | ||
|
|
||
| private ExecutionInput ensureInputHasId(ExecutionInput executionInput) { | ||
|
|
@@ -460,24 +463,24 @@ private ExecutionInput ensureInputHasId(ExecutionInput executionInput) { | |
| } | ||
|
|
||
|
|
||
| private CompletableFuture<ExecutionResult> parseValidateAndExecute(ExecutionInput executionInput, GraphQLSchema graphQLSchema, InstrumentationState instrumentationState) { | ||
| private CompletableFuture<ExecutionResult> parseValidateAndExecute(ExecutionInput executionInput, GraphQLSchema graphQLSchema, InstrumentationState instrumentationState, EngineRunningState engineRunningState) { | ||
| AtomicReference<ExecutionInput> executionInputRef = new AtomicReference<>(executionInput); | ||
| Function<ExecutionInput, PreparsedDocumentEntry> computeFunction = transformedInput -> { | ||
| // if they change the original query in the pre-parser, then we want to see it downstream from then on | ||
| executionInputRef.set(transformedInput); | ||
| return parseAndValidate(executionInputRef, graphQLSchema, instrumentationState); | ||
| }; | ||
| CompletableFuture<PreparsedDocumentEntry> preparsedDoc = preparsedDocumentProvider.getDocumentAsync(executionInput, computeFunction); | ||
| return preparsedDoc.thenCompose(preparsedDocumentEntry -> { | ||
| return engineRunningState.compose(preparsedDoc, (preparsedDocumentEntry -> { | ||
| if (preparsedDocumentEntry.hasErrors()) { | ||
| return CompletableFuture.completedFuture(new ExecutionResultImpl(preparsedDocumentEntry.getErrors())); | ||
| } | ||
| try { | ||
| return execute(executionInputRef.get(), preparsedDocumentEntry.getDocument(), graphQLSchema, instrumentationState); | ||
| return execute(executionInputRef.get(), preparsedDocumentEntry.getDocument(), graphQLSchema, instrumentationState, engineRunningState); | ||
| } catch (AbortExecutionException e) { | ||
| return CompletableFuture.completedFuture(e.toExecutionResult()); | ||
| } | ||
| }); | ||
| })); | ||
| } | ||
|
|
||
| private PreparsedDocumentEntry parseAndValidate(AtomicReference<ExecutionInput> executionInputRef, GraphQLSchema graphQLSchema, InstrumentationState instrumentationState) { | ||
|
|
@@ -536,13 +539,14 @@ private List<ValidationError> validate(ExecutionInput executionInput, Document d | |
| private CompletableFuture<ExecutionResult> execute(ExecutionInput executionInput, | ||
| Document document, | ||
| GraphQLSchema graphQLSchema, | ||
| InstrumentationState instrumentationState | ||
| InstrumentationState instrumentationState, | ||
| EngineRunningState engineRunningState | ||
| ) { | ||
|
|
||
| Execution execution = new Execution(queryStrategy, mutationStrategy, subscriptionStrategy, instrumentation, valueUnboxer, doNotAutomaticallyDispatchDataLoader); | ||
| ExecutionId executionId = executionInput.getExecutionId(); | ||
|
|
||
| return execution.execute(document, graphQLSchema, executionId, executionInput, instrumentationState); | ||
| return execution.execute(document, graphQLSchema, executionId, executionInput, instrumentationState, engineRunningState); | ||
| } | ||
|
|
||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.