[jboss-cvs] JBoss Messaging SVN: r5470 - in trunk: src/main/org/jboss/messaging/core/server/cluster/impl and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sun Dec 7 23:59:35 EST 2008


Author: timfox
Date: 2008-12-07 23:59:34 -0500 (Sun, 07 Dec 2008)
New Revision: 5470

Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowReconnectTest.java
Log:
More reconnect tests


Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2008-12-07 14:27:39 UTC (rev 5469)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2008-12-08 04:59:34 UTC (rev 5470)
@@ -424,7 +424,7 @@
    private RemotingConnection getConnectionForCreateSession() throws MessagingException
    {
       while (true)
-      {
+      {         
          RemotingConnection connection = getConnection(1);
    
          if (connection == null)
@@ -755,13 +755,15 @@
    }
 
    private RemotingConnection getConnection(final int count)
-   {
+   {      
       RemotingConnection conn;
+      
+      
 
       if (connections.size() < maxConnections)
       {
          // Create a new one
-
+         
          DelegatingBufferHandler handler = new DelegatingBufferHandler();
 
          Connector connector = connectorFactory.createConnector(transportParams, handler, this);
@@ -786,7 +788,7 @@
          if (tc == null)
          {
             return null;
-         }
+         }                 
 
          conn = new RemotingConnectionImpl(tc, callTimeout, pingPeriod, connectionTTL, pingExecutor, null);
 

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java	2008-12-07 14:27:39 UTC (rev 5469)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java	2008-12-08 04:59:34 UTC (rev 5470)
@@ -258,7 +258,7 @@
    // FailureListener implementation --------------------------------
 
    public synchronized boolean connectionFailed(final MessagingException me)
-   {
+   {      
       //By the time this is called
       synchronized (this)
       {      

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowReconnectTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowReconnectTest.java	2008-12-07 14:27:39 UTC (rev 5469)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowReconnectTest.java	2008-12-08 04:59:34 UTC (rev 5470)
@@ -151,12 +151,6 @@
       ClientConsumer cons1 = session1.createConsumer(address1);
 
       session1.start();
-
-      ClientMessage message = session0.createClientMessage(false);
-      SimpleString propKey = new SimpleString("propkey");
-      SimpleString propVal = new SimpleString("propval");
-      message.putStringProperty(propKey, propVal);
-      message.getBody().flip();
       
       //Now we will simulate a failure of the message flow connection between server1 and server2
       //And prevent reconnection for a few tries, then it will reconnect without failing over
@@ -167,11 +161,25 @@
       InVMConnector.numberOfFailures = retriesBeforeFailover - 1;
       forwardingConnection.fail(new MessagingException(MessagingException.NOT_CONNECTED));
 
-      prod0.send(message);
+      final int numMessages = 10;
+      
+      SimpleString propKey = new SimpleString("propkey");      
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session0.createClientMessage(false);         
+         message.putIntProperty(propKey, i);
+         message.getBody().flip();
+         
+         prod0.send(message);
+      }
 
-      ClientMessage r1 = cons1.receive(1000);
-      assertNotNull(r1);
-      assertEquals(propVal, r1.getProperty(propKey));
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage r1 = cons1.receive(500);
+         assertNotNull(r1);
+         assertEquals(i, r1.getProperty(propKey));
+      }
 
       session0.close();
       session1.close();
@@ -276,12 +284,8 @@
 
       session2.start();
 
-      ClientMessage message = session0.createClientMessage(false);
-      SimpleString propKey = new SimpleString("propkey");
-      SimpleString propVal = new SimpleString("propval");
-      message.putStringProperty(propKey, propVal);
-      message.getBody().flip();
       
+      
       //Now we will simulate a failure of the message flow connection between server1 and server2
       //And prevent reconnection for a few tries, then it will failover
       MessageFlow flow = service0.getServer().getClusterManager().getMessageFlows().get(flowName);
@@ -291,11 +295,25 @@
       InVMConnector.numberOfFailures = retriesBeforeFailover;
       forwardingConnection.fail(new MessagingException(MessagingException.NOT_CONNECTED));
 
-      prod0.send(message);
+      final int numMessages = 10;
+      
+      SimpleString propKey = new SimpleString("propkey");      
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session0.createClientMessage(false);         
+         message.putIntProperty(propKey, i);
+         message.getBody().flip();
+         
+         prod0.send(message);
+      }
 
-      ClientMessage r1 = cons1.receive(2000);
-      assertNotNull(r1);
-      assertEquals(propVal, r1.getProperty(propKey));
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage r1 = cons1.receive(500);
+         assertNotNull(r1);
+         assertEquals(i, r1.getProperty(propKey));
+      }
 
       session0.close();
       session2.close();
@@ -308,7 +326,388 @@
       assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
       assertEquals(0, service2.getServer().getRemotingService().getConnections().size());
    }
+   
+   public void testFailoverThenReconnectAfterFailover() throws Exception
+   {
+      Map<String, Object> service0Params = new HashMap<String, Object>();
+      MessagingService service0 = createMessagingService(0, service0Params);
 
+      Map<String, Object> service1Params = new HashMap<String, Object>();
+      MessagingService service1 = createMessagingService(1, service1Params);
+            
+      Map<String, Object> service2Params = new HashMap<String, Object>();
+      MessagingService service2 = createMessagingService(2, service2Params, true);
+      
+      TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service0Params,
+                                                                    "server0tc");
+
+      Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+
+      TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service1Params,
+                                                                    "server1tc");
+
+      TransportConfiguration server2tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service2Params,
+                                                                    "server2tc");
+      
+      connectors.put(server1tc.getName(), server1tc);
+      
+      connectors.put(server2tc.getName(), server2tc);
+      
+      service1.getServer().getConfiguration().setConnectorConfigurations(connectors);
+      
+      service1.getServer().getConfiguration().setBackupConnectorName(server2tc.getName());
+      
+      service2.getServer().getConfiguration().setBackup(true);
+      
+      service1.start();
+      
+      service2.start();
+      
+      log.info("Started service1 and service2");
+                 
+      service0.getServer().getConfiguration().setConnectorConfigurations(connectors);
+
+      List<Pair<String, String>> connectorNames = new ArrayList<Pair<String, String>>();
+      connectorNames.add(new Pair<String, String>(server1tc.getName(), server2tc.getName()));
+
+      final SimpleString address1 = new SimpleString("testaddress");
+
+      final long retryInterval = 50;
+      final double retryIntervalMultiplier = 1d;
+      final int retriesBeforeFailover = 3;
+      final int maxRetriesAfterFailover = 3;
+      
+      final String flowName = "flow1";
+      
+      MessageFlowConfiguration ofconfig1 = new MessageFlowConfiguration(flowName,
+                                                                        address1.toString(),
+                                                                        null,
+                                                                        true,
+                                                                        1,
+                                                                        -1,
+                                                                        null,
+                                                                        retryInterval,
+                                                                        retryIntervalMultiplier,
+                                                                        retriesBeforeFailover,
+                                                                        maxRetriesAfterFailover,
+                                                                        connectorNames);
+
+      Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
+      ofconfigs.add(ofconfig1);
+
+      service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
+
+      service0.start();
+      
+      log.info("started service0");
+
+      ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+      ClientSession session0 = csf0.createSession(false, true, true);
+
+      ClientSessionFactory csf2 = new ClientSessionFactoryImpl(server2tc);
+      ClientSession session2 = csf2.createSession(false, true, true);
+
+      session0.createQueue(address1, address1, null, false, false, false);
+      session2.createQueue(address1, address1, null, false, false, false);
+      ClientProducer prod0 = session0.createProducer(address1);
+
+      ClientConsumer cons1 = session2.createConsumer(address1);
+
+      session2.start();
+     
+      //Now we will simulate a failure of the message flow connection between server0 and server1 (the live node)
+      //and prevent reconnection for a few tries, so that it fails over to the backup (server2)
+      MessageFlow flow = service0.getServer().getClusterManager().getMessageFlows().get(flowName);
+      Forwarder forwarder = flow.getForwarders().iterator().next();
+      RemotingConnection forwardingConnection = ((ForwarderImpl)forwarder).getForwardingConnection();
+      InVMConnector.failOnCreateConnection = true;
+      InVMConnector.numberOfFailures = retriesBeforeFailover;
+      forwardingConnection.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+      
+      //We should now have failed over, so fail the connection again; this time it should reconnect
+      forwardingConnection = ((ForwarderImpl)forwarder).getForwardingConnection();
+      InVMConnector.resetFailures();
+      InVMConnector.failOnCreateConnection = true;
+      InVMConnector.numberOfFailures = retriesBeforeFailover - 1;
+      forwardingConnection.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+
+      final int numMessages = 10;
+      
+      SimpleString propKey = new SimpleString("propkey");      
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session0.createClientMessage(false);         
+         message.putIntProperty(propKey, i);
+         message.getBody().flip();
+         
+         prod0.send(message);
+      }
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage r1 = cons1.receive(500);
+         assertNotNull(r1);
+         assertEquals(i, r1.getProperty(propKey));
+      }
+
+      session0.close();
+      session2.close();
+
+      service0.stop();
+      service1.stop();
+      service2.stop();
+
+      assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+      assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+      assertEquals(0, service2.getServer().getRemotingService().getConnections().size());
+   }
+   
+   public void testAutomaticReconnectSingleServer() throws Exception
+   {
+      Map<String, Object> service0Params = new HashMap<String, Object>();
+      MessagingService service0 = createMessagingService(0, service0Params);
+
+      Map<String, Object> service1Params = new HashMap<String, Object>();
+      MessagingService service1 = createMessagingService(1, service1Params);
+      service1.start();
+
+      TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service0Params,
+                                                                    "server0tc");
+
+      Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+
+      TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service1Params,
+                                                                    "server1tc");
+   
+      connectors.put(server1tc.getName(), server1tc);
+      
+ 
+      service0.getServer().getConfiguration().setConnectorConfigurations(connectors);
+
+      List<Pair<String, String>> connectorNames = new ArrayList<Pair<String, String>>();
+      connectorNames.add(new Pair<String, String>(server1tc.getName(), null));
+
+      final SimpleString address1 = new SimpleString("testaddress");
+
+      final long retryInterval = 50;
+      final double retryIntervalMultiplier = 1d;
+      final int retriesBeforeFailover = 3;
+      final int maxRetriesAfterFailover = -1;
+      
+      final String flowName = "flow1";
+      
+      MessageFlowConfiguration ofconfig1 = new MessageFlowConfiguration(flowName,
+                                                                        address1.toString(),
+                                                                        null,
+                                                                        true,
+                                                                        1,
+                                                                        -1,
+                                                                        null,
+                                                                        retryInterval,
+                                                                        retryIntervalMultiplier,
+                                                                        retriesBeforeFailover,
+                                                                        maxRetriesAfterFailover,
+                                                                        connectorNames);
+
+      Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
+      ofconfigs.add(ofconfig1);
+
+      service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
+
+      service0.start();
+
+      ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+      ClientSession session0 = csf0.createSession(false, true, true);
+
+      ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+      ClientSession session1 = csf1.createSession(false, true, true);
+
+      session0.createQueue(address1, address1, null, false, false, false);
+      session1.createQueue(address1, address1, null, false, false, false);
+      ClientProducer prod0 = session0.createProducer(address1);
+
+      ClientConsumer cons1 = session1.createConsumer(address1);
+
+      session1.start();
+      
+      //Now we will simulate a failure of the message flow connection between server0 and server1
+      //and prevent reconnection for a few tries; it will then reconnect (no backup is configured to fail over to)
+      MessageFlow flow = service0.getServer().getClusterManager().getMessageFlows().get(flowName);
+      Forwarder forwarder = flow.getForwarders().iterator().next();
+      RemotingConnection forwardingConnection = ((ForwarderImpl)forwarder).getForwardingConnection();
+      InVMConnector.failOnCreateConnection = true;
+      InVMConnector.numberOfFailures = retriesBeforeFailover - 1;
+      forwardingConnection.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+
+      final int numMessages = 10;
+      
+      SimpleString propKey = new SimpleString("propkey");      
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session0.createClientMessage(false);         
+         message.putIntProperty(propKey, i);
+         message.getBody().flip();
+         
+         prod0.send(message);
+      }
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage r1 = cons1.receive(500);
+         assertNotNull(r1);
+         assertEquals(i, r1.getProperty(propKey));
+      }
+
+      session0.close();
+      session1.close();
+
+      service0.stop();
+      service1.stop();
+
+      assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+      assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+   }
+   
+   public void testNonAutomaticReconnectSingleServer() throws Exception
+   {
+      Map<String, Object> service0Params = new HashMap<String, Object>();
+      MessagingService service0 = createMessagingService(0, service0Params);
+
+      Map<String, Object> service1Params = new HashMap<String, Object>();
+      MessagingService service1 = createMessagingService(1, service1Params);
+      service1.start();
+
+      TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service0Params,
+                                                                    "server0tc");
+
+      Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+
+      TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service1Params,
+                                                                    "server1tc");
+   
+      connectors.put(server1tc.getName(), server1tc);
+      
+ 
+      service0.getServer().getConfiguration().setConnectorConfigurations(connectors);
+
+      List<Pair<String, String>> connectorNames = new ArrayList<Pair<String, String>>();
+      connectorNames.add(new Pair<String, String>(server1tc.getName(), null));
+
+      final SimpleString address1 = new SimpleString("testaddress");
+
+      final long retryInterval = 50;
+      final double retryIntervalMultiplier = 1d;
+      final int retriesBeforeFailover = 3;
+      final int maxRetriesAfterFailover = 0;
+      
+      final String flowName = "flow1";
+      
+      MessageFlowConfiguration ofconfig1 = new MessageFlowConfiguration(flowName,
+                                                                        address1.toString(),
+                                                                        null,
+                                                                        true,
+                                                                        1,
+                                                                        -1,
+                                                                        null,
+                                                                        retryInterval,
+                                                                        retryIntervalMultiplier,
+                                                                        retriesBeforeFailover,
+                                                                        maxRetriesAfterFailover,
+                                                                        connectorNames);
+
+      Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
+      ofconfigs.add(ofconfig1);
+
+      service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
+
+      service0.start();
+
+      ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+      ClientSession session0 = csf0.createSession(false, true, true);
+
+      ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+      ClientSession session1 = csf1.createSession(false, true, true);
+
+      session0.createQueue(address1, address1, null, false, false, false);
+      session1.createQueue(address1, address1, null, false, false, false);
+      ClientProducer prod0 = session0.createProducer(address1);
+
+      ClientConsumer cons1 = session1.createConsumer(address1);
+
+      session1.start();
+      
+      //Now we will simulate a failure of the message flow connection between server0 and server1
+      //and prevent reconnection for retriesBeforeFailover * 2 tries (no backup is configured to fail over to)
+      MessageFlow flow = service0.getServer().getClusterManager().getMessageFlows().get(flowName);
+      Forwarder forwarder = flow.getForwarders().iterator().next();
+      RemotingConnection forwardingConnection = ((ForwarderImpl)forwarder).getForwardingConnection();
+      InVMConnector.failOnCreateConnection = true;
+      InVMConnector.numberOfFailures = retriesBeforeFailover * 2;
+      forwardingConnection.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+
+      final int numMessages = 10;
+      
+      SimpleString propKey = new SimpleString("propkey");      
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session0.createClientMessage(false);         
+         message.putIntProperty(propKey, i);
+         message.getBody().flip();
+         
+         prod0.send(message);
+      }
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage r1 = cons1.receive(500);
+         assertNotNull(r1);
+         assertEquals(i, r1.getProperty(propKey));
+      }
+      
+      //Now fail it again
+      
+      InVMConnector.resetFailures();
+      forwardingConnection = ((ForwarderImpl)forwarder).getForwardingConnection();
+      InVMConnector.failOnCreateConnection = true;
+      InVMConnector.numberOfFailures = retriesBeforeFailover * 2;
+      forwardingConnection.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session0.createClientMessage(false);         
+         message.putIntProperty(propKey, i);
+         message.getBody().flip();
+         
+         prod0.send(message);
+      }
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage r1 = cons1.receive(500);
+         assertNotNull(r1);
+         assertEquals(i, r1.getProperty(propKey));
+      }
+
+      session0.close();
+      session1.close();
+
+      service0.stop();
+      service1.stop();
+
+      assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+      assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------




More information about the jboss-cvs-commits mailing list