[jboss-cvs] JBoss Messaging SVN: r7917 - in branches/Branch_1_4: tests/src/org/jboss/test/messaging/jms and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Dec 10 02:49:48 EST 2009
Author: gaohoward
Date: 2009-12-10 02:49:47 -0500 (Thu, 10 Dec 2009)
New Revision: 7917
Added:
branches/Branch_1_4/src/main/org/jboss/jms/client/container/ThreadValveCounter.java
branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/ThreadValveCounterTest.java
Modified:
branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClosedInterceptor.java
Log:
JBMESSAGING-1743
Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClosedInterceptor.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClosedInterceptor.java 2009-12-03 15:30:36 UTC (rev 7916)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClosedInterceptor.java 2009-12-10 07:49:47 UTC (rev 7917)
@@ -21,9 +21,11 @@
*/
package org.jboss.jms.client.container;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import javax.jms.IllegalStateException;
@@ -65,10 +67,10 @@
private boolean trace = log.isTraceEnabled();
// The current state of the object guarded by this interceptor
- private int state = NOT_CLOSED;
+ private volatile int state = NOT_CLOSED;
- // The inuse count
- private int inUseCount;
+ // The inuse count
+ private ThreadValveCounter threadCounter = new ThreadValveCounter();
// The identity of the delegate this interceptor is associated with
private DelegateIdentity id;
@@ -89,7 +91,6 @@
public ClosedInterceptor()
{
state = NOT_CLOSED;
- inUseCount = 0;
id = null;
}
@@ -129,7 +130,7 @@
boolean isClosing = methodName.equals("closing");
boolean isClose = methodName.equals("close");
-
+
if (isClosing)
{
if (checkClosingAlreadyDone())
@@ -146,18 +147,19 @@
}
else
{
- synchronized(this)
+ // object "in use", increment inUseCount
+ if (state == IN_CLOSE || state == CLOSED)
{
- // object "in use", increment inUseCount
- if (state == IN_CLOSE || state == CLOSED)
- {
- log.error(this + ": method " + methodName + "() did not go through, " +
- "the interceptor is " + stateToString(state));
+ log.error(this + ": method " +
+ methodName +
+ "() did not go through, " +
+ "the interceptor is " +
+ stateToString(state));
- throw new IllegalStateException("The object is closed");
- }
- ++inUseCount;
+ throw new IllegalStateException("The object is closed");
}
+
+ threadCounter.pushThread();
}
if (isClosing)
@@ -198,13 +200,13 @@
}
else
{
- done();
+ threadCounter.popThread();
}
}
}
// Protected ------------------------------------------------------
-
+
/**
* Check the closing notification has not already been done
*
@@ -234,16 +236,13 @@
*
* @return true when already closed
*/
- protected synchronized boolean checkCloseAlreadyDone() throws Throwable
+ protected boolean checkCloseAlreadyDone() throws Throwable
{
if (state != CLOSING)
{
return true;
}
- while (inUseCount > 0)
- {
- wait();
- }
+ threadCounter.waitForCompletion();
state = IN_CLOSE;
return false;
}
@@ -258,17 +257,6 @@
}
/**
- * Mark the object as no longer inuse
- */
- protected synchronized void done() throws Throwable
- {
- if (--inUseCount == 0)
- {
- notifyAll();
- }
- }
-
- /**
* Close children and remove from parent
*
* @param invocation the invocation
Added: branches/Branch_1_4/src/main/org/jboss/jms/client/container/ThreadValveCounter.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/container/ThreadValveCounter.java (rev 0)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/container/ThreadValveCounter.java 2009-12-10 07:49:47 UTC (rev 7917)
@@ -0,0 +1,152 @@
+package org.jboss.jms.client.container;
+
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A ThreadValveCounter
+ *
+ * Tracks, per thread, how many calls are in progress on a shared object, so that a thread closing the object
+ * can wait for the calls of all other threads to finish without ever waiting for itself.
+ *
+ * When an object is accessed concurrently, closing it requires first waiting for all ongoing calls to finish, i.e.
+ * for every calling thread (except the one calling close()) to return from its method call on that object. The
+ * thread calling close() may itself be in the middle of a normal method call on the same object; if its own call
+ * were counted, it would wait forever for itself to return.
+ *
+ * To avoid that deadlock the count is kept per thread: before waiting, the closing thread exempts itself from the
+ * count, so only calls made by other threads are waited for.
+ *
+ * All state is guarded by the monitor of the internal map, which is also the monitor used for wait/notify.
+ *
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ *
+ * Created Dec 9, 2009 4:37:14 PM
+ */
+public class ThreadValveCounter
+{
+
+   // thread id -> number of calls that thread has in progress; only accessed while holding its own monitor
+   private final ConcurrentHashMap<Long, Integer> threadCounter = new ConcurrentHashMap<Long, Integer>();
+
+   /**
+    * Records that the current thread has started a call. Every call must be paired with {@link #popThread()}.
+    */
+   public void pushThread()
+   {
+      Long tid = Thread.currentThread().getId();
+      synchronized (threadCounter)
+      {
+         Integer depth = threadCounter.get(tid);
+         threadCounter.put(tid, depth == null ? 1 : depth + 1);
+      }
+   }
+
+   /**
+    * Records that the current thread has finished a call; once it has no call left in progress its entry is
+    * dropped and waiting threads are woken up. Does nothing for a thread that is not tracked.
+    *
+    * @return always 0
+    */
+   public int popThread()
+   {
+      Long tid = Thread.currentThread().getId();
+      synchronized (threadCounter)
+      {
+         Integer depth = threadCounter.get(tid);
+         if (depth == null)
+         {
+            // not tracked: unmatched pop, or the thread exempted itself in waitForCompletion()
+            return 0;
+         }
+         if (depth > 1)
+         {
+            threadCounter.put(tid, depth - 1);
+         }
+         else
+         {
+            threadCounter.remove(tid);
+            threadCounter.notifyAll();
+         }
+      }
+      return 0;
+   }
+
+   /**
+    * Blocks until no other thread has a call in progress. Calls in progress on the current thread are not waited
+    * for. An interrupt does not end the wait; the interrupt status is restored before returning.
+    */
+   public void waitForCompletion()
+   {
+      boolean interrupted = false;
+      synchronized (threadCounter)
+      {
+         exemptCurrentThread();
+         while (getCallCount() > 0)
+         {
+            try
+            {
+               threadCounter.wait();
+            }
+            catch (InterruptedException e)
+            {
+               // keep waiting so the close cannot proceed early, but remember the interrupt for the caller
+               interrupted = true;
+            }
+         }
+      }
+      if (interrupted)
+      {
+         Thread.currentThread().interrupt();
+      }
+   }
+
+   // Stops tracking the current thread so that it never waits for its own calls.
+   private void exemptCurrentThread()
+   {
+      synchronized (threadCounter)
+      {
+         // drop the entry entirely: a count left behind at zero would be driven negative by the thread's later
+         // popThread() and cancel out the calls of other threads in getCallCount()
+         threadCounter.remove(Thread.currentThread().getId());
+         threadCounter.notifyAll();
+      }
+   }
+
+   // Sums the calls in progress over all tracked threads.
+   private int getCallCount()
+   {
+      synchronized (threadCounter)
+      {
+         int total = 0;
+         for (Integer depth : threadCounter.values())
+         {
+            total += depth;
+         }
+         return total;
+      }
+   }
+
+}
Added: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/ThreadValveCounterTest.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/ThreadValveCounterTest.java (rev 0)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/ThreadValveCounterTest.java 2009-12-10 07:49:47 UTC (rev 7917)
@@ -0,0 +1,235 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.messaging.jms;
+
+import org.jboss.jms.client.container.ThreadValveCounter;
+
+import junit.framework.TestCase;
+
+/**
+ * A ThreadValveCounterTest
+ *
+ * Checks that closing an object guarded by a ThreadValveCounter does not hang, neither when close is called from
+ * inside another call made by the same thread, nor while other threads have calls in progress.
+ *
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ *
+ * Created Dec 9, 2009 5:46:31 PM
+ */
+public class ThreadValveCounterTest extends TestCase
+{
+   // Milliseconds a thread may take to finish; method1() alone keeps its call in progress for 5 seconds.
+   private static final long TIMEOUT = 30000;
+
+   // A single thread closes the object from inside one of its own calls.
+   public void testHangCondition1()
+   {
+      runAndAwait(new int[] { SharedTestObject.METHOD_4 });
+   }
+
+   // One thread closes the object while another thread makes a long running call.
+   public void testHangCondition2()
+   {
+      runAndAwait(new int[] { SharedTestObject.METHOD_1, SharedTestObject.METHOD_CLOSE });
+   }
+
+   // As testHangCondition2, with a third thread closing the object from inside one of its own calls.
+   public void testHangCondition3()
+   {
+      runAndAwait(new int[] { SharedTestObject.METHOD_1, SharedTestObject.METHOD_CLOSE, SharedTestObject.METHOD_4 });
+   }
+
+   public void testMultithreadSimulation()
+   {
+      // TODO not implemented yet
+   }
+
+   // Invokes every given method on its own thread against one fresh shared object and fails the test when a
+   // thread has not finished within TIMEOUT, instead of blocking the whole test run on a hang.
+   private void runAndAwait(int[] methods)
+   {
+      SharedTestObject obj = new SharedTestObject(new SimpleValveInterceptor());
+      ValveTestThread[] threads = new ValveTestThread[methods.length];
+      for (int i = 0; i < methods.length; i++)
+      {
+         threads[i] = new ValveTestThread(obj, methods[i]);
+         // daemon, so that a hung thread cannot keep the JVM alive once the test has failed
+         threads[i].setDaemon(true);
+         threads[i].start();
+      }
+
+      for (int i = 0; i < threads.length; i++)
+      {
+         try
+         {
+            threads[i].join(TIMEOUT);
+         }
+         catch (InterruptedException e)
+         {
+            Thread.currentThread().interrupt();
+            fail("interrupted while waiting for method " + methods[i]);
+         }
+         assertFalse("thread invoking method " + methods[i] + " did not finish", threads[i].isAlive());
+      }
+   }
+
+   // Thread that makes a single call on the shared object.
+   public static class ValveTestThread extends Thread
+   {
+      private SharedTestObject target;
+      private int mthd;
+
+      public ValveTestThread(SharedTestObject obj, int m)
+      {
+         target = obj;
+         mthd = m;
+      }
+
+      public void run()
+      {
+         target.invoke(mthd);
+      }
+   }
+
+   // Minimal stand-in for an interceptor guarding an object with a ThreadValveCounter: normal calls are counted,
+   // a close call waits until the calls of all other threads have finished.
+   public static class SimpleValveInterceptor
+   {
+      private ThreadValveCounter counter = new ThreadValveCounter();
+      private volatile boolean closed = false;
+
+      public void invoke(SharedTestObject object, int method)
+      {
+         if (closed)
+            return;
+
+         if (method == SharedTestObject.METHOD_CLOSE)
+         {
+            System.out.println("--------close called, waiting....");
+            waitingForClose();
+            return;
+         }
+
+         counter.pushThread();
+         try
+         {
+            object.invokeMethod(method);
+         }
+         finally
+         {
+            // always release the call: a leaked count would make every later close wait forever
+            counter.popThread();
+         }
+      }
+
+      protected void waitingForClose()
+      {
+         if (closed)
+         {
+            return;
+         }
+         counter.waitForCompletion();
+         closed = true;
+      }
+   }
+
+   // Object shared between the test threads; every call goes through the interceptor first.
+   public static class SharedTestObject
+   {
+      public static final int METHOD_CLOSE = 0;
+      public static final int METHOD_1 = 1;
+      public static final int METHOD_2 = 2;
+      public static final int METHOD_3 = 3;
+      public static final int METHOD_4 = 4;
+
+      private SimpleValveInterceptor inter;
+
+      public SharedTestObject(SimpleValveInterceptor interceptor)
+      {
+         inter = interceptor;
+      }
+
+      public void invoke(int m)
+      {
+         inter.invoke(this, m);
+      }
+
+      public void invokeMethod(int m)
+      {
+         switch (m)
+         {
+            case METHOD_CLOSE:
+               close();
+               break;
+            case METHOD_1:
+               method1();
+               break;
+            case METHOD_2:
+               method2();
+               break;
+            case METHOD_3:
+               method3();
+               break;
+            case METHOD_4:
+               method4();
+               break;
+         }
+      }
+
+      public void close()
+      {
+      }
+
+      // Long running call: keeps the calling thread's call in progress for 5 seconds.
+      public void method1()
+      {
+         System.out.println("------m1------");
+         try
+         {
+            Thread.sleep(5000);
+         }
+         catch (InterruptedException e)
+         {
+            Thread.currentThread().interrupt();
+         }
+         System.out.println("------m1 returns-----");
+      }
+
+      public void method2()
+      {
+         System.out.println("------m2------");
+      }
+
+      public void method3()
+      {
+         System.out.println("------m3------");
+      }
+
+      // Closes the object from inside a call that is still in progress on the same thread.
+      public void method4()
+      {
+         System.out.println("------m4------");
+         this.invoke(0);
+      }
+   }
+}
More information about the jboss-cvs-commits
mailing list