diff --git a/src/jmh/java/performance/DataLoaderPerformance.java b/src/jmh/java/performance/DataLoaderPerformance.java index 98a365c2d..83a62b507 100644 --- a/src/jmh/java/performance/DataLoaderPerformance.java +++ b/src/jmh/java/performance/DataLoaderPerformance.java @@ -35,7 +35,7 @@ @State(Scope.Benchmark) @Warmup(iterations = 2, time = 5) -@Measurement(iterations = 3) +@Measurement(iterations = 5) @Fork(2) public class DataLoaderPerformance { @@ -481,25 +481,17 @@ public Pet(String id, String name, String ownerId, List friendsIds) { static BatchLoader ownerBatchLoader = list -> { +// System.out.println("OwnerBatchLoader with " + list.size() ); List collect = list.stream().map(key -> { Owner owner = owners.get(key); - try { - Thread.sleep(50); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } return owner; }).collect(Collectors.toList()); return CompletableFuture.completedFuture(collect); }; static BatchLoader petBatchLoader = list -> { +// System.out.println("PetBatchLoader with list: " + list.size()); List collect = list.stream().map(key -> { Pet owner = pets.get(key); - try { - Thread.sleep(5); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } return owner; }).collect(Collectors.toList()); return CompletableFuture.completedFuture(collect); @@ -520,9 +512,6 @@ public void setup() { try { String sdl = PerformanceTestingUtils.loadResource("dataLoaderPerformanceSchema.graphqls"); - - DataLoaderRegistry registry = new DataLoaderRegistry(); - DataFetcher ownersDF = (env -> { // Load all 103 owners (O-1 through O-103) List allOwnerIds = List.of( @@ -542,20 +531,20 @@ public void setup() { }); DataFetcher petsDf = (env -> { Owner owner = env.getSource(); - return env.getDataLoader(petDLName).loadMany((List) owner.petIds); -// .thenCompose((result) -> CompletableFuture.supplyAsync(() -> null).thenApply((__) -> result)); + return env.getDataLoader(petDLName).loadMany((List) owner.petIds) + .thenCompose((result) -> CompletableFuture.supplyAsync(() -> null).thenApply((__) -> result)); }); DataFetcher petFriendsDF = (env -> { Pet pet = env.getSource(); - return env.getDataLoader(petDLName).loadMany((List) pet.friendsIds); -// .thenCompose((result) -> CompletableFuture.supplyAsync(() -> null).thenApply((__) -> result)); + return env.getDataLoader(petDLName).loadMany((List) pet.friendsIds) + .thenCompose((result) -> CompletableFuture.supplyAsync(() -> null).thenApply((__) -> result)); }); DataFetcher petOwnerDF = (env -> { Pet pet = env.getSource(); - return env.getDataLoader(ownerDLName).load(pet.ownerId); -// .thenCompose((result) -> CompletableFuture.supplyAsync(() -> null).thenApply((__) -> result)); + return env.getDataLoader(ownerDLName).load(pet.ownerId) + .thenCompose((result) -> CompletableFuture.supplyAsync(() -> null).thenApply((__) -> result)); }); @@ -598,6 +587,7 @@ public void executeRequestWithDataLoaders(MyState myState, Blackhole blackhole) // .profileExecution(true) .build(); executionInput.getGraphQLContext().put(DataLoaderDispatchingContextKeys.ENABLE_DATA_LOADER_CHAINING, true); +// executionInput.getGraphQLContext().put(DataLoaderDispatchingContextKeys.ENABLE_DATA_LOADER_EXHAUSTED_DISPATCHING, true); ExecutionResult execute = myState.graphQL.execute(executionInput); // ProfilerResult profilerResult = executionInput.getGraphQLContext().get(ProfilerResult.PROFILER_CONTEXT_KEY); // System.out.println("execute: " + execute); diff --git a/src/main/java/graphql/execution/Async.java b/src/main/java/graphql/execution/Async.java index aefe80595..f26834734 100644 --- a/src/main/java/graphql/execution/Async.java +++ b/src/main/java/graphql/execution/Async.java @@ -11,14 +11,17 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; import static graphql.Assert.assertTrue; +import static java.util.stream.Collectors.toList; @Internal @SuppressWarnings("FutureReturnValueIgnored") @@ -408,4 +411,24 @@ public static CompletableFuture exceptionallyCompletedFuture(Throwable ex public static @NonNull CompletableFuture orNullCompletedFuture(@Nullable CompletableFuture completableFuture) { return completableFuture != null ? completableFuture : CompletableFuture.completedFuture(null); } + + public static CompletableFuture> allOf(List> cfs) { + return CompletableFuture.allOf(cfs.toArray(CompletableFuture[]::new)) + .thenApply(v -> cfs.stream() + .map(CompletableFuture::join) + .collect(toList()) + ); + } + + public static CompletableFuture> allOf(Map> cfs) { + return CompletableFuture.allOf(cfs.values().toArray(CompletableFuture[]::new)) + .thenApply(v -> cfs.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + task -> task.getValue().join()) + ) + ); + } + } diff --git a/src/main/java/graphql/execution/ExecutionStrategy.java b/src/main/java/graphql/execution/ExecutionStrategy.java index a8354a6a7..39d4b0870 100644 --- a/src/main/java/graphql/execution/ExecutionStrategy.java +++ b/src/main/java/graphql/execution/ExecutionStrategy.java @@ -448,6 +448,7 @@ private Object fetchField(GraphQLFieldDefinition fieldDef, ExecutionContext exec .selectionSet(fieldCollector) .queryDirectives(queryDirectives) .deferredCallContext(parameters.getDeferredCallContext()) + .level(parameters.getPath().getLevel()) .build(); }); diff --git a/src/main/java/graphql/execution/ResultPath.java b/src/main/java/graphql/execution/ResultPath.java index 3c12b559e..3edb86558 100644 --- a/src/main/java/graphql/execution/ResultPath.java +++ b/src/main/java/graphql/execution/ResultPath.java @@ -44,8 +44,8 @@ public static ResultPath rootPath() { private ResultPath() { parent = null; segment = null; - this.toStringValue = initString(); this.level = 0; + this.toStringValue = initString(); } private ResultPath(ResultPath parent, String segment) { diff --git a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java index 221219171..61b48b2bc 100644 --- a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java +++ b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java @@ -46,18 +46,18 @@ private static class ChainedDLStack { // a state for level points to a previous one // all the invocations that are linked together are the relevant invocations for the next dispatch private static class StateForLevel { - final @Nullable DataLoaderInvocation dataLoaderInvocation; + final @Nullable DataLoader dataLoader; final boolean dispatchingStarted; final boolean dispatchingFinished; final boolean currentlyDelayedDispatching; final @Nullable StateForLevel prev; - public StateForLevel(@Nullable DataLoaderInvocation dataLoaderInvocation, + public StateForLevel(@Nullable DataLoader dataLoader, boolean dispatchingStarted, boolean dispatchingFinished, boolean currentlyDelayedDispatching, @Nullable StateForLevel prev) { - this.dataLoaderInvocation = dataLoaderInvocation; + this.dataLoader = dataLoader; this.dispatchingStarted = dispatchingStarted; this.dispatchingFinished = dispatchingFinished; this.currentlyDelayedDispatching = currentlyDelayedDispatching; @@ -91,7 +91,7 @@ public StateForLevel(@Nullable DataLoaderInvocation dataLoaderInvocation, } } - if (currentState == null || currentState.dataLoaderInvocation == null) { + if (currentState == null || currentState.dataLoader == null) { if (normalDispatchOrDelayed) { dispatchingFinished = true; } else { @@ -108,8 +108,7 @@ public StateForLevel(@Nullable DataLoaderInvocation dataLoaderInvocation, } - public boolean newDataLoaderInvocation(DataLoaderInvocation dataLoaderInvocation) { - int level = dataLoaderInvocation.level; + public boolean newDataLoaderInvocation(int level, DataLoader dataLoader) { AtomicReference<@Nullable StateForLevel> currentStateRef = stateMapPerLevel.computeIfAbsent(level, __ -> new AtomicReference<>()); while (true) { StateForLevel currentState = currentStateRef.get(); @@ -132,7 +131,7 @@ public boolean newDataLoaderInvocation(DataLoaderInvocation dataLoaderInvocation currentlyDelayedDispatching = true; } - StateForLevel newState = new StateForLevel(dataLoaderInvocation, dispatchingStarted, dispatchingFinished, currentlyDelayedDispatching, currentState); + StateForLevel newState = new StateForLevel(dataLoader, dispatchingStarted, dispatchingFinished, currentlyDelayedDispatching, currentState); if (currentStateRef.compareAndSet(currentState, newState)) { return newDelayedInvocation; @@ -487,20 +486,14 @@ private void dispatchAll(DataLoaderRegistry dataLoaderRegistry, int level) { private void dispatchDLCFImpl(Integer level, CallStack callStack, boolean normalOrDelayed, boolean chained) { ChainedDLStack.StateForLevel stateForLevel = callStack.chainedDLStack.aboutToStartDispatching(level, normalOrDelayed, chained); - if (stateForLevel == null || stateForLevel.dataLoaderInvocation == null) { + if (stateForLevel == null || stateForLevel.dataLoader == null) { return; } List allDispatchedCFs = new ArrayList<>(); - while (stateForLevel != null && stateForLevel.dataLoaderInvocation != null) { - final DataLoaderInvocation invocation = stateForLevel.dataLoaderInvocation; - CompletableFuture dispatch = invocation.dataLoader.dispatch(); + while (stateForLevel != null && stateForLevel.dataLoader != null) { + CompletableFuture dispatch = stateForLevel.dataLoader.dispatch(); allDispatchedCFs.add(dispatch); - dispatch.whenComplete((objects, throwable) -> { - if (objects != null && objects.size() > 0) { - profiler.batchLoadedNewStrategy(invocation.name, level, objects.size(), !normalOrDelayed, chained); - } - }); stateForLevel = stateForLevel.prev; } CompletableFuture.allOf(allDispatchedCFs.toArray(new CompletableFuture[0])) @@ -512,51 +505,19 @@ private void dispatchDLCFImpl(Integer level, CallStack callStack, boolean normal } - public void newDataLoaderInvocation(String resultPath, - int level, + public void newDataLoaderInvocation(int level, DataLoader dataLoader, - String dataLoaderName, - Object key, @Nullable AlternativeCallContext alternativeCallContext) { if (!enableDataLoaderChaining) { return; } - DataLoaderInvocation dataLoaderInvocation = new DataLoaderInvocation(resultPath, level, dataLoader, dataLoaderName, key); CallStack callStack = getCallStack(alternativeCallContext); - boolean newDelayedInvocation = callStack.chainedDLStack.newDataLoaderInvocation(dataLoaderInvocation); + boolean newDelayedInvocation = callStack.chainedDLStack.newDataLoaderInvocation(level, dataLoader); if (newDelayedInvocation) { dispatchDLCFImpl(level, callStack, false, false); } } - /** - * A single data loader invocation. - */ - private static class DataLoaderInvocation { - final String resultPath; - final int level; - final DataLoader dataLoader; - final String name; - final Object key; - - public DataLoaderInvocation(String resultPath, int level, DataLoader dataLoader, String name, Object key) { - this.resultPath = resultPath; - this.level = level; - this.dataLoader = dataLoader; - this.name = name; - this.key = key; - } - - @Override - public String toString() { - return "ResultPathWithDataLoader{" + - "resultPath='" + resultPath + '\'' + - ", level=" + level + - ", key=" + key + - ", name='" + name + '\'' + - '}'; - } - } } diff --git a/src/main/java/graphql/schema/DataFetchingEnvironmentImpl.java b/src/main/java/graphql/schema/DataFetchingEnvironmentImpl.java index 82f8bf1a4..a3a75f457 100644 --- a/src/main/java/graphql/schema/DataFetchingEnvironmentImpl.java +++ b/src/main/java/graphql/schema/DataFetchingEnvironmentImpl.java @@ -60,6 +60,7 @@ public class DataFetchingEnvironmentImpl implements DataFetchingEnvironment { private final Document document; private final ImmutableMapWithNullValues variables; private final QueryDirectives queryDirectives; + private final int level; // used for internal() method private final DFEInternalState dfeInternalState; @@ -86,6 +87,7 @@ private DataFetchingEnvironmentImpl(Builder builder) { this.document = builder.document; this.variables = builder.variables == null ? ImmutableMapWithNullValues.emptyMap() : builder.variables; this.queryDirectives = builder.queryDirectives; + this.level = builder.level; // internal state this.dfeInternalState = new DFEInternalState(builder.dataLoaderDispatchStrategy, builder.alternativeCallContext, builder.profiler); @@ -278,6 +280,10 @@ public String toString() { '}'; } + public int getLevel() { + return level; + } + @NullUnmarked public static class Builder { @@ -305,6 +311,7 @@ public static class Builder { private DataLoaderDispatchStrategy dataLoaderDispatchStrategy; private Profiler profiler; private AlternativeCallContext alternativeCallContext; + private int level; public Builder(DataFetchingEnvironmentImpl env) { this.source = env.source; @@ -331,6 +338,7 @@ public Builder(DataFetchingEnvironmentImpl env) { this.dataLoaderDispatchStrategy = env.dfeInternalState.dataLoaderDispatchStrategy; this.profiler = env.dfeInternalState.profiler; this.alternativeCallContext = env.dfeInternalState.alternativeCallContext; + this.level = env.level; } public Builder() { @@ -468,6 +476,11 @@ public Builder profiler(Profiler profiler) { this.profiler = profiler; return this; } + + public Builder level(int level) { + this.level = level; + return this; + } } @Internal diff --git a/src/main/java/graphql/schema/DataLoaderWithContext.java b/src/main/java/graphql/schema/DataLoaderWithContext.java index 8090e0325..329412938 100644 --- a/src/main/java/graphql/schema/DataLoaderWithContext.java +++ b/src/main/java/graphql/schema/DataLoaderWithContext.java @@ -1,6 +1,7 @@ package graphql.schema; import graphql.Internal; +import graphql.execution.Async; import graphql.execution.incremental.AlternativeCallContext; import graphql.execution.instrumentation.dataloader.ExhaustedDataLoaderDispatchStrategy; import graphql.execution.instrumentation.dataloader.PerLevelDataLoaderDispatchStrategy; @@ -10,9 +11,14 @@ import org.jspecify.annotations.NullMarked; import org.jspecify.annotations.Nullable; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; +import static graphql.Assert.assertNotNull; + @Internal @NullMarked public class DataLoaderWithContext extends DelegatingDataLoader { @@ -30,30 +36,65 @@ public CompletableFuture load(@NonNull K key, @Nullable Object keyContext) { // calling super.load() is important, because otherwise the data loader will sometimes called // later than the dispatch, which results in a hanging DL CompletableFuture result = super.load(key, keyContext); + newDataLoaderInvocation(); + return result; + } + + + @Override + public CompletableFuture> loadMany(List keys, List keyContexts) { + assertNotNull(keys); + assertNotNull(keyContexts); + + CompletableFuture> result; + synchronized (this) { + List> collect = new ArrayList<>(keys.size()); + for (int i = 0; i < keys.size(); i++) { + K key = keys.get(i); + Object keyContext = null; + if (i < keyContexts.size()) { + keyContext = keyContexts.get(i); + } + collect.add(delegate.load(key, keyContext)); + } + result = Async.allOf(collect); + } + newDataLoaderInvocation(); + return result; + } + + @Override + public CompletableFuture> loadMany(Map keysAndContexts) { + assertNotNull(keysAndContexts); + + CompletableFuture> result; + synchronized (this) { + Map> collect = new HashMap<>(keysAndContexts.size()); + for (Map.Entry entry : keysAndContexts.entrySet()) { + K key = entry.getKey(); + Object keyContext = entry.getValue(); + collect.put(key, delegate.load(key, keyContext)); + } + result = Async.allOf(collect); + } + newDataLoaderInvocation(); + return result; + } + + private void newDataLoaderInvocation() { DataFetchingEnvironmentImpl dfeImpl = (DataFetchingEnvironmentImpl) dfe; DataFetchingEnvironmentImpl.DFEInternalState dfeInternalState = (DataFetchingEnvironmentImpl.DFEInternalState) dfeImpl.toInternal(); - dfeInternalState.getProfiler().dataLoaderUsed(dataLoaderName); if (dfeInternalState.getDataLoaderDispatchStrategy() instanceof PerLevelDataLoaderDispatchStrategy) { AlternativeCallContext alternativeCallContext = dfeInternalState.getDeferredCallContext(); - int level = dfe.getExecutionStepInfo().getPath().getLevel(); - String path = dfe.getExecutionStepInfo().getPath().toString(); - ((PerLevelDataLoaderDispatchStrategy) dfeInternalState.dataLoaderDispatchStrategy).newDataLoaderInvocation(path, level, delegate, dataLoaderName, key, alternativeCallContext); + int level = dfeImpl.getLevel(); + ((PerLevelDataLoaderDispatchStrategy) dfeInternalState.dataLoaderDispatchStrategy).newDataLoaderInvocation(level, delegate, alternativeCallContext); } else if (dfeInternalState.getDataLoaderDispatchStrategy() instanceof ExhaustedDataLoaderDispatchStrategy) { AlternativeCallContext alternativeCallContext = dfeInternalState.getDeferredCallContext(); ((ExhaustedDataLoaderDispatchStrategy) dfeInternalState.dataLoaderDispatchStrategy).newDataLoaderInvocation(alternativeCallContext); } - return result; } - @Override - public CompletableFuture> dispatch() { - CompletableFuture> dispatchResult = delegate.dispatch(); - dispatchResult.whenComplete((result, error) -> { - if (result != null && result.size() > 0) { - DataFetchingEnvironmentImpl.DFEInternalState dfeInternalState = (DataFetchingEnvironmentImpl.DFEInternalState) dfe.toInternal(); - dfeInternalState.getProfiler().manualDispatch(dataLoaderName, dfe.getExecutionStepInfo().getPath().getLevel(), result.size()); - } - }); - return dispatchResult; - } + + + } diff --git a/src/test/groovy/graphql/ProfilerTest.groovy b/src/test/groovy/graphql/ProfilerTest.groovy index 96cc2098c..d047edcc7 100644 --- a/src/test/groovy/graphql/ProfilerTest.groovy +++ b/src/test/groovy/graphql/ProfilerTest.groovy @@ -14,6 +14,7 @@ import org.dataloader.BatchLoader import org.dataloader.DataLoader import org.dataloader.DataLoaderFactory import org.dataloader.DataLoaderRegistry +import spock.lang.Ignore import spock.lang.Specification import java.time.Duration @@ -176,6 +177,7 @@ class ProfilerTest extends Specification { } + @Ignore("not available for performance reasons at the moment") def "manual dataloader dispatch"() { given: def sdl = ''' @@ -230,6 +232,7 @@ class ProfilerTest extends Specification { } + @Ignore("not available for performance reasons at the moment") def "cached dataloader values"() { given: def sdl = ''' @@ -517,6 +520,7 @@ class ProfilerTest extends Specification { } + @Ignore("not available for performance reasons at the moment") def "dataloader usage"() { given: def sdl = '''