[jboss-cvs] JBoss Messaging SVN: r7917 - in branches/Branch_1_4: tests/src/org/jboss/test/messaging/jms and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Dec 10 02:49:48 EST 2009


Author: gaohoward
Date: 2009-12-10 02:49:47 -0500 (Thu, 10 Dec 2009)
New Revision: 7917

Added:
   branches/Branch_1_4/src/main/org/jboss/jms/client/container/ThreadValveCounter.java
   branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/ThreadValveCounterTest.java
Modified:
   branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClosedInterceptor.java
Log:
JBMESSAGING-1743


Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClosedInterceptor.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClosedInterceptor.java	2009-12-03 15:30:36 UTC (rev 7916)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClosedInterceptor.java	2009-12-10 07:49:47 UTC (rev 7917)
@@ -21,9 +21,11 @@
   */
 package org.jboss.jms.client.container;
 
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import javax.jms.IllegalStateException;
 
@@ -65,10 +67,10 @@
    private boolean trace = log.isTraceEnabled();
 
    // The current state of the object guarded by this interceptor
-   private int state = NOT_CLOSED;
+   private volatile int state = NOT_CLOSED;
 
-   // The inuse count
-   private int inUseCount;
+   // The inuse count   
+   private ThreadValveCounter threadCounter = new ThreadValveCounter();
 
    // The identity of the delegate this interceptor is associated with
    private DelegateIdentity id;
@@ -89,7 +91,6 @@
    public ClosedInterceptor()
    {
       state = NOT_CLOSED;
-      inUseCount = 0;
       id = null;
    }
 
@@ -129,7 +130,7 @@
 
       boolean isClosing = methodName.equals("closing");
       boolean isClose = methodName.equals("close");
-
+      
       if (isClosing)
       {
          if (checkClosingAlreadyDone())
@@ -146,18 +147,19 @@
       }
       else
       {
-         synchronized(this)
+         // object "in use", increment inUseCount
+         if (state == IN_CLOSE || state == CLOSED)
          {
-            // object "in use", increment inUseCount
-            if (state == IN_CLOSE || state == CLOSED)
-            {
-               log.error(this + ": method " + methodName + "() did not go through, " +
-                                "the interceptor is " + stateToString(state));
+            log.error(this + ": method " +
+                      methodName +
+                      "() did not go through, " +
+                      "the interceptor is " +
+                      stateToString(state));
 
-               throw new IllegalStateException("The object is closed");
-            }
-            ++inUseCount;
+            throw new IllegalStateException("The object is closed");
          }
+
+         threadCounter.pushThread();
       }
 
       if (isClosing)
@@ -198,13 +200,13 @@
          }
          else
          {
-            done();
+            threadCounter.popThread();
          }
       }
    }
 
    // Protected ------------------------------------------------------
-
+   
    /**
     * Check the closing notification has not already been done
     *
@@ -234,16 +236,13 @@
     *
     * @return true when already closed
     */
-   protected synchronized boolean checkCloseAlreadyDone() throws Throwable
+   protected boolean checkCloseAlreadyDone() throws Throwable
    {
       if (state != CLOSING)
       {
          return true;
       }
-      while (inUseCount > 0)
-      {
-         wait();
-      }
+      threadCounter.waitForCompletion();
       state = IN_CLOSE;
       return false;
    }
@@ -258,17 +257,6 @@
    }
 
    /**
-    * Mark the object as no longer inuse
-    */
-   protected synchronized void done() throws Throwable
-   {
-      if (--inUseCount == 0)
-      {
-         notifyAll();
-      }
-   }
-
-   /**
     * Close children and remove from parent
     *
     * @param invocation the invocation

Added: branches/Branch_1_4/src/main/org/jboss/jms/client/container/ThreadValveCounter.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/container/ThreadValveCounter.java	                        (rev 0)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/container/ThreadValveCounter.java	2009-12-10 07:49:47 UTC (rev 7917)
@@ -0,0 +1,152 @@
+package org.jboss.jms.client.container;
+
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A ThreadValveCounter
+ * 
+ * This is a utility class to avoid a calling thread waiting on itself. In cases where an object is accessed concurrently,
+ * closing that object requires first waiting for all the current ongoing calls to be finished, which means all current calling 
+ * threads (except the one who is calling close()) returning from their respective method calls on that object. Sometimes
+ * the calling thread who is calling close() can come from one of those who are calling normal methods. In that situation
+ * this calling thread will wait forever as it is waiting itself to return. 
+ * 
+ * This class keeps a per-thread counter to avoid the above deadlock. Before waiting, the thread can use this class 
+ * to exempt itself from the counter, thus eliminating the possibility of hang on itself.
+ *
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ * 
+ * Created Dec 9, 2009 4:37:14 PM
+ *
+ *
+ */
+public class ThreadValveCounter
+{
+
+   private ConcurrentHashMap<Long, IntHolder> threadCounter = new ConcurrentHashMap<Long, IntHolder>();
+
+   public void pushThread()
+   {
+      Long tid = Thread.currentThread().getId();
+      IntHolder counter = threadCounter.get(tid);
+      if (counter == null)
+      {
+         counter = new IntHolder();
+         threadCounter.put(tid, counter);
+      }
+      counter.add();
+   }
+
+   public int popThread()
+   {
+      IntHolder counter = threadCounter.get(Thread.currentThread().getId());
+      if (counter != null)
+      {
+         if (counter.release() == 0)
+         {
+            threadCounter.remove(Thread.currentThread().getId());
+            synchronized (threadCounter)
+            {
+               threadCounter.notifyAll();
+            }
+         }
+      }
+      return 0;
+   }
+
+   public void waitForCompletion()
+   {
+      exemptCurrentThread();
+      synchronized (threadCounter)
+      {
+         while (getCallCount() > 0)
+         {
+            try
+            {
+               threadCounter.wait();
+            }
+            catch (InterruptedException e)
+            {
+               // ignore
+            }
+         }
+      }
+   }
+
+   private void exemptCurrentThread()
+   {
+      IntHolder counter = threadCounter.get(Thread.currentThread().getId());
+      if (counter != null)
+      {
+         counter.clear();
+      }
+      synchronized (threadCounter)
+      {
+         threadCounter.notifyAll();
+      }
+   }
+
+   private int getCallCount()
+   {
+      synchronized (threadCounter)
+      {
+         Iterator<IntHolder> iter = threadCounter.values().iterator();
+         int total = 0;
+         while (iter.hasNext())
+         {
+            total += iter.next().get();
+         }
+         return total;
+      }
+   }
+
+   private static class IntHolder
+   {
+      private int counter = 0;
+
+      public void add()
+      {
+         counter++;
+      }
+
+      public int get()
+      {
+         return counter;
+      }
+
+      public int release()
+      {
+         counter--;
+         return counter;
+      }
+
+      public void clear()
+      {
+         counter = 0;
+      }
+   }
+
+}

Added: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/ThreadValveCounterTest.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/ThreadValveCounterTest.java	                        (rev 0)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/ThreadValveCounterTest.java	2009-12-10 07:49:47 UTC (rev 7917)
@@ -0,0 +1,235 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.messaging.jms;
+
+import org.jboss.jms.client.container.ThreadValveCounter;
+
+import junit.framework.TestCase;
+
+/**
+ * A ThreadValveCounterTest
+ *
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ * 
+ * Created Dec 9, 2009 5:46:31 PM
+ *
+ */
+public class ThreadValveCounterTest extends TestCase
+{
+
+   public void testHangCondition1()
+   {
+      SimpleValveInterceptor interceptor = new SimpleValveInterceptor();
+      SharedTestObject obj = new SharedTestObject(interceptor);
+      obj.invoke(4);
+   }
+
+   public void testHangCondition2()
+   {
+      SimpleValveInterceptor interceptor = new SimpleValveInterceptor();
+      SharedTestObject obj = new SharedTestObject(interceptor);
+
+      ValveTestThread thr1 = new ValveTestThread(obj, 1);
+      thr1.start();
+
+      ValveTestThread thr2 = new ValveTestThread(obj, 0);
+      thr2.start();
+
+      try
+      {
+         thr1.join();
+         thr2.join();
+      }
+      catch (InterruptedException e)
+      {
+      }
+
+   }
+
+   public void testHangCondition3()
+   {
+      SimpleValveInterceptor interceptor = new SimpleValveInterceptor();
+      SharedTestObject obj = new SharedTestObject(interceptor);
+
+      ValveTestThread thr1 = new ValveTestThread(obj, 1);
+      thr1.start();
+
+      ValveTestThread thr2 = new ValveTestThread(obj, 0);
+      thr2.start();
+
+      ValveTestThread thr3 = new ValveTestThread(obj, 4);
+      thr3.start();
+
+      try
+      {
+         thr1.join();
+         thr2.join();
+         thr3.join();
+      }
+      catch (InterruptedException e)
+      {
+      }
+
+   }
+
+   public void testMultithreadSimulation()
+   {
+
+   }
+
+   public static class ValveTestThread extends Thread
+   {
+      private SharedTestObject target;
+
+      private int mthd;
+
+      public ValveTestThread(SharedTestObject obj, int m)
+      {
+         target = obj;
+         mthd = m;
+      }
+
+      public void run()
+      {
+         target.invoke(mthd);
+      }
+   }
+
+   public static class SimpleValveInterceptor
+   {
+      private ThreadValveCounter counter = new ThreadValveCounter();
+
+      private volatile boolean closed = false;
+
+      public void invoke(SharedTestObject object, int method)
+      {
+         if (closed)
+            return;
+
+         if (method == SharedTestObject.METHOD_CLOSE)
+         {
+            System.out.println("--------close called, waiting....");
+            waitingForClose();
+            return;
+         }
+
+         counter.pushThread();
+         object.invokeMethod(method);
+         counter.popThread();
+      }
+
+      protected void waitingForClose()
+      {
+         if (closed)
+         {
+            return;
+         }
+
+         counter.waitForCompletion();
+
+         closed = true;
+      }
+   }
+
+   public static class SharedTestObject
+   {
+      public static final int METHOD_CLOSE = 0;
+
+      public static final int METHOD_1 = 1;
+
+      public static final int METHOD_2 = 2;
+
+      public static final int METHOD_3 = 3;
+
+      public static final int METHOD_4 = 4;
+
+      private SimpleValveInterceptor inter;
+
+      public SharedTestObject(SimpleValveInterceptor interceptor)
+      {
+         inter = interceptor;
+      }
+
+      public void invoke(int m)
+      {
+         inter.invoke(this, m);
+      }
+
+      public void invokeMethod(int m)
+      {
+         switch (m)
+         {
+            case METHOD_CLOSE:
+               close();
+               break;
+            case METHOD_1:
+               method1();
+               break;
+            case METHOD_2:
+               method2();
+               break;
+            case METHOD_3:
+               method3();
+               break;
+            case METHOD_4:
+               method4();
+               break;
+         }
+      }
+
+      public void close()
+      {
+
+      }
+
+      public void method1()
+      {
+         System.out.println("------m1------");
+         try
+         {
+            Thread.sleep(5000);
+         }
+         catch (InterruptedException e)
+         {
+            e.printStackTrace();
+         }
+         System.out.println("------m1 returns-----");
+      }
+
+      public void method2()
+      {
+         System.out.println("------m2------");
+      }
+
+      public void method3()
+      {
+         System.out.println("------m3------");
+      }
+
+      public void method4()
+      {
+         System.out.println("------m4------");
+         this.invoke(0);
+      }
+   }
+}




More information about the jboss-cvs-commits mailing list