[jboss-cvs] JBoss Messaging SVN: r1928 - in trunk: src/etc src/main/org/jboss/jms/client src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/client/state src/main/org/jboss/jms/delegate src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/jms tests/src/org/jboss/test/messaging/jms/clustering tests/src/org/jboss/test/messaging/tools/jmx
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jan 9 06:36:51 EST 2007
Author: ovidiu.feodorov at jboss.com
Date: 2007-01-09 06:36:35 -0500 (Tue, 09 Jan 2007)
New Revision: 1928
Added:
trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
trunk/src/main/org/jboss/jms/client/FailoverValve.java
Removed:
trunk/src/main/org/jboss/jms/client/container/FailoverAspect.java
trunk/tests/src/org/jboss/test/messaging/jms/FailoverValveTest.java
Modified:
trunk/src/etc/aop-messaging-client.xml
trunk/src/main/org/jboss/jms/client/container/ClusteringAspect.java
trunk/src/main/org/jboss/jms/client/container/ConnectionAspect.java
trunk/src/main/org/jboss/jms/client/container/ConnectionFailureListener.java
trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java
trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
trunk/src/main/org/jboss/jms/client/state/ConnectionState.java
trunk/src/main/org/jboss/jms/client/state/HierarchicalState.java
trunk/src/main/org/jboss/jms/client/state/HierarchicalStateSupport.java
trunk/src/main/org/jboss/jms/delegate/BrowserDelegate.java
trunk/src/main/org/jboss/jms/delegate/ConnectionDelegate.java
trunk/src/main/org/jboss/jms/delegate/ConsumerDelegate.java
trunk/src/main/org/jboss/jms/delegate/ProducerDelegate.java
trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
Log:
Using singleton valve on failover (Clebert's work).
http://jira.jboss.org/jira/browse/JBMESSAGING-712
http://jira.jboss.org/jira/browse/JBMESSAGING-719
Modified: trunk/src/etc/aop-messaging-client.xml
===================================================================
--- trunk/src/etc/aop-messaging-client.xml 2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/etc/aop-messaging-client.xml 2007-01-09 11:36:35 UTC (rev 1928)
@@ -19,7 +19,6 @@
<aspect class="org.jboss.jms.client.container.ConnectionAspect" scope="PER_INSTANCE"/>
<aspect class="org.jboss.jms.client.container.ClusteringAspect" scope="PER_INSTANCE"/>
<aspect class="org.jboss.jms.client.container.FactoryAspect" scope="PER_INSTANCE"/>
- <aspect class="org.jboss.jms.client.container.FailoverAspect" scope="PER_VM"/>
<!--
Clustered ConnectionFactory Stack
@@ -49,10 +48,6 @@
Connection Stack
-->
- <!-- It is important that FailoverAspect intercepts performFailover() before FailoverValveInterceptor -->
- <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionDelegate->performFailover())">
- <advice name="handlePerformFailover" aspect="org.jboss.jms.client.container.FailoverAspect"/>
- </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionDelegate->$implementing{org.jboss.jms.delegate.ConnectionDelegate}(..))">
<interceptor-ref name="org.jboss.jms.client.container.ClientLogInterceptor"/>
<interceptor-ref name="org.jboss.jms.client.container.FailoverValveInterceptor"/>
Added: trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java 2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java 2007-01-09 11:36:35 UTC (rev 1928)
@@ -0,0 +1,195 @@
+/**
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.jms.client;
+
+import org.jboss.logging.Logger;
+import org.jboss.jms.client.state.ConnectionState;
+import org.jboss.jms.client.remoting.JMSRemotingConnection;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.server.endpoint.CreateConnectionResult;
+import org.jboss.jms.delegate.ConnectionFactoryDelegate;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+/**
+ * The class in charge of performing the failover.
+ *
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public class FailoverCommandCenter
+{
+ // Constants ------------------------------------------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(FailoverCommandCenter.class);
+
+ // Static ---------------------------------------------------------------------------------------
+
+ private static boolean trace = log.isTraceEnabled();
+
+ // Attributes -----------------------------------------------------------------------------------
+
+ private ConnectionState state;
+
+ private FailoverValve valve;
+
+ private List failoverListeners;
+
+ // Constructors ---------------------------------------------------------------------------------
+
+ public FailoverCommandCenter(ConnectionState state)
+ {
+ this.state = state;
+ failoverListeners = new ArrayList();
+ valve = new FailoverValve();
+ }
+
+ // Public ---------------------------------------------------------------------------------------
+
+ /**
+ * Method called by failure detection components (FailoverValveInterceptors and
+ * ConnectionListeners) when they have reason to believe that a server failure occurred.
+ */
+ public void failureDetected(Throwable reason, JMSRemotingConnection remotingConnection)
+ throws Exception
+ {
+ // generate a FAILURE_DETECTED event
+ broadcastFailoverEvent(new FailoverEvent(FailoverEvent.FAILURE_DETECTED, this));
+
+ log.debug(this + " initiating client-side failover");
+
+ CreateConnectionResult res = null;
+ boolean failoverSuccessful = false;
+
+ try
+ {
+ // block any other invocations arriving at any delegate from the hierarchy while we're
+ // doing failover
+
+ valve.close();
+
+ if (remotingConnection != null)
+ {
+ if (remotingConnection.isFailed())
+ {
+ log.debug(this + " ignoring failure detection notification, as failover was " +
+ "already performed on this connection");
+ }
+ remotingConnection.setFailed(true);
+ }
+
+ // generate a FAILOVER_STARTED event. The event must be broadcast AFTER valve closure,
+ // to ensure the client-side stack is in a deterministic state
+ broadcastFailoverEvent(new FailoverEvent(FailoverEvent.FAILOVER_STARTED, this));
+
+ int failedNodeID = state.getServerID();
+ ConnectionFactoryDelegate clusteredDelegate =
+ state.getClusteredConnectionFactoryDelegate();
+
+ // re-try creating the connection
+ res = clusteredDelegate.
+ createConnectionDelegate(state.getUsername(), state.getPassword(), failedNodeID);
+
+ if (res == null)
+ {
+ // No failover attempt was detected on the server side; this might happen if the
+ // client side network fails temporarily so the client connection breaks but the
+ // server cluster is still up and running - in this case we don't perform failover.
+ failoverSuccessful = false;
+ }
+ else
+ {
+ // recursively synchronize state
+ ClientConnectionDelegate newDelegate = (ClientConnectionDelegate)res.getDelegate();
+ state.getDelegate().synchronizeWith(newDelegate);
+ failoverSuccessful = true;
+ }
+ }
+ finally
+ {
+ if (failoverSuccessful)
+ {
+ broadcastFailoverEvent(new FailoverEvent(FailoverEvent.FAILOVER_COMPLETED, this));
+ }
+ else
+ {
+ broadcastFailoverEvent(new FailoverEvent(FailoverEvent.FAILOVER_FAILED, this));
+ }
+
+ valve.open();
+ }
+ }
+
+ public void registerFailoverListener(FailoverListener listener)
+ {
+ synchronized(failoverListeners)
+ {
+ failoverListeners.add(listener);
+ }
+ }
+
+ public boolean unregisterFailoverListener(FailoverListener listener)
+ {
+ synchronized(failoverListeners)
+ {
+ return failoverListeners.remove(listener);
+ }
+ }
+
+ public FailoverValve getValve()
+ {
+ return valve;
+ }
+
+ public JMSRemotingConnection getRemotingConnection()
+ {
+ return state.getRemotingConnection();
+ }
+
+ public String toString()
+ {
+ return "FailoverCommandCenter[" + state + "]";
+ }
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ // Private --------------------------------------------------------------------------------------
+
+ private void broadcastFailoverEvent(FailoverEvent e)
+ {
+ if (trace) { log.trace(this + " broadcasting " + e); }
+
+ List listenersCopy;
+
+ synchronized(failoverListeners)
+ {
+ listenersCopy = new ArrayList(failoverListeners);
+ }
+
+ for(Iterator i = listenersCopy.iterator(); i.hasNext(); )
+ {
+ FailoverListener listener = (FailoverListener)i.next();
+
+ try
+ {
+ listener.failoverEventOccured(e);
+ }
+ catch(Exception ex)
+ {
+ log.warn("Failover listener " + listener + " did not accept event", ex);
+ }
+ }
+ }
+
+ // Inner classes --------------------------------------------------------------------------------
+}
Added: trunk/src/main/org/jboss/jms/client/FailoverValve.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/FailoverValve.java 2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/client/FailoverValve.java 2007-01-09 11:36:35 UTC (rev 1928)
@@ -0,0 +1,297 @@
+/**
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.jms.client;
+
+import org.jboss.logging.Logger;
+import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
+import EDU.oswego.cs.dl.util.concurrent.ReentrantWriterPreferenceReadWriteLock;
+import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
+
+import java.util.Map;
+import java.util.Stack;
+import java.util.Iterator;
+import java.io.StringWriter;
+import java.io.PrintWriter;
+
+/**
+ * The valve will block any call as long as it is closed.
+ *
+ * Usage: call enter() when performing a regular call and leave() in a finally block. Call close()
+ * when performing a failover, and open() in a finally block.
+ *
+ * The class contains logic to avoid deadlocks between multiple threads closing the valve at the
+ * same time, which uses reference counting on a ThreadLocal variable. That's why it's very
+ * important to always leave the valve in a finally block.
+ *
+ * This class also generates tracing information, to help debug situations such as the valve
+ * failing to close, but only if trace is enabled on log4j.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public class FailoverValve
+{
+ // Constants ------------------------------------------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(Valve.class);
+
+ // Static ---------------------------------------------------------------------------------------
+
+ private static boolean trace = log.isTraceEnabled();
+
+ // Attributes -----------------------------------------------------------------------------------
+
+ // we keep a ThreadLocal counter to help avoid deadlocks when multiple threads are closing
+ // the valve
+ private ThreadLocal counterLocal = new ThreadLocal();
+
+ private ReadWriteLock lock;
+
+ private int activeLocks = 0;
+ private int activeCloses = 0;
+
+ // these are only initialized if tracing is enabled
+ private ThreadLocal stackCloses;
+ private ThreadLocal stackEnters;
+
+ private Map debugCloses;
+ private Map debugEnters;
+
+ // Constructors ---------------------------------------------------------------------------------
+
+ public FailoverValve()
+ {
+ // We're using reentrant locks because we will need to acquire read locks after write locks
+ // have already been acquired. There is also a case where a readLock will be promoted to a
+ // writeLock when a failover occurs; using reentrant locks makes this usage transparent
+ // to the API: we just close the valve and the read lock is promoted to a write lock.
+ lock = new ReentrantWriterPreferenceReadWriteLock();
+
+ if (trace)
+ {
+ stackCloses = new ThreadLocal();
+ stackEnters = new ThreadLocal();
+ debugCloses = new ConcurrentHashMap();
+ debugEnters = new ConcurrentHashMap();
+ }
+ }
+
+ // Public ---------------------------------------------------------------------------------------
+
+ public void enter() throws InterruptedException
+ {
+ lock.readLock().acquire();
+
+ getCounter().counter++;
+
+ synchronized (this)
+ {
+ activeLocks++;
+ }
+
+ if (trace)
+ {
+ Exception ex = new Exception();
+ getStackEnters().push(ex);
+ debugEnters.put(ex, Thread.currentThread());
+ }
+ }
+
+ public void leave() throws InterruptedException
+ {
+ lock.readLock().release();
+
+ getCounter().counter--;
+
+ synchronized (this)
+ {
+ activeLocks--;
+
+ if (activeLocks < 0)
+ {
+ throw new IllegalStateException("leave() was called without a prior enter() call");
+ }
+ }
+
+ if (trace)
+ {
+ Exception ex = (Exception) getStackEnters().pop();
+ debugEnters.remove(ex);
+ }
+ }
+
+ public void close() throws InterruptedException
+ {
+ // Before acquiring the write lock, we need to release this thread's reentrant read locks.
+ // When several threads are closing the valve at once (because several threads detected the
+ // failure), none of them can close it until all the readLocks are released. This release
+ // routine resolves that deadlock while still guaranteeing the exclusivity of the write
+ // lock. The use case for this is a failure being detected by a thread that is already
+ // holding a read lock. For example, if a failure happens when sending ACKs, the valve will
+ // already be held by receiveMessage while sendACK is trying to close the valve. This
+ // wouldn't be a problem with a single thread, but here we would be waiting on a readLock
+ // held by another thread that might itself be waiting to close the valve, as the failure
+ // event will be captured by multiple threads. So, in summary, we need to completely leave
+ // the valve before closing it, or a deadlock will happen when multiple threads close the
+ // valve at the same time, each waiting on the others' readLocks before acquiring a writeLock.
+ int counter = getCounter().counter;
+
+ for (int i = 0; i < counter; i++)
+ {
+ lock.readLock().release();
+ }
+
+ boolean acquired = false;
+ do
+ {
+ acquired = lock.writeLock().attempt(5000);
+
+ if (!acquired)
+ {
+ log.warn("Couldn't close valve, trying again", new Exception());
+ if (trace) { log.trace(debugValve()); }
+ }
+ }
+ while (!acquired);
+
+ activeCloses++;
+ activeLocks++;
+
+ // Sanity check only...
+ if (activeCloses > 1)
+ {
+ lock.writeLock().release();
+ throw new IllegalStateException("Valve closed twice");
+ }
+
+ if (trace)
+ {
+ Exception ex = new Exception();
+ getStackCloses().push(ex);
+ debugCloses.put(ex, Thread.currentThread());
+ }
+ }
+
+ public void open() throws InterruptedException
+ {
+ if (activeCloses <= 0 || activeLocks <= 0)
+ {
+ throw new IllegalStateException("Valve not closed");
+ }
+
+ activeCloses--;
+ activeLocks--;
+
+ lock.writeLock().release();
+
+ // re-apply the locks as we had before closing the valve
+ int counter = getCounter().counter;
+ for (int i = 0; i < counter; i++)
+ {
+ lock.readLock().acquire();
+ }
+
+ if (trace)
+ {
+ Exception ex = (Exception) getStackCloses().pop();
+ debugCloses.remove(ex);
+ }
+ }
+
+ public synchronized int getActiveLocks()
+ {
+ return activeLocks;
+ }
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ // Private --------------------------------------------------------------------------------------
+
+ /**
+ * Counter of times this thread entered the valve.
+ */
+ private Counter getCounter()
+ {
+ if (counterLocal.get() == null)
+ {
+ counterLocal.set(new Counter());
+ }
+
+ return (Counter)counterLocal.get();
+ }
+
+
+ private Stack getStackCloses()
+ {
+ if (stackCloses.get() == null)
+ {
+ stackCloses.set(new Stack());
+ }
+
+ return (Stack) stackCloses.get();
+ }
+
+ private Stack getStackEnters()
+ {
+ if (stackEnters.get() == null)
+ {
+ stackEnters.set(new Stack());
+ }
+ return (Stack) stackEnters.get();
+ }
+
+ /**
+ * This method will show the threads that are currently holding locks (enters or closes).
+ * */
+ private synchronized String debugValve()
+ {
+ StringWriter buffer = new StringWriter();
+ PrintWriter writer = new PrintWriter(buffer);
+
+ writer.println("********************** Debug Valve Information *************************");
+ writer.println("Close owners");
+
+ // Close should never be owned by more than 1 thread, but as this is a debug report we will
+ // consider that as a possibility just to reveal possible bugs (in case this class is ever
+ // changed)
+ for (Iterator iter = debugCloses.entrySet().iterator(); iter.hasNext();)
+ {
+ Map.Entry entry = (Map.Entry) iter.next();
+ writer.println("Thread that own a close =" + entry.getValue());
+ writer.println("StackTrace:");
+ Exception e = (Exception) entry.getKey();
+ e.printStackTrace(writer);
+ }
+
+ writer.println("Valve owners");
+ for (Iterator iter = debugEnters.entrySet().iterator(); iter.hasNext();)
+ {
+ Map.Entry entry = (Map.Entry) iter.next();
+ writer.println("Thread that own valve =" + entry.getValue());
+ writer.println("StackTrace:");
+ Exception e = (Exception) entry.getKey();
+ e.printStackTrace(writer);
+ }
+
+ return buffer.toString();
+ }
+
+ // Inner classes --------------------------------------------------------------------------------
+
+ /**
+ * Used to count the number of read locks (or enters) owned by this thread
+ */
+ private static class Counter
+ {
+ int counter;
+ }
+
+}
Modified: trunk/src/main/org/jboss/jms/client/container/ClusteringAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClusteringAspect.java 2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/client/container/ClusteringAspect.java 2007-01-09 11:36:35 UTC (rev 1928)
@@ -14,6 +14,7 @@
import org.jboss.jms.client.delegate.ClientConnectionDelegate;
import org.jboss.jms.client.delegate.DelegateSupport;
import org.jboss.jms.client.state.ConnectionState;
+import org.jboss.jms.client.FailoverCommandCenter;
import org.jboss.jms.server.endpoint.CreateConnectionResult;
import javax.jms.JMSException;
@@ -121,21 +122,22 @@
log.debug(this + " got local connection delegate " + cd);
ConnectionState state = (ConnectionState)((DelegateSupport)cd).getState();
+ FailoverCommandCenter fcc = state.getFailoverCommandCenter();
// add a connection listener to detect failure; the consolidated remoting connection
// listener must be already in place and configured
state.getRemotingConnection().getConnectionListener().
- addDelegateListener(new ConnectionFailureListener(cd));
+ addDelegateListener(new ConnectionFailureListener(fcc));
log.debug(this + " installed failure listener on " + cd);
- // also cache the username and the password into state, useful in case FailoverAspect
- // needs to create a new connection instead of a failed on
+ // also cache the username and the password into state, useful in case
+ // FailoverCommandCenter needs to create a new connection instead of a failed one
state.setUsername(username);
state.setPassword(password);
// also add a reference to the clustered ConnectionFactory delegate, useful in case
- // FailoverAspect needs to create a new connection instead of a failed on
+ // FailoverCommandCenter needs to create a new connection instead of a failed one
state.setClusteredConnectionFactoryDeleage(clusteredDelegate);
return new CreateConnectionResult(cd);
Modified: trunk/src/main/org/jboss/jms/client/container/ConnectionAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ConnectionAspect.java 2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/client/container/ConnectionAspect.java 2007-01-09 11:36:35 UTC (rev 1928)
@@ -196,7 +196,7 @@
MethodInvocation mi = (MethodInvocation)invocation;
FailoverListener listener = (FailoverListener)mi.getArguments()[0];
- state.registerFailoverListener(listener);
+ state.getFailoverCommandCenter().registerFailoverListener(listener);
return null;
}
@@ -208,7 +208,7 @@
MethodInvocation mi = (MethodInvocation)invocation;
FailoverListener listener = (FailoverListener)mi.getArguments()[0];
- boolean result = state.unregisterFailoverListener(listener);
+ boolean result = state.getFailoverCommandCenter().unregisterFailoverListener(listener);
return new Boolean(result);
}
Modified: trunk/src/main/org/jboss/jms/client/container/ConnectionFailureListener.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ConnectionFailureListener.java 2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/client/container/ConnectionFailureListener.java 2007-01-09 11:36:35 UTC (rev 1928)
@@ -11,6 +11,7 @@
import org.jboss.logging.Logger;
import org.jboss.jms.client.delegate.ClientConnectionDelegate;
import org.jboss.jms.client.FailoverEvent;
+import org.jboss.jms.client.FailoverCommandCenter;
import org.jboss.jms.client.state.ConnectionState;
/**
@@ -32,13 +33,13 @@
// Attributes -----------------------------------------------------------------------------------
- private ClientConnectionDelegate cd;
+ private FailoverCommandCenter fcc;
// Constructors ---------------------------------------------------------------------------------
- ConnectionFailureListener(ClientConnectionDelegate cd)
+ ConnectionFailureListener(FailoverCommandCenter fcc)
{
- this.cd = cd;
+ this.fcc = fcc;
}
// ConnectionListener implementation ------------------------------------------------------------
@@ -49,13 +50,8 @@
{
log.debug(this + " is being notified of connection failure: " + throwable);
- // generate a FAILURE_DETECTED event
- ((ConnectionState)cd.getState()).
- broadcastFailoverEvent(new FailoverEvent(FailoverEvent.FAILURE_DETECTED, cd));
+ fcc.failureDetected(throwable, null);
- log.debug(this + " initiating client-side failover");
-
- cd.performFailover();
}
catch (Throwable e)
{
@@ -67,7 +63,7 @@
public String toString()
{
- return "ConnectionFailureListener[" + cd + "]";
+ return "ConnectionFailureListener[" + fcc + "]";
}
// Package protected ----------------------------------------------------------------------------
Deleted: trunk/src/main/org/jboss/jms/client/container/FailoverAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/FailoverAspect.java 2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/client/container/FailoverAspect.java 2007-01-09 11:36:35 UTC (rev 1928)
@@ -1,119 +0,0 @@
-/**
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.jms.client.container;
-
-import org.jboss.aop.joinpoint.Invocation;
-import org.jboss.jms.client.delegate.ClientConnectionDelegate;
-import org.jboss.jms.client.state.ConnectionState;
-import org.jboss.jms.client.FailoverEvent;
-import org.jboss.jms.server.endpoint.CreateConnectionResult;
-import org.jboss.jms.delegate.ConnectionFactoryDelegate;
-import org.jboss.logging.Logger;
-
-/**
- * PER_VM Aspect.
- *
- * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
- * @version <tt>$Revision$</tt>
- * $Id$
- */
-public class FailoverAspect
-{
- // Constants ------------------------------------------------------------------------------------
-
- private static final Logger log = Logger.getLogger(FailoverAspect.class);
-
- // Static ---------------------------------------------------------------------------------------
-
- // Attributes -----------------------------------------------------------------------------------
-
- // Constructors ---------------------------------------------------------------------------------
-
- // Public ---------------------------------------------------------------------------------------
-
- public Object handlePerformFailover(Invocation invocation) throws Throwable
- {
- Object target = invocation.getTargetObject();
-
- if(target instanceof ClientConnectionDelegate)
- {
- performConnectionFailover((ClientConnectionDelegate)target);
- }
-
- return null;
- }
-
- // Package protected ----------------------------------------------------------------------------
-
- // Protected ------------------------------------------------------------------------------------
-
- // Private --------------------------------------------------------------------------------------
-
- private void performConnectionFailover(ClientConnectionDelegate delegate) throws Exception
- {
- ConnectionState state = (ConnectionState)delegate.getState();
-
- CreateConnectionResult res = null;
- boolean failoverSuccessful = false;
-
- try
- {
- // block any other invocations ariving while we're doing failover, on this delegate and
- // recursively down the hierarchy
-
- // WARNING - this may block if there are active invocations through valves!
- delegate.closeValve();
-
- log.debug("starting client-side failover");
-
- // generate a FAILOVER_STARTED event. The event must be broadcasted AFTER valve closure,
- // to insure the client-side stack is in a deterministic state
- state.broadcastFailoverEvent(new FailoverEvent(FailoverEvent.FAILOVER_STARTED, delegate));
-
- int failedNodeID = state.getServerID();
- ConnectionFactoryDelegate clusteredDelegate =
- state.getClusteredConnectionFactoryDelegate();
-
- // re-try creating the connection
- res = clusteredDelegate.
- createConnectionDelegate(state.getUsername(), state.getPassword(), failedNodeID);
-
- if (res == null)
- {
- // No failover attempt was detected on the server side; this might happen if the
- // client side network fails temporarily so the client connection breaks but the
- // server cluster is still up and running - in this case we don't perform failover.
- failoverSuccessful = false;
- }
- else
- {
- // recursively synchronize state
- ClientConnectionDelegate newDelegate = (ClientConnectionDelegate)res.getDelegate();
- delegate.synchronizeWith(newDelegate);
- failoverSuccessful = true;
- }
- }
- finally
- {
- if (failoverSuccessful)
- {
- state.broadcastFailoverEvent(new FailoverEvent(FailoverEvent.FAILOVER_COMPLETED,
- res.getDelegate()));
- }
- else
- {
- state.broadcastFailoverEvent(new FailoverEvent(FailoverEvent.FAILOVER_FAILED,
- delegate));
- }
-
- // failover done, open valves
- delegate.openValve();
- }
- }
-
- // Inner classes --------------------------------------------------------------------------------
-}
Modified: trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java 2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java 2007-01-09 11:36:35 UTC (rev 1928)
@@ -8,24 +8,24 @@
import org.jboss.aop.advice.Interceptor;
import org.jboss.aop.joinpoint.Invocation;
-import org.jboss.aop.joinpoint.MethodInvocation;
-import org.jboss.logging.Logger;
import org.jboss.jms.client.delegate.DelegateSupport;
import org.jboss.jms.client.state.HierarchicalState;
-import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
-import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
-import EDU.oswego.cs.dl.util.concurrent.Sync;
+import org.jboss.jms.client.state.ConnectionState;
+import org.jboss.jms.client.FailoverCommandCenter;
+import org.jboss.jms.client.FailoverValve;
+import org.jboss.jms.client.remoting.JMSRemotingConnection;
+import org.jboss.remoting.CannotConnectException;
-import java.util.List;
-import java.util.ArrayList;
import java.io.IOException;
/**
* An interceptor that acts as a failover valve: it allows all invocations to go through as long
* as there is no failover in progress (valve is open), and holds all invocations while client-side
- * failover is taking place (valve is closed). The interceptor fields org.jboss.jms.client.Valve's
- * method calls.
+ * failover is taking place (valve is closed). The interceptor is also a failover detector, in that
+ * it catches "failure-triggering" exceptions, and notifies the failover command center.
*
+ * The interceptor fields org.jboss.jms.client.Valve's method calls.
+ *
* It is a PER_INSTANCE interceptor.
*
* An instance of this interceptor must guard access to each connection, session, producer, consumer
@@ -39,39 +39,16 @@
{
// Constants ------------------------------------------------------------------------------------
- private static final Logger log = Logger.getLogger(FailoverValveInterceptor.class);
-
// Static ---------------------------------------------------------------------------------------
- private static boolean trace = log.isTraceEnabled();
-
// Attributes -----------------------------------------------------------------------------------
private DelegateSupport delegate;
- private HierarchicalState state;
- private volatile boolean valveOpen;
+ private FailoverCommandCenter fcc;
+ private FailoverValve valve;
- private ReadWriteLock rwLock;
-
- // the number of threads currently "penetrating" the open valve
- private int activeThreadsCount;
-
- // only for tracing
- private List activeMethods;
-
// Constructors ---------------------------------------------------------------------------------
- public FailoverValveInterceptor()
- {
- valveOpen = true;
- activeThreadsCount = 0;
- rwLock = new WriterPreferenceReadWriteLock();
- if (trace)
- {
- activeMethods = new ArrayList();
- }
- }
-
// Interceptor implemenation --------------------------------------------------------------------
public String getName()
@@ -81,149 +58,52 @@
public Object invoke(Invocation invocation) throws Throwable
{
- // maintain a reference to the delegate that sends invocation through this interceptor. It
- // makes sense, since it's an PER_INSTANCE interceptor.
- if (delegate == null)
+ // maintain a reference to the FailoverCommandCenter instance.
+
+ if (fcc == null)
{
delegate = (DelegateSupport)invocation.getTargetObject();
- state = delegate.getState();
- }
- String methodName = ((MethodInvocation)invocation).getMethod().getName();
- Sync writeLock = rwLock.writeLock();
- Sync readLock = rwLock.readLock();
-
- if("closeValve".equals(methodName))
- {
- if (!valveOpen)
+ HierarchicalState hs = delegate.getState();
+ while (hs != null && !(hs instanceof ConnectionState))
{
- // valve already closed, this is a noop
- log.warn(this + " already closed!");
- return null;
+ hs = hs.getParent();
}
- state.closeChildrensValves();
+ fcc = ((ConnectionState)hs).getFailoverCommandCenter();
+ valve = fcc.getValve();
+ }
- boolean acquired = false;
+ JMSRemotingConnection remotingConnection = null;
- while(!acquired)
- {
- try
- {
- acquired = writeLock.attempt(500);
- }
- catch(InterruptedException e)
- {
- // OK
- }
-
- if (!acquired)
- {
- log.debug(this + " failed to close");
- }
- }
-
- valveOpen = false;
-
- log.debug(this + " has been closed");
-
- return null;
- }
- else if("openValve".equals(methodName))
+ try
{
- if (valveOpen)
- {
- // valve already open, this is a noop
- log.warn(this + " already open!");
- return null;
- }
+ valve.enter();
- state.openChildrensValves();
-
- writeLock.release();
- valveOpen = true;
-
- log.debug(this + " has been opened");
-
- return null;
+ // it's important to retrieve the remotingConnection while inside the Valve, as there is a
+ // guarantee that no failover has happened yet
+ remotingConnection = fcc.getRemotingConnection();
+ return invocation.invokeNext();
}
- else if("isValveOpen".equals(methodName))
+ catch (CannotConnectException e)
{
- if (valveOpen)
- {
- return Boolean.TRUE;
- }
- else
- {
- return Boolean.FALSE;
- }
+ fcc.failureDetected(e, remotingConnection);
+ return invocation.invokeNext();
}
- else if("getActiveThreadsCount".equals(methodName))
+ catch (IOException e)
{
- return new Integer(activeThreadsCount);
- }
-
- // attempt to grab the reader's lock and go forward
-
- boolean exempt = false;
-
- try
- {
- exempt = isInvocationExempt(methodName);
-
- if (!exempt)
- {
- boolean acquired = false;
-
- while(!acquired)
- {
- try
- {
- acquired = readLock.attempt(500);
- }
- catch(InterruptedException e)
- {
- // OK
- }
-
- if (trace && !acquired ) { log.trace(methodName + "() trying to pass through " + this); }
- }
- }
-
- synchronized(this)
- {
- activeThreadsCount++;
- if (trace)
- {
- activeMethods.add(methodName);
- }
- }
-
- if (trace) { log.trace(this + " allowed " + (exempt ? "exempt" : "") + " method " + methodName + "() to pass through"); }
-
+ fcc.failureDetected(e, remotingConnection);
return invocation.invokeNext();
-
}
- catch(IOException e)
+ catch (Throwable e)
{
- // transport-level failure detected while being in the middle of an invocation
+ // not failover-triggering, rethrow
throw e;
}
finally
{
- if (!exempt)
- {
- readLock.release();
- }
-
- synchronized(this)
- {
- activeThreadsCount--;
- if (trace)
- {
- activeMethods.remove(methodName);
- }
- }
+ valve.leave();
}
}
@@ -231,10 +111,7 @@
public String toString()
{
- return "FailoverValve." +
- (delegate == null ? "UNITIALIZED" : delegate.toString()) +
- (valveOpen ? "[OPEN(" + activeThreadsCount +
- (trace ? " " + activeMethods.toString() : "") + ")]":"[CLOSED]");
+ return "FailoverValve." + (delegate == null ? "UNITIALIZED" : delegate.toString());
}
// Package protected ----------------------------------------------------------------------------
@@ -243,10 +120,5 @@
// Private --------------------------------------------------------------------------------------
- private boolean isInvocationExempt(String methodName)
- {
- return "recoverDeliveries".equals(methodName);
- }
-
// Inner classes --------------------------------------------------------------------------------
}
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java 2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java 2007-01-09 11:36:35 UTC (rev 1928)
@@ -136,42 +136,6 @@
throw new IllegalStateException("This invocation should not be handled here!");
}
- /**
- * This invocation should either be handled by the client-side interceptor chain or by the
- * server-side endpoint.
- */
- public void closeValve()
- {
- throw new IllegalStateException("This invocation should not be handled here!");
- }
-
- /**
- * This invocation should either be handled by the client-side interceptor chain or by the
- * server-side endpoint.
- */
- public void openValve()
- {
- throw new IllegalStateException("This invocation should not be handled here!");
- }
-
- /**
- * This invocation should either be handled by the client-side interceptor chain or by the
- * server-side endpoint.
- */
- public boolean isValveOpen()
- {
- throw new IllegalStateException("This invocation should not be handled here!");
- }
-
- /**
- * This invocation should either be handled by the client-side interceptor chain or by the
- * server-side endpoint.
- */
- public int getActiveThreadsCount()
- {
- throw new IllegalStateException("This invocation should not be handled here!");
- }
-
// Public ---------------------------------------------------------------------------------------
public String getStackName()
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java 2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java 2007-01-09 11:36:35 UTC (rev 1928)
@@ -268,50 +268,6 @@
throw new IllegalStateException("This invocation should not be handled here!");
}
- /**
- * This invocation should be handled by the client-side interceptor chain.
- */
- public void performFailover()
- {
- throw new IllegalStateException("This invocation should not be handled here!");
- }
-
- /**
- * This invocation should either be handled by the client-side interceptor chain or by the
- * server-side endpoint.
- */
- public void closeValve()
- {
- throw new IllegalStateException("This invocation should not be handled here!");
- }
-
- /**
- * This invocation should either be handled by the client-side interceptor chain or by the
- * server-side endpoint.
- */
- public void openValve()
- {
- throw new IllegalStateException("This invocation should not be handled here!");
- }
-
- /**
- * This invocation should either be handled by the client-side interceptor chain or by the
- * server-side endpoint.
- */
- public boolean isValveOpen()
- {
- throw new IllegalStateException("This invocation should not be handled here!");
- }
-
- /**
- * This invocation should either be handled by the client-side interceptor chain or by the
- * server-side endpoint.
- */
- public int getActiveThreadsCount()
- {
- throw new IllegalStateException("This invocation should not be handled here!");
- }
-
// Public ---------------------------------------------------------------------------------------
public void init()
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2007-01-09 11:36:35 UTC (rev 1928)
@@ -191,42 +191,6 @@
throw new IllegalStateException("This invocation should not be handled here!");
}
- /**
- * This invocation should either be handled by the client-side interceptor chain or by the
- * server-side endpoint.
- */
- public void closeValve()
- {
- throw new IllegalStateException("This invocation should not be handled here!");
- }
-
- /**
- * This invocation should either be handled by the client-side interceptor chain or by the
- * server-side endpoint.
- */
- public void openValve()
- {
- throw new IllegalStateException("This invocation should not be handled here!");
- }
-
- /**
- * This invocation should either be handled by the client-side interceptor chain or by the
- * server-side endpoint.
- */
- public boolean isValveOpen()
- {
- throw new IllegalStateException("This invocation should not be handled here!");
- }
-
- /**
- * This invocation should either be handled by the client-side interceptor chain or by the
- * server-side endpoint.
- */
- public int getActiveThreadsCount()
- {
- throw new IllegalStateException("This invocation should not be handled here!");
- }
-
// Public ---------------------------------------------------------------------------------------
public String toString()
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java 2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java 2007-01-09 11:36:35 UTC (rev 1928)
@@ -230,42 +230,6 @@
throw new IllegalStateException("This invocation should not be handled here!");
}
- /**
- * This invocation should either be handled by the client-side interceptor chain or by the
- * server-side endpoint.
- */
- public void closeValve()
- {
- throw new IllegalStateException("This invocation should not be handled here!");
- }
-
- /**
- * This invocation should either be handled by the client-side interceptor chain or by the
- * server-side endpoint.
- */
- public void openValve()
- {
- throw new IllegalStateException("This invocation should not be handled here!");
- }
-
- /**
- * This invocation should either be handled by the client-side interceptor chain or by the
- * server-side endpoint.
- */
- public boolean isValveOpen()
- {
- throw new IllegalStateException("This invocation should not be handled here!");
- }
-
- /**
- * This invocation should either be handled by the client-side interceptor chain or by the
- * server-side endpoint.
- */
- public int getActiveThreadsCount()
- {
- throw new IllegalStateException("This invocation should not be handled here!");
- }
-
// Public ---------------------------------------------------------------------------------------
public String toString()
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2007-01-09 11:36:35 UTC (rev 1928)
@@ -466,42 +466,6 @@
throw new IllegalStateException("This invocation should not be handled here!");
}
- /**
- * This invocation should either be handled by the client-side interceptor chain or by the
- * server-side endpoint.
- */
- public void closeValve()
- {
- throw new IllegalStateException("This invocation should not be handled here!");
- }
-
- /**
- * This invocation should either be handled by the client-side interceptor chain or by the
- * server-side endpoint.
- */
- public void openValve()
- {
- throw new IllegalStateException("This invocation should not be handled here!");
- }
-
- /**
- * This invocation should either be handled by the client-side interceptor chain or by the
- * server-side endpoint.
- */
- public boolean isValveOpen()
- {
- throw new IllegalStateException("This invocation should not be handled here!");
- }
-
- /**
- * This invocation should either be handled by the client-side interceptor chain or by the
- * server-side endpoint.
- */
- public int getActiveThreadsCount()
- {
- throw new IllegalStateException("This invocation should not be handled here!");
- }
-
// Public ---------------------------------------------------------------------------------------
public String toString()
Modified: trunk/src/main/org/jboss/jms/client/state/ConnectionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/ConnectionState.java 2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/client/state/ConnectionState.java 2007-01-09 11:36:35 UTC (rev 1928)
@@ -22,16 +22,13 @@
package org.jboss.jms.client.state;
import java.util.HashSet;
-import java.util.List;
-import java.util.ArrayList;
import java.util.Iterator;
import org.jboss.jms.client.delegate.DelegateSupport;
import org.jboss.jms.client.delegate.ClientConnectionDelegate;
import org.jboss.jms.client.delegate.ClientSessionDelegate;
import org.jboss.jms.client.remoting.JMSRemotingConnection;
-import org.jboss.jms.client.FailoverEvent;
-import org.jboss.jms.client.FailoverListener;
+import org.jboss.jms.client.FailoverCommandCenter;
import org.jboss.jms.delegate.ConnectionDelegate;
import org.jboss.jms.delegate.ConnectionFactoryDelegate;
import org.jboss.jms.message.MessageIdGenerator;
@@ -63,8 +60,6 @@
// Static ---------------------------------------------------------------------------------------
- private static boolean trace = log.isTraceEnabled();
-
// Attributes -----------------------------------------------------------------------------------
private int serverID;
@@ -91,11 +86,11 @@
// connection on a different node
private transient String password;
- private List failoverListeners;
-
// needed to try re-creating connection in case failure is detected on the current connection
private ConnectionFactoryDelegate clusteredConnectionFactoryDelegate;
+ private FailoverCommandCenter fcc;
+
// Constructors ---------------------------------------------------------------------------------
public ConnectionState(int serverID, ConnectionDelegate delegate,
@@ -121,7 +116,7 @@
this.idGenerator = gen;
this.serverID = serverID;
- failoverListeners = new ArrayList();
+ fcc = new FailoverCommandCenter(this);
}
// HierarchicalState implementation -------------------------------------------------------------
@@ -250,48 +245,6 @@
this.justCreated = justCreated;
}
- public void broadcastFailoverEvent(FailoverEvent e)
- {
- if (trace) { log.trace(this + " broadcasting " + e); }
-
- List listenersCopy;
-
- synchronized(failoverListeners)
- {
- listenersCopy = new ArrayList(failoverListeners);
- }
-
- for(Iterator i = listenersCopy.iterator(); i.hasNext(); )
- {
- FailoverListener listener = (FailoverListener)i.next();
-
- try
- {
- listener.failoverEventOccured(e);
- }
- catch(Exception ex)
- {
- log.warn("Failover listener " + listener + " did not accept event", ex);
- }
- }
- }
-
- public void registerFailoverListener(FailoverListener listener)
- {
- synchronized(failoverListeners)
- {
- failoverListeners.add(listener);
- }
- }
-
- public boolean unregisterFailoverListener(FailoverListener listener)
- {
- synchronized(failoverListeners)
- {
- return failoverListeners.remove(listener);
- }
- }
-
public void setClusteredConnectionFactoryDeleage(ConnectionFactoryDelegate d)
{
this.clusteredConnectionFactoryDelegate = d;
@@ -302,6 +255,11 @@
return clusteredConnectionFactoryDelegate;
}
+ public FailoverCommandCenter getFailoverCommandCenter()
+ {
+ return fcc;
+ }
+
public String toString()
{
return "ConnectionState[" + ((ClientConnectionDelegate)delegate).getID() + "]";
Modified: trunk/src/main/org/jboss/jms/client/state/HierarchicalState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/HierarchicalState.java 2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/client/state/HierarchicalState.java 2007-01-09 11:36:35 UTC (rev 1928)
@@ -52,18 +52,6 @@
Version getVersionToUse();
/**
- * Closes children's failover valves, by sending closeValve() invocations down children's
- * delegate stack. It is NOT intended to be recursive, unless the children chose so.
- */
- void closeChildrensValves() throws Exception;
-
- /**
- * Opens children's failover valves, by sending openValve() invocations down children's
- * delegate stack. It is NOT intended to be recursive, unless the children chose so.
- */
- void openChildrensValves() throws Exception;
-
- /**
* Update my own state based on the new state.
*/
void synchronizeWith(HierarchicalState newState) throws Exception;
Modified: trunk/src/main/org/jboss/jms/client/state/HierarchicalStateSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/HierarchicalStateSupport.java 2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/client/state/HierarchicalStateSupport.java 2007-01-09 11:36:35 UTC (rev 1928)
@@ -22,10 +22,8 @@
package org.jboss.jms.client.state;
import java.util.Set;
-import java.util.Iterator;
import org.jboss.jms.client.delegate.DelegateSupport;
-import org.jboss.jms.client.Valve;
/**
* Base implementation of HierarchicalState.
@@ -76,34 +74,6 @@
return children;
}
- public void closeChildrensValves() throws Exception
- {
- if (children == null)
- {
- return;
- }
-
- for(Iterator i = children.iterator(); i.hasNext(); )
- {
- HierarchicalState s = (HierarchicalState)i.next();
- ((Valve)s.getDelegate()).closeValve();
- }
- }
-
- public void openChildrensValves() throws Exception
- {
- if (children == null)
- {
- return;
- }
-
- for(Iterator i = children.iterator(); i.hasNext(); )
- {
- HierarchicalState s = (HierarchicalState)i.next();
- ((Valve)s.getDelegate()).openValve();
- }
- }
-
// Public ---------------------------------------------------------------------------------------
// Protected ------------------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/delegate/BrowserDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/BrowserDelegate.java 2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/delegate/BrowserDelegate.java 2007-01-09 11:36:35 UTC (rev 1928)
@@ -22,7 +22,6 @@
package org.jboss.jms.delegate;
import org.jboss.jms.server.endpoint.BrowserEndpoint;
-import org.jboss.jms.client.Valve;
/**
* Represents the minimal set of operations to provide browser
@@ -33,7 +32,7 @@
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision$</tt>
*/
-public interface BrowserDelegate extends Valve, BrowserEndpoint
+public interface BrowserDelegate extends BrowserEndpoint
{
}
Modified: trunk/src/main/org/jboss/jms/delegate/ConnectionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/ConnectionDelegate.java 2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/delegate/ConnectionDelegate.java 2007-01-09 11:36:35 UTC (rev 1928)
@@ -29,7 +29,6 @@
import org.jboss.jms.client.JBossConnectionConsumer;
import org.jboss.jms.client.FailoverListener;
-import org.jboss.jms.client.Valve;
import org.jboss.jms.server.endpoint.ConnectionEndpoint;
/**
@@ -42,7 +41,7 @@
*
* $Id$
*/
-public interface ConnectionDelegate extends Valve, ConnectionEndpoint
+public interface ConnectionDelegate extends ConnectionEndpoint
{
ExceptionListener getExceptionListener() throws JMSException;
@@ -59,6 +58,4 @@
void registerFailoverListener(FailoverListener failoverListener);
boolean unregisterFailoverListener(FailoverListener failoverListener);
- void performFailover() throws Exception;
-
}
Modified: trunk/src/main/org/jboss/jms/delegate/ConsumerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/ConsumerDelegate.java 2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/delegate/ConsumerDelegate.java 2007-01-09 11:36:35 UTC (rev 1928)
@@ -27,7 +27,6 @@
import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.server.endpoint.ConsumerEndpoint;
-import org.jboss.jms.client.Valve;
/**
* Represents the minimal set of operations to provide consumer
@@ -41,7 +40,7 @@
*
* $Id$
*/
-public interface ConsumerDelegate extends Valve, ConsumerEndpoint
+public interface ConsumerDelegate extends ConsumerEndpoint
{
MessageListener getMessageListener();
Modified: trunk/src/main/org/jboss/jms/delegate/ProducerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/ProducerDelegate.java 2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/delegate/ProducerDelegate.java 2007-01-09 11:36:35 UTC (rev 1928)
@@ -25,7 +25,6 @@
import javax.jms.Message;
import org.jboss.jms.client.Closeable;
-import org.jboss.jms.client.Valve;
import org.jboss.jms.destination.JBossDestination;
/**
@@ -35,7 +34,7 @@
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision$</tt>
*/
-public interface ProducerDelegate extends Valve, Closeable
+public interface ProducerDelegate extends Closeable
{
void setDisableMessageID(boolean value) throws JMSException;
Modified: trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java 2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java 2007-01-09 11:36:35 UTC (rev 1928)
@@ -37,7 +37,6 @@
import org.jboss.jms.message.TextMessageProxy;
import org.jboss.jms.server.endpoint.DeliveryInfo;
import org.jboss.jms.server.endpoint.SessionEndpoint;
-import org.jboss.jms.client.Valve;
/**
* Represents the minimal set of operations to provide session functionality.
@@ -49,7 +48,7 @@
*
* $Id$
*/
-public interface SessionDelegate extends Valve, SessionEndpoint
+public interface SessionDelegate extends SessionEndpoint
{
MessageProxy createMessage() throws JMSException;
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2007-01-09 11:36:35 UTC (rev 1928)
@@ -2178,7 +2178,7 @@
}
}
- log.info(this + ": server side fail over is now complete");
+ log.debug(this + " finished to fail over destinations");
//TODO - should this be in a finally? I'm not sure
status.finishFailingOver();
@@ -2190,6 +2190,8 @@
log.debug(this + " announced the cluster that failover procedure is complete");
sendJMXNotification(FAILOVER_COMPLETED_NOTIFICATION);
+
+ log.info(this + ": server side fail over is now complete");
}
finally
{
Deleted: trunk/tests/src/org/jboss/test/messaging/jms/FailoverValveTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/FailoverValveTest.java 2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/tests/src/org/jboss/test/messaging/jms/FailoverValveTest.java 2007-01-09 11:36:35 UTC (rev 1928)
@@ -1,909 +0,0 @@
-/**
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.test.messaging.jms;
-
-import org.jboss.test.messaging.MessagingTestCase;
-import org.jboss.test.messaging.tools.ServerManagement;
-import org.jboss.jms.client.Valve;
-import org.jboss.jms.client.JBossConnection;
-import org.jboss.jms.client.JBossSession;
-import org.jboss.jms.client.JBossQueueBrowser;
-import org.jboss.jms.client.JBossMessageProducer;
-import org.jboss.jms.client.JBossMessageConsumer;
-import org.jboss.jms.client.state.HierarchicalState;
-import org.jboss.jms.client.state.HierarchicalStateSupport;
-import org.jboss.jms.client.delegate.DelegateSupport;
-import org.jboss.jms.client.container.FailoverValveInterceptor;
-import org.jboss.jms.server.Version;
-import org.jboss.aop.joinpoint.Invocation;
-import org.jboss.aop.joinpoint.MethodInvocation;
-import org.jboss.aop.MethodInfo;
-import org.jboss.aop.util.MethodHashing;
-import org.jboss.aop.advice.Interceptor;
-import org.jboss.remoting.Client;
-
-import javax.naming.InitialContext;
-import javax.jms.Connection;
-import javax.jms.Session;
-import javax.jms.ConnectionFactory;
-import javax.jms.Queue;
-import javax.jms.MessageProducer;
-import javax.jms.MessageConsumer;
-import javax.jms.QueueBrowser;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.Collections;
-import java.util.Iterator;
-
-import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
-import EDU.oswego.cs.dl.util.concurrent.Slot;
-
-/**
- * This is not necessarily a clustering test, as the failover valves are installed even for a
- * non-clustered configuration.
- *
- * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
- * @version <tt>$Revision: 1843 $</tt>
- *
- * $Id: JMSTest.java 1843 2006-12-21 23:41:19Z timfox $
- */
-public class FailoverValveTest extends MessagingTestCase
-{
- // Constants ------------------------------------------------------------------------------------
-
- // Static ---------------------------------------------------------------------------------------
-
- // Attributes -----------------------------------------------------------------------------------
-
- private InitialContext ic;
- private ConnectionFactory cf;
- private Queue queue;
-
- // Constructors ---------------------------------------------------------------------------------
-
- public FailoverValveTest(String name)
- {
- super(name);
- }
-
- // Public ---------------------------------------------------------------------------------------
-
- public void testCloseValveHierarchy() throws Exception
- {
- Connection conn = null;
-
- try
- {
- conn = cf.createConnection();
-
- assertTrue(((Valve)((JBossConnection)conn).getDelegate()).isValveOpen());
-
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- assertTrue(((Valve)((JBossSession)session).getDelegate()).isValveOpen());
-
- MessageProducer prod = session.createProducer(queue);
- assertTrue(((Valve)((JBossMessageProducer)prod).getDelegate()).isValveOpen());
-
- MessageConsumer cons = session.createConsumer(queue);
- assertTrue(((Valve)((JBossMessageConsumer)cons).getDelegate()).isValveOpen());
-
- QueueBrowser browser = session.createBrowser(queue);
- assertTrue(((Valve)((JBossQueueBrowser)browser).getDelegate()).isValveOpen());
-
- ((JBossConnection)conn).getDelegate().closeValve();
-
- log.debug("top level valve closed");
-
- assertFalse(((Valve)((JBossConnection)conn).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossSession)session).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossMessageProducer)prod).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossMessageConsumer)cons).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossQueueBrowser)browser).getDelegate()).isValveOpen());
-
- ((JBossConnection)conn).getDelegate().openValve();
-
- log.debug("top level valve open");
-
- assertTrue(((Valve)((JBossConnection)conn).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossSession)session).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossMessageProducer)prod).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossMessageConsumer)cons).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossQueueBrowser)browser).getDelegate()).isValveOpen());
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- }
-
- }
-
- public void testCloseValveHierarchy2() throws Exception
- {
- Connection conn = null;
-
- try
- {
- conn = cf.createConnection();
-
- assertTrue(((Valve)((JBossConnection)conn).getDelegate()).isValveOpen());
-
- Session session1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session session2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- assertTrue(((Valve)((JBossSession)session1).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossSession)session2).getDelegate()).isValveOpen());
-
- MessageProducer prod1 = session1.createProducer(queue);
- assertTrue(((Valve)((JBossMessageProducer)prod1).getDelegate()).isValveOpen());
-
- MessageConsumer cons1 = session1.createConsumer(queue);
- assertTrue(((Valve)((JBossMessageConsumer)cons1).getDelegate()).isValveOpen());
-
- QueueBrowser browser1 = session1.createBrowser(queue);
- assertTrue(((Valve)((JBossQueueBrowser)browser1).getDelegate()).isValveOpen());
-
- MessageProducer prod2 = session2.createProducer(queue);
- assertTrue(((Valve)((JBossMessageProducer)prod2).getDelegate()).isValveOpen());
-
- MessageConsumer cons2 = session2.createConsumer(queue);
- assertTrue(((Valve)((JBossMessageConsumer)cons2).getDelegate()).isValveOpen());
-
- QueueBrowser browser2 = session2.createBrowser(queue);
- assertTrue(((Valve)((JBossQueueBrowser)browser2).getDelegate()).isValveOpen());
-
- ((JBossConnection)conn).getDelegate().closeValve();
-
- log.debug("top level valve closed");
-
- assertFalse(((Valve)((JBossConnection)conn).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossSession)session1).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossSession)session2).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossMessageProducer)prod1).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossMessageConsumer)cons1).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossQueueBrowser)browser1).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossMessageProducer)prod2).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossMessageConsumer)cons2).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossQueueBrowser)browser2).getDelegate()).isValveOpen());
-
- ((JBossConnection)conn).getDelegate().openValve();
-
- log.debug("top level valve open");
-
- assertTrue(((Valve)((JBossConnection)conn).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossSession)session1).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossSession)session2).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossMessageProducer)prod1).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossMessageConsumer)cons1).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossQueueBrowser)browser1).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossMessageProducer)prod2).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossMessageConsumer)cons2).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossQueueBrowser)browser2).getDelegate()).isValveOpen());
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- }
- }
-
- public void testValveOpenByDefault() throws Throwable
- {
- FailoverValveInterceptor valve = new FailoverValveInterceptor();
- SimpleInvocationTargetObject target = new SimpleInvocationTargetObject(valve);
-
- assertEquals(Boolean.TRUE, valve.invoke(buildInvocation("isValveOpen", target)));
- assertEquals(new Integer(0), valve.invoke(buildInvocation("getActiveThreadsCount", target)));
-
- // try to open the valve again, it should be a noop
- valve.invoke(buildInvocation("openValve", target));
-
- assertEquals(Boolean.TRUE, valve.invoke(buildInvocation("isValveOpen", target)));
- assertEquals(new Integer(0), valve.invoke(buildInvocation("getActiveThreadsCount", target)));
- }
-
- public void testPassThroughOpenValve() throws Throwable
- {
- final FailoverValveInterceptor valve = new FailoverValveInterceptor();
- final SimpleInvocationTargetObject target = new SimpleInvocationTargetObject(valve);
-
- // send a thread through the open valve
- Thread t = new Thread(new Runnable()
- {
- public void run()
- {
- try
- {
- valve.invoke(buildInvocation("businessMethod1", target));
- }
- catch(Throwable t)
- {
- log.error("businessMethod1 invocation failed", t);
- }
- }
- }, "Business Thread 1");
-
- t.start();
- t.join();
-
- // the thread should have passed through the open valve
- assertEquals(1, target.getBusinessMethod1InvocationCount());
- assertEquals(1, target.getActiveThreadsHighWaterMark());
-
- }
-
- public void testFlipValve() throws Throwable
- {
- FailoverValveInterceptor valve = new FailoverValveInterceptor();
- SimpleInvocationTargetObject target = new SimpleInvocationTargetObject(valve);
-
- valve.invoke(buildInvocation("closeValve", target));
-
- assertEquals(Boolean.FALSE, valve.invoke(buildInvocation("isValveOpen", target)));
- assertEquals(new Integer(0), valve.invoke(buildInvocation("getActiveThreadsCount", target)));
-
- // child states also must have been notified to close their corresponding valves
- ParentHierarchicalState state = (ParentHierarchicalState)target.getState();
-
- for(Iterator i = state.getChildren().iterator(); i.hasNext(); )
- {
- ChildState cs = (ChildState)i.next();
- assertFalse(((Valve)cs.getDelegate()).isValveOpen());
- }
-
- // try to close the valve again, it should be a noop
-
- valve.invoke(buildInvocation("closeValve", target));
-
- assertEquals(Boolean.FALSE, valve.invoke(buildInvocation("isValveOpen", target)));
- assertEquals(new Integer(0), valve.invoke(buildInvocation("getActiveThreadsCount", target)));
- }
-
- public void testFlipValve2() throws Throwable
- {
- FailoverValveInterceptor valve = new FailoverValveInterceptor();
- SimpleInvocationTargetObject target = new SimpleInvocationTargetObject(valve);
-
- valve.invoke(buildInvocation("closeValve", target));
-
- assertEquals(Boolean.FALSE, valve.invoke(buildInvocation("isValveOpen", target)));
- assertEquals(new Integer(0), valve.invoke(buildInvocation("getActiveThreadsCount", target)));
-
- // child states also must have been notified to close their corresponding valves
- ParentHierarchicalState state = (ParentHierarchicalState)target.getState();
-
- for(Iterator i = state.getChildren().iterator(); i.hasNext(); )
- {
- ChildState cs = (ChildState)i.next();
- assertFalse(((Valve)cs.getDelegate()).isValveOpen());
- }
-
- // re-open the valve
-
- valve.invoke(buildInvocation("openValve", target));
-
- assertEquals(Boolean.TRUE, valve.invoke(buildInvocation("isValveOpen", target)));
- assertEquals(new Integer(0), valve.invoke(buildInvocation("getActiveThreadsCount", target)));
-
- // child states also must have been notified to open their corresponding valves
- state = (ParentHierarchicalState)target.getState();
-
- for(Iterator i = state.getChildren().iterator(); i.hasNext(); )
- {
- ChildState cs = (ChildState)i.next();
- assertTrue(((Valve)cs.getDelegate()).isValveOpen());
- }
- }
-
- /**
- * Close the valve and send a thread through it. The thread must be put on hold until the valve
- * is opened again.
- */
- public void testCloseValveOneBusinessThread() throws Throwable
- {
- final FailoverValveInterceptor valve = new FailoverValveInterceptor();
- final SimpleInvocationTargetObject target = new SimpleInvocationTargetObject(valve);
-
- valve.invoke(buildInvocation("closeValve", target));
-
- assertEquals(Boolean.FALSE, valve.invoke(buildInvocation("isValveOpen", target)));
-
- log.debug("the valve is closed");
-
- // smack a thread into the closed valve
- Thread t = new Thread(new Runnable()
- {
- public void run()
- {
- try
- {
- valve.invoke(buildInvocation("businessMethod1", target));
- }
- catch(Throwable t)
- {
- log.error("businessMethod1 invocation failed", t);
- }
- }
- }, "Business Thread 1");
-
- t.start();
-
- // wait for 10 secs for the invocation to reach target object. It shouldn't ...
- long waitTime = 10000;
- log.info("probing target for " + waitTime / 1000 + " seconds ...");
- InvocationToken arrived = target.waitForInvocation(waitTime);
-
- if (arrived != null)
- {
- fail(arrived.getMethodName() + "() reached target, while it shouldn't have!");
- }
-
-
- log.debug("the business thread didn't go through");
-
- // open the valve
-
- valve.invoke(buildInvocation("openValve", target));
-
- // the business invocation should complete almost immediately; wait on mutex to avoid race
- // condition
-
- arrived = target.waitForInvocation(2000);
-
- assertEquals("businessMethod1", arrived.getMethodName());
- assertEquals(1, target.getBusinessMethod1InvocationCount());
- assertEquals(1, target.getActiveThreadsHighWaterMark());
- }
-
-
- /**
- * Close the valve and send three threads through it. The threads must be put on hold until the
- * valve is opened again.
- */
- public void testCloseValveThreeBusinessThread() throws Throwable
- {
- final FailoverValveInterceptor valve = new FailoverValveInterceptor();
- final SimpleInvocationTargetObject target = new SimpleInvocationTargetObject(valve);
-
- valve.invoke(buildInvocation("closeValve", target));
-
- assertEquals(Boolean.FALSE, valve.invoke(buildInvocation("isValveOpen", target)));
-
- log.debug("the valve is closed");
-
- // smack thread 1 into the closed valve
- new Thread(new Runnable()
- {
- public void run()
- {
- try
- {
- valve.invoke(buildInvocation("businessMethod1", target));
- }
- catch(Throwable t)
- {
- log.error("businessMethod1 invocation failed", t);
- }
- }
- }, "Business Thread 1").start();
-
- // smack thread 2 into the closed valve
- new Thread(new Runnable()
- {
- public void run()
- {
- try
- {
- valve.invoke(buildInvocation("businessMethod1", target));
- }
- catch(Throwable t)
- {
- log.error("businessMethod1 invocation failed", t);
- }
- }
- }, "Business Thread 2").start();
-
- // smack thread 3 into the closed valve
- new Thread(new Runnable()
- {
- public void run()
- {
- try
- {
- valve.invoke(buildInvocation("businessMethod2", target));
- }
- catch(Throwable t)
- {
- log.error("businessMethod2 invocation failed", t);
- }
- }
- }, "Business Thread 3").start();
-
- // wait for 10 secs for any invocation to reach target object. It shouldn't ...
- long waitTime = 10000;
- log.info("probing target for " + waitTime / 1000 + " seconds ...");
- InvocationToken arrived = target.waitForInvocation(waitTime);
-
- if (arrived != null)
- {
- fail(arrived.getMethodName() + "() reached target, while it shouldn't have!");
- }
-
- log.debug("the business threads didn't go through");
-
- // open the valve
-
- valve.invoke(buildInvocation("openValve", target));
-
- // the business invocations should complete almost immediately; wait on mutex to avoid race
- // condition
-
- arrived = target.waitForInvocation(2000);
- assertNotNull(arrived);
-
- arrived = target.waitForInvocation(2000);
- assertNotNull(arrived);
-
- arrived = target.waitForInvocation(2000);
- assertNotNull(arrived);
-
- // wait for 3 secs for any invocation to reach target object. It shouldn't ...
- waitTime = 3000;
- log.info("probing target for " + waitTime / 1000 + " seconds ...");
- arrived = target.waitForInvocation(waitTime);
-
- if (arrived != null)
- {
- fail("Extra " + arrived.getMethodName() + "() reached target, " +
- "while it shouldn't have!");
- }
-
- assertEquals(2, target.getBusinessMethod1InvocationCount());
- assertEquals(1, target.getBusinessMethod2InvocationCount());
- assertEquals(3, target.getActiveThreadsHighWaterMark());
- }
-
- /**
- * The current standard behavior is that the valve cannot be closed as long as there are
- * active threads. closeValve() will block undefinitely.
- */
- public void testCloseWhileActiveThreads() throws Throwable
- {
- final FailoverValveInterceptor valve = new FailoverValveInterceptor();
- final SimpleInvocationTargetObject target = new SimpleInvocationTargetObject(valve);
-
- assertEquals(Boolean.TRUE, valve.invoke(buildInvocation("isValveOpen", target)));
-
- // send a long running thread through the valve
- new Thread(new Runnable()
- {
- public void run()
- {
- try
- {
- valve.invoke(buildInvocation("blockingBusinessMethod", target));
- }
- catch(Throwable t)
- {
- log.error("blockingBusinessMethod invocation failed", t);
- }
- }
- }, "Long running business thread").start();
-
- // allow blockingBusinessMethod time to block
- Thread.sleep(2000);
-
- assertEquals(new Integer(1), valve.invoke(buildInvocation("getActiveThreadsCount", target)));
-
- final Slot closingCompleted = new Slot();
-
- // from a different thread try to close the valve; this thread will block until we unblock
- // the business method
-
- new Thread(new Runnable()
- {
- public void run()
- {
- try
- {
- valve.invoke(buildInvocation("closeValve", target));
- closingCompleted.put(Boolean.TRUE);
-
- }
- catch(Throwable t)
- {
- log.error("blockingBusinessMethod() invocation failed", t);
- }
- }
- }, "Valve closing thread").start();
-
-
- // closing shouldn't be completed for a long time .... actually never, if I don't unblock
- // the business method
-
- // wait for 15 secs for closing. It shouldn't ...
- long waitTime = 15000;
- log.info("probing closing for " + waitTime / 1000 + " seconds ...");
-
- Boolean closed = (Boolean)closingCompleted.poll(waitTime);
-
- if (closed != null)
- {
- fail("closeValve() went through, while it shouldn't have!");
- }
-
- assertEquals(Boolean.TRUE, valve.invoke(buildInvocation("isValveOpen", target)));
- assertEquals(new Integer(1), valve.invoke(buildInvocation("getActiveThreadsCount", target)));
-
- log.info("valve still open ...");
-
- // unblock blockingBusinessMethod
- target.unblockBlockingBusinessMethod();
-
- // valve closing should complete immediately after that
- closed = (Boolean)closingCompleted.poll(1000);
- assertTrue(closed.booleanValue());
- assertEquals(Boolean.FALSE, valve.invoke(buildInvocation("isValveOpen", target)));
- assertEquals(new Integer(0), valve.invoke(buildInvocation("getActiveThreadsCount", target)));
- }
-
- // Package protected ----------------------------------------------------------------------------
-
- // Protected ------------------------------------------------------------------------------------
-
- protected void setUp() throws Exception
- {
- super.setUp();
-
- ServerManagement.start("all");
-
- ic = new InitialContext(ServerManagement.getJNDIEnvironment());
-
- ServerManagement.deployQueue("TestQueue");
-
- cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
- queue = (Queue)ic.lookup("/queue/TestQueue");
-
- log.debug("setup done");
- }
-
- protected void tearDown() throws Exception
- {
- ServerManagement.undeployQueue("TestQueue");
-
- ic.close();
-
- super.tearDown();
- }
-
- // Private --------------------------------------------------------------------------------------
-
- private Invocation buildInvocation(String methodName, Object targetObject)
- throws Exception
- {
- long hash =
- MethodHashing.calculateHash(targetObject.getClass().getMethod(methodName, new Class[0]));
- MethodInfo mi = new MethodInfo(targetObject.getClass(), hash, hash, null);
- MethodInvocation invocation = new MethodInvocation(mi, new Interceptor[0]);
- invocation.setTargetObject(targetObject);
- return invocation;
- }
-
- // Inner classes --------------------------------------------------------------------------------
-
- public interface BusinessObject
- {
- void businessMethod1();
- void businessMethod2();
- /**
- * Must be unblocked externally.
- */
- void blockingBusinessMethod() throws InterruptedException;
- }
-
- public class SimpleInvocationTargetObject
- extends DelegateSupport implements BusinessObject, Valve
- {
- private int businessMethod1InvocationCount;
- private int businessMethod2InvocationCount;
-
- // LinkedQueue<InvocationToken>
- private LinkedQueue invocationTokens;
-
- private FailoverValveInterceptor valve;
- private int activeThreadsCountHighWaterMark;
-
- private Object blockingMethodWaitArea;
-
- public SimpleInvocationTargetObject(FailoverValveInterceptor valve)
- {
- super();
- setState(new ParentHierarchicalState());
- businessMethod1InvocationCount = 0;
- businessMethod2InvocationCount = 0;
- invocationTokens = new LinkedQueue();
- this.valve = valve;
- activeThreadsCountHighWaterMark = 0;
- blockingMethodWaitArea = new Object();
- }
-
- protected Client getClient() throws Exception
- {
- throw new RuntimeException("NOT YET IMPLEMENTED");
- }
-
- public boolean isValveOpen()
- {
- throw new RuntimeException("NOT YET IMPLEMENTED");
- }
-
- public void closeValve() throws Exception
- {
- throw new RuntimeException("NOT YET IMPLEMENTED");
- }
-
- public void openValve() throws Exception
- {
- throw new RuntimeException("NOT YET IMPLEMENTED");
- }
-
- public int getActiveThreadsCount()
- {
- throw new RuntimeException("NOT YET IMPLEMENTED");
- }
-
- public synchronized void businessMethod1()
- {
- businessMethod1InvocationCount++;
- updateActiveThreadsCountHighWaterMark();
- notifyInvocationWaiter("businessMethod1");
- }
-
- public synchronized void businessMethod2()
- {
- businessMethod2InvocationCount++;
- updateActiveThreadsCountHighWaterMark();
- notifyInvocationWaiter("businessMethod2");
- }
-
- public void blockingBusinessMethod() throws InterruptedException
- {
- synchronized(blockingMethodWaitArea)
- {
- log.info("blockingBusinessMethod() blocking ...");
- // block calling thread undefinitely until blockingMethodWaitArea is notified
- blockingMethodWaitArea.wait();
- log.info("blockingBusinessMethod() unblocked");
- }
- }
-
- public synchronized int getBusinessMethod1InvocationCount()
- {
- return businessMethod1InvocationCount;
- }
-
- public synchronized int getBusinessMethod2InvocationCount()
- {
- return businessMethod2InvocationCount;
- }
-
- public synchronized int getActiveThreadsHighWaterMark()
- {
- return activeThreadsCountHighWaterMark;
- }
-
- /**
- * Block until an invocation arrives into the target. If the invocation arrived prior to
- * calling this method, it returns immediately.
- *
- * @return a token corresponding to the business invocation, or null if the method exited with
- * timout, without an invocation to arive.
- */
- public InvocationToken waitForInvocation(long timeout) throws InterruptedException
- {
- return (InvocationToken)invocationTokens.poll(timeout);
- }
-
- public void unblockBlockingBusinessMethod()
- {
- synchronized(blockingMethodWaitArea)
- {
- blockingMethodWaitArea.notify();
- }
- }
-
- /**
- * Reset the state
- */
- public synchronized void reset()
- {
- businessMethod1InvocationCount = 0;
- businessMethod2InvocationCount = 0;
- activeThreadsCountHighWaterMark = 0;
- }
-
- /**
- * Notify someone who waits for this invocation to arrive.
- */
- private void notifyInvocationWaiter(String methodName)
- {
- try
- {
- invocationTokens.put(new InvocationToken(methodName));
- }
- catch(InterruptedException e)
- {
- throw new RuntimeException("Failed to deposit notification in queue", e);
- }
- }
-
- private synchronized void updateActiveThreadsCountHighWaterMark()
- {
- try
- {
- int c =
- ((Integer)valve.invoke(buildInvocation("getActiveThreadsCount", this))).intValue();
- if (c > activeThreadsCountHighWaterMark)
- {
- activeThreadsCountHighWaterMark = c;
- }
- }
- catch(Throwable t)
- {
- throw new RuntimeException("Failed to get getActiveThreadsCount", t);
- }
- }
- }
-
- private class ParentHierarchicalState extends HierarchicalStateSupport
- {
- private DelegateSupport delegate;
- private HierarchicalState parent;
-
- ParentHierarchicalState()
- {
- super(null, null);
-
- children = new HashSet();
-
- // populate it with a child state
- children.add(new ChildState());
- }
-
- public DelegateSupport getDelegate()
- {
- return delegate;
- }
-
- public void setDelegate(DelegateSupport delegate)
- {
- this.delegate = delegate;
- }
-
- public HierarchicalState getParent()
- {
- return parent;
- }
-
- public void setParent(HierarchicalState parent)
- {
- this.parent = parent;
- }
-
- public Version getVersionToUse()
- {
- throw new RuntimeException("NOT YET IMPLEMENTED");
- }
-
- public void synchronizeWith(HierarchicalState newState) throws Exception
- {
- throw new RuntimeException("NOT YET IMPLEMENTED");
- }
- }
-
- private class ChildState extends HierarchicalStateSupport
- {
- private DelegateSupport delegate;
- private HierarchicalState parent;
-
- ChildState()
- {
- super(null, new ChildDelegate());
- }
-
- public Set getChildren()
- {
- return Collections.EMPTY_SET;
- }
-
- public DelegateSupport getDelegate()
- {
- return delegate;
- }
-
- public void setDelegate(DelegateSupport delegate)
- {
- this.delegate = delegate;
- }
-
- public HierarchicalState getParent()
- {
- return parent;
- }
-
- public void setParent(HierarchicalState parent)
- {
- this.parent = parent;
- }
-
- public Version getVersionToUse()
- {
- throw new RuntimeException("NOT YET IMPLEMENTED");
- }
-
- public void synchronizeWith(HierarchicalState newState) throws Exception
- {
- throw new RuntimeException("NOT YET IMPLEMENTED");
- }
- }
-
- private class ChildDelegate extends DelegateSupport implements Valve
- {
- private boolean valveOpen;
-
- ChildDelegate()
- {
- valveOpen = true;
- }
-
- protected Client getClient() throws Exception
- {
- throw new RuntimeException("NOT YET IMPLEMENTED");
- }
-
- public synchronized void closeValve() throws Exception
- {
- valveOpen = false;
- }
-
- public void openValve() throws Exception
- {
- valveOpen = true;
- }
-
- public boolean isValveOpen()
- {
- return valveOpen;
- }
-
- public int getActiveThreadsCount()
- {
- throw new RuntimeException("NOT YET IMPLEMENTED");
- }
- }
-
- private class InvocationToken
- {
- private String methodName;
-
- public InvocationToken(String methodName)
- {
- this.methodName = methodName;
- }
-
- public String getMethodName()
- {
- return methodName;
- }
- }
-
-}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java 2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java 2007-01-09 11:36:35 UTC (rev 1928)
@@ -11,15 +11,8 @@
import org.jboss.jms.client.JBossConnection;
import org.jboss.jms.client.FailoverListener;
import org.jboss.jms.client.FailoverEvent;
-import org.jboss.jms.client.Valve;
-import org.jboss.jms.client.JBossSession;
-import org.jboss.jms.client.JBossQueueBrowser;
-import org.jboss.jms.client.JBossMessageConsumer;
-import org.jboss.jms.client.JBossMessageProducer;
-import org.jboss.jms.client.remoting.JMSRemotingConnection;
import org.jboss.jms.client.state.ConnectionState;
import org.jboss.jms.client.delegate.DelegateSupport;
-import org.jboss.jms.client.delegate.ClientConnectionDelegate;
import javax.jms.Connection;
import javax.jms.Session;
@@ -1377,77 +1370,6 @@
}
}
- public void testCloseValveHierarchy() throws Exception
- {
- Connection conn = null;
-
- try
- {
- conn = cf.createConnection();
-
- assertTrue(((Valve)((JBossConnection)conn).getDelegate()).isValveOpen());
-
- Session session1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session session2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- assertTrue(((Valve)((JBossSession)session1).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossSession)session2).getDelegate()).isValveOpen());
-
- MessageProducer prod1 = session1.createProducer(queue[0]);
- assertTrue(((Valve)((JBossMessageProducer)prod1).getDelegate()).isValveOpen());
-
- MessageConsumer cons1 = session1.createConsumer(queue[0]);
- assertTrue(((Valve)((JBossMessageConsumer)cons1).getDelegate()).isValveOpen());
-
- QueueBrowser browser1 = session1.createBrowser(queue[0]);
- assertTrue(((Valve)((JBossQueueBrowser)browser1).getDelegate()).isValveOpen());
-
- MessageProducer prod2 = session2.createProducer(queue[0]);
- assertTrue(((Valve)((JBossMessageProducer)prod2).getDelegate()).isValveOpen());
-
- MessageConsumer cons2 = session2.createConsumer(queue[0]);
- assertTrue(((Valve)((JBossMessageConsumer)cons2).getDelegate()).isValveOpen());
-
- QueueBrowser browser2 = session2.createBrowser(queue[0]);
- assertTrue(((Valve)((JBossQueueBrowser)browser2).getDelegate()).isValveOpen());
-
- ((JBossConnection)conn).getDelegate().closeValve();
-
- log.debug("top level valve closed");
-
- assertFalse(((Valve)((JBossConnection)conn).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossSession)session1).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossSession)session2).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossMessageProducer)prod1).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossMessageConsumer)cons1).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossQueueBrowser)browser1).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossMessageProducer)prod2).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossMessageConsumer)cons2).getDelegate()).isValveOpen());
- assertFalse(((Valve)((JBossQueueBrowser)browser2).getDelegate()).isValveOpen());
-
- ((JBossConnection)conn).getDelegate().openValve();
-
- log.debug("top level valve open");
-
- assertTrue(((Valve)((JBossConnection)conn).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossSession)session1).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossSession)session2).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossMessageProducer)prod1).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossMessageConsumer)cons1).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossQueueBrowser)browser1).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossMessageProducer)prod2).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossMessageConsumer)cons2).getDelegate()).isValveOpen());
- assertTrue(((Valve)((JBossQueueBrowser)browser2).getDelegate()).isValveOpen());
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- }
- }
-
public void testFailoverMessageOnServer() throws Exception
{
Connection conn = null;
@@ -1572,42 +1494,6 @@
}
}
- public void testTemp() throws Exception
- {
- Connection conn = null;
-
- try
- {
- conn = cf.createConnection();
- conn.close();
-
- conn = cf.createConnection();
-
- assertEquals(1, ((JBossConnection)conn).getServerID());
-
- // we "cripple" the remoting connection by removing ConnectionListener. This way, failures
- // cannot be "cleanly" detected by the client-side pinger, and we'll fail on an invocation
- JMSRemotingConnection rc = ((ClientConnectionDelegate)((JBossConnection)conn).
- getDelegate()).getRemotingConnection();
- rc.removeConnectionListener();
-
- ServerManagement.killAndWait(1);
-
- log.info("########");
- log.info("######## KILLED NODE 1");
- log.info("########");
-
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- }
- }
-
public void testSimpleFailover() throws Exception
{
Connection conn = null;
@@ -1667,6 +1553,42 @@
}
}
+// public void testTemp() throws Exception
+// {
+// Connection conn = null;
+//
+// try
+// {
+// conn = cf.createConnection();
+// conn.close();
+//
+// conn = cf.createConnection();
+//
+// assertEquals(1, ((JBossConnection)conn).getServerID());
+//
+// // we "cripple" the remoting connection by removing ConnectionListener. This way, failures
+// // cannot be "cleanly" detected by the client-side pinger, and we'll fail on an invocation
+// JMSRemotingConnection rc = ((ClientConnectionDelegate)((JBossConnection)conn).
+// getDelegate()).getRemotingConnection();
+// rc.removeConnectionListener();
+//
+// ServerManagement.killAndWait(1);
+//
+// log.info("########");
+// log.info("######## KILLED NODE 1");
+// log.info("########");
+//
+// Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// }
+// finally
+// {
+// if (conn != null)
+// {
+// conn.close();
+// }
+// }
+// }
+
// Package protected ----------------------------------------------------------------------------
// Protected ------------------------------------------------------------------------------------
Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java 2007-01-09 08:19:28 UTC (rev 1927)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java 2007-01-09 11:36:35 UTC (rev 1928)
@@ -1161,13 +1161,15 @@
//is always currently used - (we could make this configurable)
String transport = config.getRemotingTransport();
+
+ long clientLeasePeriod = 20000;
String params = "/?marshaller=org.jboss.jms.server.remoting.JMSWireFormat&" +
"unmarshaller=org.jboss.jms.server.remoting.JMSWireFormat&" +
"serializationtype=" + serializationType + "&" +
"dataType=jms&" +
"socket.check_connection=false&" +
- "clientLeasePeriod=20000&" +
+ "clientLeasePeriod=" + clientLeasePeriod + "&" +
"callbackStore=org.jboss.remoting.callback.BlockingCallbackStore&" +
"clientSocketClass=org.jboss.jms.client.remoting.ClientSocketWrapper&" +
"serverSocketClass=org.jboss.jms.server.remoting.ServerSocketWrapper";
More information about the jboss-cvs-commits
mailing list