[jboss-cvs] JBoss Messaging SVN: r8371 - in branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026: src/main/org/jboss/jms/client/container and 7 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jun 23 19:39:21 EDT 2011


Author: jbertram
Date: 2011-06-23 19:39:20 -0400 (Thu, 23 Jun 2011)
New Revision: 8371

Modified:
   branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/
   branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/container/ClientConsumer.java
   branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/container/ConsumerAspect.java
   branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/container/SessionAspect.java
   branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
   branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/remoting/CallbackManager.java
   branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/state/SessionState.java
   branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/delegate/DeliveryInfo.java
   branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/delegate/SessionDelegate.java
   branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/message/MessageProxy.java
   branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
   branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
Log:
SOA-3026


Property changes on: branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026
___________________________________________________________________
Modified: svn:mergeinfo
   - /branches/Branch_1_4:7883-7884,7887-7892,7900,7906,7912-7914,7916,7930-7934,7936-7937,7941-7944,7962-7964,7966,7968-7971,7978-7979,7996,7999,8013,8060,8083,8114,8133-8134,8138,8141-8142,8154-8155,8157-8158,8160,8233-8234,8236,8256,8312-8313,8318,8323,8356,8360-8361
/branches/JBM1842:8169-8232
   + /branches/Branch_1_4:7883-7884,7887-7892,7900,7906,7912-7914,7916,7930-7934,7936-7937,7941-7944,7962-7964,7966,7968-7971,7978-7979,7996,7999,8013,8060,8083,8114,8133-8134,8138,8141-8142,8154-8155,8157-8158,8160,8233-8234,8236,8256,8312-8313,8318,8323,8333,8356,8360-8361
/branches/JBM1842:8169-8232

Modified: branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/container/ClientConsumer.java	2011-06-23 21:33:48 UTC (rev 8370)
+++ branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/container/ClientConsumer.java	2011-06-23 23:39:20 UTC (rev 8371)
@@ -37,6 +37,7 @@
 import javax.jms.MessageListener;
 import javax.jms.Session;
 
+import org.jboss.jms.client.remoting.CallbackManager;
 import org.jboss.jms.delegate.Cancel;
 import org.jboss.jms.delegate.ConsumerDelegate;
 import org.jboss.jms.delegate.DefaultCancel;
@@ -145,7 +146,7 @@
       }
       
       DeliveryInfo deliveryInfo =
-         new DeliveryInfo(m, consumerID, queueName, connectionConsumerSession, shouldAck);
+         new DeliveryInfo(m, consumerID, queueName, connectionConsumerSession, shouldAck, null);
             
       m.incDeliveryCount();
       
@@ -213,7 +214,7 @@
       }
       
       DeliveryInfo deliveryInfo =
-         new DeliveryInfo(m, consumerID, queueName, connectionConsumerSession, shouldAck);
+         new DeliveryInfo(m, consumerID, queueName, connectionConsumerSession, shouldAck, m.getSource());
             
       m.incDeliveryCount();
       
@@ -224,7 +225,10 @@
          //We need to call preDeliver, deliver the message then call postDeliver - this is because
          //it is legal to call session.recover(), or session.rollback() from within the onMessage()
          //method in which case the last message needs to be delivered so it needs to know about it
-         sess.preDeliver(deliveryInfo);
+         if (!sess.preDeliver(deliveryInfo))
+         {
+            return;
+         }
       } 
       
       try
@@ -296,6 +300,17 @@
    private boolean firstTime = true;
    private volatile Thread onMessageThread;
    private ExecutorService pool = Executors.newCachedThreadPool();
+   
+   //JBMESSAGING-1878 (case 1)
+   //First, the buffer needs to be synchronized between
+   //failover clearing and message adding.
+   //When messageSource changes, all messages from the old messageSource
+   //are discarded, as they will be redelivered.
+   //We simply use the CallbackManager as the messageSource because
+   //every new connection creates a new CallbackManager.
+   private Object consumerLock;
+   private Object messageSource;
+   private boolean isClustered = false;
 
    //JBMESSAGING-1876
    private long minTimeoutProcessTime;
@@ -336,6 +351,12 @@
       this.shouldAck = shouldAck;
       this.redeliveryDelay = redeliveryDelay;
       this.minTimeoutProcessTime = minTimeoutProcessTime;
+      this.isClustered = isClustered;
+      if (isClustered)
+      {
+         consumerLock = new Object();
+      }
+      messageSource = cbManager;
    }
         
    // Public ---------------------------------------------------------------------------------------
@@ -350,7 +371,7 @@
     *
     * @param message The message
     */
-   public void handleMessage(final Object message) throws Exception
+   public void handleMessage(final Object message, CallbackManager cbManager) throws Exception
    {
       ClientDelivery del = (ClientDelivery)message;
       
@@ -366,6 +387,8 @@
       {
          proxy.setJMSDestination(msg.getOriginalSuckerDestination());
       }
+      
+      proxy.setSource(cbManager);
 
       //TODO - we temporarily need to execute on a different thread to avoid a deadlock situation in
       //       failover where a message is sent then the valve is locked, and the message send cause
@@ -561,19 +584,21 @@
                
                if (!isConnectionConsumer && !ignore)
                {
-                  final DeliveryInfo info = new DeliveryInfo(m, consumerID, queueName, null, shouldAck);
+                  final DeliveryInfo info = new DeliveryInfo(m, consumerID, queueName, null, shouldAck, m.getSource());
                   
                   if (timeout <= 0 || sessionDelegate.getTransacted())
                   {
-                     sessionDelegate.preDeliver(info);
+                     ignore = ! sessionDelegate.preDeliver(info);
 
                      // If post deliver didn't succeed and acknowledgement mode is auto_ack
                      // That means the ref wasn't acked since it couldn't be found.
                      // In order to maintain at most once semantics we must therefore not return
                      // the message
-
-                     ignore = !sessionDelegate.postDeliver();
                      
+                     if (!ignore)
+                     {
+                        ignore = !sessionDelegate.postDeliver();
+                     }
                   }
                   else
                   {
@@ -582,7 +607,10 @@
                      {
                         public Boolean call() throws Exception
                         {
-                           sessionDelegate.preDeliver(info);
+                           if (! sessionDelegate.preDeliver(info))
+                           {
+                              return true;
+                           }
 
                            // If post deliver didn't succeed and acknowledgement mode is auto_ack
                            // That means the ref wasn't acked since it couldn't be found.
@@ -701,6 +729,9 @@
    {
       synchronized (mainLock)
       {
+         //Because this is a local redelivery, we update the source so the message can be put into the ack list again.
+         proxy.setSource(this.messageSource);
+         
          buffer.addFirst(proxy, proxy.getJMSPriority());
          
          consumeCount--;
@@ -719,12 +750,19 @@
       currentToken++;
    	
       consumerID = newHandler.consumerID;
+      
+      //must be clustered
+      synchronized(consumerLock)
+      {
+         //this prevents 'old' messages from coming in, see JBMESSAGING-1878
+         this.messageSource = newHandler.messageSource;
 
-      // Clear the buffer. This way the non persistent messages that managed to arrive are
-      // irredeemably lost, while the persistent ones are failed-over on the server and will be
-      // resent
+         // Clear the buffer. This way the non persistent messages that managed to arrive are
+         // irredeemably lost, while the persistent ones are failed-over on the server and will be
+         // resent
 
-      buffer.clear();
+         buffer.clear();
+      }
       
       consumeCount = 0;
    }
@@ -1052,14 +1090,39 @@
 
                 proxy.getMessage().doBeforeReceive();
 
-                //Add it to the buffer
-                buffer.addLast(proxy, proxy.getJMSPriority());
+                if (isClustered)
+                {
+                   synchronized (consumerLock)
+                   {
+                      //if source changed, discard it.
+                      if (proxy.getSource() == messageSource)
+                      {
+                         //Add it to the buffer
+                         buffer.addLast(proxy, proxy.getJMSPriority());
 
-                lastDeliveryId = proxy.getDeliveryId();
+                         lastDeliveryId = proxy.getDeliveryId();
+                      
+                         if (trace) { log.trace(this + " added message(s) to the buffer are now " + buffer.size() + " messages"); }
+
+                         messageAdded();
+                      }
+                      else
+                      {
+                         log.debug("Discarding message from old source " + proxy.getSource() + " on to new source " + messageSource);
+                      }
+                   }
+                }
+                else
+                {
+                   //Add it to the buffer
+                   buffer.addLast(proxy, proxy.getJMSPriority());
+
+                   lastDeliveryId = proxy.getDeliveryId();
                 
-                if (trace) { log.trace(this + " added message(s) to the buffer are now " + buffer.size() + " messages"); }
+                   if (trace) { log.trace(this + " added message(s) to the buffer are now " + buffer.size() + " messages"); }
 
-                messageAdded();
+                   messageAdded();
+                }
              }
          }
          catch (Exception e)

Modified: branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2011-06-23 21:33:48 UTC (rev 8370)
+++ branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2011-06-23 23:39:20 UTC (rev 8371)
@@ -27,6 +27,7 @@
 
 import org.jboss.aop.joinpoint.Invocation;
 import org.jboss.aop.joinpoint.MethodInvocation;
+import org.jboss.jms.client.FailoverCommandCenter;
 import org.jboss.jms.client.delegate.DelegateSupport;
 import org.jboss.jms.client.remoting.CallbackManager;
 import org.jboss.jms.client.state.ConnectionState;
@@ -89,6 +90,8 @@
       int maxDeliveries = consumerState.getMaxDeliveries();
       long redeliveryDelay = consumerState.getRedeliveryDelay();
       
+      FailoverCommandCenter fcc = connectionState.getFailoverCommandCenter();
+      
       //We need the queue name for recovering any deliveries after failover
       String queueName = null;
       if (consumerState.getSubscriptionName() != null)
@@ -109,6 +112,8 @@
       
       boolean autoFlowControl = ((Boolean)mi.getArguments()[5]).booleanValue();
       
+      CallbackManager cm = connectionState.getRemotingConnection().getCallbackManager();
+
       ClientConsumer messageHandler =
          new ClientConsumer(isCC, sessionState.getAcknowledgeMode(),
                             sessionDelegate, consumerDelegate, consumerID, queueName,
@@ -118,7 +123,6 @@
       
       sessionState.addCallbackHandler(messageHandler);
       
-      CallbackManager cm = connectionState.getRemotingConnection().getCallbackManager();
       cm.registerHandler(consumerID, messageHandler);
          
       consumerState.setClientConsumer(messageHandler);

Modified: branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/container/SessionAspect.java	2011-06-23 21:33:48 UTC (rev 8370)
+++ branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/container/SessionAspect.java	2011-06-23 23:39:20 UTC (rev 8371)
@@ -234,6 +234,8 @@
       MethodInvocation mi = (MethodInvocation)invocation;
       SessionState state = getState(invocation);
       
+      boolean result = true;
+      
       synchronized (state)
       {
       
@@ -254,8 +256,9 @@
                throw new IllegalStateException(
                   "CLIENT_ACKNOWLEDGE cannot be used with a connection consumer");
             }
-                     
-            state.getClientAckList().add(info);
+            
+            result = state.addToClientAckList(info);
+               
          }
          // if XA and there is no transaction enlisted on XA we will act as AutoAcknowledge
          // However if it's a MDB (if there is a DistinguishedListener) we should behaved as transacted
@@ -300,12 +303,20 @@
                String sessionId = connectionConsumerDelegate != null ?
                   connectionConsumerDelegate.getID() : state.getSessionID();
                
-               connState.getResourceManager().addAck(txID, sessionId, info);
+               if (info.getSource() != null)
+               {
+                  //from a normal session (non CC).
+                  result = state.addAckToResourceManager(connState.getResourceManager(), txID, sessionId, info);
+               }
+               else
+               {
+                  connState.getResourceManager().addAck(txID, sessionId, info);
+               }
             }        
          }
       }
       
-      return null;
+      return Boolean.valueOf(result);
    }
    
    public Object handlePostDeliver(Invocation invocation) throws Throwable

Modified: branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2011-06-23 21:33:48 UTC (rev 8370)
+++ branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2011-06-23 23:39:20 UTC (rev 8371)
@@ -382,7 +382,7 @@
     * This invocation should either be handled by the client-side interceptor chain or by the
     * server-side endpoint.
     */
-   public void preDeliver(DeliveryInfo deliveryInfo) throws JMSException
+   public boolean preDeliver(DeliveryInfo deliveryInfo) throws JMSException
    {
       throw new IllegalStateException("This invocation should not be handled here!");
    }

Modified: branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/remoting/CallbackManager.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/remoting/CallbackManager.java	2011-06-23 21:33:48 UTC (rev 8370)
+++ branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/remoting/CallbackManager.java	2011-06-23 23:39:20 UTC (rev 8371)
@@ -97,7 +97,7 @@
 
          try
          {
-            handler.handleMessage(dr);
+            handler.handleMessage(dr, this);
          }
          catch (Exception e)
          {

Modified: branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/state/SessionState.java	2011-06-23 21:33:48 UTC (rev 8370)
+++ branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/state/SessionState.java	2011-06-23 23:39:20 UTC (rev 8371)
@@ -31,6 +31,7 @@
 import java.util.Map;
 import java.util.concurrent.Executors;
 
+import javax.jms.JMSException;
 import javax.jms.MessageListener;
 import javax.jms.Session;
 
@@ -128,6 +129,11 @@
    
    private boolean isCC;
    
+   private Object ackLock = new Object();
+   
+   private Object ackSource;
+   
+   
    // Constructors ---------------------------------------------------------------------------------
    
    public SessionState(ConnectionState parent, ClientSessionDelegate delegate,
@@ -172,6 +178,8 @@
       this.setEnableOrderingGroup(enableOrderingGroup);
       
       this.setDefaultOrderingGroupName(defaultOrderingGroupName);
+      
+      this.ackSource = parent.getRemotingConnection().getCallbackManager();
    }
 
    // HierarchicalState implementation -------------------------------------------------------------
@@ -236,6 +244,14 @@
       // from before failover waiting in there and we don't want them to get delivered after
       // failover.
       executor.clearAllExceptCurrentTask();
+
+      //This guards against new ack info being added to the list. It must happen before ClientConsumer.synchronizedWith(),
+      //otherwise a message could be added to the buffer after the buffer is cleared and then be added to the ack list.
+      //JBMESSAGING-1878
+      synchronized (ackLock)
+      {
+         ackSource = ((ConnectionState)newState.getParent()).getRemotingConnection().getCallbackManager();
+      }
       
       ClientSessionDelegate newDelegate = (ClientSessionDelegate)newState.getDelegate();
 
@@ -292,7 +308,7 @@
 
       ConnectionState connState = (ConnectionState)getParent();
       ResourceManager rm = connState.getResourceManager();
-
+      
       // We need to failover from one session ID to another in the resource manager
       rm.handleFailover(connState.getServerID(), oldSessionID, newState.sessionID);
 
@@ -407,6 +423,23 @@
       return clientAckList;
    }
    
+   public boolean addToClientAckList(DeliveryInfo info)
+   {
+      synchronized (ackLock)
+      {
+         if (ackSource == info.getSource())
+         {
+            clientAckList.add(info);
+            return true;
+         }
+         else
+         {
+            log.debug("Rejecting ack " + info + " from old source: " + info.getSource() + " on new source " + ackSource);
+            return false;
+         }
+      }
+   }
+   
    public void setClientAckList(List list)
    {
       this.clientAckList = list;
@@ -580,5 +613,22 @@
       return parent.getMinTimeoutProcessTime();
    }
 
+   public boolean addAckToResourceManager(ResourceManager rm, Object txID, String sessId, DeliveryInfo info) throws JMSException
+   {
+      synchronized (ackLock)
+      {
+         if (ackSource == info.getSource())
+         {
+            rm.addAck(txID, sessId, info);
+            return true;
+         }
+         else
+         {
+            log.debug("Rejecting tx ack " + info + " from old source " + info.getSource() + " on new source " + ackSource);
+            return false;
+         }
+      }
+   }
+
 }
 

Modified: branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/delegate/DeliveryInfo.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/delegate/DeliveryInfo.java	2011-06-23 21:33:48 UTC (rev 8370)
+++ branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/delegate/DeliveryInfo.java	2011-06-23 23:39:20 UTC (rev 8371)
@@ -58,12 +58,15 @@
    //to the connection consumer's session, otherwise it will be null
    private SessionDelegate connectionConsumerSession;
    
+   //mark where the msg is delivered from (a CallbackManager)
+   private Object source;
+   
    // Static --------------------------------------------------------
    
    // Constructors --------------------------------------------------
    
    public DeliveryInfo(MessageProxy msg, String consumerId, String queueName,
-                       SessionDelegate connectionConsumerSession, boolean shouldAck)
+                       SessionDelegate connectionConsumerSession, boolean shouldAck, Object source)
    {      
       this.msg = msg;
       
@@ -74,6 +77,8 @@
       this.connectionConsumerSession = connectionConsumerSession;
       
       this.shouldAck = shouldAck;
+      
+      this.source = source;
    }
 
    // Public --------------------------------------------------------
@@ -103,6 +108,11 @@
    	return shouldAck;
    }
    
+   public Object getSource()
+   {
+      return source;
+   }
+   
    public String toString()
    {
       return "Delivery[" + getDeliveryID() + ", " + msg + "]";

Modified: branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/delegate/SessionDelegate.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/delegate/SessionDelegate.java	2011-06-23 21:33:48 UTC (rev 8370)
+++ branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/delegate/SessionDelegate.java	2011-06-23 23:39:20 UTC (rev 8371)
@@ -64,7 +64,7 @@
 
    TextMessageProxy createTextMessage(String text) throws JMSException;
 
-   void preDeliver(DeliveryInfo deliveryInfo) throws JMSException;
+   boolean preDeliver(DeliveryInfo deliveryInfo) throws JMSException;
 
    boolean postDeliver() throws JMSException;
 

Modified: branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/message/MessageProxy.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/message/MessageProxy.java	2011-06-23 21:33:48 UTC (rev 8370)
+++ branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/message/MessageProxy.java	2011-06-23 23:39:20 UTC (rev 8371)
@@ -95,6 +95,8 @@
       
    
    protected JBossMessage message;
+   
+   private Object source;
 
    // Constructors --------------------------------------------------
 
@@ -506,5 +508,15 @@
       needToCopyHeader = false;      
    }
 
+   public void setSource(Object source)
+   {
+      this.source = source;
+   }
+   
+   public Object getSource()
+   {
+      return this.source;
+   }
+
    // Inner classes -------------------------------------------------   
 }

Modified: branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2011-06-23 21:33:48 UTC (rev 8370)
+++ branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2011-06-23 23:39:20 UTC (rev 8371)
@@ -131,7 +131,7 @@
                                                       boolean strictTck,
                                                       boolean sendAcksAsync,
                                                       boolean enableOrderingGroup,
-                                                      String defaultOrderingGroupName)
+                                                      String defaultOrderingGroupName,
                                                       long minTimeoutProcessTime)
       throws Exception
    {

Modified: branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java	2011-06-23 21:33:48 UTC (rev 8370)
+++ branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java	2011-06-23 23:39:20 UTC (rev 8371)
@@ -643,7 +643,7 @@
 			{
 	         assertRemainingMessages(NUM_MESSAGES - i);
 	         
-				m = consumer.receive(200);
+				m = consumer.receive(2000);
 	         
 	         assertRemainingMessages(NUM_MESSAGES - (i + 1));
 	         
@@ -663,7 +663,7 @@
 	
 			log.trace("Session recover called");
 	
-			m = consumer.receive(200);
+			m = consumer.receive(2000);
 	
 			log.trace("Message is:" + m);
 	
@@ -755,7 +755,7 @@
 	      Message m = null;
 	      for (int i = 0; i < 10; i++)
 	      {
-	         m = consumer.receive(200);
+	         m = consumer.receive(2000);
 	         
 	         assertNotNull(m);
 	          
@@ -771,7 +771,7 @@
 	      
 	      for (int i = 0; i < 9; i++)
 	      {
-	         m = consumer.receive(200);
+	         m = consumer.receive(2000);
 	         
 	         assertNotNull(m);
 	         



More information about the jboss-cvs-commits mailing list