[jboss-cvs] JBoss Messaging SVN: r2430 - in trunk: src/main/org/jboss/jms/client/container and 7 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sun Feb 25 17:22:39 EST 2007
Author: timfox
Date: 2007-02-25 17:22:39 -0500 (Sun, 25 Feb 2007)
New Revision: 2430
Added:
trunk/src/main/org/jboss/jms/client/FailoverValve2.java
Modified:
trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java
trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
trunk/src/main/org/jboss/jms/client/state/ConnectionState.java
trunk/src/main/org/jboss/jms/server/destination/QueueService.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
trunk/tests/src/org/jboss/test/messaging/jms/FailoverTest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultiThreadFailoverTest.java
Log:
Multiple failover fixes
Modified: trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java 2007-02-25 20:51:00 UTC (rev 2429)
+++ trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java 2007-02-25 22:22:39 UTC (rev 2430)
@@ -6,17 +6,18 @@
*/
package org.jboss.jms.client;
-import org.jboss.logging.Logger;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.jboss.jms.client.container.FailoverValveInterceptor;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.remoting.JMSRemotingConnection;
import org.jboss.jms.client.state.ConnectionState;
-import org.jboss.jms.client.remoting.JMSRemotingConnection;
-import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.delegate.ConnectionFactoryDelegate;
import org.jboss.jms.server.endpoint.CreateConnectionResult;
-import org.jboss.jms.delegate.ConnectionFactoryDelegate;
+import org.jboss.logging.Logger;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Iterator;
-
/**
* The class in charge with performing the failover.
*
@@ -39,21 +40,27 @@
private ConnectionState state;
- private FailoverValve valve;
+ private FailoverValve2 valve;
private List failoverListeners;
-
+
// Constructors ---------------------------------------------------------------------------------
public FailoverCommandCenter(ConnectionState state)
{
this.state = state;
failoverListeners = new ArrayList();
- valve = new FailoverValve(this);
+
+ valve = new FailoverValve2();
}
// Public ---------------------------------------------------------------------------------------
-
+
+ public void setState(ConnectionState state)
+ {
+ this.state = state;
+ }
+
/**
* Method called by failure detection components (FailoverValveInterceptors and
* ConnectionListeners) when they have reasons to believe that a server failure occured.
@@ -69,13 +76,17 @@
CreateConnectionResult res = null;
boolean failoverSuccessful = false;
-
+
+ boolean valveOpened = false;
+
try
{
// block any other invocations ariving to any delegate from the hierarchy while we're
// doing failover
valve.close();
+
+ log.debug(this + " starting client-side failover");
synchronized(this)
{
@@ -91,21 +102,19 @@
remotingConnection.setFailed();
}
-
- log.debug(this + " starting client-side failover");
-
+
// generate a FAILOVER_STARTED event. The event must be broadcasted AFTER valve closure,
// to insure the client-side stack is in a deterministic state
broadcastFailoverEvent(new FailoverEvent(FailoverEvent.FAILOVER_STARTED, this));
-
+
int failedNodeID = state.getServerID();
ConnectionFactoryDelegate clusteredDelegate =
state.getClusteredConnectionFactoryDelegate();
-
+
// re-try creating the connection
res = clusteredDelegate.
createConnectionDelegate(state.getUsername(), state.getPassword(), failedNodeID);
-
+
if (res == null)
{
// No failover attempt was detected on the server side; this might happen if the
@@ -114,16 +123,39 @@
failoverSuccessful = false;
}
else
- {
+ {
// recursively synchronize state
ClientConnectionDelegate newDelegate = (ClientConnectionDelegate)res.getDelegate();
+
state.getDelegate().synchronizeWith(newDelegate);
- failoverSuccessful = true;
+
+ valve.open();
+ valveOpened = true;
+
+ //Now start the connection - note! this can't be done while the valve is closed
+ //or it will block itself
+
+ // start the connection again on the serverEndpoint if necessary
+ if (state.isStarted())
+ {
+ newDelegate.start();
+ }
+
+ failoverSuccessful = true;
}
}
+ catch (Exception e)
+ {
+ log.error("Failover failed", e);
+
+ throw e;
+ }
finally
{
- valve.open();
+ if (!valveOpened)
+ {
+ valve.open();
+ }
if (failoverSuccessful)
{
@@ -152,7 +184,7 @@
}
}
- public FailoverValve getValve()
+ public FailoverValve2 getValve()
{
return valve;
}
Added: trunk/src/main/org/jboss/jms/client/FailoverValve2.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/FailoverValve2.java (rev 0)
+++ trunk/src/main/org/jboss/jms/client/FailoverValve2.java 2007-02-25 22:22:39 UTC (rev 2430)
@@ -0,0 +1,149 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.client;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.jboss.logging.Logger;
+
+/**
+ * A FailoverValve2
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class FailoverValve2
+{
+ private static final Logger log = Logger.getLogger(FailoverValve2.class);
+
+ private Set threads = new HashSet();
+
+ private int count;
+
+ private boolean locked;
+
+ private boolean trace = log.isTraceEnabled();
+
+ public synchronized void enter()
+ {
+ if (trace) { log.trace(this + " entering"); }
+
+ while (locked)
+ {
+ try
+ {
+ wait();
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+ }
+ count++;
+
+ if (trace)
+ {
+ threads.add(Thread.currentThread());
+ }
+
+ if (trace) { log.trace(this + " entered"); }
+ }
+
+ public synchronized void leave()
+ {
+ if (trace) { log.trace(this + " leaving"); }
+
+ count--;
+
+ if (trace)
+ {
+ threads.remove(Thread.currentThread());
+ }
+
+ notifyAll();
+
+ if (trace) { log.trace(this + " left"); }
+ }
+
+ public synchronized void close()
+ {
+ if (trace) { log.trace(this + " Closing valve " + locked); }
+
+ if (trace && threads.contains(Thread.currentThread()))
+ {
+ // Sanity check
+ throw new IllegalStateException("Cannot close valve from inside valve");
+ }
+
+ //If the valve is already closed then any more invocations of close must block until the valve is opened
+
+ while (locked)
+ {
+ if (trace) { log.trace("valve is already closed - blocking until its opened"); }
+
+ try
+ {
+ wait();
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+
+ if (!locked)
+ {
+ //If it was locked when we tried to close but is not now locked - then return immediately
+ return;
+ }
+ }
+
+
+ locked = true;
+
+ while (count > 0)
+ {
+ try
+ {
+ wait();
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+ }
+
+ if (trace) { log.trace("Valve closed"); }
+ }
+
+ public synchronized void open()
+ {
+ if (trace) { log.trace(this + " Opening valve " + locked); }
+
+ if (!locked) return;
+
+ locked = false;
+
+ notifyAll();
+ }
+}
+
Modified: trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java 2007-02-25 20:51:00 UTC (rev 2429)
+++ trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java 2007-02-25 22:22:39 UTC (rev 2430)
@@ -10,12 +10,11 @@
import org.jboss.aop.joinpoint.Invocation;
import org.jboss.aop.joinpoint.MethodInvocation;
import org.jboss.jms.client.FailoverCommandCenter;
-import org.jboss.jms.client.FailoverValve;
+import org.jboss.jms.client.FailoverValve2;
import org.jboss.jms.client.FailureDetector;
-import org.jboss.jms.client.delegate.ClientConsumerDelegate;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.delegate.ClientSessionDelegate;
import org.jboss.jms.client.delegate.DelegateSupport;
-import org.jboss.jms.client.delegate.ClientSessionDelegate;
-import org.jboss.jms.client.delegate.ClientConnectionDelegate;
import org.jboss.jms.client.remoting.JMSRemotingConnection;
import org.jboss.jms.client.state.ConnectionState;
import org.jboss.jms.client.state.HierarchicalState;
@@ -52,11 +51,10 @@
private DelegateSupport delegate;
- // We need to cache connectionState here, as FailureCommandCenter instance could be null for
- // non-clustered connections
+ // We need to cache connectionState here
+ // IMPORTANT - We must not cache the fcc or valve since these need to be replaced when failover occurs
+ // and if we cache them we wil end up using the old ones
private ConnectionState connectionState;
- private FailoverCommandCenter fcc;
- private FailoverValve valve;
// Constructors ---------------------------------------------------------------------------------
@@ -66,7 +64,7 @@
{
return "FailoverValveInterceptor";
}
-
+
public Object invoke(Invocation invocation) throws Throwable
{
// maintain a reference to connectionState, so we can ensure we have already tested for fcc.
@@ -82,25 +80,23 @@
}
connectionState = (ConnectionState)hs;
-
- // maintain a reference to the FailoverCommandCenter instance.
- fcc = connectionState.getFailoverCommandCenter();
-
- if (fcc != null)
- {
- valve = fcc.getValve();
- }
}
-
+
+ FailoverCommandCenter fcc = connectionState.getFailoverCommandCenter();
+
// non clustered, send the invocation forward
if (fcc == null)
{
return invocation.invokeNext();
}
-
+
+ FailoverValve2 valve = fcc.getValve();
+
JMSRemotingConnection remotingConnection = null;
String methodName = ((MethodInvocation)invocation).getMethod().getName();
+ boolean left = false;
+
try
{
valve.enter();
@@ -111,6 +107,9 @@
}
catch (MessagingNetworkFailureException e)
{
+ valve.leave();
+ left = true;
+
log.debug(this + " detected network failure, putting " + methodName +
"() on hold until failover completes");
@@ -150,7 +149,10 @@
}
finally
{
- valve.leave();
+ if (!left)
+ {
+ valve.leave();
+ }
}
}
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java 2007-02-25 20:51:00 UTC (rev 2429)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java 2007-02-25 22:22:39 UTC (rev 2430)
@@ -30,6 +30,7 @@
import javax.jms.JMSException;
import javax.jms.ServerSessionPool;
+import org.jboss.jms.client.FailoverCommandCenter;
import org.jboss.jms.client.FailoverListener;
import org.jboss.jms.client.JBossConnectionConsumer;
import org.jboss.jms.client.remoting.JMSRemotingConnection;
@@ -50,6 +51,7 @@
import org.jboss.jms.wireformat.ConnectionStartRequest;
import org.jboss.jms.wireformat.ConnectionStopRequest;
import org.jboss.jms.wireformat.RequestSupport;
+import org.jboss.logging.Logger;
import org.jboss.messaging.core.tx.MessagingXid;
/**
@@ -66,7 +68,10 @@
public class ClientConnectionDelegate extends DelegateSupport implements ConnectionDelegate
{
// Constants ------------------------------------------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(ClientConnectionDelegate.class);
+
// Attributes -----------------------------------------------------------------------------------
private int serverID;
@@ -97,7 +102,7 @@
super.synchronizeWith(nd);
ClientConnectionDelegate newDelegate = (ClientConnectionDelegate)nd;
-
+
// synchronize the server endpoint state
// this is a bit counterintuitve, as we're not copying from new delegate, but modifying its
@@ -105,7 +110,7 @@
// server
ConnectionState thisState = (ConnectionState)state;
-
+
if (thisState.getClientID() != null)
{
newDelegate.setClientID(thisState.getClientID());
@@ -119,23 +124,19 @@
remotingConnection = newDelegate.getRemotingConnection();
versionToUse = newDelegate.getVersionToUse();
-
+
// There is one RM per server, so we need to merge the rms if necessary
ResourceManagerFactory.instance.handleFailover(serverID, newDelegate.getServerID());
client = thisState.getRemotingConnection().getRemotingClient();
-
- // start the connection again on the serverEndpoint if necessary
- if (thisState.isStarted())
- {
- this.start();
- }
+
+ serverID = newDelegate.getServerID();
}
public void setState(HierarchicalState state)
{
super.setState(state);
-
+
client = ((ConnectionState)state).getRemotingConnection().getRemotingClient();
}
@@ -297,7 +298,7 @@
public String toString()
{
- return "ConnectionDelegate[" + id + ", SID=" + serverID + "]";
+ return "ConnectionDelegate[" + id + ", SID=" + serverID + "] " + System.identityHashCode(this);
}
// Protected ------------------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2007-02-25 20:51:00 UTC (rev 2429)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2007-02-25 22:22:39 UTC (rev 2430)
@@ -37,6 +37,7 @@
import org.jboss.jms.wireformat.ConsumerCancelInflightMessagesRequest;
import org.jboss.jms.wireformat.ConsumerChangeRateRequest;
import org.jboss.jms.wireformat.RequestSupport;
+import org.jboss.logging.Logger;
/**
* The client-side Consumer delegate class.
@@ -51,7 +52,10 @@
public class ClientConsumerDelegate extends DelegateSupport implements ConsumerDelegate
{
// Constants ------------------------------------------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(ClientConsumerDelegate.class);
+
// Attributes -----------------------------------------------------------------------------------
private int bufferSize;
@@ -92,8 +96,7 @@
maxDeliveries = newDelegate.getMaxDeliveries();
client = ((ConnectionState)state.getParent().getParent()).getRemotingConnection().
- getRemotingClient();
-
+ getRemotingClient();
}
public void setState(HierarchicalState state)
@@ -214,7 +217,7 @@
public String toString()
{
- return "ConsumerDelegate[" + id + "]";
+ return "ConsumerDelegate[" + id + "] " + System.identityHashCode(this);
}
public int getBufferSize()
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2007-02-25 20:51:00 UTC (rev 2429)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2007-02-25 22:22:39 UTC (rev 2430)
@@ -65,6 +65,7 @@
import org.jboss.jms.wireformat.SessionRecoverDeliveriesRequest;
import org.jboss.jms.wireformat.SessionSendRequest;
import org.jboss.jms.wireformat.SessionUnsubscribeRequest;
+import org.jboss.logging.Logger;
/**
* The client-side Session delegate class.
@@ -89,6 +90,9 @@
// Static ---------------------------------------------------------------------------------------
+ private static final Logger log = Logger.getLogger(ClientSessionDelegate.class);
+
+
// Constructors ---------------------------------------------------------------------------------
public ClientSessionDelegate(int objectID, int dupsOKBatchSize)
@@ -486,7 +490,7 @@
public String toString()
{
- return "SessionDelegate[" + id + "]";
+ return "SessionDelegate[" + id + "] " + System.identityHashCode(this);
}
// Protected ------------------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java 2007-02-25 20:51:00 UTC (rev 2429)
+++ trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java 2007-02-25 22:22:39 UTC (rev 2430)
@@ -28,7 +28,9 @@
import javax.jms.JMSException;
+import org.jboss.jms.client.state.ConnectionState;
import org.jboss.jms.client.state.HierarchicalState;
+import org.jboss.jms.client.state.SessionState;
import org.jboss.jms.util.MessagingJMSException;
import org.jboss.jms.util.MessagingNetworkFailureException;
import org.jboss.jms.wireformat.RequestSupport;
@@ -210,15 +212,6 @@
log.warn("Captured Exception:" + t, t);
return new MessagingNetworkFailureException((Exception)t);
}
-// else if (t instanceof Exception && t.getMessage().startsWith("Can not make remoting client invocation"))
-// {
-//
-// log.info("********** caught exception ", t);
-//
-// //FIXME Temporary HACK until http://jira.jboss.org/jira/browse/JBMESSAGING-891 is
-// //fixed
-// return new MessagingNetworkFailureException((Exception)t);
-// }
else
{
log.error("Failed", t);
@@ -226,5 +219,10 @@
}
}
+ public Client getClient()
+ {
+ return client;
+ }
+
// Inner classes --------------------------------------------------------------------------------
}
Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2007-02-25 20:51:00 UTC (rev 2429)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2007-02-25 22:22:39 UTC (rev 2430)
@@ -236,8 +236,26 @@
* Handles a message sent from the server
* @param message The message
*/
- public void handleMessage(Object message) throws Exception
+
+
+ public void handleMessage(final Object message) throws Exception
{
+ this.sessionExecutor.execute(
+ new Runnable() { public void run()
+ {
+ try
+ {
+ handleMessageInternal(message);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to handle message", e);
+ }
+ } });
+ }
+
+ public void handleMessageInternal(Object message) throws Exception
+ {
MessageProxy proxy = (MessageProxy) message;
if (trace) { log.trace(this + " receiving message " + proxy + " from the remoting layer"); }
Modified: trunk/src/main/org/jboss/jms/client/state/ConnectionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/ConnectionState.java 2007-02-25 20:51:00 UTC (rev 2429)
+++ trunk/src/main/org/jboss/jms/client/state/ConnectionState.java 2007-02-25 22:22:39 UTC (rev 2430)
@@ -169,6 +169,10 @@
sessionDelegate.synchronizeWith(newSessionDelegate);
}
+
+ //We weren't picking up the new fcc before so new delegates were using the old fcc!!
+ fcc = newState.fcc;
+ fcc.setState(this);
}
// Public ---------------------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/server/destination/QueueService.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/QueueService.java 2007-02-25 20:51:00 UTC (rev 2429)
+++ trunk/src/main/org/jboss/jms/server/destination/QueueService.java 2007-02-25 22:22:39 UTC (rev 2430)
@@ -64,8 +64,6 @@
try
{
- log.info("Starting queue " + destination.getName());
-
postOffice = serverPeer.getPostOfficeInstance();
destination.setServerPeer(serverPeer);
@@ -89,14 +87,11 @@
// Must be done after load
queue.setMaxSize(destination.getMaxSize());
- queue.activate();
-
- log.info("Activated queue " + queue);
+ queue.activate();
}
if (queue == null)
- {
- log.info("Queue was null so creating a new one");
+ {
// Create a new queue
JMSCondition queueCond = new JMSCondition(true, destination.getName());
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java 2007-02-25 20:51:00 UTC (rev 2429)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java 2007-02-25 22:22:39 UTC (rev 2430)
@@ -562,8 +562,6 @@
ResultSet rs = null;
TransactionWrapper wrap = new TransactionWrapper();
- log.info("loading bindings, non cliustered only " + nonClusteredOnly);
-
try
{
conn = ds.getConnection();
@@ -595,8 +593,6 @@
if (nonClusteredOnly && isClustered)
{
// Don't want to load clustered bindings
-
- log.info("it's a clustered binding not loading it since non clustered only");
}
else
{
Modified: trunk/tests/src/org/jboss/test/messaging/jms/FailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/FailoverTest.java 2007-02-25 20:51:00 UTC (rev 2429)
+++ trunk/tests/src/org/jboss/test/messaging/jms/FailoverTest.java 2007-02-25 22:22:39 UTC (rev 2430)
@@ -132,21 +132,22 @@
// catch (Exception e)
// {
// log.error("Failed", e);
+// throw e;
// }
// finally
// {
-// if (conn != null)
-// {
-// log.info("closing connetion");
-// try
-// {
-// conn.close();
-// }
-// catch (Exception ignore)
-// {
-// }
-// log.info("closed connection");
-// }
+//// if (conn != null)
+//// {
+//// log.info("closing connetion");
+//// try
+//// {
+//// conn.close();
+//// }
+//// catch (Exception ignore)
+//// {
+//// }
+//// log.info("closed connection");
+//// }
// }
// }
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java 2007-02-25 20:51:00 UTC (rev 2429)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java 2007-02-25 22:22:39 UTC (rev 2430)
@@ -1684,7 +1684,7 @@
failureOnInvocation(PoisonInterceptor.FAIL_AFTER_SEND);
}
- public void testFailureRightAFterSendTransaction() throws Exception
+ public void testFailureRightAfterSendTransaction() throws Exception
{
Connection conn = null;
Connection conn0 = null;
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultiThreadFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultiThreadFailoverTest.java 2007-02-25 20:51:00 UTC (rev 2429)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultiThreadFailoverTest.java 2007-02-25 22:22:39 UTC (rev 2430)
@@ -51,10 +51,11 @@
// Constants ------------------------------------------------------------------------------------
// Attributes -----------------------------------------------------------------------------------
- int messageCounterConsumer = 0;
- int messageCounterProducer = 0;
- boolean started = false;
- boolean shouldStop = false;
+
+ volatile int messageCounterConsumer = 0;
+ volatile int messageCounterProducer = 0;
+ volatile boolean started = false;
+ volatile boolean shouldStop = false;
Object lockReader = new Object();
Object lockWriter = new Object();
@@ -361,7 +362,7 @@
t.start();
}
- Thread.sleep(1000); // time to everybody line up
+ Thread.sleep(2000); // time to everybody line up
synchronized (semaphore)
{
started = true;
@@ -412,6 +413,7 @@
for (Iterator iter = threadList.iterator(); iter.hasNext();)
{
+ log.info("Waiting to join");
LocalThread t = (LocalThread) iter.next();
@@ -565,7 +567,7 @@
int counter = 0;
while (true)
{
- Message message = consumer.receive(1000);
+ Message message = consumer.receive(5000);
if (message == null && shouldStop)
{
log.info("Finished execution of thread as shouldStop was true");
More information about the jboss-cvs-commits
mailing list