Pair programming

This commit is contained in:
Stephan Schroevers
2021-09-23 11:21:13 +02:00
parent 702987e2ca
commit 5db4e30140
6 changed files with 278 additions and 27 deletions

View File

@@ -39,6 +39,12 @@
<artifactId>error_prone_test_helpers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>migration-util</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>refaster-resource-compiler</artifactId>
@@ -191,12 +197,6 @@
<artifactId>testng</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>tech.picnic.error-prone-support</groupId>
<artifactId>migration-util</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>

View File

@@ -26,7 +26,6 @@ import java.util.concurrent.Callable;
import org.reactivestreams.Publisher;
import reactor.adapter.rxjava.RxJava2Adapter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import tech.picnic.errorprone.migration.util.RxJavaReactorMigrationUtil;
@@ -729,6 +728,43 @@ final class RxJavaFlowableToReactorTemplates {
}
}
// // XXX: Delete this/move it.
// static final class XXXv1<I, O> {
// @BeforeTemplate
// Function<I, O> before(Function<I, O> function) {
// return i -> function.apply(i);
// }
//
// @BeforeTemplate
// Function<I, O> before2(Function<I, O> function) {
// return function::apply;
// }
//
// @AfterTemplate
// Function<I, O> after(Function<I, O> function) {
// return function;
// }
// }
//
// // XXX: Delete this/move it.
// static final class XXXv2<I, O> {
// @BeforeTemplate
// // Or: @LambdaExprOrMethodReferenceReceiverEnsuresType
// Function<I, O> before(Function<I, O> function) {
// return Refaster.<Function<I, O>>receiverEnsuresType(i -> function.apply(i));
// }
//
// @BeforeTemplate
// Function<I, O> before2(Function<I, O> function) {
// return function::apply;
// }
//
// @AfterTemplate
// Function<I, O> after(Function<I, O> function) {
// return function;
// }
// }
// XXX: final Completable flatMapCompletable(Function,boolean,int)
// XXX: Test this one. Doesnt pick up one in bad-word-service.
@@ -761,18 +797,18 @@ final class RxJavaFlowableToReactorTemplates {
// XXX: final Disposable forEachWhile(Predicate,Consumer)
// XXX: final Disposable forEachWhile(Predicate,Consumer,Action)
// XXX: Test this, and improve this, the GroupedType is not working... Hacked the GroupedFlux.
static final class FlowableGroupBy<K, V, T> {
static final class FlowableGroupBy<K, V> {
@BeforeTemplate
Flowable<GroupedFlowable<K, T>> before(Flowable<T> flowable, Function<T, K> keySelector) {
Flowable<GroupedFlowable<K, V>> before(Flowable<V> flowable, Function<V, K> keySelector) {
return flowable.groupBy(keySelector);
}
@AfterTemplate
Flowable<GroupedFlux<K, T>> after(Flowable<T> flowable, Function<T, K> keySelector) {
Flowable<GroupedFlowable<K, V>> after(Flowable<V> flowable, Function<V, K> keySelector) {
return RxJava2Adapter.fluxToFlowable(
RxJava2Adapter.flowableToFlux(flowable)
.groupBy(RxJavaReactorMigrationUtil.toJdkFunction(keySelector)));
.groupBy(RxJavaReactorMigrationUtil.toJdkFunction(keySelector))
.map(RxJavaReactorMigrationUtil::groupedFluxToGroupedFlowable));
}
}

View File

@@ -489,14 +489,16 @@ final class RxJavaMaybeToReactorTemplates {
// }
// XXX: This one is still not correct, see example above. Perhaps try this? <S, T extends S, O> {
static final class MaybeFlatMapSingleElement<T, O, S extends SingleSource<O>> {
static final class MaybeFlatMapSingleElement<T, O> {
@BeforeTemplate
Maybe<O> before(Maybe<T> maybe, Function<T, S> function) {
Maybe<O> before(
Maybe<T> maybe, Function<? super T, ? extends SingleSource<? extends O>> function) {
return maybe.flatMapSingleElement(function);
}
@AfterTemplate
Maybe<O> after(Maybe<T> maybe, Function<T, S> function) {
Maybe<O> after(
Maybe<T> maybe, Function<? super T, ? extends SingleSource<? extends O>> function) {
return RxJava2Adapter.monoToMaybe(
RxJava2Adapter.maybeToMono(maybe)
.flatMap(
@@ -645,7 +647,7 @@ final class RxJavaMaybeToReactorTemplates {
@AfterTemplate
Single<T> after(Maybe<T> maybe) {
return RxJava2Adapter.monoToSingle(RxJava2Adapter.maybeToMono(maybe));
return RxJava2Adapter.monoToSingle(RxJava2Adapter.maybeToMono(maybe).single());
}
}

View File

@@ -1,8 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>tech.picnic.error-prone-support</groupId>
<artifactId>error-prone-support</artifactId>
@@ -17,6 +16,10 @@
</properties>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
@@ -27,4 +30,4 @@
<scope>provided</scope>
</dependency>
</dependencies>
</project>
</project>

View File

@@ -1,8 +1,18 @@
package tech.picnic.errorprone.migration.util;
import io.reactivex.flowables.GroupedFlowable;
import io.reactivex.functions.Action;
import io.reactivex.internal.fuseable.ConditionalSubscriber;
import io.reactivex.internal.fuseable.QueueSubscription;
import java.util.concurrent.Callable;
import java.util.function.Supplier;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.Fuseable;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Operators;
public final class RxJavaReactorMigrationUtil {
private RxJavaReactorMigrationUtil() {}
@@ -152,4 +162,204 @@ public final class RxJavaReactorMigrationUtil {
// A.param 1 type extends B.param 1 type
// ....
// B throws a subset of the exceptions thrown by A
public static <K, V> GroupedFlowable<K, V> groupedFluxToGroupedFlowable(
GroupedFlux<K, V> source) {
return new GroupedFluxAsGroupedFlowable<>(source);
}
private static final class GroupedFluxAsGroupedFlowable<K, V> extends GroupedFlowable<K, V> {
private final GroupedFlux<K, V> source;
GroupedFluxAsGroupedFlowable(GroupedFlux<K, V> source) {
super(source.key());
this.source = source;
}
@Override
public void subscribeActual(Subscriber<? super V> s) {
if (s instanceof ConditionalSubscriber) {
source.subscribe(
new FluxAsFlowableConditionalSubscriber<>((ConditionalSubscriber<? super V>) s));
} else {
source.subscribe(new FluxAsFlowableSubscriber<>(s));
}
}
private static final class FluxAsFlowableSubscriber<T>
implements CoreSubscriber<T>, QueueSubscription<T> {
private final Subscriber<? super T> actual;
private Subscription s;
private Fuseable.QueueSubscription<T> qs;
FluxAsFlowableSubscriber(Subscriber<? super T> actual) {
this.actual = actual;
}
@Override
@SuppressWarnings("unchecked")
public void onSubscribe(Subscription s) {
if (Operators.validate(this.s, s)) {
this.s = s;
if (s instanceof Fuseable.QueueSubscription) {
this.qs = (Fuseable.QueueSubscription<T>) s;
}
actual.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
@Override
public void request(long n) {
s.request(n);
}
@Override
public void cancel() {
s.cancel();
}
@Override
public T poll() {
return qs.poll();
}
@Override
public boolean isEmpty() {
return qs.isEmpty();
}
@Override
public void clear() {
qs.clear();
}
@Override
public int requestFusion(int requestedMode) {
if (qs != null) {
return qs.requestFusion(requestedMode);
}
return Fuseable.NONE;
}
@Override
public boolean offer(T value) {
throw new UnsupportedOperationException("Should not be called");
}
@Override
public boolean offer(T v1, T v2) {
throw new UnsupportedOperationException("Should not be called");
}
}
private static final class FluxAsFlowableConditionalSubscriber<T>
implements Fuseable.ConditionalSubscriber<T>, QueueSubscription<T> {
private final ConditionalSubscriber<? super T> actual;
private Subscription s;
private QueueSubscription<T> qs;
FluxAsFlowableConditionalSubscriber(ConditionalSubscriber<? super T> actual) {
this.actual = actual;
}
@Override
@SuppressWarnings("unchecked")
public void onSubscribe(Subscription s) {
if (Operators.validate(this.s, s)) {
this.s = s;
if (s instanceof io.reactivex.internal.fuseable.QueueSubscription) {
this.qs = (QueueSubscription<T>) s;
}
actual.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public boolean tryOnNext(T t) {
return actual.tryOnNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
@Override
public void request(long n) {
s.request(n);
}
@Override
public void cancel() {
s.cancel();
}
@Override
public T poll() {
try {
return qs.poll();
} catch (Throwable ex) {
throw Exceptions.bubble(ex);
}
}
@Override
public boolean isEmpty() {
return qs.isEmpty();
}
@Override
public void clear() {
qs.clear();
}
@Override
public int requestFusion(int requestedMode) {
if (qs != null) {
return qs.requestFusion(requestedMode);
}
return NONE;
}
@Override
public boolean offer(T v1) {
throw new UnsupportedOperationException("Should not be called!");
}
@Override
public boolean offer(T v1, T v2) {
throw new UnsupportedOperationException("Should not be called!");
}
}
}
}

14
pom.xml
View File

@@ -1153,13 +1153,13 @@
fork, some other dependencies depend on the official (i.e.,
non-forked) `error_prone_annotations`. Here we fix which
version is pulled in. -->
<!-- <dependencies>-->
<!-- <dependency>-->
<!-- <groupId>com.google.errorprone</groupId>-->
<!-- <artifactId>error_prone_annotations</artifactId>-->
<!-- <version>${version.error-prone-orig}</version>-->
<!-- </dependency>-->
<!-- </dependencies>-->
<!-- <dependencies>-->
<!-- <dependency>-->
<!-- <groupId>com.google.errorprone</groupId>-->
<!-- <artifactId>error_prone_annotations</artifactId>-->
<!-- <version>${version.error-prone-orig}</version>-->
<!-- </dependency>-->
<!-- </dependencies>-->
</dependencyManagement>
</profile>
<profile>