[hornetq-commits] JBoss hornetq SVN: r8050 - in trunk: src/main/org/hornetq/core/management/impl and 3 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Oct 6 05:06:28 EDT 2009
Author: jmesnil
Date: 2009-10-06 05:06:28 -0400 (Tue, 06 Oct 2009)
New Revision: 8050
Added:
trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java
Modified:
trunk/src/main/org/hornetq/core/management/ClusterConnectionControl.java
trunk/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java
trunk/src/main/org/hornetq/core/server/cluster/Bridge.java
trunk/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
trunk/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java
trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlUsingCoreTest.java
Log:
HORNETQ-168: Provide MBean for cluster diagnostics
* added two management operations to ClusterConnectionControl: getNodeID(), which returns the ID of the local node, and
getNodes(), which returns a Map of node IDs to the addresses of the other nodes in the given cluster
Modified: trunk/src/main/org/hornetq/core/management/ClusterConnectionControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/ClusterConnectionControl.java 2009-10-06 08:25:32 UTC (rev 8049)
+++ trunk/src/main/org/hornetq/core/management/ClusterConnectionControl.java 2009-10-06 09:06:28 UTC (rev 8050)
@@ -13,8 +13,10 @@
package org.hornetq.core.management;
+import java.util.Map;
+
/**
* A ClusterConnectionControlMBean
*
@@ -26,6 +28,8 @@
String getName();
String getAddress();
+
+ String getNodeID();
boolean isDuplicateDetection();
@@ -40,4 +44,6 @@
String getDiscoveryGroupName();
long getRetryInterval();
+
+ Map<String, String> getNodes() throws Exception;
}
Modified: trunk/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java 2009-10-06 08:25:32 UTC (rev 8049)
+++ trunk/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java 2009-10-06 09:06:28 UTC (rev 8050)
@@ -14,6 +14,7 @@
package org.hornetq.core.management.impl;
import java.util.List;
+import java.util.Map;
import javax.management.StandardMBean;
@@ -78,6 +79,11 @@
{
return configuration.getRetryInterval();
}
+
+ public String getNodeID()
+ {
+ return clusterConnection.getNodeID();
+ }
public Object[] getStaticConnectorNamePairs()
{
@@ -135,6 +141,11 @@
return configuration.isForwardWhenNoConsumers();
}
+ public Map<String, String> getNodes() throws Exception
+ {
+ return clusterConnection.getNodes();
+ }
+
public boolean isStarted()
{
return clusterConnection.isStarted();
Modified: trunk/src/main/org/hornetq/core/server/cluster/Bridge.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/Bridge.java 2009-10-06 08:25:32 UTC (rev 8049)
+++ trunk/src/main/org/hornetq/core/server/cluster/Bridge.java 2009-10-06 09:06:28 UTC (rev 8050)
@@ -15,6 +15,7 @@
package org.hornetq.core.server.cluster;
import org.hornetq.core.management.NotificationService;
+import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.server.Consumer;
import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.server.Queue;
@@ -48,4 +49,6 @@
void setQueue(Queue queue);
void setNotificationService(NotificationService notificationService);
+
+ RemotingConnection getForwardingConnection();
}
Modified: trunk/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2009-10-06 08:25:32 UTC (rev 8049)
+++ trunk/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2009-10-06 09:06:28 UTC (rev 8050)
@@ -13,6 +13,8 @@
package org.hornetq.core.server.cluster;
+import java.util.Map;
+
import org.hornetq.core.server.HornetQComponent;
import org.hornetq.utils.SimpleString;
@@ -29,6 +31,13 @@
{
SimpleString getName();
+ String getNodeID();
+
+ /**
+ * @return a Map whose keys are node IDs and whose values are the addresses of those nodes
+ */
+ Map<String, String> getNodes();
+
void handleReplicatedAddBinding(SimpleString address,
SimpleString uniqueName,
SimpleString routingName,
Modified: trunk/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java 2009-10-06 08:25:32 UTC (rev 8049)
+++ trunk/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java 2009-10-06 09:06:28 UTC (rev 8050)
@@ -15,7 +15,6 @@
package org.hornetq.core.server.cluster;
import org.hornetq.core.client.MessageHandler;
-import org.hornetq.core.server.Queue;
/**
* A MessageFlowRecord
@@ -32,9 +31,8 @@
int getMaxHops();
- void activate(Queue queue) throws Exception;
-
- //void reset() throws Exception;
-
+ Bridge getBridge();
+
void close() throws Exception;
+
}
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-10-06 08:25:32 UTC (rev 8049)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-10-06 09:06:28 UTC (rev 8050)
@@ -22,6 +22,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.ScheduledExecutorService;
import org.hornetq.core.client.ClientMessage;
@@ -254,7 +255,25 @@
{
return name;
}
+
+ public String getNodeID()
+ {
+ return nodeUUID.toString();
+ }
+ public Map<String, String> getNodes()
+ {
+ Map<String, String> nodes = new HashMap<String, String>();
+ for (Entry<String, MessageFlowRecord> record : records.entrySet( ))
+ {
+ if (record.getValue().getBridge().getForwardingConnection() != null)
+ {
+ nodes.put(record.getKey(), record.getValue().getBridge().getForwardingConnection().getRemoteAddress());
+ }
+ }
+ return nodes;
+ }
+
public synchronized void activate()
{
if (!started)
@@ -442,19 +461,15 @@
clearBindings();
}
- public void activate(final Queue queue) throws Exception
- {
- this.queue = queue;
-
- bridge.setQueue(queue);
-
- bridge.start();
- }
-
public void setBridge(final Bridge bridge)
{
this.bridge = bridge;
}
+
+ public Bridge getBridge()
+ {
+ return bridge;
+ }
public synchronized void onMessage(final ClientMessage message)
{
Added: trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java 2009-10-06 09:06:28 UTC (rev 8050)
@@ -0,0 +1,192 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.management;
+
+import static org.hornetq.tests.util.RandomUtil.randomBoolean;
+import static org.hornetq.tests.util.RandomUtil.randomPositiveInt;
+import static org.hornetq.tests.util.RandomUtil.randomPositiveLong;
+import static org.hornetq.tests.util.RandomUtil.randomString;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
+
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.config.cluster.BroadcastGroupConfiguration;
+import org.hornetq.core.config.cluster.ClusterConnectionConfiguration;
+import org.hornetq.core.config.cluster.DiscoveryGroupConfiguration;
+import org.hornetq.core.config.cluster.QueueConfiguration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.management.ClusterConnectionControl;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.server.HornetQ;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
+import org.hornetq.integration.transports.netty.NettyConnectorFactory;
+import org.hornetq.integration.transports.netty.TransportConstants;
+import org.hornetq.utils.Pair;
+import org.hornetq.utils.json.JSONArray;
+import org.hornetq.utils.json.JSONObject;
+
+/**
+ * A ClusterConnectionControl2Test
+ *
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * Created 11 dec. 2008 17:38:58
+ *
+ */
+public class ClusterConnectionControl2Test extends ManagementTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private HornetQServer server_0;
+
+ private HornetQServer server_1;
+
+ private MBeanServer mbeanServer_1;
+
+ private int port_1 = TransportConstants.DEFAULT_PORT + 1000;
+
+ private ClusterConnectionConfiguration clusterConnectionConfig_0;
+
+ private String clusterName = "cluster";
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testNodes() throws Exception
+ {
+ ClusterConnectionControl clusterConnectionControl_0 = createManagementControl(clusterConnectionConfig_0.getName());
+ assertTrue(clusterConnectionControl_0.isStarted());
+ Map<String, String> nodes = clusterConnectionControl_0.getNodes();
+ assertEquals(0, nodes.size());
+
+ server_1.start();
+ Thread.sleep(3000);
+
+ nodes = clusterConnectionControl_0.getNodes();
+ System.out.println(nodes);
+ assertEquals(1, nodes.size());
+ String remoteAddress = nodes.values().iterator().next();
+ assertTrue(remoteAddress.endsWith(":" + port_1));
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ String discoveryName = randomString();
+ String groupAddress = "231.7.7.7";
+ int groupPort = 9876;
+
+ Map<String, Object> acceptorParams_1 = new HashMap<String, Object>();
+ acceptorParams_1.put(TransportConstants.PORT_PROP_NAME, port_1);
+ TransportConfiguration acceptorConfig_1 = new TransportConfiguration(NettyAcceptorFactory.class.getName(), acceptorParams_1);
+
+ TransportConfiguration connectorConfig_1 = new TransportConfiguration(NettyConnectorFactory.class.getName(),
+ acceptorParams_1);
+
+ TransportConfiguration connectorConfig_0 = new TransportConfiguration(NettyConnectorFactory.class.getName());
+
+ QueueConfiguration queueConfig = new QueueConfiguration(randomString(), randomString(), null, false);
+
+ clusterConnectionConfig_0 = new ClusterConnectionConfiguration(clusterName,
+ queueConfig.getAddress(),
+ randomPositiveLong(),
+ randomBoolean(),
+ randomBoolean(),
+ randomPositiveInt(),
+ discoveryName);
+ List<Pair<String, String>> connectorInfos = new ArrayList<Pair<String, String>>();
+ connectorInfos.add(new Pair<String, String>("netty", null));
+ BroadcastGroupConfiguration broadcastGroupConfig = new BroadcastGroupConfiguration(discoveryName,
+ null,
+ -1,
+ groupAddress,
+ groupPort,
+ 250,
+ connectorInfos);
+ DiscoveryGroupConfiguration discoveryGroupConfig = new DiscoveryGroupConfiguration(discoveryName,
+ groupAddress,
+ groupPort,
+ ClientSessionFactoryImpl.DEFAULT_DISCOVERY_REFRESH_TIMEOUT);
+
+ Configuration conf_1 = new ConfigurationImpl();
+ conf_1.setSecurityEnabled(false);
+ conf_1.setJMXManagementEnabled(true);
+ conf_1.setClustered(true);
+ conf_1.getAcceptorConfigurations().add(acceptorConfig_1);
+ conf_1.getConnectorConfigurations().put("netty", connectorConfig_1);
+ conf_1.getQueueConfigurations().add(queueConfig);
+ conf_1.getDiscoveryGroupConfigurations().put(discoveryName, discoveryGroupConfig);
+ conf_1.getBroadcastGroupConfigurations().add(broadcastGroupConfig);
+
+ Configuration conf_0 = new ConfigurationImpl();
+ conf_0.setSecurityEnabled(false);
+ conf_0.setJMXManagementEnabled(true);
+ conf_0.setClustered(true);
+ conf_0.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
+ conf_0.getConnectorConfigurations().put("netty", connectorConfig_0);
+ conf_0.getClusterConfigurations().add(clusterConnectionConfig_0);
+ conf_0.getDiscoveryGroupConfigurations().put(discoveryName, discoveryGroupConfig);
+ conf_0.getBroadcastGroupConfigurations().add(broadcastGroupConfig);
+
+ mbeanServer_1 = MBeanServerFactory.createMBeanServer();
+ server_1 = HornetQ.newHornetQServer(conf_1, mbeanServer_1, false);
+
+ server_0 = HornetQ.newHornetQServer(conf_0, mbeanServer, false);
+ server_0.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ server_0.stop();
+ server_1.stop();
+
+ server_0 = null;
+ server_1 = null;
+
+ MBeanServerFactory.releaseMBeanServer(mbeanServer_1);
+ mbeanServer_1 = null;
+
+ super.tearDown();
+ }
+
+ protected ClusterConnectionControl createManagementControl(String name) throws Exception
+ {
+ return ManagementControlHelper.createClusterConnectionControl(name, mbeanServer);
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java 2009-10-06 08:25:32 UTC (rev 8049)
+++ trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java 2009-10-06 09:06:28 UTC (rev 8050)
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
+import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import org.hornetq.core.config.Configuration;
@@ -64,6 +65,8 @@
private HornetQServer server_1;
+ private MBeanServer mbeanServer_1;
+
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
@@ -198,7 +201,8 @@
conf_0.getClusterConfigurations().add(clusterConnectionConfig1);
conf_0.getClusterConfigurations().add(clusterConnectionConfig2);
- server_1 = HornetQ.newHornetQServer(conf_1, MBeanServerFactory.createMBeanServer(), false);
+ mbeanServer_1 = MBeanServerFactory.createMBeanServer();
+ server_1 = HornetQ.newHornetQServer(conf_1, mbeanServer_1, false);
server_1.start();
server_0 = HornetQ.newHornetQServer(conf_0, mbeanServer, false);
@@ -215,6 +219,9 @@
server_1 = null;
+ MBeanServerFactory.releaseMBeanServer(mbeanServer_1);
+ mbeanServer_1 = null;
+
super.tearDown();
}
Modified: trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlUsingCoreTest.java 2009-10-06 08:25:32 UTC (rev 8049)
+++ trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlUsingCoreTest.java 2009-10-06 09:06:28 UTC (rev 8050)
@@ -13,6 +13,8 @@
package org.hornetq.tests.integration.management;
+import java.util.Map;
+
import org.hornetq.core.client.ClientSession;
import org.hornetq.core.client.ClientSessionFactory;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
@@ -84,6 +86,11 @@
{
return (String)proxy.retrieveAttributeValue("staticConnectorNamePairsAsJSON");
}
+
+ public Map<String, String> getNodes() throws Exception
+ {
+ return (Map<String, String>)proxy.retrieveAttributeValue("nodes");
+ }
public boolean isDuplicateDetection()
{
@@ -99,6 +106,11 @@
{
return (String)proxy.retrieveAttributeValue("name");
}
+
+ public String getNodeID()
+ {
+ return (String)proxy.retrieveAttributeValue("nodeID");
+ }
public boolean isStarted()
{
More information about the hornetq-commits
mailing list