[jboss-cvs] JBoss Messaging SVN: r2430 - in trunk: src/main/org/jboss/jms/client/container and 7 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sun Feb 25 17:22:39 EST 2007


Author: timfox
Date: 2007-02-25 17:22:39 -0500 (Sun, 25 Feb 2007)
New Revision: 2430

Added:
   trunk/src/main/org/jboss/jms/client/FailoverValve2.java
Modified:
   trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
   trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
   trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
   trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
   trunk/src/main/org/jboss/jms/client/state/ConnectionState.java
   trunk/src/main/org/jboss/jms/server/destination/QueueService.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
   trunk/tests/src/org/jboss/test/messaging/jms/FailoverTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultiThreadFailoverTest.java
Log:
Multiple failover fixes



Modified: trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java	2007-02-25 20:51:00 UTC (rev 2429)
+++ trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java	2007-02-25 22:22:39 UTC (rev 2430)
@@ -6,17 +6,18 @@
  */
 package org.jboss.jms.client;
 
-import org.jboss.logging.Logger;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.jboss.jms.client.container.FailoverValveInterceptor;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.remoting.JMSRemotingConnection;
 import org.jboss.jms.client.state.ConnectionState;
-import org.jboss.jms.client.remoting.JMSRemotingConnection;
-import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.delegate.ConnectionFactoryDelegate;
 import org.jboss.jms.server.endpoint.CreateConnectionResult;
-import org.jboss.jms.delegate.ConnectionFactoryDelegate;
+import org.jboss.logging.Logger;
 
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Iterator;
-
 /**
  * The class in charge with performing the failover.
  *
@@ -39,21 +40,27 @@
 
    private ConnectionState state;
 
-   private FailoverValve valve;
+   private FailoverValve2 valve;
 
    private List failoverListeners;
-
+   
    // Constructors ---------------------------------------------------------------------------------
 
    public FailoverCommandCenter(ConnectionState state)
    {
       this.state = state;
       failoverListeners = new ArrayList();
-      valve = new FailoverValve(this);
+      
+      valve = new FailoverValve2();
    }
 
    // Public ---------------------------------------------------------------------------------------
-
+   
+   public void setState(ConnectionState state)
+   {
+      this.state = state;
+   }
+   
    /**
     * Method called by failure detection components (FailoverValveInterceptors and
     * ConnectionListeners) when they have reasons to believe that a server failure occured.
@@ -69,13 +76,17 @@
 
       CreateConnectionResult res = null;
       boolean failoverSuccessful = false;
-
+      
+      boolean valveOpened = false;
+      
       try
       {
          // block any other invocations ariving to any delegate from the hierarchy while we're
          // doing failover
 
          valve.close();
+         
+         log.debug(this + " starting client-side failover");
 
          synchronized(this)
          {
@@ -91,21 +102,19 @@
 
             remotingConnection.setFailed();
          }
-
-         log.debug(this + " starting client-side failover");
-
+         
          // generate a FAILOVER_STARTED event. The event must be broadcasted AFTER valve closure,
          // to insure the client-side stack is in a deterministic state
          broadcastFailoverEvent(new FailoverEvent(FailoverEvent.FAILOVER_STARTED, this));
-
+         
          int failedNodeID = state.getServerID();
          ConnectionFactoryDelegate clusteredDelegate =
             state.getClusteredConnectionFactoryDelegate();
-
+         
          // re-try creating the connection
          res = clusteredDelegate.
             createConnectionDelegate(state.getUsername(), state.getPassword(), failedNodeID);
-
+         
          if (res == null)
          {
             // No failover attempt was detected on the server side; this might happen if the
@@ -114,16 +123,39 @@
             failoverSuccessful = false;
          }
          else
-         {
+         {      
             // recursively synchronize state
             ClientConnectionDelegate newDelegate = (ClientConnectionDelegate)res.getDelegate();
+            
             state.getDelegate().synchronizeWith(newDelegate);
-            failoverSuccessful = true;
+            
+            valve.open();
+            valveOpened = true;
+            
+            //Now start the connection - note! this can't be done while the valve is closed
+            //or it will block itself
+            
+            // start the connection again on the serverEndpoint if necessary
+            if (state.isStarted())
+            {
+               newDelegate.start();
+            }
+            
+            failoverSuccessful = true;                        
          }
       }
+      catch (Exception e)
+      {
+         log.error("Failover failed", e);
+         
+         throw e;
+      }
       finally
       {
-         valve.open();
+         if (!valveOpened)
+         {
+            valve.open();
+         }
 
          if (failoverSuccessful)
          {
@@ -152,7 +184,7 @@
       }
    }
 
-   public FailoverValve getValve()
+   public FailoverValve2 getValve()
    {
       return valve;
    }

Added: trunk/src/main/org/jboss/jms/client/FailoverValve2.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/FailoverValve2.java	                        (rev 0)
+++ trunk/src/main/org/jboss/jms/client/FailoverValve2.java	2007-02-25 22:22:39 UTC (rev 2430)
@@ -0,0 +1,149 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.client;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.jboss.logging.Logger;
+
+/**
+ * A FailoverValve2
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class FailoverValve2
+{
+   private static final Logger log = Logger.getLogger(FailoverValve2.class);
+
+   private Set threads = new HashSet();
+   
+   private int count;
+   
+   private boolean locked;
+   
+   private boolean trace = log.isTraceEnabled();
+
+   public synchronized void enter()
+   {
+      if (trace) { log.trace(this + " entering"); }
+      
+      while (locked)
+      {
+         try
+         {
+            wait();
+         }
+         catch (InterruptedException ignore)
+         {            
+         }
+      }
+      count++;
+      
+      if (trace)
+      {
+         threads.add(Thread.currentThread());
+      }
+      
+      if (trace) { log.trace(this + " entered"); }
+   }
+
+   public synchronized void leave()
+   {
+      if (trace) { log.trace(this + " leaving"); }
+      
+      count--;
+      
+      if (trace)
+      {
+         threads.remove(Thread.currentThread());
+      }
+
+      notifyAll();
+      
+      if (trace) { log.trace(this + " left"); }
+   }
+
+   public synchronized void close()
+   {
+      if (trace) { log.trace(this + " Closing valve " + locked); }
+      
+      if (trace && threads.contains(Thread.currentThread()))
+      {
+         // Sanity check
+         throw new IllegalStateException("Cannot close valve from inside valve");
+      }
+      
+      //If the valve is already closed then any more invocations of close must block until the valve is opened
+      
+      while (locked)
+      {
+         if (trace) { log.trace("valve is already closed - blocking until its opened"); }
+         
+         try
+         {
+            wait();
+         }
+         catch (InterruptedException ignore)
+         {            
+         }
+         
+         if (!locked)
+         {
+            //If it was locked when we tried to close but is not now locked - then return immediately
+            return;
+         }
+      }
+      
+
+      locked = true;
+
+      while (count > 0)
+      {
+         try
+         {
+            wait();
+         }
+         catch (InterruptedException ignore)
+         {            
+         }
+      } 
+      
+      if (trace) { log.trace("Valve closed"); }
+   }
+
+   public synchronized void open()
+   {
+      if (trace) { log.trace(this + " Opening valve " + locked); }
+      
+      if (!locked) return;
+
+      locked = false;
+
+      notifyAll();
+   } 
+}
+

Modified: trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java	2007-02-25 20:51:00 UTC (rev 2429)
+++ trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java	2007-02-25 22:22:39 UTC (rev 2430)
@@ -10,12 +10,11 @@
 import org.jboss.aop.joinpoint.Invocation;
 import org.jboss.aop.joinpoint.MethodInvocation;
 import org.jboss.jms.client.FailoverCommandCenter;
-import org.jboss.jms.client.FailoverValve;
+import org.jboss.jms.client.FailoverValve2;
 import org.jboss.jms.client.FailureDetector;
-import org.jboss.jms.client.delegate.ClientConsumerDelegate;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.delegate.ClientSessionDelegate;
 import org.jboss.jms.client.delegate.DelegateSupport;
-import org.jboss.jms.client.delegate.ClientSessionDelegate;
-import org.jboss.jms.client.delegate.ClientConnectionDelegate;
 import org.jboss.jms.client.remoting.JMSRemotingConnection;
 import org.jboss.jms.client.state.ConnectionState;
 import org.jboss.jms.client.state.HierarchicalState;
@@ -52,11 +51,10 @@
 
    private DelegateSupport delegate;
 
-   // We need to cache connectionState here, as FailureCommandCenter instance could be null for
-   // non-clustered connections
+   // We need to cache connectionState here
+   // IMPORTANT - We must not cache the fcc or valve since these need to be replaced when failover occurs
+   // and if we cache them we wil end up using the old ones
    private ConnectionState connectionState;
-   private FailoverCommandCenter fcc;
-   private FailoverValve valve;
 
    // Constructors ---------------------------------------------------------------------------------
 
@@ -66,7 +64,7 @@
    {
       return "FailoverValveInterceptor";
    }
-
+   
    public Object invoke(Invocation invocation) throws Throwable
    {      
       // maintain a reference to connectionState, so we can ensure we have already tested for fcc.
@@ -82,25 +80,23 @@
          }
          
          connectionState = (ConnectionState)hs;
-
-         // maintain a reference to the FailoverCommandCenter instance.
-         fcc = connectionState.getFailoverCommandCenter();
-         
-         if (fcc != null)
-         {
-            valve = fcc.getValve();
-         }
       }
-
+      
+      FailoverCommandCenter fcc = connectionState.getFailoverCommandCenter();
+            
       // non clustered, send the invocation forward
       if (fcc == null)
       {
          return invocation.invokeNext();
       }
-
+      
+      FailoverValve2 valve = fcc.getValve();
+      
       JMSRemotingConnection remotingConnection = null;
       String methodName = ((MethodInvocation)invocation).getMethod().getName();
 
+      boolean left = false;
+      
       try
       {
          valve.enter();
@@ -111,6 +107,9 @@
       }
       catch (MessagingNetworkFailureException e)
       {
+         valve.leave();
+         left = true;
+         
          log.debug(this + " detected network failure, putting " + methodName +
          "() on hold until failover completes");
       
@@ -150,7 +149,10 @@
       }
       finally
       {
-         valve.leave();
+         if (!left)
+         {
+            valve.leave();
+         }
       }
    }
    

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java	2007-02-25 20:51:00 UTC (rev 2429)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java	2007-02-25 22:22:39 UTC (rev 2430)
@@ -30,6 +30,7 @@
 import javax.jms.JMSException;
 import javax.jms.ServerSessionPool;
 
+import org.jboss.jms.client.FailoverCommandCenter;
 import org.jboss.jms.client.FailoverListener;
 import org.jboss.jms.client.JBossConnectionConsumer;
 import org.jboss.jms.client.remoting.JMSRemotingConnection;
@@ -50,6 +51,7 @@
 import org.jboss.jms.wireformat.ConnectionStartRequest;
 import org.jboss.jms.wireformat.ConnectionStopRequest;
 import org.jboss.jms.wireformat.RequestSupport;
+import org.jboss.logging.Logger;
 import org.jboss.messaging.core.tx.MessagingXid;
 
 /**
@@ -66,7 +68,10 @@
 public class ClientConnectionDelegate extends DelegateSupport implements ConnectionDelegate
 {
    // Constants ------------------------------------------------------------------------------------
+   
+   private static final Logger log = Logger.getLogger(ClientConnectionDelegate.class);
 
+
    // Attributes -----------------------------------------------------------------------------------
 
    private int serverID;
@@ -97,7 +102,7 @@
       super.synchronizeWith(nd);
 
       ClientConnectionDelegate newDelegate = (ClientConnectionDelegate)nd;
-
+      
       // synchronize the server endpoint state
 
       // this is a bit counterintuitve, as we're not copying from new delegate, but modifying its
@@ -105,7 +110,7 @@
       // server
 
       ConnectionState thisState = (ConnectionState)state;
-
+      
       if (thisState.getClientID() != null)
       {
          newDelegate.setClientID(thisState.getClientID());
@@ -119,23 +124,19 @@
 
       remotingConnection = newDelegate.getRemotingConnection();
       versionToUse = newDelegate.getVersionToUse();
-
+      
       // There is one RM per server, so we need to merge the rms if necessary
       ResourceManagerFactory.instance.handleFailover(serverID, newDelegate.getServerID());
       
       client = thisState.getRemotingConnection().getRemotingClient();
-
-      // start the connection again on the serverEndpoint if necessary
-      if (thisState.isStarted())
-      {
-         this.start();
-      }
+      
+      serverID = newDelegate.getServerID();
    }
    
    public void setState(HierarchicalState state)
    {
       super.setState(state);
-      
+                 
       client = ((ConnectionState)state).getRemotingConnection().getRemotingClient();
    }
 
@@ -297,7 +298,7 @@
 
    public String toString()
    {
-      return "ConnectionDelegate[" + id + ", SID=" + serverID + "]";
+      return "ConnectionDelegate[" + id + ", SID=" + serverID + "] " + System.identityHashCode(this);
    }
 
    // Protected ------------------------------------------------------------------------------------

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java	2007-02-25 20:51:00 UTC (rev 2429)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java	2007-02-25 22:22:39 UTC (rev 2430)
@@ -37,6 +37,7 @@
 import org.jboss.jms.wireformat.ConsumerCancelInflightMessagesRequest;
 import org.jboss.jms.wireformat.ConsumerChangeRateRequest;
 import org.jboss.jms.wireformat.RequestSupport;
+import org.jboss.logging.Logger;
 
 /**
  * The client-side Consumer delegate class.
@@ -51,7 +52,10 @@
 public class ClientConsumerDelegate extends DelegateSupport implements ConsumerDelegate
 {
    // Constants ------------------------------------------------------------------------------------
+   
+   private static final Logger log = Logger.getLogger(ClientConsumerDelegate.class);
 
+
    // Attributes -----------------------------------------------------------------------------------
    
    private int bufferSize;
@@ -92,8 +96,7 @@
       maxDeliveries = newDelegate.getMaxDeliveries();
 
       client = ((ConnectionState)state.getParent().getParent()).getRemotingConnection().
-         getRemotingClient();
-      
+         getRemotingClient();      
    }
    
    public void setState(HierarchicalState state)
@@ -214,7 +217,7 @@
 
    public String toString()
    {
-      return "ConsumerDelegate[" + id + "]";
+      return "ConsumerDelegate[" + id + "] " + System.identityHashCode(this);
    }
    
    public int getBufferSize()

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2007-02-25 20:51:00 UTC (rev 2429)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2007-02-25 22:22:39 UTC (rev 2430)
@@ -65,6 +65,7 @@
 import org.jboss.jms.wireformat.SessionRecoverDeliveriesRequest;
 import org.jboss.jms.wireformat.SessionSendRequest;
 import org.jboss.jms.wireformat.SessionUnsubscribeRequest;
+import org.jboss.logging.Logger;
 
 /**
  * The client-side Session delegate class.
@@ -89,6 +90,9 @@
 
    // Static ---------------------------------------------------------------------------------------
 
+   private static final Logger log = Logger.getLogger(ClientSessionDelegate.class);
+
+   
    // Constructors ---------------------------------------------------------------------------------
 
    public ClientSessionDelegate(int objectID, int dupsOKBatchSize)
@@ -486,7 +490,7 @@
    
    public String toString()
    {
-      return "SessionDelegate[" + id + "]";
+      return "SessionDelegate[" + id + "] " + System.identityHashCode(this);
    }
    
    // Protected ------------------------------------------------------------------------------------

Modified: trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java	2007-02-25 20:51:00 UTC (rev 2429)
+++ trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java	2007-02-25 22:22:39 UTC (rev 2430)
@@ -28,7 +28,9 @@
 
 import javax.jms.JMSException;
 
+import org.jboss.jms.client.state.ConnectionState;
 import org.jboss.jms.client.state.HierarchicalState;
+import org.jboss.jms.client.state.SessionState;
 import org.jboss.jms.util.MessagingJMSException;
 import org.jboss.jms.util.MessagingNetworkFailureException;
 import org.jboss.jms.wireformat.RequestSupport;
@@ -210,15 +212,6 @@
          log.warn("Captured Exception:" + t, t);
          return new MessagingNetworkFailureException((Exception)t);
       }
-//      else if (t instanceof Exception && t.getMessage().startsWith("Can not make remoting client invocation"))
-//      {
-//
-//         log.info("********** caught exception ", t);
-//            
-//         //FIXME Temporary HACK until http://jira.jboss.org/jira/browse/JBMESSAGING-891 is
-//         //fixed
-//         return new MessagingNetworkFailureException((Exception)t);
-//      }
       else         
       {
          log.error("Failed", t);
@@ -226,5 +219,10 @@
       }
    }
    
+   public Client getClient()
+   {
+      return client;
+   }
+   
    // Inner classes --------------------------------------------------------------------------------
 }

Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2007-02-25 20:51:00 UTC (rev 2429)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2007-02-25 22:22:39 UTC (rev 2430)
@@ -236,8 +236,26 @@
     * Handles a message sent from the server
     * @param message The message
     */
-   public void handleMessage(Object message) throws Exception
+   
+
+   public void handleMessage(final Object message) throws Exception
    {
+      this.sessionExecutor.execute(
+               new Runnable() { public void run()
+               {
+                  try
+                  {
+                     handleMessageInternal(message);
+                  }
+                  catch (Exception e)
+                  {
+                     log.error("Failed to handle message", e);
+                  }
+               } });
+   }
+   
+   public void handleMessageInternal(Object message) throws Exception
+   {
       MessageProxy proxy = (MessageProxy) message;
 
       if (trace) { log.trace(this + " receiving message " + proxy + " from the remoting layer"); }

Modified: trunk/src/main/org/jboss/jms/client/state/ConnectionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/ConnectionState.java	2007-02-25 20:51:00 UTC (rev 2429)
+++ trunk/src/main/org/jboss/jms/client/state/ConnectionState.java	2007-02-25 22:22:39 UTC (rev 2430)
@@ -169,6 +169,10 @@
 
          sessionDelegate.synchronizeWith(newSessionDelegate);
       }
+      
+      //We weren't picking up the new fcc before so new delegates were using the old fcc!!
+      fcc = newState.fcc;
+      fcc.setState(this);
    }
 
    // Public ---------------------------------------------------------------------------------------

Modified: trunk/src/main/org/jboss/jms/server/destination/QueueService.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/QueueService.java	2007-02-25 20:51:00 UTC (rev 2429)
+++ trunk/src/main/org/jboss/jms/server/destination/QueueService.java	2007-02-25 22:22:39 UTC (rev 2430)
@@ -64,8 +64,6 @@
       
       try
       {                           
-         log.info("Starting queue " + destination.getName());
-         
          postOffice = serverPeer.getPostOfficeInstance();
          
          destination.setServerPeer(serverPeer);
@@ -89,14 +87,11 @@
                
             // Must be done after load
             queue.setMaxSize(destination.getMaxSize());
-            queue.activate();
-            
-            log.info("Activated queue " + queue);             
+            queue.activate();           
          }
                      
          if (queue == null)
-         {
-            log.info("Queue was null so creating a new one");
+         {           
             // Create a new queue
             
             JMSCondition queueCond = new JMSCondition(true, destination.getName());

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java	2007-02-25 20:51:00 UTC (rev 2429)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java	2007-02-25 22:22:39 UTC (rev 2430)
@@ -562,8 +562,6 @@
       ResultSet rs = null;
       TransactionWrapper wrap = new TransactionWrapper();
 
-      log.info("loading bindings, non cliustered only " + nonClusteredOnly);
-      
       try
       {
          conn = ds.getConnection();
@@ -595,8 +593,6 @@
             if (nonClusteredOnly && isClustered)
             {
                // Don't want to load clustered bindings
-               
-               log.info("it's a clustered binding not loading it since non clustered only");
             }
             else
             {

Modified: trunk/tests/src/org/jboss/test/messaging/jms/FailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/FailoverTest.java	2007-02-25 20:51:00 UTC (rev 2429)
+++ trunk/tests/src/org/jboss/test/messaging/jms/FailoverTest.java	2007-02-25 22:22:39 UTC (rev 2430)
@@ -132,21 +132,22 @@
 //      catch (Exception e)
 //      {
 //         log.error("Failed", e);
+//         throw e;
 //      }
 //      finally
 //      {      
-//         if (conn != null)
-//         {
-//            log.info("closing connetion");
-//            try
-//            {
-//               conn.close();
-//            }
-//            catch (Exception ignore)
-//            {               
-//            }
-//            log.info("closed connection");
-//         }
+////         if (conn != null)
+////         {
+////            log.info("closing connetion");
+////            try
+////            {
+////               conn.close();
+////            }
+////            catch (Exception ignore)
+////            {               
+////            }
+////            log.info("closed connection");
+////         }
 //      }     
 //   }
    

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java	2007-02-25 20:51:00 UTC (rev 2429)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java	2007-02-25 22:22:39 UTC (rev 2430)
@@ -1684,7 +1684,7 @@
       failureOnInvocation(PoisonInterceptor.FAIL_AFTER_SEND);
    }
 
-   public void testFailureRightAFterSendTransaction() throws Exception
+   public void testFailureRightAfterSendTransaction() throws Exception
    {
       Connection conn = null;
       Connection conn0 = null;

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultiThreadFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultiThreadFailoverTest.java	2007-02-25 20:51:00 UTC (rev 2429)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultiThreadFailoverTest.java	2007-02-25 22:22:39 UTC (rev 2430)
@@ -51,10 +51,11 @@
    // Constants ------------------------------------------------------------------------------------
 
    // Attributes -----------------------------------------------------------------------------------
-   int messageCounterConsumer = 0;
-   int messageCounterProducer = 0;
-   boolean started = false;
-   boolean shouldStop = false;
+  
+   volatile int messageCounterConsumer = 0;
+   volatile int messageCounterProducer = 0;
+   volatile boolean started = false;
+   volatile boolean shouldStop = false;
 
    Object lockReader = new Object();
    Object lockWriter = new Object();
@@ -361,7 +362,7 @@
             t.start();
          }
 
-         Thread.sleep(1000); // time to everybody line up
+         Thread.sleep(2000); // time to everybody line up
          synchronized (semaphore)
          {
             started = true;
@@ -412,6 +413,7 @@
 
          for (Iterator iter = threadList.iterator(); iter.hasNext();)
          {
+            log.info("Waiting to join");
 
             LocalThread t = (LocalThread) iter.next();
 
@@ -565,7 +567,7 @@
             int counter = 0;
             while (true)
             {
-               Message message = consumer.receive(1000);
+               Message message = consumer.receive(5000);
                if (message == null && shouldStop)
                {
                   log.info("Finished execution of thread as shouldStop was true");




More information about the jboss-cvs-commits mailing list