[jboss-cvs] JBoss Messaging SVN: r1823 - in trunk: src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/remoting tests/src/org/jboss/test/messaging/jms/clustering tests/src/org/jboss/test/messaging/jms/clustering/base

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Dec 19 01:51:05 EST 2006


Author: clebert.suconic at jboss.com
Date: 2006-12-19 01:51:01 -0500 (Tue, 19 Dec 2006)
New Revision: 1823

Modified:
   trunk/src/main/org/jboss/jms/client/container/ValveAspect.java
   trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/ValveTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java
Log:
Adding Valve and few other fixes

Modified: trunk/src/main/org/jboss/jms/client/container/ValveAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ValveAspect.java	2006-12-19 06:28:23 UTC (rev 1822)
+++ trunk/src/main/org/jboss/jms/client/container/ValveAspect.java	2006-12-19 06:51:01 UTC (rev 1823)
@@ -27,12 +27,14 @@
 import org.jboss.jms.client.delegate.DelegateSupport;
 import org.jboss.jms.client.delegate.ClientConnectionDelegate;
 import org.jboss.jms.client.state.ConnectionState;
-import org.jboss.messaging.util.Valve;
+import org.jboss.jms.client.remoting.JMSRemotingConnection;
 import org.jboss.logging.Logger;
 import org.jboss.remoting.CannotConnectException;
 import org.jboss.remoting.ConnectionListener;
 import org.jboss.remoting.Client;
 import java.io.IOException;
+import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
+import EDU.oswego.cs.dl.util.concurrent.ReentrantWriterPreferenceReadWriteLock;
 
 /**
  * This aspect will intercept failures from any HA object.
@@ -56,12 +58,15 @@
 
    private ClientConnectionDelegate delegate;
 
-   private Valve valve = new Valve();
+   // alternative implementation
+   //private Valve valve = new Valve();
+   private ReadWriteLock lockValve;
 
    ValveAspect(ClientConnectionDelegate delegate, HAAspect copy)
    {
       super(copy);
       this.delegate = delegate;
+      lockValve = new ReentrantWriterPreferenceReadWriteLock();
 
       ConnectionListener listener = new ConnectionFailureListener(delegate);
 
@@ -91,14 +96,37 @@
       // Eventually retries in case of listed exceptions
       for (int i = 0; i < MAX_IO_RETRY_COUNT; i++)
       {
-         // We shouldn't have any calls being made while the failover is being executed
-         valve.isOpened(true);
 
          if (i > 0)
          {
             log.info("Retrying a call " + i);
          }
+
+
+         // An alternate way of playing with Valves is to set an attribute to true
+         // and wait for notifications until it's being set. Valve encpasulates this logic
+         //  if (i > 0)
+         //  {
+         //     valve.open(true);
+         //  }
+         //  else
+         //  {
+         //     valve.isOpened(true);
+         //  }
+
+         // We shouldn't have any calls being made while the failover is being executed
+         if (i > 0)
+         {
+            // On retries we will use writeLocks, as the failover was already being processed
+            lockValve.writeLock().acquire();
+         }
+         else
+         {
+            lockValve.readLock().acquire();
+         }
+
          failure = false;
+
          try
          {
             returnObject = invocation.invokeNext();
@@ -118,6 +146,23 @@
             log.error("ValveAspect didn't catch the exception " + e + ", and it will be forwarded", e);
             throw e;
          }
+         finally
+         {
+            // An alternate way of playing with Valves is to set an attribute to true
+            // and wait for notifications until it's being set. Valve encpasulates this logic
+            //if (i>0)
+            //{
+            //   valve.reset();
+            //}
+            if (i > 0)
+            {
+               lockValve.writeLock().release();
+            }
+            else
+            {
+               lockValve.readLock().release();
+            }
+         }
 
          if (!failure)
          {
@@ -128,7 +173,7 @@
 
       if (failure)
       {
-         handleConnectionFailure(delegate);
+         handleConnectionFailure(delegate, delegate.getRemotingConnection());
          // if on the end we still have an exception there is nothing we can do besides throw an exception
          // so, no retires on the failedOver Invocation
          returnObject = invocation.invokeNext();
@@ -151,37 +196,48 @@
     * it will just return all the others as soon as the valve is closed. This way all the simultaneous failures will
     * act as they were processed while we called failover only once.
     */
-   protected void handleConnectionFailure(ClientConnectionDelegate failedConnDelegate) throws Exception
+   protected void handleConnectionFailure(ClientConnectionDelegate failedConnDelegate,
+                                          JMSRemotingConnection jmsConnection) throws Exception
    {
-      Valve localValve = null;
+      log.info("Processing handleConnectionFailure ");
 
-      // The idea is to reset the Valve synchronized with a reset valve
-      synchronized (this) // I'm not sure if this synchronized is necessary. I will keep it here just to be safe
-      {
-         localValve = valve;
-      }
+      // only one execution should be performed if multiple exceptions happened at the same time
+      lockValve.writeLock().acquire();
 
-      // only one execution should be performed if multiple exceptions happened at the same time
-      if (localValve.open())
+      // An alternate way of playing with Valves is to set an attribute to true
+      // and wait for notifications until it's being set. Valve encpasulates this logic
+//      if (!valve.open())
+//      {
+//         return;
+//      }
+
+      try
       {
-         try
+         if (jmsConnection.isFailed())
          {
-            log.info("Processing valve on exception failure");
-            super.handleConnectionFailure(failedConnDelegate);
+            log.info("Failover on " + failedConnDelegate + " was already performed, so just ignoring call to handleConnectionFailure");
+            return;
          }
-         finally
-         {
-            localValve.close();
-            synchronized (this)
-            {
-               // reset the valve, so future exceptions will also get processed
-               valve = new Valve();
-            }
-         }
-      } else
+
+         log.info("Processing valve on exception failure");
+         jmsConnection.setFailed(true);
+         super.handleConnectionFailure(failedConnDelegate);
+         log.info("Failover on client finished");
+      }
+      catch (Exception e)
       {
-         log.info("The valve was closed, so this invocation waited another invocation to finish on handleFailure");
+         log.error("An exception happened during client failover processing!", e);
+         throw e;
       }
+      finally
+      {
+         // An alternate way of playing with Valves is to set an attribute to true
+         // and wait for notifications until it's being set. Valve encpasulates this logic
+//         valve.close();
+//         valve = new Valve();
+         lockValve.writeLock().release();
+      }
+
    }
 
    // Inner classes -------------------------------------------------
@@ -208,7 +264,7 @@
          try
          {
             log.debug(this + " is being notified of connection failure: " + throwable);
-            handleConnectionFailure(cd);
+            handleConnectionFailure(cd, cd.getRemotingConnection());
          }
          catch (Throwable e)
          {

Modified: trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java	2006-12-19 06:28:23 UTC (rev 1822)
+++ trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java	2006-12-19 06:51:01 UTC (rev 1823)
@@ -61,6 +61,9 @@
    protected InvokerLocator serverLocator;
    protected CallbackManager callbackManager;
 
+   /** When a failover is performed, this flag is set to true */
+   protected boolean failed=false;
+
    // Constructors --------------------------------------------------
 
    public JMSRemotingConnection(String serverLocatorURI, boolean clientPing) throws Throwable
@@ -183,6 +186,17 @@
       return callbackManager;
    }
 
+
+   public boolean isFailed()
+   {
+      return failed;
+   }
+
+   public void setFailed(boolean failed)
+   {
+      this.failed = failed;
+   }
+
    public String toString()
    {
       return "JMSRemotingConnection[" + serverLocator.getLocatorURI() + "]";

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/ValveTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/ValveTest.java	2006-12-19 06:28:23 UTC (rev 1822)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/ValveTest.java	2006-12-19 06:51:01 UTC (rev 1823)
@@ -25,6 +25,8 @@
 import org.jboss.test.messaging.jms.clustering.base.ClusteringTestBase;
 import org.jboss.test.messaging.tools.ServerManagement;
 import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.client.JBossConnection;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
 import org.jboss.logging.Logger;
 import javax.jms.MessageConsumer;
 import javax.jms.Session;
@@ -49,8 +51,8 @@
       super(name);
    }
 
-   int messageCounterConsumer =0;
-   int messageCounterProducer=0;
+   int messageCounterConsumer = 0;
+   int messageCounterProducer = 0;
 
 
    Object lockReader = new Object();
@@ -88,12 +90,12 @@
             int counter = 0;
             while (true)
             {
-               Message message = consumer.receive(50);
-               if (message==null && shouldStop)
+               Message message = consumer.receive(5000);
+               if (message == null && shouldStop)
                {
                   break;
                }
-               if (message!=null)
+               if (message != null)
                {
                   synchronized (lockReader)
                   {
@@ -103,11 +105,11 @@
                   if (counter++ % 10 == 0)
                   {
                      //log.info("Commit on id=" + id);
-                     session.commit();
+                     //session.commit();
                   }
                }
             }
-            session.commit();
+            //session.commit();
          }
          catch (Exception e)
          {
@@ -154,7 +156,7 @@
                if (counter++ % 5 == 0)
                {
                   //log.info("Committing message");
-                  session.commit();
+                  //session.commit();
                }
             }
 
@@ -173,15 +175,36 @@
    public void testMultiThreadFailover() throws Exception
    {
       JBossConnectionFactory factory = (JBossConnectionFactory) ic[1].lookup("/ConnectionFactory");
-      Connection conn = factory.createConnection();
+
+      Connection conn1 = cf.createConnection();
+      Connection conn2 = cf.createConnection();
+      Connection conn3 = cf.createConnection();
+
+      log.info("Created connections");
+
+      checkConnectionsDifferentServers(new Connection[]{conn1, conn2, conn3});
+
+      Connection conn = getConnection(new Connection[]{conn1, conn2, conn3}, 1);
       conn.start();
 
+      for (int i = 0; i < 3; i++)
+      {
+         JBossConnection connTest = (JBossConnection)getConnection(new Connection[]{conn1, conn2, conn3}, i);
+
+         String locator = ((ClientConnectionDelegate) connTest.getDelegate()).getRemotingConnection().
+            getInvokingClient().getInvoker().getLocator().getLocatorURI();
+
+         log.info("Server " + i + " has locator=" + locator);
+
+      }
+
+
       ArrayList list = new ArrayList();
 
       for (int i = 0; i < 5; i++)
       {
-         list.add(new LocalThreadProducer(i, conn.createSession(true, Session.AUTO_ACKNOWLEDGE), queue[1]));
-         list.add(new LocalThreadConsumer(i, conn.createSession(true, Session.AUTO_ACKNOWLEDGE), queue[1]));
+         list.add(new LocalThreadProducer(i, conn.createSession(false, Session.AUTO_ACKNOWLEDGE), queue[1]));
+         list.add(new LocalThreadConsumer(i, conn.createSession(false, Session.AUTO_ACKNOWLEDGE), queue[1]));
       }
 
       for (Iterator iter = list.iterator(); iter.hasNext();)
@@ -199,11 +222,15 @@
       Thread.sleep(30000);
 
       log.info("Killing server 1");
+      ServerManagement.log(ServerManagement.INFO, "Server 1 will be killed");
+      ServerManagement.log(ServerManagement.INFO, "Server 1 will be killed", 2);
+      log.info("messageCounterConsumer=" + messageCounterConsumer + ", messageCounterProducer=" + messageCounterProducer);
 
       ServerManagement.kill(1);
 
       Thread.sleep(50000);
-      shouldStop=true;
+      log.info("messageCounterConsumer=" + messageCounterConsumer + ", messageCounterProducer=" + messageCounterProducer);
+      shouldStop = true;
 
       for (Iterator iter = list.iterator(); iter.hasNext();)
       {
@@ -211,7 +238,7 @@
          t.join();
       }
 
-      log.info("produced " + messageCounterProducer + " and read " + messageCounterConsumer);
+      log.info("messageCounterConsumer=" + messageCounterConsumer + ", messageCounterProducer=" + messageCounterProducer);
 
       assertEquals(messageCounterProducer, messageCounterConsumer);
 

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java	2006-12-19 06:28:23 UTC (rev 1822)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java	2006-12-19 06:51:01 UTC (rev 1823)
@@ -130,6 +130,24 @@
       super.tearDown();
    }
 
+   // lookup for the connection with the right serverID
+   // I'm using this method to find the proper serverId so I won't relay on loadBalancing policies on testcases
+   protected Connection getConnection(Connection[] conn, int serverId) throws Exception
+   {
+      for(int i = 0; i < conn.length; i++)
+      {
+         ConnectionState state = (ConnectionState)(((DelegateSupport)((JBossConnection)conn[i]).
+            getDelegate()).getState());
+
+         if (state.getServerID() == serverId)
+         {
+            return conn[i];
+         }
+      }
+
+      return null;
+   }
+
    protected void checkConnectionsDifferentServers(Connection[] conn) throws Exception
    {
       int[] serverID = new int[conn.length];




More information about the jboss-cvs-commits mailing list