[jboss-cvs] JBoss Messaging SVN: r1808 - in trunk: src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/util tests/src/org/jboss/test/messaging/jms/clustering tests/src/org/jboss/test/messaging/util

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sat Dec 16 23:41:31 EST 2006


Author: clebert.suconic at jboss.com
Date: 2006-12-16 23:41:26 -0500 (Sat, 16 Dec 2006)
New Revision: 1808

Added:
   trunk/src/main/org/jboss/jms/client/container/ValveAspect.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/ValveTest.java
Modified:
   trunk/src/main/org/jboss/jms/client/container/HAAspect.java
   trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
   trunk/src/main/org/jboss/jms/util/Valve.java
   trunk/tests/src/org/jboss/test/messaging/util/VeryBasicValveTest.java
Log:
Adding ValveAspect on the failover

Modified: trunk/src/main/org/jboss/jms/client/container/HAAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/HAAspect.java	2006-12-16 17:43:33 UTC (rev 1807)
+++ trunk/src/main/org/jboss/jms/client/container/HAAspect.java	2006-12-17 04:41:26 UTC (rev 1808)
@@ -34,6 +34,8 @@
 
 import org.jboss.aop.joinpoint.Invocation;
 import org.jboss.aop.joinpoint.MethodInvocation;
+import org.jboss.aop.Advised;
+import org.jboss.aop.advice.Interceptor;
 import org.jboss.jms.client.delegate.ClientBrowserDelegate;
 import org.jboss.jms.client.delegate.ClientConnectionDelegate;
 import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
@@ -61,15 +63,15 @@
 import org.jboss.remoting.ConnectionListener;
 
 /**
- * 
+ *
  * A HAAspect
- * 
+ *
  * There is one of these PER_INSTANCE of connection factory
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
  * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
- * 
+ *
  * @version <tt>$Revision: 1.1 $</tt>
  *
  * $Id$
@@ -82,6 +84,8 @@
 
    public static final int MAX_RECONNECT_HOP_COUNT = 10;
 
+   public static final int MAX_IO_RETRY_COUNT = 10;
+
    // Static --------------------------------------------------------
 
    private static boolean trace = log.isTraceEnabled();
@@ -104,6 +108,14 @@
       id = null;
    }
 
+   /** A Copy Constructor Used on creationg of ValveAspect */
+   protected HAAspect(HAAspect copyFrom)
+   {
+      this.delegates = copyFrom.delegates;
+      this.failoverMap = copyFrom.failoverMap;
+      this.id = copyFrom.id;
+   }
+
    // Public --------------------------------------------------------
 
    public Object handleCreateConnectionDelegate(Invocation invocation) throws Throwable
@@ -117,6 +129,10 @@
 
       cacheLocalDelegates(invocation);
 
+      // TODO: I wanted to change aop-messaging-client.xml to only execute handleCreateConnectionDelegate
+      //       on instances of ClusteredClientConnectionFactoryDelegate, but I couldn't find the right
+      //       pointcut expression. So, this is a to do.
+      //      However the following test is enough for now.
       if (delegates == null)
       {
          // not clustered, pass the invocation through
@@ -148,16 +164,14 @@
 
       ClientConnectionDelegate cd = (ClientConnectionDelegate)res.getDelegate();
 
+      // ValveAspect is supposed to be created per ClientConnectionDelegate
+      installValveAspect(cd, new ValveAspect(cd, this));
+
       if(trace) { log.trace(this + " got local connection delegate " + cd); }
 
       // Add a connection listener to detect failure; the consolidated remoting connection listener
       // must be already in place and configured
 
-      ConnectionListener listener = new ConnectionFailureListener(cd);
-
-      ((ConnectionState)((DelegateSupport)cd).getState()).
-         getRemotingConnectionListener().addDelegateListener(listener);
-
       return new CreateConnectionResult(cd);
    }
 
@@ -211,6 +225,62 @@
       }
    }
 
+   // Debug information about interceptors
+   protected void printInterceptors(Interceptor interceptors[])
+    {
+       if (interceptors==null || interceptors.length==0)
+       {
+          log.info("Interceptor chain is empty");
+       }
+       else
+       {
+          for (int i=0; i<interceptors.length; i++)
+          {
+             log.info("Interceptor[" + i + "] = " + interceptors[i].getName() + " className= " + interceptors[i].getClass().getName());
+          }
+       }
+    }
+   
+   /** The valve aspect needs to stay after ExceptionInterceptor, and before DelegateSupport.
+    *  This method will place the aspect on the proper place */
+   protected void installValveAspect(DelegateSupport delegate, Interceptor interceptor)
+   {
+      Advised advised = (Advised)delegate;
+      Interceptor interceptors[] = advised._getInstanceAdvisor().getInterceptors();
+
+      log.info("Installing interceptors");
+      printInterceptors(interceptors);
+
+
+      Interceptor delegateInterceptorFound = null;
+
+      for (int i=0;i<interceptors.length;i++)
+      {
+         if (interceptors[i] instanceof DelegateSupport)
+         {
+            delegateInterceptorFound = interceptors[i];
+         }
+      }
+
+
+      if (delegateInterceptorFound!=null)
+      {
+         advised._getInstanceAdvisor().removeInterceptor(delegateInterceptorFound.getName());
+      }
+
+      advised._getInstanceAdvisor().appendInterceptor(interceptor);
+
+      if (delegateInterceptorFound!=null)
+      {
+         advised._getInstanceAdvisor().appendInterceptor(delegateInterceptorFound);
+      }
+
+      log.info("Interceptors after installation:");
+      printInterceptors(advised._getInstanceAdvisor().getInterceptors());
+
+   }
+   
+
    //TODO this is currently hardcoded as round-robin, this should be made pluggable
    private synchronized ClientConnectionFactoryDelegate getDelegateRoundRobin()
    {
@@ -245,7 +315,7 @@
       return null;
    }
 
-   private void handleConnectionFailure(ClientConnectionDelegate failedConnDelegate)
+   protected void handleConnectionFailure(ClientConnectionDelegate failedConnDelegate)
       throws Exception
    {
       log.debug(this + " handling failed connection " + failedConnDelegate);
@@ -327,7 +397,7 @@
                              "Cannot find a server to failover onto.");
    }
 
-   private void performClientSideFailover(ClientConnectionDelegate failedConnDelegate,
+   protected void performClientSideFailover(ClientConnectionDelegate failedConnDelegate,
                                           ClientConnectionDelegate newConnDelegate)
       throws Exception
    {
@@ -348,14 +418,14 @@
 
       // We need to update some of the attributes on the state
       failedState.copyState(newState);
-      
+
       // Map of old session ID to new session state
       Map oldNewSessionStateMap = new HashMap();
 
       for(Iterator i = failedState.getChildren().iterator(); i.hasNext(); )
       {
          SessionState failedSessionState = (SessionState)i.next();
-          
+
          int oldSessionID = failedSessionState.getSessionId();
 
          ClientSessionDelegate failedSessionDelegate =
@@ -367,9 +437,9 @@
                                   failedSessionState.isXA());
 
          SessionState newSessionState = (SessionState)newSessionDelegate.getState();
-         
+
          if (trace) { log.trace("new session state has " + newSessionState.getClientAckList().size() + " deliveries"); }
-         
+
          oldNewSessionStateMap.put(new Integer(oldSessionID), failedSessionState);
 
          failedSessionDelegate.copyAttributes(newSessionDelegate);
@@ -401,13 +471,13 @@
             }
          }
       }
-      
+
       // First we must tell the resource manager to substitute old session ID for new session ID.
       // Note we MUST submit the entire mapping in one operation since there may be overlap between
       // old and new session ID, and we don't want to overwrite keys in the map.
-      
+
       failedState.getResourceManager().handleFailover(oldNewSessionStateMap);
-      
+
       for(Iterator i = oldNewSessionStateMap.values().iterator(); i.hasNext(); )
       {
          List ackInfos = Collections.EMPTY_LIST;
@@ -415,16 +485,16 @@
          SessionState state = (SessionState)i.next();
 
          if (!state.isTransacted() ||
-             (state.isXA() && state.getCurrentTxId() == null))     
+             (state.isXA() && state.getCurrentTxId() == null))
          {
             // Non transacted session or an XA session with no transaction set (it falls back
             // to auto_ack)
-            
+
             if (trace) { log.trace(state + " is not transacted (or XA with no tx set), retrieving deliveries from session state"); }
 
             // We remove any unacked non-persistent messages - this is because we don't want to ack
             // them since the server won't know about them and will get confused
-                        
+
             if (state.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
             {
                for(Iterator j = state.getClientAckList().iterator(); j.hasNext(); )
@@ -436,13 +506,13 @@
                      if (trace) { log.trace("removed non persistent delivery " + info); }
                   }
                }
-               
+
                ackInfos = state.getClientAckList();
             }
             else
             {
                DeliveryInfo autoAck = state.getAutoAckInfo();
-               if (autoAck != null)                 
+               if (autoAck != null)
                {
                   if (!autoAck.getMessageProxy().getMessage().isReliable())
                   {
@@ -455,9 +525,9 @@
                      ackInfos = new ArrayList();
                      ackInfos.add(autoAck);
                   }
-               }               
+               }
             }
-            
+
             if (trace) { log.trace(this + " retrieved " + ackInfos.size() + " deliveries"); }
          }
          else
@@ -470,23 +540,23 @@
          }
 
          if (!ackInfos.isEmpty())
-         {                        
+         {
             SessionDelegate newDelegate = (SessionDelegate)state.getDelegate();
-            
+
             List recoveryInfos = new ArrayList();
-            
+
             for (Iterator iter2 = ackInfos.iterator(); iter2.hasNext(); )
             {
                DeliveryInfo info = (DeliveryInfo)iter2.next();
-               
+
                DeliveryRecovery recInfo =
-                  new DeliveryRecovery(info.getMessageProxy().getDeliveryId(),                                       
+                  new DeliveryRecovery(info.getMessageProxy().getDeliveryId(),
                                        info.getMessageProxy().getMessage().getMessageID(),
                                        info.getChannelId());
-               
+
                recoveryInfos.add(recInfo);
             }
-            
+
             if (trace) { log.trace(this + " sending delivery recovery info: " + recoveryInfos); }
             newDelegate.recoverDeliveries(recoveryInfos);
          }
@@ -545,7 +615,7 @@
 //         ResourceManager rm = failedConnectionState.getResourceManager();
 //
 //         todo - we need to replace the sesion id
-//         
+//
 //         rm.handleFailover(oldConsumerID, failedConsumerState.getConsumerID());
 //      }
 
@@ -614,37 +684,6 @@
 
    }
 
-   // Inner classes -------------------------------------------------
-
-   private class ConnectionFailureListener implements ConnectionListener
-   {
-      private ClientConnectionDelegate cd;
-
-      ConnectionFailureListener(ClientConnectionDelegate cd)
-      {
-         this.cd = cd;
-      }
-
-      // ConnectionListener implementation ---------------------------
-
-      public void handleConnectionException(Throwable throwable, Client client)
-      {
-         try
-         {
-            log.debug(this + " is being notified of connection failure: " + throwable);
-            handleConnectionFailure(cd);
-         }
-         catch (Throwable e)
-         {
-            log.error("Caught exception in handling failure", e);
-         }
-      }
-
-      public String toString()
-      {
-         return "ConnectionFailureListener[" + cd + "]";
-      }
-   }
 }
 
 

Added: trunk/src/main/org/jboss/jms/client/container/ValveAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ValveAspect.java	2006-12-16 17:43:33 UTC (rev 1807)
+++ trunk/src/main/org/jboss/jms/client/container/ValveAspect.java	2006-12-17 04:41:26 UTC (rev 1808)
@@ -0,0 +1,227 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+
+package org.jboss.jms.client.container;
+
+import org.jboss.aop.advice.Interceptor;
+import org.jboss.aop.joinpoint.Invocation;
+import org.jboss.jms.client.delegate.DelegateSupport;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.state.ConnectionState;
+import org.jboss.jms.util.Valve;
+import org.jboss.logging.Logger;
+import org.jboss.remoting.CannotConnectException;
+import org.jboss.remoting.ConnectionListener;
+import org.jboss.remoting.Client;
+import java.io.IOException;
+import javax.jms.JMSException;
+
+/**
+ * This aspect will intercept failures from any HA object.
+ * <p/>
+ * The reason why I've made an extension of HAAspect instead of implementing new methods there is
+ * ValveAspect needs to cache ClientConnectionDelegate while HAAspect needs to cache CF related objects.
+ * I have made this an extension of HAAspect as it's needed one instance of this aspect per
+ * ConnectionCreated.
+ * <p/>
+ * We will cache the ClientConnectionDelegate on this aspect so we won't need to do any operation on delegates
+ * to retrieve the current ConnectionDelegate.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision:$</tt>
+ *          <p/>
+ *          $Id:$
+ */
+public class ValveAspect extends HAAspect implements Interceptor
+{
+   private static final Logger log = Logger.getLogger(ValveAspect.class);
+
+   private ClientConnectionDelegate delegate;
+
+   private Valve valve = new Valve();
+
+   ValveAspect(ClientConnectionDelegate delegate, HAAspect copy)
+   {
+      super(copy);
+      this.delegate = delegate;
+
+      ConnectionListener listener = new ConnectionFailureListener(delegate);
+
+      ((ConnectionState) ((DelegateSupport) delegate).getState()).
+         getRemotingConnectionListener().addDelegateListener(listener);
+
+
+   }
+
+   public String getName()
+   {
+      return this.getClass().getName();
+   }
+
+
+   /**
+    * This method executes the valve, listening for erros on the underlaying IO layer,
+    * and it will call the failure for HA
+    */
+   public Object invoke(Invocation invocation) throws Throwable
+   {
+
+      Object returnObject = null;
+
+      boolean failure = false;
+
+      // Eventually retries in case of listed exceptions
+      for (int i = 0; i < MAX_IO_RETRY_COUNT; i++)
+      {
+         // We shouldn't have any calls being made while the failover is being executed
+         valve.isOpened(true);
+
+         if (i > 0)
+         {
+            log.info("Retrying a call " + i);
+         }
+         failure = false;
+         try
+         {
+            returnObject = invocation.invokeNext();
+         }
+         catch (CannotConnectException e)
+         {
+            log.error("Got an exception on HAAspect, retryCount=" + i, e);
+            failure = true;
+         }
+         catch (IOException e)
+         {
+            log.error("Got an exception on HAAspect, retryCount=" + i, e);
+            failure = true;
+         }
+         catch (Throwable e)
+         {
+            log.error("ValveAspect didn't catch the exception " + e + ", and it will be forwarded", e);
+            throw e;
+         }
+
+         if (!failure)
+         {
+            break;
+         }
+      }
+
+
+      if (failure)
+      {
+         handleConnectionFailure(delegate);
+         // if on the end we still have an exception there is nothing we can do besides throw an exception
+         // so, no retires on the failedOver Invocation
+         returnObject = invocation.invokeNext();
+      }
+
+      // if the object returned is another DelegateSupport, we will install the aspect on the returned object
+
+      return returnObject;
+
+   }
+
+
+   /**
+    * Since we are listening for exceptions on the invocation layer, several objects might
+    * get the exception at the same time.
+    * Suppose you have 30 (or any X number>=2) Consumers, using the same JBossConnection failing at the same time.
+    * We will get simultaneous calls on handleFailures while we just need to process one single failure.
+    * <p/>
+    * On this case this method will open a valve and it will perform only the first handleFailure captured, and
+    * it will just return all the others as soon as the valve is closed. This way all the simultaneous failures will
+    * act as they were processed while we called failover only once.
+    */
+   protected void handleConnectionFailure(ClientConnectionDelegate failedConnDelegate) throws Exception
+   {
+      Valve localValve = null;
+
+      // The idea is to reset the Valve synchronized with a reset valve
+      synchronized (this) // I'm not sure if this synchronized is necessary. I will keep it here just to be safe
+      {
+         localValve = valve;
+      }
+
+      // only one execution should be performed if multiple exceptions happened at the same time
+      if (localValve.open())
+      {
+         try
+         {
+            log.info("Processing valve on exception failure");
+            super.handleConnectionFailure(failedConnDelegate);
+         }
+         finally
+         {
+            localValve.close();
+            synchronized (this)
+            {
+               // reset the valve, so future exceptions will also get processed
+               valve = new Valve();
+            }
+         }
+      } else
+      {
+         log.info("The valve was closed, so this invocation waited another invocation to finish on handleFailure");
+      }
+   }
+
+   // Inner classes -------------------------------------------------
+
+
+   /** I have moved this ConnectionListener to ValveAspect (from HAAspect) because
+    *  it needs to use the same valve as exception listeners.
+    *  While we are processing failover, we should block any calls on the client side.
+    *  (No call should be made while the client failover is being executed). It doesn't matter if
+    *  the failover was captured by Lease (ConnectionFactory) or Exception handling on invoke at this class */
+   private class ConnectionFailureListener implements ConnectionListener
+   {
+      private ClientConnectionDelegate cd;
+
+      ConnectionFailureListener(ClientConnectionDelegate cd)
+      {
+         this.cd = cd;
+      }
+
+      // ConnectionListener implementation ---------------------------
+
+      public void handleConnectionException(Throwable throwable, Client client)
+      {
+         try
+         {
+            log.debug(this + " is being notified of connection failure: " + throwable);
+            handleConnectionFailure(cd);
+         }
+         catch (Throwable e)
+         {
+            log.error("Caught exception in handling failure", e);
+         }
+      }
+
+      public String toString()
+      {
+         return "ConnectionFailureListener[" + cd + "]";
+      }
+   }
+
+
+}

Modified: trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java	2006-12-16 17:43:33 UTC (rev 1807)
+++ trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java	2006-12-17 04:41:26 UTC (rev 1808)
@@ -94,7 +94,8 @@
 
    public String getName()
    {
-      return "Invoker";
+      // it's needed a meaninful name to change the aop stack programatically (HA uses that)
+      return this.getClass().getName();
    }
 
    /**

Modified: trunk/src/main/org/jboss/jms/util/Valve.java
===================================================================
--- trunk/src/main/org/jboss/jms/util/Valve.java	2006-12-16 17:43:33 UTC (rev 1807)
+++ trunk/src/main/org/jboss/jms/util/Valve.java	2006-12-17 04:41:26 UTC (rev 1808)
@@ -22,7 +22,9 @@
 
 package org.jboss.jms.util;
 
+import org.jboss.logging.Logger;
 
+
 /**
  * This class is used to guarantee only one thread will be performing a given function, and if any other
  * thread tries to execute the same functionality it will just ignored
@@ -54,10 +56,47 @@
  *  */
 public class Valve
 {
+   private static final Logger log = Logger.getLogger(Valve.class);
+   private boolean trace = log.isTraceEnabled();
+
    boolean opened;
    boolean closed;
 
+   Thread threadOwner;
 
+   int refereceCountOpen=0;
+
+
+   public synchronized boolean isOpened()
+   {
+      return opened;
+   }
+
+   /** If the Valve is opened, will wait until the valve is closed */
+   public synchronized boolean isOpened(boolean wait) throws Exception
+   {
+      if (wait && opened)
+      {
+         if (!closed && threadOwner != Thread.currentThread())
+         {
+            if (trace) log.trace("threadOwner= " + threadOwner + " and currentThread=" + Thread.currentThread());
+            if (trace) log.trace("Waiting valve to be closed");
+            this.wait();
+            if (trace) log.trace("Valve was closed");
+         }
+         else
+         {
+            if (trace) log.trace("This is ThreadOwner, so Valve won't wait");
+         }
+         return opened;
+      }
+      else
+      {
+         return false;
+      }
+
+   }
+
    public boolean open() throws Exception
    {
       return open(true);
@@ -65,9 +104,16 @@
 
    public synchronized boolean open(boolean wait) throws Exception
    {
+      if (threadOwner==Thread.currentThread())
+      {
+         if (trace) log.trace("Valve was opened again by thread owner");
+         refereceCountOpen++;
+         return true;
+      }
       // already opened? then needs to wait to be closed
       if (opened)
       {
+         if (trace) log.trace("Valve being opened and time.wait");
          // if not closed yet, will wait to be closed
          if (!closed)
          {
@@ -79,7 +125,10 @@
          return false;
       } else
       {
+         if (trace) log.trace("Valve being opened and this thread is the owner for this lock");
+         refereceCountOpen++;
          opened = true;
+         threadOwner = Thread.currentThread();
          return true;
       }
    }
@@ -92,9 +141,18 @@
       }
       if (closed)
       {
-         throw new IllegalStateException("Valve is already closed");
+         log.warn("Valve was already closed", new Exception());
       }
-      closed = true;
-      notifyAll();
+      refereceCountOpen--;
+      if (refereceCountOpen==0)
+      {
+         if (trace) log.trace("Closing Valve");
+         closed = true;
+         notifyAll();
+      }
+      else
+      {
+         if (trace) log.trace("Valve.close called but there referenceCountOpen=" + refereceCountOpen);
+      }
    }
 }

Added: trunk/tests/src/org/jboss/test/messaging/jms/clustering/ValveTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/ValveTest.java	2006-12-16 17:43:33 UTC (rev 1807)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/ValveTest.java	2006-12-17 04:41:26 UTC (rev 1808)
@@ -0,0 +1,237 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+
+package org.jboss.test.messaging.jms.clustering;
+
+import org.jboss.test.messaging.jms.clustering.base.ClusteringTestBase;
+import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.logging.Logger;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.Destination;
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Message;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision:$</tt>
+ *          <p/>
+ *          $Id:$
+ */
+public class ValveTest extends ClusteringTestBase
+{
+
+   public ValveTest(String name)
+   {
+      super(name);
+   }
+
+   int messageCounterConsumer =0;
+   int messageCounterProducer=0;
+
+
+   Object lockReader = new Object();
+   Object lockWriter = new Object();
+   Object semaphore = new Object();
+
+   boolean shouldStop = false;
+
+
+   class LocalThreadConsumer extends Thread
+   {
+      private final Logger log = Logger.getLogger(this.getClass());
+
+      int id;
+      MessageConsumer consumer;
+      Session session;
+
+      public LocalThreadConsumer(int id, Session session, Destination destination) throws Exception
+      {
+         consumer = session.createConsumer(destination);
+         this.session = session;
+         this.id = id;
+      }
+
+
+      public void run()
+      {
+         try
+         {
+            synchronized (semaphore)
+            {
+               semaphore.wait();
+            }
+
+            int counter = 0;
+            while (true)
+            {
+               Message message = consumer.receive(50);
+               if (message==null && shouldStop)
+               {
+                  break;
+               }
+               if (message!=null)
+               {
+                  synchronized (lockReader)
+                  {
+                     messageCounterConsumer++;
+                  }
+                  log.trace("ReceiverID=" + id + " received message " + message);
+                  if (counter++ % 10 == 0)
+                  {
+                     //log.info("Commit on id=" + id);
+                     session.commit();
+                  }
+               }
+            }
+            session.commit();
+         }
+         catch (Exception e)
+         {
+            log.info("Caught exception... finishing Thread " + id, e);
+         }
+      }
+   }
+
+   class LocalThreadProducer extends Thread
+   {
+      private final Logger log = Logger.getLogger(this.getClass());
+
+      MessageProducer producer;
+      Session session;
+      int id;
+
+      public LocalThreadProducer(int id, Session session, Destination destination) throws Exception
+      {
+         this.session = session;
+         producer = session.createProducer(destination);
+         this.id = id;
+      }
+
+      public void run()
+      {
+         try
+         {
+            synchronized (semaphore)
+            {
+               semaphore.wait();
+            }
+
+            int counter = 0;
+            while (!shouldStop)
+            {
+               log.trace("Producer ID=" + id + " send message");
+               producer.send(session.createTextMessage("Message from producer " + id + " counter=" + (counter)));
+
+               synchronized (lockWriter)
+               {
+                  messageCounterProducer++;
+               }
+
+               if (counter++ % 5 == 0)
+               {
+                  //log.info("Committing message");
+                  session.commit();
+               }
+            }
+
+         }
+         catch (Exception e)
+         {
+            log.info("Caught exception... finishing Thread " + id, e);
+         }
+      }
+   }
+
+   /**
+    * This test will open several Consumers at the same Connection and it will kill the server, expecting failover
+    * to happen inside the Valve
+    */
+   public void testMultiThreadFailover() throws Exception
+   {
+      JBossConnectionFactory factory = (JBossConnectionFactory) ic[1].lookup("/ConnectionFactory");
+      Connection conn = factory.createConnection();
+      conn.start();
+
+      ArrayList list = new ArrayList();
+
+      for (int i = 0; i < 5; i++)
+      {
+         list.add(new LocalThreadProducer(i, conn.createSession(true, Session.AUTO_ACKNOWLEDGE), queue[1]));
+         list.add(new LocalThreadConsumer(i, conn.createSession(true, Session.AUTO_ACKNOWLEDGE), queue[1]));
+      }
+
+      for (Iterator iter = list.iterator(); iter.hasNext();)
+      {
+         Thread t = (Thread) iter.next();
+         t.start();
+      }
+
+      Thread.sleep(1000);
+      synchronized (semaphore)
+      {
+         semaphore.notifyAll();
+      }
+
+      Thread.sleep(30000);
+
+      log.info("Killing server 1");
+
+      ServerManagement.kill(1);
+
+      Thread.sleep(50000);
+      shouldStop=true;
+
+      for (Iterator iter = list.iterator(); iter.hasNext();)
+      {
+         Thread t = (Thread) iter.next();
+         t.join();
+      }
+
+      log.info("produced " + messageCounterProducer + " and read " + messageCounterConsumer);
+
+      assertEquals(messageCounterProducer, messageCounterConsumer);
+
+
+   }
+
+   // Protected -----------------------------------------------------
+
+   protected void setUp() throws Exception
+   {
+      nodeCount = 3;
+
+      super.setUp();
+
+      log.debug("setup done");
+   }
+
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+   }
+
+}

Modified: trunk/tests/src/org/jboss/test/messaging/util/VeryBasicValveTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/util/VeryBasicValveTest.java	2006-12-16 17:43:33 UTC (rev 1807)
+++ trunk/tests/src/org/jboss/test/messaging/util/VeryBasicValveTest.java	2006-12-17 04:41:26 UTC (rev 1808)
@@ -81,11 +81,18 @@
 
                //Thread.sleep(1000);
 
+               valve.open(); // stack vavles
+               if (!valve.isOpened(true))
+               {
+                  fail("Valve should be opened");
+               }
+
                synchronized (VeryBasicValveTest.class)
                {
                   counter ++;
                }
                valve.close();
+               valve.close();
             }
 
             //log.info("Thread " + threadId + " is now closing the valve");




More information about the jboss-cvs-commits mailing list