[jboss-cvs] JBoss Messaging SVN: r5467 - in trunk: tests/src/org/jboss/messaging/tests/integration/cluster/failover and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Dec 5 12:21:18 EST 2008


Author: timfox
Date: 2008-12-05 12:21:17 -0500 (Fri, 05 Dec 2008)
New Revision: 5467

Modified:
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/RandomFailoverTest.java
Log:
Fixed deadlock and tweaks


Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-12-05 16:40:47 UTC (rev 5466)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-12-05 17:21:17 UTC (rev 5467)
@@ -435,19 +435,21 @@
          {
             return;
          }
+         
+         destroyed = true;
+      }
 
-         log.warn(me.getMessage());
+      log.warn(me.getMessage());
 
-         // Then call the listeners
-         callListeners(me);
+      // Then call the listeners
+      callListeners(me);
 
-         internalClose();
+      internalClose();
 
-         for (Channel channel : channels.values())
-         {
-            channel.fail();
-         }
-      }
+      for (Channel channel : channels.values())
+      {
+         channel.fail();
+      }      
    }
 
    public void destroy()
@@ -458,14 +460,16 @@
          {
             return;
          }
+         
+         destroyed = true;
+      }
 
-         internalClose();
+      internalClose();
 
-         // TODO: https://jira.jboss.org/jira/browse/JBMESSAGING-1421
-         // This affects clustering, so I'm keeping this out for now
-         // We need to inform Listeners about the connection being closed
-         // callListeners(null);
-      }
+      // TODO: https://jira.jboss.org/jira/browse/JBMESSAGING-1421
+      // This affects clustering, so I'm keeping this out for now
+      // We need to inform Listeners about the connection being closed
+      // callListeners(null);      
    }
 
    public boolean isExpired(final long now)
@@ -578,8 +582,6 @@
 
       pingChannel.close();
 
-      destroyed = true;
-
       // We close the underlying transport connection
       transportConnection.close();
 

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTest.java	2008-12-05 16:40:47 UTC (rev 5466)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTest.java	2008-12-05 17:21:17 UTC (rev 5467)
@@ -40,6 +40,7 @@
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnector;
 import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
 import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
 import org.jboss.messaging.core.server.MessagingService;
@@ -290,7 +291,9 @@
 
          if (!ok)
          {
-            throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) + " threadnum " + threadNum);
+            throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+                                " threadnum " +
+                                threadNum);
          }
 
          if (handler.failure != null)
@@ -375,7 +378,9 @@
 
          if (!ok)
          {
-            throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) + " threadnum " + threadNum);
+            throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+                                " threadnum " +
+                                threadNum);
          }
 
          if (handler.failure != null)
@@ -465,7 +470,9 @@
 
          if (!ok)
          {
-            throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) + " threadnum " + threadNum);
+            throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+                                " threadnum " +
+                                threadNum);
          }
 
          if (handler.failure != null)
@@ -538,7 +545,7 @@
 
          sessions.add(sessConsume);
       }
-      
+
       ClientSession sessSend = sf.createSession(false, false, false);
 
       ClientProducer producer = sessSend.createProducer(ADDRESS);
@@ -546,17 +553,16 @@
       sendMessages(sessSend, producer, numMessages, threadNum);
 
       sessSend.rollback();
-      
+
       sendMessages(sessSend, producer, numMessages, threadNum);
-      
+
       sessSend.commit();
-      
 
       for (ClientSession session : sessions)
       {
          session.start();
       }
-       
+
       Set<MyHandler> handlers = new HashSet<MyHandler>();
 
       for (ClientConsumer consumer : consumers)
@@ -574,7 +580,9 @@
 
          if (!ok)
          {
-            throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) + " threadnum " + threadNum);
+            throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+                                " threadnum " +
+                                threadNum);
          }
 
          if (handler.failure != null)
@@ -582,9 +590,9 @@
             throw new Exception("Handler failed: " + handler.failure);
          }
       }
-      
+
       handlers.clear();
-      
+
       // Set handlers to null
       for (ClientConsumer consumer : consumers)
       {
@@ -595,7 +603,7 @@
       {
          session.rollback();
       }
-      
+
       // New handlers
       for (ClientConsumer consumer : consumers)
       {
@@ -605,27 +613,29 @@
 
          handlers.add(handler);
       }
-      
+
       for (MyHandler handler : handlers)
       {
          boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
 
          if (!ok)
          {
-            throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) + " threadnum " + threadNum);
+            throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+                                " threadnum " +
+                                threadNum);
          }
-         
+
          if (handler.failure != null)
          {
             throw new Exception("Handler failed on rollback: " + handler.failure);
          }
       }
-        
+
       for (ClientSession session : sessions)
       {
          session.commit();
       }
-      
+
       sessSend.close();
       for (ClientSession session : sessions)
       {
@@ -1236,8 +1246,12 @@
 
          final ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
                                                                               new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
-                                                                                                         backupParams));
-         
+                                                                                                         backupParams),
+                                                                              0,
+                                                                              1,
+                                                                              ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+                                                                              ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER);
+
          sf.setSendWindowSize(32 * 1024);
 
          ClientSession session = sf.createSession(false, false, false);
@@ -1298,6 +1312,8 @@
          }
          while (!failer.isExecuted());
 
+         InVMConnector.resetFailures();
+
          session.close();
 
          assertEquals(0, sf.numSessions());
@@ -1339,7 +1355,8 @@
               .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
       Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
       TransportConfiguration backupTC = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
-                                                                   backupParams, "backup-connector");
+                                                                   backupParams,
+                                                                   "backup-connector");
       connectors.put(backupTC.getName(), backupTC);
       liveConf.setConnectorConfigurations(connectors);
       liveConf.setBackupConnectorName(backupTC.getName());
@@ -1402,9 +1419,9 @@
 
             int tn = (Integer)msg.getProperty(new SimpleString("threadnum"));
             int cnt = (Integer)msg.getProperty(new SimpleString("count"));
-            
-           // log.info("Got message " + tn + ":" + cnt);
 
+            // log.info("Got message " + tn + ":" + cnt);
+
             Integer c = consumerCounts.get(tn);
             if (c == null)
             {
@@ -1450,6 +1467,8 @@
 
          RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionImpl)session).getConnection();
 
+         InVMConnector.numberOfFailures = 1;
+         InVMConnector.failOnCreateConnection = true;
          conn.fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
 
          log.info("** Fail complete");
@@ -1538,11 +1557,11 @@
             c = new Integer(cnt);
          }
 
-         //log.info(System.identityHashCode(this) + " consumed message " + threadNum + ":" + cnt);
-         
+         // log.info(System.identityHashCode(this) + " consumed message " + threadNum + ":" + cnt);
+
          if (tn == threadNum && cnt != c.intValue())
          {
-            failure = "Invalid count, expected " + threadNum + ":" + c + " got " + cnt;            
+            failure = "Invalid count, expected " + threadNum + ":" + c + " got " + cnt;
             log.error(failure);
 
             latch.countDown();

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/RandomFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/RandomFailoverTest.java	2008-12-05 16:40:47 UTC (rev 5466)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/RandomFailoverTest.java	2008-12-05 17:21:17 UTC (rev 5467)
@@ -36,6 +36,7 @@
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnector;
 import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
 import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
 import org.jboss.messaging.core.server.MessagingService;
@@ -65,7 +66,7 @@
    private MessagingService backupService;
 
    private final Map<String, Object> backupParams = new HashMap<String, Object>();
-   
+
    private Timer timer;
 
    // Static --------------------------------------------------------
@@ -74,7 +75,6 @@
 
    // Public --------------------------------------------------------
 
-   
    public void testA() throws Exception
    {
       runTest(new RunnableT()
@@ -85,7 +85,7 @@
          }
       });
    }
-   
+
    public void testB() throws Exception
    {
       runTest(new RunnableT()
@@ -96,7 +96,7 @@
          }
       });
    }
-   
+
    public void testC() throws Exception
    {
       runTest(new RunnableT()
@@ -107,7 +107,7 @@
          }
       });
    }
-   
+
    public void testD() throws Exception
    {
       runTest(new RunnableT()
@@ -118,7 +118,7 @@
          }
       });
    }
-   
+
    public void testE() throws Exception
    {
       runTest(new RunnableT()
@@ -129,7 +129,7 @@
          }
       });
    }
-   
+
    public void testF() throws Exception
    {
       runTest(new RunnableT()
@@ -140,7 +140,7 @@
          }
       });
    }
-   
+
    public void testG() throws Exception
    {
       runTest(new RunnableT()
@@ -151,7 +151,7 @@
          }
       });
    }
-   
+
    public void testH() throws Exception
    {
       runTest(new RunnableT()
@@ -162,7 +162,7 @@
          }
       });
    }
-   
+
    public void testI() throws Exception
    {
       runTest(new RunnableT()
@@ -173,7 +173,7 @@
          }
       });
    }
-   
+
    public void testJ() throws Exception
    {
       runTest(new RunnableT()
@@ -184,7 +184,7 @@
          }
       });
    }
-   
+
    public void testK() throws Exception
    {
       runTest(new RunnableT()
@@ -195,7 +195,7 @@
          }
       });
    }
-   
+
    public void testL() throws Exception
    {
       runTest(new RunnableT()
@@ -206,7 +206,7 @@
          }
       });
    }
-   
+
    public void testN() throws Exception
    {
       runTest(new RunnableT()
@@ -221,17 +221,21 @@
    public void runTest(final RunnableT runnable) throws Exception
    {
       final int numIts = getNumIterations();
-      
+
       for (int its = 0; its < numIts; its++)
       {
          start();
 
          ClientSessionFactoryImpl sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
                                                                     new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
-                                                                                               backupParams));
+                                                                                               backupParams),
+                                                                    0,
+                                                                    1,
+                                                                    ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+                                                                    ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER);
 
          sf.setSendWindowSize(32 * 1024);
-         
+
          ClientSession session = sf.createSession(false, false, false);
 
          Failer failer = startFailer(1000, session);
@@ -242,25 +246,27 @@
             runnable.run(sf);
          }
          while (!failer.isExecuted());
+         
+         InVMConnector.resetFailures();
 
          session.close();
 
          assertEquals(0, sf.numSessions());
-         
+
          assertEquals(0, sf.numConnections());
-         
+
          stop();
       }
    }
-       
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
-   
+
    protected void doTestA(final ClientSessionFactory sf) throws Exception
    {
       long start = System.currentTimeMillis();
-      
+
       log.info("starting================");
 
       ClientSession s = sf.createSession(false, false, false);
@@ -902,7 +908,7 @@
 
       Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
       Set<ClientSession> sessions = new HashSet<ClientSession>();
-      
+
       for (int i = 0; i < numSessions; i++)
       {
          SimpleString subName = new SimpleString("sub" + i);
@@ -933,12 +939,12 @@
          message.getBody().flip();
          producer.send(message);
       }
-      
+
       for (ClientSession session : sessions)
       {
          session.start();
       }
-      
+
       for (int i = 0; i < numMessages; i++)
       {
          for (ClientConsumer consumer : consumers)
@@ -949,7 +955,7 @@
             {
                throw new IllegalStateException("Failed to receive message " + i);
             }
-            
+
             assertNotNull(msg);
 
             assertEquals(i, msg.getProperty(new SimpleString("count")));
@@ -982,7 +988,7 @@
       }
 
       s.close();
-      
+
       assertEquals(1, ((ClientSessionFactoryImpl)sf).numSessions());
 
       long end = System.currentTimeMillis();
@@ -1051,7 +1057,7 @@
       }
 
       sessSend.commit();
-      
+
       log.info("sent and committed");
 
       for (int i = 0; i < numMessages; i++)
@@ -1305,7 +1311,7 @@
 
       sessCreate.deleteQueue(ADDRESS);
 
-      sessCreate.close();     
+      sessCreate.close();
    }
 
    protected void doTestJ(final ClientSessionFactory sf) throws Exception
@@ -1379,7 +1385,6 @@
 
       s.close();
    }
-   
 
    protected void doTestN(final ClientSessionFactory sf) throws Exception
    {
@@ -1426,44 +1431,44 @@
 
       sessCreate.close();
    }
-   
+
    protected int getNumIterations()
    {
       return 20;
    }
-   
+
    protected void setUp() throws Exception
    {
       super.setUp();
-      
+
       log.info("*********** created timer");
       timer = new Timer(true);
-      
+
       log.info("************ Starting test " + this.getName());
    }
-   
+
    protected void tearDown() throws Exception
    {
       timer.cancel();
-      
+
       log.info("************ Ended test " + this.getName());
-      
+
       InVMRegistry.instance.clear();
-      
+
       super.tearDown();
    }
-     
+
    // Private -------------------------------------------------------
-   
+
    private Failer startFailer(final long time, final ClientSession session)
    {
       Failer failer = new Failer((ClientSessionInternal)session);
 
       timer.schedule(failer, (long)(time * Math.random()), 100);
-      
+
       return failer;
    }
-   
+
    private void start() throws Exception
    {
       Configuration backupConf = new ConfigurationImpl();
@@ -1486,7 +1491,8 @@
               .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
       Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
       TransportConfiguration backupTC = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
-                                                                   backupParams, "backup-connector");
+                                                                   backupParams,
+                                                                   "backup-connector");
       connectors.put(backupTC.getName(), backupTC);
       liveConf.setConnectorConfigurations(connectors);
       liveConf.setBackupConnectorName(backupTC.getName());
@@ -1506,26 +1512,28 @@
 
       assertEquals(0, InVMRegistry.instance.size());
    }
-  
+
    // Inner classes -------------------------------------------------
-   
+
    class Failer extends TimerTask
-   { 
+   {
       private final ClientSessionInternal session;
 
       private boolean executed;
 
       public Failer(final ClientSessionInternal session)
-      {     
+      {
          this.session = session;
       }
 
       public synchronized void run()
       {
          log.info("** Failing connection");
- 
+
+         InVMConnector.numberOfFailures = 1;
+         InVMConnector.failOnCreateConnection = true;
          session.getConnection().fail(new MessagingException(MessagingException.NOT_CONNECTED, "oops"));
-         
+
          log.info("** Fail complete");
 
          cancel();
@@ -1538,9 +1546,9 @@
          return executed;
       }
    }
-   
+
    public abstract class RunnableT
    {
-      abstract void run(final ClientSessionFactory sf) throws Exception;      
+      abstract void run(final ClientSessionFactory sf) throws Exception;
    }
 }




More information about the jboss-cvs-commits mailing list