[jboss-cvs] JBoss Messaging SVN: r8402 - in branches/Branch_1_4/src/main/org/jboss/jms: client/state and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Aug 3 22:33:38 EDT 2011


Author: gaohoward
Date: 2011-08-03 22:33:38 -0400 (Wed, 03 Aug 2011)
New Revision: 8402

Modified:
   branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java
   branches/Branch_1_4/src/main/org/jboss/jms/client/container/SessionAspect.java
   branches/Branch_1_4/src/main/org/jboss/jms/client/state/SessionState.java
   branches/Branch_1_4/src/main/org/jboss/jms/tx/ResourceManager.java
Log:
JBMESSAGING-1894


Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java	2011-08-02 06:29:01 UTC (rev 8401)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java	2011-08-04 02:33:38 UTC (rev 8402)
@@ -31,6 +31,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReentrantLock;
 
 import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
@@ -737,13 +738,32 @@
        this.consumerID = consumerId;
    }
    
-   public void addToFrontOfBuffer(MessageProxy proxy) throws Exception
+   public void addToFrontOfBuffer(MessageProxy proxy, ReentrantLock failoverLock) throws Exception
    {
-      synchronized (mainLock)
+      boolean sourceUpdated = false;
+      
+      if (failoverLock.isHeldByCurrentThread())
       {
-         //because this is local re-delivery, we update the source to allow to put into the Ack list again.
          proxy.setSource(this.messageSource);
+         sourceUpdated = true;
          
+         //We must unlock here because the dead lock where
+         //someone grabs the mainLock and makes a remote call. If then failover happens,
+         //another thread that is rolling back the tx may get the failoverLock first and then
+         //go to get mainLock. But the failover thread already holds on the mainLock and will
+         //try to get the failoverLock, resulting a dead lock.
+         failoverLock.unlock();
+      }
+      
+      synchronized (mainLock)
+      {
+         if (!sourceUpdated)
+         {
+            if (trace) { log.trace(this + " the source not updated yet, now update: " + this.messageSource); }
+            //because this is local re-delivery, we update the source to allow to put into the Ack list again.
+            proxy.setSource(this.messageSource);
+         }
+         
          buffer.addFirst(proxy, proxy.getJMSPriority());
          
          if (trace) { log.trace(this + " added to the front of buffer " + proxy); }

Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/container/SessionAspect.java	2011-08-02 06:29:01 UTC (rev 8401)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/container/SessionAspect.java	2011-08-04 02:33:38 UTC (rev 8402)
@@ -599,6 +599,7 @@
          MessageProxy proxy = info.getMessageProxy();        
          
          ClientConsumer handler = state.getCallbackHandler(info.getConsumerId());
+         ResourceManager rm = state.getResourceManager();
          
          if (handler == null)
          {
@@ -616,7 +617,7 @@
          {
             if (trace) { log.trace("Adding proxy back to front of buffer"); }
             
-            handler.addToFrontOfBuffer(proxy);
+            handler.addToFrontOfBuffer(proxy, rm.failoverLock);
          }                                    
       }
               

Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/state/SessionState.java	2011-08-02 06:29:01 UTC (rev 8401)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/state/SessionState.java	2011-08-04 02:33:38 UTC (rev 8402)
@@ -228,6 +228,12 @@
    {
       return asfMessages;
    }
+   
+   public ResourceManager getResourceManager()
+   {
+      ConnectionState connState = (ConnectionState)getParent();
+      return connState.getResourceManager();
+   }
 
    // HierarchicalStateSupport overrides -----------------------------------------------------------
 
@@ -251,7 +257,9 @@
       List ackInfos = Collections.EMPTY_LIST;
       ClientSessionDelegate newDelegate = (ClientSessionDelegate)newState.getDelegate();
       
-      synchronized(rm.failoverLock)
+      rm.failoverLock.lock();
+      
+      try
       {
       //this guard aginst new ack info coming in the list. it should be before the ClientConsumer.synchronizedWith()
       //otherwise the message can be added to buffer after buffer cleared and added to acklist.
@@ -384,6 +392,13 @@
          }
       }
       }
+      finally
+      {
+         while (rm.failoverLock.isHeldByCurrentThread())
+         {
+            rm.failoverLock.unlock();
+         }
+      }
 
       List recoveryInfos = new ArrayList();
       if (!ackInfos.isEmpty())

Modified: branches/Branch_1_4/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/tx/ResourceManager.java	2011-08-02 06:29:01 UTC (rev 8401)
+++ branches/Branch_1_4/src/main/org/jboss/jms/tx/ResourceManager.java	2011-08-04 02:33:38 UTC (rev 8402)
@@ -39,6 +39,7 @@
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 import java.util.*;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * The ResourceManager manages work done in both local and global (XA) transactions.
@@ -68,7 +69,7 @@
    
    private int serverID;
    
-   public Object failoverLock = new Object();
+   public ReentrantLock failoverLock = new ReentrantLock();
    
    // Static ---------------------------------------------------------------------------------------
    
@@ -239,7 +240,8 @@
    {
       if (trace) { log.trace("rolling back local xid " + xid); }
       
-      synchronized(failoverLock)
+      failoverLock.lock();
+      try
       {
       ClientTransaction ts = removeTxInternal(xid);
       
@@ -250,6 +252,13 @@
       
       this.rollbackLocal(xid, ts);
       }
+      finally
+      {
+         while (failoverLock.isHeldByCurrentThread())
+         {
+            failoverLock.unlock();
+         }
+      }
    }
    
    private void rollbackLocal(Object xid, ClientTransaction ts) throws JMSException
@@ -437,7 +446,9 @@
       if (trace) { log.trace("rolling back xid " + xid); }
       
       ClientTransaction tx = null;
-      synchronized(failoverLock)
+      
+      failoverLock.lock();
+      try
       {
       tx = removeTxInternal(xid);
       
@@ -452,6 +463,13 @@
          this.rollback(xid, tx, connection);
       }
       }
+      finally
+      {
+         while (failoverLock.isHeldByCurrentThread())
+         {
+            failoverLock.unlock();
+         }
+      }
       
       if (tx.getState() == ClientTransaction.TX_PREPARED)
       {
@@ -780,5 +798,4 @@
    }
    
    // Inner Classes --------------------------------------------------------------------------------
-  
 }



More information about the jboss-cvs-commits mailing list