[jboss-cvs] JBoss Messaging SVN: r1808 - in trunk: src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/util tests/src/org/jboss/test/messaging/jms/clustering tests/src/org/jboss/test/messaging/util
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat Dec 16 23:41:31 EST 2006
Author: clebert.suconic at jboss.com
Date: 2006-12-16 23:41:26 -0500 (Sat, 16 Dec 2006)
New Revision: 1808
Added:
trunk/src/main/org/jboss/jms/client/container/ValveAspect.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/ValveTest.java
Modified:
trunk/src/main/org/jboss/jms/client/container/HAAspect.java
trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
trunk/src/main/org/jboss/jms/util/Valve.java
trunk/tests/src/org/jboss/test/messaging/util/VeryBasicValveTest.java
Log:
Adding ValveAspect on the failover
Modified: trunk/src/main/org/jboss/jms/client/container/HAAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/HAAspect.java 2006-12-16 17:43:33 UTC (rev 1807)
+++ trunk/src/main/org/jboss/jms/client/container/HAAspect.java 2006-12-17 04:41:26 UTC (rev 1808)
@@ -34,6 +34,8 @@
import org.jboss.aop.joinpoint.Invocation;
import org.jboss.aop.joinpoint.MethodInvocation;
+import org.jboss.aop.Advised;
+import org.jboss.aop.advice.Interceptor;
import org.jboss.jms.client.delegate.ClientBrowserDelegate;
import org.jboss.jms.client.delegate.ClientConnectionDelegate;
import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
@@ -61,15 +63,15 @@
import org.jboss.remoting.ConnectionListener;
/**
- *
+ *
* A HAAspect
- *
+ *
* There is one of these PER_INSTANCE of connection factory
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
- *
+ *
* @version <tt>$Revision: 1.1 $</tt>
*
* $Id$
@@ -82,6 +84,8 @@
public static final int MAX_RECONNECT_HOP_COUNT = 10;
+ public static final int MAX_IO_RETRY_COUNT = 10;
+
// Static --------------------------------------------------------
private static boolean trace = log.isTraceEnabled();
@@ -104,6 +108,14 @@
id = null;
}
+ /** A Copy Constructor Used on creationg of ValveAspect */
+ protected HAAspect(HAAspect copyFrom)
+ {
+ this.delegates = copyFrom.delegates;
+ this.failoverMap = copyFrom.failoverMap;
+ this.id = copyFrom.id;
+ }
+
// Public --------------------------------------------------------
public Object handleCreateConnectionDelegate(Invocation invocation) throws Throwable
@@ -117,6 +129,10 @@
cacheLocalDelegates(invocation);
+ // TODO: I wanted to change aop-messaging-client.xml to only execute handleCreateConnectionDelegate
+ // on instances of ClusteredClientConnectionFactoryDelegate, but I couldn't find the right
+ // pointcut expression. So, this is a to do.
+ // However the following test is enough for now.
if (delegates == null)
{
// not clustered, pass the invocation through
@@ -148,16 +164,14 @@
ClientConnectionDelegate cd = (ClientConnectionDelegate)res.getDelegate();
+ // ValveAspect is supposed to be created per ClientConnectionDelegate
+ installValveAspect(cd, new ValveAspect(cd, this));
+
if(trace) { log.trace(this + " got local connection delegate " + cd); }
// Add a connection listener to detect failure; the consolidated remoting connection listener
// must be already in place and configured
- ConnectionListener listener = new ConnectionFailureListener(cd);
-
- ((ConnectionState)((DelegateSupport)cd).getState()).
- getRemotingConnectionListener().addDelegateListener(listener);
-
return new CreateConnectionResult(cd);
}
@@ -211,6 +225,62 @@
}
}
+ // Debug information about interceptors
+ protected void printInterceptors(Interceptor interceptors[])
+ {
+ if (interceptors==null || interceptors.length==0)
+ {
+ log.info("Interceptor chain is empty");
+ }
+ else
+ {
+ for (int i=0; i<interceptors.length; i++)
+ {
+ log.info("Interceptor[" + i + "] = " + interceptors[i].getName() + " className= " + interceptors[i].getClass().getName());
+ }
+ }
+ }
+
+ /** The valve aspect needs to stay after ExceptionInterceptor, and before DelegateSupport.
+ * This method will place the aspect on the proper place */
+ protected void installValveAspect(DelegateSupport delegate, Interceptor interceptor)
+ {
+ Advised advised = (Advised)delegate;
+ Interceptor interceptors[] = advised._getInstanceAdvisor().getInterceptors();
+
+ log.info("Installing interceptors");
+ printInterceptors(interceptors);
+
+
+ Interceptor delegateInterceptorFound = null;
+
+ for (int i=0;i<interceptors.length;i++)
+ {
+ if (interceptors[i] instanceof DelegateSupport)
+ {
+ delegateInterceptorFound = interceptors[i];
+ }
+ }
+
+
+ if (delegateInterceptorFound!=null)
+ {
+ advised._getInstanceAdvisor().removeInterceptor(delegateInterceptorFound.getName());
+ }
+
+ advised._getInstanceAdvisor().appendInterceptor(interceptor);
+
+ if (delegateInterceptorFound!=null)
+ {
+ advised._getInstanceAdvisor().appendInterceptor(delegateInterceptorFound);
+ }
+
+ log.info("Interceptors after installation:");
+ printInterceptors(advised._getInstanceAdvisor().getInterceptors());
+
+ }
+
+
//TODO this is currently hardcoded as round-robin, this should be made pluggable
private synchronized ClientConnectionFactoryDelegate getDelegateRoundRobin()
{
@@ -245,7 +315,7 @@
return null;
}
- private void handleConnectionFailure(ClientConnectionDelegate failedConnDelegate)
+ protected void handleConnectionFailure(ClientConnectionDelegate failedConnDelegate)
throws Exception
{
log.debug(this + " handling failed connection " + failedConnDelegate);
@@ -327,7 +397,7 @@
"Cannot find a server to failover onto.");
}
- private void performClientSideFailover(ClientConnectionDelegate failedConnDelegate,
+ protected void performClientSideFailover(ClientConnectionDelegate failedConnDelegate,
ClientConnectionDelegate newConnDelegate)
throws Exception
{
@@ -348,14 +418,14 @@
// We need to update some of the attributes on the state
failedState.copyState(newState);
-
+
// Map of old session ID to new session state
Map oldNewSessionStateMap = new HashMap();
for(Iterator i = failedState.getChildren().iterator(); i.hasNext(); )
{
SessionState failedSessionState = (SessionState)i.next();
-
+
int oldSessionID = failedSessionState.getSessionId();
ClientSessionDelegate failedSessionDelegate =
@@ -367,9 +437,9 @@
failedSessionState.isXA());
SessionState newSessionState = (SessionState)newSessionDelegate.getState();
-
+
if (trace) { log.trace("new session state has " + newSessionState.getClientAckList().size() + " deliveries"); }
-
+
oldNewSessionStateMap.put(new Integer(oldSessionID), failedSessionState);
failedSessionDelegate.copyAttributes(newSessionDelegate);
@@ -401,13 +471,13 @@
}
}
}
-
+
// First we must tell the resource manager to substitute old session ID for new session ID.
// Note we MUST submit the entire mapping in one operation since there may be overlap between
// old and new session ID, and we don't want to overwrite keys in the map.
-
+
failedState.getResourceManager().handleFailover(oldNewSessionStateMap);
-
+
for(Iterator i = oldNewSessionStateMap.values().iterator(); i.hasNext(); )
{
List ackInfos = Collections.EMPTY_LIST;
@@ -415,16 +485,16 @@
SessionState state = (SessionState)i.next();
if (!state.isTransacted() ||
- (state.isXA() && state.getCurrentTxId() == null))
+ (state.isXA() && state.getCurrentTxId() == null))
{
// Non transacted session or an XA session with no transaction set (it falls back
// to auto_ack)
-
+
if (trace) { log.trace(state + " is not transacted (or XA with no tx set), retrieving deliveries from session state"); }
// We remove any unacked non-persistent messages - this is because we don't want to ack
// them since the server won't know about them and will get confused
-
+
if (state.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
{
for(Iterator j = state.getClientAckList().iterator(); j.hasNext(); )
@@ -436,13 +506,13 @@
if (trace) { log.trace("removed non persistent delivery " + info); }
}
}
-
+
ackInfos = state.getClientAckList();
}
else
{
DeliveryInfo autoAck = state.getAutoAckInfo();
- if (autoAck != null)
+ if (autoAck != null)
{
if (!autoAck.getMessageProxy().getMessage().isReliable())
{
@@ -455,9 +525,9 @@
ackInfos = new ArrayList();
ackInfos.add(autoAck);
}
- }
+ }
}
-
+
if (trace) { log.trace(this + " retrieved " + ackInfos.size() + " deliveries"); }
}
else
@@ -470,23 +540,23 @@
}
if (!ackInfos.isEmpty())
- {
+ {
SessionDelegate newDelegate = (SessionDelegate)state.getDelegate();
-
+
List recoveryInfos = new ArrayList();
-
+
for (Iterator iter2 = ackInfos.iterator(); iter2.hasNext(); )
{
DeliveryInfo info = (DeliveryInfo)iter2.next();
-
+
DeliveryRecovery recInfo =
- new DeliveryRecovery(info.getMessageProxy().getDeliveryId(),
+ new DeliveryRecovery(info.getMessageProxy().getDeliveryId(),
info.getMessageProxy().getMessage().getMessageID(),
info.getChannelId());
-
+
recoveryInfos.add(recInfo);
}
-
+
if (trace) { log.trace(this + " sending delivery recovery info: " + recoveryInfos); }
newDelegate.recoverDeliveries(recoveryInfos);
}
@@ -545,7 +615,7 @@
// ResourceManager rm = failedConnectionState.getResourceManager();
//
// todo - we need to replace the sesion id
-//
+//
// rm.handleFailover(oldConsumerID, failedConsumerState.getConsumerID());
// }
@@ -614,37 +684,6 @@
}
- // Inner classes -------------------------------------------------
-
- private class ConnectionFailureListener implements ConnectionListener
- {
- private ClientConnectionDelegate cd;
-
- ConnectionFailureListener(ClientConnectionDelegate cd)
- {
- this.cd = cd;
- }
-
- // ConnectionListener implementation ---------------------------
-
- public void handleConnectionException(Throwable throwable, Client client)
- {
- try
- {
- log.debug(this + " is being notified of connection failure: " + throwable);
- handleConnectionFailure(cd);
- }
- catch (Throwable e)
- {
- log.error("Caught exception in handling failure", e);
- }
- }
-
- public String toString()
- {
- return "ConnectionFailureListener[" + cd + "]";
- }
- }
}
Added: trunk/src/main/org/jboss/jms/client/container/ValveAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ValveAspect.java 2006-12-16 17:43:33 UTC (rev 1807)
+++ trunk/src/main/org/jboss/jms/client/container/ValveAspect.java 2006-12-17 04:41:26 UTC (rev 1808)
@@ -0,0 +1,227 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.jms.client.container;
+
+import org.jboss.aop.advice.Interceptor;
+import org.jboss.aop.joinpoint.Invocation;
+import org.jboss.jms.client.delegate.DelegateSupport;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.state.ConnectionState;
+import org.jboss.jms.util.Valve;
+import org.jboss.logging.Logger;
+import org.jboss.remoting.CannotConnectException;
+import org.jboss.remoting.ConnectionListener;
+import org.jboss.remoting.Client;
+import java.io.IOException;
+import javax.jms.JMSException;
+
+/**
+ * This aspect will intercept failures from any HA object.
+ * <p/>
+ * The reason why I've made an extension of HAAspect instead of implementing new methods there is
+ * ValveAspect needs to cache ClientConnectionDelegate while HAAspect needs to cache CF related objects.
+ * I have made this an extension of HAAspect as it's needed one instance of this aspect per
+ * ConnectionCreated.
+ * <p/>
+ * We will cache the ClientConnectionDelegate on this aspect so we won't need to do any operation on delegates
+ * to retrieve the current ConnectionDelegate.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision:$</tt>
+ * <p/>
+ * $Id:$
+ */
+public class ValveAspect extends HAAspect implements Interceptor
+{
+ private static final Logger log = Logger.getLogger(ValveAspect.class);
+
+ private ClientConnectionDelegate delegate;
+
+ private Valve valve = new Valve();
+
+ ValveAspect(ClientConnectionDelegate delegate, HAAspect copy)
+ {
+ super(copy);
+ this.delegate = delegate;
+
+ ConnectionListener listener = new ConnectionFailureListener(delegate);
+
+ ((ConnectionState) ((DelegateSupport) delegate).getState()).
+ getRemotingConnectionListener().addDelegateListener(listener);
+
+
+ }
+
+ public String getName()
+ {
+ return this.getClass().getName();
+ }
+
+
+ /**
+ * This method executes the valve, listening for erros on the underlaying IO layer,
+ * and it will call the failure for HA
+ */
+ public Object invoke(Invocation invocation) throws Throwable
+ {
+
+ Object returnObject = null;
+
+ boolean failure = false;
+
+ // Eventually retries in case of listed exceptions
+ for (int i = 0; i < MAX_IO_RETRY_COUNT; i++)
+ {
+ // We shouldn't have any calls being made while the failover is being executed
+ valve.isOpened(true);
+
+ if (i > 0)
+ {
+ log.info("Retrying a call " + i);
+ }
+ failure = false;
+ try
+ {
+ returnObject = invocation.invokeNext();
+ }
+ catch (CannotConnectException e)
+ {
+ log.error("Got an exception on HAAspect, retryCount=" + i, e);
+ failure = true;
+ }
+ catch (IOException e)
+ {
+ log.error("Got an exception on HAAspect, retryCount=" + i, e);
+ failure = true;
+ }
+ catch (Throwable e)
+ {
+ log.error("ValveAspect didn't catch the exception " + e + ", and it will be forwarded", e);
+ throw e;
+ }
+
+ if (!failure)
+ {
+ break;
+ }
+ }
+
+
+ if (failure)
+ {
+ handleConnectionFailure(delegate);
+ // if on the end we still have an exception there is nothing we can do besides throw an exception
+ // so, no retires on the failedOver Invocation
+ returnObject = invocation.invokeNext();
+ }
+
+ // if the object returned is another DelegateSupport, we will install the aspect on the returned object
+
+ return returnObject;
+
+ }
+
+
+ /**
+ * Since we are listening for exceptions on the invocation layer, several objects might
+ * get the exception at the same time.
+ * Suppose you have 30 (or any X number>=2) Consumers, using the same JBossConnection failing at the same time.
+ * We will get simultaneous calls on handleFailures while we just need to process one single failure.
+ * <p/>
+ * On this case this method will open a valve and it will perform only the first handleFailure captured, and
+ * it will just return all the others as soon as the valve is closed. This way all the simultaneous failures will
+ * act as they were processed while we called failover only once.
+ */
+ protected void handleConnectionFailure(ClientConnectionDelegate failedConnDelegate) throws Exception
+ {
+ Valve localValve = null;
+
+ // The idea is to reset the Valve synchronized with a reset valve
+ synchronized (this) // I'm not sure if this synchronized is necessary. I will keep it here just to be safe
+ {
+ localValve = valve;
+ }
+
+ // only one execution should be performed if multiple exceptions happened at the same time
+ if (localValve.open())
+ {
+ try
+ {
+ log.info("Processing valve on exception failure");
+ super.handleConnectionFailure(failedConnDelegate);
+ }
+ finally
+ {
+ localValve.close();
+ synchronized (this)
+ {
+ // reset the valve, so future exceptions will also get processed
+ valve = new Valve();
+ }
+ }
+ } else
+ {
+ log.info("The valve was closed, so this invocation waited another invocation to finish on handleFailure");
+ }
+ }
+
+ // Inner classes -------------------------------------------------
+
+
+ /** I have moved this ConnectionListener to ValveAspect (from HAAspect) because
+ * it needs to use the same valve as exception listeners.
+ * While we are processing failover, we should block any calls on the client side.
+ * (No call should be made while the client failover is being executed). It doesn't matter if
+ * the failover was captured by Lease (ConnectionFactory) or Exception handling on invoke at this class */
+ private class ConnectionFailureListener implements ConnectionListener
+ {
+ private ClientConnectionDelegate cd;
+
+ ConnectionFailureListener(ClientConnectionDelegate cd)
+ {
+ this.cd = cd;
+ }
+
+ // ConnectionListener implementation ---------------------------
+
+ public void handleConnectionException(Throwable throwable, Client client)
+ {
+ try
+ {
+ log.debug(this + " is being notified of connection failure: " + throwable);
+ handleConnectionFailure(cd);
+ }
+ catch (Throwable e)
+ {
+ log.error("Caught exception in handling failure", e);
+ }
+ }
+
+ public String toString()
+ {
+ return "ConnectionFailureListener[" + cd + "]";
+ }
+ }
+
+
+}
Modified: trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java 2006-12-16 17:43:33 UTC (rev 1807)
+++ trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java 2006-12-17 04:41:26 UTC (rev 1808)
@@ -94,7 +94,8 @@
public String getName()
{
- return "Invoker";
+ // it's needed a meaninful name to change the aop stack programatically (HA uses that)
+ return this.getClass().getName();
}
/**
Modified: trunk/src/main/org/jboss/jms/util/Valve.java
===================================================================
--- trunk/src/main/org/jboss/jms/util/Valve.java 2006-12-16 17:43:33 UTC (rev 1807)
+++ trunk/src/main/org/jboss/jms/util/Valve.java 2006-12-17 04:41:26 UTC (rev 1808)
@@ -22,7 +22,9 @@
package org.jboss.jms.util;
+import org.jboss.logging.Logger;
+
/**
* This class is used to guarantee only one thread will be performing a given function, and if any other
* thread tries to execute the same functionality it will just ignored
@@ -54,10 +56,47 @@
* */
public class Valve
{
+ private static final Logger log = Logger.getLogger(Valve.class);
+ private boolean trace = log.isTraceEnabled();
+
boolean opened;
boolean closed;
+ Thread threadOwner;
+ int refereceCountOpen=0;
+
+
+ public synchronized boolean isOpened()
+ {
+ return opened;
+ }
+
+ /** If the Valve is opened, will wait until the valve is closed */
+ public synchronized boolean isOpened(boolean wait) throws Exception
+ {
+ if (wait && opened)
+ {
+ if (!closed && threadOwner != Thread.currentThread())
+ {
+ if (trace) log.trace("threadOwner= " + threadOwner + " and currentThread=" + Thread.currentThread());
+ if (trace) log.trace("Waiting valve to be closed");
+ this.wait();
+ if (trace) log.trace("Valve was closed");
+ }
+ else
+ {
+ if (trace) log.trace("This is ThreadOwner, so Valve won't wait");
+ }
+ return opened;
+ }
+ else
+ {
+ return false;
+ }
+
+ }
+
public boolean open() throws Exception
{
return open(true);
@@ -65,9 +104,16 @@
public synchronized boolean open(boolean wait) throws Exception
{
+ if (threadOwner==Thread.currentThread())
+ {
+ if (trace) log.trace("Valve was opened again by thread owner");
+ refereceCountOpen++;
+ return true;
+ }
// already opened? then needs to wait to be closed
if (opened)
{
+ if (trace) log.trace("Valve being opened and time.wait");
// if not closed yet, will wait to be closed
if (!closed)
{
@@ -79,7 +125,10 @@
return false;
} else
{
+ if (trace) log.trace("Valve being opened and this thread is the owner for this lock");
+ refereceCountOpen++;
opened = true;
+ threadOwner = Thread.currentThread();
return true;
}
}
@@ -92,9 +141,18 @@
}
if (closed)
{
- throw new IllegalStateException("Valve is already closed");
+ log.warn("Valve was already closed", new Exception());
}
- closed = true;
- notifyAll();
+ refereceCountOpen--;
+ if (refereceCountOpen==0)
+ {
+ if (trace) log.trace("Closing Valve");
+ closed = true;
+ notifyAll();
+ }
+ else
+ {
+ if (trace) log.trace("Valve.close called but there referenceCountOpen=" + refereceCountOpen);
+ }
}
}
Added: trunk/tests/src/org/jboss/test/messaging/jms/clustering/ValveTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/ValveTest.java 2006-12-16 17:43:33 UTC (rev 1807)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/ValveTest.java 2006-12-17 04:41:26 UTC (rev 1808)
@@ -0,0 +1,237 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.messaging.jms.clustering;
+
+import org.jboss.test.messaging.jms.clustering.base.ClusteringTestBase;
+import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.logging.Logger;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.Destination;
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Message;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision:$</tt>
+ * <p/>
+ * $Id:$
+ */
+public class ValveTest extends ClusteringTestBase
+{
+
+ public ValveTest(String name)
+ {
+ super(name);
+ }
+
+ int messageCounterConsumer =0;
+ int messageCounterProducer=0;
+
+
+ Object lockReader = new Object();
+ Object lockWriter = new Object();
+ Object semaphore = new Object();
+
+ boolean shouldStop = false;
+
+
+ class LocalThreadConsumer extends Thread
+ {
+ private final Logger log = Logger.getLogger(this.getClass());
+
+ int id;
+ MessageConsumer consumer;
+ Session session;
+
+ public LocalThreadConsumer(int id, Session session, Destination destination) throws Exception
+ {
+ consumer = session.createConsumer(destination);
+ this.session = session;
+ this.id = id;
+ }
+
+
+ public void run()
+ {
+ try
+ {
+ synchronized (semaphore)
+ {
+ semaphore.wait();
+ }
+
+ int counter = 0;
+ while (true)
+ {
+ Message message = consumer.receive(50);
+ if (message==null && shouldStop)
+ {
+ break;
+ }
+ if (message!=null)
+ {
+ synchronized (lockReader)
+ {
+ messageCounterConsumer++;
+ }
+ log.trace("ReceiverID=" + id + " received message " + message);
+ if (counter++ % 10 == 0)
+ {
+ //log.info("Commit on id=" + id);
+ session.commit();
+ }
+ }
+ }
+ session.commit();
+ }
+ catch (Exception e)
+ {
+ log.info("Caught exception... finishing Thread " + id, e);
+ }
+ }
+ }
+
+ class LocalThreadProducer extends Thread
+ {
+ private final Logger log = Logger.getLogger(this.getClass());
+
+ MessageProducer producer;
+ Session session;
+ int id;
+
+ public LocalThreadProducer(int id, Session session, Destination destination) throws Exception
+ {
+ this.session = session;
+ producer = session.createProducer(destination);
+ this.id = id;
+ }
+
+ public void run()
+ {
+ try
+ {
+ synchronized (semaphore)
+ {
+ semaphore.wait();
+ }
+
+ int counter = 0;
+ while (!shouldStop)
+ {
+ log.trace("Producer ID=" + id + " send message");
+ producer.send(session.createTextMessage("Message from producer " + id + " counter=" + (counter)));
+
+ synchronized (lockWriter)
+ {
+ messageCounterProducer++;
+ }
+
+ if (counter++ % 5 == 0)
+ {
+ //log.info("Committing message");
+ session.commit();
+ }
+ }
+
+ }
+ catch (Exception e)
+ {
+ log.info("Caught exception... finishing Thread " + id, e);
+ }
+ }
+ }
+
+ /**
+ * This test will open several Consumers at the same Connection and it will kill the server, expecting failover
+ * to happen inside the Valve
+ */
+ public void testMultiThreadFailover() throws Exception
+ {
+ JBossConnectionFactory factory = (JBossConnectionFactory) ic[1].lookup("/ConnectionFactory");
+ Connection conn = factory.createConnection();
+ conn.start();
+
+ ArrayList list = new ArrayList();
+
+ for (int i = 0; i < 5; i++)
+ {
+ list.add(new LocalThreadProducer(i, conn.createSession(true, Session.AUTO_ACKNOWLEDGE), queue[1]));
+ list.add(new LocalThreadConsumer(i, conn.createSession(true, Session.AUTO_ACKNOWLEDGE), queue[1]));
+ }
+
+ for (Iterator iter = list.iterator(); iter.hasNext();)
+ {
+ Thread t = (Thread) iter.next();
+ t.start();
+ }
+
+ Thread.sleep(1000);
+ synchronized (semaphore)
+ {
+ semaphore.notifyAll();
+ }
+
+ Thread.sleep(30000);
+
+ log.info("Killing server 1");
+
+ ServerManagement.kill(1);
+
+ Thread.sleep(50000);
+ shouldStop=true;
+
+ for (Iterator iter = list.iterator(); iter.hasNext();)
+ {
+ Thread t = (Thread) iter.next();
+ t.join();
+ }
+
+ log.info("produced " + messageCounterProducer + " and read " + messageCounterConsumer);
+
+ assertEquals(messageCounterProducer, messageCounterConsumer);
+
+
+ }
+
+ // Protected -----------------------------------------------------
+
+ protected void setUp() throws Exception
+ {
+ nodeCount = 3;
+
+ super.setUp();
+
+ log.debug("setup done");
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+}
Modified: trunk/tests/src/org/jboss/test/messaging/util/VeryBasicValveTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/util/VeryBasicValveTest.java 2006-12-16 17:43:33 UTC (rev 1807)
+++ trunk/tests/src/org/jboss/test/messaging/util/VeryBasicValveTest.java 2006-12-17 04:41:26 UTC (rev 1808)
@@ -81,11 +81,18 @@
//Thread.sleep(1000);
+ valve.open(); // stack vavles
+ if (!valve.isOpened(true))
+ {
+ fail("Valve should be opened");
+ }
+
synchronized (VeryBasicValveTest.class)
{
counter ++;
}
valve.close();
+ valve.close();
}
//log.info("Thread " + threadId + " is now closing the valve");
More information about the jboss-cvs-commits
mailing list