[hornetq-commits] JBoss hornetq SVN: r8050 - in trunk: src/main/org/hornetq/core/management/impl and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Oct 6 05:06:28 EDT 2009


Author: jmesnil
Date: 2009-10-06 05:06:28 -0400 (Tue, 06 Oct 2009)
New Revision: 8050

Added:
   trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java
Modified:
   trunk/src/main/org/hornetq/core/management/ClusterConnectionControl.java
   trunk/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java
   trunk/src/main/org/hornetq/core/server/cluster/Bridge.java
   trunk/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
   trunk/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java
   trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java
   trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlUsingCoreTest.java
Log:
HORNETQ-168: Provide MBean for cluster diagnostics

* added to ClusterConnectionControl the management operation getNodes(), which returns a Map of node IDs to the addresses of the other
  nodes in the given cluster

Modified: trunk/src/main/org/hornetq/core/management/ClusterConnectionControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/ClusterConnectionControl.java	2009-10-06 08:25:32 UTC (rev 8049)
+++ trunk/src/main/org/hornetq/core/management/ClusterConnectionControl.java	2009-10-06 09:06:28 UTC (rev 8050)
@@ -13,8 +13,10 @@
 
 package org.hornetq.core.management;
 
+import java.util.Map;
 
 
+
 /**
  * A ClusterConnectionControlMBean
  *
@@ -26,6 +28,8 @@
    String getName();
 
    String getAddress();
+   
+   String getNodeID();
 
    boolean isDuplicateDetection();
 
@@ -40,4 +44,6 @@
    String getDiscoveryGroupName();
 
    long getRetryInterval();
+   
+   Map<String, String> getNodes() throws Exception;
 }

Modified: trunk/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java	2009-10-06 08:25:32 UTC (rev 8049)
+++ trunk/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java	2009-10-06 09:06:28 UTC (rev 8050)
@@ -14,6 +14,7 @@
 package org.hornetq.core.management.impl;
 
 import java.util.List;
+import java.util.Map;
 
 import javax.management.StandardMBean;
 
@@ -78,6 +79,11 @@
    {
       return configuration.getRetryInterval();
    }
+   
+   public String getNodeID()
+   {
+      return clusterConnection.getNodeID();
+   }
 
    public Object[] getStaticConnectorNamePairs()
    {
@@ -135,6 +141,11 @@
       return configuration.isForwardWhenNoConsumers();
    }
 
+   public Map<String, String> getNodes() throws Exception
+   {
+      return clusterConnection.getNodes();
+   }
+   
    public boolean isStarted()
    {
       return clusterConnection.isStarted();

Modified: trunk/src/main/org/hornetq/core/server/cluster/Bridge.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/Bridge.java	2009-10-06 08:25:32 UTC (rev 8049)
+++ trunk/src/main/org/hornetq/core/server/cluster/Bridge.java	2009-10-06 09:06:28 UTC (rev 8050)
@@ -15,6 +15,7 @@
 package org.hornetq.core.server.cluster;
 
 import org.hornetq.core.management.NotificationService;
+import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.server.Consumer;
 import org.hornetq.core.server.HornetQComponent;
 import org.hornetq.core.server.Queue;
@@ -48,4 +49,6 @@
    void setQueue(Queue queue);
 
    void setNotificationService(NotificationService notificationService);
+
+   RemotingConnection getForwardingConnection();
 }

Modified: trunk/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/ClusterConnection.java	2009-10-06 08:25:32 UTC (rev 8049)
+++ trunk/src/main/org/hornetq/core/server/cluster/ClusterConnection.java	2009-10-06 09:06:28 UTC (rev 8050)
@@ -13,6 +13,8 @@
 
 package org.hornetq.core.server.cluster;
 
+import java.util.Map;
+
 import org.hornetq.core.server.HornetQComponent;
 import org.hornetq.utils.SimpleString;
 
@@ -29,6 +31,13 @@
 {
    SimpleString getName();
 
+   String getNodeID();
+
+   /**
+    * @return a Map from node ID to the remote address of each node whose bridge currently has a forwarding connection
+    */
+   Map<String, String> getNodes();
+
    void handleReplicatedAddBinding(SimpleString address,
                                    SimpleString uniqueName,
                                    SimpleString routingName,

Modified: trunk/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java	2009-10-06 08:25:32 UTC (rev 8049)
+++ trunk/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java	2009-10-06 09:06:28 UTC (rev 8050)
@@ -15,7 +15,6 @@
 package org.hornetq.core.server.cluster;
 
 import org.hornetq.core.client.MessageHandler;
-import org.hornetq.core.server.Queue;
 
 /**
  * A MessageFlowRecord
@@ -32,9 +31,8 @@
    
    int getMaxHops();
    
-   void activate(Queue queue) throws Exception;
-   
-   //void reset() throws Exception;
-   
+   Bridge getBridge();
+
    void close() throws Exception;
+
 }

Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2009-10-06 08:25:32 UTC (rev 8049)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2009-10-06 09:06:28 UTC (rev 8050)
@@ -22,6 +22,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.hornetq.core.client.ClientMessage;
@@ -254,7 +255,25 @@
    {
       return name;
    }
+   
+   public String getNodeID()
+   {
+      return nodeUUID.toString();
+   }
 
+   public Map<String, String> getNodes()
+   {
+      Map<String, String> nodes = new HashMap<String, String>();
+      for (Entry<String, MessageFlowRecord> record : records.entrySet( ))
+      {
+         if (record.getValue().getBridge().getForwardingConnection() != null)
+         {
+            nodes.put(record.getKey(), record.getValue().getBridge().getForwardingConnection().getRemoteAddress());
+         }
+      }
+      return nodes;
+   }
+   
    public synchronized void activate()
    {
       if (!started)
@@ -442,19 +461,15 @@
          clearBindings();
       }
 
-      public void activate(final Queue queue) throws Exception
-      {
-         this.queue = queue;
-
-         bridge.setQueue(queue);
-
-         bridge.start();
-      }
-
       public void setBridge(final Bridge bridge)
       {
          this.bridge = bridge;
       }
+      
+      public Bridge getBridge()
+      {
+         return bridge;
+      }
 
       public synchronized void onMessage(final ClientMessage message)
       {

Added: trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java	2009-10-06 09:06:28 UTC (rev 8050)
@@ -0,0 +1,192 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.management;
+
+import static org.hornetq.tests.util.RandomUtil.randomBoolean;
+import static org.hornetq.tests.util.RandomUtil.randomPositiveInt;
+import static org.hornetq.tests.util.RandomUtil.randomPositiveLong;
+import static org.hornetq.tests.util.RandomUtil.randomString;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
+
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.config.cluster.BroadcastGroupConfiguration;
+import org.hornetq.core.config.cluster.ClusterConnectionConfiguration;
+import org.hornetq.core.config.cluster.DiscoveryGroupConfiguration;
+import org.hornetq.core.config.cluster.QueueConfiguration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.management.ClusterConnectionControl;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.server.HornetQ;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
+import org.hornetq.integration.transports.netty.NettyConnectorFactory;
+import org.hornetq.integration.transports.netty.TransportConstants;
+import org.hornetq.utils.Pair;
+import org.hornetq.utils.json.JSONArray;
+import org.hornetq.utils.json.JSONObject;
+
+/**
+ * A ClusterConnectionControl2Test
+ *
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ * 
+ * Created 11 dec. 2008 17:38:58
+ *
+ */
+public class ClusterConnectionControl2Test extends ManagementTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private HornetQServer server_0;
+
+   private HornetQServer server_1;
+
+   private MBeanServer mbeanServer_1;
+   
+   private int port_1 = TransportConstants.DEFAULT_PORT + 1000;
+   
+   private ClusterConnectionConfiguration clusterConnectionConfig_0;
+
+   private String clusterName = "cluster";
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testNodes() throws Exception
+   {
+      ClusterConnectionControl clusterConnectionControl_0 = createManagementControl(clusterConnectionConfig_0.getName());
+      assertTrue(clusterConnectionControl_0.isStarted());
+      Map<String, String> nodes = clusterConnectionControl_0.getNodes();
+      assertEquals(0, nodes.size());
+      
+      server_1.start();
+      Thread.sleep(3000);
+
+      nodes = clusterConnectionControl_0.getNodes();
+      System.out.println(nodes);
+      assertEquals(1, nodes.size());
+      String remoteAddress = nodes.values().iterator().next();
+      assertTrue(remoteAddress.endsWith(":" + port_1));
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      String discoveryName = randomString();
+      String groupAddress = "231.7.7.7";
+      int groupPort = 9876;
+
+      Map<String, Object> acceptorParams_1 = new HashMap<String, Object>();
+      acceptorParams_1.put(TransportConstants.PORT_PROP_NAME, port_1);
+      TransportConfiguration acceptorConfig_1 = new TransportConfiguration(NettyAcceptorFactory.class.getName(), acceptorParams_1);
+      
+      TransportConfiguration connectorConfig_1 = new TransportConfiguration(NettyConnectorFactory.class.getName(),
+                                                                            acceptorParams_1);
+
+      TransportConfiguration connectorConfig_0 = new TransportConfiguration(NettyConnectorFactory.class.getName());
+
+      QueueConfiguration queueConfig = new QueueConfiguration(randomString(), randomString(), null, false);
+
+      clusterConnectionConfig_0 = new ClusterConnectionConfiguration(clusterName,
+                                                                   queueConfig.getAddress(),
+                                                                   randomPositiveLong(),
+                                                                   randomBoolean(),
+                                                                   randomBoolean(),
+                                                                   randomPositiveInt(),
+                                                                   discoveryName);
+      List<Pair<String, String>> connectorInfos = new ArrayList<Pair<String, String>>();
+      connectorInfos.add(new Pair<String, String>("netty", null));
+      BroadcastGroupConfiguration broadcastGroupConfig = new BroadcastGroupConfiguration(discoveryName,
+                                                                                         null,
+                                                                                         -1,
+                                                                                         groupAddress,
+                                                                                         groupPort,
+                                                                                         250,
+                                                                                         connectorInfos);
+      DiscoveryGroupConfiguration discoveryGroupConfig = new DiscoveryGroupConfiguration(discoveryName,
+                                                                                         groupAddress,
+                                                                                         groupPort,
+                                                                                         ClientSessionFactoryImpl.DEFAULT_DISCOVERY_REFRESH_TIMEOUT);
+
+      Configuration conf_1 = new ConfigurationImpl();
+      conf_1.setSecurityEnabled(false);
+      conf_1.setJMXManagementEnabled(true);
+      conf_1.setClustered(true);
+      conf_1.getAcceptorConfigurations().add(acceptorConfig_1);
+      conf_1.getConnectorConfigurations().put("netty", connectorConfig_1);
+      conf_1.getQueueConfigurations().add(queueConfig);
+      conf_1.getDiscoveryGroupConfigurations().put(discoveryName, discoveryGroupConfig);
+      conf_1.getBroadcastGroupConfigurations().add(broadcastGroupConfig);
+
+      Configuration conf_0 = new ConfigurationImpl();
+      conf_0.setSecurityEnabled(false);
+      conf_0.setJMXManagementEnabled(true);
+      conf_0.setClustered(true);
+      conf_0.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
+      conf_0.getConnectorConfigurations().put("netty", connectorConfig_0);
+      conf_0.getClusterConfigurations().add(clusterConnectionConfig_0);
+      conf_0.getDiscoveryGroupConfigurations().put(discoveryName, discoveryGroupConfig);
+      conf_0.getBroadcastGroupConfigurations().add(broadcastGroupConfig);
+
+      mbeanServer_1 = MBeanServerFactory.createMBeanServer();
+      server_1 = HornetQ.newHornetQServer(conf_1, mbeanServer_1, false);
+
+      server_0 = HornetQ.newHornetQServer(conf_0, mbeanServer, false);
+      server_0.start();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      server_0.stop();
+      server_1.stop();
+      
+      server_0 = null;
+      server_1 = null;
+      
+      MBeanServerFactory.releaseMBeanServer(mbeanServer_1);
+      mbeanServer_1 = null;
+
+      super.tearDown();
+   }
+
+   protected ClusterConnectionControl createManagementControl(String name) throws Exception
+   {
+      return ManagementControlHelper.createClusterConnectionControl(name, mbeanServer);
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java	2009-10-06 08:25:32 UTC (rev 8049)
+++ trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java	2009-10-06 09:06:28 UTC (rev 8050)
@@ -24,6 +24,7 @@
 import java.util.List;
 import java.util.Map;
 
+import javax.management.MBeanServer;
 import javax.management.MBeanServerFactory;
 
 import org.hornetq.core.config.Configuration;
@@ -64,6 +65,8 @@
 
    private HornetQServer server_1;
 
+   private MBeanServer mbeanServer_1;
+
    // Constructors --------------------------------------------------
 
    // Public --------------------------------------------------------
@@ -198,7 +201,8 @@
       conf_0.getClusterConfigurations().add(clusterConnectionConfig1);
       conf_0.getClusterConfigurations().add(clusterConnectionConfig2);
 
-      server_1 = HornetQ.newHornetQServer(conf_1, MBeanServerFactory.createMBeanServer(), false);
+      mbeanServer_1 = MBeanServerFactory.createMBeanServer();
+      server_1 = HornetQ.newHornetQServer(conf_1, mbeanServer_1, false);
       server_1.start();
 
       server_0 = HornetQ.newHornetQServer(conf_0, mbeanServer, false);
@@ -215,6 +219,9 @@
       
       server_1 = null;
 
+      MBeanServerFactory.releaseMBeanServer(mbeanServer_1);
+      mbeanServer_1 = null;
+
       super.tearDown();
    }
    

Modified: trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlUsingCoreTest.java	2009-10-06 08:25:32 UTC (rev 8049)
+++ trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlUsingCoreTest.java	2009-10-06 09:06:28 UTC (rev 8050)
@@ -13,6 +13,8 @@
 
 package org.hornetq.tests.integration.management;
 
+import java.util.Map;
+
 import org.hornetq.core.client.ClientSession;
 import org.hornetq.core.client.ClientSessionFactory;
 import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
@@ -84,6 +86,11 @@
          {
             return (String)proxy.retrieveAttributeValue("staticConnectorNamePairsAsJSON");
          }
+         
+         public Map<String, String> getNodes() throws Exception
+         {
+            return (Map<String, String>)proxy.retrieveAttributeValue("nodes");
+         }
 
          public boolean isDuplicateDetection()
          {
@@ -99,6 +106,11 @@
          {
             return (String)proxy.retrieveAttributeValue("name");
          }
+         
+         public String getNodeID()
+         {
+            return (String)proxy.retrieveAttributeValue("nodeID");
+         }
 
          public boolean isStarted()
          {



More information about the hornetq-commits mailing list