Failing test

This commit is contained in:
Julien Viet
2019-08-19 16:28:19 +02:00
parent 14effa3691
commit 1e9b90ddd1
2 changed files with 58 additions and 43 deletions

View File

@@ -37,9 +37,9 @@ public class FakeStream<T> implements ReadStream<T>, WriteStream<T> {
private Handler<Throwable> exceptionHandler;
private Handler<T> itemHandler;
private Handler<Void> endHandler;
private final Deque<T> pending = new ArrayDeque<>();
private final Deque<Object> pending = new ArrayDeque<>();
private long demand = Long.MAX_VALUE;
private Promise<Void> ended;
private boolean ended;
private boolean overflow;
private Handler<Void> drainHandler;
private int pauseCount;
@@ -59,7 +59,7 @@ public class FakeStream<T> implements ReadStream<T>, WriteStream<T> {
}
public synchronized boolean isEnded() {
return ended != null;
return ended;
}
public synchronized long demand() {
@@ -82,7 +82,7 @@ public class FakeStream<T> implements ReadStream<T>, WriteStream<T> {
}
public synchronized boolean emit(Stream<T> stream) {
if (ended != null) {
if (ended) {
throw new IllegalStateException();
}
stream.forEach(pending::add);
@@ -100,26 +100,24 @@ public class FakeStream<T> implements ReadStream<T>, WriteStream<T> {
@Override
public void end(Handler<AsyncResult<Void>> h) {
Promise<Void> promise = Promise.promise();
promise.future().setHandler(ar -> {
if (h != null) {
h.handle(ar);
}
Handler<Void> handler = endHandler();
if (handler != null) {
handler.handle(null);
}
});
synchronized(this) {
if (ended != null) {
if (ended) {
throw new IllegalStateException();
}
ended = promise;
if (pending.size() > 0) {
return;
}
ended = true;
Promise<Void> promise = Promise.promise();
promise.future().setHandler(ar -> {
if (h != null) {
h.handle(ar);
}
Handler<Void> handler = endHandler();
if (handler != null) {
handler.handle(null);
}
});
pending.add(promise);
}
completion.setHandler(promise);
checkPending();
}
public synchronized void fail(Throwable err) {
@@ -151,14 +149,18 @@ public class FakeStream<T> implements ReadStream<T>, WriteStream<T> {
return;
}
emitting = true;
T elt;
Object elt;
while (demand > 0L && (elt = pending.poll()) != null) {
if (demand != Long.MAX_VALUE) {
demand--;
}
Handler<T> handler = itemHandler;
if (handler != null) {
handler.handle(elt);
if (elt instanceof Promise) {
completion.setHandler((Promise) elt);
} else {
Handler<T> handler = itemHandler;
if (handler != null) {
handler.handle((T) elt);
}
}
}
if (pending.isEmpty() && overflow) {
@@ -180,9 +182,6 @@ public class FakeStream<T> implements ReadStream<T>, WriteStream<T> {
demand = Long.MAX_VALUE;
}
checkPending();
if (pending.isEmpty() && ended != null) {
completion.setHandler(ended);
}
return this;
}

View File

@@ -125,37 +125,53 @@ public class FakeStreamTest extends AsyncTestBase {
}
@Test
public void testAsyncEnd() {
Promise<Void> end = Promise.promise();
stream.completion(end.future());
AtomicBoolean ended = new AtomicBoolean();
stream.endHandler(v -> ended.set(true));
public void testFetchAfterEnd() {
AtomicInteger ended = new AtomicInteger();
AtomicReference<AsyncResult> endRes = new AtomicReference<>();
stream.endHandler(v -> ended.incrementAndGet());
stream.end(endRes::set);
assertFalse(ended.get());
assertNull(endRes.get());
end.complete();
assertTrue(ended.get());
assertEquals(1, ended.get());
assertTrue(endRes.get().succeeded());
stream.fetch(1);
assertEquals(1, ended.get());
assertTrue(endRes.get().succeeded());
}
@Test
public void testAsyncEndOnFetch() {
public void testAsyncEnd() {
Promise<Void> end = Promise.promise();
AtomicInteger ended = new AtomicInteger();
AtomicReference<AsyncResult> endRes = new AtomicReference<>();
stream.completion(end.future());
stream.endHandler(v -> ended.incrementAndGet());
stream.end(endRes::set);
assertEquals(0, ended.get());
assertNull(endRes.get());
end.complete();
assertEquals(1, ended.get());
assertTrue(endRes.get().succeeded());
}
@Test
public void testAsyncEndDeferred() {
Promise<Void> end = Promise.promise();
AtomicInteger ended = new AtomicInteger();
AtomicReference<AsyncResult> endRes = new AtomicReference<>();
stream.completion(end.future());
stream.pause();
stream.emit(3);
AtomicBoolean ended = new AtomicBoolean();
stream.endHandler(v -> ended.set(true));
AtomicReference<AsyncResult> endRes = new AtomicReference<>();
stream.endHandler(v -> ended.incrementAndGet());
stream.end(endRes::set);
assertFalse(ended.get());
assertEquals(0, ended.get());
assertNull(endRes.get());
end.complete();
assertFalse(ended.get());
assertEquals(0, ended.get());
assertNull(endRes.get());
stream.fetch(1);
assertTrue(ended.get());
assertEquals(0, ended.get());
assertNull(endRes.get());
stream.fetch(1);
assertEquals(1, ended.get());
assertTrue(endRes.get().succeeded());
}
}