[jboss-cvs] JBoss Messaging SVN: r8333 - in branches/Branch_1_4: src/main/org/jboss/jms/client/delegate and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jun 13 22:20:57 EDT 2011


Author: gaohoward
Date: 2011-06-13 22:20:56 -0400 (Mon, 13 Jun 2011)
New Revision: 8333

Modified:
   branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java
   branches/Branch_1_4/src/main/org/jboss/jms/client/container/ConsumerAspect.java
   branches/Branch_1_4/src/main/org/jboss/jms/client/container/SessionAspect.java
   branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
   branches/Branch_1_4/src/main/org/jboss/jms/client/remoting/CallbackManager.java
   branches/Branch_1_4/src/main/org/jboss/jms/client/state/SessionState.java
   branches/Branch_1_4/src/main/org/jboss/jms/delegate/DeliveryInfo.java
   branches/Branch_1_4/src/main/org/jboss/jms/delegate/SessionDelegate.java
   branches/Branch_1_4/src/main/org/jboss/jms/message/MessageProxy.java
   branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
Log:
JBMESSAGING-1878


Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java	2011-06-14 02:03:47 UTC (rev 8332)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java	2011-06-14 02:20:56 UTC (rev 8333)
@@ -37,6 +37,7 @@
 import javax.jms.MessageListener;
 import javax.jms.Session;
 
+import org.jboss.jms.client.remoting.CallbackManager;
 import org.jboss.jms.delegate.Cancel;
 import org.jboss.jms.delegate.ConsumerDelegate;
 import org.jboss.jms.delegate.DefaultCancel;
@@ -145,7 +146,7 @@
       }
       
       DeliveryInfo deliveryInfo =
-         new DeliveryInfo(m, consumerID, queueName, connectionConsumerSession, shouldAck);
+         new DeliveryInfo(m, consumerID, queueName, connectionConsumerSession, shouldAck, null);
             
       m.incDeliveryCount();
       
@@ -213,7 +214,7 @@
       }
       
       DeliveryInfo deliveryInfo =
-         new DeliveryInfo(m, consumerID, queueName, connectionConsumerSession, shouldAck);
+         new DeliveryInfo(m, consumerID, queueName, connectionConsumerSession, shouldAck, m.getSource());
             
       m.incDeliveryCount();
       
@@ -224,7 +225,10 @@
          //We need to call preDeliver, deliver the message then call postDeliver - this is because
          //it is legal to call session.recover(), or session.rollback() from within the onMessage()
          //method in which case the last message needs to be delivered so it needs to know about it
-         sess.preDeliver(deliveryInfo);
+         if (!sess.preDeliver(deliveryInfo))
+         {
+            return;
+         }
       } 
       
       try
@@ -299,6 +303,17 @@
    private long maxRetryChangeRate;
    private long retryChangeRateInterval;
    private boolean abortReceive;
+   
+   //JBMESSAGING-1878 (case 1)
+   //First, the buffer needs to be synchronized between
+   //failover clearing and message adding.
+   //When messageSource changes, all messages from the old messageSource
+   //are discarded, as they will be redelivered.
+   //We simply use the CallbackManager as the messageSource because
+   //every new connection creates a new CallbackManager.
+   private Object consumerLock;
+   private Object messageSource;
+   private boolean isClustered = false;
 
    public int getBufferSize()
    {
@@ -314,7 +329,9 @@
                          int maxDeliveries, boolean shouldAck,
                          long redeliveryDelay,
                          long maxRetryChangeRate,
-                         long retryChangeRateInterval)
+                         long retryChangeRateInterval,
+                         boolean isClustered,
+                         CallbackManager cbManager)
    {
       if (bufferSize < 1)
       {
@@ -336,6 +353,12 @@
       this.redeliveryDelay = redeliveryDelay;
       this.maxRetryChangeRate = maxRetryChangeRate;
       this.retryChangeRateInterval = retryChangeRateInterval;
+      this.isClustered = isClustered;
+      if (isClustered)
+      {
+         consumerLock = new Object();
+      }
+      messageSource = cbManager;
    }
         
    // Public ---------------------------------------------------------------------------------------
@@ -350,7 +373,7 @@
     *
     * @param message The message
     */
-   public void handleMessage(final Object message) throws Exception
+   public void handleMessage(final Object message, CallbackManager cbManager) throws Exception
    {
       ClientDelivery del = (ClientDelivery)message;
       
@@ -366,6 +389,8 @@
       {
          proxy.setJMSDestination(msg.getOriginalSuckerDestination());
       }
+      
+      proxy.setSource(cbManager);
 
       //TODO - we temporarily need to execute on a different thread to avoid a deadlock situation in
       //       failover where a message is sent then the valve is locked, and the message send cause
@@ -561,19 +586,21 @@
                
                if (!isConnectionConsumer && !ignore)
                {
-                  final DeliveryInfo info = new DeliveryInfo(m, consumerID, queueName, null, shouldAck);
+                  final DeliveryInfo info = new DeliveryInfo(m, consumerID, queueName, null, shouldAck, m.getSource());
                   
                   if (timeout <= 0)
                   {
-                     sessionDelegate.preDeliver(info);
+                     ignore = ! sessionDelegate.preDeliver(info);
 
                      // If post deliver didn't succeed and acknowledgement mode is auto_ack
                      // That means the ref wasn't acked since it couldn't be found.
                      // In order to maintain at most once semantics we must therefore not return
                      // the message
-
-                     ignore = !sessionDelegate.postDeliver();
                      
+                     if (!ignore)
+                     {
+                        ignore = !sessionDelegate.postDeliver();
+                     }
                   }
                   else
                   {
@@ -582,7 +609,10 @@
                      {
                         public Boolean call() throws Exception
                         {
-                           sessionDelegate.preDeliver(info);
+                           if (! sessionDelegate.preDeliver(info))
+                           {
+                              return true;
+                           }
 
                            // If post deliver didn't succeed and acknowledgement mode is auto_ack
                            // That means the ref wasn't acked since it couldn't be found.
@@ -698,6 +728,9 @@
    {
       synchronized (mainLock)
       {
+         //Because this is a local re-delivery, we update the source so the message can be put into the ack list again.
+         proxy.setSource(this.messageSource);
+         
          buffer.addFirst(proxy, proxy.getJMSPriority());
          
          consumeCount--;
@@ -716,12 +749,19 @@
       currentToken++;
    	
       consumerID = newHandler.consumerID;
+      
+      //must be clustered
+      synchronized(consumerLock)
+      {
+         //this prevents 'old' messages from coming in, see JBMESSAGING-1878
+         this.messageSource = newHandler.messageSource;
 
-      // Clear the buffer. This way the non persistent messages that managed to arrive are
-      // irredeemably lost, while the persistent ones are failed-over on the server and will be
-      // resent
+         // Clear the buffer. This way the non persistent messages that managed to arrive are
+         // irredeemably lost, while the persistent ones are failed-over on the server and will be
+         // resent
 
-      buffer.clear();
+         buffer.clear();
+      }
       
       consumeCount = 0;
    }
@@ -1096,14 +1136,39 @@
 
                 proxy.getMessage().doBeforeReceive();
 
-                //Add it to the buffer
-                buffer.addLast(proxy, proxy.getJMSPriority());
+                if (isClustered)
+                {
+                   synchronized (consumerLock)
+                   {
+                      //if source changed, discard it.
+                      if (proxy.getSource() == messageSource)
+                      {
+                         //Add it to the buffer
+                         buffer.addLast(proxy, proxy.getJMSPriority());
 
-                lastDeliveryId = proxy.getDeliveryId();
+                         lastDeliveryId = proxy.getDeliveryId();
+                      
+                         if (trace) { log.trace(this + " added message(s) to the buffer are now " + buffer.size() + " messages"); }
+
+                         messageAdded();
+                      }
+                      else
+                      {
+                         log.debug("Discarding message from old source " + proxy.getSource() + " on to new source " + messageSource);
+                      }
+                   }
+                }
+                else
+                {
+                   //Add it to the buffer
+                   buffer.addLast(proxy, proxy.getJMSPriority());
+
+                   lastDeliveryId = proxy.getDeliveryId();
                 
-                if (trace) { log.trace(this + " added message(s) to the buffer are now " + buffer.size() + " messages"); }
+                   if (trace) { log.trace(this + " added message(s) to the buffer are now " + buffer.size() + " messages"); }
 
-                messageAdded();
+                   messageAdded();
+                }
              }
          }
          catch (Exception e)

Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2011-06-14 02:03:47 UTC (rev 8332)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2011-06-14 02:20:56 UTC (rev 8333)
@@ -27,6 +27,7 @@
 
 import org.jboss.aop.joinpoint.Invocation;
 import org.jboss.aop.joinpoint.MethodInvocation;
+import org.jboss.jms.client.FailoverCommandCenter;
 import org.jboss.jms.client.delegate.DelegateSupport;
 import org.jboss.jms.client.remoting.CallbackManager;
 import org.jboss.jms.client.state.ConnectionState;
@@ -89,6 +90,8 @@
       int maxDeliveries = consumerState.getMaxDeliveries();
       long redeliveryDelay = consumerState.getRedeliveryDelay();
       
+      FailoverCommandCenter fcc = connectionState.getFailoverCommandCenter();
+      
       //We need the queue name for recovering any deliveries after failover
       String queueName = null;
       if (consumerState.getSubscriptionName() != null)
@@ -109,16 +112,18 @@
       
       boolean autoFlowControl = ((Boolean)mi.getArguments()[5]).booleanValue();
       
+      CallbackManager cm = connectionState.getRemotingConnection().getCallbackManager();
+
       ClientConsumer messageHandler =
          new ClientConsumer(isCC, sessionState.getAcknowledgeMode(),
                             sessionDelegate, consumerDelegate, consumerID, queueName,
                             prefetchSize, executor, maxDeliveries, consumerState.isShouldAck(),
                             redeliveryDelay, consumerState.getMaxRetryChangeRate(), 
-                            consumerState.getRetryChangeRateInterval());
+                            consumerState.getRetryChangeRateInterval(),
+                            fcc != null, cm);
       
       sessionState.addCallbackHandler(messageHandler);
       
-      CallbackManager cm = connectionState.getRemotingConnection().getCallbackManager();
       cm.registerHandler(consumerID, messageHandler);
          
       consumerState.setClientConsumer(messageHandler);

Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/container/SessionAspect.java	2011-06-14 02:03:47 UTC (rev 8332)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/container/SessionAspect.java	2011-06-14 02:20:56 UTC (rev 8333)
@@ -234,6 +234,8 @@
       MethodInvocation mi = (MethodInvocation)invocation;
       SessionState state = getState(invocation);
       
+      boolean result = true;
+      
       synchronized (state)
       {
       
@@ -254,8 +256,9 @@
                throw new IllegalStateException(
                   "CLIENT_ACKNOWLEDGE cannot be used with a connection consumer");
             }
-                     
-            state.getClientAckList().add(info);
+            
+            result = state.addToClientAckList(info);
+               
          }
          // if XA and there is no transaction enlisted on XA we will act as AutoAcknowledge
          // However if it's a MDB (if there is a DistinguishedListener) we should behaved as transacted
@@ -300,12 +303,20 @@
                String sessionId = connectionConsumerDelegate != null ?
                   connectionConsumerDelegate.getID() : state.getSessionID();
                
-               connState.getResourceManager().addAck(txID, sessionId, info);
+               if (info.getSource() != null)
+               {
+                  //from a normal session (non CC).
+                  result = state.addAckToResourceManager(connState.getResourceManager(), txID, sessionId, info);
+               }
+               else
+               {
+                  connState.getResourceManager().addAck(txID, sessionId, info);
+               }
             }        
          }
       }
       
-      return null;
+      return Boolean.valueOf(result);
    }
    
    public Object handlePostDeliver(Invocation invocation) throws Throwable

Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2011-06-14 02:03:47 UTC (rev 8332)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2011-06-14 02:20:56 UTC (rev 8333)
@@ -382,7 +382,7 @@
     * This invocation should either be handled by the client-side interceptor chain or by the
     * server-side endpoint.
     */
-   public void preDeliver(DeliveryInfo deliveryInfo) throws JMSException
+   public boolean preDeliver(DeliveryInfo deliveryInfo) throws JMSException
    {
       throw new IllegalStateException("This invocation should not be handled here!");
    }

Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/remoting/CallbackManager.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/remoting/CallbackManager.java	2011-06-14 02:03:47 UTC (rev 8332)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/remoting/CallbackManager.java	2011-06-14 02:20:56 UTC (rev 8333)
@@ -97,7 +97,7 @@
 
          try
          {
-            handler.handleMessage(dr);
+            handler.handleMessage(dr, this);
          }
          catch (Exception e)
          {

Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/state/SessionState.java	2011-06-14 02:03:47 UTC (rev 8332)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/state/SessionState.java	2011-06-14 02:20:56 UTC (rev 8333)
@@ -31,6 +31,7 @@
 import java.util.Map;
 import java.util.concurrent.Executors;
 
+import javax.jms.JMSException;
 import javax.jms.MessageListener;
 import javax.jms.Session;
 
@@ -128,6 +129,11 @@
    
    private boolean isCC;
    
+   private Object ackLock = new Object();
+   
+   private Object ackSource;
+   
+   
    // Constructors ---------------------------------------------------------------------------------
    
    public SessionState(ConnectionState parent, ClientSessionDelegate delegate,
@@ -172,6 +178,8 @@
       this.setEnableOrderingGroup(enableOrderingGroup);
       
       this.setDefaultOrderingGroupName(defaultOrderingGroupName);
+      
+      this.ackSource = parent.getRemotingConnection().getCallbackManager();
    }
 
    // HierarchicalState implementation -------------------------------------------------------------
@@ -236,6 +244,14 @@
       // from before failover waiting in there and we don't want them to get delivered after
       // failover.
       executor.clearAllExceptCurrentTask();
+
+      //This guards against new ack info coming into the list. It should happen before ClientConsumer.synchronizedWith();
+      //otherwise a message can be added to the buffer after the buffer is cleared, and then added to the ack list.
+      //JBMESSAGING-1878
+      synchronized (ackLock)
+      {
+         ackSource = ((ConnectionState)newState.getParent()).getRemotingConnection().getCallbackManager();
+      }
       
       ClientSessionDelegate newDelegate = (ClientSessionDelegate)newState.getDelegate();
 
@@ -292,7 +308,7 @@
 
       ConnectionState connState = (ConnectionState)getParent();
       ResourceManager rm = connState.getResourceManager();
-
+      
       // We need to failover from one session ID to another in the resource manager
       rm.handleFailover(connState.getServerID(), oldSessionID, newState.sessionID);
 
@@ -402,6 +418,23 @@
       return clientAckList;
    }
    
+   public boolean addToClientAckList(DeliveryInfo info)
+   {
+      synchronized (ackLock)
+      {
+         if (ackSource == info.getSource())
+         {
+            clientAckList.add(info);
+            return true;
+         }
+         else
+         {
+            log.debug("Rejecting ack " + info + " from old source: " + info.getSource() + " on new source " + ackSource);
+            return false;
+         }
+      }
+   }
+   
    public void setClientAckList(List list)
    {
       this.clientAckList = list;
@@ -596,5 +629,22 @@
       }
    }
 
+   public boolean addAckToResourceManager(ResourceManager rm, Object txID, String sessId, DeliveryInfo info) throws JMSException
+   {
+      synchronized (ackLock)
+      {
+         if (ackSource == info.getSource())
+         {
+            rm.addAck(txID, sessId, info);
+            return true;
+         }
+         else
+         {
+            log.debug("Rejecting tx ack " + info + " from old source " + info.getSource() + " on new source " + ackSource);
+            return false;
+         }
+      }
+   }
+
 }
 

Modified: branches/Branch_1_4/src/main/org/jboss/jms/delegate/DeliveryInfo.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/delegate/DeliveryInfo.java	2011-06-14 02:03:47 UTC (rev 8332)
+++ branches/Branch_1_4/src/main/org/jboss/jms/delegate/DeliveryInfo.java	2011-06-14 02:20:56 UTC (rev 8333)
@@ -58,12 +58,15 @@
    //to the connection consumer's session, otherwise it will be null
    private SessionDelegate connectionConsumerSession;
    
+   //mark where the msg is delivered from (a CallbackManager)
+   private Object source;
+   
    // Static --------------------------------------------------------
    
    // Constructors --------------------------------------------------
    
    public DeliveryInfo(MessageProxy msg, String consumerId, String queueName,
-                       SessionDelegate connectionConsumerSession, boolean shouldAck)
+                       SessionDelegate connectionConsumerSession, boolean shouldAck, Object source)
    {      
       this.msg = msg;
       
@@ -74,6 +77,8 @@
       this.connectionConsumerSession = connectionConsumerSession;
       
       this.shouldAck = shouldAck;
+      
+      this.source = source;
    }
 
    // Public --------------------------------------------------------
@@ -103,6 +108,11 @@
    	return shouldAck;
    }
    
+   public Object getSource()
+   {
+      return source;
+   }
+   
    public String toString()
    {
       return "Delivery[" + getDeliveryID() + ", " + msg + "]";

Modified: branches/Branch_1_4/src/main/org/jboss/jms/delegate/SessionDelegate.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/delegate/SessionDelegate.java	2011-06-14 02:03:47 UTC (rev 8332)
+++ branches/Branch_1_4/src/main/org/jboss/jms/delegate/SessionDelegate.java	2011-06-14 02:20:56 UTC (rev 8333)
@@ -64,7 +64,7 @@
 
    TextMessageProxy createTextMessage(String text) throws JMSException;
 
-   void preDeliver(DeliveryInfo deliveryInfo) throws JMSException;
+   boolean preDeliver(DeliveryInfo deliveryInfo) throws JMSException;
 
    boolean postDeliver() throws JMSException;
 

Modified: branches/Branch_1_4/src/main/org/jboss/jms/message/MessageProxy.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/message/MessageProxy.java	2011-06-14 02:03:47 UTC (rev 8332)
+++ branches/Branch_1_4/src/main/org/jboss/jms/message/MessageProxy.java	2011-06-14 02:20:56 UTC (rev 8333)
@@ -95,6 +95,8 @@
       
    
    protected JBossMessage message;
+   
+   private Object source;
 
    // Constructors --------------------------------------------------
 
@@ -506,5 +508,15 @@
       needToCopyHeader = false;      
    }
 
+   public void setSource(Object source)
+   {
+      this.source = source;
+   }
+   
+   public Object getSource()
+   {
+      return this.source;
+   }
+
    // Inner classes -------------------------------------------------   
 }

Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java	2011-06-14 02:03:47 UTC (rev 8332)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java	2011-06-14 02:20:56 UTC (rev 8333)
@@ -643,7 +643,7 @@
 			{
 	         assertRemainingMessages(NUM_MESSAGES - i);
 	         
-				m = consumer.receive(200);
+				m = consumer.receive(2000);
 	         
 	         assertRemainingMessages(NUM_MESSAGES - (i + 1));
 	         
@@ -663,7 +663,7 @@
 	
 			log.trace("Session recover called");
 	
-			m = consumer.receive(200);
+			m = consumer.receive(2000);
 	
 			log.trace("Message is:" + m);
 	
@@ -755,7 +755,7 @@
 	      Message m = null;
 	      for (int i = 0; i < 10; i++)
 	      {
-	         m = consumer.receive(200);
+	         m = consumer.receive(2000);
 	         
 	         assertNotNull(m);
 	          
@@ -771,7 +771,7 @@
 	      
 	      for (int i = 0; i < 9; i++)
 	      {
-	         m = consumer.receive(200);
+	         m = consumer.receive(2000);
 	         
 	         assertNotNull(m);
 	         



More information about the jboss-cvs-commits mailing list