Remove the atomic boolean usage in Vertx internal timer that was used in a racy manner, instead use the existing timeouts map and rely on the timeouts map removal to grant ownership of the timer termination (cancellation or timeout). - fixes #2912

This commit is contained in:
Julien Viet
2019-04-17 23:21:06 +02:00
parent edf26e44db
commit ef2b2b2dad
5 changed files with 141 additions and 68 deletions

View File

@@ -244,6 +244,6 @@ public interface Context {
void addCloseHook(Closeable hook);
@GenIgnore(GenIgnore.PERMITTED_TYPE)
void removeCloseHook(Closeable hook);
boolean removeCloseHook(Closeable hook);
}

View File

@@ -52,10 +52,13 @@ class CloseHooks {
* Remove an existing hook.
*
* @param hook the hook to remove
* @return {@code} true if the hook was removed
*/
synchronized void remove(Closeable hook) {
synchronized boolean remove(Closeable hook) {
if (closeHooks != null) {
closeHooks.remove(hook);
return closeHooks.remove(hook);
} else {
return false;
}
}

View File

@@ -122,8 +122,8 @@ abstract class ContextImpl implements ContextInternal {
closeHooks.add(hook);
}
public void removeCloseHook(Closeable hook) {
closeHooks.remove(hook);
public boolean removeCloseHook(Closeable hook) {
return closeHooks.remove(hook);
}
public void runCloseHooks(Handler<AsyncResult<Void>> completionHandler) {

View File

@@ -66,7 +66,6 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
@@ -385,8 +384,8 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
public boolean cancelTimer(long id) {
InternalTimerHandler handler = timeouts.remove(id);
if (handler != null) {
handler.context.removeCloseHook(handler);
return handler.cancel();
handler.cancel();
return true;
} else {
return false;
}
@@ -863,75 +862,79 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
return haManager;
}
private class InternalTimerHandler implements Handler<Void>, Closeable {
final Handler<Long> handler;
final boolean periodic;
final long timerID;
final ContextImpl context;
final java.util.concurrent.Future<?> future;
final AtomicBoolean cancelled;
/**
* Timers are stored in the {@link #timeouts} map at creation time.
* <p/>
* Timers are removed from the {@link #timeouts} map when they are cancelled or are fired. The thread
* removing the timer successfully owns the timer termination (i.e cancel or timer) to avoid race conditions
* between timeout and cancellation.
* <p/>
* This class does not rely on the internal {@link #future} for the termination to handle the worker case
* since the actual timer {@link #handler} execution is scheduled when the {@link #future} executes.
*/
private class InternalTimerHandler implements Handler<Void>, Closeable, Runnable {
boolean cancel() {
if (cancelled.compareAndSet(false, true)) {
if (metrics != null) {
metrics.timerEnded(timerID, true);
}
future.cancel(false);
return true;
} else {
return false;
}
}
private final Handler<Long> handler;
private final boolean periodic;
private final long timerID;
private final ContextImpl context;
private final java.util.concurrent.Future<?> future;
InternalTimerHandler(long timerID, Handler<Long> runnable, boolean periodic, long delay, ContextImpl context) {
this.context = context;
this.timerID = timerID;
this.handler = runnable;
this.periodic = periodic;
this.cancelled = new AtomicBoolean();
EventLoop el = context.nettyEventLoop();
Runnable toRun = () -> context.runOnContext(this);
if (periodic) {
future = el.scheduleAtFixedRate(toRun, delay, delay, TimeUnit.MILLISECONDS);
future = el.scheduleAtFixedRate(this, delay, delay, TimeUnit.MILLISECONDS);
} else {
future = el.schedule(toRun, delay, TimeUnit.MILLISECONDS);
future = el.schedule(this, delay, TimeUnit.MILLISECONDS);
}
if (metrics != null) {
metrics.timerCreated(timerID);
}
}
@Override
public void run() {
context.executeFromIO(this);
}
public void handle(Void v) {
if (!cancelled.get()) {
if (periodic) {
if (timeouts.containsKey(timerID)) {
handler.handle(timerID);
}
} else if (timeouts.remove(timerID) != null) {
try {
handler.handle(timerID);
} finally {
if (!periodic) {
// Clean up after it's fired
cleanupNonPeriodic();
// Clean up after it's fired
if (context.removeCloseHook(this) && metrics != null) {
metrics.timerEnded(timerID, false);
}
}
}
}
private void cleanupNonPeriodic() {
VertxImpl.this.timeouts.remove(timerID);
if (metrics != null) {
metrics.timerEnded(timerID, false);
}
ContextImpl context = getContext();
if (context != null) {
context.removeCloseHook(this);
private void cancel() {
future.cancel(false);
if (context.removeCloseHook(this) && metrics != null) {
metrics.timerEnded(timerID, true);
}
}
// Called via Context close hook when Verticle is undeployed
public void close(Handler<AsyncResult<Void>> completionHandler) {
VertxImpl.this.timeouts.remove(timerID);
cancel();
if (timeouts.remove(timerID) != null) {
future.cancel(false);
if (metrics != null) {
metrics.timerEnded(timerID, true);
}
}
completionHandler.handle(Future.succeededFuture());
}
}
/*

View File

@@ -11,11 +11,6 @@
package io.vertx.core;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Handler;
import io.vertx.core.TimeoutStream;
import io.vertx.core.Vertx;
import io.vertx.core.streams.ReadStream;
import io.vertx.test.core.VertxTestBase;
import org.junit.Test;
@@ -23,6 +18,7 @@ import org.junit.Test;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
/**
* @author <a href="http://tfox.org">Tim Fox</a>
@@ -30,20 +26,20 @@ import java.util.concurrent.atomic.AtomicLong;
public class TimerTest extends VertxTestBase {
@Test
public void testTimer() throws Exception {
public void testTimer() {
timer(1);
}
@Test
public void testPeriodic() throws Exception {
public void testPeriodic() {
periodic(10);
}
@Test
/**
* Test the timers fire with approximately the correct delay
*/
public void testTimings() throws Exception {
@Test
public void testTimings() {
final long start = System.currentTimeMillis();
final long delay = 2000;
vertx.setTimer(delay, timerID -> {
@@ -58,7 +54,7 @@ public class TimerTest extends VertxTestBase {
}
@Test
public void testInVerticle() throws Exception {
public void testInVerticle() {
class MyVerticle extends AbstractVerticle {
AtomicInteger cnt = new AtomicInteger();
@Override
@@ -83,7 +79,7 @@ public class TimerTest extends VertxTestBase {
await();
}
private void periodic(long delay) throws Exception {
private void periodic(long delay) {
final int numFires = 10;
final AtomicLong id = new AtomicLong(-1);
id.set(vertx.setPeriodic(delay, new Handler<Long>() {
@@ -104,7 +100,7 @@ public class TimerTest extends VertxTestBase {
await();
}
private void timer(long delay) throws Exception {
private void timer(long delay) {
final AtomicLong id = new AtomicLong(-1);
id.set(vertx.setTimer(delay, new Handler<Long>() {
int count;
@@ -128,7 +124,7 @@ public class TimerTest extends VertxTestBase {
}
@Test
public void testTimerStreamSetHandlerSchedulesTheTimer() throws Exception {
public void testTimerStreamSetHandlerSchedulesTheTimer() {
vertx.runOnContext(v -> {
ReadStream<Long> timer = vertx.timerStream(200);
AtomicBoolean handled = new AtomicBoolean();
@@ -145,7 +141,7 @@ public class TimerTest extends VertxTestBase {
}
@Test
public void testTimerStreamExceptionDuringHandle() throws Exception {
public void testTimerStreamExceptionDuringHandle() {
vertx.runOnContext(v -> {
ReadStream<Long> timer = vertx.timerStream(200);
AtomicBoolean handled = new AtomicBoolean();
@@ -163,7 +159,7 @@ public class TimerTest extends VertxTestBase {
}
@Test
public void testTimerStreamCallingWithNullHandlerCancelsTheTimer() throws Exception {
public void testTimerStreamCallingWithNullHandlerCancelsTheTimer() {
vertx.runOnContext(v -> {
ReadStream<Long> timer = vertx.timerStream(200);
AtomicInteger count = new AtomicInteger();
@@ -183,7 +179,7 @@ public class TimerTest extends VertxTestBase {
}
@Test
public void testTimerStreamCancellation() throws Exception {
public void testTimerStreamCancellation() {
vertx.runOnContext(v -> {
TimeoutStream timer = vertx.timerStream(200);
AtomicBoolean called = new AtomicBoolean();
@@ -200,7 +196,7 @@ public class TimerTest extends VertxTestBase {
}
@Test
public void testTimerSetHandlerTwice() throws Exception {
public void testTimerSetHandlerTwice() {
vertx.runOnContext(v -> {
ReadStream<Long> timer = vertx.timerStream(200);
timer.handler(l -> testComplete());
@@ -214,7 +210,7 @@ public class TimerTest extends VertxTestBase {
}
@Test
public void testTimerPauseResume() throws Exception {
public void testTimerPauseResume() {
ReadStream<Long> timer = vertx.timerStream(10);
timer.handler(l -> testComplete());
timer.pause();
@@ -223,7 +219,7 @@ public class TimerTest extends VertxTestBase {
}
@Test
public void testTimerPause() throws Exception {
public void testTimerPause() {
vertx.runOnContext(v -> {
ReadStream<Long> timer = vertx.timerStream(10);
timer.handler(l -> fail());
@@ -234,7 +230,7 @@ public class TimerTest extends VertxTestBase {
}
@Test
public void testPeriodicStreamHandler() throws Exception {
public void testPeriodicStreamHandler() {
TimeoutStream timer = vertx.periodicStream(10);
AtomicInteger count = new AtomicInteger();
timer.handler(l -> {
@@ -259,7 +255,7 @@ public class TimerTest extends VertxTestBase {
}
@Test
public void testPeriodicSetHandlerTwice() throws Exception {
public void testPeriodicSetHandlerTwice() {
vertx.runOnContext(v -> {
ReadStream<Long> timer = vertx.periodicStream(200);
timer.handler(l -> testComplete());
@@ -273,7 +269,7 @@ public class TimerTest extends VertxTestBase {
}
@Test
public void testPeriodicPauseResume() throws Exception {
public void testPeriodicPauseResume() {
ReadStream<Long> timer = vertx.periodicStream(200);
AtomicInteger count = new AtomicInteger();
timer.handler(id -> {
@@ -307,7 +303,7 @@ public class TimerTest extends VertxTestBase {
}
@Test
public void testCancelTimerWhenScheduledOnWorker() throws Exception {
public void testCancelTimerWhenScheduledOnWorker() {
vertx.deployVerticle(new AbstractVerticle() {
@Override
public void start() throws Exception {
@@ -321,4 +317,75 @@ public class TimerTest extends VertxTestBase {
}, new DeploymentOptions().setWorker(true));
await();
}
@Test
public void testWorkerTimer() {
vertx.deployVerticle(new AbstractVerticle() {
@Override
public void start() throws Exception {
vertx.setTimer(10, id -> {
assertTrue(Context.isOnWorkerThread());
testComplete();
});
}
}, new DeploymentOptions().setWorker(true));
await();
}
@Test
public void testFailInTimer() {
RuntimeException failure = new RuntimeException();
Context ctx = vertx.getOrCreateContext();
ctx.runOnContext(v -> {
ctx.exceptionHandler(err -> {
assertSame(err, failure);
testComplete();
});
vertx.setTimer(5, id -> {
throw failure;
});
});
await();
}
@Test
public void testCancellationRace() throws Exception {
for (int i = 0;i < 200;i++) {
AtomicBoolean fired = new AtomicBoolean();
long timerId = vertx.setTimer(5, id -> {
fired.set(true);
});
Thread.sleep(5);
boolean res = vertx.cancelTimer(timerId);
if (res && fired.get()) {
throw new AssertionError("It failed " + i);
}
}
}
@Test
public void testUndeployCancelTimer() {
testUndeployCancellation(() -> vertx.setTimer(1000, id -> {}));
}
@Test
public void testUndeployCancelPeriodic() {
testUndeployCancellation(() -> vertx.setPeriodic(1000, id -> {}));
}
private void testUndeployCancellation(Supplier<Long> f) {
AtomicLong timer = new AtomicLong();
vertx.deployVerticle(new AbstractVerticle() {
@Override
public void start() {
timer.set(f.get());
}
}, onSuccess(deployment -> {
vertx.undeploy(deployment, v -> {
assertFalse(vertx.cancelTimer(timer.get()));
testComplete();
});
}));
await();
}
}