checkpoint

This commit is contained in:
Pid
2012-10-26 12:49:28 +01:00
parent ef910c2aad
commit c6f5874f98
17 changed files with 309 additions and 68 deletions

View File

@@ -27,7 +27,7 @@ import org.vertx.java.core.eventbus.impl.hazelcast.HazelcastClusterManager;
import org.vertx.java.core.impl.Context;
import org.vertx.java.core.impl.VertxInternal;
import org.vertx.java.core.jmx.EventBusMXBean;
import org.vertx.java.core.jmx.VertxJMX;
import org.vertx.java.core.jmx.JmxUtil;
import org.vertx.java.core.json.JsonArray;
import org.vertx.java.core.json.JsonObject;
import org.vertx.java.core.logging.Logger;
@@ -76,7 +76,7 @@ public class DefaultEventBus implements EventBus, EventBusMXBean {
this.serverID = new ServerID(DEFAULT_CLUSTER_PORT, "localhost");
this.server = null;
this.subs = null;
VertxJMX.register(this);
JmxUtil.register(this);
}
public DefaultEventBus(VertxInternal vertx, String hostname) {
@@ -89,7 +89,8 @@ public class DefaultEventBus implements EventBus, EventBusMXBean {
ClusterManager mgr = new HazelcastClusterManager(vertx);
subs = mgr.getSubsMap("subs");
this.server = setServer();
VertxJMX.register(this, port, "localhost");
JmxUtil.unregisterEventBus();
JmxUtil.register(this, port, "localhost");
}
@Override
@@ -295,7 +296,7 @@ public class DefaultEventBus implements EventBus, EventBusMXBean {
callCompletionHandler(completionHandler);
}
getHandlerCloseHook(context).entries.remove(new HandlerEntry(address, handler));
VertxJMX.unregister(handler, address);
JmxUtil.unregister(handler, address);
return;
}
}
@@ -451,7 +452,7 @@ public class DefaultEventBus implements EventBus, EventBusMXBean {
}
}
getHandlerCloseHook(context).entries.add(new HandlerEntry(address, handler));
VertxJMX.register(handler, address);
JmxUtil.register(handler, address);
}
private HandlerCloseHook getHandlerCloseHook(Context context) {

View File

@@ -41,7 +41,7 @@ import org.vertx.java.core.http.impl.ws.hybi08.Handshake08;
import org.vertx.java.core.http.impl.ws.hybi17.HandshakeRFC6455;
import org.vertx.java.core.impl.Context;
import org.vertx.java.core.impl.VertxInternal;
import org.vertx.java.core.jmx.VertxJMX;
import org.vertx.java.core.jmx.JmxUtil;
import org.vertx.java.core.logging.Logger;
import org.vertx.java.core.logging.impl.LoggerFactory;
import org.vertx.java.core.net.impl.*;
@@ -51,7 +51,7 @@ import java.net.*;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Values.WEBSOCKET;
@@ -203,12 +203,12 @@ public class DefaultHttpServer implements HttpServer {
}
}
listening = true;
VertxJMX.register(this, id.host, port);
JmxUtil.register(this, id.host, port);
return this;
}
public void close() {
// TODO VertxJMX.unregister(this, port);
// TODO JmxUtil.unregister(this, port);
close(null);
}
@@ -249,6 +249,10 @@ public class DefaultHttpServer implements HttpServer {
wsHandler = null;
}
public long getRequestCount() {
return requestCount.get();
}
public HttpServer setSSL(boolean ssl) {
tcpHelper.setSSL(ssl);
return this;
@@ -379,7 +383,7 @@ public class DefaultHttpServer implements HttpServer {
conn.internalClose();
}
VertxJMX.unregisterHttpServer(id.host, id.port);
JmxUtil.unregisterHttpServer(id.host, id.port);
// We need to reset it since sock.internalClose() above can call into the close handlers of sockets on the same thread
// which can cause context id for the thread to change!
@@ -404,7 +408,7 @@ public class DefaultHttpServer implements HttpServer {
});
}
private static final AtomicInteger count = new AtomicInteger(0);
private final AtomicLong requestCount = new AtomicLong(0);
public class ServerHandler extends SimpleChannelUpstreamHandler {

View File

@@ -30,7 +30,7 @@ import org.vertx.java.core.http.HttpClient;
import org.vertx.java.core.http.HttpServer;
import org.vertx.java.core.http.impl.DefaultHttpClient;
import org.vertx.java.core.http.impl.DefaultHttpServer;
import org.vertx.java.core.jmx.VertxJMX;
import org.vertx.java.core.jmx.JmxUtil;
import org.vertx.java.core.jmx.VertxMXBean;
import org.vertx.java.core.logging.Logger;
import org.vertx.java.core.logging.impl.LoggerFactory;
@@ -98,7 +98,7 @@ public class DefaultVertx extends VertxInternal implements VertxMXBean {
*/
private void configure() {
this.backgroundPoolSize = Integer.getInteger("vertx.backgroundPoolSize", 20);
VertxJMX.register(this);
JmxUtil.register(this);
}
public NetServer createNetServer() {
@@ -187,7 +187,7 @@ public class DefaultVertx extends VertxInternal implements VertxMXBean {
result = backgroundPool;
if (result == null) {
backgroundPool = result = VertxExecutors.newThreadPool(backgroundPoolSize, "vert.x-worker-thread-", false);
VertxJMX.register(backgroundPool, "pool=Worker");
JmxUtil.register(backgroundPool, "pool=Worker");
orderedFact = new OrderedExecutorFactory(backgroundPool);
}
}
@@ -203,7 +203,7 @@ public class DefaultVertx extends VertxInternal implements VertxMXBean {
result = workerPool;
if (result == null) {
ExecutorService corePool = VertxExecutors.newThreadPool(corePoolSize, "vert.x-core-thread-", false);
VertxJMX.register(corePool, "pool=Core");
JmxUtil.register(corePool, "pool=Core");
workerPool = result = new NioWorkerPool(corePool, corePoolSize);
}
}
@@ -221,7 +221,7 @@ public class DefaultVertx extends VertxInternal implements VertxMXBean {
result = acceptorPool;
if (result == null) {
acceptorPool = result = VertxExecutors.newCachedThreadPool("vert.x-acceptor-thread-");
VertxJMX.register(acceptorPool, "pool=Acceptor");
JmxUtil.register(acceptorPool, "pool=Acceptor");
}
}
}

View File

@@ -8,14 +8,53 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class VertxThreadPoolExecutor extends ThreadPoolExecutor implements ExecutorServiceMXBean {
import javax.management.ListenerNotFoundException;
import javax.management.MBeanNotificationInfo;
import javax.management.Notification;
import javax.management.NotificationBroadcaster;
import javax.management.NotificationBroadcasterSupport;
import javax.management.NotificationFilter;
import javax.management.NotificationListener;
public class VertxThreadPoolExecutor extends ThreadPoolExecutor implements ExecutorServiceMXBean, NotificationBroadcaster {
private final NotificationBroadcasterSupport support = new NotificationBroadcasterSupport();
private final AtomicLong sequence = new AtomicLong(0L);
private AtomicLong waitingTaskCount = new AtomicLong(0L);
public static final String QUEUE_NOT_ZERO = "queue.not.zero";
public VertxThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
@Override
public void addNotificationListener(NotificationListener listener,
NotificationFilter filter, Object handback)
throws IllegalArgumentException {
support.addNotificationListener(listener, filter, handback);
}
@Override
public void removeNotificationListener(NotificationListener listener)
throws ListenerNotFoundException {
support.removeNotificationListener(listener);
}
@Override
public MBeanNotificationInfo[] getNotificationInfo() {
return support.getNotificationInfo();
}
private void fireQueueNotification(long size) {
long timestamp = System.currentTimeMillis();
String msg = String.format("Executor queue size is %d", size);
Notification notification = new Notification(QUEUE_NOT_ZERO, this, sequence.incrementAndGet(), timestamp, msg);
support.sendNotification(notification);
}
@Override
public int getWorkQueueSize() {
return super.getQueue().size();
@@ -42,6 +81,12 @@ public class VertxThreadPoolExecutor extends ThreadPoolExecutor implements Execu
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
waitingTaskCount.decrementAndGet();
// rough stab at an event if the queue is larger than 1
long size = waitingTaskCount.get();
if (size > 1) {
fireQueueNotification(size);
}
}
@Override

View File

@@ -4,8 +4,10 @@ public interface HttpServerMXBean {
String getHost();
long getRequestCount();
int getPort();
void close();
String getObjectName();
}

View File

@@ -1,17 +1,18 @@
package org.vertx.java.core.jmx;
import org.vertx.java.core.http.HttpServer;
import org.vertx.java.core.http.impl.DefaultHttpServer;
public class HttpServerProxy implements HttpServerMXBean {
private HttpServer delegate;
private DefaultHttpServer delegate;
private int port;
private String host;
public HttpServerProxy(HttpServer delegate, String host, int port) {
public HttpServerProxy(DefaultHttpServer delegate, String host, int port) {
this.delegate = delegate;
this.host = host;
this.port = port;
@@ -28,8 +29,13 @@ public class HttpServerProxy implements HttpServerMXBean {
}
@Override
public void close() {
delegate.close();
public long getRequestCount() {
return delegate.getRequestCount();
}
@Override
public String getObjectName() {
return String.format("org.vertx:type=HttpServer,name=%s[%s]", host, port);
}
}

View File

@@ -14,13 +14,15 @@ import javax.management.ObjectName;
import org.vertx.java.core.Handler;
import org.vertx.java.core.VertxRuntimeException;
import org.vertx.java.core.eventbus.EventBus;
import org.vertx.java.core.http.HttpServer;
import org.vertx.java.core.http.impl.DefaultHttpServer;
import org.vertx.java.core.impl.DefaultVertx;
//import org.vertx.java.core.http.ServerWebSocket;
import org.vertx.java.core.net.NetServer;
import org.vertx.java.core.net.impl.DefaultNetServer;
import org.vertx.java.core.sockjs.SockJSServer;
import org.vertx.java.core.sockjs.impl.DefaultSockJSServer;
public class VertxJMX {
public class JmxUtil {
private static final String DOMAIN = "org.vertx";
@@ -40,8 +42,15 @@ public class VertxJMX {
}
}
public static void unregisterEventBus() {
ObjectName name = name("type=EventBus");
if (mbeanServer.isRegistered(name)) {
unregisterMBean(name);
}
}
public static void register(EventBus eventBus, int port, String host) {
ObjectName name = name("type=EventBus,host=" + host + ",port=" + port);
ObjectName name = name(String.format("type=EventBus-%s[%s]", host, port));
if (!mbeanServer.isRegistered(name)) {
registerMBean(eventBus, name);
}
@@ -64,50 +73,49 @@ public class VertxJMX {
}
public static void register(NetServer server, String host, int port) {
ObjectName name = name("type=NetServer,host="+host+",port="+port);
if (!mbeanServer.isRegistered(name)) {
registerMBean(new NetServerProxy(server, host, port), name);
}
public static void register(DefaultNetServer server, String host, int port) {
NetServerProxy proxy = new NetServerProxy(server, host, port);
ObjectName name = wrap(proxy.getObjectName());
registerMBean(proxy, name);
}
public static void unregisterNetServer(String host, int port) {
ObjectName name = name("type=NetServer,host="+host+",port="+port);
if (mbeanServer.isRegistered(name)) {
unregisterMBean(name);
}
ObjectName name = name(String.format("type=NetServer,name=%s[%s]", host, port));
unregisterMBean(name);
}
public static void register(HttpServer server, String host, int port) {
ObjectName name = name("type=HttpServer,host="+host+",port="+port);
if (!mbeanServer.isRegistered(name)) {
registerMBean(new HttpServerProxy(server, host, port), name);
}
public static void register(DefaultHttpServer server, String host, int port) {
HttpServerProxy proxy = new HttpServerProxy(server, host, port);
ObjectName name = wrap(proxy.getObjectName());
registerMBean(proxy, name);
}
public static void unregisterHttpServer(String host, int port) {
ObjectName name = name("type=HttpServer,host="+host+",port="+port);
if (mbeanServer.isRegistered(name)) {
unregisterMBean(name);
}
ObjectName name = name(String.format("type=HttpServer,name=%s[%s]", host, port));
unregisterMBean(name);
}
public static void register(SockJSServer server, String host, int port) {
// Map<String, String> keys = new HashMap<>();
// keys.put("type", "Vertx");
// keys.put("server", "SockJSServer");
// keys.put("port", String.valueOf(port));
// ObjectName name = name(keys);
// if (!mbeanServer.isRegistered(name)) {
// registerMBean(server, name);
// }
public static void register(DefaultSockJSServer server, String host, int port) {
SockJSServerProxy proxy = new SockJSServerProxy(server, host, port);
ObjectName name = wrap(proxy.getObjectName());
registerMBean(proxy, name);
}
public static void unregister(SockJSServer server, String host, int port) {
ObjectName name = name(String.format("type=SockJSServer,name='%s[%s]'", host, port));
unregisterMBean(name);
}
private static ObjectName name(String keys) {
public static ObjectName wrap(String keys) {
try {
return new ObjectName(keys);
} catch (MalformedObjectNameException e) {
throw new VertxRuntimeException(e);
}
}
public static ObjectName name(String keys) {
try {
return new ObjectName(DOMAIN + ":" + keys);
@@ -116,9 +124,15 @@ public class VertxJMX {
}
}
private static void registerMBean(Object bean, ObjectName name) {
public static void registerMBean(Object bean, String name) {
registerMBean(bean, wrap(name));
}
public static void registerMBean(Object bean, ObjectName name) {
try {
mbeanServer.registerMBean(bean, name);
if (!mbeanServer.isRegistered(name)) {
mbeanServer.registerMBean(bean, name);
}
} catch (InstanceAlreadyExistsException e) {
throw new VertxRuntimeException(e);
} catch (MBeanRegistrationException e) {
@@ -128,9 +142,11 @@ public class VertxJMX {
}
}
private static void unregisterMBean(ObjectName name) {
public static void unregisterMBean(ObjectName name) {
try {
mbeanServer.unregisterMBean(name);
if (mbeanServer.isRegistered(name)) {
mbeanServer.unregisterMBean(name);
}
} catch (InstanceNotFoundException e) {
throw new VertxRuntimeException(e);
} catch (MBeanRegistrationException e) {

View File

@@ -6,6 +6,6 @@ public interface NetServerMXBean {
int getPort();
void close();
String getObjectName();
}

View File

@@ -27,8 +27,8 @@ public class NetServerProxy implements NetServerMXBean {
}
@Override
public void close() {
delegate.close();
public String getObjectName() {
return String.format("org.vertx:type=NetServer,name=%s[%s]", host, port);
}
}

View File

@@ -0,0 +1,7 @@
package org.vertx.java.core.jmx;
public interface SockJSServerMXBean {
String getObjectName();
}

View File

@@ -0,0 +1,22 @@
package org.vertx.java.core.jmx;
import org.vertx.java.core.sockjs.impl.DefaultSockJSServer;
public class SockJSServerProxy implements SockJSServerMXBean {
private DefaultSockJSServer server;
private String host;
private int port;
public SockJSServerProxy(DefaultSockJSServer server, String host, int port) {
this.server = server;
this.host = host;
this.port = port;
}
@Override
public String getObjectName() {
return String.format("org.vertx:type=SockJSServer,name=%s[%s]", host, port);
}
}

View File

@@ -33,7 +33,7 @@ import org.vertx.java.core.Handler;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.impl.Context;
import org.vertx.java.core.impl.VertxInternal;
import org.vertx.java.core.jmx.VertxJMX;
import org.vertx.java.core.jmx.JmxUtil;
import org.vertx.java.core.logging.Logger;
import org.vertx.java.core.logging.impl.LoggerFactory;
import org.vertx.java.core.net.NetServer;
@@ -57,7 +57,7 @@ public class DefaultNetServer implements NetServer {
private final VertxInternal vertx;
private final Context ctx;
private final TCPSSLHelper tcpHelper = new TCPSSLHelper();
private final Map<Channel, DefaultNetSocket> socketMap = new ConcurrentHashMap();
private final Map<Channel, DefaultNetSocket> socketMap = new ConcurrentHashMap<>();
private Handler<NetSocket> connectHandler;
private ChannelGroup serverChannelGroup;
private boolean listening;
@@ -160,7 +160,7 @@ public class DefaultNetServer implements NetServer {
}
actualServer.handlerManager.addHandler(connectHandler, ctx);
}
VertxJMX.register(this, id.host, port);
JmxUtil.register(this, id.host, port);
return this;
}
@@ -205,7 +205,7 @@ public class DefaultNetServer implements NetServer {
sock.internalClose();
}
VertxJMX.unregisterNetServer(id.host, id.port);
JmxUtil.unregisterNetServer(id.host, id.port);
// We need to reset it since sock.internalClose() above can call into the close handlers of sockets on the same thread
// which can cause context id for the thread to change!
@@ -372,7 +372,7 @@ public class DefaultNetServer implements NetServer {
NioWorker worker = ch.getWorker();
//Choose a handler
final HandlerHolder handler = handlerManager.chooseHandler(worker);
final HandlerHolder<NetSocket> handler = handlerManager.chooseHandler(worker);
if (handler == null) {
//Ignore
@@ -399,7 +399,7 @@ public class DefaultNetServer implements NetServer {
}
}
private void connected(final NioSocketChannel ch, final HandlerHolder handler) {
private void connected(final NioSocketChannel ch, final HandlerHolder<NetSocket> handler) {
handler.context.execute(new Runnable() {
public void run() {
DefaultNetSocket sock = new DefaultNetSocket(vertx, ch, handler.context);

View File

@@ -0,0 +1,13 @@
package org.vertx.java.deploy.impl;
public interface DeploymentMXBean {
String getName();
int getInstances();
boolean getAutoRedeploy();
String getObjectName();
}

View File

@@ -0,0 +1,31 @@
package org.vertx.java.deploy.impl;
public class DeploymentProxy implements DeploymentMXBean {
private Deployment deployment;
public DeploymentProxy(Deployment deployment) {
this.deployment = deployment;
}
@Override
public String getName() {
return deployment.name;
}
@Override
public int getInstances() {
return deployment.instances;
}
@Override
public boolean getAutoRedeploy() {
return deployment.autoRedeploy;
}
@Override
public String getObjectName() {
return String.format("org.vertx:type=Deployment,name=%s[%s]", deployment.name, deployment.instances);
}
}

View File

@@ -28,6 +28,7 @@ import org.vertx.java.core.impl.BlockingAction;
import org.vertx.java.core.impl.Context;
import org.vertx.java.core.impl.VertxInternal;
import org.vertx.java.core.impl.WorkerContext;
import org.vertx.java.core.jmx.JmxUtil;
import org.vertx.java.core.json.DecodeException;
import org.vertx.java.core.json.JsonObject;
import org.vertx.java.core.logging.Logger;
@@ -49,6 +50,8 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import javax.management.ObjectName;
/**
*
* This class could benefit from some refactoring
@@ -115,6 +118,8 @@ public class VerticleManager implements ModuleReloader {
}
}
}
VerticleManagerProxy proxy = new VerticleManagerProxy(this);
JmxUtil.registerMBean(proxy, proxy.getObjectName());
}
public void block() {
@@ -680,6 +685,9 @@ public class VerticleManager implements ModuleReloader {
final ClassLoader sharedLoader = worker ? new ParentLastURLClassLoader(urls, getClass()
.getClassLoader()): null;
ObjectName on = JmxUtil.name(String.format("type=Deployment,name=%s[%s]", deployment.name, deployment.instances));
JmxUtil.registerMBean(new DeploymentProxy(deployment), on);
for (int i = 0; i < instances; i++) {
// Launch the verticle instance
@@ -832,6 +840,8 @@ public class VerticleManager implements ModuleReloader {
}
});
}
ObjectName on = JmxUtil.name(String.format("type=Deployment,name=%s[%s]", deployment.name, deployment.instances));
JmxUtil.unregisterMBean(on);
}
if (deployment.parentDeploymentName != null) {

View File

@@ -0,0 +1,13 @@
package org.vertx.java.deploy.impl;
import java.util.Map;
public interface VerticleManagerMXBean {
String getObjectName();
Map<String, Integer> getDeployments();
void undeploy(String name);
}

View File

@@ -0,0 +1,71 @@
package org.vertx.java.deploy.impl;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.ListenerNotFoundException;
import javax.management.MBeanNotificationInfo;
import javax.management.Notification;
import javax.management.NotificationBroadcaster;
import javax.management.NotificationBroadcasterSupport;
import javax.management.NotificationFilter;
import javax.management.NotificationListener;
import org.vertx.java.core.Handler;
public class VerticleManagerProxy implements VerticleManagerMXBean, NotificationBroadcaster {
private static final String UNDEPLOY_EVENT = "undeploy";
private final NotificationBroadcasterSupport support = new NotificationBroadcasterSupport();
private final AtomicLong sequence = new AtomicLong(0L);
private VerticleManager verticleManager;
public VerticleManagerProxy(VerticleManager verticleManager) {
this.verticleManager = verticleManager;
}
@Override
public String getObjectName() {
return String.format("org.vertx:type=VerticleManager");
}
@Override
public Map<String, Integer> getDeployments() {
return verticleManager.listInstances();
}
@Override
public void undeploy(final String name) {
verticleManager.undeploy(name, new Handler<Void>() {
public void handle(Void event) {
String msg = String.format("Undeployed %s", name);
long timestamp = System.currentTimeMillis();
Notification notification = new Notification(UNDEPLOY_EVENT, VerticleManagerProxy.this, sequence.incrementAndGet(), timestamp, msg);
support.sendNotification(notification);
}
});
}
@Override
public void addNotificationListener(NotificationListener listener,
NotificationFilter filter, Object handback)
throws IllegalArgumentException {
support.addNotificationListener(listener, filter, handback);
}
@Override
public void removeNotificationListener(NotificationListener listener)
throws ListenerNotFoundException {
support.removeNotificationListener(listener);
}
@Override
public MBeanNotificationInfo[] getNotificationInfo() {
return support.getNotificationInfo();
}
}