[jboss-cvs] JBoss Messaging SVN: r8328 - in branches/JBMESSAGING_1878/src/main/org/jboss/jms: client/state and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jun 13 01:50:12 EDT 2011


Author: gaohoward
Date: 2011-06-13 01:50:11 -0400 (Mon, 13 Jun 2011)
New Revision: 8328

Modified:
   branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/container/ClientConsumer.java
   branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/state/SessionState.java
   branches/JBMESSAGING_1878/src/main/org/jboss/jms/message/MessageProxy.java
Log:
fix tx ack case
and also case 1 in jbm-1878


Modified: branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/container/ClientConsumer.java	2011-06-13 03:34:20 UTC (rev 8327)
+++ branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/container/ClientConsumer.java	2011-06-13 05:50:11 UTC (rev 8328)
@@ -214,7 +214,7 @@
       }
       
       DeliveryInfo deliveryInfo =
-         new DeliveryInfo(m, consumerID, queueName, connectionConsumerSession, shouldAck, messageSource);
+         new DeliveryInfo(m, consumerID, queueName, connectionConsumerSession, shouldAck, m.getSource());
             
       m.incDeliveryCount();
       
@@ -584,7 +584,7 @@
                
                if (!isConnectionConsumer && !ignore)
                {
-                  final DeliveryInfo info = new DeliveryInfo(m, consumerID, queueName, null, shouldAck, messageSource);
+                  final DeliveryInfo info = new DeliveryInfo(m, consumerID, queueName, null, shouldAck, m.getSource());
                   
                   if (timeout <= 0)
                   {
@@ -1142,6 +1142,7 @@
                       //if source changed, discard it.
                       if (source == messageSource)
                       {
+                         proxy.setSource(source);
                          //Add it to the buffer
                          buffer.addLast(proxy, proxy.getJMSPriority());
 
@@ -1151,6 +1152,10 @@
 
                          messageAdded();
                       }
+                      else
+                      {
+                         log.debug("Discarding message from old source " + source + " on to new source " + messageSource);
+                      }
                    }
                 }
                 else

Modified: branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/state/SessionState.java	2011-06-13 03:34:20 UTC (rev 8327)
+++ branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/state/SessionState.java	2011-06-13 05:50:11 UTC (rev 8328)
@@ -244,6 +244,14 @@
       // from before failover waiting in there and we don't want them to get delivered after
       // failover.
       executor.clearAllExceptCurrentTask();
+
+      //this guard aginst new ack info coming in the list. it should be before the ClientConsumer.synchronizedWith()
+      //otherwise the message can be added to buffer after buffer cleared and added to acklist.
+      //JBMESSAGING-1878
+      synchronized (ackLock)
+      {
+         ackSource = ((ConnectionState)newState.getParent()).getRemotingConnection().getCallbackManager();
+      }
       
       ClientSessionDelegate newDelegate = (ClientSessionDelegate)newState.getDelegate();
 
@@ -300,12 +308,6 @@
 
       ConnectionState connState = (ConnectionState)getParent();
       ResourceManager rm = connState.getResourceManager();
-
-      //this guard aginst new ack info coming in the list.
-      synchronized (ackLock)
-      {
-         ackSource = ((ConnectionState)newState.getParent()).getRemotingConnection().getCallbackManager();
-      }
       
       // We need to failover from one session ID to another in the resource manager
       rm.handleFailover(connState.getServerID(), oldSessionID, newState.sessionID);

Modified: branches/JBMESSAGING_1878/src/main/org/jboss/jms/message/MessageProxy.java
===================================================================
--- branches/JBMESSAGING_1878/src/main/org/jboss/jms/message/MessageProxy.java	2011-06-13 03:34:20 UTC (rev 8327)
+++ branches/JBMESSAGING_1878/src/main/org/jboss/jms/message/MessageProxy.java	2011-06-13 05:50:11 UTC (rev 8328)
@@ -95,6 +95,8 @@
       
    
    protected JBossMessage message;
+   
+   private Object source;
 
    // Constructors --------------------------------------------------
 
@@ -506,5 +508,15 @@
       needToCopyHeader = false;      
    }
 
+   public void setSource(Object source)
+   {
+      this.source = source;
+   }
+   
+   public Object getSource()
+   {
+      return this.source;
+   }
+
    // Inner classes -------------------------------------------------   
 }



More information about the jboss-cvs-commits mailing list