[jboss-cvs] JBoss Messaging SVN: r5470 - in trunk: src/main/org/jboss/messaging/core/server/cluster/impl and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sun Dec 7 23:59:35 EST 2008
Author: timfox
Date: 2008-12-07 23:59:34 -0500 (Sun, 07 Dec 2008)
New Revision: 5470
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowReconnectTest.java
Log:
More reconnect tests
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2008-12-07 14:27:39 UTC (rev 5469)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2008-12-08 04:59:34 UTC (rev 5470)
@@ -424,7 +424,7 @@
private RemotingConnection getConnectionForCreateSession() throws MessagingException
{
while (true)
- {
+ {
RemotingConnection connection = getConnection(1);
if (connection == null)
@@ -755,13 +755,15 @@
}
private RemotingConnection getConnection(final int count)
- {
+ {
RemotingConnection conn;
+
+
if (connections.size() < maxConnections)
{
// Create a new one
-
+
DelegatingBufferHandler handler = new DelegatingBufferHandler();
Connector connector = connectorFactory.createConnector(transportParams, handler, this);
@@ -786,7 +788,7 @@
if (tc == null)
{
return null;
- }
+ }
conn = new RemotingConnectionImpl(tc, callTimeout, pingPeriod, connectionTTL, pingExecutor, null);
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java 2008-12-07 14:27:39 UTC (rev 5469)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java 2008-12-08 04:59:34 UTC (rev 5470)
@@ -258,7 +258,7 @@
// FailureListener implementation --------------------------------
public synchronized boolean connectionFailed(final MessagingException me)
- {
+ {
//By the time this is called
synchronized (this)
{
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowReconnectTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowReconnectTest.java 2008-12-07 14:27:39 UTC (rev 5469)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowReconnectTest.java 2008-12-08 04:59:34 UTC (rev 5470)
@@ -151,12 +151,6 @@
ClientConsumer cons1 = session1.createConsumer(address1);
session1.start();
-
- ClientMessage message = session0.createClientMessage(false);
- SimpleString propKey = new SimpleString("propkey");
- SimpleString propVal = new SimpleString("propval");
- message.putStringProperty(propKey, propVal);
- message.getBody().flip();
//Now we will simulate a failure of the message flow connection between server1 and server2
//And prevent reconnection for a few tries, then it will reconnect without failing over
@@ -167,11 +161,25 @@
InVMConnector.numberOfFailures = retriesBeforeFailover - 1;
forwardingConnection.fail(new MessagingException(MessagingException.NOT_CONNECTED));
- prod0.send(message);
+ final int numMessages = 10;
+
+ SimpleString propKey = new SimpleString("propkey");
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
+ message.putIntProperty(propKey, i);
+ message.getBody().flip();
+
+ prod0.send(message);
+ }
- ClientMessage r1 = cons1.receive(1000);
- assertNotNull(r1);
- assertEquals(propVal, r1.getProperty(propKey));
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage r1 = cons1.receive(500);
+ assertNotNull(r1);
+ assertEquals(i, r1.getProperty(propKey));
+ }
session0.close();
session1.close();
@@ -276,12 +284,8 @@
session2.start();
- ClientMessage message = session0.createClientMessage(false);
- SimpleString propKey = new SimpleString("propkey");
- SimpleString propVal = new SimpleString("propval");
- message.putStringProperty(propKey, propVal);
- message.getBody().flip();
+
//Now we will simulate a failure of the message flow connection between server1 and server2
//And prevent reconnection for a few tries, then it will failover
MessageFlow flow = service0.getServer().getClusterManager().getMessageFlows().get(flowName);
@@ -291,11 +295,25 @@
InVMConnector.numberOfFailures = retriesBeforeFailover;
forwardingConnection.fail(new MessagingException(MessagingException.NOT_CONNECTED));
- prod0.send(message);
+ final int numMessages = 10;
+
+ SimpleString propKey = new SimpleString("propkey");
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
+ message.putIntProperty(propKey, i);
+ message.getBody().flip();
+
+ prod0.send(message);
+ }
- ClientMessage r1 = cons1.receive(2000);
- assertNotNull(r1);
- assertEquals(propVal, r1.getProperty(propKey));
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage r1 = cons1.receive(500);
+ assertNotNull(r1);
+ assertEquals(i, r1.getProperty(propKey));
+ }
session0.close();
session2.close();
@@ -308,7 +326,388 @@
assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
assertEquals(0, service2.getServer().getRemotingService().getConnections().size());
}
+
+ public void testFailoverThenReconnectAfterFailover() throws Exception
+ {
+ Map<String, Object> service0Params = new HashMap<String, Object>();
+ MessagingService service0 = createMessagingService(0, service0Params);
+ Map<String, Object> service1Params = new HashMap<String, Object>();
+ MessagingService service1 = createMessagingService(1, service1Params);
+
+ Map<String, Object> service2Params = new HashMap<String, Object>();
+ MessagingService service2 = createMessagingService(2, service2Params, true);
+
+ TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service0Params,
+ "server0tc");
+
+ Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+
+ TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service1Params,
+ "server1tc");
+
+ TransportConfiguration server2tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service2Params,
+ "server2tc");
+
+ connectors.put(server1tc.getName(), server1tc);
+
+ connectors.put(server2tc.getName(), server2tc);
+
+ service1.getServer().getConfiguration().setConnectorConfigurations(connectors);
+
+ service1.getServer().getConfiguration().setBackupConnectorName(server2tc.getName());
+
+ service2.getServer().getConfiguration().setBackup(true);
+
+ service1.start();
+
+ service2.start();
+
+ log.info("Started service1 and service2");
+
+ service0.getServer().getConfiguration().setConnectorConfigurations(connectors);
+
+ List<Pair<String, String>> connectorNames = new ArrayList<Pair<String, String>>();
+ connectorNames.add(new Pair<String, String>(server1tc.getName(), server2tc.getName()));
+
+ final SimpleString address1 = new SimpleString("testaddress");
+
+ final long retryInterval = 50;
+ final double retryIntervalMultiplier = 1d;
+ final int retriesBeforeFailover = 3;
+ final int maxRetriesAfterFailover = 3;
+
+ final String flowName = "flow1";
+
+ MessageFlowConfiguration ofconfig1 = new MessageFlowConfiguration(flowName,
+ address1.toString(),
+ null,
+ true,
+ 1,
+ -1,
+ null,
+ retryInterval,
+ retryIntervalMultiplier,
+ retriesBeforeFailover,
+ maxRetriesAfterFailover,
+ connectorNames);
+
+ Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
+ ofconfigs.add(ofconfig1);
+
+ service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
+
+ service0.start();
+
+ log.info("started service0");
+
+ ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+ ClientSession session0 = csf0.createSession(false, true, true);
+
+ ClientSessionFactory csf2 = new ClientSessionFactoryImpl(server2tc);
+ ClientSession session2 = csf2.createSession(false, true, true);
+
+ session0.createQueue(address1, address1, null, false, false, false);
+ session2.createQueue(address1, address1, null, false, false, false);
+ ClientProducer prod0 = session0.createProducer(address1);
+
+ ClientConsumer cons1 = session2.createConsumer(address1);
+
+ session2.start();
+
+ //Now we will simulate a failure of the message flow connection between server1 and server2
+ //And prevent reconnection for a few tries, then it will failover
+ MessageFlow flow = service0.getServer().getClusterManager().getMessageFlows().get(flowName);
+ Forwarder forwarder = flow.getForwarders().iterator().next();
+ RemotingConnection forwardingConnection = ((ForwarderImpl)forwarder).getForwardingConnection();
+ InVMConnector.failOnCreateConnection = true;
+ InVMConnector.numberOfFailures = retriesBeforeFailover;
+ forwardingConnection.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+
+ //Now we should be failed over so fail again and should reconnect
+ forwardingConnection = ((ForwarderImpl)forwarder).getForwardingConnection();
+ InVMConnector.resetFailures();
+ InVMConnector.failOnCreateConnection = true;
+ InVMConnector.numberOfFailures = retriesBeforeFailover - 1;
+ forwardingConnection.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+
+ final int numMessages = 10;
+
+ SimpleString propKey = new SimpleString("propkey");
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
+ message.putIntProperty(propKey, i);
+ message.getBody().flip();
+
+ prod0.send(message);
+ }
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage r1 = cons1.receive(500);
+ assertNotNull(r1);
+ assertEquals(i, r1.getProperty(propKey));
+ }
+
+ session0.close();
+ session2.close();
+
+ service0.stop();
+ service1.stop();
+ service2.stop();
+
+ assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+ assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+ assertEquals(0, service2.getServer().getRemotingService().getConnections().size());
+ }
+
+ public void testAutomaticReconnectSingleServer() throws Exception
+ {
+ Map<String, Object> service0Params = new HashMap<String, Object>();
+ MessagingService service0 = createMessagingService(0, service0Params);
+
+ Map<String, Object> service1Params = new HashMap<String, Object>();
+ MessagingService service1 = createMessagingService(1, service1Params);
+ service1.start();
+
+ TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service0Params,
+ "server0tc");
+
+ Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+
+ TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service1Params,
+ "server1tc");
+
+ connectors.put(server1tc.getName(), server1tc);
+
+
+ service0.getServer().getConfiguration().setConnectorConfigurations(connectors);
+
+ List<Pair<String, String>> connectorNames = new ArrayList<Pair<String, String>>();
+ connectorNames.add(new Pair<String, String>(server1tc.getName(), null));
+
+ final SimpleString address1 = new SimpleString("testaddress");
+
+ final long retryInterval = 50;
+ final double retryIntervalMultiplier = 1d;
+ final int retriesBeforeFailover = 3;
+ final int maxRetriesAfterFailover = -1;
+
+ final String flowName = "flow1";
+
+ MessageFlowConfiguration ofconfig1 = new MessageFlowConfiguration(flowName,
+ address1.toString(),
+ null,
+ true,
+ 1,
+ -1,
+ null,
+ retryInterval,
+ retryIntervalMultiplier,
+ retriesBeforeFailover,
+ maxRetriesAfterFailover,
+ connectorNames);
+
+ Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
+ ofconfigs.add(ofconfig1);
+
+ service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
+
+ service0.start();
+
+ ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+ ClientSession session0 = csf0.createSession(false, true, true);
+
+ ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+ ClientSession session1 = csf1.createSession(false, true, true);
+
+ session0.createQueue(address1, address1, null, false, false, false);
+ session1.createQueue(address1, address1, null, false, false, false);
+ ClientProducer prod0 = session0.createProducer(address1);
+
+ ClientConsumer cons1 = session1.createConsumer(address1);
+
+ session1.start();
+
+ //Now we will simulate a failure of the message flow connection between server1 and server2
+ //And prevent reconnection for a few tries, then it will reconnect without failing over
+ MessageFlow flow = service0.getServer().getClusterManager().getMessageFlows().get(flowName);
+ Forwarder forwarder = flow.getForwarders().iterator().next();
+ RemotingConnection forwardingConnection = ((ForwarderImpl)forwarder).getForwardingConnection();
+ InVMConnector.failOnCreateConnection = true;
+ InVMConnector.numberOfFailures = retriesBeforeFailover - 1;
+ forwardingConnection.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+
+ final int numMessages = 10;
+
+ SimpleString propKey = new SimpleString("propkey");
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
+ message.putIntProperty(propKey, i);
+ message.getBody().flip();
+
+ prod0.send(message);
+ }
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage r1 = cons1.receive(500);
+ assertNotNull(r1);
+ assertEquals(i, r1.getProperty(propKey));
+ }
+
+ session0.close();
+ session1.close();
+
+ service0.stop();
+ service1.stop();
+
+ assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+ assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+ }
+
+ public void testNonAutomaticReconnectSingleServer() throws Exception
+ {
+ Map<String, Object> service0Params = new HashMap<String, Object>();
+ MessagingService service0 = createMessagingService(0, service0Params);
+
+ Map<String, Object> service1Params = new HashMap<String, Object>();
+ MessagingService service1 = createMessagingService(1, service1Params);
+ service1.start();
+
+ TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service0Params,
+ "server0tc");
+
+ Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+
+ TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service1Params,
+ "server1tc");
+
+ connectors.put(server1tc.getName(), server1tc);
+
+
+ service0.getServer().getConfiguration().setConnectorConfigurations(connectors);
+
+ List<Pair<String, String>> connectorNames = new ArrayList<Pair<String, String>>();
+ connectorNames.add(new Pair<String, String>(server1tc.getName(), null));
+
+ final SimpleString address1 = new SimpleString("testaddress");
+
+ final long retryInterval = 50;
+ final double retryIntervalMultiplier = 1d;
+ final int retriesBeforeFailover = 3;
+ final int maxRetriesAfterFailover = 0;
+
+ final String flowName = "flow1";
+
+ MessageFlowConfiguration ofconfig1 = new MessageFlowConfiguration(flowName,
+ address1.toString(),
+ null,
+ true,
+ 1,
+ -1,
+ null,
+ retryInterval,
+ retryIntervalMultiplier,
+ retriesBeforeFailover,
+ maxRetriesAfterFailover,
+ connectorNames);
+
+ Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
+ ofconfigs.add(ofconfig1);
+
+ service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
+
+ service0.start();
+
+ ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+ ClientSession session0 = csf0.createSession(false, true, true);
+
+ ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+ ClientSession session1 = csf1.createSession(false, true, true);
+
+ session0.createQueue(address1, address1, null, false, false, false);
+ session1.createQueue(address1, address1, null, false, false, false);
+ ClientProducer prod0 = session0.createProducer(address1);
+
+ ClientConsumer cons1 = session1.createConsumer(address1);
+
+ session1.start();
+
+ //Now we will simulate a failure of the message flow connection between server1 and server2
+ //And prevent reconnection for a few tries, then it will reconnect without failing over
+ MessageFlow flow = service0.getServer().getClusterManager().getMessageFlows().get(flowName);
+ Forwarder forwarder = flow.getForwarders().iterator().next();
+ RemotingConnection forwardingConnection = ((ForwarderImpl)forwarder).getForwardingConnection();
+ InVMConnector.failOnCreateConnection = true;
+ InVMConnector.numberOfFailures = retriesBeforeFailover * 2;
+ forwardingConnection.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+
+ final int numMessages = 10;
+
+ SimpleString propKey = new SimpleString("propkey");
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
+ message.putIntProperty(propKey, i);
+ message.getBody().flip();
+
+ prod0.send(message);
+ }
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage r1 = cons1.receive(500);
+ assertNotNull(r1);
+ assertEquals(i, r1.getProperty(propKey));
+ }
+
+ //Now fail it again
+
+ InVMConnector.resetFailures();
+ forwardingConnection = ((ForwarderImpl)forwarder).getForwardingConnection();
+ InVMConnector.failOnCreateConnection = true;
+ InVMConnector.numberOfFailures = retriesBeforeFailover * 2;
+ forwardingConnection.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
+ message.putIntProperty(propKey, i);
+ message.getBody().flip();
+
+ prod0.send(message);
+ }
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage r1 = cons1.receive(500);
+ assertNotNull(r1);
+ assertEquals(i, r1.getProperty(propKey));
+ }
+
+ session0.close();
+ session1.close();
+
+ service0.stop();
+ service1.stop();
+
+ assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+ assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
More information about the jboss-cvs-commits
mailing list