From c6f5874f980301ec6faeaf2203dd53246d4f4a8e Mon Sep 17 00:00:00 2001 From: Pid Date: Fri, 26 Oct 2012 12:49:28 +0100 Subject: [PATCH] checkpoint --- .../core/eventbus/impl/DefaultEventBus.java | 11 +-- .../core/http/impl/DefaultHttpServer.java | 16 ++-- .../vertx/java/core/impl/DefaultVertx.java | 10 +-- .../core/impl/VertxThreadPoolExecutor.java | 47 +++++++++- .../vertx/java/core/jmx/HttpServerMXBean.java | 4 +- .../vertx/java/core/jmx/HttpServerProxy.java | 14 ++- .../core/jmx/{VertxJMX.java => JmxUtil.java} | 90 +++++++++++-------- .../vertx/java/core/jmx/NetServerMXBean.java | 2 +- .../vertx/java/core/jmx/NetServerProxy.java | 4 +- .../java/core/jmx/SockJSServerMXBean.java | 7 ++ .../java/core/jmx/SockJSServerProxy.java | 22 +++++ .../java/core/net/impl/DefaultNetServer.java | 12 +-- .../java/deploy/impl/DeploymentMXBean.java | 13 +++ .../java/deploy/impl/DeploymentProxy.java | 31 +++++++ .../java/deploy/impl/VerticleManager.java | 10 +++ .../deploy/impl/VerticleManagerMXBean.java | 13 +++ .../deploy/impl/VerticleManagerProxy.java | 71 +++++++++++++++ 17 files changed, 309 insertions(+), 68 deletions(-) rename vertx-core/src/main/java/org/vertx/java/core/jmx/{VertxJMX.java => JmxUtil.java} (57%) create mode 100644 vertx-core/src/main/java/org/vertx/java/core/jmx/SockJSServerMXBean.java create mode 100644 vertx-core/src/main/java/org/vertx/java/core/jmx/SockJSServerProxy.java create mode 100644 vertx-platform/src/main/java/org/vertx/java/deploy/impl/DeploymentMXBean.java create mode 100644 vertx-platform/src/main/java/org/vertx/java/deploy/impl/DeploymentProxy.java create mode 100644 vertx-platform/src/main/java/org/vertx/java/deploy/impl/VerticleManagerMXBean.java create mode 100644 vertx-platform/src/main/java/org/vertx/java/deploy/impl/VerticleManagerProxy.java diff --git a/vertx-core/src/main/java/org/vertx/java/core/eventbus/impl/DefaultEventBus.java b/vertx-core/src/main/java/org/vertx/java/core/eventbus/impl/DefaultEventBus.java index 6d9fe158a..e4ee9c07f 100644 --- a/vertx-core/src/main/java/org/vertx/java/core/eventbus/impl/DefaultEventBus.java +++ b/vertx-core/src/main/java/org/vertx/java/core/eventbus/impl/DefaultEventBus.java @@ -27,7 +27,7 @@ import org.vertx.java.core.eventbus.impl.hazelcast.HazelcastClusterManager; import org.vertx.java.core.impl.Context; import org.vertx.java.core.impl.VertxInternal; import org.vertx.java.core.jmx.EventBusMXBean; -import org.vertx.java.core.jmx.VertxJMX; +import org.vertx.java.core.jmx.JmxUtil; import org.vertx.java.core.json.JsonArray; import org.vertx.java.core.json.JsonObject; import org.vertx.java.core.logging.Logger; @@ -76,7 +76,7 @@ public class DefaultEventBus implements EventBus, EventBusMXBean { this.serverID = new ServerID(DEFAULT_CLUSTER_PORT, "localhost"); this.server = null; this.subs = null; - VertxJMX.register(this); + JmxUtil.register(this); } public DefaultEventBus(VertxInternal vertx, String hostname) { @@ -89,7 +89,8 @@ public class DefaultEventBus implements EventBus, EventBusMXBean { ClusterManager mgr = new HazelcastClusterManager(vertx); subs = mgr.getSubsMap("subs"); this.server = setServer(); - VertxJMX.register(this, port, "localhost"); + JmxUtil.unregisterEventBus(); + JmxUtil.register(this, port, "localhost"); } @Override @@ -295,7 +296,7 @@ public class DefaultEventBus implements EventBus, EventBusMXBean { callCompletionHandler(completionHandler); } getHandlerCloseHook(context).entries.remove(new HandlerEntry(address, handler)); - VertxJMX.unregister(handler, address); + JmxUtil.unregister(handler, address); return; } } @@ -451,7 +452,7 @@ public class DefaultEventBus implements EventBus, EventBusMXBean { } } getHandlerCloseHook(context).entries.add(new HandlerEntry(address, handler)); - VertxJMX.register(handler, address); + JmxUtil.register(handler, address); } private HandlerCloseHook getHandlerCloseHook(Context context) { diff --git a/vertx-core/src/main/java/org/vertx/java/core/http/impl/DefaultHttpServer.java b/vertx-core/src/main/java/org/vertx/java/core/http/impl/DefaultHttpServer.java index 9366d486c..4f721323e 100644 --- a/vertx-core/src/main/java/org/vertx/java/core/http/impl/DefaultHttpServer.java +++ b/vertx-core/src/main/java/org/vertx/java/core/http/impl/DefaultHttpServer.java @@ -41,7 +41,7 @@ import org.vertx.java.core.http.impl.ws.hybi08.Handshake08; import org.vertx.java.core.http.impl.ws.hybi17.HandshakeRFC6455; import org.vertx.java.core.impl.Context; import org.vertx.java.core.impl.VertxInternal; -import org.vertx.java.core.jmx.VertxJMX; +import org.vertx.java.core.jmx.JmxUtil; import org.vertx.java.core.logging.Logger; import org.vertx.java.core.logging.impl.LoggerFactory; import org.vertx.java.core.net.impl.*; @@ -51,7 +51,7 @@ import java.net.*; import java.nio.charset.Charset; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; import static org.jboss.netty.handler.codec.http.HttpHeaders.Values.WEBSOCKET; @@ -203,12 +203,12 @@ public class DefaultHttpServer implements HttpServer { } } listening = true; - VertxJMX.register(this, id.host, port); + JmxUtil.register(this, id.host, port); return this; } public void close() { - // TODO VertxJMX.unregister(this, port); + // TODO JmxUtil.unregister(this, port); close(null); } @@ -249,6 +249,10 @@ public class DefaultHttpServer implements HttpServer { wsHandler = null; } + public long getRequestCount() { + return requestCount.get(); + } + public HttpServer setSSL(boolean ssl) { tcpHelper.setSSL(ssl); return this; @@ -379,7 +383,7 @@ public class DefaultHttpServer implements HttpServer { conn.internalClose(); } - VertxJMX.unregisterHttpServer(id.host, id.port); + JmxUtil.unregisterHttpServer(id.host, id.port); // We need to reset it since sock.internalClose() above can call into the close handlers of sockets on the same thread // which can cause context id for the thread to change! @@ -404,7 +408,7 @@ public class DefaultHttpServer implements HttpServer { }); } - private static final AtomicInteger count = new AtomicInteger(0); + private final AtomicLong requestCount = new AtomicLong(0); public class ServerHandler extends SimpleChannelUpstreamHandler { diff --git a/vertx-core/src/main/java/org/vertx/java/core/impl/DefaultVertx.java b/vertx-core/src/main/java/org/vertx/java/core/impl/DefaultVertx.java index b865c5ddd..1caab25a5 100644 --- a/vertx-core/src/main/java/org/vertx/java/core/impl/DefaultVertx.java +++ b/vertx-core/src/main/java/org/vertx/java/core/impl/DefaultVertx.java @@ -30,7 +30,7 @@ import org.vertx.java.core.http.HttpClient; import org.vertx.java.core.http.HttpServer; import org.vertx.java.core.http.impl.DefaultHttpClient; import org.vertx.java.core.http.impl.DefaultHttpServer; -import org.vertx.java.core.jmx.VertxJMX; +import org.vertx.java.core.jmx.JmxUtil; import org.vertx.java.core.jmx.VertxMXBean; import org.vertx.java.core.logging.Logger; import org.vertx.java.core.logging.impl.LoggerFactory; @@ -98,7 +98,7 @@ public class DefaultVertx extends VertxInternal implements VertxMXBean { */ private void configure() { this.backgroundPoolSize = Integer.getInteger("vertx.backgroundPoolSize", 20); - VertxJMX.register(this); + JmxUtil.register(this); } public NetServer createNetServer() { @@ -187,7 +187,7 @@ public class DefaultVertx extends VertxInternal implements VertxMXBean { result = backgroundPool; if (result == null) { backgroundPool = result = VertxExecutors.newThreadPool(backgroundPoolSize, "vert.x-worker-thread-", false); - VertxJMX.register(backgroundPool, "pool=Worker"); + JmxUtil.register(backgroundPool, "pool=Worker"); orderedFact = new OrderedExecutorFactory(backgroundPool); } } @@ -203,7 +203,7 @@ public class DefaultVertx extends VertxInternal implements VertxMXBean { result = workerPool; if (result == null) { ExecutorService corePool = VertxExecutors.newThreadPool(corePoolSize, "vert.x-core-thread-", false); - VertxJMX.register(corePool, "pool=Core"); + JmxUtil.register(corePool, "pool=Core"); workerPool = result = new NioWorkerPool(corePool, corePoolSize); } } @@ -221,7 +221,7 @@ public class DefaultVertx extends VertxInternal implements VertxMXBean { result = acceptorPool; if (result == null) { acceptorPool = result = VertxExecutors.newCachedThreadPool("vert.x-acceptor-thread-"); - VertxJMX.register(acceptorPool, "pool=Acceptor"); + JmxUtil.register(acceptorPool, "pool=Acceptor"); } } } diff --git a/vertx-core/src/main/java/org/vertx/java/core/impl/VertxThreadPoolExecutor.java b/vertx-core/src/main/java/org/vertx/java/core/impl/VertxThreadPoolExecutor.java index ad4bee317..c159bc98d 100644 --- a/vertx-core/src/main/java/org/vertx/java/core/impl/VertxThreadPoolExecutor.java +++ b/vertx-core/src/main/java/org/vertx/java/core/impl/VertxThreadPoolExecutor.java @@ -8,14 +8,53 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -public class VertxThreadPoolExecutor extends ThreadPoolExecutor implements ExecutorServiceMXBean { +import javax.management.ListenerNotFoundException; +import javax.management.MBeanNotificationInfo; +import javax.management.Notification; +import javax.management.NotificationBroadcaster; +import javax.management.NotificationBroadcasterSupport; +import javax.management.NotificationFilter; +import javax.management.NotificationListener; + +public class VertxThreadPoolExecutor extends ThreadPoolExecutor implements ExecutorServiceMXBean, NotificationBroadcaster { + + private final NotificationBroadcasterSupport support = new NotificationBroadcasterSupport(); + + private final AtomicLong sequence = new AtomicLong(0L); private AtomicLong waitingTaskCount = new AtomicLong(0L); + public static final String QUEUE_NOT_ZERO = "queue.not.zero"; + public VertxThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); } + @Override + public void addNotificationListener(NotificationListener listener, + NotificationFilter filter, Object handback) + throws IllegalArgumentException { + support.addNotificationListener(listener, filter, handback); + } + + @Override + public void removeNotificationListener(NotificationListener listener) + throws ListenerNotFoundException { + support.removeNotificationListener(listener); + } + + @Override + public MBeanNotificationInfo[] getNotificationInfo() { + return support.getNotificationInfo(); + } + + private void fireQueueNotification(long size) { + long timestamp = System.currentTimeMillis(); + String msg = String.format("Executor queue size is %d", size); + Notification notification = new Notification(QUEUE_NOT_ZERO, this, sequence.incrementAndGet(), timestamp, msg); + support.sendNotification(notification); + } + @Override public int getWorkQueueSize() { return super.getQueue().size(); @@ -42,6 +81,12 @@ public class VertxThreadPoolExecutor extends ThreadPoolExecutor implements Execu protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); waitingTaskCount.decrementAndGet(); + + // rough stab at an event if the queue is larger than 1 + long size = waitingTaskCount.get(); + if (size > 1) { + fireQueueNotification(size); + } } @Override diff --git a/vertx-core/src/main/java/org/vertx/java/core/jmx/HttpServerMXBean.java b/vertx-core/src/main/java/org/vertx/java/core/jmx/HttpServerMXBean.java index 2cd0ed5f7..88b4a5859 100644 --- a/vertx-core/src/main/java/org/vertx/java/core/jmx/HttpServerMXBean.java +++ b/vertx-core/src/main/java/org/vertx/java/core/jmx/HttpServerMXBean.java @@ -4,8 +4,10 @@ public interface HttpServerMXBean { String getHost(); + long getRequestCount(); + int getPort(); - void close(); + String getObjectName(); } diff --git a/vertx-core/src/main/java/org/vertx/java/core/jmx/HttpServerProxy.java b/vertx-core/src/main/java/org/vertx/java/core/jmx/HttpServerProxy.java index 2a8f683ff..991447a28 100644 --- a/vertx-core/src/main/java/org/vertx/java/core/jmx/HttpServerProxy.java +++ b/vertx-core/src/main/java/org/vertx/java/core/jmx/HttpServerProxy.java @@ -1,17 +1,18 @@ package org.vertx.java.core.jmx; import org.vertx.java.core.http.HttpServer; +import org.vertx.java.core.http.impl.DefaultHttpServer; public class HttpServerProxy implements HttpServerMXBean { - private HttpServer delegate; + private DefaultHttpServer delegate; private int port; private String host; - public HttpServerProxy(HttpServer delegate, String host, int port) { + public HttpServerProxy(DefaultHttpServer delegate, String host, int port) { this.delegate = delegate; this.host = host; this.port = port; @@ -28,8 +29,13 @@ public class HttpServerProxy implements HttpServerMXBean { } @Override - public void close() { - delegate.close(); + public long getRequestCount() { + return delegate.getRequestCount(); + } + + @Override + public String getObjectName() { + return String.format("org.vertx:type=HttpServer,name=%s[%s]", host, port); } } diff --git a/vertx-core/src/main/java/org/vertx/java/core/jmx/VertxJMX.java b/vertx-core/src/main/java/org/vertx/java/core/jmx/JmxUtil.java similarity index 57% rename from vertx-core/src/main/java/org/vertx/java/core/jmx/VertxJMX.java rename to vertx-core/src/main/java/org/vertx/java/core/jmx/JmxUtil.java index e5291aa61..a319d03b6 100644 --- a/vertx-core/src/main/java/org/vertx/java/core/jmx/VertxJMX.java +++ b/vertx-core/src/main/java/org/vertx/java/core/jmx/JmxUtil.java @@ -14,13 +14,15 @@ import javax.management.ObjectName; import org.vertx.java.core.Handler; import org.vertx.java.core.VertxRuntimeException; import org.vertx.java.core.eventbus.EventBus; -import org.vertx.java.core.http.HttpServer; +import org.vertx.java.core.http.impl.DefaultHttpServer; import org.vertx.java.core.impl.DefaultVertx; //import org.vertx.java.core.http.ServerWebSocket; -import org.vertx.java.core.net.NetServer; +import org.vertx.java.core.net.impl.DefaultNetServer; import org.vertx.java.core.sockjs.SockJSServer; +import org.vertx.java.core.sockjs.impl.DefaultSockJSServer; -public class VertxJMX { + +public class JmxUtil { private static final String DOMAIN = "org.vertx"; @@ -40,8 +42,15 @@ public class VertxJMX { } } + public static void unregisterEventBus() { + ObjectName name = name("type=EventBus"); + if (mbeanServer.isRegistered(name)) { + unregisterMBean(name); + } + } + public static void register(EventBus eventBus, int port, String host) { - ObjectName name = name("type=EventBus,host=" + host + ",port=" + port); + ObjectName name = name(String.format("type=EventBus-%s[%s]", host, port)); if (!mbeanServer.isRegistered(name)) { registerMBean(eventBus, name); } @@ -64,50 +73,49 @@ public class VertxJMX { } - public static void register(NetServer server, String host, int port) { - ObjectName name = name("type=NetServer,host="+host+",port="+port); - if (!mbeanServer.isRegistered(name)) { - registerMBean(new NetServerProxy(server, host, port), name); - } + public static void register(DefaultNetServer server, String host, int port) { + NetServerProxy proxy = new NetServerProxy(server, host, port); + ObjectName name = wrap(proxy.getObjectName()); + registerMBean(proxy, name); } public static void unregisterNetServer(String host, int port) { - ObjectName name = name("type=NetServer,host="+host+",port="+port); - if (mbeanServer.isRegistered(name)) { - unregisterMBean(name); - } + ObjectName name = name(String.format("type=NetServer,name=%s[%s]", host, port)); + unregisterMBean(name); } - public static void register(HttpServer server, String host, int port) { - ObjectName name = name("type=HttpServer,host="+host+",port="+port); - if (!mbeanServer.isRegistered(name)) { - registerMBean(new HttpServerProxy(server, host, port), name); - } + public static void register(DefaultHttpServer server, String host, int port) { + HttpServerProxy proxy = new HttpServerProxy(server, host, port); + ObjectName name = wrap(proxy.getObjectName()); + registerMBean(proxy, name); } public static void unregisterHttpServer(String host, int port) { - ObjectName name = name("type=HttpServer,host="+host+",port="+port); - if (mbeanServer.isRegistered(name)) { - unregisterMBean(name); - } + ObjectName name = name(String.format("type=HttpServer,name=%s[%s]", host, port)); + unregisterMBean(name); } - public static void register(SockJSServer server, String host, int port) { -// Map keys = new HashMap<>(); -// keys.put("type", "Vertx"); -// keys.put("server", "SockJSServer"); -// keys.put("port", String.valueOf(port)); -// ObjectName name = name(keys); -// if (!mbeanServer.isRegistered(name)) { -// registerMBean(server, name); -// } + public static void register(DefaultSockJSServer server, String host, int port) { + SockJSServerProxy proxy = new SockJSServerProxy(server, host, port); + ObjectName name = wrap(proxy.getObjectName()); + registerMBean(proxy, name); } public static void unregister(SockJSServer server, String host, int port) { - + ObjectName name = name(String.format("type=SockJSServer,name='%s[%s]'", host, port)); + unregisterMBean(name); } - private static ObjectName name(String keys) { + public static ObjectName wrap(String keys) { + try { + return new ObjectName(keys); + + } catch (MalformedObjectNameException e) { + throw new VertxRuntimeException(e); + } + } + + public static ObjectName name(String keys) { try { return new ObjectName(DOMAIN + ":" + keys); @@ -116,9 +124,15 @@ public class VertxJMX { } } - private static void registerMBean(Object bean, ObjectName name) { + public static void registerMBean(Object bean, String name) { + registerMBean(bean, wrap(name)); + } + + public static void registerMBean(Object bean, ObjectName name) { try { - mbeanServer.registerMBean(bean, name); + if (!mbeanServer.isRegistered(name)) { + mbeanServer.registerMBean(bean, name); + } } catch (InstanceAlreadyExistsException e) { throw new VertxRuntimeException(e); } catch (MBeanRegistrationException e) { @@ -128,9 +142,11 @@ public class VertxJMX { } } - private static void unregisterMBean(ObjectName name) { + public static void unregisterMBean(ObjectName name) { try { - mbeanServer.unregisterMBean(name); + if (mbeanServer.isRegistered(name)) { + mbeanServer.unregisterMBean(name); + } } catch (InstanceNotFoundException e) { throw new VertxRuntimeException(e); } catch (MBeanRegistrationException e) { diff --git a/vertx-core/src/main/java/org/vertx/java/core/jmx/NetServerMXBean.java b/vertx-core/src/main/java/org/vertx/java/core/jmx/NetServerMXBean.java index bbbef87a3..0c1d35221 100644 --- a/vertx-core/src/main/java/org/vertx/java/core/jmx/NetServerMXBean.java +++ b/vertx-core/src/main/java/org/vertx/java/core/jmx/NetServerMXBean.java @@ -6,6 +6,6 @@ public interface NetServerMXBean { int getPort(); - void close(); + String getObjectName(); } diff --git a/vertx-core/src/main/java/org/vertx/java/core/jmx/NetServerProxy.java b/vertx-core/src/main/java/org/vertx/java/core/jmx/NetServerProxy.java index b7599ab79..17516cf04 100644 --- a/vertx-core/src/main/java/org/vertx/java/core/jmx/NetServerProxy.java +++ b/vertx-core/src/main/java/org/vertx/java/core/jmx/NetServerProxy.java @@ -27,8 +27,8 @@ public class NetServerProxy implements NetServerMXBean { } @Override - public void close() { - delegate.close(); + public String getObjectName() { + return String.format("org.vertx:type=NetServer,name=%s[%s]", host, port); } } diff --git a/vertx-core/src/main/java/org/vertx/java/core/jmx/SockJSServerMXBean.java b/vertx-core/src/main/java/org/vertx/java/core/jmx/SockJSServerMXBean.java new file mode 100644 index 000000000..5f8db6883 --- /dev/null +++ b/vertx-core/src/main/java/org/vertx/java/core/jmx/SockJSServerMXBean.java @@ -0,0 +1,7 @@ +package org.vertx.java.core.jmx; + +public interface SockJSServerMXBean { + + String getObjectName(); + +} diff --git a/vertx-core/src/main/java/org/vertx/java/core/jmx/SockJSServerProxy.java b/vertx-core/src/main/java/org/vertx/java/core/jmx/SockJSServerProxy.java new file mode 100644 index 000000000..7c52d225a --- /dev/null +++ b/vertx-core/src/main/java/org/vertx/java/core/jmx/SockJSServerProxy.java @@ -0,0 +1,22 @@ +package org.vertx.java.core.jmx; + +import org.vertx.java.core.sockjs.impl.DefaultSockJSServer; + +public class SockJSServerProxy implements SockJSServerMXBean { + + private DefaultSockJSServer server; + private String host; + private int port; + + public SockJSServerProxy(DefaultSockJSServer server, String host, int port) { + this.server = server; + this.host = host; + this.port = port; + } + + @Override + public String getObjectName() { + return String.format("org.vertx:type=SockJSServer,name=%s[%s]", host, port); + } + +} diff --git a/vertx-core/src/main/java/org/vertx/java/core/net/impl/DefaultNetServer.java b/vertx-core/src/main/java/org/vertx/java/core/net/impl/DefaultNetServer.java index 7a3df678a..c7043cadd 100644 --- a/vertx-core/src/main/java/org/vertx/java/core/net/impl/DefaultNetServer.java +++ b/vertx-core/src/main/java/org/vertx/java/core/net/impl/DefaultNetServer.java @@ -33,7 +33,7 @@ import org.vertx.java.core.Handler; import org.vertx.java.core.buffer.Buffer; import org.vertx.java.core.impl.Context; import org.vertx.java.core.impl.VertxInternal; -import org.vertx.java.core.jmx.VertxJMX; +import org.vertx.java.core.jmx.JmxUtil; import org.vertx.java.core.logging.Logger; import org.vertx.java.core.logging.impl.LoggerFactory; import org.vertx.java.core.net.NetServer; @@ -57,7 +57,7 @@ public class DefaultNetServer implements NetServer { private final VertxInternal vertx; private final Context ctx; private final TCPSSLHelper tcpHelper = new TCPSSLHelper(); - private final Map socketMap = new ConcurrentHashMap(); + private final Map socketMap = new ConcurrentHashMap<>(); private Handler connectHandler; private ChannelGroup serverChannelGroup; private boolean listening; @@ -160,7 +160,7 @@ public class DefaultNetServer implements NetServer { } actualServer.handlerManager.addHandler(connectHandler, ctx); } - VertxJMX.register(this, id.host, port); + JmxUtil.register(this, id.host, port); return this; } @@ -205,7 +205,7 @@ public class DefaultNetServer implements NetServer { sock.internalClose(); } - VertxJMX.unregisterNetServer(id.host, id.port); + JmxUtil.unregisterNetServer(id.host, id.port); // We need to reset it since sock.internalClose() above can call into the close handlers of sockets on the same thread // which can cause context id for the thread to change! @@ -372,7 +372,7 @@ public class DefaultNetServer implements NetServer { NioWorker worker = ch.getWorker(); //Choose a handler - final HandlerHolder handler = handlerManager.chooseHandler(worker); + final HandlerHolder handler = handlerManager.chooseHandler(worker); if (handler == null) { //Ignore @@ -399,7 +399,7 @@ public class DefaultNetServer implements NetServer { } } - private void connected(final NioSocketChannel ch, final HandlerHolder handler) { + private void connected(final NioSocketChannel ch, final HandlerHolder handler) { handler.context.execute(new Runnable() { public void run() { DefaultNetSocket sock = new DefaultNetSocket(vertx, ch, handler.context); diff --git a/vertx-platform/src/main/java/org/vertx/java/deploy/impl/DeploymentMXBean.java b/vertx-platform/src/main/java/org/vertx/java/deploy/impl/DeploymentMXBean.java new file mode 100644 index 000000000..1517c2561 --- /dev/null +++ b/vertx-platform/src/main/java/org/vertx/java/deploy/impl/DeploymentMXBean.java @@ -0,0 +1,13 @@ +package org.vertx.java.deploy.impl; + +public interface DeploymentMXBean { + + String getName(); + + int getInstances(); + + boolean getAutoRedeploy(); + + String getObjectName(); + +} diff --git a/vertx-platform/src/main/java/org/vertx/java/deploy/impl/DeploymentProxy.java b/vertx-platform/src/main/java/org/vertx/java/deploy/impl/DeploymentProxy.java new file mode 100644 index 000000000..a9b5eb381 --- /dev/null +++ b/vertx-platform/src/main/java/org/vertx/java/deploy/impl/DeploymentProxy.java @@ -0,0 +1,31 @@ +package org.vertx.java.deploy.impl; + +public class DeploymentProxy implements DeploymentMXBean { + + private Deployment deployment; + + public DeploymentProxy(Deployment deployment) { + this.deployment = deployment; + } + + @Override + public String getName() { + return deployment.name; + } + + @Override + public int getInstances() { + return deployment.instances; + } + + @Override + public boolean getAutoRedeploy() { + return deployment.autoRedeploy; + } + + @Override + public String getObjectName() { + return String.format("org.vertx:type=Deployment,name=%s[%s]", deployment.name, deployment.instances); + } + +} diff --git a/vertx-platform/src/main/java/org/vertx/java/deploy/impl/VerticleManager.java b/vertx-platform/src/main/java/org/vertx/java/deploy/impl/VerticleManager.java index 44647172c..93e866d93 100644 --- a/vertx-platform/src/main/java/org/vertx/java/deploy/impl/VerticleManager.java +++ b/vertx-platform/src/main/java/org/vertx/java/deploy/impl/VerticleManager.java @@ -28,6 +28,7 @@ import org.vertx.java.core.impl.BlockingAction; import org.vertx.java.core.impl.Context; import org.vertx.java.core.impl.VertxInternal; import org.vertx.java.core.impl.WorkerContext; +import org.vertx.java.core.jmx.JmxUtil; import org.vertx.java.core.json.DecodeException; import org.vertx.java.core.json.JsonObject; import org.vertx.java.core.logging.Logger; @@ -49,6 +50,8 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; +import javax.management.ObjectName; + /** * * This class could benefit from some refactoring @@ -115,6 +118,8 @@ public class VerticleManager implements ModuleReloader { } } } + VerticleManagerProxy proxy = new VerticleManagerProxy(this); + JmxUtil.registerMBean(proxy, proxy.getObjectName()); } public void block() { @@ -680,6 +685,9 @@ public class VerticleManager implements ModuleReloader { final ClassLoader sharedLoader = worker ? new ParentLastURLClassLoader(urls, getClass() .getClassLoader()): null; + ObjectName on = JmxUtil.name(String.format("type=Deployment,name=%s[%s]", deployment.name, deployment.instances)); + JmxUtil.registerMBean(new DeploymentProxy(deployment), on); + for (int i = 0; i < instances; i++) { // Launch the verticle instance @@ -832,6 +840,8 @@ public class VerticleManager implements ModuleReloader { } }); } + ObjectName on = JmxUtil.name(String.format("type=Deployment,name=%s[%s]", deployment.name, deployment.instances)); + JmxUtil.unregisterMBean(on); } if (deployment.parentDeploymentName != null) { diff --git a/vertx-platform/src/main/java/org/vertx/java/deploy/impl/VerticleManagerMXBean.java b/vertx-platform/src/main/java/org/vertx/java/deploy/impl/VerticleManagerMXBean.java new file mode 100644 index 000000000..7946495ec --- /dev/null +++ b/vertx-platform/src/main/java/org/vertx/java/deploy/impl/VerticleManagerMXBean.java @@ -0,0 +1,13 @@ +package org.vertx.java.deploy.impl; + +import java.util.Map; + +public interface VerticleManagerMXBean { + + String getObjectName(); + + Map getDeployments(); + + void undeploy(String name); + +} diff --git a/vertx-platform/src/main/java/org/vertx/java/deploy/impl/VerticleManagerProxy.java b/vertx-platform/src/main/java/org/vertx/java/deploy/impl/VerticleManagerProxy.java new file mode 100644 index 000000000..2c04dacf8 --- /dev/null +++ b/vertx-platform/src/main/java/org/vertx/java/deploy/impl/VerticleManagerProxy.java @@ -0,0 +1,71 @@ +package org.vertx.java.deploy.impl; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import javax.management.ListenerNotFoundException; +import javax.management.MBeanNotificationInfo; +import javax.management.Notification; +import javax.management.NotificationBroadcaster; +import javax.management.NotificationBroadcasterSupport; +import javax.management.NotificationFilter; +import javax.management.NotificationListener; + +import org.vertx.java.core.Handler; + + +public class VerticleManagerProxy implements VerticleManagerMXBean, NotificationBroadcaster { + + private static final String UNDEPLOY_EVENT = "undeploy"; + + private final NotificationBroadcasterSupport support = new NotificationBroadcasterSupport(); + + private final AtomicLong sequence = new AtomicLong(0L); + + private VerticleManager verticleManager; + + public VerticleManagerProxy(VerticleManager verticleManager) { + this.verticleManager = verticleManager; + } + + @Override + public String getObjectName() { + return String.format("org.vertx:type=VerticleManager"); + } + + @Override + public Map getDeployments() { + return verticleManager.listInstances(); + } + + @Override + public void undeploy(final String name) { + verticleManager.undeploy(name, new Handler() { + public void handle(Void event) { + String msg = String.format("Undeployed %s", name); + long timestamp = System.currentTimeMillis(); + Notification notification = new Notification(UNDEPLOY_EVENT, VerticleManagerProxy.this, sequence.incrementAndGet(), timestamp, msg); + support.sendNotification(notification); + } + }); + } + + @Override + public void addNotificationListener(NotificationListener listener, + NotificationFilter filter, Object handback) + throws IllegalArgumentException { + support.addNotificationListener(listener, filter, handback); + } + + @Override + public void removeNotificationListener(NotificationListener listener) + throws ListenerNotFoundException { + support.removeNotificationListener(listener); + } + + @Override + public MBeanNotificationInfo[] getNotificationInfo() { + return support.getNotificationInfo(); + } + +}