JBoss hornetq SVN: r10872 - in branches/Branch_2_2_EAP_cluster_clean2: tests/src/org/hornetq/tests/integration/cluster/distribution and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-21 15:24:25 -0400 (Tue, 21 Jun 2011)
New Revision: 10872
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
Log:
Fixing bridge
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-06-21 18:11:11 UTC (rev 10871)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-06-21 19:24:25 UTC (rev 10872)
@@ -746,7 +746,7 @@
return;
}
- long timeout = (long)(this.retryCount * this.retryMultiplier * this.retryMultiplier);
+ long timeout = (long)(this.retryInterval * Math.pow(this.retryMultiplier, retryCount));
if (timeout == 0)
{
timeout = this.retryInterval;
@@ -755,6 +755,8 @@
{
timeout = maxRetryInterval;
}
+
+ log.debug("Bridge " + this + " retrying connection #" + retryCount + ", maxRetry=" + reconnectAttemptsInUse + ", timeout=" + timeout);
scheduleRetryConnectFixedTimeout(timeout);
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-06-21 18:11:11 UTC (rev 10871)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-06-21 19:24:25 UTC (rev 10872)
@@ -265,6 +265,7 @@
@Override
protected void afterConnect() throws Exception
{
+ super.afterConnect();
System.out.println("afterConnect");
setupNotificationConsumer();
}
@@ -275,7 +276,7 @@
super.stop();
}
- protected void failed(final boolean permanently)
+ protected void fail(final boolean permanently)
{
log.debug("Cluster Bridge " + this.getName() + " failed, permanently=" + permanently);
super.fail(permanently);
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-06-21 18:11:11 UTC (rev 10871)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-06-21 19:24:25 UTC (rev 10872)
@@ -506,7 +506,8 @@
{
if (log.isDebugEnabled())
{
- log.debug(this + "receiving nodeUP for nodeID=" + nodeID +
+ String ClusterTestBase = "receiving nodeUP for nodeID=";
+ log.debug(this + ClusterTestBase + nodeID +
" connectionPair=" + connectorPair, new Exception ("trace"));
}
// discard notifications about ourselves unless its from our backup
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-06-21 18:11:11 UTC (rev 10871)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-06-21 19:24:25 UTC (rev 10872)
@@ -40,6 +40,7 @@
import org.hornetq.core.config.BroadcastGroupConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
@@ -1786,6 +1787,52 @@
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
+ protected void setupClusterConnection(final String name,
+ final String address,
+ final boolean forwardWhenNoConsumers,
+ final int maxHops,
+ final int reconnectAttempts,
+ final long retryInterval,
+ final boolean netty,
+ final int nodeFrom,
+ final int... nodesTo)
+ {
+ HornetQServer serverFrom = servers[nodeFrom];
+
+ if (serverFrom == null)
+ {
+ throw new IllegalStateException("No server at node " + nodeFrom);
+ }
+
+ TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
+ serverFrom.getConfiguration().getConnectorConfigurations().put(connectorFrom.getName(), connectorFrom);
+
+ List<String> pairs = new ArrayList<String>();
+ for (int element : nodesTo)
+ {
+ TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(element, netty));
+ serverFrom.getConfiguration().getConnectorConfigurations().put(serverTotc.getName(), serverTotc);
+ pairs.add(serverTotc.getName());
+ }
+ ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
+ address,
+ connectorFrom.getName(),
+ ConfigurationImpl.DEFAULT_CLUSTER_FAILURE_CHECK_PERIOD,
+ ConfigurationImpl.DEFAULT_CLUSTER_CONNECTION_TTL,
+ retryInterval,
+ ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER,
+ ConfigurationImpl.DEFAULT_CLUSTER_MAX_RETRY_INTERVAL,
+ reconnectAttempts,
+ true,
+ forwardWhenNoConsumers,
+ maxHops,
+ 1024,
+ pairs,
+ false);
+
+ serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
+ }
+
/**
* @param name
* @param address
@@ -1796,23 +1843,23 @@
* @return
*/
protected ClusterConnectionConfiguration createClusterConfig(final String name,
- final String address,
- final boolean forwardWhenNoConsumers,
- final int maxHops,
- TransportConfiguration connectorFrom,
- List<String> pairs)
- {
- ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
- address,
- connectorFrom.getName(),
- 250,
- true,
- forwardWhenNoConsumers,
- maxHops,
- 1024,
- pairs, false);
- return clusterConf;
- }
+ final String address,
+ final boolean forwardWhenNoConsumers,
+ final int maxHops,
+ TransportConfiguration connectorFrom,
+ List<String> pairs)
+ {
+ ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
+ address,
+ connectorFrom.getName(),
+ 250,
+ true,
+ forwardWhenNoConsumers,
+ maxHops,
+ 1024,
+ pairs, false);
+ return clusterConf;
+ }
protected void setupClusterConnectionWithBackups(final String name,
final String address,
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java 2011-06-21 18:11:11 UTC (rev 10871)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java 2011-06-21 19:24:25 UTC (rev 10872)
@@ -18,6 +18,7 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
/**
* A SimpleSymmetricClusterTest
@@ -32,6 +33,7 @@
// Constants -----------------------------------------------------
static final Logger log = Logger.getLogger(SimpleSymmetricClusterTest.class);
+
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
@@ -39,13 +41,12 @@
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
-
- public void setUp() throws Exception
+
+ public void setUp() throws Exception
{
super.setUp();
}
-
-
+
/**
* @param name
* @param address
@@ -56,11 +57,11 @@
* @return
*/
protected ClusterConnectionConfiguration createClusterConfig(final String name,
- final String address,
- final boolean forwardWhenNoConsumers,
- final int maxHops,
- TransportConfiguration connectorFrom,
- List<String> pairs)
+ final String address,
+ final boolean forwardWhenNoConsumers,
+ final int maxHops,
+ TransportConfiguration connectorFrom,
+ List<String> pairs)
{
ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
address,
@@ -70,61 +71,65 @@
forwardWhenNoConsumers,
maxHops,
1024,
- pairs, false);
+ pairs,
+ false);
return clusterConf;
}
-
public void tearDown() throws Exception
{
stopServers(0, 1, 2);
super.tearDown();
}
-
+
public boolean isNetty()
{
return false;
}
-
+
public void testSimple() throws Exception
{
setupServer(0, true, isNetty());
setupServer(1, true, isNetty());
setupServer(2, true, isNetty());
-
+
setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 2, 0);
setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 2, 0, 1);
-
+
startServers(0, 1, 2);
-
+
Thread.sleep(1000);
-
- for (int i = 0; i < 10; i++) log.info("****************************");
+
+ for (int i = 0; i < 10; i++)
+ log.info("****************************");
for (int i = 0; i <= 2; i++)
{
- log.info("*************************************\n " + servers[i] + " topology:\n" + servers[i].getClusterManager().getTopology().describe());
+ log.info("*************************************\n " + servers[i] +
+ " topology:\n" +
+ servers[i].getClusterManager().getTopology().describe());
}
- for (int i = 0; i < 10; i++) log.info("****************************");
+ for (int i = 0; i < 10; i++)
+ log.info("****************************");
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
-
- //Thread.sleep(1500);
-
+
+ // Thread.sleep(1500);
+
createQueue(0, "queues.testaddress", "queue0", null, false);
- //Thread.sleep(1500);
+ // Thread.sleep(1500);
createQueue(1, "queues.testaddress", "queue0", null, false);
- //Thread.sleep(1500);
+ // Thread.sleep(1500);
createQueue(2, "queues.testaddress", "queue0", null, false);
- //Thread.sleep(1500);
+ // Thread.sleep(1500);
addConsumer(0, 0, "queue0", null);
- //Thread.sleep(1500);
+ // Thread.sleep(1500);
addConsumer(1, 1, "queue0", null);
- //Thread.sleep(1500);
+ // Thread.sleep(1500);
addConsumer(2, 2, "queue0", null);
- //Thread.sleep(1500);
+ // Thread.sleep(1500);
waitForBindings(0, "queues.testaddress", 1, 1, true);
waitForBindings(1, "queues.testaddress", 1, 1, true);
@@ -136,6 +141,85 @@
}
+ public void testSimpleRoundRobbin() throws Exception
+ {
+ setupServer(0, true, isNetty());
+ setupServer(1, true, isNetty());
+ setupServer(2, true, isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1, 10, 100, isNetty(), 0, 1, 2);
+ setupClusterConnection("cluster1", "queues", false, 1, 10, 100, isNetty(), 1, 2, 0);
+ setupClusterConnection("cluster1", "queues", false, 1, 10, 100, isNetty(), 2, 0, 1);
+
+ startServers(0, 1, 2);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ // Thread.sleep(1500);
+
+ createQueue(0, "queues.testaddress", "queue0", null, true);
+ // Thread.sleep(1500);
+ createQueue(1, "queues.testaddress", "queue0", null, true);
+ // Thread.sleep(1500);
+ createQueue(2, "queues.testaddress", "queue0", null, true);
+ // Thread.sleep(1500);
+
+ addConsumer(0, 0, "queue0", null);
+ // Thread.sleep(1500);
+ addConsumer(1, 1, "queue0", null);
+ // Thread.sleep(1500);
+ addConsumer(2, 2, "queue0", null);
+ // Thread.sleep(1500);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 2, false);
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 2, false);
+
+ send(0, "queues.testaddress", 33, true, null);
+
+ verifyReceiveRoundRobin(33, 0, 1, 2);
+
+ stopServers(2);
+
+ Thread.sleep(5000);
+
+ send(0, "queues.testaddress", 100, true, null);
+
+ verifyReceiveRoundRobin(100, 0, 1);
+
+ sfs[2] = null;
+ consumers[2] = null;
+
+
+ startServers(2);
+
+ setupSessionFactory(2, isNetty());
+
+ addConsumer(2, 2, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 2, false);
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 2, false);
+
+ send(0, "queues.testaddress", 33, true, null);
+
+ verifyReceiveRoundRobin(33, 0, 1, 2);
+
+
+
+
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
13 years, 6 months
JBoss hornetq SVN: r10871 - branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-21 14:11:11 -0400 (Tue, 21 Jun 2011)
New Revision: 10871
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
tweak
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-06-21 17:52:32 UTC (rev 10870)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-06-21 18:11:11 UTC (rev 10871)
@@ -13,6 +13,8 @@
package org.hornetq.tests.integration.cluster.distribution;
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -38,7 +40,6 @@
import org.hornetq.core.config.BroadcastGroupConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
@@ -337,16 +338,68 @@
}
System.out.println("=======================================================================");
- for (HornetQServer hornetQServer : servers)
+ try
{
- if (hornetQServer != null)
+ for (HornetQServer hornetQServer : servers)
{
- System.out.println(clusterDescription(hornetQServer));
+ if (hornetQServer != null)
+ {
+ System.out.println(clusterDescription(hornetQServer));
+ System.out.println(debugBindings(hornetQServer, hornetQServer.getConfiguration().getManagementNotificationAddress().toString()));
+ }
}
+
+ for (HornetQServer hornetQServer : servers)
+ {
+ System.out.println("Management bindings on " + hornetQServer);
+ if (hornetQServer != null)
+ {
+ System.out.println(debugBindings(hornetQServer, hornetQServer.getConfiguration().getManagementNotificationAddress().toString()));
+ }
+ }
}
+ catch (Throwable dontCare)
+ {
+ }
+
+
throw new IllegalStateException(msg);
}
+
+
+ protected String debugBindings(final HornetQServer server, final String address) throws Exception
+ {
+
+ StringWriter str = new StringWriter();
+ PrintWriter out = new PrintWriter(str);
+
+ if (server == null)
+ {
+ return "server is shutdown";
+ }
+ PostOffice po = server.getPostOffice();
+ if (po == null)
+ {
+ return "server is shutdown";
+ }
+ Bindings bindings = po.getBindingsForAddress(new SimpleString(address));
+
+ out.println("=======================================================================");
+ out.println("Binding information for address = " + address + " on " + server);
+
+ for (Binding binding : bindings.getBindings())
+ {
+ QueueBinding qBinding = (QueueBinding)binding;
+
+ out.println("Binding = " + qBinding + ", queue=" + qBinding.getQueue());
+ }
+ out.println("=======================================================================");
+
+ return str.toString();
+
+ }
+
protected void createQueue(final int node,
final String address,
final String queueName,
@@ -368,6 +421,8 @@
{
filterString = ClusterTestBase.FILTER_PROP.toString() + "='" + filterVal + "'";
}
+
+ log.info("Creating " + queueName + " , address " + address + " on " + servers[node]);
session.createQueue(address, queueName, filterString, durable);
@@ -832,7 +887,7 @@
{
for (ClusterConnection cc : clusterManager.getClusterConnections())
{
- out += cc.description() + "\n";
+ out += cc.describe() + "\n";
}
}
out += "\n\nfull topology:";
@@ -1837,24 +1892,16 @@
servers[node].setIdentity("server " + node);
ClusterTestBase.log.info("starting server " + servers[node]);
servers[node].start();
+
ClusterTestBase.log.info("started server " + servers[node]);
ClusterTestBase.log.info("started server " + node);
- /*
- * we need to wait a lil while between server start up to allow the server to communicate in some order.
- * This is to avoid split brain on startup
- * */
- // TODO: Do we really need this?
- Thread.sleep(1000);
- }
- for (int node : nodes)
- {
- //wait for each server to start, it may be a backup and started in a separate thread
+
waitForServer(servers[node]);
}
}
- private void waitForServer(HornetQServer server)
+ protected void waitForServer(HornetQServer server)
throws InterruptedException
{
long timetowait =System.currentTimeMillis() + 5000;
13 years, 6 months
JBoss hornetq SVN: r10870 - branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-21 13:52:32 -0400 (Tue, 21 Jun 2011)
New Revision: 10870
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
tweak
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-06-21 17:29:32 UTC (rev 10869)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-06-21 17:52:32 UTC (rev 10870)
@@ -1323,6 +1323,8 @@
class StaticConnector implements Serializable
{
+ private static final long serialVersionUID = 6772279632415242634l;
+
private List<Connector> connectors;
public ClientSessionFactory connect() throws HornetQException
13 years, 6 months
JBoss hornetq SVN: r10869 - branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/integration/logging.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-21 13:29:32 -0400 (Tue, 21 Jun 2011)
New Revision: 10869
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/integration/logging/HornetQLoggerFormatter.java
Log:
tweak
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/integration/logging/HornetQLoggerFormatter.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/integration/logging/HornetQLoggerFormatter.java 2011-06-21 15:45:41 UTC (rev 10868)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/integration/logging/HornetQLoggerFormatter.java 2011-06-21 17:29:32 UTC (rev 10869)
@@ -49,7 +49,7 @@
sb.append(record.getLevel()).append(" [");
sb.append(stripPackage(record.getLoggerName())).append("]").append(" ");
- sb.append(HornetQLoggerFormatter.LINE_SEPARATOR);
+ //sb.append(HornetQLoggerFormatter.LINE_SEPARATOR);
sb.append(record.getMessage());
sb.append(HornetQLoggerFormatter.LINE_SEPARATOR);
@@ -67,7 +67,7 @@
{
}
}
- sb.append(HornetQLoggerFormatter.LINE_SEPARATOR);
+ //sb.append(HornetQLoggerFormatter.LINE_SEPARATOR);
return sb.toString();
}
13 years, 6 months
JBoss hornetq SVN: r10868 - branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-21 11:45:41 -0400 (Tue, 21 Jun 2011)
New Revision: 10868
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
Log:
just removing logs
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-06-21 15:45:02 UTC (rev 10867)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-06-21 15:45:41 UTC (rev 10868)
@@ -1061,11 +1061,6 @@
RemoteQueueBinding binding = bindings.get(clusterName);
- synchronized (System.err)
- {
- new Exception("Looking for consumer on " + clusterName + " binding = " + binding).printStackTrace(System.out);
- }
-
if (binding == null)
{
throw new IllegalStateException("Cannot find binding for " + clusterName + " on " + ClusterConnectionImpl.this);
13 years, 6 months
JBoss hornetq SVN: r10867 - branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-21 11:45:02 -0400 (Tue, 21 Jun 2011)
New Revision: 10867
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java
Log:
tweak
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java 2011-06-21 15:34:44 UTC (rev 10866)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java 2011-06-21 15:45:02 UTC (rev 10867)
@@ -149,7 +149,7 @@
return count;
}
- public String describe()
+ public synchronized String describe()
{
String desc = "";
13 years, 6 months
JBoss hornetq SVN: r10866 - in branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq: core/postoffice/impl and 6 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-21 11:34:44 -0400 (Tue, 21 Jun 2011)
New Revision: 10866
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/HornetQServer.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/ClusterManager.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/integration/logging/HornetQLoggerFormatter.java
Log:
tweaks
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-21 15:03:33 UTC (rev 10865)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-21 15:34:44 UTC (rev 10866)
@@ -83,9 +83,9 @@
private static final long serialVersionUID = 2512460695662741413L;
private static final Logger log = Logger.getLogger(ClientSessionFactoryImpl.class);
-
+
private static final boolean isTrace = log.isTraceEnabled();
-
+
private static final boolean isDebug = log.isDebugEnabled();
// Attributes
@@ -206,7 +206,7 @@
closeExecutor = orderedExecutorFactory.getExecutor();
this.interceptors = interceptors;
-
+
}
public void connect(int initialConnectAttempts, boolean failoverOnInitialConnection) throws HornetQException
@@ -217,12 +217,11 @@
if (connection == null)
{
StringBuffer msg = new StringBuffer("Unable to connect to server using configuration ").append(connectorConfig);
- if(backupConfig != null)
+ if (backupConfig != null)
{
msg.append(" and backup configuration ").append(backupConfig);
}
- throw new HornetQException(HornetQException.NOT_CONNECTED,
- msg.toString());
+ throw new HornetQException(HornetQException.NOT_CONNECTED, msg.toString());
}
}
@@ -234,11 +233,11 @@
public void setBackupConnector(TransportConfiguration live, TransportConfiguration backUp)
{
- if(live.equals(connectorConfig) && backUp != null)
+ if (live.equals(connectorConfig) && backUp != null)
{
if (isDebug)
{
- log.debug("Setting up backup config = " + backUp + " for live = " + live);
+ log.debug("Setting up backup config = " + backUp + " for live = " + live);
}
backupConfig = backUp;
}
@@ -246,7 +245,11 @@
{
if (isDebug)
{
- log.debug("ClientSessionFactoryImpl received backup update for live/backup pair = " + live + " / " + backUp + " but it didn't belong to " + this.connectorConfig);
+ log.debug("ClientSessionFactoryImpl received backup update for live/backup pair = " + live +
+ " / " +
+ backUp +
+ " but it didn't belong to " +
+ this.connectorConfig);
}
}
}
@@ -381,7 +384,7 @@
sessions.remove(session);
}
}
-
+
public void connectionReadyForWrites(final Object connectionID, final boolean ready)
{
}
@@ -458,10 +461,10 @@
}
closed = true;
-
+
serverLocator.factoryClosed(this);
}
-
+
public void cleanup()
{
if (closed)
@@ -473,39 +476,36 @@
causeExit();
synchronized (createSessionLock)
{
- synchronized (failoverLock)
+ HashSet<ClientSessionInternal> sessionsToClose;
+ synchronized (sessions)
{
- HashSet<ClientSessionInternal> sessionsToClose;
- synchronized (sessions)
+ sessionsToClose = new HashSet<ClientSessionInternal>(sessions);
+ }
+ // work on a copied set. the session will be removed from sessions when session.close() is called
+ for (ClientSessionInternal session : sessionsToClose)
+ {
+ try
{
- sessionsToClose = new HashSet<ClientSessionInternal>(sessions);
+ session.cleanUp(false);
}
- // work on a copied set. the session will be removed from sessions when session.close() is called
- for (ClientSessionInternal session : sessionsToClose)
+ catch (Exception e)
{
- try
- {
- session.cleanUp(false);
- }
- catch (Exception e)
- {
- log.warn("Unable to close session", e);
- }
+ log.warn("Unable to close session", e);
}
+ }
- checkCloseConnection();
- }
+ checkCloseConnection();
}
closed = true;
}
- public boolean isClosed()
- {
- return closed;
- }
+ public boolean isClosed()
+ {
+ return closed;
+ }
- public ServerLocator getServerLocator()
+ public ServerLocator getServerLocator()
{
return serverLocator;
}
@@ -517,7 +517,7 @@
{
stopPingingAfterOne = true;
}
-
+
public void resumePinging()
{
stopPingingAfterOne = false;
@@ -553,12 +553,11 @@
return;
}
-
if (isTrace)
{
log.trace("Client Connection failed, calling failure listeners and trying to reconnect, reconnectAttempts=" + reconnectAttempts);
}
-
+
// We call before reconnection occurs to give the user a chance to do cleanup, like cancel messages
callFailureListeners(me, false, false);
@@ -587,7 +586,6 @@
// It can then release the channel 1 lock, and retry (which will cause locking on failoverLock
// until failover is complete
-
if (reconnectAttempts != 0)
{
lockChannel1();
@@ -928,7 +926,7 @@
{
sessionsToFailover = new HashSet<ClientSessionInternal>(sessions);
}
-
+
for (ClientSessionInternal session : sessionsToFailover)
{
session.handleFailover(connection);
@@ -938,7 +936,7 @@
private void getConnectionWithRetry(final int reconnectAttempts)
{
log.info("getConnectionWithRetry::" + reconnectAttempts);
-
+
long interval = retryInterval;
int count = 0;
@@ -961,7 +959,7 @@
if (reconnectAttempts != 0)
{
count++;
-
+
if (reconnectAttempts != -1 && count == reconnectAttempts)
{
log.warn("Tried " + reconnectAttempts + " times to connect. Now giving up on reconnecting it.");
@@ -971,19 +969,21 @@
if (isTrace)
{
- log.trace("Waiting " + interval +
- " milliseconds before next retry. RetryInterval=" + retryInterval +
- " and multiplier = " + retryIntervalMultiplier);
+ log.trace("Waiting " + interval +
+ " milliseconds before next retry. RetryInterval=" +
+ retryInterval +
+ " and multiplier = " +
+ retryIntervalMultiplier);
}
-
+
try
{
- waitLock.wait(interval);
+ waitLock.wait(interval);
}
catch (InterruptedException ignore)
{
}
-
+
// Exponential back-off
long newInterval = (long)(interval * retryIntervalMultiplier);
@@ -1062,10 +1062,12 @@
try
{
DelegatingBufferHandler handler = new DelegatingBufferHandler();
-
+
if (log.isDebugEnabled())
{
- log.debug("Trying to connect with connector = " + connectorFactory + ", parameters = " + connectorConfig.getParams());
+ log.debug("Trying to connect with connector = " + connectorFactory +
+ ", parameters = " +
+ connectorConfig.getParams());
}
connector = connectorFactory.createConnector(connectorConfig.getParams(),
@@ -1083,7 +1085,7 @@
{
log.debug("Trying to connect at the main server using connector :" + connectorConfig);
}
-
+
tc = connector.createConnection();
if (tc == null)
@@ -1092,7 +1094,7 @@
{
log.debug("Main server is not up. Hopefully there's a backup configured now!");
}
-
+
try
{
connector.close();
@@ -1104,8 +1106,8 @@
connector = null;
}
}
- //if connection fails we can try the backup in case it has come live
- if(connector == null && backupConfig != null)
+ // if connection fails we can try the backup in case it has come live
+ if (connector == null && backupConfig != null)
{
if (isDebug)
{
@@ -1113,11 +1115,11 @@
}
ConnectorFactory backupConnectorFactory = instantiateConnectorFactory(backupConfig.getFactoryClassName());
connector = backupConnectorFactory.createConnector(backupConfig.getParams(),
- handler,
- this,
- closeExecutor,
- threadPool,
- scheduledThreadPool);
+ handler,
+ this,
+ closeExecutor,
+ threadPool,
+ scheduledThreadPool);
if (connector != null)
{
connector.start();
@@ -1130,7 +1132,7 @@
{
log.debug("Backup is not active yet");
}
-
+
try
{
connector.close();
@@ -1144,12 +1146,12 @@
else
{
/*looks like the backup is now live, lets use that*/
-
+
if (isDebug)
{
log.debug("Connected to the backup at " + backupConfig);
}
-
+
connectorConfig = backupConfig;
backupConfig = null;
@@ -1162,7 +1164,7 @@
{
if (isTrace)
{
- log.trace("No Backup configured!", new Exception ("trace"));
+ log.trace("No Backup configured!", new Exception("trace"));
}
}
}
@@ -1215,7 +1217,7 @@
{
log.debug("Defined connection " + connection);
}
-
+
connection.addFailureListener(new DelegatingFailureListener(connection.getID()));
Channel channel0 = connection.getChannel(0, -1);
@@ -1249,7 +1251,7 @@
{
log.debug("Subscribing Topology");
}
-
+
channel0.send(new SubscribeClusterTopologyUpdatesMessage(serverLocator.isClusterConnection()));
if (serverLocator.isClusterConnection())
{
@@ -1258,9 +1260,7 @@
{
log.debug("Announcing node " + serverLocator.getNodeID() + ", isBackup=" + serverLocator.isBackup());
}
- channel0.send(new NodeAnnounceMessage(serverLocator.getNodeID(),
- serverLocator.isBackup(),
- config));
+ channel0.send(new NodeAnnounceMessage(serverLocator.getNodeID(), serverLocator.isBackup(), config));
}
}
}
@@ -1324,7 +1324,7 @@
if (connection != null)
{
Channel channel1 = connection.getChannel(1, -1);
-
+
if (channel1 != null)
{
channel1.returnBlocking();
@@ -1365,7 +1365,7 @@
if (type == PacketImpl.DISCONNECT)
{
final DisconnectMessage msg = (DisconnectMessage)packet;
-
+
closeExecutor.execute(new Runnable()
{
// Must be executed on new thread since cannot block the netty thread for a long time and fail can
@@ -1400,11 +1400,13 @@
{
if (isDebug)
{
- log.debug("Node " + topMessage.getNodeID() + " going up, connector = " + topMessage.getPair() + ", isLast=" + topMessage.isLast());
+ log.debug("Node " + topMessage.getNodeID() +
+ " going up, connector = " +
+ topMessage.getPair() +
+ ", isLast=" +
+ topMessage.isLast());
}
- serverLocator.notifyNodeUp(topMessage.getNodeID(),
- topMessage.getPair(),
- topMessage.isLast());
+ serverLocator.notifyNodeUp(topMessage.getNodeID(), topMessage.getPair(), topMessage.isLast());
}
}
}
@@ -1477,8 +1479,8 @@
first = false;
long now = System.currentTimeMillis();
-
- if (clientFailureCheckPeriod != -1 && connectionTTL != -1 && now >= lastCheck + connectionTTL )
+
+ if (clientFailureCheckPeriod != -1 && connectionTTL != -1 && now >= lastCheck + connectionTTL)
{
if (!connection.checkDataReceived())
{
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-06-21 15:03:33 UTC (rev 10865)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-06-21 15:34:44 UTC (rev 10866)
@@ -18,8 +18,19 @@
import java.net.InetAddress;
import java.security.AccessController;
import java.security.PrivilegedAction;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.HornetQException;
@@ -665,258 +676,258 @@
}
}
- public synchronized boolean isHA()
+ public boolean isHA()
{
return ha;
}
- public synchronized boolean isCacheLargeMessagesClient()
+ public boolean isCacheLargeMessagesClient()
{
return cacheLargeMessagesClient;
}
- public synchronized void setCacheLargeMessagesClient(final boolean cached)
+ public void setCacheLargeMessagesClient(final boolean cached)
{
cacheLargeMessagesClient = cached;
}
- public synchronized long getClientFailureCheckPeriod()
+ public long getClientFailureCheckPeriod()
{
return clientFailureCheckPeriod;
}
- public synchronized void setClientFailureCheckPeriod(final long clientFailureCheckPeriod)
+ public void setClientFailureCheckPeriod(final long clientFailureCheckPeriod)
{
checkWrite();
this.clientFailureCheckPeriod = clientFailureCheckPeriod;
}
- public synchronized long getConnectionTTL()
+ public long getConnectionTTL()
{
return connectionTTL;
}
- public synchronized void setConnectionTTL(final long connectionTTL)
+ public void setConnectionTTL(final long connectionTTL)
{
checkWrite();
this.connectionTTL = connectionTTL;
}
- public synchronized long getCallTimeout()
+ public long getCallTimeout()
{
return callTimeout;
}
- public synchronized void setCallTimeout(final long callTimeout)
+ public void setCallTimeout(final long callTimeout)
{
checkWrite();
this.callTimeout = callTimeout;
}
- public synchronized int getMinLargeMessageSize()
+ public int getMinLargeMessageSize()
{
return minLargeMessageSize;
}
- public synchronized void setMinLargeMessageSize(final int minLargeMessageSize)
+ public void setMinLargeMessageSize(final int minLargeMessageSize)
{
checkWrite();
this.minLargeMessageSize = minLargeMessageSize;
}
- public synchronized int getConsumerWindowSize()
+ public int getConsumerWindowSize()
{
return consumerWindowSize;
}
- public synchronized void setConsumerWindowSize(final int consumerWindowSize)
+ public void setConsumerWindowSize(final int consumerWindowSize)
{
checkWrite();
this.consumerWindowSize = consumerWindowSize;
}
- public synchronized int getConsumerMaxRate()
+ public int getConsumerMaxRate()
{
return consumerMaxRate;
}
- public synchronized void setConsumerMaxRate(final int consumerMaxRate)
+ public void setConsumerMaxRate(final int consumerMaxRate)
{
checkWrite();
this.consumerMaxRate = consumerMaxRate;
}
- public synchronized int getConfirmationWindowSize()
+ public int getConfirmationWindowSize()
{
return confirmationWindowSize;
}
- public synchronized void setConfirmationWindowSize(final int confirmationWindowSize)
+ public void setConfirmationWindowSize(final int confirmationWindowSize)
{
checkWrite();
this.confirmationWindowSize = confirmationWindowSize;
}
- public synchronized int getProducerWindowSize()
+ public int getProducerWindowSize()
{
return producerWindowSize;
}
- public synchronized void setProducerWindowSize(final int producerWindowSize)
+ public void setProducerWindowSize(final int producerWindowSize)
{
checkWrite();
this.producerWindowSize = producerWindowSize;
}
- public synchronized int getProducerMaxRate()
+ public int getProducerMaxRate()
{
return producerMaxRate;
}
- public synchronized void setProducerMaxRate(final int producerMaxRate)
+ public void setProducerMaxRate(final int producerMaxRate)
{
checkWrite();
this.producerMaxRate = producerMaxRate;
}
- public synchronized boolean isBlockOnAcknowledge()
+ public boolean isBlockOnAcknowledge()
{
return blockOnAcknowledge;
}
- public synchronized void setBlockOnAcknowledge(final boolean blockOnAcknowledge)
+ public void setBlockOnAcknowledge(final boolean blockOnAcknowledge)
{
checkWrite();
this.blockOnAcknowledge = blockOnAcknowledge;
}
- public synchronized boolean isBlockOnDurableSend()
+ public boolean isBlockOnDurableSend()
{
return blockOnDurableSend;
}
- public synchronized void setBlockOnDurableSend(final boolean blockOnDurableSend)
+ public void setBlockOnDurableSend(final boolean blockOnDurableSend)
{
checkWrite();
this.blockOnDurableSend = blockOnDurableSend;
}
- public synchronized boolean isBlockOnNonDurableSend()
+ public boolean isBlockOnNonDurableSend()
{
return blockOnNonDurableSend;
}
- public synchronized void setBlockOnNonDurableSend(final boolean blockOnNonDurableSend)
+ public void setBlockOnNonDurableSend(final boolean blockOnNonDurableSend)
{
checkWrite();
this.blockOnNonDurableSend = blockOnNonDurableSend;
}
- public synchronized boolean isAutoGroup()
+ public boolean isAutoGroup()
{
return autoGroup;
}
- public synchronized void setAutoGroup(final boolean autoGroup)
+ public void setAutoGroup(final boolean autoGroup)
{
checkWrite();
this.autoGroup = autoGroup;
}
- public synchronized boolean isPreAcknowledge()
+ public boolean isPreAcknowledge()
{
return preAcknowledge;
}
- public synchronized void setPreAcknowledge(final boolean preAcknowledge)
+ public void setPreAcknowledge(final boolean preAcknowledge)
{
checkWrite();
this.preAcknowledge = preAcknowledge;
}
- public synchronized int getAckBatchSize()
+ public int getAckBatchSize()
{
return ackBatchSize;
}
- public synchronized void setAckBatchSize(final int ackBatchSize)
+ public void setAckBatchSize(final int ackBatchSize)
{
checkWrite();
this.ackBatchSize = ackBatchSize;
}
- public synchronized boolean isUseGlobalPools()
+ public boolean isUseGlobalPools()
{
return useGlobalPools;
}
- public synchronized void setUseGlobalPools(final boolean useGlobalPools)
+ public void setUseGlobalPools(final boolean useGlobalPools)
{
checkWrite();
this.useGlobalPools = useGlobalPools;
}
- public synchronized int getScheduledThreadPoolMaxSize()
+ public int getScheduledThreadPoolMaxSize()
{
return scheduledThreadPoolMaxSize;
}
- public synchronized void setScheduledThreadPoolMaxSize(final int scheduledThreadPoolMaxSize)
+ public void setScheduledThreadPoolMaxSize(final int scheduledThreadPoolMaxSize)
{
checkWrite();
this.scheduledThreadPoolMaxSize = scheduledThreadPoolMaxSize;
}
- public synchronized int getThreadPoolMaxSize()
+ public int getThreadPoolMaxSize()
{
return threadPoolMaxSize;
}
- public synchronized void setThreadPoolMaxSize(final int threadPoolMaxSize)
+ public void setThreadPoolMaxSize(final int threadPoolMaxSize)
{
checkWrite();
this.threadPoolMaxSize = threadPoolMaxSize;
}
- public synchronized long getRetryInterval()
+ public long getRetryInterval()
{
return retryInterval;
}
- public synchronized void setRetryInterval(final long retryInterval)
+ public void setRetryInterval(final long retryInterval)
{
checkWrite();
this.retryInterval = retryInterval;
}
- public synchronized long getMaxRetryInterval()
+ public long getMaxRetryInterval()
{
return maxRetryInterval;
}
- public synchronized void setMaxRetryInterval(final long retryInterval)
+ public void setMaxRetryInterval(final long retryInterval)
{
checkWrite();
maxRetryInterval = retryInterval;
}
- public synchronized double getRetryIntervalMultiplier()
+ public double getRetryIntervalMultiplier()
{
return retryIntervalMultiplier;
}
- public synchronized void setRetryIntervalMultiplier(final double retryIntervalMultiplier)
+ public void setRetryIntervalMultiplier(final double retryIntervalMultiplier)
{
checkWrite();
this.retryIntervalMultiplier = retryIntervalMultiplier;
}
- public synchronized int getReconnectAttempts()
+ public int getReconnectAttempts()
{
return reconnectAttempts;
}
- public synchronized void setReconnectAttempts(final int reconnectAttempts)
+ public void setReconnectAttempts(final int reconnectAttempts)
{
checkWrite();
this.reconnectAttempts = reconnectAttempts;
@@ -933,23 +944,23 @@
return initialConnectAttempts;
}
- public synchronized boolean isFailoverOnInitialConnection()
+ public boolean isFailoverOnInitialConnection()
{
return this.failoverOnInitialConnection;
}
- public synchronized void setFailoverOnInitialConnection(final boolean failover)
+ public void setFailoverOnInitialConnection(final boolean failover)
{
checkWrite();
this.failoverOnInitialConnection = failover;
}
- public synchronized String getConnectionLoadBalancingPolicyClassName()
+ public String getConnectionLoadBalancingPolicyClassName()
{
return connectionLoadBalancingPolicyClassName;
}
- public synchronized void setConnectionLoadBalancingPolicyClassName(final String loadBalancingPolicyClassName)
+ public void setConnectionLoadBalancingPolicyClassName(final String loadBalancingPolicyClassName)
{
checkWrite();
connectionLoadBalancingPolicyClassName = loadBalancingPolicyClassName;
@@ -975,12 +986,12 @@
return interceptors.remove(interceptor);
}
- public synchronized int getInitialMessagePacketSize()
+ public int getInitialMessagePacketSize()
{
return initialMessagePacketSize;
}
- public synchronized void setInitialMessagePacketSize(final int size)
+ public void setInitialMessagePacketSize(final int size)
{
checkWrite();
initialMessagePacketSize = size;
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-06-21 15:03:33 UTC (rev 10865)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-06-21 15:34:44 UTC (rev 10866)
@@ -213,7 +213,7 @@
{
if (isTrace)
{
- log.trace("Receiving notification : " + notification);
+ log.trace("Receiving notification : " + notification + " on server " + this.server);
}
synchronized (notificationLock)
{
@@ -471,7 +471,10 @@
String uid = UUIDGenerator.getInstance().generateStringUUID();
- System.out.println("Seding notification for addBinding " + binding);
+ if (isTrace)
+ {
+ log.trace("Seding notification for addBinding " + binding + " from server " + server);
+ }
managementService.sendNotification(new Notification(uid, NotificationType.BINDING_ADDED, props));
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/HornetQServer.java 2011-06-21 15:03:33 UTC (rev 10865)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/HornetQServer.java 2011-06-21 15:34:44 UTC (rev 10866)
@@ -60,6 +60,8 @@
String getIdentity();
+ String describe();
+
Configuration getConfiguration();
RemotingService getRemotingService();
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2011-06-21 15:03:33 UTC (rev 10865)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2011-06-21 15:34:44 UTC (rev 10866)
@@ -15,11 +15,11 @@
import java.util.Map;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.core.server.HornetQComponent;
+import org.hornetq.core.server.HornetQServer;
/**
* A ClusterConnection
@@ -35,6 +35,8 @@
SimpleString getName();
String getNodeID();
+
+ HornetQServer getServer();
/**
* @return a Map of node ID and addresses
@@ -46,7 +48,5 @@
TransportConfiguration getConnector();
// for debug
- String description();
-
- void nodeAnnounced(String nodeID, Pair<TransportConfiguration,TransportConfiguration> connectorPair);
+ String describe();
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2011-06-21 15:03:33 UTC (rev 10865)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2011-06-21 15:34:44 UTC (rev 10866)
@@ -59,4 +59,9 @@
void deployBridge(BridgeConfiguration config) throws Exception;
void destroyBridge(String name) throws Exception;
+
+ /**
+ * @return
+ */
+ String describe();
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-06-21 15:03:33 UTC (rev 10865)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-06-21 15:34:44 UTC (rev 10866)
@@ -28,11 +28,11 @@
import org.hornetq.api.core.client.ClientSession.BindingQuery;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.SendAcknowledgementHandler;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ClientSessionInternal;
-import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.filter.impl.FilterImpl;
import org.hornetq.core.logging.Logger;
@@ -77,7 +77,7 @@
private static final SimpleString JMS_TOPIC_ADDRESS_PREFIX = new SimpleString("jms.topic.");
- protected final ServerLocatorInternal serverLocator;
+ protected final ServerLocator serverLocator;
private final UUID nodeUUID;
@@ -140,7 +140,7 @@
// Public --------------------------------------------------------
- public BridgeImpl(final ServerLocatorInternal serverLocator,
+ public BridgeImpl(final ServerLocator serverLocator,
final int reconnectAttempts,
final long retryInterval,
final double retryMultiplier,
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-06-21 15:03:33 UTC (rev 10865)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-06-21 15:34:44 UTC (rev 10866)
@@ -24,7 +24,7 @@
import org.hornetq.api.core.client.ClientConsumer;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
-import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.api.core.management.ResourceNames;
@@ -52,9 +52,9 @@
public class ClusterConnectionBridge extends BridgeImpl
{
private static final Logger log = Logger.getLogger(ClusterConnectionBridge.class);
-
+
private static final boolean isTrace = log.isTraceEnabled();
-
+
private final ClusterConnection clusterConnection;
private final MessageFlowRecord flowRecord;
@@ -69,8 +69,13 @@
private final String targetNodeID;
+ private final TransportConfiguration connector;
+
+ private final ServerLocatorInternal discoveryLocator;
+
public ClusterConnectionBridge(final ClusterConnection clusterConnection,
- final ServerLocatorInternal serverLocator,
+ final ServerLocator targetLocator,
+ final ServerLocatorInternal discoveryLocator,
final int reconnectAttempts,
final long retryInterval,
final double retryMultiplier,
@@ -94,7 +99,7 @@
final MessageFlowRecord flowRecord,
final TransportConfiguration connector) throws Exception
{
- super(serverLocator,
+ super(targetLocator,
reconnectAttempts,
retryInterval,
retryMultiplier,
@@ -112,18 +117,21 @@
password,
activated,
storageManager);
-
+
System.out.println("ClusterConnectionBridge");
+ this.discoveryLocator = discoveryLocator;
+
idsHeaderName = MessageImpl.HDR_ROUTE_TO_IDS.concat(name);
-
+
this.clusterConnection = clusterConnection;
this.targetNodeID = targetNodeID;
this.managementAddress = managementAddress;
this.managementNotificationAddress = managementNotificationAddress;
this.flowRecord = flowRecord;
-
+ this.connector = connector;
+
// we need to disable DLQ check on the clustered bridges
queue.setInternalQueue(true);
}
@@ -161,7 +169,16 @@
private void setupNotificationConsumer() throws Exception
{
- log.debug("Setting up notificationConsumer for " + flowRecord + " on bridge " + this.getName());
+ if (log.isDebugEnabled())
+ {
+ log.debug("Setting up notificationConsumer between " + this.clusterConnection.getConnector() +
+ " and " +
+ flowRecord.getBridge().getForwardingConnection() +
+ " clusterConnection = " +
+ this.clusterConnection.getName() +
+ " on server " +
+ clusterConnection.getServer());
+ }
if (flowRecord != null)
{
flowRecord.reset();
@@ -170,7 +187,9 @@
{
try
{
- log.debug("Closing notification Consumer for reopening " + notifConsumer + " on bridge " + this.getName());
+ log.debug("Closing notification Consumer for reopening " + notifConsumer +
+ " on bridge " +
+ this.getName());
notifConsumer.close();
notifConsumer = null;
@@ -183,7 +202,9 @@
// Get the queue data
- String qName = "notif." + UUIDGenerator.getInstance().generateStringUUID();
+ String qName = "notif." + UUIDGenerator.getInstance().generateStringUUID() +
+ "." +
+ clusterConnection.getServer();
SimpleString notifQueueName = new SimpleString(qName);
@@ -236,7 +257,7 @@
{
log.debug("Cluster connetion bridge on " + clusterConnection + " requesting information on queues");
}
-
+
prod.send(message);
}
}
@@ -247,23 +268,23 @@
System.out.println("afterConnect");
setupNotificationConsumer();
}
-
+
@Override
public void stop() throws Exception
{
super.stop();
}
-
+
protected void failed(final boolean permanently)
{
log.debug("Cluster Bridge " + this.getName() + " failed, permanently=" + permanently);
super.fail(permanently);
-
+
if (permanently)
{
log.debug("cluster node for bridge " + this.getName() + " is permanently down");
- serverLocator.notifyNodeDown(targetNodeID);
+ discoveryLocator.notifyNodeDown(targetNodeID);
}
-
+
}
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-06-21 15:03:33 UTC (rev 10865)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-06-21 15:34:44 UTC (rev 10866)
@@ -16,6 +16,8 @@
import static org.hornetq.api.core.management.NotificationType.CONSUMER_CLOSED;
import static org.hornetq.api.core.management.NotificationType.CONSUMER_CREATED;
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
@@ -30,6 +32,7 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.ServerLocatorInternal;
@@ -367,6 +370,11 @@
{
return nodeUUID.toString();
}
+
+ public HornetQServer getServer()
+ {
+ return server;
+ }
public synchronized Map<String, String> getNodes()
{
@@ -388,7 +396,10 @@
return;
}
- log.debug("Activating cluster connection nodeID=" + nodeUUID);
+ if (log.isDebugEnabled())
+ {
+ log.debug("Activating cluster connection nodeID=" + nodeUUID + " for server=" + this.server);
+ }
backup = false;
@@ -522,7 +533,7 @@
// and empty static connectors to create bridges... ulgy!
if (serverLocator == null)
{
- return;
+ return;
}
/*we dont create bridges to backups*/
if(connectorPair.a == null)
@@ -538,95 +549,7 @@
if (record == null)
{
- // New node - create a new flow record
- final SimpleString queueName = new SimpleString("sf." + name + "." + nodeID);
-
- Binding queueBinding = postOffice.getBinding(queueName);
-
- Queue queue;
-
- if (queueBinding != null)
- {
- queue = (Queue)queueBinding.getBindable();
- }
- else
- {
- // Add binding in storage so the queue will get reloaded on startup and we can find it - it's never
- // actually routed to at that address though
- queue = server.createQueue(queueName, queueName, null, true, false);
- }
-
- createNewRecord(nodeID, connectorPair.a, queueName, queue, true);
- }
- else
- {
- log.info("Reattaching nodeID=" + nodeID);
- }
- }
- catch (Exception e)
- {
- log.error("Failed to update topology", e);
- }
- }
- }
-
- public void nodeAnnounced(final String nodeID,
- final Pair<TransportConfiguration, TransportConfiguration> connectorPair)
- {
- log.warn(this + " WTF nodeAnnounced nodeID=" + nodeID, new Exception ("trace"));
- if (log.isDebugEnabled())
- {
- log.debug(this + " received nodeAnnouncedUp for " + nodeID + ", connectorPair=" + connectorPair);
- }
-
- if (nodeID.equals(nodeUUID.toString()))
- {
- return;
- }
-
- // if the node is more than 1 hop away, we do not create a bridge for direct cluster connection
- if (allowDirectConnectionsOnly && !allowableConnections.contains(connectorPair.a))
- {
- if (log.isDebugEnabled())
- {
- log.debug("Ignoring nodeUp message as it only allows direct connections");
- }
- return;
- }
-
- // FIXME required to prevent cluster connections w/o discovery group
- // and empty static connectors to create bridges... ulgy!
- if (serverLocator == null)
- {
- if (log.isDebugEnabled())
- {
- log.debug("Ignoring nodeUp as serverLocator==null");
- }
- return;
- }
- /*we dont create bridges to backups*/
- if(connectorPair.a == null)
- {
- if (log.isDebugEnabled())
- {
- log.debug("Igoring nodeup as connectorPair.a==null (backup)");
- }
- return;
- }
-
- synchronized (records)
- {
- if (isTrace)
- {
- log.trace("Adding record for nodeID=" + nodeID);
- }
- try
- {
- MessageFlowRecord record = records.get(nodeID);
-
- if (record == null)
- {
// New node - create a new flow record
final SimpleString queueName = new SimpleString("sf." + name + "." + nodeID);
@@ -648,13 +571,6 @@
createNewRecord(nodeID, connectorPair.a, queueName, queue, true);
}
- else
- {
- if (isTrace)
- {
- log.trace("It already had a node created before, ignoring the nodeUp message");
- }
- }
}
catch (Exception e)
{
@@ -662,19 +578,24 @@
}
}
}
-
- private void createNewRecord(final String nodeID,
+
+ private void createNewRecord(final String targetNodeID,
final TransportConfiguration connector,
final SimpleString queueName,
final Queue queue,
final boolean start) throws Exception
{
- MessageFlowRecordImpl record = new MessageFlowRecordImpl(nodeID, connector, queueName, queue);
+ MessageFlowRecordImpl record = new MessageFlowRecordImpl(targetNodeID, connector, queueName, queue);
- records.put(nodeID, record);
+ records.put(targetNodeID, record);
- Bridge bridge = createBridge(record);
+ Bridge bridge = createClusteredBridge(record);
+ if (log.isDebugEnabled())
+ {
+ log.debug("PORRA creating record between " + this.connector + " and " + connector + bridge);
+ }
+
record.setBridge(bridge);
if (start)
@@ -688,16 +609,36 @@
* @return
* @throws Exception
*/
- protected Bridge createBridge(MessageFlowRecordImpl record) throws Exception
+ protected Bridge createClusteredBridge(MessageFlowRecordImpl record) throws Exception
{
+
+ ServerLocator targetLocator = HornetQClient.createServerLocatorWithoutHA(record.getConnector());
+
+ targetLocator.setReconnectAttempts(0);
+
+ targetLocator.setInitialConnectAttempts(0);
+ targetLocator.setClientFailureCheckPeriod(clientFailureCheckPeriod);
+ targetLocator.setConnectionTTL(connectionTTL);
+ targetLocator.setInitialConnectAttempts(0);
+
+ targetLocator.setConfirmationWindowSize(serverLocator.getConfirmationWindowSize());
+ targetLocator.setBlockOnDurableSend(!useDuplicateDetection);
+ targetLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
+
+ if(retryInterval > 0)
+ {
+ targetLocator.setRetryInterval(retryInterval);
+ }
+
ClusterConnectionBridge bridge = new ClusterConnectionBridge(this,
+ targetLocator,
serverLocator,
reconnectAttempts,
retryInterval,
retryIntervalMultiplier,
maxRetryInterval,
nodeUUID,
- record.getNodeID(),
+ record.getTargetNodeID(),
record.getQueueName(),
record.getQueue(),
executorFactory.getExecutor(),
@@ -724,7 +665,7 @@
{
private Bridge bridge;
- private final String nodeID;
+ private final String targetNodeID;
private final TransportConfiguration connector;
private final SimpleString queueName;
private final Queue queue;
@@ -733,21 +674,43 @@
private volatile boolean isClosed = false;
- private volatile boolean paused = false;
-
private volatile boolean firstReset = false;
- public MessageFlowRecordImpl(final String nodeID,
+ public MessageFlowRecordImpl(final String targetNodeID,
final TransportConfiguration connector,
final SimpleString queueName,
final Queue queue)
{
this.queue = queue;
- this.nodeID = nodeID;
+ this.targetNodeID = targetNodeID;
this.connector = connector;
this.queueName = queueName;
}
+
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString()
+ {
+ return "MessageFlowRecordImpl [nodeID=" + targetNodeID +
+ ", connector=" +
+ connector +
+ ", queueName=" +
+ queueName +
+ ", queue=" +
+ queue +
+ ", isClosed=" +
+ isClosed +
+ ", firstReset=" +
+ firstReset +
+ "]";
+ }
+
+
+
public String getAddress()
{
return address.toString();
@@ -756,9 +719,9 @@
/**
* @return the nodeID
*/
- public String getNodeID()
+ public String getTargetNodeID()
{
- return nodeID;
+ return targetNodeID;
}
/**
@@ -1026,11 +989,6 @@
log.trace("Adding binding " + clusterName + " into " + ClusterConnectionImpl.this);
}
- synchronized (System.err)
- {
- new Exception("Adding binding " + clusterName + " into " + ClusterConnectionImpl.this).printStackTrace(System.out);
- }
-
bindings.put(clusterName, binding);
try
@@ -1214,18 +1172,24 @@
return "ClusterConnectionImpl [nodeUUID=" + nodeUUID + ", connector=" + connector + ", address=" + address + "]";
}
- public String description()
+ public String describe()
{
- String out = name + " connected to\n";
+ StringWriter str = new StringWriter();
+ PrintWriter out = new PrintWriter(str);
+
+
+ out.println(this);
+ out.println("***************************************");
+ out.println(name + " connected to");
for (Entry<String, MessageFlowRecord> messageFlow : records.entrySet())
{
- String nodeID = messageFlow.getKey();
- Bridge bridge = messageFlow.getValue().getBridge();
-
- out += "\t" + nodeID + " -- " + bridge.isStarted() + "\n";
+ out.println("\t Bridge = " + messageFlow.getValue().getBridge());
+ out.println("\t Flow Record = " + messageFlow.getValue());
}
+ out.println("***************************************");
- return out;
+
+ return str.toString();
}
interface ClusterConnector
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-06-21 15:03:33 UTC (rev 10865)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-06-21 15:34:44 UTC (rev 10866)
@@ -15,6 +15,8 @@
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.lang.reflect.Array;
import java.net.InetAddress;
import java.util.ArrayList;
@@ -142,7 +144,26 @@
this.clustered = clustered;
}
+
+ public String describe()
+ {
+ StringWriter str = new StringWriter();
+ PrintWriter out = new PrintWriter(str);
+
+ out.println("Information on " + this);
+ out.println("*******************************************************");
+ out.println("Topology: " + topology.describe());
+
+ for (ClusterConnection conn : this.clusterConnections.values())
+ {
+ out.println(conn.describe());
+ }
+
+ out.println("*******************************************************");
+ return str.toString();
+ }
+
public synchronized void start() throws Exception
{
if (started)
@@ -648,7 +669,7 @@
// We are going to manually retry on the bridge in case of failure
serverLocator.setReconnectAttempts(0);
- serverLocator.setInitialConnectAttempts(1);
+ serverLocator.setInitialConnectAttempts(-1);
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-06-21 15:03:33 UTC (rev 10865)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-06-21 15:34:44 UTC (rev 10866)
@@ -14,6 +14,8 @@
package org.hornetq.core.server.impl;
import java.io.File;
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.lang.management.ManagementFactory;
import java.nio.channels.ClosedChannelException;
import java.security.AccessController;
@@ -830,7 +832,16 @@
// HornetQServer implementation
// -----------------------------------------------------------
-
+ public String describe()
+ {
+ StringWriter str = new StringWriter();
+ PrintWriter out = new PrintWriter(str);
+
+ out.println("Information about server " + this.identity);
+ out.println("Cluster Connection:" + this.getClusterManager().describe());
+
+ return str.toString();
+ }
public void setIdentity(String identity)
{
this.identity = identity;
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-06-21 15:03:33 UTC (rev 10865)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-06-21 15:34:44 UTC (rev 10866)
@@ -443,6 +443,10 @@
{
try
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("deleting temporary queue " + bindingName);
+ }
if (postOffice.getBinding(bindingName) != null)
{
postOffice.removeBinding(bindingName);
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java 2011-06-21 15:03:33 UTC (rev 10865)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java 2011-06-21 15:34:44 UTC (rev 10866)
@@ -673,6 +673,10 @@
// https://jira.jboss.org/jira/browse/HORNETQ-317
if (messagingServer == null || !messagingServer.isInitialised())
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("ignoring message " + notification + " as the server is not initialized");
+ }
return;
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/integration/logging/HornetQLoggerFormatter.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/integration/logging/HornetQLoggerFormatter.java 2011-06-21 15:03:33 UTC (rev 10865)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/integration/logging/HornetQLoggerFormatter.java 2011-06-21 15:34:44 UTC (rev 10866)
@@ -38,7 +38,7 @@
StringBuffer sb = new StringBuffer();
- sb.append("[").append(Thread.currentThread().getName()).append("] ");
+ sb.append("* [").append(Thread.currentThread().getName()).append("] ");
sb.append(calendar.get(GregorianCalendar.HOUR_OF_DAY) + ":" +
calendar.get(GregorianCalendar.MINUTE) +
":" +
@@ -49,6 +49,7 @@
sb.append(record.getLevel()).append(" [");
sb.append(stripPackage(record.getLoggerName())).append("]").append(" ");
+ sb.append(HornetQLoggerFormatter.LINE_SEPARATOR);
sb.append(record.getMessage());
sb.append(HornetQLoggerFormatter.LINE_SEPARATOR);
@@ -66,6 +67,8 @@
{
}
}
+ sb.append(HornetQLoggerFormatter.LINE_SEPARATOR);
+
return sb.toString();
}
13 years, 6 months
JBoss hornetq SVN: r10864 - branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/management.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-21 09:39:41 -0400 (Tue, 21 Jun 2011)
New Revision: 10864
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
Log:
Adding a test
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java 2011-06-21 10:13:05 UTC (rev 10863)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java 2011-06-21 13:39:41 UTC (rev 10864)
@@ -13,6 +13,7 @@
package org.hornetq.tests.integration.management;
+import java.util.LinkedList;
import java.util.Map;
import junit.framework.Assert;
@@ -888,6 +889,55 @@
session.deleteQueue(queue);
}
+ public void testRemoveMessage2() throws Exception
+ {
+ SimpleString address = RandomUtil.randomSimpleString();
+ SimpleString queue = RandomUtil.randomSimpleString();
+
+ session.createQueue(address, queue, null, false);
+ ClientProducer producer = session.createProducer(address);
+
+ // send messages on queue
+
+ for (int i = 0 ; i < 100; i++)
+ {
+
+ ClientMessage msg = session.createMessage(false);
+ msg.putIntProperty("count", i);
+ producer.send(msg);
+ }
+
+ ClientConsumer cons = session.createConsumer(queue);
+ session.start();
+ LinkedList<ClientMessage> msgs = new LinkedList<ClientMessage>();
+ for (int i = 0; i < 50; i++)
+ {
+ ClientMessage msg = cons.receive(1000);
+ msgs.add(msg);
+ }
+
+ QueueControl queueControl = createManagementControl(address, queue);
+ Assert.assertEquals(100, queueControl.getMessageCount());
+
+ // the message IDs are set on the server
+ Map<String, Object>[] messages = queueControl.listMessages(null);
+ Assert.assertEquals(50, messages.length);
+ assertEquals(50, ((Integer)messages[0].get("count")).intValue());
+ long messageID = (Long)messages[0].get("messageID");
+
+ // delete the first message still listed on the queue (count == 50)
+ boolean deleted = queueControl.removeMessage(messageID);
+ Assert.assertTrue(deleted);
+ Assert.assertEquals(99, queueControl.getMessageCount());
+
+ cons.close();
+
+ // check that the 99 remaining messages can be consumed from the queue
+ ManagementTestBase.consumeMessages(99, session, queue);
+
+ session.deleteQueue(queue);
+ }
+
public void testCountMessagesWithFilter() throws Exception
{
SimpleString key = new SimpleString("key");
@@ -1477,6 +1527,7 @@
locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnNonDurableSend(true);
+ locator.setConsumerWindowSize(0);
ClientSessionFactory sf = locator.createSessionFactory();
session = sf.createSession(false, true, false);
session.start();
13 years, 6 months
JBoss hornetq SVN: r10863 - in trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster: distribution and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-06-21 06:13:05 -0400 (Tue, 21 Jun 2011)
New Revision: 10863
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedFailoverTest.java
Log:
HORNETQ-720 Fix NPEs in Replication tests
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java 2011-06-20 18:02:25 UTC (rev 10862)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java 2011-06-21 10:13:05 UTC (rev 10863)
@@ -20,7 +20,6 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.invm.InVMConnector;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.server.HornetQServer;
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-06-20 18:02:25 UTC (rev 10862)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-06-21 10:13:05 UTC (rev 10863)
@@ -38,7 +38,6 @@
import org.hornetq.core.config.BroadcastGroupConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-06-20 18:02:25 UTC (rev 10862)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-06-21 10:13:05 UTC (rev 10863)
@@ -167,6 +167,7 @@
config1.setSecurityEnabled(false);
config1.setSharedStore(false);
config1.setBackup(true);
+ backupConfig = config1;
backupServer = createBackupServer();
Configuration config0 = super.createDefaultConfig();
@@ -177,6 +178,7 @@
//liveConfig.setBackupConnectorName("toBackup");
config0.setSecurityEnabled(false);
config0.setSharedStore(false);
+ liveConfig = config0;
liveServer = createLiveServer();
backupServer.start();
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java 2011-06-20 18:02:25 UTC (rev 10862)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java 2011-06-21 10:13:05 UTC (rev 10863)
@@ -46,13 +46,13 @@
{
return new SameProcessHornetQServer(createServer(true, liveConfig));
}
-
+
@Override
protected TestableServer createBackupServer()
{
return new SameProcessHornetQServer(createServer(true, backupConfig));
}
-
+
@Override
protected void createConfigs() throws Exception
{
@@ -64,8 +64,9 @@
config1.setSecurityEnabled(false);
config1.setSharedStore(false);
config1.setBackup(true);
+ backupConfig = config1;
backupServer = createBackupServer();
-
+
Configuration config0 = super.createDefaultConfig();
config0.getAcceptorConfigurations().clear();
config0.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
@@ -74,8 +75,9 @@
liveConfig.setBackupConnectorName("toBackup");*/
config0.setSecurityEnabled(false);
config0.setSharedStore(false);
+ liveConfig = config0;
liveServer = createLiveServer();
-
+
backupServer.start();
liveServer.start();
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedFailoverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedFailoverTest.java 2011-06-20 18:02:25 UTC (rev 10862)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedFailoverTest.java 2011-06-21 10:13:05 UTC (rev 10863)
@@ -13,6 +13,8 @@
package org.hornetq.tests.integration.cluster.failover;
+import org.hornetq.core.server.impl.InVMNodeManager;
+
/**
* A ReplicatedFailoverTest
*
@@ -38,6 +40,13 @@
// Protected -----------------------------------------------------
@Override
+ protected void setUp() throws Exception
+ {
+ nodeManager = new InVMNodeManager();
+ super.setUp();
+ }
+
+ @Override
protected void createConfigs() throws Exception
{
createReplicatedConfigs();
13 years, 6 months
JBoss hornetq SVN: r10862 - tags.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-06-20 14:02:25 -0400 (Mon, 20 Jun 2011)
New Revision: 10862
Added:
tags/HornetQ_2_2_6_Final_pending/
Log:
pending release for 2.2.6.Final
13 years, 6 months