[jboss-cvs] JBoss Messaging SVN: r5141 - in trunk: tests/src/org/jboss/messaging/tests/integration/cluster and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sat Oct 18 05:52:29 EDT 2008


Author: timfox
Date: 2008-10-18 05:52:29 -0400 (Sat, 18 Oct 2008)
New Revision: 5141

Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/MultiThreadRandomFailoverTest.java
Log:
Improvements to tests plus fix lock bug in rollback on failover


Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-10-17 21:34:38 UTC (rev 5140)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-10-18 09:52:29 UTC (rev 5141)
@@ -302,11 +302,14 @@
       }
    }
 
-   public synchronized void clear()
+   public void clear()
    {
+      synchronized (this)
+      {
+         buffer.clear();
+      }
+      
       waitForOnMessageToComplete();
-
-      buffer.clear();
    }
 
    public int getClientWindowSize()

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/MultiThreadRandomFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/MultiThreadRandomFailoverTest.java	2008-10-17 21:34:38 UTC (rev 5140)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/MultiThreadRandomFailoverTest.java	2008-10-18 09:52:29 UTC (rev 5141)
@@ -59,7 +59,7 @@
    // Constants -----------------------------------------------------
 
    private static final int RECEIVE_TIMEOUT = 5000;
-   
+
    private static final int NUM_THREADS = 10;
 
    // Attributes ----------------------------------------------------
@@ -79,7 +79,6 @@
 
    // Public --------------------------------------------------------
 
-
    public void testA() throws Exception
    {
       runTestMultipleThreads(new RunnableT()
@@ -90,7 +89,7 @@
          }
       }, NUM_THREADS);
    }
-   
+
    public void testB() throws Exception
    {
       runTestMultipleThreads(new RunnableT()
@@ -101,7 +100,7 @@
          }
       }, NUM_THREADS);
    }
-   
+
    public void testC() throws Exception
    {
       runTestMultipleThreads(new RunnableT()
@@ -112,7 +111,7 @@
          }
       }, NUM_THREADS);
    }
-   
+
    public void testD() throws Exception
    {
       runTestMultipleThreads(new RunnableT()
@@ -156,7 +155,7 @@
          }
       }, NUM_THREADS);
    }
-   
+
    public void testH() throws Exception
    {
       runTestMultipleThreads(new RunnableT()
@@ -167,7 +166,7 @@
          }
       }, NUM_THREADS);
    }
-   
+
    public void testI() throws Exception
    {
       runTestMultipleThreads(new RunnableT()
@@ -178,7 +177,7 @@
          }
       }, NUM_THREADS);
    }
-   
+
    public void testJ() throws Exception
    {
       runTestMultipleThreads(new RunnableT()
@@ -189,7 +188,7 @@
          }
       }, NUM_THREADS);
    }
-   
+
    public void testK() throws Exception
    {
       runTestMultipleThreads(new RunnableT()
@@ -200,7 +199,7 @@
          }
       }, NUM_THREADS);
    }
-   
+
    public void testL() throws Exception
    {
       runTestMultipleThreads(new RunnableT()
@@ -212,10 +211,21 @@
       }, NUM_THREADS);
    }
    
+   public void testM() throws Exception
+   {
+      runTestMultipleThreads(new RunnableT()
+      {
+         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+         {
+            doTestM(sf, threadNum);
+         }
+      }, NUM_THREADS);
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
-   
+
    protected void doTestA(final ClientSessionFactory sf, final int threadNum) throws Exception
    {
       long start = System.currentTimeMillis();
@@ -250,54 +260,13 @@
 
       ClientProducer producer = sessSend.createProducer(ADDRESS);
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
-                                                              false,
-                                                              0,
-                                                              System.currentTimeMillis(),
-                                                              (byte)1);
-         message.putIntProperty(new SimpleString("count"), i);
-         message.getBody().flip();
-         producer.send(message);
-      }
+      sendMessages(sessSend, producer, numMessages, threadNum);
 
-      class MyHandler implements MessageHandler
-      {
-         final CountDownLatch latch = new CountDownLatch(1);
-
-         volatile int count;
-
-         public void onMessage(ClientMessage message)
-         {
-            try
-            {
-               message.acknowledge();
-            }
-            catch (MessagingException me)
-            {
-               log.error("Failed to process", me);
-            }
-            
-            if (count >= numMessages)
-            {
-               return;
-            }
-
-            count++;
-            
-            if (count == numMessages)
-            {
-               latch.countDown();
-            }
-         }
-      }
-
       Set<MyHandler> handlers = new HashSet<MyHandler>();
 
       for (ClientConsumer consumer : consumers)
       {
-         MyHandler handler = new MyHandler();
+         MyHandler handler = new MyHandler(threadNum, numMessages);
 
          consumer.setMessageHandler(handler);
 
@@ -309,6 +278,11 @@
          boolean ok = handler.latch.await(5000, TimeUnit.MILLISECONDS);
 
          assertTrue("Didn't receive all messages", ok);
+
+         if (handler.failure != null)
+         {
+            throw new Exception("Handler failed: " + handler.failure);
+         }
       }
 
       sessSend.close();
@@ -363,59 +337,18 @@
 
       ClientProducer producer = sessSend.createProducer(ADDRESS);
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
-                                                              false,
-                                                              0,
-                                                              System.currentTimeMillis(),
-                                                              (byte)1);
-         message.putIntProperty(new SimpleString("count"), i);
-         message.getBody().flip();
-         producer.send(message);
-      }
+      sendMessages(sessSend, producer, numMessages, threadNum);
 
       for (ClientSession session : sessions)
       {
          session.start();
       }
 
-      class MyHandler implements MessageHandler
-      {
-         final CountDownLatch latch = new CountDownLatch(1);
-
-         volatile int count;
-
-         public void onMessage(ClientMessage message)
-         {
-            try
-            {
-               message.acknowledge();
-            }
-            catch (MessagingException me)
-            {
-               log.error("Failed to process", me);
-            }
-            
-            if (count >= numMessages)
-            {
-               return;
-            }
-
-            count++;
-            
-            if (count == numMessages)
-            {
-               latch.countDown();
-            }
-         }
-      }
-
       Set<MyHandler> handlers = new HashSet<MyHandler>();
 
       for (ClientConsumer consumer : consumers)
       {
-         MyHandler handler = new MyHandler();
+         MyHandler handler = new MyHandler(threadNum, numMessages);
 
          consumer.setMessageHandler(handler);
 
@@ -427,6 +360,11 @@
          boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
 
          assertTrue(ok);
+         
+         if (handler.failure != null)
+         {
+            throw new Exception("Handler failed: " + handler.failure);
+         }
       }
 
       sessSend.close();
@@ -481,74 +419,23 @@
          sessions.add(sessConsume);
       }
 
-      ClientSession sessSend = sf.createSession(false, true, true, false);
+      ClientSession sessSend = sf.createSession(false, false, false, false);
 
       ClientProducer producer = sessSend.createProducer(ADDRESS);
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
-                                                              false,
-                                                              0,
-                                                              System.currentTimeMillis(),
-                                                              (byte)1);
-         message.putIntProperty(new SimpleString("count"), i);
-         message.getBody().flip();
-         producer.send(message);
-      }
+      sendMessages(sessSend, producer, numMessages, threadNum);
 
       sessSend.rollback();
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
-                                                              false,
-                                                              0,
-                                                              System.currentTimeMillis(),
-                                                              (byte)1);
-         message.putIntProperty(new SimpleString("count"), i);
-         message.getBody().flip();
-         producer.send(message);
-      }
+      sendMessages(sessSend, producer, numMessages, threadNum);
 
       sessSend.commit();
 
-      class MyHandler implements MessageHandler
-      {
-         final CountDownLatch latch = new CountDownLatch(1);
-
-         volatile int count;
-
-         public void onMessage(ClientMessage message)
-         {
-            try
-            {
-               message.acknowledge();
-            }
-            catch (MessagingException me)
-            {
-               log.error("Failed to process", me);
-            }
-            
-            if (count >= numMessages)
-            {
-               return;
-            }
-
-            count++;
-            
-            if (count == numMessages)
-            {
-               latch.countDown();
-            }
-         }
-      }
-
       Set<MyHandler> handlers = new HashSet<MyHandler>();
 
       for (ClientConsumer consumer : consumers)
       {
-         MyHandler handler = new MyHandler();
+         MyHandler handler = new MyHandler(threadNum, numMessages);
 
          consumer.setMessageHandler(handler);
 
@@ -560,20 +447,13 @@
          boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
 
          assertTrue(ok);
+         
+         if (handler.failure != null)
+         {
+            throw new Exception("Handler failed: " + handler.failure);
+         }
       }
 
-      handlers.clear();
-
-      // New handlers
-      for (ClientConsumer consumer : consumers)
-      {
-         MyHandler handler = new MyHandler();
-
-         consumer.setMessageHandler(handler);
-
-         handlers.add(handler);
-      }
-
       for (ClientSession session : sessions)
       {
          session.rollback();
@@ -643,31 +523,11 @@
 
       ClientProducer producer = sessSend.createProducer(ADDRESS);
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
-                                                              false,
-                                                              0,
-                                                              System.currentTimeMillis(),
-                                                              (byte)1);
-         message.putIntProperty(new SimpleString("count"), i);
-         message.getBody().flip();
-         producer.send(message);
-      }
+      sendMessages(sessSend, producer, numMessages, threadNum);
 
       sessSend.rollback();
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
-                                                              false,
-                                                              0,
-                                                              System.currentTimeMillis(),
-                                                              (byte)1);
-         message.putIntProperty(new SimpleString("count"), i);
-         message.getBody().flip();
-         producer.send(message);
-      }
+      sendMessages(sessSend, producer, numMessages, threadNum);
 
       sessSend.commit();
 
@@ -676,42 +536,11 @@
          session.start();
       }
 
-      class MyHandler implements MessageHandler
-      {
-         final CountDownLatch latch = new CountDownLatch(1);
-
-         volatile int count;
-
-         public void onMessage(ClientMessage message)
-         {
-            try
-            {
-               message.acknowledge();
-            }
-            catch (MessagingException me)
-            {
-               log.error("Failed to process", me);
-            }
-            
-            if (count >= numMessages)
-            {
-               return;
-            }
-
-            count++;
-            
-            if (count == numMessages)
-            {
-               latch.countDown();
-            }
-         }
-      }
-
       Set<MyHandler> handlers = new HashSet<MyHandler>();
 
       for (ClientConsumer consumer : consumers)
       {
-         MyHandler handler = new MyHandler();
+         MyHandler handler = new MyHandler(threadNum, numMessages);
 
          consumer.setMessageHandler(handler);
 
@@ -723,6 +552,11 @@
          boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
 
          assertTrue(ok);
+         
+         if (handler.failure != null)
+         {
+            throw new Exception("Handler failed: " + handler.failure);
+         }
       }
 
       handlers.clear();
@@ -730,7 +564,7 @@
       // New handlers
       for (ClientConsumer consumer : consumers)
       {
-         MyHandler handler = new MyHandler();
+         MyHandler handler = new MyHandler(threadNum, numMessages);
 
          consumer.setMessageHandler(handler);
 
@@ -810,30 +644,10 @@
 
       ClientProducer producer = sessSend.createProducer(ADDRESS);
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
-                                                              false,
-                                                              0,
-                                                              System.currentTimeMillis(),
-                                                              (byte)1);
-         message.putIntProperty(new SimpleString("count"), i);
-         message.getBody().flip();
-         producer.send(message);
-      }
+      sendMessages(sessSend, producer, numMessages, threadNum);
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         for (ClientConsumer consumer : consumers)
-         {
-            ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
+      consumeMessages(consumers, numMessages);
 
-            assertNotNull(msg);
-
-            msg.acknowledge();
-         }
-      }
-
       sessSend.close();
       for (ClientSession session : sessions)
       {
@@ -886,35 +700,15 @@
 
       ClientProducer producer = sessSend.createProducer(ADDRESS);
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
-                                                              false,
-                                                              0,
-                                                              System.currentTimeMillis(),
-                                                              (byte)1);
-         message.putIntProperty(new SimpleString("count"), i);
-         message.getBody().flip();
-         producer.send(message);
-      }
+      sendMessages(sessSend, producer, numMessages, threadNum);
 
       for (ClientSession session : sessions)
       {
          session.start();
       }
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         for (ClientConsumer consumer : consumers)
-         {
-            ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
+      consumeMessages(consumers, numMessages);
 
-            assertNotNull(msg);
-
-            msg.acknowledge();
-         }
-      }
-
       sessSend.close();
       for (ClientSession session : sessions)
       {
@@ -969,64 +763,23 @@
 
       ClientProducer producer = sessSend.createProducer(ADDRESS);
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
-                                                              false,
-                                                              0,
-                                                              System.currentTimeMillis(),
-                                                              (byte)1);
-         message.putIntProperty(new SimpleString("count"), i);
-         message.getBody().flip();
-         producer.send(message);
-      }
+      sendMessages(sessSend, producer, numMessages, threadNum);
 
       sessSend.rollback();
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
-                                                              false,
-                                                              0,
-                                                              System.currentTimeMillis(),
-                                                              (byte)1);
-         message.putIntProperty(new SimpleString("count"), i);
-         message.getBody().flip();
-         producer.send(message);
-      }
+      sendMessages(sessSend, producer, numMessages, threadNum);
 
       sessSend.commit();
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         for (ClientConsumer consumer : consumers)
-         {
-            ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
+      consumeMessages(consumers, numMessages);
 
-            assertNotNull(msg);
-
-            msg.acknowledge();
-         }
-      }
-
       for (ClientSession session : sessions)
       {
          session.rollback();
       }
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         for (ClientConsumer consumer : consumers)
-         {
-            ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
+      consumeMessages(consumers, numMessages);
 
-            assertNotNull(msg);
-
-            msg.acknowledge();
-         }
-      }
-
-
       for (ClientSession session : sessions)
       {
          session.commit();
@@ -1084,31 +837,11 @@
 
       ClientProducer producer = sessSend.createProducer(ADDRESS);
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
-                                                              false,
-                                                              0,
-                                                              System.currentTimeMillis(),
-                                                              (byte)1);
-         message.putIntProperty(new SimpleString("count"), i);
-         message.getBody().flip();
-         producer.send(message);
-      }
+      sendMessages(sessSend, producer, numMessages, threadNum);
 
       sessSend.rollback();
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
-                                                              false,
-                                                              0,
-                                                              System.currentTimeMillis(),
-                                                              (byte)1);
-         message.putIntProperty(new SimpleString("count"), i);
-         message.getBody().flip();
-         producer.send(message);
-      }
+      sendMessages(sessSend, producer, numMessages, threadNum);
 
       sessSend.commit();
 
@@ -1117,31 +850,15 @@
          session.start();
       }
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         for (ClientConsumer consumer : consumers)
-         {            
-            ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
+      consumeMessages(consumers, numMessages);
 
-            msg.acknowledge();
-         }
-      }
-
       for (ClientSession session : sessions)
       {
          session.rollback();
       }
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         for (ClientConsumer consumer : consumers)
-         {
-            ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
+      consumeMessages(consumers, numMessages);
 
-            msg.acknowledge();
-         }
-      }
-
       for (ClientSession session : sessions)
       {
          session.commit();
@@ -1274,28 +991,72 @@
 
       s.close();
    }
+   
+   // Single-message round trip on a queue named threadNum + ADDRESS, with session stop()/start()
+   // pairs issued before the consumer is created and again after the message is acknowledged.
+   protected void doTestM(final ClientSessionFactory sf, final int threadNum) throws Exception
+   {
+      ClientSession sessCreate = sf.createSession(false, true, true, false);
 
+      sessCreate.createQueue(ADDRESS, new SimpleString(threadNum + ADDRESS.toString()), null, false, false);
+
+      ClientSession sess = sf.createSession(false, true, true, false);
+
+      sess.stop();
+      sess.start();
+
+      ClientConsumer consumer = sess.createConsumer(new SimpleString(threadNum + ADDRESS.toString()));
+
+      ClientProducer producer = sess.createProducer(ADDRESS);
+
+      ClientMessage message = sess.createClientMessage(JBossTextMessage.TYPE,
+                                                       false,
+                                                       0,
+                                                       System.currentTimeMillis(),
+                                                       (byte)1);
+      message.getBody().flip();
+
+      producer.send(message);
+      // start() is called again here; no stop() has been issued since the previous start()
+      sess.start();
+
+      ClientMessage message2 = consumer.receive(RECEIVE_TIMEOUT);
+
+      assertNotNull(message2);
+
+      message2.acknowledge();
+
+      sess.stop();
+      sess.start();
+
+      sess.close();
+
+      sessCreate.deleteQueue(new SimpleString(threadNum + ADDRESS.toString()));
+
+      sessCreate.close();
+   }
+
    protected int getNumIterations()
    {
       return 5;
    }
-   
+
    protected void setUp() throws Exception
    {
       super.setUp();
-      
+
       log.info("************ Starting test " + this.getName());
-      
+
       timer = new Timer();
-      
+
    }
-   
+
    protected void tearDown() throws Exception
    {
       log.info("************* Ending test " + this.getName());
-      
+
       timer.cancel();
-      
+
       super.tearDown();
    }
 
@@ -1347,11 +1108,11 @@
                }
             }
          }
-         
+
          do
          {
             List<Runner> threads = new ArrayList<Runner>();
-            
+
             for (int i = 0; i < numThreads; i++)
             {
                Runner runner = new Runner(runnable, i);
@@ -1360,18 +1121,18 @@
 
                runner.start();
             }
-            
+
             for (Runner thread : threads)
             {
-               thread.join();               
+               thread.join();
 
                assertNull(thread.throwable);
             }
-            
+
             runnable.checkFail();
          }
          while (!failer.isExecuted());
-         
+
          session.close();
 
          assertEquals(0, sf.getSessionCount());
@@ -1379,18 +1140,18 @@
          stop();
       }
    }
-   
+
    private Failer startFailer(final long time, final ClientSession session)
    {
       Failer failer = new Failer(session);
 
       timer.schedule(failer, (long)(time * Math.random()), 100);
-      
+
       return failer;
    }
 
    private void start() throws Exception
-   {      
+   {
       Configuration backupConf = new ConfigurationImpl();
       backupConf.setSecurityEnabled(false);
       backupConf.setPacketConfirmationBatchSize(10);
@@ -1423,8 +1184,6 @@
 
       assertEquals(0, ConnectionRegistryImpl.instance.size());
 
-      // ConnectionRegistryImpl.instance.clear();
-
       assertEquals(0, backupService.getServer().getRemotingService().getConnections().size());
 
       backupService.stop();
@@ -1436,9 +1195,78 @@
       assertEquals(0, InVMRegistry.instance.size());
    }
 
+   private void sendMessages(final ClientSession sessSend,
+                             final ClientProducer producer,
+                             final int numMessages,
+                             final int threadNum) throws Exception
+   {
+      for (int i = 0; i < numMessages; i++) // one message per sequence number 0..numMessages-1
+      {
+         ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
+                                                              false,
+                                                              0,
+                                                              System.currentTimeMillis(),
+                                                              (byte)1);
+         message.putIntProperty(new SimpleString("threadnum"), threadNum); // sender id; read back by the ordering checks
+         message.putIntProperty(new SimpleString("count"), i); // position within this sender's batch
+         message.getBody().flip();
+         producer.send(message);
+      }
+   }
+
+   // Receives numMessages messages from every consumer (one per consumer per pass), asserting that
+   // each receive(RECEIVE_TIMEOUT) returns a message, checking per-sender order, then acknowledging.
+   private void consumeMessages(final Set<ClientConsumer> consumers, final int numMessages) throws Exception
+   {
+      // Per consumer: sender "threadnum" -> next expected "count", so order is checked per producer
+      Map<ClientConsumer, Map<Integer, Integer>> counts = new HashMap<ClientConsumer, Map<Integer, Integer>>();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         for (ClientConsumer consumer : consumers)
+         {
+            Map<Integer, Integer> consumerCounts = counts.get(consumer);
+
+            if (consumerCounts == null)
+            {
+               consumerCounts = new HashMap<Integer, Integer>();
+               counts.put(consumer, consumerCounts);
+            }
+
+            ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
+
+            assertNotNull(msg);
+
+            int tn = (Integer)msg.getProperty(new SimpleString("threadnum"));
+            int cnt = (Integer)msg.getProperty(new SimpleString("count"));
+
+            Integer c = consumerCounts.get(tn);
+            if (c == null)
+            {
+               c = new Integer(cnt);
+            }
+
+            if (cnt != c.intValue())
+            {
+               throw new Exception("Invalid count, expected " + c + " got " + cnt);
+            }
+
+            c++;
+            // Wrap: the expected count restarts at 0 after numMessages - 1
+            if (c == numMessages)
+            {
+               c = 0;
+            }
+            consumerCounts.put(tn, c);
+
+            msg.acknowledge();
+         }
+      }
+   }
+
    // Inner classes -------------------------------------------------
 
-   class Failer extends TimerTask
+   private class Failer extends TimerTask
    {
       private final ClientSession session;
 
@@ -1470,17 +1298,18 @@
       }
    }
 
-   public abstract class RunnableT extends Thread
+   private abstract class RunnableT extends Thread
    {
       private volatile String failReason;
+
       private volatile Throwable throwable;
-      
+
       public void setFailed(final String reason, final Throwable throwable)
       {
          this.failReason = reason;
          this.throwable = throwable;
       }
-      
+
       public void checkFail()
       {
          if (throwable != null)
@@ -1492,6 +1321,82 @@
             fail(failReason);
          }
       }
+
       public abstract void run(final ClientSessionFactory sf, final int threadNum) throws Exception;
    }
+
+   /**
+    * Checks per-sender ordering: for every "threadnum" seen, each "count" must follow the previous
+    * one, wrapping to 0 after numMessages - 1. The latch is released on an ordering failure, or once
+    * the expected count for this handler's own thread number (tn) reaches numMessages - 1.
+    */
+   private class MyHandler implements MessageHandler
+   {
+      final CountDownLatch latch = new CountDownLatch(1);
+
+      // Next expected "count" value, keyed by the sender's "threadnum"
+      private Map<Integer, Integer> counts = new HashMap<Integer, Integer>();
+
+      // Description of the most recent ordering violation, or null if none was seen
+      volatile String failure;
+
+      final int tn;
+      final int numMessages;
+      volatile boolean done;
+
+      MyHandler(final int threadNum, final int numMessages)
+      {
+         this.tn = threadNum;
+         this.numMessages = numMessages;
+      }
+
+      public void onMessage(ClientMessage message)
+      {
+         try
+         {
+            message.acknowledge();
+         }
+         catch (MessagingException me)
+         {
+            log.error("Failed to process", me);
+         }
+
+         // Messages arriving after completion are still acknowledged above, but not checked
+         if (done)
+         {
+            return;
+         }
+
+         int threadNum = (Integer)message.getProperty(new SimpleString("threadnum"));
+         int cnt = (Integer)message.getProperty(new SimpleString("count"));
+
+         // The first message seen from a sender fixes the starting point of its sequence
+         Integer c = counts.get(threadNum);
+         if (c == null)
+         {
+            c = Integer.valueOf(cnt);
+         }
+
+         if (cnt != c.intValue())
+         {
+            failure = "Invalid count, expected " + c + " got " + cnt;
+            log.error(failure);
+            latch.countDown();
+         }
+
+         if (tn == threadNum && c == numMessages - 1)
+         {
+            done = true;
+            latch.countDown();
+         }
+
+         // Wrap at numMessages (not a fixed 100), matching the 0..numMessages-1 range that is sent
+         c++;
+         if (c == numMessages)
+         {
+            c = 0;
+         }
+         counts.put(threadNum, c);
+      }
+   }
 }




More information about the jboss-cvs-commits mailing list