[jboss-cvs] JBoss Messaging SVN: r1898 - in branches/Branch_Cleberts_Third_Failover: src/etc src/main/org/jboss/jms/client src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/client/state src/main/org/jboss/jms/delegate tests/src/org/jboss/test/messaging/jms tests/src/org/jboss/test/messaging/jms/clustering
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jan 5 12:20:50 EST 2007
Author: clebert.suconic at jboss.com
Date: 2007-01-05 12:20:38 -0500 (Fri, 05 Jan 2007)
New Revision: 1898
Added:
branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/Valve.java
Removed:
branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/Valve.java
branches/Branch_Cleberts_Third_Failover/tests/src/org/jboss/test/messaging/jms/FailoverValveTest.java
Modified:
branches/Branch_Cleberts_Third_Failover/src/etc/aop-messaging-client.xml
branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/container/ClusteringAspect.java
branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/container/ConnectionFailureListener.java
branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/container/FailoverAspect.java
branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java
branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/state/ConnectionState.java
branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/state/HierarchicalState.java
branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/state/HierarchicalStateSupport.java
branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/delegate/BrowserDelegate.java
branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/delegate/ConnectionDelegate.java
branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/delegate/ConsumerDelegate.java
branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/delegate/ProducerDelegate.java
branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/delegate/SessionDelegate.java
branches/Branch_Cleberts_Third_Failover/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
Log:
Changing code to use a single Valve
Modified: branches/Branch_Cleberts_Third_Failover/src/etc/aop-messaging-client.xml
===================================================================
--- branches/Branch_Cleberts_Third_Failover/src/etc/aop-messaging-client.xml 2007-01-05 06:02:17 UTC (rev 1897)
+++ branches/Branch_Cleberts_Third_Failover/src/etc/aop-messaging-client.xml 2007-01-05 17:20:38 UTC (rev 1898)
@@ -6,7 +6,7 @@
<aop>
<interceptor class="org.jboss.jms.client.container.ExceptionInterceptor" scope="PER_VM"/>
<interceptor class="org.jboss.jms.client.container.ClientLogInterceptor" scope="PER_VM"/>
- <interceptor class="org.jboss.jms.client.container.ClosedInterceptor" scope="PER_INSTANCE"/>
+ <interceptor class="org.jboss.jms.client.container.ClosedInterceptor" scope="PER_INSTANCE"/>
<interceptor class="org.jboss.jms.client.container.ConcurrencyInterceptor" scope="PER_INSTANCE"/>
<interceptor class="org.jboss.jms.client.container.FailoverValveInterceptor" scope="PER_INSTANCE"/>
@@ -40,7 +40,7 @@
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate->$implementing{org.jboss.jms.delegate.ConnectionFactoryDelegate}(..))">
<interceptor-ref name="org.jboss.jms.client.container.ClientLogInterceptor"/>
<interceptor-ref name="org.jboss.jms.client.container.ExceptionInterceptor"/>
- </bind>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate->createConnectionDelegate(..))">
<advice name="handleCreateConnectionDelegate" aspect="org.jboss.jms.client.container.StateCreationAspect"/>
</bind>
@@ -50,15 +50,15 @@
-->
<!-- It is important that FailoverAspect intercepts performFailover() before FailoverValveInterceptor -->
- <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionDelegate->performFailover())">
+ <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionDelegate->performFailover(..))">
<advice name="handlePerformFailover" aspect="org.jboss.jms.client.container.FailoverAspect"/>
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionDelegate->$implementing{org.jboss.jms.delegate.ConnectionDelegate}(..))">
<interceptor-ref name="org.jboss.jms.client.container.ClientLogInterceptor"/>
+ <interceptor-ref name="org.jboss.jms.client.container.ExceptionInterceptor"/>
<interceptor-ref name="org.jboss.jms.client.container.FailoverValveInterceptor"/>
- <interceptor-ref name="org.jboss.jms.client.container.ExceptionInterceptor"/>
- <interceptor-ref name="org.jboss.jms.client.container.ClosedInterceptor"/>
- </bind>
+ <interceptor-ref name="org.jboss.jms.client.container.ClosedInterceptor"/>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionDelegate->start())">
<advice name="handleStart" aspect="org.jboss.jms.client.container.ConnectionAspect"/>
</bind>
@@ -85,7 +85,7 @@
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionDelegate->createSessionDelegate(..))">
<advice name="handleCreateSessionDelegate" aspect="org.jboss.jms.client.container.ConnectionAspect"/>
- </bind>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionDelegate->close())">
<advice name="handleClose" aspect="org.jboss.jms.client.container.ConnectionAspect"/>
</bind>
@@ -105,9 +105,9 @@
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->$implementing{org.jboss.jms.delegate.SessionDelegate}(..))">
<interceptor-ref name="org.jboss.jms.client.container.ClientLogInterceptor"/>
+ <interceptor-ref name="org.jboss.jms.client.container.ExceptionInterceptor"/>
<interceptor-ref name="org.jboss.jms.client.container.FailoverValveInterceptor"/>
- <interceptor-ref name="org.jboss.jms.client.container.ExceptionInterceptor"/>
- <interceptor-ref name="org.jboss.jms.client.container.ClosedInterceptor"/>
+ <interceptor-ref name="org.jboss.jms.client.container.ClosedInterceptor"/>
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->createMessage())">
<advice name="handleCreateMessage" aspect="org.jboss.jms.client.container.FactoryAspect"/>
@@ -135,10 +135,10 @@
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->addAsfMessage(..))">
<advice name="handleAddAsfMessage" aspect="org.jboss.jms.client.container.AsfAspect"/>
- </bind>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->run())">
<advice name="handleRun" aspect="org.jboss.jms.client.container.AsfAspect"/>
- </bind>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->getAcknowledgeMode())">
<advice name="handleGetAcknowledgeMode" aspect="org.jboss.jms.client.container.SessionAspect"/>
</bind>
@@ -159,16 +159,16 @@
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->recover())">
<advice name="handleRecover" aspect="org.jboss.jms.client.container.SessionAspect"/>
- </bind>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->redeliver(..))">
<advice name="handleRedeliver" aspect="org.jboss.jms.client.container.SessionAspect"/>
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->closing())">
- <advice name="handleClosing" aspect="org.jboss.jms.client.container.SessionAspect"/>
+ <advice name="handleClosing" aspect="org.jboss.jms.client.container.SessionAspect"/>
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->close())">
- <advice name="handleClose" aspect="org.jboss.jms.client.container.SessionAspect"/>
- </bind>
+ <advice name="handleClose" aspect="org.jboss.jms.client.container.SessionAspect"/>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->commit())">
<advice name="handleCommit" aspect="org.jboss.jms.client.container.SessionAspect"/>
</bind>
@@ -179,8 +179,8 @@
<advice name="handleSend" aspect="org.jboss.jms.client.container.SessionAspect"/>
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->createConsumerDelegate(..))">
- <advice name="handleCreateConsumerDelegate" aspect="org.jboss.jms.client.container.ConsumerAspect"/>
- </bind>
+ <advice name="handleCreateConsumerDelegate" aspect="org.jboss.jms.client.container.ConsumerAspect"/>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->failOverConsumer(..))">
<advice name="handleCreateConsumerDelegate" aspect="org.jboss.jms.client.container.ConsumerAspect"/>
</bind>
@@ -196,111 +196,111 @@
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->createBrowserDelegate(..))">
<advice name="handleCreateBrowserDelegate" aspect="org.jboss.jms.client.container.StateCreationAspect"/>
</bind>
-
+
<!--
Consumer Stack
-->
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->$implementing{org.jboss.jms.delegate.ConsumerDelegate}(..))">
<interceptor-ref name="org.jboss.jms.client.container.ClientLogInterceptor"/>
+ <interceptor-ref name="org.jboss.jms.client.container.ExceptionInterceptor"/>
<interceptor-ref name="org.jboss.jms.client.container.FailoverValveInterceptor"/>
- <interceptor-ref name="org.jboss.jms.client.container.ExceptionInterceptor"/>
- <interceptor-ref name="org.jboss.jms.client.container.ClosedInterceptor"/>
- </bind>
+ <interceptor-ref name="org.jboss.jms.client.container.ClosedInterceptor"/>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->getMessageListener())">
- <advice name="handleGetMessageListener" aspect="org.jboss.jms.client.container.ConsumerAspect"/>
+ <advice name="handleGetMessageListener" aspect="org.jboss.jms.client.container.ConsumerAspect"/>
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->setMessageListener(..))">
- <advice name="handleSetMessageListener" aspect="org.jboss.jms.client.container.ConsumerAspect"/>
- </bind>
+ <advice name="handleSetMessageListener" aspect="org.jboss.jms.client.container.ConsumerAspect"/>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->receive(..))">
- <advice name="handleReceive" aspect="org.jboss.jms.client.container.ConsumerAspect"/>
- </bind>
+ <advice name="handleReceive" aspect="org.jboss.jms.client.container.ConsumerAspect"/>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->receiveNoWait())">
- <advice name="handleReceiveNoWait" aspect="org.jboss.jms.client.container.ConsumerAspect"/>
- </bind>
+ <advice name="handleReceiveNoWait" aspect="org.jboss.jms.client.container.ConsumerAspect"/>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->closing())">
- <advice name="handleClosing" aspect="org.jboss.jms.client.container.ConsumerAspect"/>
+ <advice name="handleClosing" aspect="org.jboss.jms.client.container.ConsumerAspect"/>
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->getDestination())">
- <advice name="handleGetDestination" aspect="org.jboss.jms.client.container.ConsumerAspect"/>
+ <advice name="handleGetDestination" aspect="org.jboss.jms.client.container.ConsumerAspect"/>
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->getNoLocal())">
- <advice name="handleGetNoLocal" aspect="org.jboss.jms.client.container.ConsumerAspect"/>
+ <advice name="handleGetNoLocal" aspect="org.jboss.jms.client.container.ConsumerAspect"/>
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->getMessageSelector())">
- <advice name="handleGetMessageSelector" aspect="org.jboss.jms.client.container.ConsumerAspect"/>
- </bind>
+ <advice name="handleGetMessageSelector" aspect="org.jboss.jms.client.container.ConsumerAspect"/>
+ </bind>
-
+
<!--
Producer Stack
- -->
+ -->
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->$implementing{org.jboss.jms.delegate.ProducerDelegate}(..))">
<interceptor-ref name="org.jboss.jms.client.container.ClientLogInterceptor"/>
+ <interceptor-ref name="org.jboss.jms.client.container.ExceptionInterceptor"/>
<interceptor-ref name="org.jboss.jms.client.container.FailoverValveInterceptor"/>
- <interceptor-ref name="org.jboss.jms.client.container.ExceptionInterceptor"/>
- <interceptor-ref name="org.jboss.jms.client.container.ClosedInterceptor"/>
- </bind>
+ <interceptor-ref name="org.jboss.jms.client.container.ClosedInterceptor"/>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->getDeliveryMode())">
- <advice name="handleGetDeliveryMode" aspect="org.jboss.jms.client.container.ProducerAspect"/>
- </bind>
+ <advice name="handleGetDeliveryMode" aspect="org.jboss.jms.client.container.ProducerAspect"/>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->getDestination())">
- <advice name="handleGetDestination" aspect="org.jboss.jms.client.container.ProducerAspect"/>
- </bind>
+ <advice name="handleGetDestination" aspect="org.jboss.jms.client.container.ProducerAspect"/>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->getDisableMessageID())">
- <advice name="handleGetDisableMessageID" aspect="org.jboss.jms.client.container.ProducerAspect"/>
- </bind>
+ <advice name="handleGetDisableMessageID" aspect="org.jboss.jms.client.container.ProducerAspect"/>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->getDisableMessageTimestamp())">
- <advice name="handleGetDisableMessageTimestamp" aspect="org.jboss.jms.client.container.ProducerAspect"/>
- </bind>
+ <advice name="handleGetDisableMessageTimestamp" aspect="org.jboss.jms.client.container.ProducerAspect"/>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->getPriority())">
- <advice name="handleGetPriority" aspect="org.jboss.jms.client.container.ProducerAspect"/>
- </bind>
+ <advice name="handleGetPriority" aspect="org.jboss.jms.client.container.ProducerAspect"/>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->getTimeToLive())">
- <advice name="handleGetTimeToLive" aspect="org.jboss.jms.client.container.ProducerAspect"/>
- </bind>
+ <advice name="handleGetTimeToLive" aspect="org.jboss.jms.client.container.ProducerAspect"/>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->send(org.jboss.jms.destination.JBossDestination, javax.jms.Message, int, int, long))">
- <advice name="handleSend" aspect="org.jboss.jms.client.container.ProducerAspect"/>
- </bind>
+ <advice name="handleSend" aspect="org.jboss.jms.client.container.ProducerAspect"/>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->setDeliveryMode(..))">
- <advice name="handleSetDeliveryMode" aspect="org.jboss.jms.client.container.ProducerAspect"/>
- </bind>
+ <advice name="handleSetDeliveryMode" aspect="org.jboss.jms.client.container.ProducerAspect"/>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->setDestination(..))">
- <advice name="handleSetDestination" aspect="org.jboss.jms.client.container.ProducerAspect"/>
+ <advice name="handleSetDestination" aspect="org.jboss.jms.client.container.ProducerAspect"/>
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->setDisableMessageID(..))">
- <advice name="handleSetDisableMessageID" aspect="org.jboss.jms.client.container.ProducerAspect"/>
- </bind>
+ <advice name="handleSetDisableMessageID" aspect="org.jboss.jms.client.container.ProducerAspect"/>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->setDisableMessageTimestamp(..))">
- <advice name="handleSetDisableMessageTimestamp" aspect="org.jboss.jms.client.container.ProducerAspect"/>
- </bind>
+ <advice name="handleSetDisableMessageTimestamp" aspect="org.jboss.jms.client.container.ProducerAspect"/>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->setPriority(..))">
- <advice name="handleSetPriority" aspect="org.jboss.jms.client.container.ProducerAspect"/>
- </bind>
+ <advice name="handleSetPriority" aspect="org.jboss.jms.client.container.ProducerAspect"/>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->setTimeToLive(..))">
- <advice name="handleSetTimeToLive" aspect="org.jboss.jms.client.container.ProducerAspect"/>
- </bind>
+ <advice name="handleSetTimeToLive" aspect="org.jboss.jms.client.container.ProducerAspect"/>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->closing())">
<advice name="handleClosing" aspect="org.jboss.jms.client.container.ProducerAspect"/>
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->close())">
<advice name="handleClose" aspect="org.jboss.jms.client.container.ProducerAspect"/>
- </bind>
+ </bind>
<!--
Browser Stack
-->
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientBrowserDelegate->$implementing{org.jboss.jms.delegate.BrowserDelegate}(..))">
<interceptor-ref name="org.jboss.jms.client.container.ClientLogInterceptor"/>
+ <interceptor-ref name="org.jboss.jms.client.container.ExceptionInterceptor"/>
<interceptor-ref name="org.jboss.jms.client.container.FailoverValveInterceptor"/>
- <interceptor-ref name="org.jboss.jms.client.container.ExceptionInterceptor"/>
<interceptor-ref name="org.jboss.jms.client.container.ClosedInterceptor"/>
- </bind>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientBrowserDelegate->nextMessage())">
<advice name="handleNextMessage" aspect="org.jboss.jms.client.container.BrowserAspect"/>
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientBrowserDelegate->hasNextMessage())">
<advice name="handleHasNextMessage" aspect="org.jboss.jms.client.container.BrowserAspect"/>
- </bind>
+ </bind>
</aop>
\ No newline at end of file
Deleted: branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/Valve.java
===================================================================
--- branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/Valve.java 2007-01-05 06:02:17 UTC (rev 1897)
+++ branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/Valve.java 2007-01-05 17:20:38 UTC (rev 1898)
@@ -1,38 +0,0 @@
-/**
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.jms.client;
-
-/**
- * An interface that controls the permeability of implementing objects.
- *
- * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
- * @version <tt>$Revision$</tt>
- * $Id$
- */
-public interface Valve
-{
- /**
- * "Closes" the valve. The implementing object becomes impermeable. Did't go for "close()" to
- * avoid name conflicts with Closeable.close().
- */
- void closeValve() throws Exception;
-
- /**
- * "Opens" the valve. The implementing object becomes permeable. Didn't go for "open()" for
- * symmetry reasons.
- */
- void openValve() throws Exception;
-
- boolean isValveOpen();
-
- /**
- * @return the number of threads currenty "penetrating" the open valve. Shold be 0 if the valve
- * is closed.
- */
- int getActiveThreadsCount();
-
-}
Added: branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/Valve.java
===================================================================
--- branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/Valve.java 2007-01-05 06:02:17 UTC (rev 1897)
+++ branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/Valve.java 2007-01-05 17:20:38 UTC (rev 1898)
@@ -0,0 +1,246 @@
+/**
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.jms.client;
+
+import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
+import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
+import EDU.oswego.cs.dl.util.concurrent.ReentrantWriterPreferenceReadWriteLock;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Stack;
+import org.jboss.logging.Logger;
+
+/**
+ * The idea is to hold any call when the Valve is closed.
+ * You call enter when you perform a regular call, and leave on a finally block.
+ * You call close when you are performing any major event such as fail over, and open it on a finally block also.
+ *
+ * There are controls to avoid deadlocks between multiple threads closing the valve at the same time: each
+ * thread uses reference counting on a ThreadLocal variable. So, it's very important to always leave the valve
+ * in a finally block.
+ *
+ * This class will also log tracing information in case the Valve can't be closed, but only if trace is enabled on log4j.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ * $Id$
+ */
+public class Valve
+{
+ // Constants
+ private static final Logger log = Logger.getLogger(Valve.class);
+
+ // This is not meant to run in production, but it would give us important information in case of dead locks
+ // on the valve
+ private static final boolean DEBUG_VALVE = log.isTraceEnabled();
+
+ // Attributes
+
+ // Information to avoid dead locks when multiple threads are closing the valve
+ ThreadLocal counterLocal = new ThreadLocal();
+ ReadWriteLock lock;
+
+ // Attributes used on debug operations only
+
+ private ThreadLocal stackCloses;
+ private ThreadLocal stackEnters;
+
+ private Map debugCloses;
+ private Map debugEnters;
+
+ public Valve()
+ {
+ // I'm using ReentrantLocks because on the FailoverCase, we will need readLock after writeLock was acquired
+ // on FailoverMethod, since Failover will also perform calls on the server.
+
+ // There is also a case where a readLock will be promoted to a writeLock when a failover occurs; using
+ // reentrant locks makes this usage transparent for the API: we just close the valve and the readLock is
+ // promoted to a writeLock.
+ lock = new ReentrantWriterPreferenceReadWriteLock();
+
+ if (DEBUG_VALVE)
+ {
+ stackCloses = new ThreadLocal();
+ stackEnters = new ThreadLocal();
+ debugCloses = new ConcurrentHashMap();
+ debugEnters = new ConcurrentHashMap();
+ }
+ }
+
+ public void enter() throws InterruptedException
+ {
+ //log.info("Entering Valve on Thread " + Thread.currentThread().getName());
+ lock.readLock().acquire();
+ getCounter().counter++;
+ if (DEBUG_VALVE)
+ {
+ Exception ex = new Exception();
+ getStackEnters().push(ex);
+ debugEnters.put(ex, Thread.currentThread());
+ }
+ }
+
+ public void leave() throws InterruptedException
+ {
+ //log.info("Leaving Valve on Thread " + Thread.currentThread().getName());
+ lock.readLock().release();
+ getCounter().counter--;
+ if (DEBUG_VALVE)
+ {
+ Exception ex = (Exception) getStackEnters().pop();
+ debugEnters.remove(ex);
+ }
+ }
+
+ public void close() throws InterruptedException
+ {
+ //log.info("Closing Valve on Thread " + Thread.currentThread().getName());
+
+ // Before assuming a write lock, we need to release reentrant read locks.
+ // When simultaneous threads are closing a valve (as simultaneous threads are capturing a failure)
+ // we won't be able to close the valve until all the readLocks are released.
+ // This release routine resolves the deadlock while still guaranteeing the exclusivity of the lock.
+ // The use case for this is a failure being detected while the thread is already holding a read lock.
+ // For example, if a failure happens when sending ACKs, the valve will already be held by
+ // receiveMessage, while sendACK will be trying to close the Valve.
+ // This wouldn't be a problem if we had only single threads
+ // but the problem is we will be waiting on a readLock on another thread that might also be waiting
+ // to close the valve as fail event will be captured by multiple threads
+ //
+ // So, in summary we need to completely leave the valve before closing it or a dead lock will happen
+ // if multiple threads are closing the valve at same time waiting on each others readLocks
+ // before acquiring a writeLock
+ int counter = getCounter().counter;
+ for (int i = 0; i < counter; i++)
+ {
+ lock.readLock().release();
+ }
+
+ boolean acquired = false;
+ do
+ {
+ acquired = lock.writeLock().attempt(5000);
+ if (!acquired)
+ {
+ log.warn("Couldn't close valve, trying again", new Exception());
+ if (DEBUG_VALVE)
+ {
+ log.trace(debugValve());
+ }
+ }
+ } while (!acquired);
+
+ if (DEBUG_VALVE)
+ {
+ Exception ex = new Exception();
+ getStackCloses().push(ex);
+ debugCloses.put(ex, Thread.currentThread());
+ }
+ }
+
+ public void open() throws InterruptedException
+ {
+ //log.info("Opening Valve on Thread " + Thread.currentThread().getName());
+ lock.writeLock().release();
+
+ // re-apply the locks as we had before closing the valve
+ int counter = getCounter().counter;
+ for (int i = 0; i < counter; i++)
+ {
+ lock.readLock().acquire();
+ }
+
+
+ if (DEBUG_VALVE)
+ {
+ Exception ex = (Exception) getStackCloses().pop();
+ debugCloses.remove(ex);
+ }
+ }
+
+ // Private Methods
+
+
+ /** This method will show the threads that are currently holding locks (enters or closes) */
+ private synchronized String debugValve()
+ {
+ StringWriter buffer = new StringWriter();
+ PrintWriter writer = new PrintWriter(buffer);
+
+ writer.println("******************************* Debug Valve Information *************************");
+ writer.println("Close owners");
+
+ // Close should never have more than 1 thread owning, but as this is a debug report
+ // we will consider that as a possibility just to show eventual bugs
+ // (just in case this class is ever changed)
+ for (Iterator iter = debugCloses.entrySet().iterator(); iter.hasNext();)
+ {
+ Map.Entry entry = (Map.Entry) iter.next();
+ writer.println("Thread that own a close =" + entry.getValue());
+ writer.println("StackTrace:");
+ Exception e = (Exception) entry.getKey();
+ e.printStackTrace(writer);
+ }
+
+
+ writer.println("Valve owners");
+ for (Iterator iter = debugEnters.entrySet().iterator(); iter.hasNext();)
+ {
+ Map.Entry entry = (Map.Entry) iter.next();
+ writer.println("Thread that own valve =" + entry.getValue());
+ writer.println("StackTrace:");
+ Exception e = (Exception) entry.getKey();
+ e.printStackTrace(writer);
+ }
+
+ return buffer.toString();
+ }
+
+
+ /**
+ * Counter of times this thread entered the valve
+ */
+ private Counter getCounter()
+ {
+ if (counterLocal.get() == null)
+ {
+ counterLocal.set(new Counter());
+ }
+
+ return (Counter) counterLocal.get();
+ }
+
+ private Stack getStackCloses()
+ {
+ if (stackCloses.get() == null)
+ {
+ stackCloses.set(new Stack());
+ }
+
+ return (Stack) stackCloses.get();
+ }
+
+ private Stack getStackEnters()
+ {
+ if (stackEnters.get() == null)
+ {
+ stackEnters.set(new Stack());
+ }
+ return (Stack) stackEnters.get();
+ }
+
+ // Inner classes
+
+ // Used to count the number of readLocks (or enters) owned by this thread
+
+ private static class Counter
+ {
+ int counter;
+ }
+}
Modified: branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/container/ClusteringAspect.java
===================================================================
--- branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/container/ClusteringAspect.java 2007-01-05 06:02:17 UTC (rev 1897)
+++ branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/container/ClusteringAspect.java 2007-01-05 17:20:38 UTC (rev 1898)
@@ -14,6 +14,7 @@
import org.jboss.jms.client.delegate.ClientConnectionDelegate;
import org.jboss.jms.client.delegate.DelegateSupport;
import org.jboss.jms.client.state.ConnectionState;
+import org.jboss.jms.client.Valve;
import org.jboss.jms.server.endpoint.CreateConnectionResult;
import javax.jms.JMSException;
@@ -137,6 +138,7 @@
// also add a reference to the clustered ConnectionFactory delegate, useful in case
// FailoverAspect needs to create a new connection instead of a failed on
state.setClusteredConnectionFactoryDeleage(clusteredDelegate);
+ state.setValve(new Valve());
return new CreateConnectionResult(cd);
}
Modified: branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/container/ConnectionFailureListener.java
===================================================================
--- branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/container/ConnectionFailureListener.java 2007-01-05 06:02:17 UTC (rev 1897)
+++ branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/container/ConnectionFailureListener.java 2007-01-05 17:20:38 UTC (rev 1898)
@@ -11,6 +11,7 @@
import org.jboss.logging.Logger;
import org.jboss.jms.client.delegate.ClientConnectionDelegate;
import org.jboss.jms.client.FailoverEvent;
+import org.jboss.jms.client.remoting.JMSRemotingConnection;
import org.jboss.jms.client.state.ConnectionState;
/**
@@ -34,11 +35,15 @@
private ClientConnectionDelegate cd;
- // Constructors ---------------------------------------------------------------------------------
+ // We use this to avoid calling failover on an already failed RemotingConnection
+ private JMSRemotingConnection remotingConnection;
+ // Constructors ---------------------------------------------------------------------------------
+
ConnectionFailureListener(ClientConnectionDelegate cd)
{
this.cd = cd;
+ remotingConnection = ((ConnectionState)cd.getState()).getRemotingConnection();
}
// ConnectionListener implementation ------------------------------------------------------------
@@ -55,7 +60,7 @@
log.debug(this + " initiating client-side failover");
- cd.performFailover();
+ cd.performFailover(remotingConnection);
}
catch (Throwable e)
{
Modified: branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/container/FailoverAspect.java
===================================================================
--- branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/container/FailoverAspect.java 2007-01-05 06:02:17 UTC (rev 1897)
+++ branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/container/FailoverAspect.java 2007-01-05 17:20:38 UTC (rev 1898)
@@ -7,9 +7,11 @@
package org.jboss.jms.client.container;
import org.jboss.aop.joinpoint.Invocation;
+import org.jboss.aop.joinpoint.MethodInvocation;
import org.jboss.jms.client.delegate.ClientConnectionDelegate;
import org.jboss.jms.client.state.ConnectionState;
import org.jboss.jms.client.FailoverEvent;
+import org.jboss.jms.client.remoting.JMSRemotingConnection;
import org.jboss.jms.server.endpoint.CreateConnectionResult;
import org.jboss.jms.delegate.ConnectionFactoryDelegate;
import org.jboss.logging.Logger;
@@ -41,7 +43,9 @@
if(target instanceof ClientConnectionDelegate)
{
- performConnectionFailover((ClientConnectionDelegate)target);
+ MethodInvocation mi = (MethodInvocation) invocation;
+ JMSRemotingConnection remotingConnection = (JMSRemotingConnection)mi.getArguments()[0];
+ performConnectionFailover((ClientConnectionDelegate)target, remotingConnection);
}
return null;
@@ -53,7 +57,7 @@
// Private --------------------------------------------------------------------------------------
- private void performConnectionFailover(ClientConnectionDelegate delegate) throws Exception
+ private void performConnectionFailover(ClientConnectionDelegate delegate, JMSRemotingConnection remotingConnection) throws Exception
{
ConnectionState state = (ConnectionState)delegate.getState();
@@ -62,14 +66,21 @@
try
{
- // block any other invocations ariving while we're doing failover, on this delegate and
+ // block any other invocations arriving while we're doing failover, on this delegate and
// recursively down the hierarchy
- // WARNING - this may block if there are active invocations through valves!
- delegate.closeValve();
-
log.debug("starting client-side failover");
+ state.getValve().close();
+
+ if (remotingConnection.isFailed())
+ {
+ log.info("Ignoring failovercall into delegate=" + delegate + " as failover was already performed on this connection");
+ return;
+ }
+
+ remotingConnection.setFailed(true);
+
// generate a FAILOVER_STARTED event. The event must be broadcast AFTER valve closure,
// to ensure the client-side stack is in a deterministic state
state.broadcastFailoverEvent(new FailoverEvent(FailoverEvent.FAILOVER_STARTED, delegate));
@@ -109,9 +120,7 @@
state.broadcastFailoverEvent(new FailoverEvent(FailoverEvent.FAILOVER_FAILED,
delegate));
}
-
- // failover done, open valves
- delegate.openValve();
+ state.getValve().open();
}
}
Modified: branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java
===================================================================
--- branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java 2007-01-05 06:02:17 UTC (rev 1897)
+++ branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java 2007-01-05 17:20:38 UTC (rev 1898)
@@ -12,12 +12,18 @@
import org.jboss.logging.Logger;
import org.jboss.jms.client.delegate.DelegateSupport;
import org.jboss.jms.client.state.HierarchicalState;
+import org.jboss.jms.client.state.ConnectionState;
+import org.jboss.jms.client.Valve;
+import org.jboss.jms.client.remoting.JMSRemotingConnection;
+import org.jboss.jms.delegate.ConnectionDelegate;
+import org.jboss.remoting.CannotConnectException;
import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
import EDU.oswego.cs.dl.util.concurrent.Sync;
import java.util.List;
import java.util.ArrayList;
+import java.io.IOException;
/**
* An interceptor that acts as a failover valve: it allows all invocations to go through as long
@@ -48,9 +54,10 @@
private DelegateSupport delegate;
private HierarchicalState state;
+ private ConnectionState connectionState;
private volatile boolean valveOpen;
- private ReadWriteLock rwLock;
+ private Valve valve;
// the number of threads currently "penetrating" the open valve
private int activeThreadsCount;
@@ -64,7 +71,6 @@
{
valveOpen = true;
activeThreadsCount = 0;
- rwLock = new WriterPreferenceReadWriteLock();
if (trace)
{
activeMethods = new ArrayList();
@@ -86,126 +92,57 @@
{
delegate = (DelegateSupport)invocation.getTargetObject();
state = delegate.getState();
- }
- String methodName = ((MethodInvocation)invocation).getMethod().getName();
- Sync writeLock = rwLock.writeLock();
- Sync readLock = rwLock.readLock();
-
- if("closeValve".equals(methodName))
- {
- if (!valveOpen)
+ // we need to navigate until we get to ConnectionState as we need to extract the valve instance from there
+ HierarchicalState hierarchical = state;
+ while (hierarchical!=null && !(hierarchical instanceof ConnectionState))
{
- // valve already closed, this is a noop
- log.warn(this + " already closed!");
- return null;
+ hierarchical = hierarchical.getParent();
}
-
- state.closeChildrensValves();
-
- boolean acquired = false;
-
- while(!acquired)
- {
- try
- {
- acquired = writeLock.attempt(500);
- }
- catch(InterruptedException e)
- {
- // OK
- }
-
- if (!acquired)
- {
- log.debug(this + " failed to close");
- }
- }
-
- valveOpen = false;
-
- log.debug(this + " has been closed");
-
- return null;
+ connectionState = (ConnectionState)hierarchical;
+ valve = ((ConnectionState)hierarchical).getValve();
}
- else if("openValve".equals(methodName))
+
+ // No Valve.. then just ignore this interceptor and keep going...
+ if (valve == null)
{
- if (valveOpen)
- {
- // valve already open, this is a noop
- log.warn(this + " already open!");
- return null;
- }
+ return invocation.invokeNext();
+ }
- state.openChildrensValves();
- writeLock.release();
- valveOpen = true;
- log.debug(this + " has been opened");
+ String methodName = ((MethodInvocation)invocation).getMethod().getName();
- return null;
- }
- else if("isValveOpen".equals(methodName))
+ JMSRemotingConnection remotingConnection = null;
+ try
{
- if (valveOpen)
- {
- return Boolean.TRUE;
- }
- else
- {
- return Boolean.FALSE;
- }
+ valve.enter();
+ // it's important to only retrieve the remotingConnection while inside the Valve, as we guarantee
+ // that no failover has happened yet
+ remotingConnection = connectionState.getRemotingConnection();
+ return invocation.invokeNext();
}
- else if("getActiveThreadsCount".equals(methodName))
+ catch (CannotConnectException e)
{
- return new Integer(activeThreadsCount);
- }
+ log.warn("We got a CannotConnectionException and we are trying a failover", e);
+ ((ConnectionDelegate)connectionState.getDelegate()).performFailover(remotingConnection);
+ return invocation.invokeNext();
- // attempt to grab the reader's lock and go forward
-
- try
+ }
+ catch (IOException e)
{
- boolean acquired = false;
-
- while(!acquired)
- {
- try
- {
- acquired = readLock.attempt(500);
- }
- catch(InterruptedException e)
- {
- // OK
- }
-
- if (trace && !acquired ) { log.trace(methodName + "() trying to pass through " + this); }
- }
-
- synchronized(this)
- {
- activeThreadsCount++;
- if (trace)
- {
- activeMethods.add(methodName);
- }
- }
-
- if(trace) { log.trace(this + " has let " + methodName + "() pass through"); }
-
+ log.warn("We got an IOException and we are trying a failover", e);
+ ((ConnectionDelegate)connectionState.getDelegate()).performFailover(remotingConnection);
return invocation.invokeNext();
}
+ catch (Throwable e)
+ {
+ // just rethrow
+ throw e;
+ }
finally
{
- readLock.release();
- synchronized(this)
- {
- activeThreadsCount--;
- if (trace)
- {
- activeMethods.remove(methodName);
- }
- }
+ valve.leave();
}
}
Modified: branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
===================================================================
--- branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java 2007-01-05 06:02:17 UTC (rev 1897)
+++ branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java 2007-01-05 17:20:38 UTC (rev 1898)
@@ -61,7 +61,7 @@
private int serverID;
private transient JMSRemotingConnection remotingConnection;
private Version versionToUse;
-
+
// Static ---------------------------------------------------------------------------------------
// Constructors ---------------------------------------------------------------------------------
@@ -271,47 +271,11 @@
/**
* This invocation should be handled by the client-side interceptor chain.
*/
- public void performFailover()
+ public void performFailover(JMSRemotingConnection originalConnection)
{
throw new IllegalStateException("This invocation should not be handled here!");
}
- /**
- * This invocation should either be handled by the client-side interceptor chain or by the
- * server-side endpoint.
- */
- public void closeValve()
- {
- throw new IllegalStateException("This invocation should not be handled here!");
- }
-
- /**
- * This invocation should either be handled by the client-side interceptor chain or by the
- * server-side endpoint.
- */
- public void openValve()
- {
- throw new IllegalStateException("This invocation should not be handled here!");
- }
-
- /**
- * This invocation should either be handled by the client-side interceptor chain or by the
- * server-side endpoint.
- */
- public boolean isValveOpen()
- {
- throw new IllegalStateException("This invocation should not be handled here!");
- }
-
- /**
- * This invocation should either be handled by the client-side interceptor chain or by the
- * server-side endpoint.
- */
- public int getActiveThreadsCount()
- {
- throw new IllegalStateException("This invocation should not be handled here!");
- }
-
// Public ---------------------------------------------------------------------------------------
public void init()
@@ -328,17 +292,17 @@
{
return remotingConnection;
}
-
+
public int getServerID()
{
return serverID;
}
-
+
public Version getVersionToUse()
{
return versionToUse;
}
-
+
public void setVersionToUse(Version versionToUse)
{
this.versionToUse = versionToUse;
Modified: branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/state/ConnectionState.java
===================================================================
--- branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/state/ConnectionState.java 2007-01-05 06:02:17 UTC (rev 1897)
+++ branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/state/ConnectionState.java 2007-01-05 17:20:38 UTC (rev 1898)
@@ -33,6 +33,7 @@
import org.jboss.jms.client.remoting.ConsolidatedRemotingConnectionListener;
import org.jboss.jms.client.FailoverEvent;
import org.jboss.jms.client.FailoverListener;
+import org.jboss.jms.client.Valve;
import org.jboss.jms.delegate.ConnectionDelegate;
import org.jboss.jms.delegate.ConnectionFactoryDelegate;
import org.jboss.jms.message.MessageIdGenerator;
@@ -45,9 +46,9 @@
import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
/**
- *
+ *
* State corresponding to a connection. This state is accessible inside aspects/interceptors.
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
@@ -85,6 +86,9 @@
private ResourceManager resourceManager;
private MessageIdGenerator idGenerator;
+ // this is used on ClusteredConnections only
+ private transient Valve valve;
+
// Cached by the connection state in case ClusteringAspect needs to re-try establishing
// connection on a different node
private transient String username;
@@ -158,6 +162,16 @@
return versionToUse;
}
+ public Valve getValve()
+ {
+ return valve;
+ }
+
+ public void setValve(Valve valve)
+ {
+ this.valve = valve;
+ }
+
public void synchronizeWith(HierarchicalState ns) throws Exception
{
ConnectionState newState = (ConnectionState)ns;
Modified: branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/state/HierarchicalState.java
===================================================================
--- branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/state/HierarchicalState.java 2007-01-05 06:02:17 UTC (rev 1897)
+++ branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/state/HierarchicalState.java 2007-01-05 17:20:38 UTC (rev 1898)
@@ -29,7 +29,7 @@
/**
* Any state that is Hierarchical in nature implements this interface (e.g. a connection has child
* sessions). Or, a session has child consumers, producers and browsers.
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
@@ -40,30 +40,18 @@
public interface HierarchicalState
{
Set getChildren();
-
+
DelegateSupport getDelegate();
-
+
void setDelegate(DelegateSupport delegate);
HierarchicalState getParent();
-
+
void setParent(HierarchicalState parent);
Version getVersionToUse();
/**
- * Closes children's failover valves, by sending closeValve() invocations down children's
- * delegate stack. It is NOT intended to be recursive, unless the children chose so.
- */
- void closeChildrensValves() throws Exception;
-
- /**
- * Opens children's failover valves, by sending openValve() invocations down children's
- * delegate stack. It is NOT intended to be recursive, unless the children chose so.
- */
- void openChildrensValves() throws Exception;
-
- /**
* Update my own state based on the new state.
*/
void synchronizeWith(HierarchicalState newState) throws Exception;
Modified: branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/state/HierarchicalStateSupport.java
===================================================================
--- branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/state/HierarchicalStateSupport.java 2007-01-05 06:02:17 UTC (rev 1897)
+++ branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/client/state/HierarchicalStateSupport.java 2007-01-05 17:20:38 UTC (rev 1898)
@@ -22,19 +22,17 @@
package org.jboss.jms.client.state;
import java.util.Set;
-import java.util.Iterator;
import org.jboss.jms.client.delegate.DelegateSupport;
-import org.jboss.jms.client.Valve;
/**
* Base implementation of HierarchicalState.
- *
+ *
* State is created and maintained by the StateCreationAspect. The state is placed in the meta data
* for the invocation, so that it is available in any of the interceptors/aspects; this enables each
* interceptor/aspect to access the state for its delegate without having to add multiple get/set
* methods on the delegate API.
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
@@ -76,34 +74,6 @@
return children;
}
- public void closeChildrensValves() throws Exception
- {
- if (children == null)
- {
- return;
- }
-
- for(Iterator i = children.iterator(); i.hasNext(); )
- {
- HierarchicalState s = (HierarchicalState)i.next();
- ((Valve)s.getDelegate()).closeValve();
- }
- }
-
- public void openChildrensValves() throws Exception
- {
- if (children == null)
- {
- return;
- }
-
- for(Iterator i = children.iterator(); i.hasNext(); )
- {
- HierarchicalState s = (HierarchicalState)i.next();
- ((Valve)s.getDelegate()).openValve();
- }
- }
-
// Public ---------------------------------------------------------------------------------------
// Protected ------------------------------------------------------------------------------------
Modified: branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/delegate/BrowserDelegate.java
===================================================================
--- branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/delegate/BrowserDelegate.java 2007-01-05 06:02:17 UTC (rev 1897)
+++ branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/delegate/BrowserDelegate.java 2007-01-05 17:20:38 UTC (rev 1898)
@@ -22,19 +22,18 @@
package org.jboss.jms.delegate;
import org.jboss.jms.server.endpoint.BrowserEndpoint;
-import org.jboss.jms.client.Valve;
/**
* Represents the minimal set of operations to provide browser
* functionality.
* Some of the methods may be implemented on the server, others
* will be handled in the advice stack.
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision$</tt>
*/
-public interface BrowserDelegate extends Valve, BrowserEndpoint
+public interface BrowserDelegate extends BrowserEndpoint
{
}
-
+
Modified: branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/delegate/ConnectionDelegate.java
===================================================================
--- branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/delegate/ConnectionDelegate.java 2007-01-05 06:02:17 UTC (rev 1897)
+++ branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/delegate/ConnectionDelegate.java 2007-01-05 17:20:38 UTC (rev 1898)
@@ -29,27 +29,27 @@
import org.jboss.jms.client.JBossConnectionConsumer;
import org.jboss.jms.client.FailoverListener;
-import org.jboss.jms.client.Valve;
+import org.jboss.jms.client.remoting.JMSRemotingConnection;
import org.jboss.jms.server.endpoint.ConnectionEndpoint;
/**
* Represents the minimal set of operations to provide connection functionality.
* Some of the methods may be implemented on the server, others will be handled in the advice stack.
- *
+ *
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision$</tt>
*
* $Id$
*/
-public interface ConnectionDelegate extends Valve, ConnectionEndpoint
-{
+public interface ConnectionDelegate extends ConnectionEndpoint
+{
ExceptionListener getExceptionListener() throws JMSException;
-
+
void setExceptionListener(ExceptionListener listener) throws JMSException;
-
+
ConnectionMetaData getConnectionMetaData() throws JMSException;
-
+
JBossConnectionConsumer createConnectionConsumer(Destination dest,
String subscriptionName,
String messageSelector,
@@ -59,6 +59,7 @@
void registerFailoverListener(FailoverListener failoverListener);
boolean unregisterFailoverListener(FailoverListener failoverListener);
- void performFailover() throws Exception;
+ // TODO: Should this be part of the public API?
+ void performFailover(JMSRemotingConnection failedConnection) throws Exception;
}
Modified: branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/delegate/ConsumerDelegate.java
===================================================================
--- branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/delegate/ConsumerDelegate.java 2007-01-05 06:02:17 UTC (rev 1897)
+++ branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/delegate/ConsumerDelegate.java 2007-01-05 17:20:38 UTC (rev 1898)
@@ -27,7 +27,6 @@
import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.server.endpoint.ConsumerEndpoint;
-import org.jboss.jms.client.Valve;
/**
* Represents the minimal set of operations to provide consumer
@@ -41,7 +40,7 @@
*
* $Id$
*/
-public interface ConsumerDelegate extends Valve, ConsumerEndpoint
+public interface ConsumerDelegate extends ConsumerEndpoint
{
MessageListener getMessageListener();
Modified: branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/delegate/ProducerDelegate.java
===================================================================
--- branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/delegate/ProducerDelegate.java 2007-01-05 06:02:17 UTC (rev 1897)
+++ branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/delegate/ProducerDelegate.java 2007-01-05 17:20:38 UTC (rev 1898)
@@ -25,40 +25,39 @@
import javax.jms.Message;
import org.jboss.jms.client.Closeable;
-import org.jboss.jms.client.Valve;
import org.jboss.jms.destination.JBossDestination;
/**
* Represents the minimal set of operations to provide producer functionality.
- *
+ *
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision$</tt>
*/
-public interface ProducerDelegate extends Valve, Closeable
+public interface ProducerDelegate extends Closeable
{
void setDisableMessageID(boolean value) throws JMSException;
-
+
boolean getDisableMessageID() throws JMSException;
-
+
void setDisableMessageTimestamp(boolean value) throws JMSException;
-
+
boolean getDisableMessageTimestamp() throws JMSException;
-
+
void setDeliveryMode(int deliveryMode) throws JMSException;
-
+
int getDeliveryMode() throws JMSException;
-
+
void setPriority(int defaultPriority) throws JMSException;
-
+
int getPriority() throws JMSException;
-
+
void setTimeToLive(long timeToLive) throws JMSException;
-
+
long getTimeToLive() throws JMSException;
-
+
JBossDestination getDestination() throws JMSException;
-
+
void setDestination(JBossDestination dest);
/**
@@ -70,5 +69,5 @@
int deliveryMode,
int priority,
long timeToLive) throws JMSException;
-
+
}
Modified: branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/delegate/SessionDelegate.java
===================================================================
--- branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/delegate/SessionDelegate.java 2007-01-05 06:02:17 UTC (rev 1897)
+++ branches/Branch_Cleberts_Third_Failover/src/main/org/jboss/jms/delegate/SessionDelegate.java 2007-01-05 17:20:38 UTC (rev 1898)
@@ -37,19 +37,18 @@
import org.jboss.jms.message.TextMessageProxy;
import org.jboss.jms.server.endpoint.DeliveryInfo;
import org.jboss.jms.server.endpoint.SessionEndpoint;
-import org.jboss.jms.client.Valve;
/**
* Represents the minimal set of operations to provide session functionality.
* Some of the methods may be implemented on the server, others will be handled in the advice stack.
- *
+ *
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision$</tt>
*
* $Id$
*/
-public interface SessionDelegate extends Valve, SessionEndpoint
+public interface SessionDelegate extends SessionEndpoint
{
MessageProxy createMessage() throws JMSException;
Deleted: branches/Branch_Cleberts_Third_Failover/tests/src/org/jboss/test/messaging/jms/FailoverValveTest.java
===================================================================
--- branches/Branch_Cleberts_Third_Failover/tests/src/org/jboss/test/messaging/jms/FailoverValveTest.java 2007-01-05 06:02:17 UTC (rev 1897)
+++ branches/Branch_Cleberts_Third_Failover/tests/src/org/jboss/test/messaging/jms/FailoverValveTest.java 2007-01-05 17:20:38 UTC (rev 1898)
@@ -1,909 +0,0 @@
-/**
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.test.messaging.jms;
-
-import org.jboss.test.messaging.MessagingTestCase;
-import org.jboss.test.messaging.tools.ServerManagement;
-import org.jboss.jms.client.Valve;
-import org.jboss.jms.client.JBossConnection;
-import org.jboss.jms.client.JBossSession;
-import org.jboss.jms.client.JBossQueueBrowser;
-import org.jboss.jms.client.JBossMessageProducer;
-import org.jboss.jms.client.JBossMessageConsumer;
-import org.jboss.jms.client.state.HierarchicalState;
-import org.jboss.jms.client.state.HierarchicalStateSupport;
-import org.jboss.jms.client.delegate.DelegateSupport;
-import org.jboss.jms.client.container.FailoverValveInterceptor;
-import org.jboss.jms.server.Version;
-import org.jboss.aop.joinpoint.Invocation;
-import org.jboss.aop.joinpoint.MethodInvocation;
-import org.jboss.aop.MethodInfo;
-import org.jboss.aop.util.MethodHashing;
-import org.jboss.aop.advice.Interceptor;
-import org.jboss.remoting.Client;
-
-import javax.naming.InitialContext;
-import javax.jms.Connection;
-import javax.jms.Session;
-import javax.jms.ConnectionFactory;
-import javax.jms.Queue;
-import javax.jms.MessageProducer;
-import javax.jms.MessageConsumer;
-import javax.jms.QueueBrowser;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.Collections;
-import java.util.Iterator;
-
-import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
-import EDU.oswego.cs.dl.util.concurrent.Slot;
-
-/**
- * This is not necessarily a clustering test, as the failover valves are installed even for a
- * non-clustered configuration.
- *
- * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
- * @version <tt>$Revision: 1843 $</tt>
- *
- * $Id: JMSTest.java 1843 2006-12-21 23:41:19Z timfox $
- */
-public class FailoverValveTest extends MessagingTestCase
-{
- // Constants ------------------------------------------------------------------------------------
-
- // Static ---------------------------------------------------------------------------------------
-
- // Attributes -----------------------------------------------------------------------------------
-
- private InitialContext ic;
- private ConnectionFactory cf;
- private Queue queue;
-
- // Constructors ---------------------------------------------------------------------------------
-
- public FailoverValveTest(String name)
- {
- super(name);
- }
-
- // Public ---------------------------------------------------------------------------------------
-
- public void testCloseValveHierarchy() throws Exception
- {
- Connection conn = null;
-
- try
- {
- conn = cf.createConnection();
-
- assertTrue(((Valve)((JBossConnection)conn).getDelegate()).isValveOpen());
-
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- assertTrue(((Valve)((JBossSession)session).getDelegate()).isValveOpen());
-
- MessageProducer prod = session.createProducer(queue);
- assertTrue(((Valve)((JBossMessageProducer)prod).getDelegate()).isValveOpen());
-
- MessageConsumer cons = session.createConsumer(queue);
- assertTrue(((Valve)((JBossMessageConsumer)cons).getDelegate()).isValveOpen());
-
- QueueBrowser browser = session.createBrowser(queue);
- assertTrue(((Valve)((JBossQueueBrowser)browser).getDelegate()).isValveOpen());
-
- ((JBossConnection)conn).getDelegate().closeValve();
-
- log.debug("top level valve closed");
-
- assertFalse(((Valve)((JBossConnection)conn).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossSession)session).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossMessageProducer)prod).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossMessageConsumer)cons).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossQueueBrowser)browser).getDelegate()).isValveOpen());
-
- ((JBossConnection)conn).getDelegate().openValve();
-
- log.debug("top level valve open");
-
- assertTrue(((Valve)((JBossConnection)conn).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossSession)session).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossMessageProducer)prod).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossMessageConsumer)cons).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossQueueBrowser)browser).getDelegate()).isValveOpen());
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- }
-
- }
-
- public void testCloseValveHierarchy2() throws Exception
- {
- Connection conn = null;
-
- try
- {
- conn = cf.createConnection();
-
- assertTrue(((Valve)((JBossConnection)conn).getDelegate()).isValveOpen());
-
- Session session1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session session2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- assertTrue(((Valve)((JBossSession)session1).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossSession)session2).getDelegate()).isValveOpen());
-
- MessageProducer prod1 = session1.createProducer(queue);
- assertTrue(((Valve)((JBossMessageProducer)prod1).getDelegate()).isValveOpen());
-
- MessageConsumer cons1 = session1.createConsumer(queue);
- assertTrue(((Valve)((JBossMessageConsumer)cons1).getDelegate()).isValveOpen());
-
- QueueBrowser browser1 = session1.createBrowser(queue);
- assertTrue(((Valve)((JBossQueueBrowser)browser1).getDelegate()).isValveOpen());
-
- MessageProducer prod2 = session2.createProducer(queue);
- assertTrue(((Valve)((JBossMessageProducer)prod2).getDelegate()).isValveOpen());
-
- MessageConsumer cons2 = session2.createConsumer(queue);
- assertTrue(((Valve)((JBossMessageConsumer)cons2).getDelegate()).isValveOpen());
-
- QueueBrowser browser2 = session2.createBrowser(queue);
- assertTrue(((Valve)((JBossQueueBrowser)browser2).getDelegate()).isValveOpen());
-
- ((JBossConnection)conn).getDelegate().closeValve();
-
- log.debug("top level valve closed");
-
- assertFalse(((Valve)((JBossConnection)conn).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossSession)session1).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossSession)session2).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossMessageProducer)prod1).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossMessageConsumer)cons1).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossQueueBrowser)browser1).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossMessageProducer)prod2).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossMessageConsumer)cons2).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossQueueBrowser)browser2).getDelegate()).isValveOpen());
-
- ((JBossConnection)conn).getDelegate().openValve();
-
- log.debug("top level valve open");
-
- assertTrue(((Valve)((JBossConnection)conn).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossSession)session1).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossSession)session2).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossMessageProducer)prod1).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossMessageConsumer)cons1).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossQueueBrowser)browser1).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossMessageProducer)prod2).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossMessageConsumer)cons2).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossQueueBrowser)browser2).getDelegate()).isValveOpen());
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- }
- }
-
- public void testValveOpenByDefault() throws Throwable
- {
- FailoverValveInterceptor valve = new FailoverValveInterceptor();
- SimpleInvocationTargetObject target = new SimpleInvocationTargetObject(valve);
-
- assertEquals(Boolean.TRUE, valve.invoke(buildInvocation("isValveOpen", target)));
- assertEquals(new Integer(0), valve.invoke(buildInvocation("getActiveThreadsCount", target)));
-
- // try to open the valve again, it should be a noop
- valve.invoke(buildInvocation("openValve", target));
-
- assertEquals(Boolean.TRUE, valve.invoke(buildInvocation("isValveOpen", target)));
- assertEquals(new Integer(0), valve.invoke(buildInvocation("getActiveThreadsCount", target)));
- }
-
- public void testPassThroughOpenValve() throws Throwable
- {
- final FailoverValveInterceptor valve = new FailoverValveInterceptor();
- final SimpleInvocationTargetObject target = new SimpleInvocationTargetObject(valve);
-
- // send a thread through the open valve
- Thread t = new Thread(new Runnable()
- {
- public void run()
- {
- try
- {
- valve.invoke(buildInvocation("businessMethod1", target));
- }
- catch(Throwable t)
- {
- log.error("businessMethod1 invocation failed", t);
- }
- }
- }, "Business Thread 1");
-
- t.start();
- t.join();
-
- // the thread should have passed through the open valve
- assertEquals(1, target.getBusinessMethod1InvocationCount());
- assertEquals(1, target.getActiveThreadsHighWaterMark());
-
- }
-
- public void testFlipValve() throws Throwable
- {
- FailoverValveInterceptor valve = new FailoverValveInterceptor();
- SimpleInvocationTargetObject target = new SimpleInvocationTargetObject(valve);
-
- valve.invoke(buildInvocation("closeValve", target));
-
- assertEquals(Boolean.FALSE, valve.invoke(buildInvocation("isValveOpen", target)));
- assertEquals(new Integer(0), valve.invoke(buildInvocation("getActiveThreadsCount", target)));
-
- // child states also must have been notified to close their corresponding valves
- ParentHierarchicalState state = (ParentHierarchicalState)target.getState();
-
- for(Iterator i = state.getChildren().iterator(); i.hasNext(); )
- {
- ChildState cs = (ChildState)i.next();
- assertFalse(((Valve)cs.getDelegate()).isValveOpen());
- }
-
- // try to close the valve again, it should be a noop
-
- valve.invoke(buildInvocation("closeValve", target));
-
- assertEquals(Boolean.FALSE, valve.invoke(buildInvocation("isValveOpen", target)));
- assertEquals(new Integer(0), valve.invoke(buildInvocation("getActiveThreadsCount", target)));
- }
-
- public void testFlipValve2() throws Throwable
- {
- FailoverValveInterceptor valve = new FailoverValveInterceptor();
- SimpleInvocationTargetObject target = new SimpleInvocationTargetObject(valve);
-
- valve.invoke(buildInvocation("closeValve", target));
-
- assertEquals(Boolean.FALSE, valve.invoke(buildInvocation("isValveOpen", target)));
- assertEquals(new Integer(0), valve.invoke(buildInvocation("getActiveThreadsCount", target)));
-
- // child states also must have been notified to close their corresponding valves
- ParentHierarchicalState state = (ParentHierarchicalState)target.getState();
-
- for(Iterator i = state.getChildren().iterator(); i.hasNext(); )
- {
- ChildState cs = (ChildState)i.next();
- assertFalse(((Valve)cs.getDelegate()).isValveOpen());
- }
-
- // re-open the valve
-
- valve.invoke(buildInvocation("openValve", target));
-
- assertEquals(Boolean.TRUE, valve.invoke(buildInvocation("isValveOpen", target)));
- assertEquals(new Integer(0), valve.invoke(buildInvocation("getActiveThreadsCount", target)));
-
- // child states also must have been notified to open their corresponding valves
- state = (ParentHierarchicalState)target.getState();
-
- for(Iterator i = state.getChildren().iterator(); i.hasNext(); )
- {
- ChildState cs = (ChildState)i.next();
- assertTrue(((Valve)cs.getDelegate()).isValveOpen());
- }
- }
-
- /**
- * Close the valve and send a thread through it. The thread must be put on hold until the valve
- * is opened again.
- */
- public void testCloseValveOneBusinessThread() throws Throwable
- {
- final FailoverValveInterceptor valve = new FailoverValveInterceptor();
- final SimpleInvocationTargetObject target = new SimpleInvocationTargetObject(valve);
-
- valve.invoke(buildInvocation("closeValve", target));
-
- assertEquals(Boolean.FALSE, valve.invoke(buildInvocation("isValveOpen", target)));
-
- log.debug("the valve is closed");
-
- // smack a thread into the closed valve
- Thread t = new Thread(new Runnable()
- {
- public void run()
- {
- try
- {
- valve.invoke(buildInvocation("businessMethod1", target));
- }
- catch(Throwable t)
- {
- log.error("businessMethod1 invocation failed", t);
- }
- }
- }, "Business Thread 1");
-
- t.start();
-
- // wait for 10 secs for the invocation to reach target object. It shouldn't ...
- long waitTime = 10000;
- log.info("probing target for " + waitTime / 1000 + " seconds ...");
- InvocationToken arrived = target.waitForInvocation(waitTime);
-
- if (arrived != null)
- {
- fail(arrived.getMethodName() + "() reached target, while it shouldn't have!");
- }
-
-
- log.debug("the business thread didn't go through");
-
- // open the valve
-
- valve.invoke(buildInvocation("openValve", target));
-
- // the business invocation should complete almost immediately; wait on mutex to avoid race
- // condition
-
- arrived = target.waitForInvocation(2000);
-
- assertEquals("businessMethod1", arrived.getMethodName());
- assertEquals(1, target.getBusinessMethod1InvocationCount());
- assertEquals(1, target.getActiveThreadsHighWaterMark());
- }
-
-
- /**
- * Close the valve and send three threads through it. The threads must be put on hold until the
- * valve is opened again.
- */
- public void testCloseValveThreeBusinessThread() throws Throwable
- {
- final FailoverValveInterceptor valve = new FailoverValveInterceptor();
- final SimpleInvocationTargetObject target = new SimpleInvocationTargetObject(valve);
-
- valve.invoke(buildInvocation("closeValve", target));
-
- assertEquals(Boolean.FALSE, valve.invoke(buildInvocation("isValveOpen", target)));
-
- log.debug("the valve is closed");
-
- // smack thread 1 into the closed valve
- new Thread(new Runnable()
- {
- public void run()
- {
- try
- {
- valve.invoke(buildInvocation("businessMethod1", target));
- }
- catch(Throwable t)
- {
- log.error("businessMethod1 invocation failed", t);
- }
- }
- }, "Business Thread 1").start();
-
- // smack thread 2 into the closed valve
- new Thread(new Runnable()
- {
- public void run()
- {
- try
- {
- valve.invoke(buildInvocation("businessMethod1", target));
- }
- catch(Throwable t)
- {
- log.error("businessMethod1 invocation failed", t);
- }
- }
- }, "Business Thread 2").start();
-
- // smack thread 3 into the closed valve
- new Thread(new Runnable()
- {
- public void run()
- {
- try
- {
- valve.invoke(buildInvocation("businessMethod2", target));
- }
- catch(Throwable t)
- {
- log.error("businessMethod2 invocation failed", t);
- }
- }
- }, "Business Thread 3").start();
-
- // wait for 10 secs for any invocation to reach target object. It shouldn't ...
- long waitTime = 10000;
- log.info("probing target for " + waitTime / 1000 + " seconds ...");
- InvocationToken arrived = target.waitForInvocation(waitTime);
-
- if (arrived != null)
- {
- fail(arrived.getMethodName() + "() reached target, while it shouldn't have!");
- }
-
- log.debug("the business threads didn't go through");
-
- // open the valve
-
- valve.invoke(buildInvocation("openValve", target));
-
- // the business invocations should complete almost immediately; wait on mutex to avoid race
- // condition
-
- arrived = target.waitForInvocation(2000);
- assertNotNull(arrived);
-
- arrived = target.waitForInvocation(2000);
- assertNotNull(arrived);
-
- arrived = target.waitForInvocation(2000);
- assertNotNull(arrived);
-
- // wait for 3 secs for any invocation to reach target object. It shouldn't ...
- waitTime = 3000;
- log.info("probing target for " + waitTime / 1000 + " seconds ...");
- arrived = target.waitForInvocation(waitTime);
-
- if (arrived != null)
- {
- fail("Extra " + arrived.getMethodName() + "() reached target, " +
- "while it shouldn't have!");
- }
-
- assertEquals(2, target.getBusinessMethod1InvocationCount());
- assertEquals(1, target.getBusinessMethod2InvocationCount());
- assertEquals(3, target.getActiveThreadsHighWaterMark());
- }
-
- /**
- * The current standard behavior is that the valve cannot be closed as long as there are
- * active threads. closeValve() will block indefinitely.
- */
- public void testCloseWhileActiveThreads() throws Throwable
- {
- final FailoverValveInterceptor valve = new FailoverValveInterceptor();
- final SimpleInvocationTargetObject target = new SimpleInvocationTargetObject(valve);
-
- assertEquals(Boolean.TRUE, valve.invoke(buildInvocation("isValveOpen", target)));
-
- // send a long running thread through the valve
- new Thread(new Runnable()
- {
- public void run()
- {
- try
- {
- valve.invoke(buildInvocation("blockingBusinessMethod", target));
- }
- catch(Throwable t)
- {
- log.error("blockingBusinessMethod invocation failed", t);
- }
- }
- }, "Long running business thread").start();
-
- // allow blockingBusinessMethod time to block
- Thread.sleep(2000);
-
- assertEquals(new Integer(1), valve.invoke(buildInvocation("getActiveThreadsCount", target)));
-
- final Slot closingCompleted = new Slot();
-
- // from a different thread try to close the valve; this thread will block until we unblock
- // the business method
-
- new Thread(new Runnable()
- {
- public void run()
- {
- try
- {
- valve.invoke(buildInvocation("closeValve", target));
- closingCompleted.put(Boolean.TRUE);
-
- }
- catch(Throwable t)
- {
- log.error("blockingBusinessMethod() invocation failed", t);
- }
- }
- }, "Valve closing thread").start();
-
-
- // closing shouldn't be completed for a long time .... actually never, if I don't unblock
- // the business method
-
- // wait for 15 secs for closing. It shouldn't ...
- long waitTime = 15000;
- log.info("probing closing for " + waitTime / 1000 + " seconds ...");
-
- Boolean closed = (Boolean)closingCompleted.poll(waitTime);
-
- if (closed != null)
- {
- fail("closeValve() went through, while it shouldn't have!");
- }
-
- assertEquals(Boolean.TRUE, valve.invoke(buildInvocation("isValveOpen", target)));
- assertEquals(new Integer(1), valve.invoke(buildInvocation("getActiveThreadsCount", target)));
-
- log.info("valve still open ...");
-
- // unblock blockingBusinessMethod
- target.unblockBlockingBusinessMethod();
-
- // valve closing should complete immediately after that
- closed = (Boolean)closingCompleted.poll(1000);
- assertTrue(closed.booleanValue());
- assertEquals(Boolean.FALSE, valve.invoke(buildInvocation("isValveOpen", target)));
- assertEquals(new Integer(0), valve.invoke(buildInvocation("getActiveThreadsCount", target)));
- }
-
- // Package protected ----------------------------------------------------------------------------
-
- // Protected ------------------------------------------------------------------------------------
-
- protected void setUp() throws Exception
- {
- super.setUp();
-
- ServerManagement.start("all");
-
- ic = new InitialContext(ServerManagement.getJNDIEnvironment());
-
- ServerManagement.deployQueue("TestQueue");
-
- cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
- queue = (Queue)ic.lookup("/queue/TestQueue");
-
- log.debug("setup done");
- }
-
- protected void tearDown() throws Exception
- {
- ServerManagement.undeployQueue("TestQueue");
-
- ic.close();
-
- super.tearDown();
- }
-
- // Private --------------------------------------------------------------------------------------
-
- private Invocation buildInvocation(String methodName, Object targetObject)
- throws Exception
- {
- long hash =
- MethodHashing.calculateHash(targetObject.getClass().getMethod(methodName, new Class[0]));
- MethodInfo mi = new MethodInfo(targetObject.getClass(), hash, hash, null);
- MethodInvocation invocation = new MethodInvocation(mi, new Interceptor[0]);
- invocation.setTargetObject(targetObject);
- return invocation;
- }
-
- // Inner classes --------------------------------------------------------------------------------
-
- public interface BusinessObject
- {
- void businessMethod1();
- void businessMethod2();
- /**
- * Must be unblocked externally.
- */
- void blockingBusinessMethod() throws InterruptedException;
- }
-
- public class SimpleInvocationTargetObject
- extends DelegateSupport implements BusinessObject, Valve
- {
- private int businessMethod1InvocationCount;
- private int businessMethod2InvocationCount;
-
- // LinkedQueue<InvocationToken>
- private LinkedQueue invocationTokens;
-
- private FailoverValveInterceptor valve;
- private int activeThreadsCountHighWaterMark;
-
- private Object blockingMethodWaitArea;
-
- public SimpleInvocationTargetObject(FailoverValveInterceptor valve)
- {
- super();
- setState(new ParentHierarchicalState());
- businessMethod1InvocationCount = 0;
- businessMethod2InvocationCount = 0;
- invocationTokens = new LinkedQueue();
- this.valve = valve;
- activeThreadsCountHighWaterMark = 0;
- blockingMethodWaitArea = new Object();
- }
-
- protected Client getClient() throws Exception
- {
- throw new RuntimeException("NOT YET IMPLEMENTED");
- }
-
- public boolean isValveOpen()
- {
- throw new RuntimeException("NOT YET IMPLEMENTED");
- }
-
- public void closeValve() throws Exception
- {
- throw new RuntimeException("NOT YET IMPLEMENTED");
- }
-
- public void openValve() throws Exception
- {
- throw new RuntimeException("NOT YET IMPLEMENTED");
- }
-
- public int getActiveThreadsCount()
- {
- throw new RuntimeException("NOT YET IMPLEMENTED");
- }
-
- public synchronized void businessMethod1()
- {
- businessMethod1InvocationCount++;
- updateActiveThreadsCountHighWaterMark();
- notifyInvocationWaiter("businessMethod1");
- }
-
- public synchronized void businessMethod2()
- {
- businessMethod2InvocationCount++;
- updateActiveThreadsCountHighWaterMark();
- notifyInvocationWaiter("businessMethod2");
- }
-
- public void blockingBusinessMethod() throws InterruptedException
- {
- synchronized(blockingMethodWaitArea)
- {
- log.info("blockingBusinessMethod() blocking ...");
- // block calling thread indefinitely until blockingMethodWaitArea is notified
- blockingMethodWaitArea.wait();
- log.info("blockingBusinessMethod() unblocked");
- }
- }
-
- public synchronized int getBusinessMethod1InvocationCount()
- {
- return businessMethod1InvocationCount;
- }
-
- public synchronized int getBusinessMethod2InvocationCount()
- {
- return businessMethod2InvocationCount;
- }
-
- public synchronized int getActiveThreadsHighWaterMark()
- {
- return activeThreadsCountHighWaterMark;
- }
-
- /**
- * Block until an invocation arrives into the target. If the invocation arrived prior to
- * calling this method, it returns immediately.
- *
- * @return a token corresponding to the business invocation, or null if the method exited with
- * a timeout, without an invocation arriving.
- */
- public InvocationToken waitForInvocation(long timeout) throws InterruptedException
- {
- return (InvocationToken)invocationTokens.poll(timeout);
- }
-
- public void unblockBlockingBusinessMethod()
- {
- synchronized(blockingMethodWaitArea)
- {
- blockingMethodWaitArea.notify();
- }
- }
-
- /**
- * Reset the state
- */
- public synchronized void reset()
- {
- businessMethod1InvocationCount = 0;
- businessMethod2InvocationCount = 0;
- activeThreadsCountHighWaterMark = 0;
- }
-
- /**
- * Notify someone who waits for this invocation to arrive.
- */
- private void notifyInvocationWaiter(String methodName)
- {
- try
- {
- invocationTokens.put(new InvocationToken(methodName));
- }
- catch(InterruptedException e)
- {
- throw new RuntimeException("Failed to deposit notification in queue", e);
- }
- }
-
- private synchronized void updateActiveThreadsCountHighWaterMark()
- {
- try
- {
- int c =
- ((Integer)valve.invoke(buildInvocation("getActiveThreadsCount", this))).intValue();
- if (c > activeThreadsCountHighWaterMark)
- {
- activeThreadsCountHighWaterMark = c;
- }
- }
- catch(Throwable t)
- {
- throw new RuntimeException("Failed to get getActiveThreadsCount", t);
- }
- }
- }
-
- private class ParentHierarchicalState extends HierarchicalStateSupport
- {
- private DelegateSupport delegate;
- private HierarchicalState parent;
-
- ParentHierarchicalState()
- {
- super(null, null);
-
- children = new HashSet();
-
- // populate it with a child state
- children.add(new ChildState());
- }
-
- public DelegateSupport getDelegate()
- {
- return delegate;
- }
-
- public void setDelegate(DelegateSupport delegate)
- {
- this.delegate = delegate;
- }
-
- public HierarchicalState getParent()
- {
- return parent;
- }
-
- public void setParent(HierarchicalState parent)
- {
- this.parent = parent;
- }
-
- public Version getVersionToUse()
- {
- throw new RuntimeException("NOT YET IMPLEMENTED");
- }
-
- public void synchronizeWith(HierarchicalState newState) throws Exception
- {
- throw new RuntimeException("NOT YET IMPLEMENTED");
- }
- }
-
- private class ChildState extends HierarchicalStateSupport
- {
- private DelegateSupport delegate;
- private HierarchicalState parent;
-
- ChildState()
- {
- super(null, new ChildDelegate());
- }
-
- public Set getChildren()
- {
- return Collections.EMPTY_SET;
- }
-
- public DelegateSupport getDelegate()
- {
- return delegate;
- }
-
- public void setDelegate(DelegateSupport delegate)
- {
- this.delegate = delegate;
- }
-
- public HierarchicalState getParent()
- {
- return parent;
- }
-
- public void setParent(HierarchicalState parent)
- {
- this.parent = parent;
- }
-
- public Version getVersionToUse()
- {
- throw new RuntimeException("NOT YET IMPLEMENTED");
- }
-
- public void synchronizeWith(HierarchicalState newState) throws Exception
- {
- throw new RuntimeException("NOT YET IMPLEMENTED");
- }
- }
-
- private class ChildDelegate extends DelegateSupport implements Valve
- {
- private boolean valveOpen;
-
- ChildDelegate()
- {
- valveOpen = true;
- }
-
- protected Client getClient() throws Exception
- {
- throw new RuntimeException("NOT YET IMPLEMENTED");
- }
-
- public synchronized void closeValve() throws Exception
- {
- valveOpen = false;
- }
-
- public void openValve() throws Exception
- {
- valveOpen = true;
- }
-
- public boolean isValveOpen()
- {
- return valveOpen;
- }
-
- public int getActiveThreadsCount()
- {
- throw new RuntimeException("NOT YET IMPLEMENTED");
- }
- }
-
- private class InvocationToken
- {
- private String methodName;
-
- public InvocationToken(String methodName)
- {
- this.methodName = methodName;
- }
-
- public String getMethodName()
- {
- return methodName;
- }
- }
-
-}
Modified: branches/Branch_Cleberts_Third_Failover/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
===================================================================
--- branches/Branch_Cleberts_Third_Failover/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java 2007-01-05 06:02:17 UTC (rev 1897)
+++ branches/Branch_Cleberts_Third_Failover/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java 2007-01-05 17:20:38 UTC (rev 1898)
@@ -11,11 +11,6 @@
import org.jboss.jms.client.JBossConnection;
import org.jboss.jms.client.FailoverListener;
import org.jboss.jms.client.FailoverEvent;
-import org.jboss.jms.client.Valve;
-import org.jboss.jms.client.JBossSession;
-import org.jboss.jms.client.JBossQueueBrowser;
-import org.jboss.jms.client.JBossMessageConsumer;
-import org.jboss.jms.client.JBossMessageProducer;
import org.jboss.jms.client.state.ConnectionState;
import org.jboss.jms.client.delegate.DelegateSupport;
@@ -1111,77 +1106,6 @@
}
}
- public void testCloseValveHierarchy() throws Exception
- {
- Connection conn = null;
-
- try
- {
- conn = cf.createConnection();
-
- assertTrue(((Valve)((JBossConnection)conn).getDelegate()).isValveOpen());
-
- Session session1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session session2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- assertTrue(((Valve)((JBossSession)session1).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossSession)session2).getDelegate()).isValveOpen());
-
- MessageProducer prod1 = session1.createProducer(queue[0]);
- assertTrue(((Valve)((JBossMessageProducer)prod1).getDelegate()).isValveOpen());
-
- MessageConsumer cons1 = session1.createConsumer(queue[0]);
- assertTrue(((Valve)((JBossMessageConsumer)cons1).getDelegate()).isValveOpen());
-
- QueueBrowser browser1 = session1.createBrowser(queue[0]);
- assertTrue(((Valve)((JBossQueueBrowser)browser1).getDelegate()).isValveOpen());
-
- MessageProducer prod2 = session2.createProducer(queue[0]);
- assertTrue(((Valve)((JBossMessageProducer)prod2).getDelegate()).isValveOpen());
-
- MessageConsumer cons2 = session2.createConsumer(queue[0]);
- assertTrue(((Valve)((JBossMessageConsumer)cons2).getDelegate()).isValveOpen());
-
- QueueBrowser browser2 = session2.createBrowser(queue[0]);
- assertTrue(((Valve)((JBossQueueBrowser)browser2).getDelegate()).isValveOpen());
-
- ((JBossConnection)conn).getDelegate().closeValve();
-
- log.debug("top level valve closed");
-
- assertFalse(((Valve)((JBossConnection)conn).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossSession)session1).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossSession)session2).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossMessageProducer)prod1).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossMessageConsumer)cons1).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossQueueBrowser)browser1).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossMessageProducer)prod2).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossMessageConsumer)cons2).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossQueueBrowser)browser2).getDelegate()).isValveOpen());
-
- ((JBossConnection)conn).getDelegate().openValve();
-
- log.debug("top level valve open");
-
- assertTrue(((Valve)((JBossConnection)conn).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossSession)session1).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossSession)session2).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossMessageProducer)prod1).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossMessageConsumer)cons1).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossQueueBrowser)browser1).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossMessageProducer)prod2).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossMessageConsumer)cons2).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossQueueBrowser)browser2).getDelegate()).isValveOpen());
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- }
- }
-
public void testFailoverMessageOnServer() throws Exception
{
Connection conn = null;
More information about the jboss-cvs-commits
mailing list