[hornetq-commits] JBoss hornetq SVN: r8130 - in trunk: src/main/org/hornetq/core/remoting/impl/invm and 5 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Oct 20 08:43:47 EDT 2009
Author: jmesnil
Date: 2009-10-20 08:43:46 -0400 (Tue, 20 Oct 2009)
New Revision: 8130
Modified:
trunk/src/main/org/hornetq/core/management/NotificationType.java
trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
trunk/tests/src/org/hornetq/tests/integration/management/AcceptorControlTest.java
trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-37: Add additional core management notifications
* Added notifications that are sent when acceptors and cluster connections are started or stopped
Modified: trunk/src/main/org/hornetq/core/management/NotificationType.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/NotificationType.java 2009-10-20 12:33:47 UTC (rev 8129)
+++ trunk/src/main/org/hornetq/core/management/NotificationType.java 2009-10-20 12:43:46 UTC (rev 8130)
@@ -30,8 +30,13 @@
BROADCAST_GROUP_STARTED(10),
BROADCAST_GROUP_STOPPED(11),
BRIDGE_STARTED(12),
- BRIDGE_STOPPED(13);
+ BRIDGE_STOPPED(13),
+ CLUSTER_CONNECTION_STARTED(14),
+ CLUSTER_CONNECTION_STOPPED(15),
+ ACCEPTOR_STARTED(16),
+ ACCEPTOR_STOPPED(17);
+
private final int value;
private NotificationType(int value)
Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2009-10-20 12:33:47 UTC (rev 8129)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2009-10-20 12:43:46 UTC (rev 8130)
@@ -19,6 +19,9 @@
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.management.Notification;
+import org.hornetq.core.management.NotificationService;
+import org.hornetq.core.management.NotificationType;
import org.hornetq.core.remoting.spi.Acceptor;
import org.hornetq.core.remoting.spi.BufferHandler;
import org.hornetq.core.remoting.spi.Connection;
@@ -26,6 +29,8 @@
import org.hornetq.utils.ConfigurationHelper;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.OrderedExecutorFactory;
+import org.hornetq.utils.SimpleString;
+import org.hornetq.utils.TypedProperties;
/**
* A InVMAcceptor
@@ -51,6 +56,8 @@
private boolean paused;
+ private NotificationService notificationService;
+
public InVMAcceptor(final Map<String, Object> configuration,
final BufferHandler handler,
final ConnectionLifeCycleListener listener,
@@ -74,6 +81,15 @@
InVMRegistry.instance.registerAcceptor(id, this);
+ if (notificationService != null)
+ {
+ TypedProperties props = new TypedProperties();
+ props.putStringProperty(new SimpleString("factory"), new SimpleString(InVMAcceptorFactory.class.getName()));
+ props.putIntProperty(new SimpleString("id"), id);
+ Notification notification = new Notification(null, NotificationType.ACCEPTOR_STARTED, props);
+ notificationService.sendNotification(notification);
+ }
+
started = true;
paused = false;
@@ -98,6 +114,23 @@
connections.clear();
+ if (notificationService != null)
+ {
+ TypedProperties props = new TypedProperties();
+ props.putStringProperty(new SimpleString("factory"), new SimpleString(InVMAcceptorFactory.class.getName()));
+ props.putIntProperty(new SimpleString("id"), id);
+ Notification notification = new Notification(null, NotificationType.ACCEPTOR_STOPPED, props);
+ try
+ {
+ notificationService.sendNotification(notification);
+ }
+ catch (Exception e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
started = false;
paused = false;
@@ -134,6 +167,11 @@
paused = false;
}
+
+ public void setNotificationService(NotificationService notificationService)
+ {
+ this.notificationService = notificationService;
+ }
public BufferHandler getHandler()
{
Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2009-10-20 12:33:47 UTC (rev 8129)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2009-10-20 12:43:46 UTC (rev 8130)
@@ -174,6 +174,7 @@
if (managementService != null)
{
+ acceptor.setNotificationService(managementService);
managementService.registerAcceptor(acceptor, info);
}
}
Modified: trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java 2009-10-20 12:33:47 UTC (rev 8129)
+++ trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java 2009-10-20 12:43:46 UTC (rev 8130)
@@ -13,10 +13,11 @@
package org.hornetq.core.remoting.spi;
+import org.hornetq.core.management.NotificationService;
import org.hornetq.core.server.HornetQComponent;
/**
- * An Acceptor is used tby the Remoting Service to allow clients to connect. It should take care of dispatchin client requests
+ * An Acceptor is used by the Remoting Service to allow clients to connect. It should take care of dispatching client requests
* to the Remoting Service's Dispatcher.
*
* @author <a href="ataylor at redhat.com">Andy Taylor</a>
@@ -27,4 +28,6 @@
void pause();
void resume();
+
+ void setNotificationService(NotificationService notificationService);
}
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-10-20 12:33:47 UTC (rev 8129)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-10-20 12:43:46 UTC (rev 8130)
@@ -47,6 +47,7 @@
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.Pair;
import org.hornetq.utils.SimpleString;
+import org.hornetq.utils.TypedProperties;
import org.hornetq.utils.UUID;
/**
@@ -218,6 +219,14 @@
}
started = true;
+
+ if (managementService != null)
+ {
+ TypedProperties props = new TypedProperties();
+ props.putStringProperty(new SimpleString("name"), name);
+ Notification notification = new Notification(nodeUUID.toString(), NotificationType.CLUSTER_CONNECTION_STARTED, props);
+ managementService.sendNotification(notification);
+ }
}
public synchronized void stop() throws Exception
@@ -243,6 +252,14 @@
}
}
+ if (managementService != null)
+ {
+ TypedProperties props = new TypedProperties();
+ props.putStringProperty(new SimpleString("name"), name);
+ Notification notification = new Notification(nodeUUID.toString(), NotificationType.CLUSTER_CONNECTION_STOPPED, props);
+ managementService.sendNotification(notification);
+ }
+
started = false;
}
Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2009-10-20 12:33:47 UTC (rev 8129)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2009-10-20 12:43:46 UTC (rev 8130)
@@ -31,12 +31,17 @@
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.management.Notification;
+import org.hornetq.core.management.NotificationService;
+import org.hornetq.core.management.NotificationType;
import org.hornetq.core.remoting.impl.ssl.SSLSupport;
import org.hornetq.core.remoting.spi.Acceptor;
import org.hornetq.core.remoting.spi.BufferHandler;
import org.hornetq.core.remoting.spi.Connection;
import org.hornetq.core.remoting.spi.ConnectionLifeCycleListener;
import org.hornetq.utils.ConfigurationHelper;
+import org.hornetq.utils.SimpleString;
+import org.hornetq.utils.TypedProperties;
import org.hornetq.utils.VersionLoader;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
@@ -121,6 +126,8 @@
private final Executor threadPool;
+ private NotificationService notificationService;
+
public NettyAcceptor(final Map<String, Object> configuration,
final BufferHandler handler,
final ConnectionLifeCycleListener listener,
@@ -303,6 +310,16 @@
log.warn("Unexpected Netty Version was expecting " + VersionLoader.getVersion().getNettyVersion() + " using " + Version.ID);
}
+ if (notificationService != null)
+ {
+ TypedProperties props = new TypedProperties();
+ props.putStringProperty(new SimpleString("factory"), new SimpleString(NettyAcceptorFactory.class.getName()));
+ props.putStringProperty(new SimpleString("host"), new SimpleString(host));
+ props.putIntProperty(new SimpleString("port"), port);
+ Notification notification = new Notification(null, NotificationType.ACCEPTOR_STARTED, props);
+ notificationService.sendNotification(notification);
+ }
+
log.info("Started Netty Acceptor version " + Version.ID);
}
@@ -364,6 +381,24 @@
}
connections.clear();
+
+ if (notificationService != null)
+ {
+ TypedProperties props = new TypedProperties();
+ props.putStringProperty(new SimpleString("factory"), new SimpleString(NettyAcceptorFactory.class.getName()));
+ props.putStringProperty(new SimpleString("host"), new SimpleString(host));
+ props.putIntProperty(new SimpleString("port"), port);
+ Notification notification = new Notification(null, NotificationType.ACCEPTOR_STOPPED, props);
+ try
+ {
+ notificationService.sendNotification(notification);
+ }
+ catch (Exception e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
paused = false;
}
@@ -414,6 +449,11 @@
paused = false;
}
+ public void setNotificationService(final NotificationService notificationService)
+ {
+ this.notificationService = notificationService;
+ }
+
// Inner classes -----------------------------------------------------------------------------
@ChannelPipelineCoverage("one")
Modified: trunk/tests/src/org/hornetq/tests/integration/management/AcceptorControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/AcceptorControlTest.java 2009-10-20 12:33:47 UTC (rev 8129)
+++ trunk/tests/src/org/hornetq/tests/integration/management/AcceptorControlTest.java 2009-10-20 12:43:46 UTC (rev 8130)
@@ -25,10 +25,14 @@
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.AcceptorControl;
+import org.hornetq.core.management.Notification;
+import org.hornetq.core.management.NotificationType;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQ;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.integration.SimpleNotificationService;
+import org.hornetq.utils.SimpleString;
/**
* A AcceptorControlTest
@@ -133,7 +137,43 @@
}
}
+
+ public void testNotifications() throws Exception
+ {
+ TransportConfiguration acceptorConfig = new TransportConfiguration(InVMAcceptorFactory.class.getName(),
+ new HashMap<String, Object>(),
+ randomString());
+ Configuration conf = new ConfigurationImpl();
+ conf.setSecurityEnabled(false);
+ conf.setJMXManagementEnabled(true);
+ conf.getAcceptorConfigurations().add(acceptorConfig);
+ service = HornetQ.newHornetQServer(conf, mbeanServer, false);
+ service.start();
+ AcceptorControl acceptorControl = createManagementControl(acceptorConfig.getName());
+
+
+ SimpleNotificationService.Listener notifListener = new SimpleNotificationService.Listener();
+
+ service.getManagementService().addNotificationListener(notifListener);
+
+ assertEquals(0, notifListener.getNotifications().size());
+
+ acceptorControl.stop();
+
+ assertEquals(1, notifListener.getNotifications().size());
+ Notification notif = notifListener.getNotifications().get(0);
+ assertEquals(NotificationType.ACCEPTOR_STOPPED, notif.getType());
+ assertEquals(InVMAcceptorFactory.class.getName(), (notif.getProperties().getProperty(new SimpleString("factory")).toString()));
+
+ acceptorControl.start();
+
+ assertEquals(2, notifListener.getNotifications().size());
+ notif = notifListener.getNotifications().get(1);
+ assertEquals(NotificationType.ACCEPTOR_STARTED, notif.getType());
+ assertEquals(InVMAcceptorFactory.class.getName(), (notif.getProperties().getProperty(new SimpleString("factory")).toString()));
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java 2009-10-20 12:33:47 UTC (rev 8129)
+++ trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java 2009-10-20 12:43:46 UTC (rev 8130)
@@ -33,12 +33,16 @@
import org.hornetq.core.config.cluster.QueueConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.management.ClusterConnectionControl;
+import org.hornetq.core.management.Notification;
+import org.hornetq.core.management.NotificationType;
import org.hornetq.core.management.ObjectNameBuilder;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQ;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.integration.SimpleNotificationService;
import org.hornetq.utils.Pair;
+import org.hornetq.utils.SimpleString;
import org.hornetq.utils.json.JSONArray;
import org.hornetq.utils.json.JSONObject;
@@ -144,6 +148,31 @@
assertTrue(clusterConnectionControl.isStarted());
}
+ public void testNotifications() throws Exception
+ {
+ SimpleNotificationService.Listener notifListener = new SimpleNotificationService.Listener();
+ checkResource(ObjectNameBuilder.DEFAULT.getClusterConnectionObjectName(clusterConnectionConfig1.getName()));
+ ClusterConnectionControl clusterConnectionControl = createManagementControl(clusterConnectionConfig1.getName());
+
+ server_0.getManagementService().addNotificationListener(notifListener);
+
+ assertEquals(0, notifListener.getNotifications().size());
+
+ clusterConnectionControl.stop();
+
+ assertTrue(notifListener.getNotifications().size() > 0);
+ Notification notif = notifListener.getNotifications().get(notifListener.getNotifications().size() - 1);
+ assertEquals(NotificationType.CLUSTER_CONNECTION_STOPPED, notif.getType());
+ assertEquals(clusterConnectionControl.getName(), (notif.getProperties().getProperty(new SimpleString("name")).toString()));
+
+ clusterConnectionControl.start();
+
+ assertTrue(notifListener.getNotifications().size() > 0);
+ notif = notifListener.getNotifications().get(notifListener.getNotifications().size() - 1);
+ assertEquals(NotificationType.CLUSTER_CONNECTION_STARTED, notif.getType());
+ assertEquals(clusterConnectionControl.getName(), (notif.getProperties().getProperty(new SimpleString("name")).toString()));
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
More information about the hornetq-commits mailing list