mirror of
https://github.com/jlengrand/error-prone-support.git
synced 2026-03-10 08:11:25 +00:00
Have FluxFlatMapUsage better handle nested Publishers (#224)
This commit is contained in:
@@ -5,6 +5,10 @@ import static com.google.errorprone.BugPattern.SeverityLevel.ERROR;
|
||||
import static com.google.errorprone.BugPattern.StandardTags.LIKELY_ERROR;
|
||||
import static com.google.errorprone.matchers.method.MethodMatchers.instanceMethod;
|
||||
import static tech.picnic.errorprone.bugpatterns.util.Documentation.BUG_PATTERNS_BASE_URL;
|
||||
import static tech.picnic.errorprone.bugpatterns.util.MoreTypes.generic;
|
||||
import static tech.picnic.errorprone.bugpatterns.util.MoreTypes.subOf;
|
||||
import static tech.picnic.errorprone.bugpatterns.util.MoreTypes.type;
|
||||
import static tech.picnic.errorprone.bugpatterns.util.MoreTypes.unbound;
|
||||
|
||||
import com.google.auto.service.AutoService;
|
||||
import com.google.common.collect.Iterables;
|
||||
@@ -17,11 +21,14 @@ import com.google.errorprone.fixes.SuggestedFix;
|
||||
import com.google.errorprone.fixes.SuggestedFixes;
|
||||
import com.google.errorprone.matchers.Description;
|
||||
import com.google.errorprone.matchers.Matcher;
|
||||
import com.google.errorprone.suppliers.Supplier;
|
||||
import com.google.errorprone.suppliers.Suppliers;
|
||||
import com.google.errorprone.util.ASTHelpers;
|
||||
import com.sun.source.tree.ExpressionTree;
|
||||
import com.sun.source.tree.MemberReferenceTree;
|
||||
import com.sun.source.tree.MethodInvocationTree;
|
||||
import com.sun.tools.javac.code.Type;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
/**
|
||||
@@ -33,11 +40,12 @@ import reactor.core.publisher.Flux;
|
||||
* former interleaves values as they are emitted, yielding nondeterministic results. In most cases
|
||||
* {@link Flux#concatMap(Function)} should be preferred, as it produces consistent results and
|
||||
* avoids potentially saturating the thread pool on which subscription happens. If {@code
|
||||
* concatMap}'s single-subscription semantics are undesirable one should invoke a {@code flatMap} or
|
||||
* {@code flatMapSequential} overload with an explicit concurrency level.
|
||||
* concatMap}'s sequential-subscription semantics are undesirable one should invoke a {@code
|
||||
* flatMap} or {@code flatMapSequential} overload with an explicit concurrency level.
|
||||
*
|
||||
* <p>NB: The rarely-used overload {@link Flux#flatMap(Function, Function, Supplier)} is not flagged
|
||||
* by this check because there is no clear alternative to point to.
|
||||
* <p>NB: The rarely-used overload {@link Flux#flatMap(Function, Function,
|
||||
* java.util.function.Supplier)} is not flagged by this check because there is no clear alternative
|
||||
* to point to.
|
||||
*/
|
||||
@AutoService(BugChecker.class)
|
||||
@BugPattern(
|
||||
@@ -52,11 +60,16 @@ public final class FluxFlatMapUsage extends BugChecker
|
||||
implements MethodInvocationTreeMatcher, MemberReferenceTreeMatcher {
|
||||
private static final long serialVersionUID = 1L;
|
||||
private static final String MAX_CONCURRENCY_ARG_NAME = "MAX_CONCURRENCY";
|
||||
private static final Supplier<Type> FLUX =
|
||||
Suppliers.typeFromString("reactor.core.publisher.Flux");
|
||||
private static final Matcher<ExpressionTree> FLUX_FLATMAP =
|
||||
instanceMethod()
|
||||
.onDescendantOf("reactor.core.publisher.Flux")
|
||||
.onDescendantOf(FLUX)
|
||||
.namedAnyOf("flatMap", "flatMapSequential")
|
||||
.withParameters(Function.class.getName());
|
||||
private static final Supplier<Type> FLUX_OF_PUBLISHERS =
|
||||
VisitorState.memoize(
|
||||
generic(FLUX, subOf(generic(type("org.reactivestreams.Publisher"), unbound()))));
|
||||
|
||||
/** Instantiates a new {@link FluxFlatMapUsage} instance. */
|
||||
public FluxFlatMapUsage() {}
|
||||
@@ -67,14 +80,27 @@ public final class FluxFlatMapUsage extends BugChecker
|
||||
return Description.NO_MATCH;
|
||||
}
|
||||
|
||||
return buildDescription(tree)
|
||||
.addFix(SuggestedFixes.renameMethodInvocation(tree, "concatMap", state))
|
||||
.addFix(
|
||||
SuggestedFix.builder()
|
||||
.postfixWith(
|
||||
Iterables.getOnlyElement(tree.getArguments()), ", " + MAX_CONCURRENCY_ARG_NAME)
|
||||
.build())
|
||||
.build();
|
||||
SuggestedFix serializationFix = SuggestedFixes.renameMethodInvocation(tree, "concatMap", state);
|
||||
SuggestedFix concurrencyCapFix =
|
||||
SuggestedFix.builder()
|
||||
.postfixWith(
|
||||
Iterables.getOnlyElement(tree.getArguments()), ", " + MAX_CONCURRENCY_ARG_NAME)
|
||||
.build();
|
||||
|
||||
Description.Builder description = buildDescription(tree);
|
||||
|
||||
if (state.getTypes().isSubtype(ASTHelpers.getType(tree), FLUX_OF_PUBLISHERS.get(state))) {
|
||||
/*
|
||||
* Nested publishers may need to be subscribed to eagerly in order to avoid a deadlock, e.g.
|
||||
* if they are produced by `Flux#groupBy`. In this case we suggest specifying an explicit
|
||||
* concurrently bound, in favour of sequential subscriptions using `Flux#concatMap`.
|
||||
*/
|
||||
description.addFix(concurrencyCapFix).addFix(serializationFix);
|
||||
} else {
|
||||
description.addFix(serializationFix).addFix(concurrencyCapFix);
|
||||
}
|
||||
|
||||
return description.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -4,6 +4,7 @@ import static com.google.errorprone.BugCheckerRefactoringTestHelper.newInstance;
|
||||
|
||||
import com.google.errorprone.BugCheckerRefactoringTestHelper;
|
||||
import com.google.errorprone.BugCheckerRefactoringTestHelper.FixChoosers;
|
||||
import com.google.errorprone.BugCheckerRefactoringTestHelper.TestMode;
|
||||
import com.google.errorprone.CompilationTestHelper;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -33,6 +34,14 @@ final class FluxFlatMapUsageTest {
|
||||
" Flux.just(1).flatMapSequential(Flux::just);",
|
||||
" // BUG: Diagnostic contains:",
|
||||
" Flux.just(1).<String>flatMapSequential(i -> Flux.just(String.valueOf(i)));",
|
||||
" // BUG: Diagnostic contains:",
|
||||
" Flux.just(1, 2).groupBy(i -> i).flatMap(Flux::just);",
|
||||
" // BUG: Diagnostic contains:",
|
||||
" Flux.just(1, 2).groupBy(i -> i).<String>flatMap(i -> Flux.just(String.valueOf(i)));",
|
||||
" // BUG: Diagnostic contains:",
|
||||
" Flux.just(1, 2).groupBy(i -> i).flatMapSequential(Flux::just);",
|
||||
" // BUG: Diagnostic contains:",
|
||||
" Flux.just(1, 2).groupBy(i -> i).<String>flatMapSequential(i -> Flux.just(String.valueOf(i)));",
|
||||
"",
|
||||
" Mono.just(1).flatMap(Mono::just);",
|
||||
" Flux.just(1).concatMap(Flux::just);",
|
||||
@@ -71,9 +80,13 @@ final class FluxFlatMapUsageTest {
|
||||
"import reactor.core.publisher.Flux;",
|
||||
"",
|
||||
"class A {",
|
||||
" private static final int MAX_CONCURRENCY = 8;",
|
||||
"",
|
||||
" void m() {",
|
||||
" Flux.just(1).flatMap(Flux::just);",
|
||||
" Flux.just(1).flatMapSequential(Flux::just);",
|
||||
" Flux.just(1, 2).groupBy(i -> i).flatMap(Flux::just);",
|
||||
" Flux.just(1, 2).groupBy(i -> i).flatMapSequential(Flux::just);",
|
||||
" }",
|
||||
"}")
|
||||
.addOutputLines(
|
||||
@@ -81,12 +94,16 @@ final class FluxFlatMapUsageTest {
|
||||
"import reactor.core.publisher.Flux;",
|
||||
"",
|
||||
"class A {",
|
||||
" private static final int MAX_CONCURRENCY = 8;",
|
||||
"",
|
||||
" void m() {",
|
||||
" Flux.just(1).concatMap(Flux::just);",
|
||||
" Flux.just(1).concatMap(Flux::just);",
|
||||
" Flux.just(1, 2).groupBy(i -> i).flatMap(Flux::just, MAX_CONCURRENCY);",
|
||||
" Flux.just(1, 2).groupBy(i -> i).flatMapSequential(Flux::just, MAX_CONCURRENCY);",
|
||||
" }",
|
||||
"}")
|
||||
.doTest();
|
||||
.doTest(TestMode.TEXT_MATCH);
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -103,6 +120,8 @@ final class FluxFlatMapUsageTest {
|
||||
" void m() {",
|
||||
" Flux.just(1).flatMap(Flux::just);",
|
||||
" Flux.just(1).flatMapSequential(Flux::just);",
|
||||
" Flux.just(1, 2).groupBy(i -> i).flatMap(Flux::just);",
|
||||
" Flux.just(1, 2).groupBy(i -> i).flatMapSequential(Flux::just);",
|
||||
" }",
|
||||
"}")
|
||||
.addOutputLines(
|
||||
@@ -115,8 +134,10 @@ final class FluxFlatMapUsageTest {
|
||||
" void m() {",
|
||||
" Flux.just(1).flatMap(Flux::just, MAX_CONCURRENCY);",
|
||||
" Flux.just(1).flatMapSequential(Flux::just, MAX_CONCURRENCY);",
|
||||
" Flux.just(1, 2).groupBy(i -> i).concatMap(Flux::just);",
|
||||
" Flux.just(1, 2).groupBy(i -> i).concatMap(Flux::just);",
|
||||
" }",
|
||||
"}")
|
||||
.doTest();
|
||||
.doTest(TestMode.TEXT_MATCH);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user