[hornetq-commits] JBoss hornetq SVN: r8130 - in trunk: src/main/org/hornetq/core/remoting/impl/invm and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Oct 20 08:43:47 EDT 2009


Author: jmesnil
Date: 2009-10-20 08:43:46 -0400 (Tue, 20 Oct 2009)
New Revision: 8130

Modified:
   trunk/src/main/org/hornetq/core/management/NotificationType.java
   trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
   trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
   trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java
   trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
   trunk/tests/src/org/hornetq/tests/integration/management/AcceptorControlTest.java
   trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-37: Add additional core management notifications

* added notifications when acceptors & cluster connections are started/stopped

Modified: trunk/src/main/org/hornetq/core/management/NotificationType.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/NotificationType.java	2009-10-20 12:33:47 UTC (rev 8129)
+++ trunk/src/main/org/hornetq/core/management/NotificationType.java	2009-10-20 12:43:46 UTC (rev 8130)
@@ -30,8 +30,13 @@
    BROADCAST_GROUP_STARTED(10),
    BROADCAST_GROUP_STOPPED(11),
    BRIDGE_STARTED(12),
-   BRIDGE_STOPPED(13);
+   BRIDGE_STOPPED(13), 
+   CLUSTER_CONNECTION_STARTED(14),
+   CLUSTER_CONNECTION_STOPPED(15),
+   ACCEPTOR_STARTED(16),
+   ACCEPTOR_STOPPED(17);
 
+
    private final int value;
 
    private NotificationType(int value)

Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java	2009-10-20 12:33:47 UTC (rev 8129)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java	2009-10-20 12:43:46 UTC (rev 8130)
@@ -19,6 +19,9 @@
 
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.management.Notification;
+import org.hornetq.core.management.NotificationService;
+import org.hornetq.core.management.NotificationType;
 import org.hornetq.core.remoting.spi.Acceptor;
 import org.hornetq.core.remoting.spi.BufferHandler;
 import org.hornetq.core.remoting.spi.Connection;
@@ -26,6 +29,8 @@
 import org.hornetq.utils.ConfigurationHelper;
 import org.hornetq.utils.ExecutorFactory;
 import org.hornetq.utils.OrderedExecutorFactory;
+import org.hornetq.utils.SimpleString;
+import org.hornetq.utils.TypedProperties;
 
 /**
  * A InVMAcceptor
@@ -51,6 +56,8 @@
    
    private boolean paused;
 
+   private NotificationService notificationService;
+
    public InVMAcceptor(final Map<String, Object> configuration,
                        final BufferHandler handler,
                        final ConnectionLifeCycleListener listener,
@@ -74,6 +81,15 @@
 
       InVMRegistry.instance.registerAcceptor(id, this);
 
+      if (notificationService != null)
+      {
+         TypedProperties props = new TypedProperties();
+         props.putStringProperty(new SimpleString("factory"), new SimpleString(InVMAcceptorFactory.class.getName()));
+         props.putIntProperty(new SimpleString("id"), id);
+         Notification notification = new Notification(null, NotificationType.ACCEPTOR_STARTED, props);
+         notificationService.sendNotification(notification);
+      }
+      
       started = true;
       
       paused = false;
@@ -98,6 +114,23 @@
 
       connections.clear();
 
+      if (notificationService != null)
+      {
+         TypedProperties props = new TypedProperties();
+         props.putStringProperty(new SimpleString("factory"), new SimpleString(InVMAcceptorFactory.class.getName()));
+         props.putIntProperty(new SimpleString("id"), id);
+         Notification notification = new Notification(null, NotificationType.ACCEPTOR_STOPPED, props);
+         try
+         {
+            notificationService.sendNotification(notification);
+         }
+         catch (Exception e)
+         {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+         }
+      }
+      
       started = false;
       
       paused = false;
@@ -134,6 +167,11 @@
       
       paused = false;
    }
+   
+   public void setNotificationService(NotificationService notificationService)
+   {
+      this.notificationService = notificationService;
+   }
 
    public BufferHandler getHandler()
    {

Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2009-10-20 12:33:47 UTC (rev 8129)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2009-10-20 12:43:46 UTC (rev 8130)
@@ -174,6 +174,7 @@
 
             if (managementService != null)
             {
+               acceptor.setNotificationService(managementService);
                managementService.registerAcceptor(acceptor, info);
             }
          }

Modified: trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java	2009-10-20 12:33:47 UTC (rev 8129)
+++ trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java	2009-10-20 12:43:46 UTC (rev 8130)
@@ -13,10 +13,11 @@
 
 package org.hornetq.core.remoting.spi;
 
+import org.hornetq.core.management.NotificationService;
 import org.hornetq.core.server.HornetQComponent;
 
 /**
- * An Acceptor is used tby the Remoting Service to allow clients to connect. It should take care of dispatchin client requests
+ * An Acceptor is used by the Remoting Service to allow clients to connect. It should take care of dispatching client requests
  * to the Remoting Service's Dispatcher.
  *
  * @author <a href="ataylor at redhat.com">Andy Taylor</a>
@@ -27,4 +28,6 @@
    void pause();
    
    void resume();
+   
+   void setNotificationService(NotificationService notificationService);
 }

Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2009-10-20 12:33:47 UTC (rev 8129)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2009-10-20 12:43:46 UTC (rev 8130)
@@ -47,6 +47,7 @@
 import org.hornetq.utils.ExecutorFactory;
 import org.hornetq.utils.Pair;
 import org.hornetq.utils.SimpleString;
+import org.hornetq.utils.TypedProperties;
 import org.hornetq.utils.UUID;
 
 /**
@@ -218,6 +219,14 @@
       }
 
       started = true;
+      
+      if (managementService != null)
+      {
+         TypedProperties props = new TypedProperties();
+         props.putStringProperty(new SimpleString("name"), name);
+         Notification notification = new Notification(nodeUUID.toString(), NotificationType.CLUSTER_CONNECTION_STARTED, props);
+         managementService.sendNotification(notification);
+      }
    }
 
    public synchronized void stop() throws Exception
@@ -243,6 +252,14 @@
          }
       }
 
+      if (managementService != null)
+      {
+         TypedProperties props = new TypedProperties();
+         props.putStringProperty(new SimpleString("name"), name);
+         Notification notification = new Notification(nodeUUID.toString(), NotificationType.CLUSTER_CONNECTION_STOPPED, props);
+         managementService.sendNotification(notification);
+      }
+      
       started = false;
    }
 

Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java	2009-10-20 12:33:47 UTC (rev 8129)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java	2009-10-20 12:43:46 UTC (rev 8130)
@@ -31,12 +31,17 @@
 import org.hornetq.core.config.TransportConfiguration;
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.management.Notification;
+import org.hornetq.core.management.NotificationService;
+import org.hornetq.core.management.NotificationType;
 import org.hornetq.core.remoting.impl.ssl.SSLSupport;
 import org.hornetq.core.remoting.spi.Acceptor;
 import org.hornetq.core.remoting.spi.BufferHandler;
 import org.hornetq.core.remoting.spi.Connection;
 import org.hornetq.core.remoting.spi.ConnectionLifeCycleListener;
 import org.hornetq.utils.ConfigurationHelper;
+import org.hornetq.utils.SimpleString;
+import org.hornetq.utils.TypedProperties;
 import org.hornetq.utils.VersionLoader;
 import org.jboss.netty.bootstrap.ServerBootstrap;
 import org.jboss.netty.channel.Channel;
@@ -121,6 +126,8 @@
 
    private final Executor threadPool;
 
+   private NotificationService notificationService;
+
    public NettyAcceptor(final Map<String, Object> configuration,
                         final BufferHandler handler,
                         final ConnectionLifeCycleListener listener,
@@ -303,6 +310,16 @@
           log.warn("Unexpected Netty Version was expecting " + VersionLoader.getVersion().getNettyVersion() + " using " + Version.ID);
       }
 
+      if (notificationService != null)
+      {
+         TypedProperties props = new TypedProperties();
+         props.putStringProperty(new SimpleString("factory"), new SimpleString(NettyAcceptorFactory.class.getName()));
+         props.putStringProperty(new SimpleString("host"), new SimpleString(host));
+         props.putIntProperty(new SimpleString("port"), port);
+         Notification notification = new Notification(null, NotificationType.ACCEPTOR_STARTED, props);
+         notificationService.sendNotification(notification);
+      }
+      
       log.info("Started Netty Acceptor version " + Version.ID);
    }
 
@@ -364,6 +381,24 @@
       }
 
       connections.clear();
+    
+      if (notificationService != null)
+      {
+         TypedProperties props = new TypedProperties();
+         props.putStringProperty(new SimpleString("factory"), new SimpleString(NettyAcceptorFactory.class.getName()));
+         props.putStringProperty(new SimpleString("host"), new SimpleString(host));
+         props.putIntProperty(new SimpleString("port"), port);
+         Notification notification = new Notification(null, NotificationType.ACCEPTOR_STOPPED, props);
+         try
+         {
+            notificationService.sendNotification(notification);
+         }
+         catch (Exception e)
+         {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+         }
+      }
       
       paused = false;
    }
@@ -414,6 +449,11 @@
       paused = false;
    }
 
+   public void setNotificationService(final NotificationService notificationService)
+   {
+      this.notificationService = notificationService;
+   }
+   
    // Inner classes -----------------------------------------------------------------------------
 
    @ChannelPipelineCoverage("one")

Modified: trunk/tests/src/org/hornetq/tests/integration/management/AcceptorControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/AcceptorControlTest.java	2009-10-20 12:33:47 UTC (rev 8129)
+++ trunk/tests/src/org/hornetq/tests/integration/management/AcceptorControlTest.java	2009-10-20 12:43:46 UTC (rev 8130)
@@ -25,10 +25,14 @@
 import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.management.AcceptorControl;
+import org.hornetq.core.management.Notification;
+import org.hornetq.core.management.NotificationType;
 import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
 import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
 import org.hornetq.core.server.HornetQ;
 import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.integration.SimpleNotificationService;
+import org.hornetq.utils.SimpleString;
 
 /**
  * A AcceptorControlTest
@@ -133,7 +137,43 @@
       }
       
    }
+   
+   public void testNotifications() throws Exception
+   {
+      TransportConfiguration acceptorConfig = new TransportConfiguration(InVMAcceptorFactory.class.getName(),
+                                                                         new HashMap<String, Object>(),
+                                                                         randomString());
+      Configuration conf = new ConfigurationImpl();
+      conf.setSecurityEnabled(false);
+      conf.setJMXManagementEnabled(true);
+      conf.getAcceptorConfigurations().add(acceptorConfig);
+      service = HornetQ.newHornetQServer(conf, mbeanServer, false);
+      service.start();
 
+      AcceptorControl acceptorControl = createManagementControl(acceptorConfig.getName());
+
+      
+      SimpleNotificationService.Listener notifListener = new SimpleNotificationService.Listener();
+
+      service.getManagementService().addNotificationListener(notifListener);
+      
+      assertEquals(0, notifListener.getNotifications().size());
+      
+      acceptorControl.stop();
+      
+      assertEquals(1, notifListener.getNotifications().size());
+      Notification notif = notifListener.getNotifications().get(0);
+      assertEquals(NotificationType.ACCEPTOR_STOPPED, notif.getType());
+      assertEquals(InVMAcceptorFactory.class.getName(), (notif.getProperties().getProperty(new SimpleString("factory")).toString()));
+      
+      acceptorControl.start();
+      
+      assertEquals(2, notifListener.getNotifications().size());
+      notif = notifListener.getNotifications().get(1);
+      assertEquals(NotificationType.ACCEPTOR_STARTED, notif.getType());
+      assertEquals(InVMAcceptorFactory.class.getName(), (notif.getProperties().getProperty(new SimpleString("factory")).toString()));      
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java	2009-10-20 12:33:47 UTC (rev 8129)
+++ trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java	2009-10-20 12:43:46 UTC (rev 8130)
@@ -33,12 +33,16 @@
 import org.hornetq.core.config.cluster.QueueConfiguration;
 import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.management.ClusterConnectionControl;
+import org.hornetq.core.management.Notification;
+import org.hornetq.core.management.NotificationType;
 import org.hornetq.core.management.ObjectNameBuilder;
 import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
 import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
 import org.hornetq.core.server.HornetQ;
 import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.integration.SimpleNotificationService;
 import org.hornetq.utils.Pair;
+import org.hornetq.utils.SimpleString;
 import org.hornetq.utils.json.JSONArray;
 import org.hornetq.utils.json.JSONObject;
 
@@ -144,6 +148,31 @@
       assertTrue(clusterConnectionControl.isStarted());
    }
 
+   public void testNotifications() throws Exception
+   {
+      SimpleNotificationService.Listener notifListener = new SimpleNotificationService.Listener();
+      checkResource(ObjectNameBuilder.DEFAULT.getClusterConnectionObjectName(clusterConnectionConfig1.getName()));
+      ClusterConnectionControl clusterConnectionControl = createManagementControl(clusterConnectionConfig1.getName());
+
+      server_0.getManagementService().addNotificationListener(notifListener);
+      
+      assertEquals(0, notifListener.getNotifications().size());
+      
+      clusterConnectionControl.stop();
+      
+      assertTrue(notifListener.getNotifications().size() > 0);
+      Notification notif = notifListener.getNotifications().get(notifListener.getNotifications().size() - 1);
+      assertEquals(NotificationType.CLUSTER_CONNECTION_STOPPED, notif.getType());
+      assertEquals(clusterConnectionControl.getName(), (notif.getProperties().getProperty(new SimpleString("name")).toString()));
+      
+      clusterConnectionControl.start();
+      
+      assertTrue(notifListener.getNotifications().size() > 0);
+      notif = notifListener.getNotifications().get(notifListener.getNotifications().size() - 1);
+      assertEquals(NotificationType.CLUSTER_CONNECTION_STARTED, notif.getType());
+      assertEquals(clusterConnectionControl.getName(), (notif.getProperties().getProperty(new SimpleString("name")).toString()));      
+   }
+   
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------



More information about the hornetq-commits mailing list