[jboss-cvs] JBoss Messaging SVN: r8416 - in branches/Branch_JBossMessaging_1_4_8_SP1_JBMESSAGING-1885_JBMESSAGING-1887_JBMESSAGING-1889_JBMESSAGING-1891_JBMESSAGING-1892_JBMESSAGING-1893_JBMESSAGING-1894/src/main/org/jboss/jms: client/state and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Aug 12 16:16:21 EDT 2011


Author: raggz
Date: 2011-08-12 16:16:21 -0400 (Fri, 12 Aug 2011)
New Revision: 8416

Modified:
   branches/Branch_JBossMessaging_1_4_8_SP1_JBMESSAGING-1885_JBMESSAGING-1887_JBMESSAGING-1889_JBMESSAGING-1891_JBMESSAGING-1892_JBMESSAGING-1893_JBMESSAGING-1894/src/main/org/jboss/jms/client/container/ClientConsumer.java
   branches/Branch_JBossMessaging_1_4_8_SP1_JBMESSAGING-1885_JBMESSAGING-1887_JBMESSAGING-1889_JBMESSAGING-1891_JBMESSAGING-1892_JBMESSAGING-1893_JBMESSAGING-1894/src/main/org/jboss/jms/client/container/SessionAspect.java
   branches/Branch_JBossMessaging_1_4_8_SP1_JBMESSAGING-1885_JBMESSAGING-1887_JBMESSAGING-1889_JBMESSAGING-1891_JBMESSAGING-1892_JBMESSAGING-1893_JBMESSAGING-1894/src/main/org/jboss/jms/client/state/SessionState.java
   branches/Branch_JBossMessaging_1_4_8_SP1_JBMESSAGING-1885_JBMESSAGING-1887_JBMESSAGING-1889_JBMESSAGING-1891_JBMESSAGING-1892_JBMESSAGING-1893_JBMESSAGING-1894/src/main/org/jboss/jms/tx/ResourceManager.java
Log:
JBPAPP-7009


Modified: branches/Branch_JBossMessaging_1_4_8_SP1_JBMESSAGING-1885_JBMESSAGING-1887_JBMESSAGING-1889_JBMESSAGING-1891_JBMESSAGING-1892_JBMESSAGING-1893_JBMESSAGING-1894/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_8_SP1_JBMESSAGING-1885_JBMESSAGING-1887_JBMESSAGING-1889_JBMESSAGING-1891_JBMESSAGING-1892_JBMESSAGING-1893_JBMESSAGING-1894/src/main/org/jboss/jms/client/container/ClientConsumer.java	2011-08-12 20:09:34 UTC (rev 8415)
+++ branches/Branch_JBossMessaging_1_4_8_SP1_JBMESSAGING-1885_JBMESSAGING-1887_JBMESSAGING-1889_JBMESSAGING-1891_JBMESSAGING-1892_JBMESSAGING-1893_JBMESSAGING-1894/src/main/org/jboss/jms/client/container/ClientConsumer.java	2011-08-12 20:16:21 UTC (rev 8416)
@@ -31,6 +31,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReentrantLock;
 
 import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
@@ -146,7 +147,7 @@
       }
       
       DeliveryInfo deliveryInfo =
-         new DeliveryInfo(m, consumerID, queueName, connectionConsumerSession, shouldAck, null);
+         new DeliveryInfo(m, consumerID, queueName, connectionConsumerSession, shouldAck, m.getSource());
             
       m.incDeliveryCount();
       
@@ -157,7 +158,11 @@
          //We need to call preDeliver, deliver the message then call postDeliver - this is because
          //it is legal to call session.recover(), or session.rollback() from within the onMessage()
          //method in which case the last message needs to be delivered so it needs to know about it
-         sess.preDeliver(deliveryInfo);
+         if (!sess.preDeliver(deliveryInfo))
+         {
+            if (trace) { log.trace("reference " + deliveryInfo + " rejected, don't call onMessage"); }
+            return;
+         }
       } 
       
       try
@@ -706,7 +711,7 @@
             receiverThread = null;            
          }
       } 
-           
+      
       if (trace) { log.trace(this + " receive() returning " + m); }
       
       return m;
@@ -732,17 +737,37 @@
        this.consumerID = consumerId;
    }
    
-   public void addToFrontOfBuffer(MessageProxy proxy) throws Exception
+   public void addToFrontOfBuffer(MessageProxy proxy, ReentrantLock failoverLock) throws Exception
    {
-      synchronized (mainLock)
+      boolean sourceUpdated = false;
+      
+      if (failoverLock.isHeldByCurrentThread())
       {
-         //because this is local re-delivery, we update the source to allow to put into the Ack list again.
          proxy.setSource(this.messageSource);
+         sourceUpdated = true;
          
+         //We must unlock here to avoid a deadlock in which
+         //someone grabs the mainLock and makes a remote call. If failover then happens,
+         //another thread that is rolling back the tx may get the failoverLock first and then
+         //go on to get the mainLock. But the failover thread already holds the mainLock and will
+         //try to get the failoverLock, resulting in a deadlock.
+         failoverLock.unlock();
+      }
+      
+      synchronized (mainLock)
+      {
+         if (!sourceUpdated)
+         {
+            if (trace) { log.trace(this + " the source not updated yet, now update: " + this.messageSource); }
+            //because this is local re-delivery, we update the source so that the message can be put into the Ack list again.
+            proxy.setSource(this.messageSource);
+         }
+         
          buffer.addFirst(proxy, proxy.getJMSPriority());
          
+         if (trace) { log.trace(this + " added to the front of buffer " + proxy); }
+         
          consumeCount--;
-         
          messageAdded();
       }
    }

Modified: branches/Branch_JBossMessaging_1_4_8_SP1_JBMESSAGING-1885_JBMESSAGING-1887_JBMESSAGING-1889_JBMESSAGING-1891_JBMESSAGING-1892_JBMESSAGING-1893_JBMESSAGING-1894/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_8_SP1_JBMESSAGING-1885_JBMESSAGING-1887_JBMESSAGING-1889_JBMESSAGING-1891_JBMESSAGING-1892_JBMESSAGING-1893_JBMESSAGING-1894/src/main/org/jboss/jms/client/container/SessionAspect.java	2011-08-12 20:09:34 UTC (rev 8415)
+++ branches/Branch_JBossMessaging_1_4_8_SP1_JBMESSAGING-1885_JBMESSAGING-1887_JBMESSAGING-1889_JBMESSAGING-1891_JBMESSAGING-1892_JBMESSAGING-1893_JBMESSAGING-1894/src/main/org/jboss/jms/client/container/SessionAspect.java	2011-08-12 20:16:21 UTC (rev 8416)
@@ -303,19 +303,20 @@
                String sessionId = connectionConsumerDelegate != null ?
                   connectionConsumerDelegate.getID() : state.getSessionID();
                
-               if (info.getSource() != null)
+               if (connectionConsumerDelegate == null)
                {
                   //from a normal session (non CC).
                   result = state.addAckToResourceManager(connState.getResourceManager(), txID, sessionId, info);
                }
                else
                {
-                  connState.getResourceManager().addAck(txID, sessionId, info);
+                  SessionState ccState = (SessionState)connectionConsumerDelegate.getState();
+                  result = ccState.addAckToResourceManager(connState.getResourceManager(), txID, sessionId, info);
                }
             }        
          }
       }
-      
+
       return Boolean.valueOf(result);
    }
    
@@ -598,6 +599,7 @@
          MessageProxy proxy = info.getMessageProxy();        
          
          ClientConsumer handler = state.getCallbackHandler(info.getConsumerId());
+         ResourceManager rm = state.getResourceManager();
          
          if (handler == null)
          {
@@ -615,7 +617,7 @@
          {
             if (trace) { log.trace("Adding proxy back to front of buffer"); }
             
-            handler.addToFrontOfBuffer(proxy);
+            handler.addToFrontOfBuffer(proxy, rm.failoverLock);
          }                                    
       }
               

Modified: branches/Branch_JBossMessaging_1_4_8_SP1_JBMESSAGING-1885_JBMESSAGING-1887_JBMESSAGING-1889_JBMESSAGING-1891_JBMESSAGING-1892_JBMESSAGING-1893_JBMESSAGING-1894/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_8_SP1_JBMESSAGING-1885_JBMESSAGING-1887_JBMESSAGING-1889_JBMESSAGING-1891_JBMESSAGING-1892_JBMESSAGING-1893_JBMESSAGING-1894/src/main/org/jboss/jms/client/state/SessionState.java	2011-08-12 20:09:34 UTC (rev 8415)
+++ branches/Branch_JBossMessaging_1_4_8_SP1_JBMESSAGING-1885_JBMESSAGING-1887_JBMESSAGING-1889_JBMESSAGING-1891_JBMESSAGING-1892_JBMESSAGING-1893_JBMESSAGING-1894/src/main/org/jboss/jms/client/state/SessionState.java	2011-08-12 20:16:21 UTC (rev 8416)
@@ -228,6 +228,12 @@
    {
       return asfMessages;
    }
+   
+   public ResourceManager getResourceManager()
+   {
+      ConnectionState connState = (ConnectionState)getParent();
+      return connState.getResourceManager();
+   }
 
    // HierarchicalStateSupport overrides -----------------------------------------------------------
 
@@ -245,6 +251,16 @@
       // failover.
       executor.clearAllExceptCurrentTask();
 
+      ConnectionState connState = (ConnectionState)getParent();
+      ResourceManager rm = connState.getResourceManager();
+
+      List ackInfos = Collections.EMPTY_LIST;
+      ClientSessionDelegate newDelegate = (ClientSessionDelegate)newState.getDelegate();
+      
+      rm.failoverLock.lock();
+      
+      try
+      {
       //this guard aginst new ack info coming in the list. it should be before the ClientConsumer.synchronizedWith()
       //otherwise the message can be added to buffer after buffer cleared and added to acklist.
       //JBMESSAGING-1878
@@ -252,8 +268,6 @@
       {
          ackSource = ((ConnectionState)newState.getParent()).getRemotingConnection().getCallbackManager();
       }
-      
-      ClientSessionDelegate newDelegate = (ClientSessionDelegate)newState.getDelegate();
 
       for (Iterator i = getChildren().iterator(); i.hasNext(); )
       {
@@ -305,14 +319,9 @@
             log.trace(this + " synchronized failover browser " + browserDelegate);
          }
       }
-
-      ConnectionState connState = (ConnectionState)getParent();
-      ResourceManager rm = connState.getResourceManager();
       
       // We need to failover from one session ID to another in the resource manager
       rm.handleFailover(connState.getServerID(), oldSessionID, newState.sessionID);
-      
-      List ackInfos = Collections.EMPTY_LIST;
 
       if (isCC)
       {
@@ -382,6 +391,14 @@
             ackInfos = rm.getDeliveriesForSession(getSessionID());
          }
       }
+      }
+      finally
+      {
+         while (rm.failoverLock.isHeldByCurrentThread())
+         {
+            rm.failoverLock.unlock();
+         }
+      }
 
       List recoveryInfos = new ArrayList();
       if (!ackInfos.isEmpty())
@@ -399,14 +416,14 @@
       }
       
       log.trace(this + " sending delivery recovery " + recoveryInfos + " on failover");
-      
+
       //Note we only recover sessions that are transacted or client ack
       if (transacted || xa || acknowledgeMode == Session.CLIENT_ACKNOWLEDGE)
       {
          //Note! We ALWAYS call recoverDeliveries even if there are no deliveries since it also does other stuff
-         //like remove from recovery Area refs corresponding to messages in client consumer buffers
+         //like remove from recovery Area refs corresponding to messages in client consumer buffers         
          
-      	newDelegate.recoverDeliveries(recoveryInfos, oldSessionID);
+         newDelegate.recoverDeliveries(recoveryInfos, oldSessionID);
       }
    }
    

Modified: branches/Branch_JBossMessaging_1_4_8_SP1_JBMESSAGING-1885_JBMESSAGING-1887_JBMESSAGING-1889_JBMESSAGING-1891_JBMESSAGING-1892_JBMESSAGING-1893_JBMESSAGING-1894/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_8_SP1_JBMESSAGING-1885_JBMESSAGING-1887_JBMESSAGING-1889_JBMESSAGING-1891_JBMESSAGING-1892_JBMESSAGING-1893_JBMESSAGING-1894/src/main/org/jboss/jms/tx/ResourceManager.java	2011-08-12 20:09:34 UTC (rev 8415)
+++ branches/Branch_JBossMessaging_1_4_8_SP1_JBMESSAGING-1885_JBMESSAGING-1887_JBMESSAGING-1889_JBMESSAGING-1891_JBMESSAGING-1892_JBMESSAGING-1893_JBMESSAGING-1894/src/main/org/jboss/jms/tx/ResourceManager.java	2011-08-12 20:16:21 UTC (rev 8416)
@@ -39,6 +39,7 @@
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 import java.util.*;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * The ResourceManager manages work done in both local and global (XA) transactions.
@@ -65,11 +66,11 @@
    private boolean trace = log.isTraceEnabled();
    
    private ConcurrentHashMap transactions = new ConcurrentHashMap();
-
-   private ConcurrentHashMap convertedIds = new ConcurrentHashMap();
    
    private int serverID;
    
+   public ReentrantLock failoverLock = new ReentrantLock();
+   
    // Static ---------------------------------------------------------------------------------------
    
    private static final Logger log = Logger.getLogger(ResourceManager.class);
@@ -152,7 +153,7 @@
    public List getDeliveriesForSession(String sessionID)
    {
       List ackInfos = new ArrayList();
-           
+
       for (Iterator i = transactions.values().iterator(); i.hasNext(); )
       {
          ClientTransaction tx = (ClientTransaction)i.next();
@@ -161,7 +162,7 @@
          
          ackInfos.addAll(acks);
       }
-      
+
       return ackInfos;
    }
    
@@ -239,6 +240,9 @@
    {
       if (trace) { log.trace("rolling back local xid " + xid); }
       
+      failoverLock.lock();
+      try
+      {
       ClientTransaction ts = removeTxInternal(xid);
       
       if (ts == null)
@@ -247,6 +251,14 @@
       }
       
       this.rollbackLocal(xid, ts);
+      }
+      finally
+      {
+         while (failoverLock.isHeldByCurrentThread())
+         {
+            failoverLock.unlock();
+         }
+      }
    }
    
    private void rollbackLocal(Object xid, ClientTransaction ts) throws JMSException
@@ -373,8 +385,8 @@
       ClientTransaction tx = removeTxInternal(xid);
       
       if (tx != null && trace) 
-      { 
-	log.trace("got tx: " + tx + " state " + tx.getState()); 
+      {
+         log.trace("got tx: " + tx + " state " + tx.getState()); 
       }
       
       //roll back for onePhase only. for 2pc, rollback only is processed in prepare
@@ -433,14 +445,36 @@
    {
       if (trace) { log.trace("rolling back xid " + xid); }
       
-      ClientTransaction tx = removeTxInternal(xid);
+      ClientTransaction tx = null;
       
+      failoverLock.lock();
+      try
+      {
+      tx = removeTxInternal(xid);
+      
       if (tx == null)
       {
          throw new java.lang.IllegalStateException("Cannot find xid to remove " + xid);
       }
       
-      this.rollback(xid, tx, connection);
+      //only synchronize local one-phase rollback
+      if (tx.getState() != ClientTransaction.TX_PREPARED)
+      {
+         this.rollback(xid, tx, connection);
+      }
+      }
+      finally
+      {
+         while (failoverLock.isHeldByCurrentThread())
+         {
+            failoverLock.unlock();
+         }
+      }
+      
+      if (tx.getState() == ClientTransaction.TX_PREPARED)
+      {
+         this.rollback(xid, tx,connection);
+      }
    }
    
    private void rollback(Xid xid, ClientTransaction tx, ConnectionDelegate connection) throws XAException
@@ -764,5 +798,4 @@
    }
    
    // Inner Classes --------------------------------------------------------------------------------
-  
 }



More information about the jboss-cvs-commits mailing list