[jboss-cvs] JBoss Messaging SVN: r1661 - in branches/Branch_1_0_1_SP: src/main/org/jboss/jms/client/remoting src/main/org/jboss/jms/server/endpoint src/main/org/jboss/messaging/core src/main/org/jboss/messaging/core/plugin tests/src/org/jboss/test/messaging/jms tests/src/org/jboss/test/messaging/jms/stress

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Nov 30 03:01:41 EST 2006


Author: ovidiu.feodorov at jboss.com
Date: 2006-11-30 03:01:35 -0500 (Thu, 30 Nov 2006)
New Revision: 1661

Modified:
   branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
   branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
   branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/ChannelSupport.java
   branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
   branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/JMSTest.java
   branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/stress/ConcurrentCloseStressTest.java
Log:
fix for http://jira.jboss.org/jira/browse/JBMESSAGING-660 and many TRACE logging improvments

Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2006-11-30 02:09:13 UTC (rev 1660)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2006-11-30 08:01:35 UTC (rev 1661)
@@ -217,8 +217,21 @@
     */
    public HandleMessageResponse handleMessage(List msgs) throws HandleCallbackException
    {            
-      if (trace) { log.trace(this + " receiving " + msgs.size() + " message(s) from the remoting layer"); }
-                      
+      if (trace)
+      {
+         StringBuffer sb = new StringBuffer(this + " receiving [");
+         for(int i = 0; i < msgs.size(); i++)
+         {
+            sb.append(((MessageProxy)msgs.get(i)).getMessage().getMessageID());
+            if (i < msgs.size() - 1)
+            {
+               sb.append(",");
+            }
+         }
+         sb.append("] from the remoting layer");
+         log.trace(sb.toString());
+      }
+
       synchronized (mainLock)
       {
          if (closed)
@@ -233,7 +246,7 @@
                    
          buffer.addAll(msgs);                  
          
-         if (trace) { log.trace(this + " added messages to the buffer"); }            
+         if (trace) { log.trace(this + " added message(s) to the buffer"); }
          
          boolean full = buffer.size() >= bufferSize;         
          
@@ -341,9 +354,9 @@
       {
          sessionExecutor.execute(new Closer(result));
 
-         if (trace) { log.trace("blocking wait for Closer execution"); }
+         if (trace) { log.trace(this + " blocking wait for Closer execution"); }
          result.getResult();
-         if (trace) { log.trace("got Closer result"); }
+         if (trace) { log.trace(this + " got Closer result"); }
       }
       catch (InterruptedException e)
       {
@@ -426,7 +439,7 @@
                   }
                }
                               
-               if (trace) { log.trace("received " + m + " after being blocked on buffer"); }
+               if (trace) { log.trace(this + " received " + m + " after being blocked on buffer"); }
                        
                // If message is expired we still call pre and post deliver. This makes sure the
                // message is acknowledged so it gets removed from the queue/subscription.
@@ -436,14 +449,14 @@
                
                if (!m.getMessage().isExpired())
                {
-                  if (trace) { log.trace("message " + m + " is not expired, pushing it to the caller"); }
+                  if (trace) { log.trace(this + ": message " + m + " is not expired, pushing it to the caller"); }
                   
                   break;
                }
                
                if (trace)
                {
-                  log.trace("message expired, discarding");
+                  log.trace(this + ": message expired, discarding");
                }
                
                // the message expired, so discard the message, adjust timeout and reenter the buffer

Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2006-11-30 02:09:13 UTC (rev 1660)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2006-11-30 08:01:35 UTC (rev 1661)
@@ -753,7 +753,7 @@
          if (trace) { log.trace("sent " + m); }
       }
       
-      if (trace) { log.trace("done the sends"); }
+      if (trace) { log.trace(tx + ": done the sends"); }
       
       // Then ack the acks
       
@@ -775,7 +775,7 @@
          if (trace) { log.trace("acked " + ack.getMessageID()); }
       }
       
-      if (trace) { log.trace("done the acks"); }
+      if (trace) { log.trace(tx + ": done the acks"); }
    }   
 
    // Inner classes -------------------------------------------------

Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-11-30 02:09:13 UTC (rev 1660)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-11-30 08:01:35 UTC (rev 1661)
@@ -772,7 +772,20 @@
 
          try
          {
-            if (trace) { log.trace(ServerConsumerEndpoint.this + " handing " + list.size() + " message(s) over to the remoting layer"); }
+            if (trace)
+            {
+               StringBuffer sb = new StringBuffer(ServerConsumerEndpoint.this + " handing [");
+               for(int i = 0; i < list.size(); i++)
+               {
+                  sb.append(((MessageProxy)list.get(i)).getMessage().getMessageID());
+                  if (i < list.size() - 1)
+                  {
+                     sb.append(",");
+                  }
+               }
+               sb.append("] over to the remoting layer");
+               log.trace(sb.toString());
+            }
 
             ClientDelivery del = new ClientDelivery(list, id);
 

Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-11-30 02:09:13 UTC (rev 1660)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-11-30 08:01:35 UTC (rev 1661)
@@ -721,11 +721,12 @@
    
    protected void acknowledgeInternal(AckInfo ackInfo) throws Throwable
    {
-      //If the message was delivered via a connection consumer then the message needs to be acked
-      //via the original consumer that was used to feed the connection consumer - which
-      //won't be one of the consumers of this session
-      //Therefore we always look in the global map of consumers held in the server peer
-      ServerConsumerEndpoint consumer = this.connectionEndpoint.getConsumerEndpoint(ackInfo.getConsumerID());
+      // If the message was delivered via a connection consumer then the message needs to be acked
+      // via the original consumer that was used to feed the connection consumer - which won't be
+      // one of the consumers of this session. Therefore we always look in the global map of
+      // consumers held in the server peer.
+      ServerConsumerEndpoint consumer =
+         connectionEndpoint.getConsumerEndpoint(ackInfo.getConsumerID());
 
       if (consumer == null)
       {

Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-11-30 02:09:13 UTC (rev 1660)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-11-30 08:01:35 UTC (rev 1661)
@@ -649,146 +649,147 @@
          ListIterator iter = null;
          
          MessageReference ref = null;
-         
-         while (true)
-         {           
-            synchronized (refLock)
-            {              
+
+         synchronized (refLock)
+         {
+            while (true)
+            {
                if (iter == null)
                {
-                  ref = (MessageReference) messageRefs.peekFirst();
+                  ref = (MessageReference)messageRefs.peekFirst();
                }
                else
                {
                   if (iter.hasNext())
-                  {                        
+                  {
                      ref = (MessageReference)iter.next();
-                  } 
+                  }
                   else
                   {
                      ref = null;
                   }
                }
-            }
 
-            if (ref != null)
-            {
-               // Check if message is expired (we also do this on the client
-               // side)
-               // If so ack it from the channel
-               if (ref.isExpired())
+               if (ref != null)
                {
-                  if (trace) { log.trace("Message reference: " + ref + " has expired"); }
+                  if (trace) { log.trace(this + " pushing " + ref); }
 
-                  // remove and acknowledge it
-                  if (iter == null)
+                  // Check if message is expired (we also do this on the client side). If so ack it
+                  // from the channel.
+
+                  if (ref.isExpired())
                   {
-                     removeFirstInMemory();
+                     if (trace) { log.trace("Message reference: " + ref + " has expired"); }
+
+                     // remove and acknowledge it
+                     if (iter == null)
+                     {
+                        removeFirstInMemory();
+                     }
+                     else
+                     {
+                        iter.remove();
+                     }
+
+                     Delivery delivery = new SimpleDelivery(this, ref, true);
+
+                     acknowledgeInternal(delivery);
                   }
                   else
                   {
-                     iter.remove();
-                  }
+                     // Reference is not expired
 
-                  Delivery delivery = new SimpleDelivery(this, ref, true);
+                     // Attempt to push the ref to a receiver
+                     Delivery del = push(ref);
 
-                  acknowledgeInternal(delivery);
-               }
-               else
-               {
-                  // Reference is not expired
+                     if (del == null)
+                     {
+                        // No receiver, broken receiver or full receiver so we stop delivering; also
+                        // we need to decrement the delivery count, as no real delivery has been
+                        // actually performed
 
-                  // Attempt to push the ref to a receiver
-                  Delivery del = push(ref);
+                        if (trace) { log.trace(this + ": no delivery returned for message" + ref + " so no receiver got the message. Delivery is now complete"); }
 
-                  if (del == null)
-                  {
-                     // No receiver, broken receiver or full receiver so we stop delivering; also
-                     // we need to decrement the delivery count, as no real delivery has been
-                     // actually performed
+                        receiversReady = false;
 
-                     if (trace) { log.trace(this + ": no delivery returned for message" + ref + " so no receiver got the message. Delivery is now complete"); }
-
-                     receiversReady = false;
-                     
-                     return;
-                  }
-                  else if (!del.isSelectorAccepted())
-                  {
-                     // No receiver accepted the message because no selectors matched, so we create
-                     // an iterator (if we haven't already created it) to iterate through the refs
-                     // in the channel. 
-                     
-                     // TODO Note that this is only a partial solution since if there are messages
-                     // paged to storage it won't try those - i.e. it will only iterate through
-                     // those refs in memory. Dealing with refs in storage is somewhat tricky since
-                     // we can't just load them and iterate through them since we might run out of
-                     // memory, so we will need to load individual refs from storage given the
-                     // selector expressions. Secondly we should also introduce some in memory
-                     // indexes here to prevent having to iterate through all the refs every time.
-                     // Having said all that, having consumers on a queue that don't match many
-                     // messages is an antipattern and should be avoided by the user.
-                     if (iter == null)
+                        return;
+                     }
+                     else if (!del.isSelectorAccepted())
                      {
-                        iter = messageRefs.iterator();
-                     }                     
-                  }
-                  else
-                  {
-                     if (trace) { log.trace(this + ": " + del + " returned for message:" + ref); }
-                     
-                     // Receiver accepted the reference
+                        // No receiver accepted the message because no selectors matched, so we create
+                        // an iterator (if we haven't already created it) to iterate through the refs
+                        // in the channel.
 
-                     // We must synchronize here to cope with another race condition where message
-                     // is cancelled/acked in flight while the following few actions are being
-                     // performed. e.g. delivery could be cancelled acked after being removed from
-                     // state but before delivery being added (observed).
-                     synchronized (del)
+                        // TODO Note that this is only a partial solution since if there are messages
+                        // paged to storage it won't try those - i.e. it will only iterate through
+                        // those refs in memory. Dealing with refs in storage is somewhat tricky since
+                        // we can't just load them and iterate through them since we might run out of
+                        // memory, so we will need to load individual refs from storage given the
+                        // selector expressions. Secondly we should also introduce some in memory
+                        // indexes here to prevent having to iterate through all the refs every time.
+                        // Having said all that, having consumers on a queue that don't match many
+                        // messages is an antipattern and should be avoided by the user.
+                        if (iter == null)
+                        {
+                           iter = messageRefs.iterator();
+                        }
+                     }
+                     else
                      {
-                        if (trace) { log.trace(this + " incrementing delivery count for " + del); }
+                        if (trace) { log.trace(this + ": " + del + " returned for message:" + ref); }
 
-                        // FIXME - It's actually possible the delivery could be
-                        // cancelled before it reaches
-                        // here, in which case we wouldn't get a delivery but we
-                        // still need to increment the
-                        // delivery count
-                        // All the problems related to these race conditions and
-                        // fiddly edge cases will disappear
-                        // once we do
-                        // http://jira.jboss.com/jira/browse/JBMESSAGING-355
-                        // This will make life a lot easier
+                        // Receiver accepted the reference
 
-                        if (!del.isCancelled())
+                        // We must synchronize here to cope with another race condition where message
+                        // is cancelled/acked in flight while the following few actions are being
+                        // performed. e.g. delivery could be cancelled acked after being removed from
+                        // state but before delivery being added (observed).
+                        synchronized (del)
                         {
-                           if (iter == null)
-                           {
-                              removeFirstInMemory();
-                           }
-                           else
-                           {
-                              iter.remove();                                
-                           }
+                           // FIXME - It's actually possible the delivery could be
+                           // cancelled before it reaches
+                           // here, in which case we wouldn't get a delivery but we
+                           // still need to increment the
+                           // delivery count
+                           // All the problems related to these race conditions and
+                           // fiddly edge cases will disappear
+                           // once we do
+                           // http://jira.jboss.com/jira/browse/JBMESSAGING-355
+                           // This will make life a lot easier
 
-                           // delivered
-                           if (!del.isDone())
+                           if (!del.isCancelled())
                            {
-                              // Add the delivery to state
-                              synchronized (deliveryLock)
+                              if (iter == null)
                               {
-                                 deliveries.add(del);
+                                 removeFirstInMemory();
                               }
+                              else
+                              {
+                                 iter.remove();
+                                 if (trace) { log.trace(this + " removed current message from iterator"); }
+                              }
+
+                              // delivered
+                              if (!del.isDone())
+                              {
+                                 synchronized (deliveryLock)
+                                 {
+                                    deliveries.add(del);
+                                    if (trace) { log.trace(this + " starting to track  " + del); }
+                                 }
+                              }
                            }
                         }
                      }
                   }
                }
+               else
+               {
+                  // No more refs in channel
+                  if (trace) { log.trace(this + " no more refs to deliver "); }
+                  break;
+               }
             }
-            else
-            {
-               // No more refs in channel
-               if (trace) { log.trace(this + " no more refs to deliver "); }
-               break;
-            }
          }
       }
       catch (Throwable t)
@@ -845,7 +846,7 @@
             if (ref.isReliable() && recoverable)
             {
                // Reliable message in a recoverable state - also add to db
-               if (trace) { log.trace(this + "adding " + ref + " to database non-transactionally"); }
+               if (trace) { log.trace(this + " adding " + ref + " to database non-transactionally"); }
 
                pm.addReference(channelID, ref, null);               
             }
@@ -862,20 +863,13 @@
          }
          else
          {
-            if (trace) { log.trace(this + "adding " + ref + " to state " + (tx == null ? "non-transactionally" : "in transaction: " + tx)); }
-
             checkMemory();
 
             if (ref.isReliable() && !acceptReliableMessages)
             {
-               // this transaction has no chance to succeed, since a reliable
-               // message cannot be
-               // safely stored by a non-recoverable state, so doom the
-               // transaction
-               if (trace)
-               {
-                  log.trace(this + " cannot handle reliable messages, dooming the transaction");
-               }
+               // This transaction has no chance to succeed, since a reliable message cannot be
+               // safely stored by a non-recoverable state, so doom the transaction.
+               if (trace) { log.trace(this + " cannot handle reliable messages, dooming the transaction"); }
                tx.setRollbackOnly();
             } 
             else
@@ -883,18 +877,12 @@
                // add to post commit callback
                ref.setOrdering(messageOrdering.increment());
                this.getCallback(tx).addRef(ref);
-               if (trace)
-               {
-                  log.trace(this + " added transactionally " + ref
-                           + " in memory");
-               }
+               if (trace) { log.trace(this + " added " + ref + " to memory transactional callback, in transaction: " + tx); }
             }
 
             if (ref.isReliable() && recoverable)
             {
                // Reliable message in a recoverable state - also add to db
-               if (trace) { log.trace(this + "adding " + ref + (tx == null ? " to database non-transactionally" : " in transaction: " + tx)); }
-
                pm.addReference(channelID, ref, tx);
             }
          }
@@ -929,10 +917,7 @@
 
    protected void cancelInternal(Delivery del) throws Exception
    {
-      if (trace)
-      {
-         log.trace(this + " cancelling " + del + " in memory");
-      }
+      if (trace) { log.trace(this + " cancelling " + del + " in memory"); }
 
       boolean removed;
 
@@ -969,13 +954,13 @@
             }
          }
          
-         //We may need to update the delivery count in the database
+         // We may need to update the delivery count in the database
          if (ref.isReliable())
          {
             pm.updateDeliveryCount(this.channelID, ref);
          }
 
-         if (trace) { log.trace(this + " added " + ref + " back into state"); }
+         if (trace) { log.trace(this + " added " + ref + " back into memory, ready for redelivery"); }
       }
    }
 
@@ -983,7 +968,7 @@
    {
       synchronized (refLock)
       {
-         MessageReference result = (MessageReference) messageRefs.removeFirst();
+         MessageReference result = (MessageReference)messageRefs.removeFirst();
 
          if (refsInStorage > 0)
          {
@@ -999,7 +984,8 @@
             paging = false;
          }
 
-         return (MessageReference) result;
+         if (trace) { log.trace(this + " removing first message in memory, which is " + result); }
+         return (MessageReference)result;
       }
    }
 
@@ -1091,11 +1077,7 @@
          {
             messageRefs.addLast(ref, ref.getPriority());
 
-            if (trace)
-            {
-               log.trace(this + " added " + ref
-                        + " non-transactionally in memory");
-            }
+            if (trace) { log.trace(this + " added " + ref + " in memory"); }
 
             if (messageRefs.size() == fullSize)
             {
@@ -1591,7 +1573,7 @@
          {
             MessageReference ref = (MessageReference) iter.next();
 
-            if (trace) { log.trace(this + ": adding " + ref + " to non-recoverable state"); }
+            if (trace) { log.trace(this + " adding " + ref + " to memory"); }
 
             try
             {

Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java	2006-11-30 02:09:13 UTC (rev 1660)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java	2006-11-30 08:01:35 UTC (rev 1661)
@@ -1621,11 +1621,11 @@
    {      
       if (tx != null)
       {
-         //In a tx so we just add the ref in the tx in memory for now
+         // We are in a transaction so we just add the ref in the tx in memory for now
 
          TransactionCallback callback = getCallback(tx);
-
          callback.addReferenceToAdd(channelID, ref);
+         if (trace) { log.trace(this + " added " + ref + " to database transactional callback, in transaction: " + tx); }
       }
       else
       {         
@@ -2670,6 +2670,8 @@
       //TODO - A slight optimisation - it's possible we have refs referring to the same message
       //so we will end up acquiring the lock more than once which is unnecessary
       //If find unique set of messages can avoid this
+      if (trace) { log.trace("handling before commit 1PC, tx: " + tx);  }
+
       List allRefs = new ArrayList(refsToAdd.size() + refsToRemove.size());
       Iterator iter = refsToAdd.iterator();
       while (iter.hasNext())

Modified: branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/JMSTest.java
===================================================================
--- branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/JMSTest.java	2006-11-30 02:09:13 UTC (rev 1660)
+++ branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/JMSTest.java	2006-11-30 08:01:35 UTC (rev 1661)
@@ -194,6 +194,45 @@
       conn.close();
    }
 
+   public void test_Persistent_Transactional_Send() throws Exception
+   {
+      ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
+
+      Queue queue = (Queue)ic.lookup("/queue/JMSTestQueue");
+
+      Connection conn = cf.createConnection();
+
+      Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+      MessageProducer prod = session.createProducer(queue);
+      prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+      TextMessage m = session.createTextMessage("message one");
+      prod.send(m);
+      m = session.createTextMessage("message two");
+      prod.send(m);
+
+      session.commit();
+
+      conn.close();
+
+      conn = cf.createConnection();
+
+      session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      MessageConsumer cons = session.createConsumer(queue);
+
+      conn.start();
+
+      TextMessage rm = (TextMessage)cons.receive();
+      assertEquals("message one", rm.getText());
+      rm = (TextMessage)cons.receive();
+      assertEquals("message two", rm.getText());
+
+      conn.close();
+   }
+
+
    public void test_NonPersistent_Transactional_Acknowledgment() throws Exception
    {
       ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
@@ -227,7 +266,6 @@
       conn.close();
    }
 
-
    public void test_Asynchronous_to_Client() throws Exception
    {
       ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");

Modified: branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/stress/ConcurrentCloseStressTest.java
===================================================================
--- branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/stress/ConcurrentCloseStressTest.java	2006-11-30 02:09:13 UTC (rev 1660)
+++ branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/stress/ConcurrentCloseStressTest.java	2006-11-30 08:01:35 UTC (rev 1661)
@@ -41,9 +41,8 @@
 /**
  * This test was added to test regression on http://jira.jboss.com/jira/browse/JBMESSAGING-660
  * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- * @version <tt>$Revision:$</tt>
- *          <p/>
- *          $Id:$
+ *
+ * $Id:$
  */
 public class ConcurrentCloseStressTest extends MessagingTestCase
 {
@@ -101,8 +100,8 @@
 
       for (int i = 0; i < 20; i++)
       {
-         producerThread[i] = new ProducerThread(connectionProducer, queue);
-         readerThread[i] = new ReaderThread(connectionReader, queue);
+         producerThread[i] = new ProducerThread(i, connectionProducer, queue);
+         readerThread[i] = new ReaderThread(i, connectionReader, queue);
          threads[i] = producerThread[i];
          threads[i+20] = readerThread[i];
       }
@@ -120,6 +119,7 @@
 
 
       boolean hasFailure=false;
+
       for (int i = 0; i < 40; i++)
       {
          if (threads[i].exceptions.size() > 0)
@@ -143,6 +143,7 @@
    static class TestThread extends Thread
    {
       ArrayList exceptions = new ArrayList();
+      protected int index;
    }
 
 
@@ -153,8 +154,9 @@
 
       Queue queue;
 
-      public ReaderThread(Connection conn, Queue queue) throws Exception
+      public ReaderThread(int index, Connection conn, Queue queue) throws Exception
       {
+         this.index = index;
          this.conn = conn;
          this.queue = queue;
       }
@@ -166,7 +168,7 @@
          int commitCounter = 0;
          try
          {
-            Session session = conn.createSession(true,Session.AUTO_ACKNOWLEDGE);
+            Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
             MessageConsumer consumer = session.createConsumer((Destination)queue);
 
             while (true)
@@ -177,24 +179,25 @@
                {
                   break;
                }
-               log.info("read message " + message.getText());
+               log.debug("read message " + message.getText());
 
                // alternating commits and rollbacks
-               if ( (commitCounter++) % 2 == 0 )
+               if ( (commitCounter++) % 2 == 0)
                {
-                  log.info("commit");
+                  log.debug("commit");
                   session.commit();
                }
                else
                {
-                  log.info("rollback");
+                  log.debug("rollback");
                   session.rollback();
                }
 
-               if (messageCounter%7 == 0 )
+               if (messageCounter%7 == 0)
                {
                   session.close();
-                  session = conn.createSession(true,Session.AUTO_ACKNOWLEDGE);
+
+                  session = conn.createSession(true, Session.SESSION_TRANSACTED);
                   consumer = session.createConsumer((Destination)queue);
                }
             }
@@ -207,6 +210,8 @@
          {
             e.printStackTrace();
             exceptions.add(e);
+//            log.debug("ReaderThread " + index + " died");
+//            System.exit(1);
          }
       }
 
@@ -221,8 +226,9 @@
 
       Queue queue;
 
-      public ProducerThread(Connection conn, Queue queue) throws Exception
+      public ProducerThread(int index, Connection conn, Queue queue) throws Exception
       {
+         this.index = index;
          this.conn = conn;
          this.queue = queue;
       }
@@ -234,21 +240,21 @@
          {
             try
             {
-               Session sess = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
+               Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
                MessageProducer producer = sess.createProducer((Destination)queue);
 
                for (int j = 0; j < 20; j++)
                {
                   producer.send(sess.createTextMessage("Message " + i + ", " + j));
-                  log.info("Sending message " + j + " on i=" + i);
-                  if (j%2==0)
+                  log.debug("Message " + j + " on i=" + i + " sent");
+                  if (j % 2 == 0)
                   {
-                     log.info("commit");
+                     log.debug("commit");
                      sess.commit();
                   }
                   else
                   {
-                     log.info("rollback");
+                     log.debug("rollback");
                      sess.rollback();
                   }
 
@@ -262,6 +268,8 @@
             {
                e.printStackTrace();
                exceptions.add(e);
+//               log.debug("ProducerThread " + index + " died");
+//               System.exit(1);
             }
          }
       }




More information about the jboss-cvs-commits mailing list