From 1e9b90ddd138102c4f17bc8cb4179b8a0e95c42e Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Mon, 19 Aug 2019 16:28:19 +0200 Subject: [PATCH] Failing test --- .../io/vertx/test/fakestream/FakeStream.java | 53 +++++++++---------- .../vertx/test/fakestream/FakeStreamTest.java | 48 +++++++++++------ 2 files changed, 58 insertions(+), 43 deletions(-) diff --git a/src/test/java/io/vertx/test/fakestream/FakeStream.java b/src/test/java/io/vertx/test/fakestream/FakeStream.java index 50ebf3f84..f26a0aa63 100644 --- a/src/test/java/io/vertx/test/fakestream/FakeStream.java +++ b/src/test/java/io/vertx/test/fakestream/FakeStream.java @@ -37,9 +37,9 @@ public class FakeStream implements ReadStream, WriteStream { private Handler exceptionHandler; private Handler itemHandler; private Handler endHandler; - private final Deque pending = new ArrayDeque<>(); + private final Deque pending = new ArrayDeque<>(); private long demand = Long.MAX_VALUE; - private Promise ended; + private boolean ended; private boolean overflow; private Handler drainHandler; private int pauseCount; @@ -59,7 +59,7 @@ public class FakeStream implements ReadStream, WriteStream { } public synchronized boolean isEnded() { - return ended != null; + return ended; } public synchronized long demand() { @@ -82,7 +82,7 @@ public class FakeStream implements ReadStream, WriteStream { } public synchronized boolean emit(Stream stream) { - if (ended != null) { + if (ended) { throw new IllegalStateException(); } stream.forEach(pending::add); @@ -100,26 +100,24 @@ public class FakeStream implements ReadStream, WriteStream { @Override public void end(Handler> h) { - Promise promise = Promise.promise(); - promise.future().setHandler(ar -> { - if (h != null) { - h.handle(ar); - } - Handler handler = endHandler(); - if (handler != null) { - handler.handle(null); - } - }); synchronized(this) { - if (ended != null) { + if (ended) { throw new IllegalStateException(); } - ended = promise; - if (pending.size() > 0) { - return; - } + ended = true; + Promise promise = Promise.promise(); + promise.future().setHandler(ar -> { + if (h != null) { + h.handle(ar); + } + Handler handler = endHandler(); + if (handler != null) { + handler.handle(null); + } + }); + pending.add(promise); } - completion.setHandler(promise); + checkPending(); } public synchronized void fail(Throwable err) { @@ -151,14 +149,18 @@ public class FakeStream implements ReadStream, WriteStream { return; } emitting = true; - T elt; + Object elt; while (demand > 0L && (elt = pending.poll()) != null) { if (demand != Long.MAX_VALUE) { demand--; } - Handler handler = itemHandler; - if (handler != null) { - handler.handle(elt); + if (elt instanceof Promise) { + completion.setHandler((Promise) elt); + } else { + Handler handler = itemHandler; + if (handler != null) { + handler.handle((T) elt); + } } } if (pending.isEmpty() && overflow) { @@ -180,9 +182,6 @@ public class FakeStream implements ReadStream, WriteStream { demand = Long.MAX_VALUE; } checkPending(); - if (pending.isEmpty() && ended != null) { - completion.setHandler(ended); - } return this; } diff --git a/src/test/java/io/vertx/test/fakestream/FakeStreamTest.java b/src/test/java/io/vertx/test/fakestream/FakeStreamTest.java index ad74cf32d..ec836c631 100644 --- a/src/test/java/io/vertx/test/fakestream/FakeStreamTest.java +++ b/src/test/java/io/vertx/test/fakestream/FakeStreamTest.java @@ -125,37 +125,53 @@ public class FakeStreamTest extends AsyncTestBase { } @Test - public void testAsyncEnd() { - Promise end = Promise.promise(); - stream.completion(end.future()); - AtomicBoolean ended = new AtomicBoolean(); - stream.endHandler(v -> ended.set(true)); + public void testFetchAfterEnd() { + AtomicInteger ended = new AtomicInteger(); AtomicReference endRes = new AtomicReference<>(); + stream.endHandler(v -> ended.incrementAndGet()); stream.end(endRes::set); - assertFalse(ended.get()); - assertNull(endRes.get()); - end.complete(); - assertTrue(ended.get()); + assertEquals(1, ended.get()); + assertTrue(endRes.get().succeeded()); + stream.fetch(1); + assertEquals(1, ended.get()); assertTrue(endRes.get().succeeded()); } @Test - public void testAsyncEndOnFetch() { + public void testAsyncEnd() { Promise end = Promise.promise(); + AtomicInteger ended = new AtomicInteger(); + AtomicReference endRes = new AtomicReference<>(); + stream.completion(end.future()); + stream.endHandler(v -> ended.incrementAndGet()); + stream.end(endRes::set); + assertEquals(0, ended.get()); + assertNull(endRes.get()); + end.complete(); + assertEquals(1, ended.get()); + assertTrue(endRes.get().succeeded()); + } + + @Test + public void testAsyncEndDeferred() { + Promise end = Promise.promise(); + AtomicInteger ended = new AtomicInteger(); + AtomicReference endRes = new AtomicReference<>(); stream.completion(end.future()); stream.pause(); stream.emit(3); - AtomicBoolean ended = new AtomicBoolean(); - stream.endHandler(v -> ended.set(true)); - AtomicReference endRes = new AtomicReference<>(); + stream.endHandler(v -> ended.incrementAndGet()); stream.end(endRes::set); - assertFalse(ended.get()); + assertEquals(0, ended.get()); assertNull(endRes.get()); end.complete(); - assertFalse(ended.get()); + assertEquals(0, ended.get()); assertNull(endRes.get()); stream.fetch(1); - assertTrue(ended.get()); + assertEquals(0, ended.get()); + assertNull(endRes.get()); + stream.fetch(1); + assertEquals(1, ended.get()); assertTrue(endRes.get().succeeded()); } }