[jboss-cvs] JBoss Messaging SVN: r1823 - in trunk: src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/remoting tests/src/org/jboss/test/messaging/jms/clustering tests/src/org/jboss/test/messaging/jms/clustering/base
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Dec 19 01:51:05 EST 2006
Author: clebert.suconic at jboss.com
Date: 2006-12-19 01:51:01 -0500 (Tue, 19 Dec 2006)
New Revision: 1823
Modified:
trunk/src/main/org/jboss/jms/client/container/ValveAspect.java
trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/ValveTest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java
Log:
Adding Valve and a few other fixes
Modified: trunk/src/main/org/jboss/jms/client/container/ValveAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ValveAspect.java 2006-12-19 06:28:23 UTC (rev 1822)
+++ trunk/src/main/org/jboss/jms/client/container/ValveAspect.java 2006-12-19 06:51:01 UTC (rev 1823)
@@ -27,12 +27,14 @@
import org.jboss.jms.client.delegate.DelegateSupport;
import org.jboss.jms.client.delegate.ClientConnectionDelegate;
import org.jboss.jms.client.state.ConnectionState;
-import org.jboss.messaging.util.Valve;
+import org.jboss.jms.client.remoting.JMSRemotingConnection;
import org.jboss.logging.Logger;
import org.jboss.remoting.CannotConnectException;
import org.jboss.remoting.ConnectionListener;
import org.jboss.remoting.Client;
import java.io.IOException;
+import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
+import EDU.oswego.cs.dl.util.concurrent.ReentrantWriterPreferenceReadWriteLock;
/**
* This aspect will intercept failures from any HA object.
@@ -56,12 +58,15 @@
private ClientConnectionDelegate delegate;
- private Valve valve = new Valve();
+ // alternative implementation
+ //private Valve valve = new Valve();
+ private ReadWriteLock lockValve;
ValveAspect(ClientConnectionDelegate delegate, HAAspect copy)
{
super(copy);
this.delegate = delegate;
+ lockValve = new ReentrantWriterPreferenceReadWriteLock();
ConnectionListener listener = new ConnectionFailureListener(delegate);
@@ -91,14 +96,37 @@
// Eventually retries in case of listed exceptions
for (int i = 0; i < MAX_IO_RETRY_COUNT; i++)
{
- // We shouldn't have any calls being made while the failover is being executed
- valve.isOpened(true);
if (i > 0)
{
log.info("Retrying a call " + i);
}
+
+
+ // An alternate way of playing with Valves is to set an attribute to true
+ // and wait for notifications until it's being set. Valve encapsulates this logic
+ // if (i > 0)
+ // {
+ // valve.open(true);
+ // }
+ // else
+ // {
+ // valve.isOpened(true);
+ // }
+
+ // We shouldn't have any calls being made while the failover is being executed
+ if (i > 0)
+ {
+ // On retries we will use writeLocks, as the failover was already being processed
+ lockValve.writeLock().acquire();
+ }
+ else
+ {
+ lockValve.readLock().acquire();
+ }
+
failure = false;
+
try
{
returnObject = invocation.invokeNext();
@@ -118,6 +146,23 @@
log.error("ValveAspect didn't catch the exception " + e + ", and it will be forwarded", e);
throw e;
}
+ finally
+ {
+ // An alternate way of playing with Valves is to set an attribute to true
+ // and wait for notifications until it's being set. Valve encapsulates this logic
+ //if (i>0)
+ //{
+ // valve.reset();
+ //}
+ if (i > 0)
+ {
+ lockValve.writeLock().release();
+ }
+ else
+ {
+ lockValve.readLock().release();
+ }
+ }
if (!failure)
{
@@ -128,7 +173,7 @@
if (failure)
{
- handleConnectionFailure(delegate);
+ handleConnectionFailure(delegate, delegate.getRemotingConnection());
// if in the end we still have an exception there is nothing we can do besides throw an exception
// so, no retries on the failedOver Invocation
returnObject = invocation.invokeNext();
@@ -151,37 +196,48 @@
* it will just return all the others as soon as the valve is closed. This way all the simultaneous failures will
* act as they were processed while we called failover only once.
*/
- protected void handleConnectionFailure(ClientConnectionDelegate failedConnDelegate) throws Exception
+ protected void handleConnectionFailure(ClientConnectionDelegate failedConnDelegate,
+ JMSRemotingConnection jmsConnection) throws Exception
{
- Valve localValve = null;
+ log.info("Processing handleConnectionFailure ");
- // The idea is to reset the Valve synchronized with a reset valve
- synchronized (this) // I'm not sure if this synchronized is necessary. I will keep it here just to be safe
- {
- localValve = valve;
- }
+ // only one execution should be performed if multiple exceptions happened at the same time
+ lockValve.writeLock().acquire();
- // only one execution should be performed if multiple exceptions happened at the same time
- if (localValve.open())
+ // An alternate way of playing with Valves is to set an attribute to true
+ // and wait for notifications until it's being set. Valve encapsulates this logic
+// if (!valve.open())
+// {
+// return;
+// }
+
+ try
{
- try
+ if (jmsConnection.isFailed())
{
- log.info("Processing valve on exception failure");
- super.handleConnectionFailure(failedConnDelegate);
+ log.info("Failover on " + failedConnDelegate + " was already performed, so just ignoring call to handleConnectionFailure");
+ return;
}
- finally
- {
- localValve.close();
- synchronized (this)
- {
- // reset the valve, so future exceptions will also get processed
- valve = new Valve();
- }
- }
- } else
+
+ log.info("Processing valve on exception failure");
+ jmsConnection.setFailed(true);
+ super.handleConnectionFailure(failedConnDelegate);
+ log.info("Failover on client finished");
+ }
+ catch (Exception e)
{
- log.info("The valve was closed, so this invocation waited another invocation to finish on handleFailure");
+ log.error("An exception happened during client failover processing!", e);
+ throw e;
}
+ finally
+ {
+ // An alternate way of playing with Valves is to set an attribute to true
+ // and wait for notifications until it's being set. Valve encapsulates this logic
+// valve.close();
+// valve = new Valve();
+ lockValve.writeLock().release();
+ }
+
}
// Inner classes -------------------------------------------------
@@ -208,7 +264,7 @@
try
{
log.debug(this + " is being notified of connection failure: " + throwable);
- handleConnectionFailure(cd);
+ handleConnectionFailure(cd, cd.getRemotingConnection());
}
catch (Throwable e)
{
Modified: trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2006-12-19 06:28:23 UTC (rev 1822)
+++ trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2006-12-19 06:51:01 UTC (rev 1823)
@@ -61,6 +61,9 @@
protected InvokerLocator serverLocator;
protected CallbackManager callbackManager;
+ /** When a failover is performed, this flag is set to true */
+ protected boolean failed=false;
+
// Constructors --------------------------------------------------
public JMSRemotingConnection(String serverLocatorURI, boolean clientPing) throws Throwable
@@ -183,6 +186,17 @@
return callbackManager;
}
+
+ public boolean isFailed()
+ {
+ return failed;
+ }
+
+ public void setFailed(boolean failed)
+ {
+ this.failed = failed;
+ }
+
public String toString()
{
return "JMSRemotingConnection[" + serverLocator.getLocatorURI() + "]";
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/ValveTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/ValveTest.java 2006-12-19 06:28:23 UTC (rev 1822)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/ValveTest.java 2006-12-19 06:51:01 UTC (rev 1823)
@@ -25,6 +25,8 @@
import org.jboss.test.messaging.jms.clustering.base.ClusteringTestBase;
import org.jboss.test.messaging.tools.ServerManagement;
import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.client.JBossConnection;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
import org.jboss.logging.Logger;
import javax.jms.MessageConsumer;
import javax.jms.Session;
@@ -49,8 +51,8 @@
super(name);
}
- int messageCounterConsumer =0;
- int messageCounterProducer=0;
+ int messageCounterConsumer = 0;
+ int messageCounterProducer = 0;
Object lockReader = new Object();
@@ -88,12 +90,12 @@
int counter = 0;
while (true)
{
- Message message = consumer.receive(50);
- if (message==null && shouldStop)
+ Message message = consumer.receive(5000);
+ if (message == null && shouldStop)
{
break;
}
- if (message!=null)
+ if (message != null)
{
synchronized (lockReader)
{
@@ -103,11 +105,11 @@
if (counter++ % 10 == 0)
{
//log.info("Commit on id=" + id);
- session.commit();
+ //session.commit();
}
}
}
- session.commit();
+ //session.commit();
}
catch (Exception e)
{
@@ -154,7 +156,7 @@
if (counter++ % 5 == 0)
{
//log.info("Committing message");
- session.commit();
+ //session.commit();
}
}
@@ -173,15 +175,36 @@
public void testMultiThreadFailover() throws Exception
{
JBossConnectionFactory factory = (JBossConnectionFactory) ic[1].lookup("/ConnectionFactory");
- Connection conn = factory.createConnection();
+
+ Connection conn1 = cf.createConnection();
+ Connection conn2 = cf.createConnection();
+ Connection conn3 = cf.createConnection();
+
+ log.info("Created connections");
+
+ checkConnectionsDifferentServers(new Connection[]{conn1, conn2, conn3});
+
+ Connection conn = getConnection(new Connection[]{conn1, conn2, conn3}, 1);
conn.start();
+ for (int i = 0; i < 3; i++)
+ {
+ JBossConnection connTest = (JBossConnection)getConnection(new Connection[]{conn1, conn2, conn3}, i);
+
+ String locator = ((ClientConnectionDelegate) connTest.getDelegate()).getRemotingConnection().
+ getInvokingClient().getInvoker().getLocator().getLocatorURI();
+
+ log.info("Server " + i + " has locator=" + locator);
+
+ }
+
+
ArrayList list = new ArrayList();
for (int i = 0; i < 5; i++)
{
- list.add(new LocalThreadProducer(i, conn.createSession(true, Session.AUTO_ACKNOWLEDGE), queue[1]));
- list.add(new LocalThreadConsumer(i, conn.createSession(true, Session.AUTO_ACKNOWLEDGE), queue[1]));
+ list.add(new LocalThreadProducer(i, conn.createSession(false, Session.AUTO_ACKNOWLEDGE), queue[1]));
+ list.add(new LocalThreadConsumer(i, conn.createSession(false, Session.AUTO_ACKNOWLEDGE), queue[1]));
}
for (Iterator iter = list.iterator(); iter.hasNext();)
@@ -199,11 +222,15 @@
Thread.sleep(30000);
log.info("Killing server 1");
+ ServerManagement.log(ServerManagement.INFO, "Server 1 will be killed");
+ ServerManagement.log(ServerManagement.INFO, "Server 1 will be killed", 2);
+ log.info("messageCounterConsumer=" + messageCounterConsumer + ", messageCounterProducer=" + messageCounterProducer);
ServerManagement.kill(1);
Thread.sleep(50000);
- shouldStop=true;
+ log.info("messageCounterConsumer=" + messageCounterConsumer + ", messageCounterProducer=" + messageCounterProducer);
+ shouldStop = true;
for (Iterator iter = list.iterator(); iter.hasNext();)
{
@@ -211,7 +238,7 @@
t.join();
}
- log.info("produced " + messageCounterProducer + " and read " + messageCounterConsumer);
+ log.info("messageCounterConsumer=" + messageCounterConsumer + ", messageCounterProducer=" + messageCounterProducer);
assertEquals(messageCounterProducer, messageCounterConsumer);
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java 2006-12-19 06:28:23 UTC (rev 1822)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java 2006-12-19 06:51:01 UTC (rev 1823)
@@ -130,6 +130,24 @@
super.tearDown();
}
+ // lookup for the connection with the right serverID
+ // I'm using this method to find the proper serverId so I won't rely on loadBalancing policies on testcases
+ protected Connection getConnection(Connection[] conn, int serverId) throws Exception
+ {
+ for(int i = 0; i < conn.length; i++)
+ {
+ ConnectionState state = (ConnectionState)(((DelegateSupport)((JBossConnection)conn[i]).
+ getDelegate()).getState());
+
+ if (state.getServerID() == serverId)
+ {
+ return conn[i];
+ }
+ }
+
+ return null;
+ }
+
protected void checkConnectionsDifferentServers(Connection[] conn) throws Exception
{
int[] serverID = new int[conn.length];
More information about the jboss-cvs-commits
mailing list