[jboss-cvs] JBoss Messaging SVN: r1928 - in trunk: src/etc src/main/org/jboss/jms/client src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/client/state src/main/org/jboss/jms/delegate src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/jms tests/src/org/jboss/test/messaging/jms/clustering tests/src/org/jboss/test/messaging/tools/jmx

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jan 9 06:36:51 EST 2007


Author: ovidiu.feodorov at jboss.com
Date: 2007-01-09 06:36:35 -0500 (Tue, 09 Jan 2007)
New Revision: 1928

Added:
   trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
   trunk/src/main/org/jboss/jms/client/FailoverValve.java
Removed:
   trunk/src/main/org/jboss/jms/client/container/FailoverAspect.java
   trunk/tests/src/org/jboss/test/messaging/jms/FailoverValveTest.java
Modified:
   trunk/src/etc/aop-messaging-client.xml
   trunk/src/main/org/jboss/jms/client/container/ClusteringAspect.java
   trunk/src/main/org/jboss/jms/client/container/ConnectionAspect.java
   trunk/src/main/org/jboss/jms/client/container/ConnectionFailureListener.java
   trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
   trunk/src/main/org/jboss/jms/client/state/ConnectionState.java
   trunk/src/main/org/jboss/jms/client/state/HierarchicalState.java
   trunk/src/main/org/jboss/jms/client/state/HierarchicalStateSupport.java
   trunk/src/main/org/jboss/jms/delegate/BrowserDelegate.java
   trunk/src/main/org/jboss/jms/delegate/ConnectionDelegate.java
   trunk/src/main/org/jboss/jms/delegate/ConsumerDelegate.java
   trunk/src/main/org/jboss/jms/delegate/ProducerDelegate.java
   trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
   trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
Log:
Using singleton valve on failover (Clebert's work).
http://jira.jboss.org/jira/browse/JBMESSAGING-712
http://jira.jboss.org/jira/browse/JBMESSAGING-719




Modified: trunk/src/etc/aop-messaging-client.xml
===================================================================
--- trunk/src/etc/aop-messaging-client.xml	2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/etc/aop-messaging-client.xml	2007-01-09 11:36:35 UTC (rev 1928)
@@ -19,7 +19,6 @@
    <aspect class="org.jboss.jms.client.container.ConnectionAspect" scope="PER_INSTANCE"/>
    <aspect class="org.jboss.jms.client.container.ClusteringAspect" scope="PER_INSTANCE"/>
    <aspect class="org.jboss.jms.client.container.FactoryAspect" scope="PER_INSTANCE"/>
-   <aspect class="org.jboss.jms.client.container.FailoverAspect" scope="PER_VM"/>
 
    <!--
        Clustered ConnectionFactory Stack
@@ -49,10 +48,6 @@
         Connection Stack
    -->
 
-   <!-- It is important that FailoverAspect intercepts performFailover() before FailoverValveInterceptor -->
-   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionDelegate->performFailover())">
-      <advice name="handlePerformFailover" aspect="org.jboss.jms.client.container.FailoverAspect"/>
-   </bind>
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionDelegate->$implementing{org.jboss.jms.delegate.ConnectionDelegate}(..))">
       <interceptor-ref name="org.jboss.jms.client.container.ClientLogInterceptor"/>
       <interceptor-ref name="org.jboss.jms.client.container.FailoverValveInterceptor"/>

Added: trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java	2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java	2007-01-09 11:36:35 UTC (rev 1928)
@@ -0,0 +1,195 @@
+/**
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.jms.client;
+
+import org.jboss.logging.Logger;
+import org.jboss.jms.client.state.ConnectionState;
+import org.jboss.jms.client.remoting.JMSRemotingConnection;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.server.endpoint.CreateConnectionResult;
+import org.jboss.jms.delegate.ConnectionFactoryDelegate;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+/**
+ * The class in charge with performing the failover.
+ *
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public class FailoverCommandCenter
+{
+   // Constants ------------------------------------------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(FailoverCommandCenter.class);
+
+   // Static ---------------------------------------------------------------------------------------
+
+   private static boolean trace = log.isTraceEnabled();
+
+   // Attributes -----------------------------------------------------------------------------------
+
+   private ConnectionState state;
+
+   private FailoverValve valve;
+
+   private List failoverListeners;
+
+   // Constructors ---------------------------------------------------------------------------------
+
+   public FailoverCommandCenter(ConnectionState state)
+   {
+      this.state = state;
+      failoverListeners = new ArrayList();
+      valve = new FailoverValve();
+   }
+
+   // Public ---------------------------------------------------------------------------------------
+
+   /**
+    * Method called by failure detection components (FailoverValveInterceptors and
+    * ConnectionListeners) when they have reasons to belive that a server failure occured.
+    */
+   public void failureDetected(Throwable reason, JMSRemotingConnection remotingConnection)
+      throws Exception
+   {
+      // generate a FAILURE_DETECTED event
+      broadcastFailoverEvent(new FailoverEvent(FailoverEvent.FAILURE_DETECTED, this));
+
+      log.debug(this + " initiating client-side failover");
+
+      CreateConnectionResult res = null;
+      boolean failoverSuccessful = false;
+
+      try
+      {
+         // block any other invocations ariving to any delegate from the hierarchy while we're
+         // doing failover
+
+         valve.close();
+
+         if (remotingConnection != null)
+         {
+            if (remotingConnection.isFailed())
+            {
+               log.debug(this + " ignoring failure detection notification, as failover was " +
+                  "already performed on this connection");
+            }
+            remotingConnection.setFailed(true);
+         }
+
+         // generate a FAILOVER_STARTED event. The event must be broadcasted AFTER valve closure,
+         // to insure the client-side stack is in a deterministic state
+         broadcastFailoverEvent(new FailoverEvent(FailoverEvent.FAILOVER_STARTED, this));
+
+         int failedNodeID = state.getServerID();
+         ConnectionFactoryDelegate clusteredDelegate =
+            state.getClusteredConnectionFactoryDelegate();
+
+         // re-try creating the connection
+         res = clusteredDelegate.
+            createConnectionDelegate(state.getUsername(), state.getPassword(), failedNodeID);
+
+         if (res == null)
+         {
+            // No failover attempt was detected on the server side; this might happen if the
+            // client side network fails temporarily so the client connection breaks but the
+            // server cluster is still up and running - in this case we don't perform failover.
+            failoverSuccessful = false;
+         }
+         else
+         {
+            // recursively synchronize state
+            ClientConnectionDelegate newDelegate = (ClientConnectionDelegate)res.getDelegate();
+            state.getDelegate().synchronizeWith(newDelegate);
+            failoverSuccessful = true;
+         }
+      }
+      finally
+      {
+         if (failoverSuccessful)
+         {
+            broadcastFailoverEvent(new FailoverEvent(FailoverEvent.FAILOVER_COMPLETED, this));
+         }
+         else
+         {
+            broadcastFailoverEvent(new FailoverEvent(FailoverEvent.FAILOVER_FAILED, this));
+         }
+
+         valve.open();
+      }
+   }
+
+   public void registerFailoverListener(FailoverListener listener)
+   {
+      synchronized(failoverListeners)
+      {
+         failoverListeners.add(listener);
+      }
+   }
+
+   public boolean unregisterFailoverListener(FailoverListener listener)
+   {
+      synchronized(failoverListeners)
+      {
+         return failoverListeners.remove(listener);
+      }
+   }
+
+   public FailoverValve getValve()
+   {
+      return valve;
+   }
+
+   public JMSRemotingConnection getRemotingConnection()
+   {
+      return state.getRemotingConnection();
+   }
+
+   public String toString()
+   {
+      return "FailoverCommandCenter[" + state + "]";
+   }
+
+   // Package protected ----------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+
+   // Private --------------------------------------------------------------------------------------
+
+   private void broadcastFailoverEvent(FailoverEvent e)
+   {
+      if (trace) { log.trace(this + " broadcasting " + e); }
+
+      List listenersCopy;
+
+      synchronized(failoverListeners)
+      {
+         listenersCopy = new ArrayList(failoverListeners);
+      }
+
+      for(Iterator i = listenersCopy.iterator(); i.hasNext(); )
+      {
+         FailoverListener listener = (FailoverListener)i.next();
+
+         try
+         {
+            listener.failoverEventOccured(e);
+         }
+         catch(Exception ex)
+         {
+            log.warn("Failover listener " + listener + " did not accept event", ex);
+         }
+      }
+   }
+
+   // Inner classes --------------------------------------------------------------------------------
+}

Added: trunk/src/main/org/jboss/jms/client/FailoverValve.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/FailoverValve.java	2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/client/FailoverValve.java	2007-01-09 11:36:35 UTC (rev 1928)
@@ -0,0 +1,297 @@
+/**
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.jms.client;
+
+import org.jboss.logging.Logger;
+import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
+import EDU.oswego.cs.dl.util.concurrent.ReentrantWriterPreferenceReadWriteLock;
+import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
+
+import java.util.Map;
+import java.util.Stack;
+import java.util.Iterator;
+import java.io.StringWriter;
+import java.io.PrintWriter;
+
+/**
+ * The valve will block any call as long as it is closed.
+ *
+ * Usage: call enter() when performing a regular call and leave() in a finally block. Call close()
+ * when performing a failover, and open() in a finally block.
+ *
+ * The class contains logic to avoid dead locks between multiple threads closing the valve at the
+ * same time, which uses referencing counting on a threadLocal variable. That's why it's very
+ * important to aways leave the valve in a finally block.
+ *
+ * This class also generate tracing information, to help debug situations like the case the valve
+ * can't be closed, but only if trace is enabled on log4j.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public class FailoverValve
+{
+   // Constants ------------------------------------------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(Valve.class);
+
+   // Static ---------------------------------------------------------------------------------------
+
+   private static boolean trace = log.isTraceEnabled();
+
+   // Attributes -----------------------------------------------------------------------------------
+
+   // we keep a ThreadLocal counter to help avoid deadlocks when multiple threads are closing
+   // the valve
+   private ThreadLocal counterLocal = new ThreadLocal();
+
+   private ReadWriteLock lock;
+
+   private int activeLocks = 0;
+   private int activeCloses = 0;
+
+   // these are only initialized if tracing is enabled
+   private ThreadLocal stackCloses;
+   private ThreadLocal stackEnters;
+
+   private Map debugCloses;
+   private Map debugEnters;
+
+   // Constructors ---------------------------------------------------------------------------------
+
+   public FailoverValve()
+   {
+      // We're using reentrant locks because we will need to to acquire read locks after write locks
+      // have been already acquired. There is also a case when a readLock will be promoted to
+      // writeLock when a failover occurs; using reentrant locks will make this usage transparent
+      // for the API, we just close the valve and the read lock is promoted to write lock.
+      lock = new ReentrantWriterPreferenceReadWriteLock();
+
+      if (trace)
+      {
+         stackCloses = new ThreadLocal();
+         stackEnters = new ThreadLocal();
+         debugCloses = new ConcurrentHashMap();
+         debugEnters = new ConcurrentHashMap();
+      }
+   }
+
+   // Public ---------------------------------------------------------------------------------------
+
+   public void enter() throws InterruptedException
+   {
+      lock.readLock().acquire();
+
+      getCounter().counter++;
+
+      synchronized (this)
+      {
+         activeLocks++;
+      }
+
+      if (trace)
+      {
+         Exception ex = new Exception();
+         getStackEnters().push(ex);
+         debugEnters.put(ex, Thread.currentThread());
+      }
+   }
+
+   public void leave() throws InterruptedException
+   {
+      lock.readLock().release();
+
+      getCounter().counter--;
+
+      synchronized (this)
+      {
+         activeLocks--;
+
+         if (activeLocks < 0)
+         {
+            throw new IllegalStateException("leave() was called without a prior enter() call");
+         }
+      }
+
+      if (trace)
+      {
+         Exception ex = (Exception) getStackEnters().pop();
+         debugEnters.remove(ex);
+      }
+   }
+
+   public void close() throws InterruptedException
+   {
+      // Before assuming a write lock, we need to release reentrant read locks.
+      // When simultaneous threads are closing a valve (as simultaneous threads are capturing a
+      // failure) we won't be able to close the valve until all the readLocks are released. This
+      // release routine will be able to resolve the deadlock while we still guarantee the unicity
+      // of the lock. The useCase for this is when a failure is captured when a thread is already
+      // holding a read-lock. For example if a failure happens when sending ACKs, the valve will be
+      // already hold on receiveMessage, while the sendACK will be trying to close the Valve. This
+      // wouldn't be a problem if we had only single threads but the problem is we will be waiting
+      // on a readLock on another thread that might also be waiting to close the valve as fail event
+      // will be captured by multiple threads. So, in summary we need to completely leave the valve
+      // before closing it or a dead lock will happen if multiple threads are closing the valve at
+      // same time waiting on each others readLocks before acquiring a writeLock.
+      int counter = getCounter().counter;
+
+      for (int i = 0; i < counter; i++)
+      {
+         lock.readLock().release();
+      }
+
+      boolean acquired = false;
+      do
+      {
+         acquired = lock.writeLock().attempt(5000);
+
+         if (!acquired)
+         {
+            log.warn("Couldn't close valve, trying again", new Exception());
+            if (trace) { log.trace(debugValve()); }
+         }
+      }
+      while (!acquired);
+
+      activeCloses++;
+      activeLocks++;
+
+      // Sanity check only...
+      if (activeCloses > 1)
+      {
+         lock.writeLock().release();
+         throw new IllegalStateException("Valve closed twice");
+      }
+
+      if (trace)
+      {
+         Exception ex = new Exception();
+         getStackCloses().push(ex);
+         debugCloses.put(ex, Thread.currentThread());
+      }
+   }
+
+   public void open() throws InterruptedException
+   {
+      if (activeCloses <= 0 || activeLocks <= 0)
+      {
+         throw new IllegalStateException("Valve not closed");
+      }
+
+      activeCloses--;
+      activeLocks--;
+
+      lock.writeLock().release();
+
+      // re-apply the locks as we had before closing the valve
+      int counter = getCounter().counter;
+      for (int i = 0; i < counter; i++)
+      {
+         lock.readLock().acquire();
+      }
+
+      if (trace)
+      {
+         Exception ex = (Exception) getStackCloses().pop();
+         debugCloses.remove(ex);
+      }
+   }
+
+   public synchronized int getActiveLocks()
+   {
+      return activeLocks;
+   }
+
+   // Package protected ----------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+
+   // Private --------------------------------------------------------------------------------------
+
+   /**
+    * Counter of times this thread entered the valve.
+    */
+   private Counter getCounter()
+   {
+      if (counterLocal.get() == null)
+      {
+         counterLocal.set(new Counter());
+      }
+
+      return (Counter)counterLocal.get();
+   }
+
+
+   private Stack getStackCloses()
+   {
+      if (stackCloses.get() == null)
+      {
+         stackCloses.set(new Stack());
+      }
+
+      return (Stack) stackCloses.get();
+   }
+
+   private Stack getStackEnters()
+   {
+      if (stackEnters.get() == null)
+      {
+         stackEnters.set(new Stack());
+      }
+      return (Stack) stackEnters.get();
+   }
+
+   /**
+    * This method will show the threads that are currently holding locks (enters or closes).
+    * */
+   private synchronized String debugValve()
+   {
+      StringWriter buffer = new StringWriter();
+      PrintWriter writer = new PrintWriter(buffer);
+
+      writer.println("********************** Debug Valve Information *************************");
+      writer.println("Close owners");
+
+      // Close should never have more than 1 thread owning, but as this is a debug report we will
+      // consider that as a possibility just to show eventual bugs (just in case thie class is ever
+      // changed)
+      for (Iterator iter = debugCloses.entrySet().iterator(); iter.hasNext();)
+      {
+         Map.Entry entry = (Map.Entry) iter.next();
+         writer.println("Thread that own a close =" + entry.getValue());
+         writer.println("StackTrace:");
+         Exception e = (Exception) entry.getKey();
+         e.printStackTrace(writer);
+      }
+
+      writer.println("Valve owners");
+      for (Iterator iter = debugEnters.entrySet().iterator(); iter.hasNext();)
+      {
+         Map.Entry entry = (Map.Entry) iter.next();
+         writer.println("Thread that own valve =" + entry.getValue());
+         writer.println("StackTrace:");
+         Exception e = (Exception) entry.getKey();
+         e.printStackTrace(writer);
+      }
+
+      return buffer.toString();
+   }
+
+   // Inner classes --------------------------------------------------------------------------------
+
+   /**
+    * Used to count the number of read locks (or enters) owned by this thread
+    */
+   private static class Counter
+   {
+      int counter;
+   }
+
+}

Modified: trunk/src/main/org/jboss/jms/client/container/ClusteringAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClusteringAspect.java	2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/client/container/ClusteringAspect.java	2007-01-09 11:36:35 UTC (rev 1928)
@@ -14,6 +14,7 @@
 import org.jboss.jms.client.delegate.ClientConnectionDelegate;
 import org.jboss.jms.client.delegate.DelegateSupport;
 import org.jboss.jms.client.state.ConnectionState;
+import org.jboss.jms.client.FailoverCommandCenter;
 import org.jboss.jms.server.endpoint.CreateConnectionResult;
 
 import javax.jms.JMSException;
@@ -121,21 +122,22 @@
             log.debug(this + " got local connection delegate " + cd);
 
             ConnectionState state = (ConnectionState)((DelegateSupport)cd).getState();
+            FailoverCommandCenter fcc = state.getFailoverCommandCenter();
 
             // add a connection listener to detect failure; the consolidated remoting connection
             // listener must be already in place and configured
             state.getRemotingConnection().getConnectionListener().
-               addDelegateListener(new ConnectionFailureListener(cd));
+               addDelegateListener(new ConnectionFailureListener(fcc));
 
             log.debug(this + " installed failure listener on " + cd);
 
-            // also cache the username and the password into state, useful in case FailoverAspect
-            // needs to create a new connection instead of a failed on
+            // also cache the username and the password into state, useful in case
+            // FailoverCommandCenter needs to create a new connection instead of a failed on
             state.setUsername(username);
             state.setPassword(password);
 
             // also add a reference to the clustered ConnectionFactory delegate, useful in case
-            // FailoverAspect needs to create a new connection instead of a failed on
+            // FailoverCommandCenter needs to create a new connection instead of a failed on
             state.setClusteredConnectionFactoryDeleage(clusteredDelegate);
 
             return new CreateConnectionResult(cd);

Modified: trunk/src/main/org/jboss/jms/client/container/ConnectionAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ConnectionAspect.java	2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/client/container/ConnectionAspect.java	2007-01-09 11:36:35 UTC (rev 1928)
@@ -196,7 +196,7 @@
       MethodInvocation mi = (MethodInvocation)invocation;
       FailoverListener listener = (FailoverListener)mi.getArguments()[0];
 
-      state.registerFailoverListener(listener);
+      state.getFailoverCommandCenter().registerFailoverListener(listener);
 
       return null;
    }
@@ -208,7 +208,7 @@
       MethodInvocation mi = (MethodInvocation)invocation;
       FailoverListener listener = (FailoverListener)mi.getArguments()[0];
 
-      boolean result = state.unregisterFailoverListener(listener);
+      boolean result = state.getFailoverCommandCenter().unregisterFailoverListener(listener);
 
       return new Boolean(result);
    }

Modified: trunk/src/main/org/jboss/jms/client/container/ConnectionFailureListener.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ConnectionFailureListener.java	2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/client/container/ConnectionFailureListener.java	2007-01-09 11:36:35 UTC (rev 1928)
@@ -11,6 +11,7 @@
 import org.jboss.logging.Logger;
 import org.jboss.jms.client.delegate.ClientConnectionDelegate;
 import org.jboss.jms.client.FailoverEvent;
+import org.jboss.jms.client.FailoverCommandCenter;
 import org.jboss.jms.client.state.ConnectionState;
 
 /**
@@ -32,13 +33,13 @@
 
    // Attributes -----------------------------------------------------------------------------------
 
-   private ClientConnectionDelegate cd;
+   private FailoverCommandCenter fcc;
 
    // Constructors ---------------------------------------------------------------------------------
 
-   ConnectionFailureListener(ClientConnectionDelegate cd)
+   ConnectionFailureListener(FailoverCommandCenter fcc)
    {
-      this.cd = cd;
+      this.fcc = fcc;
    }
 
    // ConnectionListener implementation ------------------------------------------------------------
@@ -49,13 +50,8 @@
       {
          log.debug(this + " is being notified of connection failure: " + throwable);
 
-         // generate a FAILURE_DETECTED event
-         ((ConnectionState)cd.getState()).
-            broadcastFailoverEvent(new FailoverEvent(FailoverEvent.FAILURE_DETECTED, cd));
+         fcc.failureDetected(throwable, null);
 
-         log.debug(this + " initiating client-side failover");
-
-         cd.performFailover();
       }
       catch (Throwable e)
       {
@@ -67,7 +63,7 @@
 
    public String toString()
    {
-      return "ConnectionFailureListener[" + cd + "]";
+      return "ConnectionFailureListener[" + fcc + "]";
    }
 
    // Package protected ----------------------------------------------------------------------------

Deleted: trunk/src/main/org/jboss/jms/client/container/FailoverAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/FailoverAspect.java	2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/client/container/FailoverAspect.java	2007-01-09 11:36:35 UTC (rev 1928)
@@ -1,119 +0,0 @@
-/**
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.jms.client.container;
-
-import org.jboss.aop.joinpoint.Invocation;
-import org.jboss.jms.client.delegate.ClientConnectionDelegate;
-import org.jboss.jms.client.state.ConnectionState;
-import org.jboss.jms.client.FailoverEvent;
-import org.jboss.jms.server.endpoint.CreateConnectionResult;
-import org.jboss.jms.delegate.ConnectionFactoryDelegate;
-import org.jboss.logging.Logger;
-
-/**
- * PER_VM Aspect.
- *
- * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
- * @version <tt>$Revision$</tt>
- * $Id$
- */
-public class FailoverAspect
-{
-   // Constants ------------------------------------------------------------------------------------
-
-   private static final Logger log = Logger.getLogger(FailoverAspect.class);
-
-   // Static ---------------------------------------------------------------------------------------
-
-   // Attributes -----------------------------------------------------------------------------------
-
-   // Constructors ---------------------------------------------------------------------------------
-
-   // Public ---------------------------------------------------------------------------------------
-
-   public Object handlePerformFailover(Invocation invocation) throws Throwable
-   {
-      Object target = invocation.getTargetObject();
-
-      if(target instanceof ClientConnectionDelegate)
-      {
-         performConnectionFailover((ClientConnectionDelegate)target);
-      }
-
-      return null;
-   }
-
-   // Package protected ----------------------------------------------------------------------------
-
-   // Protected ------------------------------------------------------------------------------------
-
-   // Private --------------------------------------------------------------------------------------
-
-   private void performConnectionFailover(ClientConnectionDelegate delegate) throws Exception
-   {
-      ConnectionState state = (ConnectionState)delegate.getState();
-
-      CreateConnectionResult res = null;
-      boolean failoverSuccessful = false;
-
-      try
-      {
-         // block any other invocations ariving while we're doing failover, on this delegate and
-         // recursively down the hierarchy
-
-         // WARNING - this may block if there are active invocations through valves!
-         delegate.closeValve();
-
-         log.debug("starting client-side failover");
-
-         // generate a FAILOVER_STARTED event. The event must be broadcasted AFTER valve closure,
-         // to insure the client-side stack is in a deterministic state
-         state.broadcastFailoverEvent(new FailoverEvent(FailoverEvent.FAILOVER_STARTED, delegate));
-
-         int failedNodeID = state.getServerID();
-         ConnectionFactoryDelegate clusteredDelegate =
-            state.getClusteredConnectionFactoryDelegate();
-
-         // re-try creating the connection
-         res = clusteredDelegate.
-            createConnectionDelegate(state.getUsername(), state.getPassword(), failedNodeID);
-
-         if (res == null)
-         {
-            // No failover attempt was detected on the server side; this might happen if the
-            // client side network fails temporarily so the client connection breaks but the
-            // server cluster is still up and running - in this case we don't perform failover.
-            failoverSuccessful = false;
-         }
-         else
-         {
-            // recursively synchronize state
-            ClientConnectionDelegate newDelegate = (ClientConnectionDelegate)res.getDelegate();
-            delegate.synchronizeWith(newDelegate);
-            failoverSuccessful = true;
-         }
-      }
-      finally
-      {
-         if (failoverSuccessful)
-         {
-            state.broadcastFailoverEvent(new FailoverEvent(FailoverEvent.FAILOVER_COMPLETED,
-                                                           res.getDelegate()));
-         }
-         else
-         {
-            state.broadcastFailoverEvent(new FailoverEvent(FailoverEvent.FAILOVER_FAILED,
-                                                           delegate));
-         }
-
-         // failover done, open valves
-         delegate.openValve();
-      }
-   }
-
-   // Inner classes --------------------------------------------------------------------------------
-}

Modified: trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java	2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java	2007-01-09 11:36:35 UTC (rev 1928)
@@ -8,24 +8,24 @@
 
 import org.jboss.aop.advice.Interceptor;
 import org.jboss.aop.joinpoint.Invocation;
-import org.jboss.aop.joinpoint.MethodInvocation;
-import org.jboss.logging.Logger;
 import org.jboss.jms.client.delegate.DelegateSupport;
 import org.jboss.jms.client.state.HierarchicalState;
-import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
-import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
-import EDU.oswego.cs.dl.util.concurrent.Sync;
+import org.jboss.jms.client.state.ConnectionState;
+import org.jboss.jms.client.FailoverCommandCenter;
+import org.jboss.jms.client.FailoverValve;
+import org.jboss.jms.client.remoting.JMSRemotingConnection;
+import org.jboss.remoting.CannotConnectException;
 
-import java.util.List;
-import java.util.ArrayList;
 import java.io.IOException;
 
 /**
  * An interceptor that acts as a failover valve: it allows all invocations to go through as long
  * as there is no failover in progress (valve is open), and holds all invocations while client-side
- * failover is taking place (valve is closed). The interceptor fields org.jboss.jms.client.Valve's
- * method calls.
+ * failover is taking place (valve is closed). The interceptor is also a failover detector, in that
+ * it catches "failure-triggering" exceptions, and notifies the failover command center.
  *
+ * The interceptor fields org.jboss.jms.client.Valve's method calls.
+ *
  * It is a PER_INSTANCE interceptor.
  *
  * An instance of this interceptor must guard access to each connection, session, producer, consumer
@@ -39,39 +39,16 @@
 {
    // Constants ------------------------------------------------------------------------------------
 
-   private static final Logger log = Logger.getLogger(FailoverValveInterceptor.class);
-
    // Static ---------------------------------------------------------------------------------------
 
-   private static boolean trace = log.isTraceEnabled();
-
    // Attributes -----------------------------------------------------------------------------------
 
    private DelegateSupport delegate;
-   private HierarchicalState state;
-   private volatile boolean valveOpen;
+   private FailoverCommandCenter fcc;
+   private FailoverValve valve;
 
-   private ReadWriteLock rwLock;
-
-   // the number of threads currently "penetrating" the open valve
-   private int activeThreadsCount;
-
-   // only for tracing
-   private List activeMethods;
-
    // Constructors ---------------------------------------------------------------------------------
 
-   public FailoverValveInterceptor()
-   {
-      valveOpen = true;
-      activeThreadsCount = 0;
-      rwLock = new WriterPreferenceReadWriteLock();
-      if (trace)
-      {
-         activeMethods = new ArrayList();
-      }
-   }
-
    // Interceptor implemenation --------------------------------------------------------------------
 
    public String getName()
@@ -81,149 +58,52 @@
 
    public Object invoke(Invocation invocation) throws Throwable
    {
-      // maintain a reference to the delegate that sends invocation through this interceptor. It
-      // makes sense, since it's an PER_INSTANCE interceptor.
-      if (delegate == null)
+      // maintain a reference to the FailoverCommandCenter instance.
+
+      if (fcc == null)
       {
          delegate = (DelegateSupport)invocation.getTargetObject();
-         state = delegate.getState();
-      }
 
-      String methodName = ((MethodInvocation)invocation).getMethod().getName();
-      Sync writeLock =  rwLock.writeLock();
-      Sync readLock = rwLock.readLock();
-
-      if("closeValve".equals(methodName))
-      {
-         if (!valveOpen)
+         HierarchicalState hs = delegate.getState();
+         while (hs != null && !(hs instanceof ConnectionState))
          {
-            // valve already closed, this is a noop
-            log.warn(this + " already closed!");
-            return null;
+            hs = hs.getParent();
          }
 
-         state.closeChildrensValves();
+         fcc = ((ConnectionState)hs).getFailoverCommandCenter();
+         valve = fcc.getValve();
+      }
 
-         boolean acquired = false;
+      JMSRemotingConnection remotingConnection = null;
 
-         while(!acquired)
-         {
-            try
-            {
-               acquired = writeLock.attempt(500);
-            }
-            catch(InterruptedException e)
-            {
-               // OK
-            }
-
-            if (!acquired)
-            {
-               log.debug(this + " failed to close");
-            }
-         }
-
-         valveOpen = false;
-
-         log.debug(this + " has been closed");
-
-         return null;
-      }
-      else if("openValve".equals(methodName))
+      try
       {
-         if (valveOpen)
-         {
-            // valve already open, this is a noop
-            log.warn(this + " already open!");
-            return null;
-         }
+         valve.enter();
 
-         state.openChildrensValves();
-
-         writeLock.release();
-         valveOpen = true;
-
-         log.debug(this + " has been opened");
-
-         return null;
+         // it's important to retrieve the remotingConnection while inside the Valve, as there's
+         // guaranteed that no failover has happened yet
+         // guarantee that no failover has happened yet
+         remotingConnection = fcc.getRemotingConnection();
+         return invocation.invokeNext();
       }
-      else if("isValveOpen".equals(methodName))
+      catch (CannotConnectException e)
       {
-         if (valveOpen)
-         {
-            return Boolean.TRUE;
-         }
-         else
-         {
-            return Boolean.FALSE;
-         }
+         fcc.failureDetected(e, remotingConnection);
+         return invocation.invokeNext();
       }
-      else if("getActiveThreadsCount".equals(methodName))
+      catch (IOException e)
       {
-         return new Integer(activeThreadsCount);
-      }
-
-      // attempt to grab the reader's lock and go forward
-
-      boolean exempt = false;
-
-      try
-      {
-         exempt = isInvocationExempt(methodName);
-
-         if (!exempt)
-         {
-            boolean acquired = false;
-
-            while(!acquired)
-            {
-               try
-               {
-                  acquired = readLock.attempt(500);
-               }
-               catch(InterruptedException e)
-               {
-                  // OK
-               }
-
-               if (trace && !acquired ) { log.trace(methodName + "() trying to pass through " + this); }
-            }
-         }
-
-         synchronized(this)
-         {
-            activeThreadsCount++;
-            if (trace)
-            {
-               activeMethods.add(methodName);
-            }
-         }
-
-         if (trace) { log.trace(this + " allowed " + (exempt ? "exempt" : "") + " method " + methodName + "() to pass through"); }
-
+         fcc.failureDetected(e, remotingConnection);
          return invocation.invokeNext();
-
       }
-      catch(IOException e)
+      catch (Throwable e)
       {
-         // transport-level failure detected while being in the middle of an invocation
+         // not failover-triggering, rethrow
          throw e;
       }
       finally
       {
-         if (!exempt)
-         {
-            readLock.release();
-         }
-
-         synchronized(this)
-         {
-            activeThreadsCount--;
-            if (trace)
-            {
-               activeMethods.remove(methodName);
-            }
-         }
+         valve.leave();
       }
    }
 
@@ -231,10 +111,7 @@
 
    public String toString()
    {
-      return "FailoverValve." +
-         (delegate == null ? "UNITIALIZED" : delegate.toString()) +
-         (valveOpen ? "[OPEN(" + activeThreadsCount +
-            (trace ? " " + activeMethods.toString() : "") + ")]":"[CLOSED]");
+      return "FailoverValve." + (delegate == null ? "UNITIALIZED" : delegate.toString());
    }
 
    // Package protected ----------------------------------------------------------------------------
@@ -243,10 +120,5 @@
 
    // Private --------------------------------------------------------------------------------------
 
-   private boolean isInvocationExempt(String methodName)
-   {
-      return "recoverDeliveries".equals(methodName);
-   }
-
    // Inner classes --------------------------------------------------------------------------------
 }

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java	2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java	2007-01-09 11:36:35 UTC (rev 1928)
@@ -136,42 +136,6 @@
       throw new IllegalStateException("This invocation should not be handled here!");
    }
 
-   /**
-    * This invocation should either be handled by the client-side interceptor chain or by the
-    * server-side endpoint.
-    */
-   public void closeValve()
-   {
-      throw new IllegalStateException("This invocation should not be handled here!");
-   }
-
-   /**
-    * This invocation should either be handled by the client-side interceptor chain or by the
-    * server-side endpoint.
-    */
-   public void openValve()
-   {
-      throw new IllegalStateException("This invocation should not be handled here!");
-   }
-
-   /**
-    * This invocation should either be handled by the client-side interceptor chain or by the
-    * server-side endpoint.
-    */
-   public boolean isValveOpen()
-   {
-      throw new IllegalStateException("This invocation should not be handled here!");
-   }
-
-   /**
-    * This invocation should either be handled by the client-side interceptor chain or by the
-    * server-side endpoint.
-    */
-   public int getActiveThreadsCount()
-   {
-      throw new IllegalStateException("This invocation should not be handled here!");
-   }
-
    // Public ---------------------------------------------------------------------------------------
 
    public String getStackName()

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java	2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java	2007-01-09 11:36:35 UTC (rev 1928)
@@ -268,50 +268,6 @@
       throw new IllegalStateException("This invocation should not be handled here!");
    }
 
-   /**
-    * This invocation should be handled by the client-side interceptor chain.
-    */
-   public void performFailover()
-   {
-      throw new IllegalStateException("This invocation should not be handled here!");
-   }
-
-   /**
-    * This invocation should either be handled by the client-side interceptor chain or by the
-    * server-side endpoint.
-    */
-   public void closeValve()
-   {
-      throw new IllegalStateException("This invocation should not be handled here!");
-   }
-
-   /**
-    * This invocation should either be handled by the client-side interceptor chain or by the
-    * server-side endpoint.
-    */
-   public void openValve()
-   {
-      throw new IllegalStateException("This invocation should not be handled here!");
-   }
-
-   /**
-    * This invocation should either be handled by the client-side interceptor chain or by the
-    * server-side endpoint.
-    */
-   public boolean isValveOpen()
-   {
-      throw new IllegalStateException("This invocation should not be handled here!");
-   }
-
-   /**
-    * This invocation should either be handled by the client-side interceptor chain or by the
-    * server-side endpoint.
-    */
-   public int getActiveThreadsCount()
-   {
-      throw new IllegalStateException("This invocation should not be handled here!");
-   }
-
    // Public ---------------------------------------------------------------------------------------
 
    public void init()

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java	2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java	2007-01-09 11:36:35 UTC (rev 1928)
@@ -191,42 +191,6 @@
       throw new IllegalStateException("This invocation should not be handled here!");
    }
 
-   /**
-    * This invocation should either be handled by the client-side interceptor chain or by the
-    * server-side endpoint.
-    */
-   public void closeValve()
-   {
-      throw new IllegalStateException("This invocation should not be handled here!");
-   }
-
-   /**
-    * This invocation should either be handled by the client-side interceptor chain or by the
-    * server-side endpoint.
-    */
-   public void openValve()
-   {
-      throw new IllegalStateException("This invocation should not be handled here!");
-   }
-
-   /**
-    * This invocation should either be handled by the client-side interceptor chain or by the
-    * server-side endpoint.
-    */
-   public boolean isValveOpen()
-   {
-      throw new IllegalStateException("This invocation should not be handled here!");
-   }
-
-   /**
-    * This invocation should either be handled by the client-side interceptor chain or by the
-    * server-side endpoint.
-    */
-   public int getActiveThreadsCount()
-   {
-      throw new IllegalStateException("This invocation should not be handled here!");
-   }
-
    // Public ---------------------------------------------------------------------------------------
 
    public String toString()

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java	2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java	2007-01-09 11:36:35 UTC (rev 1928)
@@ -230,42 +230,6 @@
       throw new IllegalStateException("This invocation should not be handled here!");
    }
 
-   /**
-    * This invocation should either be handled by the client-side interceptor chain or by the
-    * server-side endpoint.
-    */
-   public void closeValve()
-   {
-      throw new IllegalStateException("This invocation should not be handled here!");
-   }
-
-   /**
-    * This invocation should either be handled by the client-side interceptor chain or by the
-    * server-side endpoint.
-    */
-   public void openValve()
-   {
-      throw new IllegalStateException("This invocation should not be handled here!");
-   }
-
-   /**
-    * This invocation should either be handled by the client-side interceptor chain or by the
-    * server-side endpoint.
-    */
-   public boolean isValveOpen()
-   {
-      throw new IllegalStateException("This invocation should not be handled here!");
-   }
-
-   /**
-    * This invocation should either be handled by the client-side interceptor chain or by the
-    * server-side endpoint.
-    */
-   public int getActiveThreadsCount()
-   {
-      throw new IllegalStateException("This invocation should not be handled here!");
-   }
-
    // Public ---------------------------------------------------------------------------------------
 
    public String toString()

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2007-01-09 11:36:35 UTC (rev 1928)
@@ -466,42 +466,6 @@
       throw new IllegalStateException("This invocation should not be handled here!");
    }
 
-   /**
-    * This invocation should either be handled by the client-side interceptor chain or by the
-    * server-side endpoint.
-    */
-   public void closeValve()
-   {
-      throw new IllegalStateException("This invocation should not be handled here!");
-   }
-
-   /**
-    * This invocation should either be handled by the client-side interceptor chain or by the
-    * server-side endpoint.
-    */
-   public void openValve()
-   {
-      throw new IllegalStateException("This invocation should not be handled here!");
-   }
-
-   /**
-    * This invocation should either be handled by the client-side interceptor chain or by the
-    * server-side endpoint.
-    */
-   public boolean isValveOpen()
-   {
-      throw new IllegalStateException("This invocation should not be handled here!");
-   }
-
-   /**
-    * This invocation should either be handled by the client-side interceptor chain or by the
-    * server-side endpoint.
-    */
-   public int getActiveThreadsCount()
-   {
-      throw new IllegalStateException("This invocation should not be handled here!");
-   }
-
    // Public ---------------------------------------------------------------------------------------
 
    public String toString()

Modified: trunk/src/main/org/jboss/jms/client/state/ConnectionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/ConnectionState.java	2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/client/state/ConnectionState.java	2007-01-09 11:36:35 UTC (rev 1928)
@@ -22,16 +22,13 @@
 package org.jboss.jms.client.state;
 
 import java.util.HashSet;
-import java.util.List;
-import java.util.ArrayList;
 import java.util.Iterator;
 
 import org.jboss.jms.client.delegate.DelegateSupport;
 import org.jboss.jms.client.delegate.ClientConnectionDelegate;
 import org.jboss.jms.client.delegate.ClientSessionDelegate;
 import org.jboss.jms.client.remoting.JMSRemotingConnection;
-import org.jboss.jms.client.FailoverEvent;
-import org.jboss.jms.client.FailoverListener;
+import org.jboss.jms.client.FailoverCommandCenter;
 import org.jboss.jms.delegate.ConnectionDelegate;
 import org.jboss.jms.delegate.ConnectionFactoryDelegate;
 import org.jboss.jms.message.MessageIdGenerator;
@@ -63,8 +60,6 @@
 
    // Static ---------------------------------------------------------------------------------------
 
-   private static boolean trace = log.isTraceEnabled();
-
    // Attributes -----------------------------------------------------------------------------------
 
    private int serverID;
@@ -91,11 +86,11 @@
    // connection on a different node
    private transient String password;
 
-   private List failoverListeners;
-
    // needed to try re-creating connection in case failure is detected on the current connection
    private ConnectionFactoryDelegate clusteredConnectionFactoryDelegate;
 
+   private FailoverCommandCenter fcc;
+
    // Constructors ---------------------------------------------------------------------------------
 
    public ConnectionState(int serverID, ConnectionDelegate delegate,
@@ -121,7 +116,7 @@
       this.idGenerator = gen;
       this.serverID = serverID;
 
-      failoverListeners = new ArrayList();
+      fcc = new FailoverCommandCenter(this);
    }
 
    // HierarchicalState implementation -------------------------------------------------------------
@@ -250,48 +245,6 @@
       this.justCreated = justCreated;
    }
 
-   public void broadcastFailoverEvent(FailoverEvent e)
-   {
-      if (trace) { log.trace(this + " broadcasting " + e); }
-
-      List listenersCopy;
-
-      synchronized(failoverListeners)
-      {
-         listenersCopy = new ArrayList(failoverListeners);
-      }
-
-      for(Iterator i = listenersCopy.iterator(); i.hasNext(); )
-      {
-         FailoverListener listener = (FailoverListener)i.next();
-
-         try
-         {
-            listener.failoverEventOccured(e);
-         }
-         catch(Exception ex)
-         {
-            log.warn("Failover listener " + listener + " did not accept event", ex);
-         }
-      }
-   }
-
-   public void registerFailoverListener(FailoverListener listener)
-   {
-      synchronized(failoverListeners)
-      {
-         failoverListeners.add(listener);
-      }
-   }
-
-   public boolean unregisterFailoverListener(FailoverListener listener)
-   {
-      synchronized(failoverListeners)
-      {
-         return failoverListeners.remove(listener);
-      }
-   }
-
    public void setClusteredConnectionFactoryDeleage(ConnectionFactoryDelegate d)
    {
       this.clusteredConnectionFactoryDelegate = d;
@@ -302,6 +255,11 @@
       return clusteredConnectionFactoryDelegate;
    }
 
+   public FailoverCommandCenter getFailoverCommandCenter()
+   {
+      return fcc;
+   }
+
    public String toString()
    {
       return "ConnectionState[" + ((ClientConnectionDelegate)delegate).getID() + "]";

Modified: trunk/src/main/org/jboss/jms/client/state/HierarchicalState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/HierarchicalState.java	2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/client/state/HierarchicalState.java	2007-01-09 11:36:35 UTC (rev 1928)
@@ -52,18 +52,6 @@
    Version getVersionToUse();
 
    /**
-    * Closes children's failover valves, by sending closeValve() invocations down children's
-    * delegate stack. It is NOT intended to be recursive, unless the children chose so.
-    */
-   void closeChildrensValves() throws Exception;
-
-   /**
-    * Opens children's failover valves, by sending openValve() invocations down children's
-    * delegate stack. It is NOT intended to be recursive, unless the children chose so.
-    */
-   void openChildrensValves() throws Exception;
-
-   /**
     * Update my own state based on the new state.
     */
    void synchronizeWith(HierarchicalState newState) throws Exception;

Modified: trunk/src/main/org/jboss/jms/client/state/HierarchicalStateSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/HierarchicalStateSupport.java	2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/client/state/HierarchicalStateSupport.java	2007-01-09 11:36:35 UTC (rev 1928)
@@ -22,10 +22,8 @@
 package org.jboss.jms.client.state;
 
 import java.util.Set;
-import java.util.Iterator;
 
 import org.jboss.jms.client.delegate.DelegateSupport;
-import org.jboss.jms.client.Valve;
 
 /**
  * Base implementation of HierarchicalState.
@@ -76,34 +74,6 @@
       return children;
    }
 
-   public void closeChildrensValves() throws Exception
-   {
-      if (children == null)
-      {
-         return;
-      }
-
-      for(Iterator i = children.iterator(); i.hasNext(); )
-      {
-         HierarchicalState s = (HierarchicalState)i.next();
-         ((Valve)s.getDelegate()).closeValve();
-      }
-   }
-
-   public void openChildrensValves() throws Exception
-   {
-      if (children == null)
-      {
-         return;
-      }
-
-      for(Iterator i = children.iterator(); i.hasNext(); )
-      {
-         HierarchicalState s = (HierarchicalState)i.next();
-         ((Valve)s.getDelegate()).openValve();
-      }
-   }
-
    // Public ---------------------------------------------------------------------------------------
 
    // Protected ------------------------------------------------------------------------------------

Modified: trunk/src/main/org/jboss/jms/delegate/BrowserDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/BrowserDelegate.java	2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/delegate/BrowserDelegate.java	2007-01-09 11:36:35 UTC (rev 1928)
@@ -22,7 +22,6 @@
 package org.jboss.jms.delegate;
 
 import org.jboss.jms.server.endpoint.BrowserEndpoint;
-import org.jboss.jms.client.Valve;
 
 /**
  * Represents the minimal set of operations to provide browser
@@ -33,7 +32,7 @@
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @version <tt>$Revision$</tt>
  */
-public interface BrowserDelegate extends Valve, BrowserEndpoint
+public interface BrowserDelegate extends BrowserEndpoint
 {
 }
    

Modified: trunk/src/main/org/jboss/jms/delegate/ConnectionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/ConnectionDelegate.java	2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/delegate/ConnectionDelegate.java	2007-01-09 11:36:35 UTC (rev 1928)
@@ -29,7 +29,6 @@
 
 import org.jboss.jms.client.JBossConnectionConsumer;
 import org.jboss.jms.client.FailoverListener;
-import org.jboss.jms.client.Valve;
 import org.jboss.jms.server.endpoint.ConnectionEndpoint;
 
 /**
@@ -42,7 +41,7 @@
  *
  * $Id$
  */
-public interface ConnectionDelegate extends Valve, ConnectionEndpoint
+public interface ConnectionDelegate extends ConnectionEndpoint
 {      
    ExceptionListener getExceptionListener() throws JMSException;
    
@@ -59,6 +58,4 @@
    void registerFailoverListener(FailoverListener failoverListener);
    boolean unregisterFailoverListener(FailoverListener failoverListener);
 
-   void performFailover() throws Exception;
-
 }

Modified: trunk/src/main/org/jboss/jms/delegate/ConsumerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/ConsumerDelegate.java	2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/delegate/ConsumerDelegate.java	2007-01-09 11:36:35 UTC (rev 1928)
@@ -27,7 +27,6 @@
 
 import org.jboss.jms.destination.JBossDestination;
 import org.jboss.jms.server.endpoint.ConsumerEndpoint;
-import org.jboss.jms.client.Valve;
 
 /**
  * Represents the minimal set of operations to provide consumer
@@ -41,7 +40,7 @@
  *
  * $Id$
  */
-public interface ConsumerDelegate extends Valve, ConsumerEndpoint
+public interface ConsumerDelegate extends ConsumerEndpoint
 {
    MessageListener getMessageListener();
 

Modified: trunk/src/main/org/jboss/jms/delegate/ProducerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/ProducerDelegate.java	2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/delegate/ProducerDelegate.java	2007-01-09 11:36:35 UTC (rev 1928)
@@ -25,7 +25,6 @@
 import javax.jms.Message;
 
 import org.jboss.jms.client.Closeable;
-import org.jboss.jms.client.Valve;
 import org.jboss.jms.destination.JBossDestination;
 
 /**
@@ -35,7 +34,7 @@
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @version <tt>$Revision$</tt>
  */
-public interface ProducerDelegate extends Valve, Closeable
+public interface ProducerDelegate extends Closeable
 {
    void setDisableMessageID(boolean value) throws JMSException;
    

Modified: trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java	2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java	2007-01-09 11:36:35 UTC (rev 1928)
@@ -37,7 +37,6 @@
 import org.jboss.jms.message.TextMessageProxy;
 import org.jboss.jms.server.endpoint.DeliveryInfo;
 import org.jboss.jms.server.endpoint.SessionEndpoint;
-import org.jboss.jms.client.Valve;
 
 /**
  * Represents the minimal set of operations to provide session functionality.
@@ -49,7 +48,7 @@
  *
  * $Id$
  */
-public interface SessionDelegate extends Valve, SessionEndpoint
+public interface SessionDelegate extends SessionEndpoint
 {
    MessageProxy createMessage() throws JMSException;
 

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2007-01-09 11:36:35 UTC (rev 1928)
@@ -2178,7 +2178,7 @@
             }
          }
 
-         log.info(this + ": server side fail over is now complete");
+         log.debug(this + " finished to fail over destinations");
 
          //TODO - should this be in a finally? I'm not sure
          status.finishFailingOver();
@@ -2190,6 +2190,8 @@
          log.debug(this + " announced the cluster that failover procedure is complete");
 
          sendJMXNotification(FAILOVER_COMPLETED_NOTIFICATION);
+
+         log.info(this + ": server side fail over is now complete");
       }
       finally
       {

Deleted: trunk/tests/src/org/jboss/test/messaging/jms/FailoverValveTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/FailoverValveTest.java	2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/tests/src/org/jboss/test/messaging/jms/FailoverValveTest.java	2007-01-09 11:36:35 UTC (rev 1928)
@@ -1,909 +0,0 @@
-/**
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.test.messaging.jms;
-
-import org.jboss.test.messaging.MessagingTestCase;
-import org.jboss.test.messaging.tools.ServerManagement;
-import org.jboss.jms.client.Valve;
-import org.jboss.jms.client.JBossConnection;
-import org.jboss.jms.client.JBossSession;
-import org.jboss.jms.client.JBossQueueBrowser;
-import org.jboss.jms.client.JBossMessageProducer;
-import org.jboss.jms.client.JBossMessageConsumer;
-import org.jboss.jms.client.state.HierarchicalState;
-import org.jboss.jms.client.state.HierarchicalStateSupport;
-import org.jboss.jms.client.delegate.DelegateSupport;
-import org.jboss.jms.client.container.FailoverValveInterceptor;
-import org.jboss.jms.server.Version;
-import org.jboss.aop.joinpoint.Invocation;
-import org.jboss.aop.joinpoint.MethodInvocation;
-import org.jboss.aop.MethodInfo;
-import org.jboss.aop.util.MethodHashing;
-import org.jboss.aop.advice.Interceptor;
-import org.jboss.remoting.Client;
-
-import javax.naming.InitialContext;
-import javax.jms.Connection;
-import javax.jms.Session;
-import javax.jms.ConnectionFactory;
-import javax.jms.Queue;
-import javax.jms.MessageProducer;
-import javax.jms.MessageConsumer;
-import javax.jms.QueueBrowser;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.Collections;
-import java.util.Iterator;
-
-import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
-import EDU.oswego.cs.dl.util.concurrent.Slot;
-
-/**
- * This is not necessarily a clustering test, as the failover valves are installed even for a
- * non-clustered configuration.
- *
- * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
- * @version <tt>$Revision: 1843 $</tt>
- *
- * $Id: JMSTest.java 1843 2006-12-21 23:41:19Z timfox $
- */
-public class FailoverValveTest extends MessagingTestCase
-{
-   // Constants ------------------------------------------------------------------------------------
-
-   // Static ---------------------------------------------------------------------------------------
-
-   // Attributes -----------------------------------------------------------------------------------
-
-   private InitialContext ic;
-   private ConnectionFactory cf;
-   private Queue queue;
-
-   // Constructors ---------------------------------------------------------------------------------
-
-   public FailoverValveTest(String name)
-   {
-      super(name);
-   }
-
-   // Public ---------------------------------------------------------------------------------------
-
-   public void testCloseValveHierarchy() throws Exception
-   {
-      Connection conn = null;
-
-      try
-      {
-         conn = cf.createConnection();
-
-         assertTrue(((Valve)((JBossConnection)conn).getDelegate()).isValveOpen());
-
-         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-         assertTrue(((Valve)((JBossSession)session).getDelegate()).isValveOpen());
-
-         MessageProducer prod = session.createProducer(queue);
-         assertTrue(((Valve)((JBossMessageProducer)prod).getDelegate()).isValveOpen());
-
-         MessageConsumer cons = session.createConsumer(queue);
-         assertTrue(((Valve)((JBossMessageConsumer)cons).getDelegate()).isValveOpen());
-
-         QueueBrowser browser = session.createBrowser(queue);
-         assertTrue(((Valve)((JBossQueueBrowser)browser).getDelegate()).isValveOpen());
-
-         ((JBossConnection)conn).getDelegate().closeValve();
-
-         log.debug("top level valve closed");
-
-         assertFalse(((Valve)((JBossConnection)conn).getDelegate()).isValveOpen());
-         assertFalse(((Valve)((JBossSession)session).getDelegate()).isValveOpen());
-         assertFalse(((Valve)((JBossMessageProducer)prod).getDelegate()).isValveOpen());
-         assertFalse(((Valve)((JBossMessageConsumer)cons).getDelegate()).isValveOpen());
-         assertFalse(((Valve)((JBossQueueBrowser)browser).getDelegate()).isValveOpen());
-
-         ((JBossConnection)conn).getDelegate().openValve();
-
-         log.debug("top level valve open");
-
-         assertTrue(((Valve)((JBossConnection)conn).getDelegate()).isValveOpen());
-         assertTrue(((Valve)((JBossSession)session).getDelegate()).isValveOpen());
-         assertTrue(((Valve)((JBossMessageProducer)prod).getDelegate()).isValveOpen());
-         assertTrue(((Valve)((JBossMessageConsumer)cons).getDelegate()).isValveOpen());
-         assertTrue(((Valve)((JBossQueueBrowser)browser).getDelegate()).isValveOpen());
-      }
-      finally
-      {
-         if (conn != null)
-         {
-            conn.close();
-         }
-      }
-
-   }
-
-   public void testCloseValveHierarchy2() throws Exception
-   {
-      Connection conn = null;
-
-      try
-      {
-         conn = cf.createConnection();
-
-         assertTrue(((Valve)((JBossConnection)conn).getDelegate()).isValveOpen());
-
-         Session session1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         Session session2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-         assertTrue(((Valve)((JBossSession)session1).getDelegate()).isValveOpen());
-         assertTrue(((Valve)((JBossSession)session2).getDelegate()).isValveOpen());
-
-         MessageProducer prod1 = session1.createProducer(queue);
-         assertTrue(((Valve)((JBossMessageProducer)prod1).getDelegate()).isValveOpen());
-
-         MessageConsumer cons1 = session1.createConsumer(queue);
-         assertTrue(((Valve)((JBossMessageConsumer)cons1).getDelegate()).isValveOpen());
-
-         QueueBrowser browser1 = session1.createBrowser(queue);
-         assertTrue(((Valve)((JBossQueueBrowser)browser1).getDelegate()).isValveOpen());
-
-         MessageProducer prod2 = session2.createProducer(queue);
-         assertTrue(((Valve)((JBossMessageProducer)prod2).getDelegate()).isValveOpen());
-
-         MessageConsumer cons2 = session2.createConsumer(queue);
-         assertTrue(((Valve)((JBossMessageConsumer)cons2).getDelegate()).isValveOpen());
-
-         QueueBrowser browser2 = session2.createBrowser(queue);
-         assertTrue(((Valve)((JBossQueueBrowser)browser2).getDelegate()).isValveOpen());
-
-         ((JBossConnection)conn).getDelegate().closeValve();
-
-         log.debug("top level valve closed");
-
-         assertFalse(((Valve)((JBossConnection)conn).getDelegate()).isValveOpen());
-         assertFalse(((Valve)((JBossSession)session1).getDelegate()).isValveOpen());
-         assertFalse(((Valve)((JBossSession)session2).getDelegate()).isValveOpen());
-         assertFalse(((Valve)((JBossMessageProducer)prod1).getDelegate()).isValveOpen());
-         assertFalse(((Valve)((JBossMessageConsumer)cons1).getDelegate()).isValveOpen());
-         assertFalse(((Valve)((JBossQueueBrowser)browser1).getDelegate()).isValveOpen());
-         assertFalse(((Valve)((JBossMessageProducer)prod2).getDelegate()).isValveOpen());
-         assertFalse(((Valve)((JBossMessageConsumer)cons2).getDelegate()).isValveOpen());
-         assertFalse(((Valve)((JBossQueueBrowser)browser2).getDelegate()).isValveOpen());
-
-         ((JBossConnection)conn).getDelegate().openValve();
-
-         log.debug("top level valve open");
-
-         assertTrue(((Valve)((JBossConnection)conn).getDelegate()).isValveOpen());
-         assertTrue(((Valve)((JBossSession)session1).getDelegate()).isValveOpen());
-         assertTrue(((Valve)((JBossSession)session2).getDelegate()).isValveOpen());
-         assertTrue(((Valve)((JBossMessageProducer)prod1).getDelegate()).isValveOpen());
-         assertTrue(((Valve)((JBossMessageConsumer)cons1).getDelegate()).isValveOpen());
-         assertTrue(((Valve)((JBossQueueBrowser)browser1).getDelegate()).isValveOpen());
-         assertTrue(((Valve)((JBossMessageProducer)prod2).getDelegate()).isValveOpen());
-         assertTrue(((Valve)((JBossMessageConsumer)cons2).getDelegate()).isValveOpen());
-         assertTrue(((Valve)((JBossQueueBrowser)browser2).getDelegate()).isValveOpen());
-      }
-      finally
-      {
-         if (conn != null)
-         {
-            conn.close();
-         }
-      }
-   }
-
-   public void testValveOpenByDefault() throws Throwable
-   {
-      FailoverValveInterceptor valve = new FailoverValveInterceptor();
-      SimpleInvocationTargetObject target = new SimpleInvocationTargetObject(valve);
-
-      assertEquals(Boolean.TRUE, valve.invoke(buildInvocation("isValveOpen", target)));
-      assertEquals(new Integer(0), valve.invoke(buildInvocation("getActiveThreadsCount", target)));
-
-      // try to open the valve again, it should be a noop
-      valve.invoke(buildInvocation("openValve", target));
-
-      assertEquals(Boolean.TRUE, valve.invoke(buildInvocation("isValveOpen", target)));
-      assertEquals(new Integer(0), valve.invoke(buildInvocation("getActiveThreadsCount", target)));
-   }
-
-   public void testPassThroughOpenValve() throws Throwable
-   {
-      final FailoverValveInterceptor valve = new FailoverValveInterceptor();
-      final SimpleInvocationTargetObject target = new SimpleInvocationTargetObject(valve);
-
-      // send a thread through the open valve
-      Thread t = new Thread(new Runnable()
-      {
-         public void run()
-         {
-            try
-            {
-               valve.invoke(buildInvocation("businessMethod1", target));
-            }
-            catch(Throwable t)
-            {
-               log.error("businessMethod1 invocation failed", t);
-            }
-         }
-      }, "Business Thread 1");
-
-      t.start();
-      t.join();
-
-      // the thread should have passed through the open valve
-      assertEquals(1, target.getBusinessMethod1InvocationCount());
-      assertEquals(1, target.getActiveThreadsHighWaterMark());
-
-   }
-
-   public void testFlipValve() throws Throwable
-   {
-      FailoverValveInterceptor valve = new FailoverValveInterceptor();
-      SimpleInvocationTargetObject target = new SimpleInvocationTargetObject(valve);
-
-      valve.invoke(buildInvocation("closeValve", target));
-
-      assertEquals(Boolean.FALSE, valve.invoke(buildInvocation("isValveOpen", target)));
-      assertEquals(new Integer(0), valve.invoke(buildInvocation("getActiveThreadsCount", target)));
-
-      // child states also must have been notified to close their corresponding valves
-      ParentHierarchicalState state = (ParentHierarchicalState)target.getState();
-
-      for(Iterator i = state.getChildren().iterator(); i.hasNext(); )
-      {
-         ChildState cs = (ChildState)i.next();
-         assertFalse(((Valve)cs.getDelegate()).isValveOpen());
-      }
-
-      // try to close the valve again, it should be a noop
-
-      valve.invoke(buildInvocation("closeValve", target));
-
-      assertEquals(Boolean.FALSE, valve.invoke(buildInvocation("isValveOpen", target)));
-      assertEquals(new Integer(0), valve.invoke(buildInvocation("getActiveThreadsCount", target)));
-   }
-
-   public void testFlipValve2() throws Throwable
-   {
-      FailoverValveInterceptor valve = new FailoverValveInterceptor();
-      SimpleInvocationTargetObject target = new SimpleInvocationTargetObject(valve);
-
-      valve.invoke(buildInvocation("closeValve", target));
-
-      assertEquals(Boolean.FALSE, valve.invoke(buildInvocation("isValveOpen", target)));
-      assertEquals(new Integer(0), valve.invoke(buildInvocation("getActiveThreadsCount", target)));
-
-      // child states also must have been notified to close their corresponding valves
-      ParentHierarchicalState state = (ParentHierarchicalState)target.getState();
-
-      for(Iterator i = state.getChildren().iterator(); i.hasNext(); )
-      {
-         ChildState cs = (ChildState)i.next();
-         assertFalse(((Valve)cs.getDelegate()).isValveOpen());
-      }
-
-      // re-open the valve
-
-      valve.invoke(buildInvocation("openValve", target));
-
-      assertEquals(Boolean.TRUE, valve.invoke(buildInvocation("isValveOpen", target)));
-      assertEquals(new Integer(0), valve.invoke(buildInvocation("getActiveThreadsCount", target)));
-
-      // child states also must have been notified to open their corresponding valves
-      state = (ParentHierarchicalState)target.getState();
-
-      for(Iterator i = state.getChildren().iterator(); i.hasNext(); )
-      {
-         ChildState cs = (ChildState)i.next();
-         assertTrue(((Valve)cs.getDelegate()).isValveOpen());
-      }
-   }
-
-   /**
-    * Close the valve and send a thread through it. The thread must be put on hold until the valve
-    * is opened again.
-    */
-   public void testCloseValveOneBusinessThread() throws Throwable
-   {
-      final FailoverValveInterceptor valve = new FailoverValveInterceptor();
-      final SimpleInvocationTargetObject target = new SimpleInvocationTargetObject(valve);
-
-      valve.invoke(buildInvocation("closeValve", target));
-
-      assertEquals(Boolean.FALSE, valve.invoke(buildInvocation("isValveOpen", target)));
-
-      log.debug("the valve is closed");
-
-      // smack a thread into the closed valve
-      Thread t = new Thread(new Runnable()
-      {
-         public void run()
-         {
-            try
-            {
-               valve.invoke(buildInvocation("businessMethod1", target));
-            }
-            catch(Throwable t)
-            {
-               log.error("businessMethod1 invocation failed", t);
-            }
-         }
-      }, "Business Thread 1");
-
-      t.start();
-
-      // wait for 10 secs for the invocation to reach target object. It shouldn't ...
-      long waitTime = 10000;
-      log.info("probing target for " + waitTime / 1000 + " seconds ...");
-      InvocationToken arrived = target.waitForInvocation(waitTime);
-
-      if (arrived != null)
-      {
-         fail(arrived.getMethodName() + "() reached target, while it shouldn't have!");
-      }
-
-
-      log.debug("the business thread didn't go through");
-
-      // open the valve
-
-      valve.invoke(buildInvocation("openValve", target));
-
-      // the business invocation should complete almost immediately; wait on mutex to avoid race
-      // condition
-
-      arrived = target.waitForInvocation(2000);
-
-      assertEquals("businessMethod1", arrived.getMethodName());
-      assertEquals(1, target.getBusinessMethod1InvocationCount());
-      assertEquals(1, target.getActiveThreadsHighWaterMark());
-   }
-
-
-   /**
-    * Close the valve and send three threads through it. The threads must be put on hold until the
-    * valve is opened again.
-    */
-   public void testCloseValveThreeBusinessThread() throws Throwable
-   {
-      final FailoverValveInterceptor valve = new FailoverValveInterceptor();
-      final SimpleInvocationTargetObject target = new SimpleInvocationTargetObject(valve);
-
-      valve.invoke(buildInvocation("closeValve", target));
-
-      assertEquals(Boolean.FALSE, valve.invoke(buildInvocation("isValveOpen", target)));
-
-      log.debug("the valve is closed");
-
-      // smack thread 1 into the closed valve
-      new Thread(new Runnable()
-      {
-         public void run()
-         {
-            try
-            {
-               valve.invoke(buildInvocation("businessMethod1", target));
-            }
-            catch(Throwable t)
-            {
-               log.error("businessMethod1 invocation failed", t);
-            }
-         }
-      }, "Business Thread 1").start();
-
-      // smack thread 2 into the closed valve
-      new Thread(new Runnable()
-      {
-         public void run()
-         {
-            try
-            {
-               valve.invoke(buildInvocation("businessMethod1", target));
-            }
-            catch(Throwable t)
-            {
-               log.error("businessMethod1 invocation failed", t);
-            }
-         }
-      }, "Business Thread 2").start();
-
-      // smack thread 3 into the closed valve
-      new Thread(new Runnable()
-      {
-         public void run()
-         {
-            try
-            {
-               valve.invoke(buildInvocation("businessMethod2", target));
-            }
-            catch(Throwable t)
-            {
-               log.error("businessMethod2 invocation failed", t);
-            }
-         }
-      }, "Business Thread 3").start();
-
-      // wait for 10 secs for any invocation to reach target object. It shouldn't ...
-      long waitTime = 10000;
-      log.info("probing target for " + waitTime / 1000 + " seconds ...");
-      InvocationToken arrived = target.waitForInvocation(waitTime);
-
-      if (arrived != null)
-      {
-         fail(arrived.getMethodName() + "() reached target, while it shouldn't have!");
-      }
-
-      log.debug("the business threads didn't go through");
-
-      // open the valve
-
-      valve.invoke(buildInvocation("openValve", target));
-
-      // the business invocations should complete almost immediately; wait on mutex to avoid race
-      // condition
-
-      arrived = target.waitForInvocation(2000);
-      assertNotNull(arrived);
-
-      arrived = target.waitForInvocation(2000);
-      assertNotNull(arrived);
-
-      arrived = target.waitForInvocation(2000);
-      assertNotNull(arrived);
-
-      // wait for 3 secs for any invocation to reach target object. It shouldn't ...
-      waitTime = 3000;
-      log.info("probing target for " + waitTime / 1000 + " seconds ...");
-      arrived = target.waitForInvocation(waitTime);
-
-      if (arrived != null)
-      {
-         fail("Extra " + arrived.getMethodName() + "() reached target, " +
-            "while it shouldn't have!");
-      }
-
-      assertEquals(2, target.getBusinessMethod1InvocationCount());
-      assertEquals(1, target.getBusinessMethod2InvocationCount());
-      assertEquals(3, target.getActiveThreadsHighWaterMark());
-   }
-
-   /**
-    * The current standard behavior is that the valve cannot be closed as long as there are
-    * active threads. closeValve() will block undefinitely.
-    */
-   public void testCloseWhileActiveThreads() throws Throwable
-   {
-      final FailoverValveInterceptor valve = new FailoverValveInterceptor();
-      final SimpleInvocationTargetObject target = new SimpleInvocationTargetObject(valve);
-
-      assertEquals(Boolean.TRUE, valve.invoke(buildInvocation("isValveOpen", target)));
-
-      // send a long running thread through the valve
-      new Thread(new Runnable()
-      {
-         public void run()
-         {
-            try
-            {
-               valve.invoke(buildInvocation("blockingBusinessMethod", target));
-            }
-            catch(Throwable t)
-            {
-               log.error("blockingBusinessMethod invocation failed", t);
-            }
-         }
-      }, "Long running business thread").start();
-
-      // allow blockingBusinessMethod time to block
-      Thread.sleep(2000);
-
-      assertEquals(new Integer(1), valve.invoke(buildInvocation("getActiveThreadsCount", target)));
-
-      final Slot closingCompleted = new Slot();
-
-      // from a different thread try to close the valve; this thread will block until we unblock
-      // the business method
-
-      new Thread(new Runnable()
-      {
-         public void run()
-         {
-            try
-            {
-               valve.invoke(buildInvocation("closeValve", target));
-               closingCompleted.put(Boolean.TRUE);
-
-            }
-            catch(Throwable t)
-            {
-               log.error("blockingBusinessMethod() invocation failed", t);
-            }
-         }
-      }, "Valve closing thread").start();
-
-
-      // closing shouldn't be completed for a long time .... actually never, if I don't unblock
-      // the business method
-
-      // wait for 15 secs for closing. It shouldn't ...
-      long waitTime = 15000;
-      log.info("probing closing for " + waitTime / 1000 + " seconds ...");
-
-      Boolean closed = (Boolean)closingCompleted.poll(waitTime);
-
-      if (closed != null)
-      {
-         fail("closeValve() went through, while it shouldn't have!");
-      }
-
-      assertEquals(Boolean.TRUE, valve.invoke(buildInvocation("isValveOpen", target)));
-      assertEquals(new Integer(1), valve.invoke(buildInvocation("getActiveThreadsCount", target)));
-
-      log.info("valve still open ...");
-
-      // unblock blockingBusinessMethod
-      target.unblockBlockingBusinessMethod();
-
-      // valve closing should complete immediately after that
-      closed = (Boolean)closingCompleted.poll(1000);
-      assertTrue(closed.booleanValue());
-      assertEquals(Boolean.FALSE, valve.invoke(buildInvocation("isValveOpen", target)));
-      assertEquals(new Integer(0), valve.invoke(buildInvocation("getActiveThreadsCount", target)));
-   }
-
-   // Package protected ----------------------------------------------------------------------------
-
-   // Protected ------------------------------------------------------------------------------------
-
-   protected void setUp() throws Exception
-   {
-      super.setUp();
-
-      ServerManagement.start("all");
-
-      ic = new InitialContext(ServerManagement.getJNDIEnvironment());
-
-      ServerManagement.deployQueue("TestQueue");
-
-      cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
-      queue = (Queue)ic.lookup("/queue/TestQueue");
-
-      log.debug("setup done");
-   }
-
-   protected void tearDown() throws Exception
-   {
-      ServerManagement.undeployQueue("TestQueue");
-
-      ic.close();
-
-      super.tearDown();
-   }
-
-   // Private --------------------------------------------------------------------------------------
-
-   private Invocation buildInvocation(String methodName, Object targetObject)
-      throws Exception
-   {
-      long hash =
-         MethodHashing.calculateHash(targetObject.getClass().getMethod(methodName, new Class[0]));
-      MethodInfo mi = new MethodInfo(targetObject.getClass(), hash, hash, null);
-      MethodInvocation invocation = new MethodInvocation(mi, new Interceptor[0]);
-      invocation.setTargetObject(targetObject);
-      return invocation;
-   }
-
-   // Inner classes --------------------------------------------------------------------------------
-
-   public interface BusinessObject
-   {
-      void businessMethod1();
-      void businessMethod2();
-      /**
-       * Must be unblocked externally.
-       */
-      void blockingBusinessMethod() throws InterruptedException;
-   }
-
-   public class SimpleInvocationTargetObject
-      extends DelegateSupport implements BusinessObject, Valve
-   {
-      private int businessMethod1InvocationCount;
-      private int businessMethod2InvocationCount;
-
-      // LinkedQueue<InvocationToken>
-      private LinkedQueue invocationTokens;
-
-      private FailoverValveInterceptor valve;
-      private int activeThreadsCountHighWaterMark;
-
-      private Object blockingMethodWaitArea;
-
-      public SimpleInvocationTargetObject(FailoverValveInterceptor valve)
-      {
-         super();
-         setState(new ParentHierarchicalState());
-         businessMethod1InvocationCount = 0;
-         businessMethod2InvocationCount = 0;
-         invocationTokens = new LinkedQueue();
-         this.valve = valve;
-         activeThreadsCountHighWaterMark = 0;
-         blockingMethodWaitArea = new Object();
-      }
-
-      protected Client getClient() throws Exception
-      {
-         throw new RuntimeException("NOT YET IMPLEMENTED");
-      }
-
-      public boolean isValveOpen()
-      {
-         throw new RuntimeException("NOT YET IMPLEMENTED");
-      }
-
-      public void closeValve() throws Exception
-      {
-         throw new RuntimeException("NOT YET IMPLEMENTED");
-      }
-
-      public void openValve() throws Exception
-      {
-         throw new RuntimeException("NOT YET IMPLEMENTED");
-      }
-
-      public int getActiveThreadsCount()
-      {
-         throw new RuntimeException("NOT YET IMPLEMENTED");
-      }
-
-      public synchronized void businessMethod1()
-      {
-         businessMethod1InvocationCount++;
-         updateActiveThreadsCountHighWaterMark();
-         notifyInvocationWaiter("businessMethod1");
-      }
-
-      public synchronized void businessMethod2()
-      {
-         businessMethod2InvocationCount++;
-         updateActiveThreadsCountHighWaterMark();
-         notifyInvocationWaiter("businessMethod2");
-      }
-
-      public void blockingBusinessMethod() throws InterruptedException
-      {
-         synchronized(blockingMethodWaitArea)
-         {
-            log.info("blockingBusinessMethod() blocking ...");
-            // block calling thread undefinitely until blockingMethodWaitArea is notified
-            blockingMethodWaitArea.wait();
-            log.info("blockingBusinessMethod() unblocked");
-         }
-      }
-
-      public synchronized int getBusinessMethod1InvocationCount()
-      {
-         return businessMethod1InvocationCount;
-      }
-
-      public synchronized int getBusinessMethod2InvocationCount()
-      {
-         return businessMethod2InvocationCount;
-      }
-
-      public synchronized int getActiveThreadsHighWaterMark()
-      {
-         return activeThreadsCountHighWaterMark;
-      }
-
-      /**
-       * Block until an invocation arrives into the target. If the invocation arrived prior to
-       * calling this method, it returns immediately.
-       *
-       * @return a token corresponding to the business invocation, or null if the method exited with
-       *         timout, without an invocation to arive.
-       */
-      public InvocationToken waitForInvocation(long timeout) throws InterruptedException
-      {
-         return (InvocationToken)invocationTokens.poll(timeout);
-      }
-
-      public void unblockBlockingBusinessMethod()
-      {
-         synchronized(blockingMethodWaitArea)
-         {
-            blockingMethodWaitArea.notify();
-         }
-      }
-
-      /**
-       * Reset the state
-       */
-      public synchronized void reset()
-      {
-         businessMethod1InvocationCount = 0;
-         businessMethod2InvocationCount = 0;
-         activeThreadsCountHighWaterMark = 0;
-      }
-
-      /**
-       * Notify someone who waits for this invocation to arrive.
-       */
-      private void notifyInvocationWaiter(String methodName)
-      {
-         try
-         {
-            invocationTokens.put(new InvocationToken(methodName));
-         }
-         catch(InterruptedException e)
-         {
-            throw new RuntimeException("Failed to deposit notification in queue", e);
-         }
-      }
-
-      private synchronized void updateActiveThreadsCountHighWaterMark()
-      {
-         try
-         {
-            int c =
-               ((Integer)valve.invoke(buildInvocation("getActiveThreadsCount", this))).intValue();
-            if (c > activeThreadsCountHighWaterMark)
-            {
-               activeThreadsCountHighWaterMark = c;
-            }
-         }
-         catch(Throwable t)
-         {
-            throw new RuntimeException("Failed to get getActiveThreadsCount", t);
-         }
-      }
-   }
-
-   private class ParentHierarchicalState extends HierarchicalStateSupport
-   {
-      private DelegateSupport delegate;
-      private HierarchicalState parent;
-
-      ParentHierarchicalState()
-      {
-         super(null, null);
-
-         children = new HashSet();
-
-         // populate it with a child state
-         children.add(new ChildState());
-      }
-
-      public DelegateSupport getDelegate()
-      {
-         return delegate;
-      }
-
-      public void setDelegate(DelegateSupport delegate)
-      {
-         this.delegate = delegate;
-      }
-
-      public HierarchicalState getParent()
-      {
-         return parent;
-      }
-
-      public void setParent(HierarchicalState parent)
-      {
-         this.parent = parent;
-      }
-
-      public Version getVersionToUse()
-      {
-         throw new RuntimeException("NOT YET IMPLEMENTED");
-      }
-
-      public void synchronizeWith(HierarchicalState newState) throws Exception
-      {
-         throw new RuntimeException("NOT YET IMPLEMENTED");
-      }
-   }
-
-   private class ChildState extends HierarchicalStateSupport
-   {
-      private DelegateSupport delegate;
-      private HierarchicalState parent;
-
-      ChildState()
-      {
-         super(null, new ChildDelegate());
-      }
-
-      public Set getChildren()
-      {
-         return Collections.EMPTY_SET;
-      }
-
-      public DelegateSupport getDelegate()
-      {
-         return delegate;
-      }
-
-      public void setDelegate(DelegateSupport delegate)
-      {
-         this.delegate = delegate;
-      }
-
-      public HierarchicalState getParent()
-      {
-         return parent;
-      }
-
-      public void setParent(HierarchicalState parent)
-      {
-         this.parent = parent;
-      }
-
-      public Version getVersionToUse()
-      {
-         throw new RuntimeException("NOT YET IMPLEMENTED");
-      }
-
-      public void synchronizeWith(HierarchicalState newState) throws Exception
-      {
-         throw new RuntimeException("NOT YET IMPLEMENTED");
-      }
-   }
-
-   private class ChildDelegate extends DelegateSupport implements Valve
-   {
-      private boolean valveOpen;
-
-      ChildDelegate()
-      {
-         valveOpen = true;
-      }
-
-      protected Client getClient() throws Exception
-      {
-         throw new RuntimeException("NOT YET IMPLEMENTED");
-      }
-
-      public synchronized void closeValve() throws Exception
-      {
-         valveOpen = false;
-      }
-
-      public void openValve() throws Exception
-      {
-         valveOpen = true;
-      }
-
-      public boolean isValveOpen()
-      {
-         return valveOpen;
-      }
-
-      public int getActiveThreadsCount()
-      {
-         throw new RuntimeException("NOT YET IMPLEMENTED");
-      }
-   }
-
-   private class InvocationToken
-   {
-      private String methodName;
-
-      public InvocationToken(String methodName)
-      {
-         this.methodName = methodName;
-      }
-
-      public String getMethodName()
-      {
-         return methodName;
-      }
-   }
-
-}

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java	2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java	2007-01-09 11:36:35 UTC (rev 1928)
@@ -11,15 +11,8 @@
 import org.jboss.jms.client.JBossConnection;
 import org.jboss.jms.client.FailoverListener;
 import org.jboss.jms.client.FailoverEvent;
-import org.jboss.jms.client.Valve;
-import org.jboss.jms.client.JBossSession;
-import org.jboss.jms.client.JBossQueueBrowser;
-import org.jboss.jms.client.JBossMessageConsumer;
-import org.jboss.jms.client.JBossMessageProducer;
-import org.jboss.jms.client.remoting.JMSRemotingConnection;
 import org.jboss.jms.client.state.ConnectionState;
 import org.jboss.jms.client.delegate.DelegateSupport;
-import org.jboss.jms.client.delegate.ClientConnectionDelegate;
 
 import javax.jms.Connection;
 import javax.jms.Session;
@@ -1377,77 +1370,6 @@
       }
    }
 
-   public void testCloseValveHierarchy() throws Exception
-   {
-      Connection conn = null;
-
-      try
-      {
-         conn = cf.createConnection();
-
-         assertTrue(((Valve)((JBossConnection)conn).getDelegate()).isValveOpen());
-
-         Session session1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         Session session2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-         assertTrue(((Valve)((JBossSession)session1).getDelegate()).isValveOpen());
-         assertTrue(((Valve)((JBossSession)session2).getDelegate()).isValveOpen());
-
-         MessageProducer prod1 = session1.createProducer(queue[0]);
-         assertTrue(((Valve)((JBossMessageProducer)prod1).getDelegate()).isValveOpen());
-
-         MessageConsumer cons1 = session1.createConsumer(queue[0]);
-         assertTrue(((Valve)((JBossMessageConsumer)cons1).getDelegate()).isValveOpen());
-
-         QueueBrowser browser1 = session1.createBrowser(queue[0]);
-         assertTrue(((Valve)((JBossQueueBrowser)browser1).getDelegate()).isValveOpen());
-
-         MessageProducer prod2 = session2.createProducer(queue[0]);
-         assertTrue(((Valve)((JBossMessageProducer)prod2).getDelegate()).isValveOpen());
-
-         MessageConsumer cons2 = session2.createConsumer(queue[0]);
-         assertTrue(((Valve)((JBossMessageConsumer)cons2).getDelegate()).isValveOpen());
-
-         QueueBrowser browser2 = session2.createBrowser(queue[0]);
-         assertTrue(((Valve)((JBossQueueBrowser)browser2).getDelegate()).isValveOpen());
-
-         ((JBossConnection)conn).getDelegate().closeValve();
-
-         log.debug("top level valve closed");
-
-         assertFalse(((Valve)((JBossConnection)conn).getDelegate()).isValveOpen());
-         assertFalse(((Valve)((JBossSession)session1).getDelegate()).isValveOpen());
-         assertFalse(((Valve)((JBossSession)session2).getDelegate()).isValveOpen());
-         assertFalse(((Valve)((JBossMessageProducer)prod1).getDelegate()).isValveOpen());
-         assertFalse(((Valve)((JBossMessageConsumer)cons1).getDelegate()).isValveOpen());
-         assertFalse(((Valve)((JBossQueueBrowser)browser1).getDelegate()).isValveOpen());
-         assertFalse(((Valve)((JBossMessageProducer)prod2).getDelegate()).isValveOpen());
-         assertFalse(((Valve)((JBossMessageConsumer)cons2).getDelegate()).isValveOpen());
-         assertFalse(((Valve)((JBossQueueBrowser)browser2).getDelegate()).isValveOpen());
-
-         ((JBossConnection)conn).getDelegate().openValve();
-
-         log.debug("top level valve open");
-
-         assertTrue(((Valve)((JBossConnection)conn).getDelegate()).isValveOpen());
-         assertTrue(((Valve)((JBossSession)session1).getDelegate()).isValveOpen());
-         assertTrue(((Valve)((JBossSession)session2).getDelegate()).isValveOpen());
-         assertTrue(((Valve)((JBossMessageProducer)prod1).getDelegate()).isValveOpen());
-         assertTrue(((Valve)((JBossMessageConsumer)cons1).getDelegate()).isValveOpen());
-         assertTrue(((Valve)((JBossQueueBrowser)browser1).getDelegate()).isValveOpen());
-         assertTrue(((Valve)((JBossMessageProducer)prod2).getDelegate()).isValveOpen());
-         assertTrue(((Valve)((JBossMessageConsumer)cons2).getDelegate()).isValveOpen());
-         assertTrue(((Valve)((JBossQueueBrowser)browser2).getDelegate()).isValveOpen());
-      }
-      finally
-      {
-         if (conn != null)
-         {
-            conn.close();
-         }
-      }
-   }
-
    public void testFailoverMessageOnServer() throws Exception
    {
       Connection conn = null;
@@ -1572,42 +1494,6 @@
       }
    }
 
-   public void testTemp() throws Exception
-   {
-      Connection conn = null;
-
-      try
-      {
-         conn = cf.createConnection();
-         conn.close();
-
-         conn = cf.createConnection();
-
-         assertEquals(1, ((JBossConnection)conn).getServerID());
-
-         // we "cripple" the remoting connection by removing ConnectionListener. This way, failures
-         // cannot be "cleanly" detected by the client-side pinger, and we'll fail on an invocation
-         JMSRemotingConnection rc = ((ClientConnectionDelegate)((JBossConnection)conn).
-            getDelegate()).getRemotingConnection();
-         rc.removeConnectionListener();
-
-         ServerManagement.killAndWait(1);
-
-         log.info("########");
-         log.info("######## KILLED NODE 1");
-         log.info("########");
-
-         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      }
-      finally
-      {
-         if (conn != null)
-         {
-            conn.close();
-         }
-      }
-   }
-
    public void testSimpleFailover() throws Exception
    {
       Connection conn = null;
@@ -1667,6 +1553,42 @@
       }
    }
 
+//   public void testTemp() throws Exception
+//   {
+//      Connection conn = null;
+//
+//      try
+//      {
+//         conn = cf.createConnection();
+//         conn.close();
+//
+//         conn = cf.createConnection();
+//
+//         assertEquals(1, ((JBossConnection)conn).getServerID());
+//
+//         // we "cripple" the remoting connection by removing ConnectionListener. This way, failures
+//         // cannot be "cleanly" detected by the client-side pinger, and we'll fail on an invocation
+//         JMSRemotingConnection rc = ((ClientConnectionDelegate)((JBossConnection)conn).
+//            getDelegate()).getRemotingConnection();
+//         rc.removeConnectionListener();
+//
+//         ServerManagement.killAndWait(1);
+//
+//         log.info("########");
+//         log.info("######## KILLED NODE 1");
+//         log.info("########");
+//
+//         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//      }
+//      finally
+//      {
+//         if (conn != null)
+//         {
+//            conn.close();
+//         }
+//      }
+//   }
+
    // Package protected ----------------------------------------------------------------------------
 
    // Protected ------------------------------------------------------------------------------------

Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java	2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java	2007-01-09 11:36:35 UTC (rev 1928)
@@ -1161,13 +1161,15 @@
       //is always currently used - (we could make this configurable)
 
       String transport = config.getRemotingTransport();
+
+      long clientLeasePeriod = 20000;
       
       String params = "/?marshaller=org.jboss.jms.server.remoting.JMSWireFormat&" +
                       "unmarshaller=org.jboss.jms.server.remoting.JMSWireFormat&" +
                       "serializationtype=" + serializationType + "&" +
                       "dataType=jms&" +
                       "socket.check_connection=false&" +
-                      "clientLeasePeriod=20000&" +
+                      "clientLeasePeriod=" + clientLeasePeriod + "&" +
                       "callbackStore=org.jboss.remoting.callback.BlockingCallbackStore&" +
                       "clientSocketClass=org.jboss.jms.client.remoting.ClientSocketWrapper&" +
                       "serverSocketClass=org.jboss.jms.server.remoting.ServerSocketWrapper";




More information about the jboss-cvs-commits mailing list