From 17d05d0fb852b52022d263878a7b9f9036775ff2 Mon Sep 17 00:00:00 2001 From: purplefox Date: Thu, 19 Jun 2014 18:07:16 +0100 Subject: [PATCH] added some thread checking logic in tests and fix(ing) issue with wrong netty thread being used --- .../org/vertx/java/core/impl/ContextImpl.java | 13 +- .../vertx/java/tests/core/AsyncTestBase.java | 64 +++++++ .../java/tests/core/AsyncTestBaseTest.java | 1 + .../org/vertx/java/tests/core/NetTest.java | 168 +++++++++++++++++- 4 files changed, 242 insertions(+), 4 deletions(-) diff --git a/vertx-core/src/main/java/org/vertx/java/core/impl/ContextImpl.java b/vertx-core/src/main/java/org/vertx/java/core/impl/ContextImpl.java index ede54a694..8244961f6 100644 --- a/vertx-core/src/main/java/org/vertx/java/core/impl/ContextImpl.java +++ b/vertx-core/src/main/java/org/vertx/java/core/impl/ContextImpl.java @@ -130,10 +130,21 @@ public abstract class ContextImpl implements Context { public abstract boolean isOnCorrectWorker(EventLoop worker); + // FIXME - make sure this is right and get rid of worker param public void execute(EventLoop worker, Runnable handler) { - if (isOnCorrectWorker(worker)) { + boolean correctThread; + Thread thread = Thread.currentThread(); + if (thread instanceof VertxThread) { + VertxThread vthread = (VertxThread)thread; + Context ctx = vthread.getContext(); + correctThread = ctx == this; + } else { + correctThread = false; + } + if (correctThread) { wrapTask(handler).run(); } else { + //System.out.println("Wrong thread, will execute on correct one: " + Thread.currentThread()); execute(handler); } } diff --git a/vertx-core/src/test/java/org/vertx/java/tests/core/AsyncTestBase.java b/vertx-core/src/test/java/org/vertx/java/tests/core/AsyncTestBase.java index 3d2e24b7f..fd399f3e0 100644 --- a/vertx-core/src/test/java/org/vertx/java/tests/core/AsyncTestBase.java +++ b/vertx-core/src/test/java/org/vertx/java/tests/core/AsyncTestBase.java @@ -22,6 +22,8 @@ import org.junit.After; import org.junit.Assert; import org.junit.internal.ArrayComparisonFailure; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -35,8 +37,11 @@ public class AsyncTestBase { private volatile boolean testCompleteCalled; private volatile boolean awaitCalled; private volatile boolean timedOut; + private boolean threadChecksEnabled = true; + private Map threadNames = new ConcurrentHashMap<>(); protected void testComplete() { + checkThread(); if (testCompleteCalled) { throw new IllegalStateException("testComplete() already called"); } @@ -83,9 +88,22 @@ public class AsyncTestBase { } } + protected void disableThreadChecks() { + threadChecksEnabled = false; + } + @After public void afterAsyncTestBase() { checkTestCompleteCalled(); + if (threadChecksEnabled) { + for (Map.Entry entry: threadNames.entrySet()) { + if (!entry.getKey().equals("main") && !entry.getKey().startsWith("vert.x-")) { + IllegalStateException is = new IllegalStateException("Non Vert.x thread! :" + entry.getKey()); + is.setStackTrace(entry.getValue().getStackTrace()); + throw is; + } + } + } } private void handleThrowable(Throwable t) { @@ -100,7 +118,12 @@ public class AsyncTestBase { throwable = null; } + protected void checkThread() { + threadNames.put(Thread.currentThread().getName(), new Exception()); + } + protected void assertTrue(String message, boolean condition) { + checkThread(); try { Assert.assertTrue(message, condition); } catch (AssertionError e) { @@ -109,6 +132,7 @@ public class AsyncTestBase { } protected void assertFalse(boolean condition) { + checkThread(); try { Assert.assertFalse(condition); } catch (AssertionError e) { @@ -117,6 +141,7 @@ public class AsyncTestBase { } protected void assertArrayEquals(String message, char[] expecteds, char[] actuals) throws ArrayComparisonFailure { + checkThread(); try { Assert.assertArrayEquals(message, expecteds, actuals); } catch (AssertionError e) { @@ -125,6 +150,7 @@ public class AsyncTestBase { } protected void assertSame(String message, Object expected, Object actual) { + checkThread(); try { Assert.assertSame(message, expected, actual); } catch (AssertionError e) { @@ -133,6 +159,7 @@ public class AsyncTestBase { } protected void assertEquals(long expected, long actual) { + checkThread(); try { Assert.assertEquals(expected, actual); } catch (AssertionError e) { @@ -141,6 +168,7 @@ public class AsyncTestBase { } protected void assertNull(Object object) { + checkThread(); try { Assert.assertNull(object); } catch (AssertionError e) { @@ -149,6 +177,7 @@ public class AsyncTestBase { } protected void assertFalse(String message, boolean condition) { + checkThread(); try { Assert.assertFalse(message, condition); } catch (AssertionError e) { @@ -157,6 +186,7 @@ public class AsyncTestBase { } protected void fail(String message) { + checkThread(); try { Assert.fail(message); } catch (AssertionError e) { @@ -165,6 +195,7 @@ public class AsyncTestBase { } protected void assertNull(String message, Object object) { + checkThread(); try { Assert.assertNull(message, object); } catch (AssertionError e) { @@ -173,6 +204,7 @@ public class AsyncTestBase { } protected void assertArrayEquals(String message, float[] expecteds, float[] actuals, float delta) throws ArrayComparisonFailure { + checkThread(); try { Assert.assertArrayEquals(message, expecteds, actuals, delta); } catch (AssertionError e) { @@ -182,6 +214,7 @@ public class AsyncTestBase { @Deprecated protected void assertEquals(String message, double expected, double actual) { + checkThread(); try { Assert.assertEquals(message, expected, actual); } catch (AssertionError e) { @@ -191,6 +224,7 @@ public class AsyncTestBase { protected void assertArrayEquals(String message, double[] expecteds, double[] actuals, double delta) throws ArrayComparisonFailure { + checkThread(); try { Assert.assertArrayEquals(message, expecteds, actuals, delta); } catch (AssertionError e) { @@ -199,6 +233,7 @@ public class AsyncTestBase { } protected void assertArrayEquals(String message, Object[] expecteds, Object[] actuals) throws ArrayComparisonFailure { + checkThread(); try { Assert.assertArrayEquals(message, expecteds, actuals); } catch (AssertionError e) { @@ -207,6 +242,7 @@ public class AsyncTestBase { } protected void assertArrayEquals(String message, short[] expecteds, short[] actuals) throws ArrayComparisonFailure { + checkThread(); try { Assert.assertArrayEquals(message, expecteds, actuals); } catch (AssertionError e) { @@ -215,6 +251,7 @@ public class AsyncTestBase { } protected void assertArrayEquals(short[] expecteds, short[] actuals) { + checkThread(); try { Assert.assertArrayEquals(expecteds, actuals); } catch (AssertionError e) { @@ -223,6 +260,7 @@ public class AsyncTestBase { } protected void assertArrayEquals(long[] expecteds, long[] actuals) { + checkThread(); try { Assert.assertArrayEquals(expecteds, actuals); } catch (AssertionError e) { @@ -231,6 +269,7 @@ public class AsyncTestBase { } protected void assertNotNull(Object object) { + checkThread(); try { Assert.assertNotNull(object); } catch (AssertionError e) { @@ -239,6 +278,7 @@ public class AsyncTestBase { } protected void assertEquals(Object expected, Object actual) { + checkThread(); try { Assert.assertEquals(expected, actual); } catch (AssertionError e) { @@ -247,6 +287,7 @@ public class AsyncTestBase { } protected void assertEquals(String message, Object expected, Object actual) { + checkThread(); try { Assert.assertEquals(message, expected, actual); } catch (AssertionError e) { @@ -255,6 +296,7 @@ public class AsyncTestBase { } protected void assertTrue(boolean condition) { + checkThread(); try { Assert.assertTrue(condition); } catch (AssertionError e) { @@ -263,6 +305,7 @@ public class AsyncTestBase { } protected void assertArrayEquals(Object[] expecteds, Object[] actuals) { + checkThread(); try { Assert.assertArrayEquals(expecteds, actuals); } catch (AssertionError e) { @@ -271,6 +314,7 @@ public class AsyncTestBase { } protected void assertNotNull(String message, Object object) { + checkThread(); try { Assert.assertNotNull(message, object); } catch (AssertionError e) { @@ -279,6 +323,7 @@ public class AsyncTestBase { } protected void assertEquals(String message, double expected, double actual, double delta) { + checkThread(); try { Assert.assertEquals(message, expected, actual, delta); } catch (AssertionError e) { @@ -287,6 +332,7 @@ public class AsyncTestBase { } protected void fail() { + checkThread(); try { Assert.fail(); } catch (AssertionError e) { @@ -295,6 +341,7 @@ public class AsyncTestBase { } protected void assertSame(Object expected, Object actual) { + checkThread(); try { Assert.assertSame(expected, actual); } catch (AssertionError e) { @@ -303,6 +350,7 @@ public class AsyncTestBase { } protected void assertEquals(String message, long expected, long actual) { + checkThread(); try { Assert.assertEquals(message, expected, actual); } catch (AssertionError e) { @@ -311,6 +359,7 @@ public class AsyncTestBase { } protected void assertArrayEquals(String message, byte[] expecteds, byte[] actuals) throws ArrayComparisonFailure { + checkThread(); try { Assert.assertArrayEquals(message, expecteds, actuals); } catch (AssertionError e) { @@ -319,6 +368,7 @@ public class AsyncTestBase { } protected void assertArrayEquals(String message, long[] expecteds, long[] actuals) throws ArrayComparisonFailure { + checkThread(); try { Assert.assertArrayEquals(message, expecteds, actuals); } catch (AssertionError e) { @@ -327,6 +377,7 @@ public class AsyncTestBase { } protected void assertEquals(double expected, double actual, double delta) { + checkThread(); try { Assert.assertEquals(expected, actual, delta); } catch (AssertionError e) { @@ -335,6 +386,7 @@ public class AsyncTestBase { } protected void assertThat(T actual, Matcher matcher) { + checkThread(); try { Assert.assertThat(actual, matcher); } catch (AssertionError e) { @@ -344,6 +396,7 @@ public class AsyncTestBase { @Deprecated protected void assertEquals(String message, Object[] expecteds, Object[] actuals) { + checkThread(); try { Assert.assertEquals(message, expecteds, actuals); } catch (AssertionError e) { @@ -353,6 +406,7 @@ public class AsyncTestBase { @Deprecated protected void assertEquals(Object[] expecteds, Object[] actuals) { + checkThread(); try { Assert.assertEquals(expecteds, actuals); } catch (AssertionError e) { @@ -361,6 +415,7 @@ public class AsyncTestBase { } protected void assertNotSame(String message, Object unexpected, Object actual) { + checkThread(); try { Assert.assertNotSame(message, unexpected, actual); } catch (AssertionError e) { @@ -369,6 +424,7 @@ public class AsyncTestBase { } protected void assertThat(String reason, T actual, Matcher matcher) { + checkThread(); try { Assert.assertThat(reason, actual, matcher); } catch (AssertionError e) { @@ -377,6 +433,7 @@ public class AsyncTestBase { } protected void assertArrayEquals(float[] expecteds, float[] actuals, float delta) { + checkThread(); try { Assert.assertArrayEquals(expecteds, actuals, delta); } catch (AssertionError e) { @@ -385,6 +442,7 @@ public class AsyncTestBase { } protected void assertNotSame(Object unexpected, Object actual) { + checkThread(); try { Assert.assertNotSame(unexpected, actual); } catch (AssertionError e) { @@ -393,6 +451,7 @@ public class AsyncTestBase { } protected void assertArrayEquals(byte[] expecteds, byte[] actuals) { + checkThread(); try { Assert.assertArrayEquals(expecteds, actuals); } catch (AssertionError e) { @@ -401,6 +460,7 @@ public class AsyncTestBase { } protected void assertArrayEquals(char[] expecteds, char[] actuals) { + checkThread(); try { Assert.assertArrayEquals(expecteds, actuals); } catch (AssertionError e) { @@ -409,6 +469,7 @@ public class AsyncTestBase { } protected void assertArrayEquals(double[] expecteds, double[] actuals, double delta) { + checkThread(); try { Assert.assertArrayEquals(expecteds, actuals, delta); } catch (AssertionError e) { @@ -417,6 +478,7 @@ public class AsyncTestBase { } protected void assertArrayEquals(int[] expecteds, int[] actuals) { + checkThread(); try { Assert.assertArrayEquals(expecteds, actuals); } catch (AssertionError e) { @@ -426,6 +488,7 @@ public class AsyncTestBase { @Deprecated protected void assertEquals(double expected, double actual) { + checkThread(); try { Assert.assertEquals(expected, actual); } catch (AssertionError e) { @@ -434,6 +497,7 @@ public class AsyncTestBase { } protected void assertArrayEquals(String message, int[] expecteds, int[] actuals) throws ArrayComparisonFailure { + checkThread(); try { Assert.assertArrayEquals(message, expecteds, actuals); } catch (AssertionError e) { diff --git a/vertx-core/src/test/java/org/vertx/java/tests/core/AsyncTestBaseTest.java b/vertx-core/src/test/java/org/vertx/java/tests/core/AsyncTestBaseTest.java index 262dbb51e..8fc8d305e 100644 --- a/vertx-core/src/test/java/org/vertx/java/tests/core/AsyncTestBaseTest.java +++ b/vertx-core/src/test/java/org/vertx/java/tests/core/AsyncTestBaseTest.java @@ -35,6 +35,7 @@ public class AsyncTestBaseTest extends AsyncTestBase { @Before public void before() { + disableThreadChecks(); executor = Executors.newFixedThreadPool(10); } diff --git a/vertx-core/src/test/java/org/vertx/java/tests/core/NetTest.java b/vertx-core/src/test/java/org/vertx/java/tests/core/NetTest.java index 12f9be6f8..bff5b17eb 100644 --- a/vertx-core/src/test/java/org/vertx/java/tests/core/NetTest.java +++ b/vertx-core/src/test/java/org/vertx/java/tests/core/NetTest.java @@ -24,11 +24,15 @@ import org.vertx.java.core.AsyncResultHandler; import org.vertx.java.core.Handler; import org.vertx.java.core.buffer.Buffer; import org.vertx.java.core.eventbus.Message; +import org.vertx.java.core.http.RequestOptions; import org.vertx.java.core.impl.ConcurrentHashSet; import org.vertx.java.core.net.*; import org.vertx.java.core.net.impl.SocketDefaults; +import java.io.BufferedWriter; import java.io.File; +import java.io.FileOutputStream; +import java.io.OutputStreamWriter; import java.nio.file.Files; import java.util.ArrayList; import java.util.List; @@ -992,7 +996,7 @@ public class NetTest extends VertxTestBase { @Test public void testWriteSameBufferMoreThanOnce() throws Exception { server.connectHandler(socket -> { - final Buffer received = new Buffer(); + Buffer received = new Buffer(); socket.dataHandler(buff -> { received.appendBuffer(buff); if (received.toString().equals("foofoo")) { @@ -1011,9 +1015,65 @@ public class NetTest extends VertxTestBase { await(); } + @Test + public void sendFileClientToServer() throws Exception { + File fDir = Files.createTempDirectory("vertx-test").toFile(); + String content = randomUnicodeString(10000); + File file = setupFile(fDir.toString(), "some-file.txt", content); + Buffer expected = new Buffer(content); + Buffer received = new Buffer(); + server.connectHandler(sock -> { + sock.dataHandler(buff -> { + received.appendBuffer(buff); + if (received.length() == expected.length()) { + assertTrue(buffersEqual(expected, received)); + testComplete(); + } + }); + }); + server.listen(ar -> { + assertTrue(ar.succeeded()); + client.connect(1234, ar2 -> { + assertTrue(ar2.succeeded()); + NetSocket sock = ar2.result(); + sock.sendFile(file.getAbsolutePath()); + }); + }); + + await(); + } + + @Test + public void sendFileServerToClient() throws Exception { + File fDir = Files.createTempDirectory("vertx-test").toFile(); + String content = randomUnicodeString(10000); + File file = setupFile(fDir.toString(), "some-file.txt", content); + Buffer expected = new Buffer(content); + Buffer received = new Buffer(); + server.connectHandler(sock -> { + sock.sendFile(file.getAbsolutePath()); + }); + server.listen(ar -> { + assertTrue(ar.succeeded()); + client.connect(1234, ar2 -> { + assertTrue(ar2.succeeded()); + NetSocket sock = ar2.result(); + sock.dataHandler(buff -> { + received.appendBuffer(buff); + if (received.length() == expected.length()) { + assertTrue(buffersEqual(expected, received)); + testComplete(); + } + }); + }); + }); + + await(); + } + @Test public void testSendFileDirectory() throws Exception { - final File fDir = Files.createTempDirectory("vertx-test").toFile(); + File fDir = Files.createTempDirectory("vertx-test").toFile(); fDir.deleteOnExit(); server.connectHandler(socket -> { SocketAddress addr = socket.remoteAddress(); @@ -1141,7 +1201,8 @@ public class NetTest extends VertxTestBase { server.listen(ar -> { assertTrue(ar.succeeded()); try { - server.listen(sock -> {}); + server.listen(sock -> { + }); fail("Should throw exception"); } catch (IllegalStateException e) { // OK @@ -1151,4 +1212,105 @@ public class NetTest extends VertxTestBase { await(); } + private void checkContext() { + + } + +// /* +// Make sure all the different handlers are called on the right context +// */ +// @Test +// public void testContext() throws Exception { +// File fDir = Files.createTempDirectory("vertx-test").toFile(); +// String content = randomUnicodeString(10000); +// File file = setupFile(fDir.toString(), "some-file.txt", content); +// /* +// create conn to server +// +// */ +// +// server.connectHandler(sock -> { +// +// checkContext(); +// +// sock.ssl(v -> { +// checkContext(); +// +// }); +// +// sock.sendFile(file.getAbsolutePath(), ar -> { +// checkContext(); +// assertTrue(ar.succeeded()); +// }); +// +// +// sock.dataHandler(buff -> { +// sock.write(buff); +// +// }); +// +// sock.endHandler(v -> { +// +// }); +// sock.closeHandler(v -> { +// +// }); +// +// +// sock.drainHandler(v -> { +// +// }); +// sock.exceptionHandler(t -> { +// +// }); +// +// }); +// +// server.listen(ar -> { +// +// }); +// +// client.connect(1234, ar -> { +// assertTrue(ar.succeeded()); +// NetSocket sock = ar.result(); +// sock.dataHandler(buff -> { +// +// +// }); +// sock.endHandler(v -> { +// +// }); +// sock.closeHandler(v -> { +// +// }); +// sock.sendFile(file.getAbsolutePath(), ar -> { +// +// }); +// sock.ssl(v -> { +// +// }); +// sock.drainHandler(v -> { +// +// }); +// sock.exceptionHandler(t -> { +// +// }); +// }); +// +// +// await(); +// } + + private File setupFile(String testDir, String fileName, String content) throws Exception { + File file = new File(testDir, fileName); + if (file.exists()) { + file.delete(); + } + file.deleteOnExit(); + BufferedWriter out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file), "UTF-8")); + out.write(content); + out.close(); + return file; + } + }