From 15f69523c6367061b659b4c25f494a872b05a4db Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Thu, 22 May 2025 12:44:24 +1000 Subject: [PATCH 01/14] make dataloader work inside defer blocks --- .../execution/AsyncExecutionStrategy.java | 4 +- .../AsyncSerialExecutionStrategy.java | 4 +- .../execution/DataLoaderDispatchStrategy.java | 12 +- .../java/graphql/execution/Execution.java | 9 +- .../graphql/execution/ExecutionStrategy.java | 6 +- .../ExecutionStrategyParameters.java | 1 + .../incremental/DeferredCallContext.java | 18 + .../incremental/DeferredExecutionSupport.java | 40 +- .../incremental/IncrementalCallState.java | 1 + .../PerLevelDataLoaderDispatchStrategy.java | 129 ++-- ...spatchStrategyWithDeferAlwaysDispatch.java | 558 +++++++++--------- .../schema/DataFetchingEnvironmentImpl.java | 22 +- .../graphql/schema/DataLoaderWithContext.java | 4 +- ...eferExecutionSupportIntegrationTest.groovy | 58 ++ 14 files changed, 513 insertions(+), 353 deletions(-) diff --git a/src/main/java/graphql/execution/AsyncExecutionStrategy.java b/src/main/java/graphql/execution/AsyncExecutionStrategy.java index f7734df9fb..4c1c9a5bed 100644 --- a/src/main/java/graphql/execution/AsyncExecutionStrategy.java +++ b/src/main/java/graphql/execution/AsyncExecutionStrategy.java @@ -72,14 +72,14 @@ public CompletableFuture execute(ExecutionContext executionCont for (FieldValueInfo completeValueInfo : completeValueInfos) { fieldValuesFutures.addObject(completeValueInfo.getFieldValueObject()); } - dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesInfo(completeValueInfos); + dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesInfo(completeValueInfos, parameters); executionStrategyCtx.onFieldValuesInfo(completeValueInfos); fieldValuesFutures.await().whenComplete(handleResultsConsumer); }).exceptionally((ex) -> { // if there are any issues with combining/handling the field results, // complete the future at all costs and bubble up any thrown exception so // the execution does not hang. - dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesException(ex); + dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesException(ex, parameters); executionStrategyCtx.onFieldValuesException(); overallResult.completeExceptionally(ex); return null; diff --git a/src/main/java/graphql/execution/AsyncSerialExecutionStrategy.java b/src/main/java/graphql/execution/AsyncSerialExecutionStrategy.java index 98c6ce478b..665777731d 100644 --- a/src/main/java/graphql/execution/AsyncSerialExecutionStrategy.java +++ b/src/main/java/graphql/execution/AsyncSerialExecutionStrategy.java @@ -74,13 +74,13 @@ private Object resolveSerialField(ExecutionContext executionContext, if (fieldWithInfo instanceof CompletableFuture) { //noinspection unchecked return ((CompletableFuture) fieldWithInfo).thenCompose(fvi -> { - dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesInfo(List.of(fvi)); + dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesInfo(List.of(fvi), newParameters); CompletableFuture fieldValueFuture = fvi.getFieldValueFuture(); return fieldValueFuture; }); } else { FieldValueInfo fvi = (FieldValueInfo) fieldWithInfo; - dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesInfo(List.of(fvi)); + dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesInfo(List.of(fvi), newParameters); return fvi.getFieldValueObject(); } } diff --git a/src/main/java/graphql/execution/DataLoaderDispatchStrategy.java b/src/main/java/graphql/execution/DataLoaderDispatchStrategy.java index d91bf46814..b7f7854197 100644 --- a/src/main/java/graphql/execution/DataLoaderDispatchStrategy.java +++ b/src/main/java/graphql/execution/DataLoaderDispatchStrategy.java @@ -22,11 +22,11 @@ default void executionSerialStrategy(ExecutionContext executionContext, Executio } - default void executionStrategyOnFieldValuesInfo(List fieldValueInfoList) { + default void executionStrategyOnFieldValuesInfo(List fieldValueInfoList, ExecutionStrategyParameters parameters) { } - default void executionStrategyOnFieldValuesException(Throwable t) { + default void executionStrategyOnFieldValuesException(Throwable t, ExecutionStrategyParameters parameters) { } @@ -39,6 +39,10 @@ default void executeObjectOnFieldValuesInfo(List fieldValueInfoL } + default void deferredOnFieldValue(String resultKey, FieldValueInfo fieldValueInfo, Throwable throwable, ExecutionStrategyParameters parameters) { + + } + default void executeObjectOnFieldValuesException(Throwable t, ExecutionStrategyParameters parameters) { } @@ -59,4 +63,8 @@ default DataFetcher modifyDataFetcher(DataFetcher dataFetcher) { default void executeDeferredOnFieldValueInfo(FieldValueInfo fieldValueInfo, ExecutionStrategyParameters executionStrategyParameters) { } + + default void startIncrementalCall() { + + } } diff --git a/src/main/java/graphql/execution/Execution.java b/src/main/java/graphql/execution/Execution.java index f35854a188..aada60c72f 100644 --- a/src/main/java/graphql/execution/Execution.java +++ b/src/main/java/graphql/execution/Execution.java @@ -6,7 +6,6 @@ import graphql.ExecutionInput; import graphql.ExecutionResult; import graphql.ExecutionResultImpl; -import graphql.ExperimentalApi; import graphql.GraphQLContext; import graphql.GraphQLError; import graphql.Internal; @@ -16,7 +15,6 @@ import graphql.execution.instrumentation.InstrumentationState; import graphql.execution.instrumentation.dataloader.FallbackDataLoaderDispatchStrategy; import graphql.execution.instrumentation.dataloader.PerLevelDataLoaderDispatchStrategy; -import graphql.execution.instrumentation.dataloader.PerLevelDataLoaderDispatchStrategyWithDeferAlwaysDispatch; import graphql.execution.instrumentation.parameters.InstrumentationExecuteOperationParameters; import graphql.execution.instrumentation.parameters.InstrumentationExecutionParameters; import graphql.extensions.ExtensionsBuilder; @@ -37,7 +35,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; @@ -261,9 +258,9 @@ private DataLoaderDispatchStrategy createDataLoaderDispatchStrategy(ExecutionCon boolean deferEnabled = executionContext.hasIncrementalSupport(); // Dedicated strategy for defer support, for safety purposes. - return deferEnabled ? - new PerLevelDataLoaderDispatchStrategyWithDeferAlwaysDispatch(executionContext) : - new PerLevelDataLoaderDispatchStrategy(executionContext); +// return deferEnabled ? +// new PerLevelDataLoaderDispatchStrategyWithDeferAlwaysDispatch(executionContext) : + return new PerLevelDataLoaderDispatchStrategy(executionContext); } else { return new FallbackDataLoaderDispatchStrategy(executionContext); } diff --git a/src/main/java/graphql/execution/ExecutionStrategy.java b/src/main/java/graphql/execution/ExecutionStrategy.java index 355f13106b..1ef600c596 100644 --- a/src/main/java/graphql/execution/ExecutionStrategy.java +++ b/src/main/java/graphql/execution/ExecutionStrategy.java @@ -5,7 +5,6 @@ import graphql.EngineRunningState; import graphql.ExecutionResult; import graphql.ExecutionResultImpl; -import graphql.ExperimentalApi; import graphql.GraphQLError; import graphql.Internal; import graphql.PublicSpi; @@ -50,7 +49,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.OptionalInt; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -300,7 +298,7 @@ DeferredExecutionSupport createDeferredExecutionSupport(ExecutionContext executi ) { MergedSelectionSet fields = parameters.getFields(); - executionContext.getIncrementalCallState().enqueue(deferredExecutionSupport.createCalls(parameters)); + executionContext.getIncrementalCallState().enqueue(deferredExecutionSupport.createCalls()); // Only non-deferred fields should be considered for calculating the expected size of futures. Async.CombinedBuilder futures = Async @@ -400,7 +398,6 @@ private Object fetchField(GraphQLFieldDefinition fieldDef, ExecutionContext exec } MergedField field = parameters.getField(); - String pathString = parameters.getPath().toString(); GraphQLObjectType parentType = (GraphQLObjectType) parameters.getExecutionStepInfo().getUnwrappedNonNullType(); // if the DF (like PropertyDataFetcher) does not use the arguments or execution step info then dont build any @@ -435,6 +432,7 @@ private Object fetchField(GraphQLFieldDefinition fieldDef, ExecutionContext exec .parentType(parentType) .selectionSet(fieldCollector) .queryDirectives(queryDirectives) + .deferredCallContext(parameters.getDeferredCallContext()) .build(); }); diff --git a/src/main/java/graphql/execution/ExecutionStrategyParameters.java b/src/main/java/graphql/execution/ExecutionStrategyParameters.java index 58eb3d1767..87dd7057ae 100644 --- a/src/main/java/graphql/execution/ExecutionStrategyParameters.java +++ b/src/main/java/graphql/execution/ExecutionStrategyParameters.java @@ -94,6 +94,7 @@ public ExecutionStrategyParameters getParent() { * @return the deferred call context or null if we're not in the scope of a deferred call */ @Nullable + @Internal public DeferredCallContext getDeferredCallContext() { return deferredCallContext; } diff --git a/src/main/java/graphql/execution/incremental/DeferredCallContext.java b/src/main/java/graphql/execution/incremental/DeferredCallContext.java index d7d494aace..855cad944d 100644 --- a/src/main/java/graphql/execution/incremental/DeferredCallContext.java +++ b/src/main/java/graphql/execution/incremental/DeferredCallContext.java @@ -18,6 +18,22 @@ @Internal public class DeferredCallContext { + private final int startLevel; + private final int fields; + + public DeferredCallContext(int startLevel, int fields) { + this.startLevel = startLevel; + this.fields = fields; + } + + public int getStartLevel() { + return startLevel; + } + + public int getFields() { + return fields; + } + private final List errors = new CopyOnWriteArrayList<>(); public void addErrors(List errors) { @@ -34,4 +50,6 @@ public void addError(GraphQLError graphqlError) { public List getErrors() { return errors; } + + } diff --git a/src/main/java/graphql/execution/incremental/DeferredExecutionSupport.java b/src/main/java/graphql/execution/incremental/DeferredExecutionSupport.java index a347d9b0cd..6a685bf0a0 100644 --- a/src/main/java/graphql/execution/incremental/DeferredExecutionSupport.java +++ b/src/main/java/graphql/execution/incremental/DeferredExecutionSupport.java @@ -45,7 +45,7 @@ public interface DeferredExecutionSupport { List getNonDeferredFieldNames(List allFieldNames); - Set> createCalls(ExecutionStrategyParameters executionStrategyParameters); + Set> createCalls(); DeferredExecutionSupport NOOP = new DeferredExecutionSupport.NoOp(); @@ -106,23 +106,25 @@ public List getNonDeferredFieldNames(List allFieldNames) { } @Override - public Set> createCalls(ExecutionStrategyParameters executionStrategyParameters) { + public Set> createCalls() { ImmutableSet deferredExecutions = deferredExecutionToFields.keySet(); Set> set = new HashSet<>(deferredExecutions.size()); for (DeferredExecution deferredExecution : deferredExecutions) { - set.add(this.createDeferredFragmentCall(deferredExecution, executionStrategyParameters)); + set.add(this.createDeferredFragmentCall(deferredExecution)); } return set; } - private DeferredFragmentCall createDeferredFragmentCall(DeferredExecution deferredExecution, ExecutionStrategyParameters executionStrategyParameters) { - DeferredCallContext deferredCallContext = new DeferredCallContext(); + private DeferredFragmentCall createDeferredFragmentCall(DeferredExecution deferredExecution) { + int level = parameters.getPath().getLevel() + 1; + System.out.println("new DeferredFragmentCall for level " + level + " with fields " + deferredFields.size()); + DeferredCallContext deferredCallContext = new DeferredCallContext(level, deferredFields.size()); List mergedFields = deferredExecutionToFields.get(deferredExecution); List>> calls = FpKit.arrayListSizedTo(mergedFields); for (MergedField currentField : mergedFields) { - calls.add(this.createResultSupplier(currentField, deferredCallContext, executionStrategyParameters)); + calls.add(this.createResultSupplier(currentField, deferredCallContext)); } return new DeferredFragmentCall( @@ -135,13 +137,12 @@ private DeferredFragmentCall createDeferredFragmentCall(DeferredExecution deferr private Supplier> createResultSupplier( MergedField currentField, - DeferredCallContext deferredCallContext, - ExecutionStrategyParameters executionStrategyParameters + DeferredCallContext deferredCallContext ) { Map fields = new LinkedHashMap<>(); fields.put(currentField.getResultKey(), currentField); - ExecutionStrategyParameters callParameters = parameters.transform(builder -> + ExecutionStrategyParameters executionStrategyParameters = parameters.transform(builder -> { MergedSelectionSet mergedSelectionSet = MergedSelectionSet.newMergedSelectionSet().subFields(fields).build(); ResultPath path = parameters.getPath().segment(currentField.getResultKey()); @@ -158,22 +159,23 @@ private Supplier FpKit.interThreadMemoize(() -> { - CompletableFuture fieldValueResult = resolveFieldWithInfoFn - .apply(executionContext, callParameters); + CompletableFuture fieldValueResult = resolveFieldWithInfoFn.apply(executionContext, executionStrategyParameters); + + fieldValueResult.whenComplete((fieldValueInfo, throwable) -> { + executionContext.getDataLoaderDispatcherStrategy().deferredOnFieldValue(currentField.getResultKey(), fieldValueInfo, throwable, executionStrategyParameters); + }); - CompletableFuture executionResultCF = fieldValueResult - .thenCompose(fvi -> { - executionContext.getDataLoaderDispatcherStrategy().executeDeferredOnFieldValueInfo(fvi, executionStrategyParameters); - return fvi - .getFieldValueFuture() - .thenApply(fv -> ExecutionResultImpl.newExecutionResult().data(fv).build()); - } + CompletableFuture executionResultCF = fieldValueResult + .thenCompose(fvi -> fvi + .getFieldValueFuture() + .thenApply(fv -> ExecutionResultImpl.newExecutionResult().data(fv).build()) ); return executionResultCF @@ -207,7 +209,7 @@ public List getNonDeferredFieldNames(List allFieldNames) { } @Override - public Set> createCalls(ExecutionStrategyParameters executionStrategyParameters) { + public Set> createCalls() { return Collections.emptySet(); } } diff --git a/src/main/java/graphql/execution/incremental/IncrementalCallState.java b/src/main/java/graphql/execution/incremental/IncrementalCallState.java index 2f5c9742be..f2c0b9dbc7 100644 --- a/src/main/java/graphql/execution/incremental/IncrementalCallState.java +++ b/src/main/java/graphql/execution/incremental/IncrementalCallState.java @@ -103,4 +103,5 @@ private Supplier> cre public Publisher startDeferredCalls() { return publisher.get(); } + } diff --git a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java index 30ccd838d4..4cce0eac1a 100644 --- a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java +++ b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java @@ -7,12 +7,15 @@ import graphql.execution.ExecutionContext; import graphql.execution.ExecutionStrategyParameters; import graphql.execution.FieldValueInfo; +import graphql.execution.incremental.DeferredCallContext; import graphql.schema.DataFetcher; import graphql.schema.DataFetchingEnvironment; import graphql.util.InterThreadMemoizedSupplier; import graphql.util.LockKit; import org.dataloader.DataLoader; import org.dataloader.DataLoaderRegistry; +import org.jspecify.annotations.NullMarked; +import org.jspecify.annotations.Nullable; import java.util.ArrayList; import java.util.Collections; @@ -30,6 +33,7 @@ import java.util.stream.Collectors; @Internal +@NullMarked public class PerLevelDataLoaderDispatchStrategy implements DataLoaderDispatchStrategy { private final CallStack callStack; @@ -44,6 +48,8 @@ public class PerLevelDataLoaderDispatchStrategy implements DataLoaderDispatchStr static final long DEFAULT_BATCH_WINDOW_NANO_SECONDS_DEFAULT = 500_000L; + private final Map callStackMap = new ConcurrentHashMap<>(); + private static class CallStack { @@ -84,6 +90,8 @@ private static class CallStack { private boolean batchWindowOpen; + private List deferredFragmentRootFieldsFetched; + public CallStack() { // in the first level there is only one sub selection, // so we only expect one execute object call (which is actually an executionStrategy call) @@ -107,6 +115,7 @@ void increaseFetchCount(int level) { fetchCountPerLevel.increment(level, 1); } + void clearFetchCount() { fetchCountPerLevel.clear(); } @@ -197,60 +206,102 @@ public void executeDeferredOnFieldValueInfo(FieldValueInfo fieldValueInfo, Execu @Override public void executionStrategy(ExecutionContext executionContext, ExecutionStrategyParameters parameters) { Assert.assertTrue(parameters.getExecutionStepInfo().getPath().isRootPath()); - increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(1, parameters); + increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(1, parameters, callStack); } @Override public void executionSerialStrategy(ExecutionContext executionContext, ExecutionStrategyParameters parameters) { - resetCallStack(); - increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(1, 1); + CallStack callStack = getCallStack(parameters); + resetCallStack(callStack); + increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(1, 1, callStack); } @Override - public void executionStrategyOnFieldValuesInfo(List fieldValueInfoList) { - onFieldValuesInfoDispatchIfNeeded(fieldValueInfoList, 1); + public void executionStrategyOnFieldValuesInfo(List fieldValueInfoList, ExecutionStrategyParameters parameters) { + CallStack callStack = getCallStack(parameters); + onFieldValuesInfoDispatchIfNeeded(fieldValueInfoList, 1, callStack); } - public void executionStrategyOnFieldValuesException(Throwable t) { + @Override + public void executionStrategyOnFieldValuesException(Throwable t, ExecutionStrategyParameters parameters) { + CallStack callStack = getCallStack(parameters); callStack.lock.runLocked(() -> callStack.increaseHappenedOnFieldValueCalls(1) ); } + private CallStack getCallStack(ExecutionStrategyParameters parameters) { + return getCallStack(parameters.getDeferredCallContext()); + } + + private CallStack getCallStack(@Nullable DeferredCallContext deferredCallContext) { + if (deferredCallContext == null) { + return this.callStack; + } else { + return callStackMap.computeIfAbsent(deferredCallContext, k -> { + CallStack callStack = new CallStack(); + int startLevel = deferredCallContext.getStartLevel(); + int fields = deferredCallContext.getFields(); + callStack.lock.runLocked(() -> { + callStack.increaseExpectedFetchCount(startLevel, fields); + // we make sure that startLevel-1 is considered done + callStack.expectedExecuteObjectCallsPerLevel.set(startLevel - 1, 0); + callStack.happenedOnFieldValueCallsPerLevel.set(startLevel - 1, 0); + callStack.highestReadyLevel = startLevel - 1; + }); + return callStack; + }); + } + } @Override public void executeObject(ExecutionContext executionContext, ExecutionStrategyParameters parameters) { + CallStack callStack = getCallStack(parameters); int curLevel = parameters.getExecutionStepInfo().getPath().getLevel() + 1; - increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(curLevel, parameters); + increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(curLevel, parameters, callStack); } @Override public void executeObjectOnFieldValuesInfo(List fieldValueInfoList, ExecutionStrategyParameters parameters) { int curLevel = parameters.getPath().getLevel() + 1; - onFieldValuesInfoDispatchIfNeeded(fieldValueInfoList, curLevel); + CallStack callStack = getCallStack(parameters); + onFieldValuesInfoDispatchIfNeeded(fieldValueInfoList, curLevel, callStack); } + @Override + public void deferredOnFieldValue(String resultKey, FieldValueInfo fieldValueInfo, Throwable throwable, ExecutionStrategyParameters parameters) { + CallStack callStack = getCallStack(parameters); + boolean ready = callStack.lock.callLocked(() -> { + callStack.deferredFragmentRootFieldsFetched.add(fieldValueInfo); + return callStack.deferredFragmentRootFieldsFetched.size() == parameters.getDeferredCallContext().getFields(); + }); + if (ready) { + int curLevel = parameters.getPath().getLevel() + 1; + onFieldValuesInfoDispatchIfNeeded(callStack.deferredFragmentRootFieldsFetched, curLevel, callStack); + } + } @Override public void executeObjectOnFieldValuesException(Throwable t, ExecutionStrategyParameters parameters) { + CallStack callStack = getCallStack(parameters); int curLevel = parameters.getPath().getLevel() + 1; callStack.lock.runLocked(() -> callStack.increaseHappenedOnFieldValueCalls(curLevel) ); } - private void increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(int curLevel, ExecutionStrategyParameters executionStrategyParameters) { - increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(curLevel, executionStrategyParameters.getFields().size()); + private void increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(int curLevel, ExecutionStrategyParameters executionStrategyParameters, CallStack callStack) { + increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(curLevel, executionStrategyParameters.getFields().size(), callStack); } - private void increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(int curLevel, int fieldCount) { + private void increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(int curLevel, int fieldCount, CallStack callStack) { callStack.lock.runLocked(() -> { callStack.increaseHappenedExecuteObjectCalls(curLevel); callStack.increaseExpectedFetchCount(curLevel, fieldCount); }); } - private void resetCallStack() { + private void resetCallStack(CallStack callStack) { callStack.lock.runLocked(() -> { callStack.clearDispatchLevels(); callStack.clearExpectedObjectCalls(); @@ -269,20 +320,20 @@ private void resetCallStack() { }); } - private void onFieldValuesInfoDispatchIfNeeded(List fieldValueInfoList, int curLevel) { + private void onFieldValuesInfoDispatchIfNeeded(List fieldValueInfoList, int curLevel, CallStack callStack) { Integer dispatchLevel = callStack.lock.callLocked(() -> - handleOnFieldValuesInfo(fieldValueInfoList, curLevel) + handleOnFieldValuesInfo(fieldValueInfoList, curLevel, callStack) ); // the handle on field values check for the next level if it is ready if (dispatchLevel != null) { - dispatch(dispatchLevel); + dispatch(dispatchLevel, callStack); } } // // thread safety: called with callStack.lock // - private Integer handleOnFieldValuesInfo(List fieldValueInfos, int curLevel) { + private Integer handleOnFieldValuesInfo(List fieldValueInfos, int curLevel, CallStack callStack) { callStack.increaseHappenedOnFieldValueCalls(curLevel); int expectedOnObjectCalls = getObjectCountForList(fieldValueInfos); // on the next level we expect the following on object calls because we found non null objects @@ -296,7 +347,7 @@ private Integer handleOnFieldValuesInfo(List fieldValueInfos, in // if data loader chaining is disabled (the old algo) the level we dispatch is not really relevant as // we dispatch the whole registry anyway - return getHighestReadyLevel(curLevel + 1); + return getHighestReadyLevel(curLevel + 1, callStack); } /** @@ -321,13 +372,14 @@ public void fieldFetched(ExecutionContext executionContext, DataFetcher dataFetcher, Object fetchedValue, Supplier dataFetchingEnvironment) { + CallStack callStack = getCallStack(executionStrategyParameters); int level = executionStrategyParameters.getPath().getLevel(); boolean dispatchNeeded = callStack.lock.callLocked(() -> { callStack.increaseFetchCount(level); - return dispatchIfNeeded(level); + return dispatchIfNeeded(level, callStack); }); if (dispatchNeeded) { - dispatch(level); + dispatch(level, callStack); } } @@ -336,8 +388,8 @@ public void fieldFetched(ExecutionContext executionContext, // // thread safety : called with callStack.lock // - private boolean dispatchIfNeeded(int level) { - boolean ready = checkLevelBeingReady(level); + private boolean dispatchIfNeeded(int level, CallStack callStack) { + boolean ready = checkLevelBeingReady(level, callStack); if (ready) { callStack.setDispatchedLevel(level); return true; @@ -348,10 +400,10 @@ private boolean dispatchIfNeeded(int level) { // // thread safety: called with callStack.lock // - private Integer getHighestReadyLevel(int startFrom) { + private Integer getHighestReadyLevel(int startFrom, CallStack callStack) { int curLevel = callStack.highestReadyLevel; while (true) { - if (!checkLevelImpl(curLevel + 1)) { + if (!checkLevelImpl(curLevel + 1, callStack)) { callStack.highestReadyLevel = curLevel; return curLevel >= startFrom ? curLevel : null; } @@ -359,14 +411,14 @@ private Integer getHighestReadyLevel(int startFrom) { } } - private boolean checkLevelBeingReady(int level) { + private boolean checkLevelBeingReady(int level, CallStack callStack) { Assert.assertTrue(level > 0); if (level <= callStack.highestReadyLevel) { return true; } for (int i = callStack.highestReadyLevel + 1; i <= level; i++) { - if (!checkLevelImpl(i)) { + if (!checkLevelImpl(i, callStack)) { return false; } } @@ -374,7 +426,7 @@ private boolean checkLevelBeingReady(int level) { return true; } - private boolean checkLevelImpl(int level) { + private boolean checkLevelImpl(int level, CallStack callStack) { // a level with zero expectations can't be ready if (callStack.expectedFetchCountPerLevel.get(level) == 0) { return false; @@ -393,7 +445,7 @@ private boolean checkLevelImpl(int level) { return true; } - void dispatch(int level) { + void dispatch(int level, CallStack callStack) { if (!enableDataLoaderChaining) { DataLoaderRegistry dataLoaderRegistry = executionContext.getDataLoaderRegistry(); dataLoaderRegistry.dispatchAll(); @@ -409,7 +461,7 @@ void dispatch(int level) { .map(resultPathWithDataLoader -> resultPathWithDataLoader.resultPath) .collect(Collectors.toSet()); }); - dispatchDLCFImpl(resultPathToDispatch, level); + dispatchDLCFImpl(resultPathToDispatch, level, callStack); } else { callStack.lock.runLocked(() -> { callStack.dispatchingStartedPerLevel.add(level); @@ -419,7 +471,7 @@ void dispatch(int level) { } - public void dispatchDLCFImpl(Set resultPathsToDispatch, Integer level) { + public void dispatchDLCFImpl(Set resultPathsToDispatch, Integer level, CallStack callStack) { // filter out all DataLoaderCFS that are matching the fields we want to dispatch List relevantResultPathWithDataLoader = new ArrayList<>(); @@ -444,18 +496,19 @@ public void dispatchDLCFImpl(Set resultPathsToDispatch, Integer level) { } CompletableFuture.allOf(allDispatchedCFs.toArray(new CompletableFuture[0])) .whenComplete((unused, throwable) -> { - dispatchDLCFImpl(resultPathsToDispatch, level); + dispatchDLCFImpl(resultPathsToDispatch, level, callStack); } ); } - public void newDataLoaderLoadCall(String resultPath, int level, DataLoader dataLoader, String dataLoaderName, Object key) { + public void newDataLoaderLoadCall(String resultPath, int level, DataLoader dataLoader, String dataLoaderName, Object key, @Nullable DeferredCallContext deferredCallContext) { if (!enableDataLoaderChaining) { return; } ResultPathWithDataLoader resultPathWithDataLoader = new ResultPathWithDataLoader(resultPath, level, dataLoader, dataLoaderName, key); + CallStack callStack = getCallStack(deferredCallContext); boolean levelFinished = callStack.lock.callLocked(() -> { boolean finished = callStack.dispatchingFinishedPerLevel.contains(level); callStack.allResultPathWithDataLoader.add(resultPathWithDataLoader); @@ -466,7 +519,7 @@ public void newDataLoaderLoadCall(String resultPath, int level, DataLoader dataL return finished; }); if (levelFinished) { - newDelayedDataLoader(resultPathWithDataLoader); + newDelayedDataLoader(resultPathWithDataLoader, callStack); } @@ -474,6 +527,12 @@ public void newDataLoaderLoadCall(String resultPath, int level, DataLoader dataL class DispatchDelayedDataloader implements Runnable { + private final CallStack callStack; + + public DispatchDelayedDataloader(CallStack callStack) { + this.callStack = callStack; + } + @Override public void run() { AtomicReference> resultPathToDispatch = new AtomicReference<>(); @@ -482,16 +541,16 @@ public void run() { callStack.batchWindowOfDelayedDataLoaderToDispatch.clear(); callStack.batchWindowOpen = false; }); - dispatchDLCFImpl(resultPathToDispatch.get(), null); + dispatchDLCFImpl(resultPathToDispatch.get(), null, callStack); } } - private void newDelayedDataLoader(ResultPathWithDataLoader resultPathWithDataLoader) { + private void newDelayedDataLoader(ResultPathWithDataLoader resultPathWithDataLoader, CallStack callStack) { callStack.lock.runLocked(() -> { callStack.batchWindowOfDelayedDataLoaderToDispatch.add(resultPathWithDataLoader.resultPath); if (!callStack.batchWindowOpen) { callStack.batchWindowOpen = true; - delayedDataLoaderDispatchExecutor.get().schedule(new DispatchDelayedDataloader(), this.batchWindowNs, TimeUnit.NANOSECONDS); + delayedDataLoaderDispatchExecutor.get().schedule(new DispatchDelayedDataloader(callStack), this.batchWindowNs, TimeUnit.NANOSECONDS); } }); diff --git a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategyWithDeferAlwaysDispatch.java b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategyWithDeferAlwaysDispatch.java index 7f996f664b..193ffdd27e 100644 --- a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategyWithDeferAlwaysDispatch.java +++ b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategyWithDeferAlwaysDispatch.java @@ -1,279 +1,279 @@ -package graphql.execution.instrumentation.dataloader; - -import graphql.Assert; -import graphql.Internal; -import graphql.execution.DataLoaderDispatchStrategy; -import graphql.execution.ExecutionContext; -import graphql.execution.ExecutionStrategyParameters; -import graphql.execution.FieldValueInfo; -import graphql.execution.MergedField; -import graphql.schema.DataFetcher; -import graphql.schema.DataFetchingEnvironment; -import graphql.util.LockKit; -import org.dataloader.DataLoaderRegistry; - -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Supplier; - -/** - * The execution of a query can be divided into 2 phases: first, the non-deferred fields are executed and only once - * they are completely resolved, we start to execute the deferred fields. - * The behavior of this Data Loader strategy is quite different during those 2 phases. During the execution of the - * deferred fields the Data Loader will not attempt to dispatch in a optimal way. It will essentially dispatch for - * every field fetched, which is quite ineffective. - * This is the first iteration of the Data Loader strategy with support for @defer, and it will be improved in the - * future. - */ -@Internal -public class PerLevelDataLoaderDispatchStrategyWithDeferAlwaysDispatch implements DataLoaderDispatchStrategy { - - private final CallStack callStack; - private final ExecutionContext executionContext; - - /** - * This flag is used to determine if we have started the deferred execution. - * The value of this flag is set to true as soon as we identified that a deferred field is being executed, and then - * the flag stays on that state for the remainder of the execution. - */ - private final AtomicBoolean startedDeferredExecution = new AtomicBoolean(false); - - - private static class CallStack { - - private final LockKit.ReentrantLock lock = new LockKit.ReentrantLock(); - private final LevelMap expectedFetchCountPerLevel = new LevelMap(); - private final LevelMap fetchCountPerLevel = new LevelMap(); - private final LevelMap expectedStrategyCallsPerLevel = new LevelMap(); - private final LevelMap happenedStrategyCallsPerLevel = new LevelMap(); - private final LevelMap happenedOnFieldValueCallsPerLevel = new LevelMap(); - - private final Set dispatchedLevels = new LinkedHashSet<>(); - - public CallStack() { - expectedStrategyCallsPerLevel.set(1, 1); - } - - void increaseExpectedFetchCount(int level, int count) { - expectedFetchCountPerLevel.increment(level, count); - } - - void increaseFetchCount(int level) { - fetchCountPerLevel.increment(level, 1); - } - - void increaseExpectedStrategyCalls(int level, int count) { - expectedStrategyCallsPerLevel.increment(level, count); - } - - void increaseHappenedStrategyCalls(int level) { - happenedStrategyCallsPerLevel.increment(level, 1); - } - - void increaseHappenedOnFieldValueCalls(int level) { - happenedOnFieldValueCallsPerLevel.increment(level, 1); - } - - boolean allStrategyCallsHappened(int level) { - return happenedStrategyCallsPerLevel.get(level) == expectedStrategyCallsPerLevel.get(level); - } - - boolean allOnFieldCallsHappened(int level) { - return happenedOnFieldValueCallsPerLevel.get(level) == expectedStrategyCallsPerLevel.get(level); - } - - boolean allFetchesHappened(int level) { - return fetchCountPerLevel.get(level) == expectedFetchCountPerLevel.get(level); - } - - @Override - public String toString() { - return "CallStack{" + - "expectedFetchCountPerLevel=" + expectedFetchCountPerLevel + - ", fetchCountPerLevel=" + fetchCountPerLevel + - ", expectedStrategyCallsPerLevel=" + expectedStrategyCallsPerLevel + - ", happenedStrategyCallsPerLevel=" + happenedStrategyCallsPerLevel + - ", happenedOnFieldValueCallsPerLevel=" + happenedOnFieldValueCallsPerLevel + - ", dispatchedLevels" + dispatchedLevels + - '}'; - } - - - public boolean dispatchIfNotDispatchedBefore(int level) { - if (dispatchedLevels.contains(level)) { - Assert.assertShouldNeverHappen("level " + level + " already dispatched"); - return false; - } - dispatchedLevels.add(level); - return true; - } - } - - public PerLevelDataLoaderDispatchStrategyWithDeferAlwaysDispatch(ExecutionContext executionContext) { - this.callStack = new CallStack(); - this.executionContext = executionContext; - } - - @Override - public void executeDeferredOnFieldValueInfo(FieldValueInfo fieldValueInfo, ExecutionStrategyParameters executionStrategyParameters) { - this.startedDeferredExecution.set(true); - } - - @Override - public void executionStrategy(ExecutionContext executionContext, ExecutionStrategyParameters parameters) { - if (this.startedDeferredExecution.get()) { - return; - } - int curLevel = parameters.getExecutionStepInfo().getPath().getLevel() + 1; - increaseCallCounts(curLevel, parameters); - } - - @Override - public void executeObject(ExecutionContext executionContext, ExecutionStrategyParameters parameters) { - if (this.startedDeferredExecution.get()) { - return; - } - int curLevel = parameters.getExecutionStepInfo().getPath().getLevel() + 1; - increaseCallCounts(curLevel, parameters); - } - - @Override - public void executionStrategyOnFieldValuesInfo(List fieldValueInfoList) { - if (this.startedDeferredExecution.get()) { - this.dispatch(); - } - onFieldValuesInfoDispatchIfNeeded(fieldValueInfoList, 1); - } - - @Override - public void executionStrategyOnFieldValuesException(Throwable t) { - callStack.lock.runLocked(() -> - callStack.increaseHappenedOnFieldValueCalls(1) - ); - } - - @Override - public void executeObjectOnFieldValuesInfo(List fieldValueInfoList, ExecutionStrategyParameters parameters) { - if (this.startedDeferredExecution.get()) { - this.dispatch(); - } - int curLevel = parameters.getPath().getLevel() + 1; - onFieldValuesInfoDispatchIfNeeded(fieldValueInfoList, curLevel); - } - - - @Override - public void executeObjectOnFieldValuesException(Throwable t, ExecutionStrategyParameters parameters) { - int curLevel = parameters.getPath().getLevel() + 1; - callStack.lock.runLocked(() -> - callStack.increaseHappenedOnFieldValueCalls(curLevel) - ); - } - - @Override - public void fieldFetched(ExecutionContext executionContext, - ExecutionStrategyParameters parameters, - DataFetcher dataFetcher, - Object fetchedValue, - Supplier dataFetchingEnvironment) { - - final boolean dispatchNeeded; - - if (parameters.getField().isDeferred() || this.startedDeferredExecution.get()) { - this.startedDeferredExecution.set(true); - dispatchNeeded = true; - } else { - int level = parameters.getPath().getLevel(); - dispatchNeeded = callStack.lock.callLocked(() -> { - callStack.increaseFetchCount(level); - return dispatchIfNeeded(level); - }); - } - - if (dispatchNeeded) { - dispatch(); - } - - } - - private void increaseCallCounts(int curLevel, ExecutionStrategyParameters parameters) { - int count = 0; - for (MergedField field : parameters.getFields().getSubFieldsList()) { - if (!field.isDeferred()) { - count++; - } - } - int nonDeferredFieldCount = count; - callStack.lock.runLocked(() -> { - callStack.increaseExpectedFetchCount(curLevel, nonDeferredFieldCount); - callStack.increaseHappenedStrategyCalls(curLevel); - }); - } - - private void onFieldValuesInfoDispatchIfNeeded(List fieldValueInfoList, int curLevel) { - boolean dispatchNeeded = callStack.lock.callLocked(() -> - handleOnFieldValuesInfo(fieldValueInfoList, curLevel) - ); - if (dispatchNeeded) { - dispatch(); - } - } - - // - // thread safety: called with callStack.lock - // - private boolean handleOnFieldValuesInfo(List fieldValueInfos, int curLevel) { - callStack.increaseHappenedOnFieldValueCalls(curLevel); - int expectedStrategyCalls = getCountForList(fieldValueInfos); - callStack.increaseExpectedStrategyCalls(curLevel + 1, expectedStrategyCalls); - return dispatchIfNeeded(curLevel + 1); - } - - private int getCountForList(List fieldValueInfos) { - int result = 0; - for (FieldValueInfo fieldValueInfo : fieldValueInfos) { - if (fieldValueInfo.getCompleteValueType() == FieldValueInfo.CompleteValueType.OBJECT) { - result += 1; - } else if (fieldValueInfo.getCompleteValueType() == FieldValueInfo.CompleteValueType.LIST) { - result += getCountForList(fieldValueInfo.getFieldValueInfos()); - } - } - return result; - } - - // - // thread safety : called with callStack.lock - // - private boolean dispatchIfNeeded(int level) { - boolean ready = levelReady(level); - if (ready) { - return callStack.dispatchIfNotDispatchedBefore(level); - } - return false; - } - - // - // thread safety: called with callStack.lock - // - private boolean levelReady(int level) { - if (level == 1) { - // level 1 is special: there is only one strategy call and that's it - return callStack.allFetchesHappened(1); - } - if (levelReady(level - 1) && callStack.allOnFieldCallsHappened(level - 1) - && callStack.allStrategyCallsHappened(level) && callStack.allFetchesHappened(level)) { - - return true; - } - return false; - } - - void dispatch() { - DataLoaderRegistry dataLoaderRegistry = executionContext.getDataLoaderRegistry(); - dataLoaderRegistry.dispatchAll(); - } - -} - +//package graphql.execution.instrumentation.dataloader; +// +//import graphql.Assert; +//import graphql.Internal; +//import graphql.execution.DataLoaderDispatchStrategy; +//import graphql.execution.ExecutionContext; +//import graphql.execution.ExecutionStrategyParameters; +//import graphql.execution.FieldValueInfo; +//import graphql.execution.MergedField; +//import graphql.schema.DataFetcher; +//import graphql.schema.DataFetchingEnvironment; +//import graphql.util.LockKit; +//import org.dataloader.DataLoaderRegistry; +// +//import java.util.LinkedHashSet; +//import java.util.List; +//import java.util.Set; +//import java.util.concurrent.atomic.AtomicBoolean; +//import java.util.function.Supplier; +// +/// ** +// * The execution of a query can be divided into 2 phases: first, the non-deferred fields are executed and only once +// * they are completely resolved, we start to execute the deferred fields. +// * The behavior of this Data Loader strategy is quite different during those 2 phases. During the execution of the +// * deferred fields the Data Loader will not attempt to dispatch in a optimal way. It will essentially dispatch for +// * every field fetched, which is quite ineffective. +// * This is the first iteration of the Data Loader strategy with support for @defer, and it will be improved in the +// * future. +// */ +//@Internal +//public class PerLevelDataLoaderDispatchStrategyWithDeferAlwaysDispatch implements DataLoaderDispatchStrategy { +// +// private final CallStack callStack; +// private final ExecutionContext executionContext; +// +// /** +// * This flag is used to determine if we have started the deferred execution. +// * The value of this flag is set to true as soon as we identified that a deferred field is being executed, and then +// * the flag stays on that state for the remainder of the execution. +// */ +// private final AtomicBoolean startedDeferredExecution = new AtomicBoolean(false); +// +// +// private static class CallStack { +// +// private final LockKit.ReentrantLock lock = new LockKit.ReentrantLock(); +// private final LevelMap expectedFetchCountPerLevel = new LevelMap(); +// private final LevelMap fetchCountPerLevel = new LevelMap(); +// private final LevelMap expectedStrategyCallsPerLevel = new LevelMap(); +// private final LevelMap happenedStrategyCallsPerLevel = new LevelMap(); +// private final LevelMap happenedOnFieldValueCallsPerLevel = new LevelMap(); +// +// private final Set dispatchedLevels = new LinkedHashSet<>(); +// +// public CallStack() { +// expectedStrategyCallsPerLevel.set(1, 1); +// } +// +// void increaseExpectedFetchCount(int level, int count) { +// expectedFetchCountPerLevel.increment(level, count); +// } +// +// void increaseFetchCount(int level) { +// fetchCountPerLevel.increment(level, 1); +// } +// +// void increaseExpectedStrategyCalls(int level, int count) { +// expectedStrategyCallsPerLevel.increment(level, count); +// } +// +// void increaseHappenedStrategyCalls(int level) { +// happenedStrategyCallsPerLevel.increment(level, 1); +// } +// +// void increaseHappenedOnFieldValueCalls(int level) { +// happenedOnFieldValueCallsPerLevel.increment(level, 1); +// } +// +// boolean allStrategyCallsHappened(int level) { +// return happenedStrategyCallsPerLevel.get(level) == expectedStrategyCallsPerLevel.get(level); +// } +// +// boolean allOnFieldCallsHappened(int level) { +// return happenedOnFieldValueCallsPerLevel.get(level) == expectedStrategyCallsPerLevel.get(level); +// } +// +// boolean allFetchesHappened(int level) { +// return fetchCountPerLevel.get(level) == expectedFetchCountPerLevel.get(level); +// } +// +// @Override +// public String toString() { +// return "CallStack{" + +// "expectedFetchCountPerLevel=" + expectedFetchCountPerLevel + +// ", fetchCountPerLevel=" + fetchCountPerLevel + +// ", expectedStrategyCallsPerLevel=" + expectedStrategyCallsPerLevel + +// ", happenedStrategyCallsPerLevel=" + happenedStrategyCallsPerLevel + +// ", happenedOnFieldValueCallsPerLevel=" + happenedOnFieldValueCallsPerLevel + +// ", dispatchedLevels" + dispatchedLevels + +// '}'; +// } +// +// +// public boolean dispatchIfNotDispatchedBefore(int level) { +// if (dispatchedLevels.contains(level)) { +// Assert.assertShouldNeverHappen("level " + level + " already dispatched"); +// return false; +// } +// dispatchedLevels.add(level); +// return true; +// } +// } +// +// public PerLevelDataLoaderDispatchStrategyWithDeferAlwaysDispatch(ExecutionContext executionContext) { +// this.callStack = new CallStack(); +// this.executionContext = executionContext; +// } +// +// @Override +// public void executeDeferredOnFieldValueInfo(FieldValueInfo fieldValueInfo, ExecutionStrategyParameters executionStrategyParameters) { +// this.startedDeferredExecution.set(true); +// } +// +// @Override +// public void executionStrategy(ExecutionContext executionContext, ExecutionStrategyParameters parameters) { +// if (this.startedDeferredExecution.get()) { +// return; +// } +// int curLevel = parameters.getExecutionStepInfo().getPath().getLevel() + 1; +// increaseCallCounts(curLevel, parameters); +// } +// +// @Override +// public void executeObject(ExecutionContext executionContext, ExecutionStrategyParameters parameters) { +// if (this.startedDeferredExecution.get()) { +// return; +// } +// int curLevel = parameters.getExecutionStepInfo().getPath().getLevel() + 1; +// increaseCallCounts(curLevel, parameters); +// } +// +// @Override +// public void executionStrategyOnFieldValuesInfo(List fieldValueInfoList) { +// if (this.startedDeferredExecution.get()) { +// this.dispatch(); +// } +// onFieldValuesInfoDispatchIfNeeded(fieldValueInfoList, 1); +// } +// +// @Override +// public void executionStrategyOnFieldValuesException(Throwable t) { +// callStack.lock.runLocked(() -> +// callStack.increaseHappenedOnFieldValueCalls(1) +// ); +// } +// +// @Override +// public void executeObjectOnFieldValuesInfo(List fieldValueInfoList, ExecutionStrategyParameters parameters) { +// if (this.startedDeferredExecution.get()) { +// this.dispatch(); +// } +// int curLevel = parameters.getPath().getLevel() + 1; +// onFieldValuesInfoDispatchIfNeeded(fieldValueInfoList, curLevel); +// } +// +// +// @Override +// public void executeObjectOnFieldValuesException(Throwable t, ExecutionStrategyParameters parameters) { +// int curLevel = parameters.getPath().getLevel() + 1; +// callStack.lock.runLocked(() -> +// callStack.increaseHappenedOnFieldValueCalls(curLevel) +// ); +// } +// +// @Override +// public void fieldFetched(ExecutionContext executionContext, +// ExecutionStrategyParameters parameters, +// DataFetcher dataFetcher, +// Object fetchedValue, +// Supplier dataFetchingEnvironment) { +// +// final boolean dispatchNeeded; +// +// if (parameters.getField().isDeferred() || this.startedDeferredExecution.get()) { +// this.startedDeferredExecution.set(true); +// dispatchNeeded = true; +// } else { +// int level = parameters.getPath().getLevel(); +// dispatchNeeded = callStack.lock.callLocked(() -> { +// callStack.increaseFetchCount(level); +// return dispatchIfNeeded(level); +// }); +// } +// +// if (dispatchNeeded) { +// dispatch(); +// } +// +// } +// +// private void increaseCallCounts(int curLevel, ExecutionStrategyParameters parameters) { +// int count = 0; +// for (MergedField field : parameters.getFields().getSubFieldsList()) { +// if (!field.isDeferred()) { +// count++; +// } +// } +// int nonDeferredFieldCount = count; +// callStack.lock.runLocked(() -> { +// callStack.increaseExpectedFetchCount(curLevel, nonDeferredFieldCount); +// callStack.increaseHappenedStrategyCalls(curLevel); +// }); +// } +// +// private void onFieldValuesInfoDispatchIfNeeded(List fieldValueInfoList, int curLevel) { +// boolean dispatchNeeded = callStack.lock.callLocked(() -> +// handleOnFieldValuesInfo(fieldValueInfoList, curLevel) +// ); +// if (dispatchNeeded) { +// dispatch(); +// } +// } +// +// // +// // thread safety: called with callStack.lock +// // +// private boolean handleOnFieldValuesInfo(List fieldValueInfos, int curLevel) { +// callStack.increaseHappenedOnFieldValueCalls(curLevel); +// int expectedStrategyCalls = getCountForList(fieldValueInfos); +// callStack.increaseExpectedStrategyCalls(curLevel + 1, expectedStrategyCalls); +// return dispatchIfNeeded(curLevel + 1); +// } +// +// private int getCountForList(List fieldValueInfos) { +// int result = 0; +// for (FieldValueInfo fieldValueInfo : fieldValueInfos) { +// if (fieldValueInfo.getCompleteValueType() == FieldValueInfo.CompleteValueType.OBJECT) { +// result += 1; +// } else if (fieldValueInfo.getCompleteValueType() == FieldValueInfo.CompleteValueType.LIST) { +// result += getCountForList(fieldValueInfo.getFieldValueInfos()); +// } +// } +// return result; +// } +// +// // +// // thread safety : called with callStack.lock +// // +// private boolean dispatchIfNeeded(int level) { +// boolean ready = levelReady(level); +// if (ready) { +// return callStack.dispatchIfNotDispatchedBefore(level); +// } +// return false; +// } +// +// // +// // thread safety: called with callStack.lock +// // +// private boolean levelReady(int level) { +// if (level == 1) { +// // level 1 is special: there is only one strategy call and that's it +// return callStack.allFetchesHappened(1); +// } +// if (levelReady(level - 1) && callStack.allOnFieldCallsHappened(level - 1) +// && callStack.allStrategyCallsHappened(level) && callStack.allFetchesHappened(level)) { +// +// return true; +// } +// return false; +// } +// +// void dispatch() { +// DataLoaderRegistry dataLoaderRegistry = executionContext.getDataLoaderRegistry(); +// dataLoaderRegistry.dispatchAll(); +// } +// +//} +// diff --git a/src/main/java/graphql/schema/DataFetchingEnvironmentImpl.java b/src/main/java/graphql/schema/DataFetchingEnvironmentImpl.java index 0dd0e30674..20d3bbdb8c 100644 --- a/src/main/java/graphql/schema/DataFetchingEnvironmentImpl.java +++ b/src/main/java/graphql/schema/DataFetchingEnvironmentImpl.java @@ -12,6 +12,7 @@ import graphql.execution.ExecutionStepInfo; import graphql.execution.MergedField; import graphql.execution.directives.QueryDirectives; +import graphql.execution.incremental.DeferredCallContext; import graphql.language.Document; import graphql.language.Field; import graphql.language.FragmentDefinition; @@ -78,7 +79,7 @@ private DataFetchingEnvironmentImpl(Builder builder) { this.queryDirectives = builder.queryDirectives; // internal state - this.dfeInternalState = new DFEInternalState(builder.dataLoaderDispatchStrategy); + this.dfeInternalState = new DFEInternalState(builder.dataLoaderDispatchStrategy, builder.deferredCallContext); } /** @@ -105,7 +106,9 @@ public static Builder newDataFetchingEnvironment(ExecutionContext executionConte .operationDefinition(executionContext.getOperationDefinition()) .variables(executionContext.getCoercedVariables().toMap()) .executionId(executionContext.getExecutionId()) - .dataLoaderDispatchStrategy(executionContext.getDataLoaderDispatcherStrategy()); + .dataLoaderDispatchStrategy(executionContext.getDataLoaderDispatcherStrategy()) + ; + } @@ -282,6 +285,7 @@ public static class Builder { private ImmutableMapWithNullValues variables; private QueryDirectives queryDirectives; private DataLoaderDispatchStrategy dataLoaderDispatchStrategy; + private DeferredCallContext deferredCallContext; public Builder(DataFetchingEnvironmentImpl env) { this.source = env.source; @@ -306,6 +310,7 @@ public Builder(DataFetchingEnvironmentImpl env) { this.variables = env.variables; this.queryDirectives = env.queryDirectives; this.dataLoaderDispatchStrategy = env.dfeInternalState.dataLoaderDispatchStrategy; + this.deferredCallContext = env.dfeInternalState.deferredCallContext; } public Builder() { @@ -425,6 +430,11 @@ public Builder queryDirectives(QueryDirectives queryDirectives) { return this; } + public Builder deferredCallContext(DeferredCallContext deferredCallContext) { + this.deferredCallContext = deferredCallContext; + return this; + } + public DataFetchingEnvironment build() { return new DataFetchingEnvironmentImpl(this); } @@ -438,13 +448,19 @@ public Builder dataLoaderDispatchStrategy(DataLoaderDispatchStrategy dataLoaderD @Internal public static class DFEInternalState { final DataLoaderDispatchStrategy dataLoaderDispatchStrategy; + final DeferredCallContext deferredCallContext; - public DFEInternalState(DataLoaderDispatchStrategy dataLoaderDispatchStrategy) { + public DFEInternalState(DataLoaderDispatchStrategy dataLoaderDispatchStrategy, DeferredCallContext deferredCallContext) { this.dataLoaderDispatchStrategy = dataLoaderDispatchStrategy; + this.deferredCallContext = deferredCallContext; } public DataLoaderDispatchStrategy getDataLoaderDispatchStrategy() { return dataLoaderDispatchStrategy; } + + public DeferredCallContext getDeferredCallContext() { + return deferredCallContext; + } } } diff --git a/src/main/java/graphql/schema/DataLoaderWithContext.java b/src/main/java/graphql/schema/DataLoaderWithContext.java index a4b56814ca..0c6ae9e1d7 100644 --- a/src/main/java/graphql/schema/DataLoaderWithContext.java +++ b/src/main/java/graphql/schema/DataLoaderWithContext.java @@ -1,6 +1,7 @@ package graphql.schema; import graphql.Internal; +import graphql.execution.incremental.DeferredCallContext; import graphql.execution.instrumentation.dataloader.PerLevelDataLoaderDispatchStrategy; import org.dataloader.DataLoader; import org.dataloader.DelegatingDataLoader; @@ -32,7 +33,8 @@ public CompletableFuture load(@NonNull K key, @Nullable Object keyContext) { String path = dfe.getExecutionStepInfo().getPath().toString(); DataFetchingEnvironmentImpl.DFEInternalState dfeInternalState = (DataFetchingEnvironmentImpl.DFEInternalState) dfeImpl.toInternal(); if (dfeInternalState.getDataLoaderDispatchStrategy() instanceof PerLevelDataLoaderDispatchStrategy) { - ((PerLevelDataLoaderDispatchStrategy) dfeInternalState.dataLoaderDispatchStrategy).newDataLoaderLoadCall(path, level, delegate, dataLoaderName, key); + DeferredCallContext deferredCallContext = dfeInternalState.getDeferredCallContext(); + ((PerLevelDataLoaderDispatchStrategy) dfeInternalState.dataLoaderDispatchStrategy).newDataLoaderLoadCall(path, level, delegate, dataLoaderName, key, deferredCallContext); } return result; } diff --git a/src/test/groovy/graphql/execution/incremental/DeferExecutionSupportIntegrationTest.groovy b/src/test/groovy/graphql/execution/incremental/DeferExecutionSupportIntegrationTest.groovy index 49e51eaf5a..850634be91 100644 --- a/src/test/groovy/graphql/execution/incremental/DeferExecutionSupportIntegrationTest.groovy +++ b/src/test/groovy/graphql/execution/incremental/DeferExecutionSupportIntegrationTest.groovy @@ -18,6 +18,10 @@ import graphql.schema.DataFetchingEnvironment import graphql.schema.TypeResolver import graphql.schema.idl.RuntimeWiring import org.awaitility.Awaitility +import org.dataloader.BatchLoader +import org.dataloader.DataLoader +import org.dataloader.DataLoaderFactory +import org.dataloader.DataLoaderRegistry import org.reactivestreams.Publisher import spock.lang.Specification import spock.lang.Unroll @@ -62,6 +66,8 @@ class DeferExecutionSupportIntegrationTest extends Specification { typeMismatchError: [String] nonNullableError: String! wordCount: Int + fieldWithDataLoader1: String + fieldWithDataLoader2: String } type Comment { @@ -90,6 +96,13 @@ class DeferExecutionSupportIntegrationTest extends Specification { return resolve(value, sleepMs, false) } + private static DataFetcher fieldWithDataLoader(String key) { + return (dfe) -> { + def dataLoader = dfe.getDataLoader("someDataLoader") + return dataLoader.load(dfe.getSource().id + "-" + key) + }; + } + private static DataFetcher resolve(Object value, Integer sleepMs, boolean allowMultipleCalls) { return new DataFetcher() { boolean executed = false @@ -163,6 +176,8 @@ class DeferExecutionSupportIntegrationTest extends Specification { .dataFetcher("item", resolveItem()) ) .type(newTypeWiring("Post").dataFetcher("summary", resolve("A summary", 10))) + .type(newTypeWiring("Post").dataFetcher("fieldWithDataLoader1", fieldWithDataLoader("fieldWithDataLoader1"))) + .type(newTypeWiring("Post").dataFetcher("fieldWithDataLoader2", fieldWithDataLoader("fieldWithDataLoader2"))) .type(newTypeWiring("Post").dataFetcher("text", resolve("The full text", 100))) .type(newTypeWiring("Post").dataFetcher("wordCount", resolve(45999, 10, true))) .type(newTypeWiring("Post").dataFetcher("latestComment", resolve([title: "Comment title"], 10))) @@ -1674,6 +1689,41 @@ class DeferExecutionSupportIntegrationTest extends Specification { } + def "dataloader used inside defer"() { + given: + def query = ''' + query { + post { + id + ...@defer { + fieldWithDataLoader1 + fieldWithDataLoader2 + } + } + } + ''' + when: + def initialResult = executeQuery(query) + + then: + initialResult.toSpecification() == [ + data : [post: [id: "1001"]], + hasNext: true + ] + + println "initialResult = $initialResult" + when: + def incrementalResults = getIncrementalResults(initialResult) + + then: + + incrementalResults.size() == 1 + incrementalResults[0] == [incremental: [[path: ["post"], data: [fieldWithDataLoader1: "1001-fieldWithDataLoader1", fieldWithDataLoader2: "1001-fieldWithDataLoader2"]]], + hasNext : false + ] + + } + private ExecutionResult executeQuery(String query) { return this.executeQuery(query, true, [:]) @@ -1684,11 +1734,19 @@ class DeferExecutionSupportIntegrationTest extends Specification { } private ExecutionResult executeQuery(String query, boolean incrementalSupport, Map variables) { + BatchLoader batchLoader = { keys -> + println "batchlaoder called with keys $keys" + return CompletableFuture.completedFuture(keys) + } + DataLoader dl = DataLoaderFactory.newDataLoader(batchLoader) + DataLoaderRegistry dataLoaderRegistry = new DataLoaderRegistry(); + dataLoaderRegistry.register("someDataLoader", dl) return graphQL.execute( ExecutionInput.newExecutionInput() .graphQLContext([(ExperimentalApi.ENABLE_INCREMENTAL_SUPPORT): incrementalSupport]) .query(query) .variables(variables) + .dataLoaderRegistry(dataLoaderRegistry) .build() ) } From da6886a15022cdf549956f7b1ed095a92e39e447 Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Thu, 22 May 2025 22:21:56 +1000 Subject: [PATCH 02/14] green tests --- .../execution/AsyncExecutionStrategy.java | 4 +- .../execution/DataLoaderDispatchStrategy.java | 4 +- .../graphql/execution/ExecutionStrategy.java | 2 +- .../incremental/DeferredCallContext.java | 7 ++ .../incremental/DeferredExecutionSupport.java | 1 - .../PerLevelDataLoaderDispatchStrategy.java | 85 +++++++++++-------- .../IncrementalCallStateDeferTest.groovy | 1 + .../dataloader/BatchCompareDataFetchers.java | 2 + .../DataLoaderPerformanceData.groovy | 12 +-- .../dataloader/DeferWithDataLoaderTest.groovy | 12 ++- 10 files changed, 75 insertions(+), 55 deletions(-) diff --git a/src/main/java/graphql/execution/AsyncExecutionStrategy.java b/src/main/java/graphql/execution/AsyncExecutionStrategy.java index 4c1c9a5bed..7f51149089 100644 --- a/src/main/java/graphql/execution/AsyncExecutionStrategy.java +++ b/src/main/java/graphql/execution/AsyncExecutionStrategy.java @@ -39,7 +39,6 @@ public AsyncExecutionStrategy(DataFetcherExceptionHandler exceptionHandler) { @SuppressWarnings("FutureReturnValueIgnored") public CompletableFuture execute(ExecutionContext executionContext, ExecutionStrategyParameters parameters) throws NonNullableFieldWasNullException { DataLoaderDispatchStrategy dataLoaderDispatcherStrategy = executionContext.getDataLoaderDispatcherStrategy(); - dataLoaderDispatcherStrategy.executionStrategy(executionContext, parameters); Instrumentation instrumentation = executionContext.getInstrumentation(); InstrumentationExecutionStrategyParameters instrumentationParameters = new InstrumentationExecutionStrategyParameters(executionContext, parameters); @@ -54,6 +53,9 @@ public CompletableFuture execute(ExecutionContext executionCont } DeferredExecutionSupport deferredExecutionSupport = createDeferredExecutionSupport(executionContext, parameters); + + dataLoaderDispatcherStrategy.executionStrategy(executionContext, parameters, deferredExecutionSupport.getNonDeferredFieldNames(fieldNames).size()); + Async.CombinedBuilder futures = getAsyncFieldValueInfo(executionContext, parameters, deferredExecutionSupport); CompletableFuture overallResult = new CompletableFuture<>(); diff --git a/src/main/java/graphql/execution/DataLoaderDispatchStrategy.java b/src/main/java/graphql/execution/DataLoaderDispatchStrategy.java index b7f7854197..871fcb15b8 100644 --- a/src/main/java/graphql/execution/DataLoaderDispatchStrategy.java +++ b/src/main/java/graphql/execution/DataLoaderDispatchStrategy.java @@ -14,7 +14,7 @@ public interface DataLoaderDispatchStrategy { }; - default void executionStrategy(ExecutionContext executionContext, ExecutionStrategyParameters parameters) { + default void executionStrategy(ExecutionContext executionContext, ExecutionStrategyParameters parameters, int fieldCount) { } @@ -31,7 +31,7 @@ default void executionStrategyOnFieldValuesException(Throwable t, ExecutionStrat } - default void executeObject(ExecutionContext executionContext, ExecutionStrategyParameters executionStrategyParameters) { + default void executeObject(ExecutionContext executionContext, ExecutionStrategyParameters executionStrategyParameters, int fieldCount) { } diff --git a/src/main/java/graphql/execution/ExecutionStrategy.java b/src/main/java/graphql/execution/ExecutionStrategy.java index 1ef600c596..c1323f0b84 100644 --- a/src/main/java/graphql/execution/ExecutionStrategy.java +++ b/src/main/java/graphql/execution/ExecutionStrategy.java @@ -195,7 +195,6 @@ public static String mkNameForPath(List currentField) { @DuckTyped(shape = "CompletableFuture> | Map") protected Object executeObject(ExecutionContext executionContext, ExecutionStrategyParameters parameters) throws NonNullableFieldWasNullException { DataLoaderDispatchStrategy dataLoaderDispatcherStrategy = executionContext.getDataLoaderDispatcherStrategy(); - dataLoaderDispatcherStrategy.executeObject(executionContext, parameters); Instrumentation instrumentation = executionContext.getInstrumentation(); InstrumentationExecutionStrategyParameters instrumentationParameters = new InstrumentationExecutionStrategyParameters(executionContext, parameters); @@ -210,6 +209,7 @@ protected Object executeObject(ExecutionContext executionContext, ExecutionStrat CompletableFuture> overallResult = new CompletableFuture<>(); List fieldsExecutedOnInitialResult = deferredExecutionSupport.getNonDeferredFieldNames(fieldNames); + dataLoaderDispatcherStrategy.executeObject(executionContext, parameters, fieldsExecutedOnInitialResult.size()); BiConsumer, Throwable> handleResultsConsumer = buildFieldValueMap(fieldsExecutedOnInitialResult, overallResult, executionContext); resolveObjectCtx.onDispatched(); diff --git a/src/main/java/graphql/execution/incremental/DeferredCallContext.java b/src/main/java/graphql/execution/incremental/DeferredCallContext.java index 855cad944d..638577f729 100644 --- a/src/main/java/graphql/execution/incremental/DeferredCallContext.java +++ b/src/main/java/graphql/execution/incremental/DeferredCallContext.java @@ -2,6 +2,7 @@ import graphql.GraphQLError; import graphql.Internal; +import graphql.VisibleForTesting; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; @@ -26,6 +27,12 @@ public DeferredCallContext(int startLevel, int fields) { this.fields = fields; } + @VisibleForTesting + public DeferredCallContext() { + this.startLevel = 0; + this.fields = 0; + } + public int getStartLevel() { return startLevel; } diff --git a/src/main/java/graphql/execution/incremental/DeferredExecutionSupport.java b/src/main/java/graphql/execution/incremental/DeferredExecutionSupport.java index 6a685bf0a0..ade6242d24 100644 --- a/src/main/java/graphql/execution/incremental/DeferredExecutionSupport.java +++ b/src/main/java/graphql/execution/incremental/DeferredExecutionSupport.java @@ -117,7 +117,6 @@ public Set> createCalls() { private DeferredFragmentCall createDeferredFragmentCall(DeferredExecution deferredExecution) { int level = parameters.getPath().getLevel() + 1; - System.out.println("new DeferredFragmentCall for level " + level + " with fields " + deferredFields.size()); DeferredCallContext deferredCallContext = new DeferredCallContext(level, deferredFields.size()); List mergedFields = deferredExecutionToFields.get(deferredExecution); diff --git a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java index 4cce0eac1a..7ce27277a3 100644 --- a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java +++ b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java @@ -36,7 +36,7 @@ @NullMarked public class PerLevelDataLoaderDispatchStrategy implements DataLoaderDispatchStrategy { - private final CallStack callStack; + private final CallStack initialCallStack; private final ExecutionContext executionContext; private final long batchWindowNs; private final boolean enableDataLoaderChaining; @@ -90,12 +90,12 @@ private static class CallStack { private boolean batchWindowOpen; - private List deferredFragmentRootFieldsFetched; + private final List deferredFragmentRootFieldsFetched = new ArrayList<>(); public CallStack() { // in the first level there is only one sub selection, // so we only expect one execute object call (which is actually an executionStrategy call) - expectedExecuteObjectCallsPerLevel.set(1, 1); + expectedExecuteObjectCallsPerLevel.set(0, 1); } public void addResultPathWithDataLoader(int level, ResultPathWithDataLoader resultPathWithDataLoader) { @@ -148,8 +148,8 @@ boolean allExecuteObjectCallsHappened(int level) { return happenedExecuteObjectCallsPerLevel.get(level) == expectedExecuteObjectCallsPerLevel.get(level); } - boolean allSubSelectionsFetchingHappened(int level) { - return happenedOnFieldValueCallsPerLevel.get(level) == expectedExecuteObjectCallsPerLevel.get(level); + boolean allSubSelectionsFetchingHappened(int subSelectionLevel) { + return happenedOnFieldValueCallsPerLevel.get(subSelectionLevel) == expectedExecuteObjectCallsPerLevel.get(subSelectionLevel - 1); } boolean allFetchesHappened(int level) { @@ -181,7 +181,7 @@ public void setDispatchedLevel(int level) { } public PerLevelDataLoaderDispatchStrategy(ExecutionContext executionContext) { - this.callStack = new CallStack(); + this.initialCallStack = new CallStack(); this.executionContext = executionContext; GraphQLContext graphQLContext = executionContext.getGraphQLContext(); @@ -204,21 +204,22 @@ public void executeDeferredOnFieldValueInfo(FieldValueInfo fieldValueInfo, Execu } @Override - public void executionStrategy(ExecutionContext executionContext, ExecutionStrategyParameters parameters) { + public void executionStrategy(ExecutionContext executionContext, ExecutionStrategyParameters parameters, int fieldCount) { Assert.assertTrue(parameters.getExecutionStepInfo().getPath().isRootPath()); - increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(1, parameters, callStack); + increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(0, fieldCount, initialCallStack); } @Override public void executionSerialStrategy(ExecutionContext executionContext, ExecutionStrategyParameters parameters) { CallStack callStack = getCallStack(parameters); resetCallStack(callStack); - increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(1, 1, callStack); + increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(0, 1, callStack); } @Override public void executionStrategyOnFieldValuesInfo(List fieldValueInfoList, ExecutionStrategyParameters parameters) { CallStack callStack = getCallStack(parameters); + // the root fields are the root sub selection on level 1 onFieldValuesInfoDispatchIfNeeded(fieldValueInfoList, 1, callStack); } @@ -236,18 +237,19 @@ private CallStack getCallStack(ExecutionStrategyParameters parameters) { private CallStack getCallStack(@Nullable DeferredCallContext deferredCallContext) { if (deferredCallContext == null) { - return this.callStack; + return this.initialCallStack; } else { return callStackMap.computeIfAbsent(deferredCallContext, k -> { CallStack callStack = new CallStack(); int startLevel = deferredCallContext.getStartLevel(); int fields = deferredCallContext.getFields(); callStack.lock.runLocked(() -> { + callStack.expectedExecuteObjectCallsPerLevel.set(0, 0); // set to 1 in the constructor of CallStack + callStack.expectedExecuteObjectCallsPerLevel.set(startLevel - 1, 1); + callStack.happenedExecuteObjectCallsPerLevel.set(startLevel - 1, 1); + callStack.highestReadyLevel = startLevel - 1; callStack.increaseExpectedFetchCount(startLevel, fields); // we make sure that startLevel-1 is considered done - callStack.expectedExecuteObjectCallsPerLevel.set(startLevel - 1, 0); - callStack.happenedOnFieldValueCallsPerLevel.set(startLevel - 1, 0); - callStack.highestReadyLevel = startLevel - 1; }); return callStack; }); @@ -255,28 +257,31 @@ private CallStack getCallStack(@Nullable DeferredCallContext deferredCallContext } @Override - public void executeObject(ExecutionContext executionContext, ExecutionStrategyParameters parameters) { + public void executeObject(ExecutionContext executionContext, ExecutionStrategyParameters parameters, int fieldCount) { CallStack callStack = getCallStack(parameters); - int curLevel = parameters.getExecutionStepInfo().getPath().getLevel() + 1; - increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(curLevel, parameters, callStack); + int curLevel = parameters.getPath().getLevel(); + increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(curLevel, fieldCount, callStack); } @Override - public void executeObjectOnFieldValuesInfo(List fieldValueInfoList, ExecutionStrategyParameters parameters) { + public void executeObjectOnFieldValuesInfo + (List fieldValueInfoList, ExecutionStrategyParameters parameters) { + // the level of the sub selection that is fully fetched is one level more than parameters level int curLevel = parameters.getPath().getLevel() + 1; CallStack callStack = getCallStack(parameters); onFieldValuesInfoDispatchIfNeeded(fieldValueInfoList, curLevel, callStack); } @Override - public void deferredOnFieldValue(String resultKey, FieldValueInfo fieldValueInfo, Throwable throwable, ExecutionStrategyParameters parameters) { + public void deferredOnFieldValue(String resultKey, FieldValueInfo fieldValueInfo, Throwable + throwable, ExecutionStrategyParameters parameters) { CallStack callStack = getCallStack(parameters); boolean ready = callStack.lock.callLocked(() -> { callStack.deferredFragmentRootFieldsFetched.add(fieldValueInfo); return callStack.deferredFragmentRootFieldsFetched.size() == parameters.getDeferredCallContext().getFields(); }); if (ready) { - int curLevel = parameters.getPath().getLevel() + 1; + int curLevel = parameters.getPath().getLevel(); onFieldValuesInfoDispatchIfNeeded(callStack.deferredFragmentRootFieldsFetched, curLevel, callStack); } } @@ -284,20 +289,20 @@ public void deferredOnFieldValue(String resultKey, FieldValueInfo fieldValueInfo @Override public void executeObjectOnFieldValuesException(Throwable t, ExecutionStrategyParameters parameters) { CallStack callStack = getCallStack(parameters); + // the level of the sub selection that is errored is one level more than parameters level int curLevel = parameters.getPath().getLevel() + 1; callStack.lock.runLocked(() -> callStack.increaseHappenedOnFieldValueCalls(curLevel) ); } - private void increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(int curLevel, ExecutionStrategyParameters executionStrategyParameters, CallStack callStack) { - increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(curLevel, executionStrategyParameters.getFields().size(), callStack); - } - private void increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(int curLevel, int fieldCount, CallStack callStack) { + private void increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(int curLevel, + int fieldCount, + CallStack callStack) { callStack.lock.runLocked(() -> { callStack.increaseHappenedExecuteObjectCalls(curLevel); - callStack.increaseExpectedFetchCount(curLevel, fieldCount); + callStack.increaseExpectedFetchCount(curLevel + 1, fieldCount); }); } @@ -309,7 +314,7 @@ private void resetCallStack(CallStack callStack) { callStack.clearFetchCount(); callStack.clearHappenedExecuteObjectCalls(); callStack.clearHappenedOnFieldValueCalls(); - callStack.expectedExecuteObjectCallsPerLevel.set(1, 1); + callStack.expectedExecuteObjectCallsPerLevel.set(0, 1); callStack.dispatchingFinishedPerLevel.clear(); callStack.dispatchingStartedPerLevel.clear(); callStack.allResultPathWithDataLoader.clear(); @@ -320,9 +325,11 @@ private void resetCallStack(CallStack callStack) { }); } - private void onFieldValuesInfoDispatchIfNeeded(List fieldValueInfoList, int curLevel, CallStack callStack) { + private void onFieldValuesInfoDispatchIfNeeded(List fieldValueInfoList, + int subSelectionLevel, + CallStack callStack) { Integer dispatchLevel = callStack.lock.callLocked(() -> - handleOnFieldValuesInfo(fieldValueInfoList, curLevel, callStack) + handleSubSelectionFetched(fieldValueInfoList, subSelectionLevel, callStack) ); // the handle on field values check for the next level if it is ready if (dispatchLevel != null) { @@ -333,11 +340,12 @@ private void onFieldValuesInfoDispatchIfNeeded(List fieldValueIn // // thread safety: called with callStack.lock // - private Integer handleOnFieldValuesInfo(List fieldValueInfos, int curLevel, CallStack callStack) { - callStack.increaseHappenedOnFieldValueCalls(curLevel); + private Integer handleSubSelectionFetched(List fieldValueInfos, int subSelectionLevel, CallStack + callStack) { + callStack.increaseHappenedOnFieldValueCalls(subSelectionLevel); int expectedOnObjectCalls = getObjectCountForList(fieldValueInfos); - // on the next level we expect the following on object calls because we found non null objects - callStack.increaseExpectedExecuteObjectCalls(curLevel + 1, expectedOnObjectCalls); + // we expect on the level of the current sub selection #expectedOnObjectCalls execute object calls + callStack.increaseExpectedExecuteObjectCalls(subSelectionLevel, expectedOnObjectCalls); // maybe the object calls happened already (because the DataFetcher return directly values synchronously) // therefore we check the next levels if they are ready // this means we could skip some level because the higher level is also already ready, @@ -347,7 +355,7 @@ private Integer handleOnFieldValuesInfo(List fieldValueInfos, in // if data loader chaining is disabled (the old algo) the level we dispatch is not really relevant as // we dispatch the whole registry anyway - return getHighestReadyLevel(curLevel + 1, callStack); + return getHighestReadyLevel(subSelectionLevel + 1, callStack); } /** @@ -431,14 +439,16 @@ private boolean checkLevelImpl(int level, CallStack callStack) { if (callStack.expectedFetchCountPerLevel.get(level) == 0) { return false; } - // level 1 is special: there is no previous sub selections - // and the expected execution object calls is always 1 - if (level > 1 && !callStack.allSubSelectionsFetchingHappened(level - 1)) { + + // first we make sure that the expected fetch count is correct + // by verifying that the parent level all execute object + sub selection were fetched + if (!callStack.allExecuteObjectCallsHappened(level - 1)) { return false; } - if (!callStack.allExecuteObjectCallsHappened(level)) { + if (level > 1 && !callStack.allSubSelectionsFetchingHappened(level - 1)) { return false; } + // the main check: all fetches must have happened if (!callStack.allFetchesHappened(level)) { return false; } @@ -503,7 +513,8 @@ public void dispatchDLCFImpl(Set resultPathsToDispatch, Integer level, C } - public void newDataLoaderLoadCall(String resultPath, int level, DataLoader dataLoader, String dataLoaderName, Object key, @Nullable DeferredCallContext deferredCallContext) { + public void newDataLoaderLoadCall(String resultPath, int level, DataLoader dataLoader, String + dataLoaderName, Object key, @Nullable DeferredCallContext deferredCallContext) { if (!enableDataLoaderChaining) { return; } diff --git a/src/test/groovy/graphql/execution/incremental/IncrementalCallStateDeferTest.groovy b/src/test/groovy/graphql/execution/incremental/IncrementalCallStateDeferTest.groovy index d99b49fae4..a8ded32a3a 100644 --- a/src/test/groovy/graphql/execution/incremental/IncrementalCallStateDeferTest.groovy +++ b/src/test/groovy/graphql/execution/incremental/IncrementalCallStateDeferTest.groovy @@ -34,6 +34,7 @@ class IncrementalCallStateDeferTest extends Specification { results[2].incremental[0].data["a"] == "A" } + // flaky def "calls within calls are enqueued correctly"() { given: def incrementalCallState = new IncrementalCallState() diff --git a/src/test/groovy/graphql/execution/instrumentation/dataloader/BatchCompareDataFetchers.java b/src/test/groovy/graphql/execution/instrumentation/dataloader/BatchCompareDataFetchers.java index 08edd13248..db5989dec9 100644 --- a/src/test/groovy/graphql/execution/instrumentation/dataloader/BatchCompareDataFetchers.java +++ b/src/test/groovy/graphql/execution/instrumentation/dataloader/BatchCompareDataFetchers.java @@ -83,6 +83,7 @@ private static List> getDepartmentsForShops(List shops) { private BatchLoader> departmentsForShopsBatchLoader = ids -> maybeAsyncWithSleep(() -> { + System.out.println("departments for shops batch loader called with ids: " + ids); departmentsForShopsBatchLoaderCounter.incrementAndGet(); List shopList = new ArrayList<>(); for (String id : ids) { @@ -128,6 +129,7 @@ private static List> getProductsForDepartments(List de } private BatchLoader> productsForDepartmentsBatchLoader = ids -> maybeAsyncWithSleep(() -> { + System.out.println("products for deparments batch loader called with ids: " + ids); productsForDepartmentsBatchLoaderCounter.incrementAndGet(); List d = ids.stream().map(departments::get).collect(Collectors.toList()); return completedFuture(getProductsForDepartments(d)); diff --git a/src/test/groovy/graphql/execution/instrumentation/dataloader/DataLoaderPerformanceData.groovy b/src/test/groovy/graphql/execution/instrumentation/dataloader/DataLoaderPerformanceData.groovy index 7366ded562..5e72e5f2ad 100644 --- a/src/test/groovy/graphql/execution/instrumentation/dataloader/DataLoaderPerformanceData.groovy +++ b/src/test/groovy/graphql/execution/instrumentation/dataloader/DataLoaderPerformanceData.groovy @@ -68,14 +68,14 @@ class DataLoaderPerformanceData { static String getQuery(boolean deferDepartments, boolean deferProducts) { return """ query { - shops { - id name + shops { # 1 + id name # 2 ... @defer(if: $deferDepartments) { - departments { - id name + departments { # 2 + id name # 3 ... @defer(if: $deferProducts) { - products { - id name + products { # 3 + id name # 4 } } } diff --git a/src/test/groovy/graphql/execution/instrumentation/dataloader/DeferWithDataLoaderTest.groovy b/src/test/groovy/graphql/execution/instrumentation/dataloader/DeferWithDataLoaderTest.groovy index 6da9489c76..f6676bdf3e 100644 --- a/src/test/groovy/graphql/execution/instrumentation/dataloader/DeferWithDataLoaderTest.groovy +++ b/src/test/groovy/graphql/execution/instrumentation/dataloader/DeferWithDataLoaderTest.groovy @@ -7,8 +7,6 @@ import graphql.incremental.IncrementalExecutionResult import org.dataloader.DataLoaderRegistry import spock.lang.Specification -import java.util.stream.Collectors - import static graphql.ExperimentalApi.ENABLE_INCREMENTAL_SUPPORT import static graphql.execution.instrumentation.dataloader.DataLoaderPerformanceData.combineExecutionResults import static graphql.execution.instrumentation.dataloader.DataLoaderPerformanceData.expectedData @@ -90,7 +88,7 @@ class DeferWithDataLoaderTest extends Specification { combined.data == expectedData batchCompareDataFetchers.departmentsForShopsBatchLoaderCounter.get() == 3 - batchCompareDataFetchers.productsForDepartmentsBatchLoaderCounter.get() == 9 + batchCompareDataFetchers.productsForDepartmentsBatchLoaderCounter.get() == 3 } def "multiple fields on same defer block"() { @@ -320,10 +318,10 @@ class DeferWithDataLoaderTest extends Specification { [ ["expensiveShops", 0], ["expensiveShops", 1], ["expensiveShops", 2], ["shops", 0], ["shops", 1], ["shops", 2], - ["shops", 0, "departments", 0], ["shops", 0, "departments", 1],["shops", 0, "departments", 2], ["shops", 1, "departments", 0],["shops", 1, "departments", 1], ["shops", 1, "departments", 2], ["shops", 2, "departments", 0],["shops", 2, "departments", 1],["shops", 2, "departments", 2], - ["shops", 0, "expensiveDepartments", 0], ["shops", 0, "expensiveDepartments", 1], ["shops", 0, "expensiveDepartments", 2], ["shops", 1, "expensiveDepartments", 0], ["shops", 1, "expensiveDepartments", 1], ["shops", 1, "expensiveDepartments", 2], ["shops", 2, "expensiveDepartments", 0], ["shops", 2, "expensiveDepartments", 1],["shops", 2, "expensiveDepartments", 2], - ["expensiveShops", 0, "expensiveDepartments", 0], ["expensiveShops", 0, "expensiveDepartments", 1], ["expensiveShops", 0, "expensiveDepartments", 2], ["expensiveShops", 1, "expensiveDepartments", 0], ["expensiveShops", 1, "expensiveDepartments", 1], ["expensiveShops", 1, "expensiveDepartments", 2], ["expensiveShops", 2, "expensiveDepartments", 0], ["expensiveShops", 2, "expensiveDepartments", 1],["expensiveShops", 2, "expensiveDepartments", 2], - ["expensiveShops", 0, "departments", 0], ["expensiveShops", 0, "departments", 1], ["expensiveShops", 0, "departments", 2], ["expensiveShops", 1, "departments", 0], ["expensiveShops", 1, "departments", 1], ["expensiveShops", 1, "departments", 2], ["expensiveShops", 2, "departments", 0], ["expensiveShops", 2, "departments", 1],["expensiveShops", 2, "departments", 2]] + ["shops", 0, "departments", 0], ["shops", 0, "departments", 1], ["shops", 0, "departments", 2], ["shops", 1, "departments", 0], ["shops", 1, "departments", 1], ["shops", 1, "departments", 2], ["shops", 2, "departments", 0], ["shops", 2, "departments", 1], ["shops", 2, "departments", 2], + ["shops", 0, "expensiveDepartments", 0], ["shops", 0, "expensiveDepartments", 1], ["shops", 0, "expensiveDepartments", 2], ["shops", 1, "expensiveDepartments", 0], ["shops", 1, "expensiveDepartments", 1], ["shops", 1, "expensiveDepartments", 2], ["shops", 2, "expensiveDepartments", 0], ["shops", 2, "expensiveDepartments", 1], ["shops", 2, "expensiveDepartments", 2], + ["expensiveShops", 0, "expensiveDepartments", 0], ["expensiveShops", 0, "expensiveDepartments", 1], ["expensiveShops", 0, "expensiveDepartments", 2], ["expensiveShops", 1, "expensiveDepartments", 0], ["expensiveShops", 1, "expensiveDepartments", 1], ["expensiveShops", 1, "expensiveDepartments", 2], ["expensiveShops", 2, "expensiveDepartments", 0], ["expensiveShops", 2, "expensiveDepartments", 1], ["expensiveShops", 2, "expensiveDepartments", 2], + ["expensiveShops", 0, "departments", 0], ["expensiveShops", 0, "departments", 1], ["expensiveShops", 0, "departments", 2], ["expensiveShops", 1, "departments", 0], ["expensiveShops", 1, "departments", 1], ["expensiveShops", 1, "departments", 2], ["expensiveShops", 2, "departments", 0], ["expensiveShops", 2, "departments", 1], ["expensiveShops", 2, "departments", 2]] ) when: From 018248a8eeacf6eda11c3062e2189d72658bd6f6 Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Thu, 22 May 2025 22:23:38 +1000 Subject: [PATCH 03/14] update gradle wrapper validation --- .github/workflows/master.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml index fd83244111..f1e82b93c7 100644 --- a/.github/workflows/master.yml +++ b/.github/workflows/master.yml @@ -16,7 +16,7 @@ jobs: steps: - uses: actions/checkout@v4 - - uses: gradle/wrapper-validation-action@v3 + - uses: gradle/actions/wrapper-validation@v4 - name: Set up JDK 11 uses: actions/setup-java@v4 with: From 9aaa1185bffe80d0eac2217cfa041bc2430f57f4 Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Thu, 22 May 2025 22:24:01 +1000 Subject: [PATCH 04/14] update gradle wrapper validation --- .github/workflows/pull_request.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index b3c336d59e..ff9dcb03e3 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -20,7 +20,7 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - - uses: gradle/wrapper-validation-action@v3 + - uses: gradle/actions/wrapper-validation@v4 - name: Set up JDK 11 uses: actions/setup-java@v4 with: From 386ce8f822df8e3fbbc9551a821e8d405d00d895 Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Fri, 23 May 2025 07:39:51 +1000 Subject: [PATCH 05/14] cleanup --- .../java/graphql/execution/Execution.java | 5 - ...spatchStrategyWithDeferAlwaysDispatch.java | 279 ------------------ 2 files changed, 284 deletions(-) delete mode 100644 src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategyWithDeferAlwaysDispatch.java diff --git a/src/main/java/graphql/execution/Execution.java b/src/main/java/graphql/execution/Execution.java index aada60c72f..60447c9996 100644 --- a/src/main/java/graphql/execution/Execution.java +++ b/src/main/java/graphql/execution/Execution.java @@ -255,11 +255,6 @@ private DataLoaderDispatchStrategy createDataLoaderDispatchStrategy(ExecutionCon return DataLoaderDispatchStrategy.NO_OP; } if (!executionContext.isSubscriptionOperation()) { - boolean deferEnabled = executionContext.hasIncrementalSupport(); - - // Dedicated strategy for defer support, for safety purposes. -// return deferEnabled ? -// new PerLevelDataLoaderDispatchStrategyWithDeferAlwaysDispatch(executionContext) : return new PerLevelDataLoaderDispatchStrategy(executionContext); } else { return new FallbackDataLoaderDispatchStrategy(executionContext); diff --git a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategyWithDeferAlwaysDispatch.java b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategyWithDeferAlwaysDispatch.java deleted file mode 100644 index 193ffdd27e..0000000000 --- a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategyWithDeferAlwaysDispatch.java +++ /dev/null @@ -1,279 +0,0 @@ -//package graphql.execution.instrumentation.dataloader; -// -//import graphql.Assert; -//import graphql.Internal; -//import graphql.execution.DataLoaderDispatchStrategy; -//import graphql.execution.ExecutionContext; -//import graphql.execution.ExecutionStrategyParameters; -//import graphql.execution.FieldValueInfo; -//import graphql.execution.MergedField; -//import graphql.schema.DataFetcher; -//import graphql.schema.DataFetchingEnvironment; -//import graphql.util.LockKit; -//import org.dataloader.DataLoaderRegistry; -// -//import java.util.LinkedHashSet; -//import java.util.List; -//import java.util.Set; -//import java.util.concurrent.atomic.AtomicBoolean; -//import java.util.function.Supplier; -// -/// ** -// * The execution of a query can be divided into 2 phases: first, the non-deferred fields are executed and only once -// * they are completely resolved, we start to execute the deferred fields. -// * The behavior of this Data Loader strategy is quite different during those 2 phases. During the execution of the -// * deferred fields the Data Loader will not attempt to dispatch in a optimal way. It will essentially dispatch for -// * every field fetched, which is quite ineffective. -// * This is the first iteration of the Data Loader strategy with support for @defer, and it will be improved in the -// * future. -// */ -//@Internal -//public class PerLevelDataLoaderDispatchStrategyWithDeferAlwaysDispatch implements DataLoaderDispatchStrategy { -// -// private final CallStack callStack; -// private final ExecutionContext executionContext; -// -// /** -// * This flag is used to determine if we have started the deferred execution. -// * The value of this flag is set to true as soon as we identified that a deferred field is being executed, and then -// * the flag stays on that state for the remainder of the execution. -// */ -// private final AtomicBoolean startedDeferredExecution = new AtomicBoolean(false); -// -// -// private static class CallStack { -// -// private final LockKit.ReentrantLock lock = new LockKit.ReentrantLock(); -// private final LevelMap expectedFetchCountPerLevel = new LevelMap(); -// private final LevelMap fetchCountPerLevel = new LevelMap(); -// private final LevelMap expectedStrategyCallsPerLevel = new LevelMap(); -// private final LevelMap happenedStrategyCallsPerLevel = new LevelMap(); -// private final LevelMap happenedOnFieldValueCallsPerLevel = new LevelMap(); -// -// private final Set dispatchedLevels = new LinkedHashSet<>(); -// -// public CallStack() { -// expectedStrategyCallsPerLevel.set(1, 1); -// } -// -// void increaseExpectedFetchCount(int level, int count) { -// expectedFetchCountPerLevel.increment(level, count); -// } -// -// void increaseFetchCount(int level) { -// fetchCountPerLevel.increment(level, 1); -// } -// -// void increaseExpectedStrategyCalls(int level, int count) { -// expectedStrategyCallsPerLevel.increment(level, count); -// } -// -// void increaseHappenedStrategyCalls(int level) { -// happenedStrategyCallsPerLevel.increment(level, 1); -// } -// -// void increaseHappenedOnFieldValueCalls(int level) { -// happenedOnFieldValueCallsPerLevel.increment(level, 1); -// } -// -// boolean allStrategyCallsHappened(int level) { -// return happenedStrategyCallsPerLevel.get(level) == expectedStrategyCallsPerLevel.get(level); -// } -// -// boolean allOnFieldCallsHappened(int level) { -// return happenedOnFieldValueCallsPerLevel.get(level) == expectedStrategyCallsPerLevel.get(level); -// } -// -// boolean allFetchesHappened(int level) { -// return fetchCountPerLevel.get(level) == expectedFetchCountPerLevel.get(level); -// } -// -// @Override -// public String toString() { -// return "CallStack{" + -// "expectedFetchCountPerLevel=" + expectedFetchCountPerLevel + -// ", fetchCountPerLevel=" + fetchCountPerLevel + -// ", expectedStrategyCallsPerLevel=" + expectedStrategyCallsPerLevel + -// ", happenedStrategyCallsPerLevel=" + happenedStrategyCallsPerLevel + -// ", happenedOnFieldValueCallsPerLevel=" + happenedOnFieldValueCallsPerLevel + -// ", dispatchedLevels" + dispatchedLevels + -// '}'; -// } -// -// -// public boolean dispatchIfNotDispatchedBefore(int level) { -// if (dispatchedLevels.contains(level)) { -// Assert.assertShouldNeverHappen("level " + level + " already dispatched"); -// return false; -// } -// dispatchedLevels.add(level); -// return true; -// } -// } -// -// public PerLevelDataLoaderDispatchStrategyWithDeferAlwaysDispatch(ExecutionContext executionContext) { -// this.callStack = new CallStack(); -// this.executionContext = executionContext; -// } -// -// @Override -// public void executeDeferredOnFieldValueInfo(FieldValueInfo fieldValueInfo, ExecutionStrategyParameters executionStrategyParameters) { -// this.startedDeferredExecution.set(true); -// } -// -// @Override -// public void executionStrategy(ExecutionContext executionContext, ExecutionStrategyParameters parameters) { -// if (this.startedDeferredExecution.get()) { -// return; -// } -// int curLevel = parameters.getExecutionStepInfo().getPath().getLevel() + 1; -// increaseCallCounts(curLevel, parameters); -// } -// -// @Override -// public void executeObject(ExecutionContext executionContext, ExecutionStrategyParameters parameters) { -// if (this.startedDeferredExecution.get()) { -// return; -// } -// int curLevel = parameters.getExecutionStepInfo().getPath().getLevel() + 1; -// increaseCallCounts(curLevel, parameters); -// } -// -// @Override -// public void executionStrategyOnFieldValuesInfo(List fieldValueInfoList) { -// if (this.startedDeferredExecution.get()) { -// this.dispatch(); -// } -// onFieldValuesInfoDispatchIfNeeded(fieldValueInfoList, 1); -// } -// -// @Override -// public void executionStrategyOnFieldValuesException(Throwable t) { -// callStack.lock.runLocked(() -> -// callStack.increaseHappenedOnFieldValueCalls(1) -// ); -// } -// -// @Override -// public void executeObjectOnFieldValuesInfo(List fieldValueInfoList, ExecutionStrategyParameters parameters) { -// if (this.startedDeferredExecution.get()) { -// this.dispatch(); -// } -// int curLevel = parameters.getPath().getLevel() + 1; -// onFieldValuesInfoDispatchIfNeeded(fieldValueInfoList, curLevel); -// } -// -// -// @Override -// public void executeObjectOnFieldValuesException(Throwable t, ExecutionStrategyParameters parameters) { -// int curLevel = parameters.getPath().getLevel() + 1; -// callStack.lock.runLocked(() -> -// callStack.increaseHappenedOnFieldValueCalls(curLevel) -// ); -// } -// -// @Override -// public void fieldFetched(ExecutionContext executionContext, -// ExecutionStrategyParameters parameters, -// DataFetcher dataFetcher, -// Object fetchedValue, -// Supplier dataFetchingEnvironment) { -// -// final boolean dispatchNeeded; -// -// if (parameters.getField().isDeferred() || this.startedDeferredExecution.get()) { -// this.startedDeferredExecution.set(true); -// dispatchNeeded = true; -// } else { -// int level = parameters.getPath().getLevel(); -// dispatchNeeded = callStack.lock.callLocked(() -> { -// callStack.increaseFetchCount(level); -// return dispatchIfNeeded(level); -// }); -// } -// -// if (dispatchNeeded) { -// dispatch(); -// } -// -// } -// -// private void increaseCallCounts(int curLevel, ExecutionStrategyParameters parameters) { -// int count = 0; -// for (MergedField field : parameters.getFields().getSubFieldsList()) { -// if (!field.isDeferred()) { -// count++; -// } -// } -// int nonDeferredFieldCount = count; -// callStack.lock.runLocked(() -> { -// callStack.increaseExpectedFetchCount(curLevel, nonDeferredFieldCount); -// callStack.increaseHappenedStrategyCalls(curLevel); -// }); -// } -// -// private void onFieldValuesInfoDispatchIfNeeded(List fieldValueInfoList, int curLevel) { -// boolean dispatchNeeded = callStack.lock.callLocked(() -> -// handleOnFieldValuesInfo(fieldValueInfoList, curLevel) -// ); -// if (dispatchNeeded) { -// dispatch(); -// } -// } -// -// // -// // thread safety: called with callStack.lock -// // -// private boolean handleOnFieldValuesInfo(List fieldValueInfos, int curLevel) { -// callStack.increaseHappenedOnFieldValueCalls(curLevel); -// int expectedStrategyCalls = getCountForList(fieldValueInfos); -// callStack.increaseExpectedStrategyCalls(curLevel + 1, expectedStrategyCalls); -// return dispatchIfNeeded(curLevel + 1); -// } -// -// private int getCountForList(List fieldValueInfos) { -// int result = 0; -// for (FieldValueInfo fieldValueInfo : fieldValueInfos) { -// if (fieldValueInfo.getCompleteValueType() == FieldValueInfo.CompleteValueType.OBJECT) { -// result += 1; -// } else if (fieldValueInfo.getCompleteValueType() == FieldValueInfo.CompleteValueType.LIST) { -// result += getCountForList(fieldValueInfo.getFieldValueInfos()); -// } -// } -// return result; -// } -// -// // -// // thread safety : called with callStack.lock -// // -// private boolean dispatchIfNeeded(int level) { -// boolean ready = levelReady(level); -// if (ready) { -// return callStack.dispatchIfNotDispatchedBefore(level); -// } -// return false; -// } -// -// // -// // thread safety: called with callStack.lock -// // -// private boolean levelReady(int level) { -// if (level == 1) { -// // level 1 is special: there is only one strategy call and that's it -// return callStack.allFetchesHappened(1); -// } -// if (levelReady(level - 1) && callStack.allOnFieldCallsHappened(level - 1) -// && callStack.allStrategyCallsHappened(level) && callStack.allFetchesHappened(level)) { -// -// return true; -// } -// return false; -// } -// -// void dispatch() { -// DataLoaderRegistry dataLoaderRegistry = executionContext.getDataLoaderRegistry(); -// dataLoaderRegistry.dispatchAll(); -// } -// -//} -// From fcf72612718048d893bda8d0e22a4b367b7d9869 Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Fri, 23 May 2025 10:19:10 +1000 Subject: [PATCH 06/14] more complex test --- .../dataloader/DeferWithDataLoaderTest.groovy | 158 +++++++++++++++++- 1 file changed, 155 insertions(+), 3 deletions(-) diff --git a/src/test/groovy/graphql/execution/instrumentation/dataloader/DeferWithDataLoaderTest.groovy b/src/test/groovy/graphql/execution/instrumentation/dataloader/DeferWithDataLoaderTest.groovy index f6676bdf3e..cf87c238b7 100644 --- a/src/test/groovy/graphql/execution/instrumentation/dataloader/DeferWithDataLoaderTest.groovy +++ b/src/test/groovy/graphql/execution/instrumentation/dataloader/DeferWithDataLoaderTest.groovy @@ -2,11 +2,20 @@ package graphql.execution.instrumentation.dataloader import graphql.ExecutionInput import graphql.ExecutionResult +import graphql.ExperimentalApi import graphql.GraphQL +import graphql.TestUtil import graphql.incremental.IncrementalExecutionResult +import graphql.schema.DataFetcher +import org.awaitility.Awaitility +import org.dataloader.BatchLoader +import org.dataloader.DataLoaderFactory import org.dataloader.DataLoaderRegistry import spock.lang.Specification +import java.time.Duration +import java.util.concurrent.CompletableFuture + import static graphql.ExperimentalApi.ENABLE_INCREMENTAL_SUPPORT import static graphql.execution.instrumentation.dataloader.DataLoaderPerformanceData.combineExecutionResults import static graphql.execution.instrumentation.dataloader.DataLoaderPerformanceData.expectedData @@ -34,7 +43,7 @@ class DeferWithDataLoaderTest extends Specification { * @param results a list of the incremental results from the execution * @param expectedPaths a list of the expected paths in the incremental results. The order of the elements in the list is not important. */ - private static void assertIncrementalResults(List> results, List> expectedPaths) { + private static void assertIncrementalResults(List> results, List> expectedPaths, List expectedData = null) { assert results.size() == expectedPaths.size(), "Expected ${expectedPaths.size()} results, got ${results.size()}" assert results.dropRight(1).every { it.hasNext == true }, "Expected all but the last result to have hasNext=true" @@ -42,8 +51,12 @@ class DeferWithDataLoaderTest extends Specification { assert results.every { it.incremental.size() == 1 }, "Expected every result to have exactly one incremental item" - expectedPaths.each { path -> - assert results.any { it.incremental[0].path == path }, "Expected path $path not found in $results" + expectedPaths.eachWithIndex { path, index -> + def result = results.find { it.incremental[0].path == path } + assert result != null, "Expected path $path not found in $results" + if (expectedData != null) { + assert result.incremental[0].data == expectedData[index], "Expected data $expectedData[index] for path $path, got ${result.incremental[0].data}" + } } } @@ -335,4 +348,143 @@ class DeferWithDataLoaderTest extends Specification { batchCompareDataFetchers.productsForDepartmentsBatchLoaderCounter.get() == 1 } + def "dataloader in initial result and chained dataloader inside nested defer block"() { + given: + def sdl = ''' + type Query { + pets: [Pet] + } + + type Pet { + name: String + owner: Owner + } + type Owner { + name: String + address: String + } + + ''' + + def query = ''' + query { + pets { + name + ... @defer { + owner { + name + ... @defer { + address + } + } + } + } + } + ''' + + BatchLoader petNameBatchLoader = { List keys -> + println "petNameBatchLoader called with $keys" + assert keys.size() == 3 + return CompletableFuture.completedFuture(["Pet 1", "Pet 2", "Pet 3"]) + } + BatchLoader addressBatchLoader = { List keys -> + println "addressBatchLoader called with $keys" + assert keys.size() == 3 + return CompletableFuture.completedFuture(keys.collect { it -> + if (it == "owner-1") { + return "Address 1" + } else if (it == "owner-2") { + return "Address 2" + } else if (it == "owner-3") { + return "Address 3" + } + }) + } + + DataLoaderRegistry dataLoaderRegistry = new DataLoaderRegistry() + def petNameDL = DataLoaderFactory.newDataLoader("petName", petNameBatchLoader) + def addressDL = DataLoaderFactory.newDataLoader("address", addressBatchLoader) + dataLoaderRegistry.register("petName", petNameDL) + dataLoaderRegistry.register("address", addressDL) + + DataFetcher petsDF = { env -> + return [ + [id: "pet-1"], + [id: "pet-2"], + [id: "pet-3"] + ] + } + DataFetcher petNameDF = { env -> + env.getDataLoader("petName").load(env.getSource().id) + } + + DataFetcher petOwnerDF = { env -> + String id = env.getSource().id + if (id == "pet-1") { + return [id: "owner-1", name: "Owner 1"] + } else if (id == "pet-2") { + return [id: "owner-2", name: "Owner 2"] + } else if (id == "pet-3") { + return [id: "owner-3", name: "Owner 3"] + } + } + DataFetcher ownerAddressDF = { env -> + return CompletableFuture.supplyAsync { + Thread.sleep(500) + return "foo" + }.thenCompose { + return env.getDataLoader("address").load(env.getSource().id) + } + .thenCompose { + return env.getDataLoader("address").load(env.getSource().id) + } + } + + def schema = TestUtil.schema(sdl, [Query: [pets: petsDF], + Pet : [name: petNameDF, owner: petOwnerDF], + Owner: [address: ownerAddressDF]]) + def graphQL = GraphQL.newGraphQL(schema).build() + def ei = ExecutionInput.newExecutionInput(query).dataLoaderRegistry(dataLoaderRegistry).build() + ei.getGraphQLContext().put(ExperimentalApi.ENABLE_INCREMENTAL_SUPPORT, true) + ei.getGraphQLContext().put(DataLoaderDispatchingContextKeys.ENABLE_DATA_LOADER_CHAINING, true) + ei.getGraphQLContext().put(DataLoaderDispatchingContextKeys.DELAYED_DATA_LOADER_BATCH_WINDOW_SIZE_NANO_SECONDS, Duration.ofSeconds(1).toNanos()) + + when: + CompletableFuture erCF = graphQL.executeAsync(ei) + Awaitility.await().until { erCF.isDone() } + def er = erCF.get() + + then: + er.toSpecification() == [data : [pets: [[name: "Pet 1"], [name: "Pet 2"], [name: "Pet 3"]]], + hasNext: true] + + when: + def incrementalResults = getIncrementalResults(er) + println "incrementalResults: $incrementalResults" + + then: + assertIncrementalResults(incrementalResults, + [ + ["pets", 0], ["pets", 1], ["pets", 2], + ["pets", 0, "owner"], ["pets", 1, "owner"], ["pets", 2, "owner"], + ], + [ + [owner: [name: "Owner 1"]], + [owner: [name: "Owner 2"]], + [owner: [name: "Owner 3"]], + [address: "Address 1"], + [address: "Address 2"], + [address: "Address 3"] + ] + ) + +// when: ] ) +// incrementalResults == [[hasNext: true, incremental: [[path: ["pets", 0], data: [owner: [name: "Owner 1"]]]]], [hasNext: true, incremental: [[path: ["pets", 1], data: [owner: [name: "Owner 2"]]]]], [hasNext: true, incremental: [[path: ["pets", 2], data: [owner: [name: "Owner 3"]]]]], [hasNext: true, incremental: [[path: ["pets", 0, "owner"], data: [address: "Address 1"]]]], [hasNext: true, incremental: [[path: ["pets", 1, "owner"], data: [address: "Address 2"]]]], [hasNext: false, incremental: [[path: ["pets", 2, "owner"], data: [address: "Address 3"]]]]] +// +// assertIncrementalResults(incrementalResults, +//// [ +// + + } + } From 17911d34f762154c855eace979545dde2858bb9e Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Fri, 23 May 2025 10:24:43 +1000 Subject: [PATCH 07/14] cleanup --- .../graphql/execution/DataLoaderDispatchStrategy.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/main/java/graphql/execution/DataLoaderDispatchStrategy.java b/src/main/java/graphql/execution/DataLoaderDispatchStrategy.java index 871fcb15b8..8799797d1f 100644 --- a/src/main/java/graphql/execution/DataLoaderDispatchStrategy.java +++ b/src/main/java/graphql/execution/DataLoaderDispatchStrategy.java @@ -59,12 +59,4 @@ default void fieldFetched(ExecutionContext executionContext, default DataFetcher modifyDataFetcher(DataFetcher dataFetcher) { return dataFetcher; } - - default void executeDeferredOnFieldValueInfo(FieldValueInfo fieldValueInfo, ExecutionStrategyParameters executionStrategyParameters) { - - } - - default void startIncrementalCall() { - - } } From 5c64ad4e2cec77d2684f36c9edd009211262be40 Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Fri, 23 May 2025 10:39:38 +1000 Subject: [PATCH 08/14] cleanup and docs --- .../PerLevelDataLoaderDispatchStrategy.java | 32 ++++++++++++++++--- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java index 7ce27277a3..b48f641de6 100644 --- a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java +++ b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java @@ -56,10 +56,32 @@ private static class CallStack { private final LockKit.ReentrantLock lock = new LockKit.ReentrantLock(); /** - * A level is ready when all fields in this level are fetched - * The expected field fetch count is accurate when all execute object calls happened - * The expected execute object count is accurate when all sub selections fetched - * are done in the previous level + * A general overview of teh tracked data: + * There are three aspects tracked per level: + * - number of execute object calls (executeObject) + * - number of fetches + * - number of sub selections finished fetching + *

+ * The level for an execute object call is the level of the field in the query: for + * { a {b {c}}} the level of a is 1, b is 2 and c is not an object + *

+ * For fetches the level is the level of the field fetched + *

+ * For sub selections finished it is the level of the fields inside the sub selection: + * {a1 { b c} a2 } the level of {a1 a2} is 1, the level of {b c} is 2 + *

+ *

+ * A finished subselection means we can predict the number of execute object calls in the same level as the subselection: + * { a {x} b {y} } + * If a is a list of 3 objects and b is a list of 2 objects we expect 3 + 2 = 5 execute object calls on the level 1 to be happening + *

+ * An execute objects again means we can predict the number of fetches in the next level: + * Execute Object a with { a {f1 f2 f3} } means we expect 3 fetches on level 2. + *

+ * This means we know a level is ready to be dispatched if: + * - all subselections done in the parent level + * - all execute objects calls in the parent level are done + * - all expected fetched happened in the current level */ private final LevelMap expectedFetchCountPerLevel = new LevelMap(); @@ -244,12 +266,12 @@ private CallStack getCallStack(@Nullable DeferredCallContext deferredCallContext int startLevel = deferredCallContext.getStartLevel(); int fields = deferredCallContext.getFields(); callStack.lock.runLocked(() -> { + // we make sure that startLevel-1 is considered done callStack.expectedExecuteObjectCallsPerLevel.set(0, 0); // set to 1 in the constructor of CallStack callStack.expectedExecuteObjectCallsPerLevel.set(startLevel - 1, 1); callStack.happenedExecuteObjectCallsPerLevel.set(startLevel - 1, 1); callStack.highestReadyLevel = startLevel - 1; callStack.increaseExpectedFetchCount(startLevel, fields); - // we make sure that startLevel-1 is considered done }); return callStack; }); From c8f4362e895a2e2cb5b01324cdca3a90596eaa5c Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Fri, 23 May 2025 11:14:07 +1000 Subject: [PATCH 09/14] cleanup --- .../dataloader/PerLevelDataLoaderDispatchStrategy.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java index b48f641de6..e8f198c231 100644 --- a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java +++ b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java @@ -220,10 +220,6 @@ public PerLevelDataLoaderDispatchStrategy(ExecutionContext executionContext) { this.enableDataLoaderChaining = graphQLContext.getBoolean(DataLoaderDispatchingContextKeys.ENABLE_DATA_LOADER_CHAINING, false); } - @Override - public void executeDeferredOnFieldValueInfo(FieldValueInfo fieldValueInfo, ExecutionStrategyParameters executionStrategyParameters) { - throw new UnsupportedOperationException("Data Loaders cannot be used to resolve deferred fields"); - } @Override public void executionStrategy(ExecutionContext executionContext, ExecutionStrategyParameters parameters, int fieldCount) { From e0181e387e4f2ad1cfd8e495b2b34c5c5fd93e36 Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Fri, 23 May 2025 11:30:59 +1000 Subject: [PATCH 10/14] testing --- .../DeferExecutionSupportIntegrationTest.groovy | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/test/groovy/graphql/execution/incremental/DeferExecutionSupportIntegrationTest.groovy b/src/test/groovy/graphql/execution/incremental/DeferExecutionSupportIntegrationTest.groovy index 850634be91..b3b522d90b 100644 --- a/src/test/groovy/graphql/execution/incremental/DeferExecutionSupportIntegrationTest.groovy +++ b/src/test/groovy/graphql/execution/incremental/DeferExecutionSupportIntegrationTest.groovy @@ -27,6 +27,7 @@ import spock.lang.Specification import spock.lang.Unroll import java.util.concurrent.CompletableFuture +import java.util.concurrent.atomic.AtomicInteger import static graphql.schema.idl.TypeRuntimeWiring.newTypeWiring @@ -1702,8 +1703,10 @@ class DeferExecutionSupportIntegrationTest extends Specification { } } ''' + + def batchLoaderCallCount = new AtomicInteger(0) when: - def initialResult = executeQuery(query) + def initialResult = executeQuery(query, true, [:], batchLoaderCallCount) then: initialResult.toSpecification() == [ @@ -1711,12 +1714,11 @@ class DeferExecutionSupportIntegrationTest extends Specification { hasNext: true ] - println "initialResult = $initialResult" when: def incrementalResults = getIncrementalResults(initialResult) then: - + batchLoaderCallCount.get() == 1 incrementalResults.size() == 1 incrementalResults[0] == [incremental: [[path: ["post"], data: [fieldWithDataLoader1: "1001-fieldWithDataLoader1", fieldWithDataLoader2: "1001-fieldWithDataLoader2"]]], hasNext : false @@ -1733,9 +1735,11 @@ class DeferExecutionSupportIntegrationTest extends Specification { return this.executeQuery(query, true, variables) } - private ExecutionResult executeQuery(String query, boolean incrementalSupport, Map variables) { + private ExecutionResult executeQuery(String query, boolean incrementalSupport, Map variables, AtomicInteger batchLoaderCallCount = null) { BatchLoader batchLoader = { keys -> - println "batchlaoder called with keys $keys" + if (batchLoaderCallCount != null) { + batchLoaderCallCount.incrementAndGet() + } return CompletableFuture.completedFuture(keys) } DataLoader dl = DataLoaderFactory.newDataLoader(batchLoader) From f345f147071ba7b14e3638b24a924b7041679987 Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Fri, 23 May 2025 11:32:11 +1000 Subject: [PATCH 11/14] cleanup --- .../dataloader/DeferWithDataLoaderTest.groovy | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/test/groovy/graphql/execution/instrumentation/dataloader/DeferWithDataLoaderTest.groovy b/src/test/groovy/graphql/execution/instrumentation/dataloader/DeferWithDataLoaderTest.groovy index cf87c238b7..5427f7e504 100644 --- a/src/test/groovy/graphql/execution/instrumentation/dataloader/DeferWithDataLoaderTest.groovy +++ b/src/test/groovy/graphql/execution/instrumentation/dataloader/DeferWithDataLoaderTest.groovy @@ -478,13 +478,6 @@ class DeferWithDataLoaderTest extends Specification { ] ) -// when: ] ) -// incrementalResults == [[hasNext: true, incremental: [[path: ["pets", 0], data: [owner: [name: "Owner 1"]]]]], [hasNext: true, incremental: [[path: ["pets", 1], data: [owner: [name: "Owner 2"]]]]], [hasNext: true, incremental: [[path: ["pets", 2], data: [owner: [name: "Owner 3"]]]]], [hasNext: true, incremental: [[path: ["pets", 0, "owner"], data: [address: "Address 1"]]]], [hasNext: true, incremental: [[path: ["pets", 1, "owner"], data: [address: "Address 2"]]]], [hasNext: false, incremental: [[path: ["pets", 2, "owner"], data: [address: "Address 3"]]]]] -// -// assertIncrementalResults(incrementalResults, -//// [ -// - } } From c12be775723a6f73d04cc0effb295127e9ae0f31 Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Fri, 23 May 2025 11:35:19 +1000 Subject: [PATCH 12/14] cleanup --- .../graphql/execution/incremental/DeferredCallContext.java | 3 ++- .../java/graphql/schema/DataFetchingEnvironmentImpl.java | 5 +---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/main/java/graphql/execution/incremental/DeferredCallContext.java b/src/main/java/graphql/execution/incremental/DeferredCallContext.java index 638577f729..e7e2ec0658 100644 --- a/src/main/java/graphql/execution/incremental/DeferredCallContext.java +++ b/src/main/java/graphql/execution/incremental/DeferredCallContext.java @@ -22,6 +22,8 @@ public class DeferredCallContext { private final int startLevel; private final int fields; + private final List errors = new CopyOnWriteArrayList<>(); + public DeferredCallContext(int startLevel, int fields) { this.startLevel = startLevel; this.fields = fields; @@ -41,7 +43,6 @@ public int getFields() { return fields; } - private final List errors = new CopyOnWriteArrayList<>(); public void addErrors(List errors) { this.errors.addAll(errors); diff --git a/src/main/java/graphql/schema/DataFetchingEnvironmentImpl.java b/src/main/java/graphql/schema/DataFetchingEnvironmentImpl.java index 20d3bbdb8c..dc9c3776cd 100644 --- a/src/main/java/graphql/schema/DataFetchingEnvironmentImpl.java +++ b/src/main/java/graphql/schema/DataFetchingEnvironmentImpl.java @@ -106,10 +106,7 @@ public static Builder newDataFetchingEnvironment(ExecutionContext executionConte .operationDefinition(executionContext.getOperationDefinition()) .variables(executionContext.getCoercedVariables().toMap()) .executionId(executionContext.getExecutionId()) - .dataLoaderDispatchStrategy(executionContext.getDataLoaderDispatcherStrategy()) - ; - - + .dataLoaderDispatchStrategy(executionContext.getDataLoaderDispatcherStrategy()); } @Override From aa206d79b2d3430b8c74ed2d7c9afc5e34209156 Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Fri, 23 May 2025 12:26:54 +1000 Subject: [PATCH 13/14] cleanup --- .../execution/incremental/IncrementalCallStateDeferTest.groovy | 1 - 1 file changed, 1 deletion(-) diff --git a/src/test/groovy/graphql/execution/incremental/IncrementalCallStateDeferTest.groovy b/src/test/groovy/graphql/execution/incremental/IncrementalCallStateDeferTest.groovy index a8ded32a3a..d99b49fae4 100644 --- a/src/test/groovy/graphql/execution/incremental/IncrementalCallStateDeferTest.groovy +++ b/src/test/groovy/graphql/execution/incremental/IncrementalCallStateDeferTest.groovy @@ -34,7 +34,6 @@ class IncrementalCallStateDeferTest extends Specification { results[2].incremental[0].data["a"] == "A" } - // flaky def "calls within calls are enqueued correctly"() { given: def incrementalCallState = new IncrementalCallState() From a7cc7947042e592aca3f7e4e426b5b62f7567a7a Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Mon, 26 May 2025 10:36:49 +1000 Subject: [PATCH 14/14] PR feedback --- .../dataloader/PerLevelDataLoaderDispatchStrategy.java | 6 +++--- .../dataloader/BatchCompareDataFetchers.java | 2 -- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java index e8f198c231..ee80d4aa61 100644 --- a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java +++ b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java @@ -48,7 +48,7 @@ public class PerLevelDataLoaderDispatchStrategy implements DataLoaderDispatchStr static final long DEFAULT_BATCH_WINDOW_NANO_SECONDS_DEFAULT = 500_000L; - private final Map callStackMap = new ConcurrentHashMap<>(); + private final Map deferredCallStackMap = new ConcurrentHashMap<>(); private static class CallStack { @@ -75,7 +75,7 @@ private static class CallStack { * { a {x} b {y} } * If a is a list of 3 objects and b is a list of 2 objects we expect 3 + 2 = 5 execute object calls on the level 1 to be happening *

- * An execute objects again means we can predict the number of fetches in the next level: + * An executed object call again means we can predict the number of fetches in the next level: * Execute Object a with { a {f1 f2 f3} } means we expect 3 fetches on level 2. *

* This means we know a level is ready to be dispatched if: @@ -257,7 +257,7 @@ private CallStack getCallStack(@Nullable DeferredCallContext deferredCallContext if (deferredCallContext == null) { return this.initialCallStack; } else { - return callStackMap.computeIfAbsent(deferredCallContext, k -> { + return deferredCallStackMap.computeIfAbsent(deferredCallContext, k -> { CallStack callStack = new CallStack(); int startLevel = deferredCallContext.getStartLevel(); int fields = deferredCallContext.getFields(); diff --git a/src/test/groovy/graphql/execution/instrumentation/dataloader/BatchCompareDataFetchers.java b/src/test/groovy/graphql/execution/instrumentation/dataloader/BatchCompareDataFetchers.java index db5989dec9..08edd13248 100644 --- a/src/test/groovy/graphql/execution/instrumentation/dataloader/BatchCompareDataFetchers.java +++ b/src/test/groovy/graphql/execution/instrumentation/dataloader/BatchCompareDataFetchers.java @@ -83,7 +83,6 @@ private static List> getDepartmentsForShops(List shops) { private BatchLoader> departmentsForShopsBatchLoader = ids -> maybeAsyncWithSleep(() -> { - System.out.println("departments for shops batch loader called with ids: " + ids); departmentsForShopsBatchLoaderCounter.incrementAndGet(); List shopList = new ArrayList<>(); for (String id : ids) { @@ -129,7 +128,6 @@ private static List> getProductsForDepartments(List de } private BatchLoader> productsForDepartmentsBatchLoader = ids -> maybeAsyncWithSleep(() -> { - System.out.println("products for deparments batch loader called with ids: " + ids); productsForDepartmentsBatchLoaderCounter.incrementAndGet(); List d = ids.stream().map(departments::get).collect(Collectors.toList()); return completedFuture(getProductsForDepartments(d));