added some thread checking logic in tests and fix(ing) issue with wrong netty thread being used

This commit is contained in:
purplefox
2014-06-19 18:07:16 +01:00
parent f6aa1e5a49
commit 17d05d0fb8
4 changed files with 242 additions and 4 deletions

View File

@@ -130,10 +130,21 @@ public abstract class ContextImpl implements Context {
public abstract boolean isOnCorrectWorker(EventLoop worker);
// FIXME - make sure this is right and get rid of worker param
public void execute(EventLoop worker, Runnable handler) {
if (isOnCorrectWorker(worker)) {
boolean correctThread;
Thread thread = Thread.currentThread();
if (thread instanceof VertxThread) {
VertxThread vthread = (VertxThread)thread;
Context ctx = vthread.getContext();
correctThread = ctx == this;
} else {
correctThread = false;
}
if (correctThread) {
wrapTask(handler).run();
} else {
//System.out.println("Wrong thread, will execute on correct one: " + Thread.currentThread());
execute(handler);
}
}

View File

@@ -22,6 +22,8 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.internal.ArrayComparisonFailure;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -35,8 +37,11 @@ public class AsyncTestBase {
private volatile boolean testCompleteCalled;
private volatile boolean awaitCalled;
private volatile boolean timedOut;
private boolean threadChecksEnabled = true;
private Map<String, Exception> threadNames = new ConcurrentHashMap<>();
protected void testComplete() {
checkThread();
if (testCompleteCalled) {
throw new IllegalStateException("testComplete() already called");
}
@@ -83,9 +88,22 @@ public class AsyncTestBase {
}
}
protected void disableThreadChecks() {
threadChecksEnabled = false;
}
@After
public void afterAsyncTestBase() {
checkTestCompleteCalled();
if (threadChecksEnabled) {
for (Map.Entry<String, Exception> entry: threadNames.entrySet()) {
if (!entry.getKey().equals("main") && !entry.getKey().startsWith("vert.x-")) {
IllegalStateException is = new IllegalStateException("Non Vert.x thread! :" + entry.getKey());
is.setStackTrace(entry.getValue().getStackTrace());
throw is;
}
}
}
}
private void handleThrowable(Throwable t) {
@@ -100,7 +118,12 @@ public class AsyncTestBase {
throwable = null;
}
protected void checkThread() {
threadNames.put(Thread.currentThread().getName(), new Exception());
}
protected void assertTrue(String message, boolean condition) {
checkThread();
try {
Assert.assertTrue(message, condition);
} catch (AssertionError e) {
@@ -109,6 +132,7 @@ public class AsyncTestBase {
}
protected void assertFalse(boolean condition) {
checkThread();
try {
Assert.assertFalse(condition);
} catch (AssertionError e) {
@@ -117,6 +141,7 @@ public class AsyncTestBase {
}
protected void assertArrayEquals(String message, char[] expecteds, char[] actuals) throws ArrayComparisonFailure {
checkThread();
try {
Assert.assertArrayEquals(message, expecteds, actuals);
} catch (AssertionError e) {
@@ -125,6 +150,7 @@ public class AsyncTestBase {
}
protected void assertSame(String message, Object expected, Object actual) {
checkThread();
try {
Assert.assertSame(message, expected, actual);
} catch (AssertionError e) {
@@ -133,6 +159,7 @@ public class AsyncTestBase {
}
protected void assertEquals(long expected, long actual) {
checkThread();
try {
Assert.assertEquals(expected, actual);
} catch (AssertionError e) {
@@ -141,6 +168,7 @@ public class AsyncTestBase {
}
protected void assertNull(Object object) {
checkThread();
try {
Assert.assertNull(object);
} catch (AssertionError e) {
@@ -149,6 +177,7 @@ public class AsyncTestBase {
}
protected void assertFalse(String message, boolean condition) {
checkThread();
try {
Assert.assertFalse(message, condition);
} catch (AssertionError e) {
@@ -157,6 +186,7 @@ public class AsyncTestBase {
}
protected void fail(String message) {
checkThread();
try {
Assert.fail(message);
} catch (AssertionError e) {
@@ -165,6 +195,7 @@ public class AsyncTestBase {
}
protected void assertNull(String message, Object object) {
checkThread();
try {
Assert.assertNull(message, object);
} catch (AssertionError e) {
@@ -173,6 +204,7 @@ public class AsyncTestBase {
}
protected void assertArrayEquals(String message, float[] expecteds, float[] actuals, float delta) throws ArrayComparisonFailure {
checkThread();
try {
Assert.assertArrayEquals(message, expecteds, actuals, delta);
} catch (AssertionError e) {
@@ -182,6 +214,7 @@ public class AsyncTestBase {
@Deprecated
protected void assertEquals(String message, double expected, double actual) {
checkThread();
try {
Assert.assertEquals(message, expected, actual);
} catch (AssertionError e) {
@@ -191,6 +224,7 @@ public class AsyncTestBase {
protected void assertArrayEquals(String message, double[] expecteds, double[] actuals, double delta) throws ArrayComparisonFailure {
checkThread();
try {
Assert.assertArrayEquals(message, expecteds, actuals, delta);
} catch (AssertionError e) {
@@ -199,6 +233,7 @@ public class AsyncTestBase {
}
protected void assertArrayEquals(String message, Object[] expecteds, Object[] actuals) throws ArrayComparisonFailure {
checkThread();
try {
Assert.assertArrayEquals(message, expecteds, actuals);
} catch (AssertionError e) {
@@ -207,6 +242,7 @@ public class AsyncTestBase {
}
protected void assertArrayEquals(String message, short[] expecteds, short[] actuals) throws ArrayComparisonFailure {
checkThread();
try {
Assert.assertArrayEquals(message, expecteds, actuals);
} catch (AssertionError e) {
@@ -215,6 +251,7 @@ public class AsyncTestBase {
}
protected void assertArrayEquals(short[] expecteds, short[] actuals) {
checkThread();
try {
Assert.assertArrayEquals(expecteds, actuals);
} catch (AssertionError e) {
@@ -223,6 +260,7 @@ public class AsyncTestBase {
}
protected void assertArrayEquals(long[] expecteds, long[] actuals) {
checkThread();
try {
Assert.assertArrayEquals(expecteds, actuals);
} catch (AssertionError e) {
@@ -231,6 +269,7 @@ public class AsyncTestBase {
}
protected void assertNotNull(Object object) {
checkThread();
try {
Assert.assertNotNull(object);
} catch (AssertionError e) {
@@ -239,6 +278,7 @@ public class AsyncTestBase {
}
protected void assertEquals(Object expected, Object actual) {
checkThread();
try {
Assert.assertEquals(expected, actual);
} catch (AssertionError e) {
@@ -247,6 +287,7 @@ public class AsyncTestBase {
}
protected void assertEquals(String message, Object expected, Object actual) {
checkThread();
try {
Assert.assertEquals(message, expected, actual);
} catch (AssertionError e) {
@@ -255,6 +296,7 @@ public class AsyncTestBase {
}
protected void assertTrue(boolean condition) {
checkThread();
try {
Assert.assertTrue(condition);
} catch (AssertionError e) {
@@ -263,6 +305,7 @@ public class AsyncTestBase {
}
protected void assertArrayEquals(Object[] expecteds, Object[] actuals) {
checkThread();
try {
Assert.assertArrayEquals(expecteds, actuals);
} catch (AssertionError e) {
@@ -271,6 +314,7 @@ public class AsyncTestBase {
}
protected void assertNotNull(String message, Object object) {
checkThread();
try {
Assert.assertNotNull(message, object);
} catch (AssertionError e) {
@@ -279,6 +323,7 @@ public class AsyncTestBase {
}
protected void assertEquals(String message, double expected, double actual, double delta) {
checkThread();
try {
Assert.assertEquals(message, expected, actual, delta);
} catch (AssertionError e) {
@@ -287,6 +332,7 @@ public class AsyncTestBase {
}
protected void fail() {
checkThread();
try {
Assert.fail();
} catch (AssertionError e) {
@@ -295,6 +341,7 @@ public class AsyncTestBase {
}
protected void assertSame(Object expected, Object actual) {
checkThread();
try {
Assert.assertSame(expected, actual);
} catch (AssertionError e) {
@@ -303,6 +350,7 @@ public class AsyncTestBase {
}
protected void assertEquals(String message, long expected, long actual) {
checkThread();
try {
Assert.assertEquals(message, expected, actual);
} catch (AssertionError e) {
@@ -311,6 +359,7 @@ public class AsyncTestBase {
}
protected void assertArrayEquals(String message, byte[] expecteds, byte[] actuals) throws ArrayComparisonFailure {
checkThread();
try {
Assert.assertArrayEquals(message, expecteds, actuals);
} catch (AssertionError e) {
@@ -319,6 +368,7 @@ public class AsyncTestBase {
}
protected void assertArrayEquals(String message, long[] expecteds, long[] actuals) throws ArrayComparisonFailure {
checkThread();
try {
Assert.assertArrayEquals(message, expecteds, actuals);
} catch (AssertionError e) {
@@ -327,6 +377,7 @@ public class AsyncTestBase {
}
protected void assertEquals(double expected, double actual, double delta) {
checkThread();
try {
Assert.assertEquals(expected, actual, delta);
} catch (AssertionError e) {
@@ -335,6 +386,7 @@ public class AsyncTestBase {
}
protected <T> void assertThat(T actual, Matcher<T> matcher) {
checkThread();
try {
Assert.assertThat(actual, matcher);
} catch (AssertionError e) {
@@ -344,6 +396,7 @@ public class AsyncTestBase {
@Deprecated
protected void assertEquals(String message, Object[] expecteds, Object[] actuals) {
checkThread();
try {
Assert.assertEquals(message, expecteds, actuals);
} catch (AssertionError e) {
@@ -353,6 +406,7 @@ public class AsyncTestBase {
@Deprecated
protected void assertEquals(Object[] expecteds, Object[] actuals) {
checkThread();
try {
Assert.assertEquals(expecteds, actuals);
} catch (AssertionError e) {
@@ -361,6 +415,7 @@ public class AsyncTestBase {
}
protected void assertNotSame(String message, Object unexpected, Object actual) {
checkThread();
try {
Assert.assertNotSame(message, unexpected, actual);
} catch (AssertionError e) {
@@ -369,6 +424,7 @@ public class AsyncTestBase {
}
protected <T> void assertThat(String reason, T actual, Matcher<T> matcher) {
checkThread();
try {
Assert.assertThat(reason, actual, matcher);
} catch (AssertionError e) {
@@ -377,6 +433,7 @@ public class AsyncTestBase {
}
protected void assertArrayEquals(float[] expecteds, float[] actuals, float delta) {
checkThread();
try {
Assert.assertArrayEquals(expecteds, actuals, delta);
} catch (AssertionError e) {
@@ -385,6 +442,7 @@ public class AsyncTestBase {
}
protected void assertNotSame(Object unexpected, Object actual) {
checkThread();
try {
Assert.assertNotSame(unexpected, actual);
} catch (AssertionError e) {
@@ -393,6 +451,7 @@ public class AsyncTestBase {
}
protected void assertArrayEquals(byte[] expecteds, byte[] actuals) {
checkThread();
try {
Assert.assertArrayEquals(expecteds, actuals);
} catch (AssertionError e) {
@@ -401,6 +460,7 @@ public class AsyncTestBase {
}
protected void assertArrayEquals(char[] expecteds, char[] actuals) {
checkThread();
try {
Assert.assertArrayEquals(expecteds, actuals);
} catch (AssertionError e) {
@@ -409,6 +469,7 @@ public class AsyncTestBase {
}
protected void assertArrayEquals(double[] expecteds, double[] actuals, double delta) {
checkThread();
try {
Assert.assertArrayEquals(expecteds, actuals, delta);
} catch (AssertionError e) {
@@ -417,6 +478,7 @@ public class AsyncTestBase {
}
protected void assertArrayEquals(int[] expecteds, int[] actuals) {
checkThread();
try {
Assert.assertArrayEquals(expecteds, actuals);
} catch (AssertionError e) {
@@ -426,6 +488,7 @@ public class AsyncTestBase {
@Deprecated
protected void assertEquals(double expected, double actual) {
checkThread();
try {
Assert.assertEquals(expected, actual);
} catch (AssertionError e) {
@@ -434,6 +497,7 @@ public class AsyncTestBase {
}
protected void assertArrayEquals(String message, int[] expecteds, int[] actuals) throws ArrayComparisonFailure {
checkThread();
try {
Assert.assertArrayEquals(message, expecteds, actuals);
} catch (AssertionError e) {

View File

@@ -35,6 +35,7 @@ public class AsyncTestBaseTest extends AsyncTestBase {
@Before
public void before() {
disableThreadChecks();
executor = Executors.newFixedThreadPool(10);
}

View File

@@ -24,11 +24,15 @@ import org.vertx.java.core.AsyncResultHandler;
import org.vertx.java.core.Handler;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.eventbus.Message;
import org.vertx.java.core.http.RequestOptions;
import org.vertx.java.core.impl.ConcurrentHashSet;
import org.vertx.java.core.net.*;
import org.vertx.java.core.net.impl.SocketDefaults;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
@@ -992,7 +996,7 @@ public class NetTest extends VertxTestBase {
@Test
public void testWriteSameBufferMoreThanOnce() throws Exception {
server.connectHandler(socket -> {
final Buffer received = new Buffer();
Buffer received = new Buffer();
socket.dataHandler(buff -> {
received.appendBuffer(buff);
if (received.toString().equals("foofoo")) {
@@ -1011,9 +1015,65 @@ public class NetTest extends VertxTestBase {
await();
}
@Test
public void sendFileClientToServer() throws Exception {
File fDir = Files.createTempDirectory("vertx-test").toFile();
String content = randomUnicodeString(10000);
File file = setupFile(fDir.toString(), "some-file.txt", content);
Buffer expected = new Buffer(content);
Buffer received = new Buffer();
server.connectHandler(sock -> {
sock.dataHandler(buff -> {
received.appendBuffer(buff);
if (received.length() == expected.length()) {
assertTrue(buffersEqual(expected, received));
testComplete();
}
});
});
server.listen(ar -> {
assertTrue(ar.succeeded());
client.connect(1234, ar2 -> {
assertTrue(ar2.succeeded());
NetSocket sock = ar2.result();
sock.sendFile(file.getAbsolutePath());
});
});
await();
}
@Test
public void sendFileServerToClient() throws Exception {
File fDir = Files.createTempDirectory("vertx-test").toFile();
String content = randomUnicodeString(10000);
File file = setupFile(fDir.toString(), "some-file.txt", content);
Buffer expected = new Buffer(content);
Buffer received = new Buffer();
server.connectHandler(sock -> {
sock.sendFile(file.getAbsolutePath());
});
server.listen(ar -> {
assertTrue(ar.succeeded());
client.connect(1234, ar2 -> {
assertTrue(ar2.succeeded());
NetSocket sock = ar2.result();
sock.dataHandler(buff -> {
received.appendBuffer(buff);
if (received.length() == expected.length()) {
assertTrue(buffersEqual(expected, received));
testComplete();
}
});
});
});
await();
}
@Test
public void testSendFileDirectory() throws Exception {
final File fDir = Files.createTempDirectory("vertx-test").toFile();
File fDir = Files.createTempDirectory("vertx-test").toFile();
fDir.deleteOnExit();
server.connectHandler(socket -> {
SocketAddress addr = socket.remoteAddress();
@@ -1141,7 +1201,8 @@ public class NetTest extends VertxTestBase {
server.listen(ar -> {
assertTrue(ar.succeeded());
try {
server.listen(sock -> {});
server.listen(sock -> {
});
fail("Should throw exception");
} catch (IllegalStateException e) {
// OK
@@ -1151,4 +1212,105 @@ public class NetTest extends VertxTestBase {
await();
}
private void checkContext() {
}
// /*
// Make sure all the different handlers are called on the right context
// */
// @Test
// public void testContext() throws Exception {
// File fDir = Files.createTempDirectory("vertx-test").toFile();
// String content = randomUnicodeString(10000);
// File file = setupFile(fDir.toString(), "some-file.txt", content);
// /*
// create conn to server
//
// */
//
// server.connectHandler(sock -> {
//
// checkContext();
//
// sock.ssl(v -> {
// checkContext();
//
// });
//
// sock.sendFile(file.getAbsolutePath(), ar -> {
// checkContext();
// assertTrue(ar.succeeded());
// });
//
//
// sock.dataHandler(buff -> {
// sock.write(buff);
//
// });
//
// sock.endHandler(v -> {
//
// });
// sock.closeHandler(v -> {
//
// });
//
//
// sock.drainHandler(v -> {
//
// });
// sock.exceptionHandler(t -> {
//
// });
//
// });
//
// server.listen(ar -> {
//
// });
//
// client.connect(1234, ar -> {
// assertTrue(ar.succeeded());
// NetSocket sock = ar.result();
// sock.dataHandler(buff -> {
//
//
// });
// sock.endHandler(v -> {
//
// });
// sock.closeHandler(v -> {
//
// });
// sock.sendFile(file.getAbsolutePath(), ar -> {
//
// });
// sock.ssl(v -> {
//
// });
// sock.drainHandler(v -> {
//
// });
// sock.exceptionHandler(t -> {
//
// });
// });
//
//
// await();
// }
private File setupFile(String testDir, String fileName, String content) throws Exception {
File file = new File(testDir, fileName);
if (file.exists()) {
file.delete();
}
file.deleteOnExit();
BufferedWriter out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file), "UTF-8"));
out.write(content);
out.close();
return file;
}
}