[jboss-cvs] JBoss Messaging SVN: r1725 - in branches/Branch_Client_Failover_Experiment: src/main/org/jboss/jms/client/container src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/bin tests/src/org/jboss/test/messaging/jms/clustering tests/src/org/jboss/test/messaging/jms/clustering/base tests/src/org/jboss/test/messaging/tools/jmx/rmi
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Dec 8 07:15:53 EST 2006
Author: timfox
Date: 2006-12-08 07:15:43 -0500 (Fri, 08 Dec 2006)
New Revision: 1725
Modified:
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java
branches/Branch_Client_Failover_Experiment/tests/bin/runtest
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/ManualClusteringTest.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
Log:
Get ManualClusteringTest to run
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java 2006-12-08 02:12:13 UTC (rev 1724)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java 2006-12-08 12:15:43 UTC (rev 1725)
@@ -81,6 +81,8 @@
{
if (getServers(invocation) != null)
{
+ log.info("Clustered createConnection");
+
//In a clustered configuration we create connections in a round-robin fashion
//from the available servers
@@ -168,7 +170,7 @@
ConnectionState state = (ConnectionState)((DelegateSupport)connDelegate).getState();
- state.getRemotingConnection().getInvokingClient().addConnectionListener(new Listener(connDelegate));
+ //state.getRemotingConnection().getInvokingClient().addConnectionListener(new Listener(connDelegate));
return connDelegate;
}
@@ -427,6 +429,8 @@
Listener(ClientConnectionDelegate connection)
{
this.connection = connection;
+
+ log.info("************* CREATING LISTENER");
}
public void handleConnectionException(Throwable throwable, Client client)
@@ -439,6 +443,7 @@
}
catch (Throwable e)
{
+ log.error("Caught exception in handling failure", e);
e.printStackTrace();
}
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-12-08 02:12:13 UTC (rev 1724)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-12-08 12:15:43 UTC (rev 1725)
@@ -40,6 +40,8 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+
+import javax.jms.TextMessage;
import javax.sql.DataSource;
import javax.transaction.TransactionManager;
import org.jboss.jms.server.QueuedExecutorPool;
@@ -417,6 +419,18 @@
log.trace(this.currentNodeId + " Routing " + ref + " with condition " + condition + " and transaction " + tx);
}
+ //debug
+ try
+ {
+ TextMessage tm = (TextMessage)ref.getMessage();
+
+ log.info(this.currentNodeId + " *********** Routing ref: " + tm.getText() + " with condition " + condition + " and transaction " + tx);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+
if (ref == null)
{
throw new IllegalArgumentException("Message reference is null");
@@ -488,9 +502,13 @@
if (trace)
{
log.trace(this.currentNodeId + " Routing message to queue or stub:" + queue.getName() + " on node " +
- queue.getNodeId() +" local:" + queue.isLocal());
+ queue.getNodeId() + " local:" + queue.isLocal());
+
}
+ log.info(this.currentNodeId + " Routing message to queue or stub:" + queue.getName() + " on node " +
+ queue.getNodeId() + " local:" + queue.isLocal());
+
if (router.numberOfReceivers() > 1)
{
//We have now chosen which one will receive the message so we need to add this
@@ -820,6 +838,11 @@
log.trace(this.currentNodeId + " routing from cluster, message: " + message + " routing key " +
routingKey + " map " + queueNameNodeIdMap);
}
+
+ log.info(this.currentNodeId + " routing from cluster, message: " + message + " routing key " +
+ routingKey + " map " + queueNameNodeIdMap);
+
+
lock.readLock().acquire();
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java 2006-12-08 02:12:13 UTC (rev 1724)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java 2006-12-08 12:15:43 UTC (rev 1725)
@@ -21,6 +21,12 @@
*/
package org.jboss.messaging.core.plugin.postoffice.cluster;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.jms.TextMessage;
+
import org.jboss.logging.Logger;
import org.jboss.messaging.core.Delivery;
import org.jboss.messaging.core.DeliveryObserver;
@@ -28,10 +34,6 @@
import org.jboss.messaging.core.Receiver;
import org.jboss.messaging.core.tx.Transaction;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
/**
*
* A DefaultRouter
@@ -166,15 +168,15 @@
{
if (trace) { log.trace(this + " routing ref " + reference); }
- //Favour the local queue or the failedOver queue in round robbin
+ //Favour the local queue or the failedOver queue in round robin
if (!failedOverQueues.isEmpty())
{
-
- if (trace) { log.trace("Round robbing on FailedOver queue, currentTarget=" + target);}
+ if (trace) { log.trace("Round robin on FailedOver queue, currentTarget=" + target);}
+
LocalClusteredQueue queueToUse = null;
- if (target==-1)
+ if (target == -1)
{
queueToUse = (LocalClusteredQueue)this.localQueue;
}
@@ -185,18 +187,30 @@
incTargetFailedOver();
+ log.info("***************** Routing to failed over queue");
Delivery del = queueToUse.handle(observer, reference, tx);
if (trace) { log.trace(this+" routed to failed queue, using failedOver round robbing, returned " + del); }
return del;
}
- else
- if (localQueue != null)
+ else if (localQueue != null)
{
//The only time the local queue won't accept is if the selector doesn't
//match - in which case it won't match at any other nodes too so no point
//in trying them
+
+ //debug
+ try
+ {
+ TextMessage tm = (TextMessage)reference.getMessage();
+
+ log.info("*********** Routing to local queue: " + tm.getText() + " id:" + System.identityHashCode(localQueue) );
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
Delivery del = localQueue.handle(observer, reference, tx);
@@ -215,6 +229,7 @@
queue = (ClusteredQueue)nonLocalQueues.get(target);
+ log.info("************ Routing to non local queue");
Delivery del = queue.handle(observer, reference, tx);
if (trace) { log.trace(this + " routed to remote queue, it returned " + del); }
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java 2006-12-08 02:12:13 UTC (rev 1724)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java 2006-12-08 12:15:43 UTC (rev 1725)
@@ -148,6 +148,8 @@
{
if (trace) { log.trace("Handling ref from cluster: " + ref); }
+ log.info("********** Handling ref from cluster: " + ref);
+
if (filter != null && !filter.accept(ref))
{
Delivery del = new SimpleDelivery(this, ref, true, false);
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java 2006-12-08 02:12:13 UTC (rev 1724)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java 2006-12-08 12:15:43 UTC (rev 1725)
@@ -107,6 +107,8 @@
{
Delivery del = new SimpleDelivery(this, reference, false, false);
+ log.info("********** doesn't match filter");
+
return del;
}
@@ -126,6 +128,8 @@
return null;
}
}
+
+ log.info("*********** accepting message:" + reference);
return new SimpleDelivery(this, reference, false);
}
Modified: branches/Branch_Client_Failover_Experiment/tests/bin/runtest
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/bin/runtest 2006-12-08 02:12:13 UTC (rev 1724)
+++ branches/Branch_Client_Failover_Experiment/tests/bin/runtest 2006-12-08 12:15:43 UTC (rev 1725)
@@ -42,8 +42,6 @@
TEST_CLUSTERED=$ENV_TEST_CLUSTERED
fi
-TEST_DATABASE=mysql
-
#
# We should use the same test execution classpath as the ant <junit> task, so we run ant to get
# it from there.
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/ManualClusteringTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/ManualClusteringTest.java 2006-12-08 02:12:13 UTC (rev 1724)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/ManualClusteringTest.java 2006-12-08 12:15:43 UTC (rev 1725)
@@ -21,10 +21,20 @@
*/
package org.jboss.test.messaging.jms.clustering;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.jboss.jms.client.JBossConnection;
+import org.jboss.jms.client.delegate.DelegateSupport;
+import org.jboss.jms.client.state.ConnectionState;
import org.jboss.test.messaging.jms.clustering.base.ClusteringTestBase;
-import javax.jms.*;
-
/**
*
* A ManualClusteringTest
@@ -57,8 +67,7 @@
public void testClusteredQueueLocalConsumerNonPersistent() throws Exception
{
- System.out.println("xxxxxxxxxxxx");
-// clusteredQueueLocalConsumer(false);
+ clusteredQueueLocalConsumer(false);
}
public void testClusteredQueueLocalConsumerPersistent() throws Exception
@@ -127,8 +136,34 @@
protected void tearDown() throws Exception
{
- super.tearDown();
+ super.tearDown();
}
+
+ private void checkConnectionsDifferentServers(Connection conn, Connection conn1, Connection conn2)
+ {
+ ConnectionState state = (ConnectionState)(((DelegateSupport)((JBossConnection)conn).getDelegate()).getState());
+
+ ConnectionState state1 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn1).getDelegate()).getState());
+
+ ConnectionState state2 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn2).getDelegate()).getState());
+
+ int serverID = state.getServerID();
+
+ int serverID1 = state1.getServerID();
+
+ int serverID2 = state2.getServerID();
+
+ log.info("Server id:" + serverID);
+
+ log.info("Server id1:" + serverID1);
+
+ log.info("Server id2:" + serverID2);
+
+
+ assertTrue(serverID != serverID1);
+
+ assertTrue(serverID1 != serverID2);
+ }
/*
* Create a consumer on each queue on each node.
@@ -143,17 +178,27 @@
try
{
+ //This will create 3 different connection on 3 different nodes, since
+ //the cf is clustered
conn = cf.createConnection();
- conn1 = cf1.createConnection();
- conn2 = cf2.createConnection();
+ conn1 = cf.createConnection();
+ conn2 = cf.createConnection();
+
+ log.info("Created connections");
+
+ checkConnectionsDifferentServers(conn, conn1, conn2);
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ log.info("Created sessions");
MessageConsumer cons = sess.createConsumer(queue);
MessageConsumer cons1 = sess1.createConsumer(queue1);
MessageConsumer cons2 = sess2.createConsumer(queue2);
+
+ log.info("Created consumers");
conn.start();
conn1.start();
@@ -173,15 +218,19 @@
prod.send(tm);
}
+
+ log.info("Sent messages");
for (int i = 0; i < NUM_MESSAGES; i++)
{
TextMessage tm = (TextMessage)cons.receive(1000);
assertNotNull(tm);
-
+
+ log.info("1 GOT MESSAGE: " + tm.getText());
+
assertEquals("message" + i, tm.getText());
- }
+ }
Message m = cons1.receive(2000);
@@ -283,9 +332,15 @@
Connection conn2 = null;
try
{
+ //This will create 3 different connection on 3 different nodes, since
+ //the cf is clustered
conn = cf.createConnection();
- conn1 = cf1.createConnection();
- conn2 = cf2.createConnection();
+ conn1 = cf.createConnection();
+ conn2 = cf.createConnection();
+
+ log.info("Created connections");
+
+ checkConnectionsDifferentServers(conn, conn1, conn2);
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -395,9 +450,15 @@
try
{
+ //This will create 3 different connection on 3 different nodes, since
+ //the cf is clustered
conn = cf.createConnection();
- conn1 = cf1.createConnection();
- conn2 = cf2.createConnection();
+ conn1 = cf.createConnection();
+ conn2 = cf.createConnection();
+
+ log.info("Created connections");
+
+ checkConnectionsDifferentServers(conn, conn1, conn2);
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -529,9 +590,15 @@
Connection conn2 = null;
try
{
+ //This will create 3 different connection on 3 different nodes, since
+ //the cf is clustered
conn = cf.createConnection();
- conn1 = cf1.createConnection();
- conn2 = cf2.createConnection();
+ conn1 = cf.createConnection();
+ conn2 = cf.createConnection();
+
+ log.info("Created connections");
+
+ checkConnectionsDifferentServers(conn, conn1, conn2);
conn.setClientID("wib1");
conn1.setClientID("wib1");
@@ -683,10 +750,15 @@
try
{
+ //This will create 3 different connection on 3 different nodes, since
+ //the cf is clustered
conn = cf.createConnection();
- conn1 = cf1.createConnection();
- conn2 = cf2.createConnection();
-
+ conn1 = cf.createConnection();
+ conn2 = cf.createConnection();
+
+ log.info("Created connections");
+
+ checkConnectionsDifferentServers(conn, conn1, conn2);
conn.setClientID("wib1");
conn1.setClientID("wib1");
conn2.setClientID("wib1");
@@ -855,10 +927,16 @@
try
{
+ //This will create 3 different connection on 3 different nodes, since
+ //the cf is clustered
conn = cf.createConnection();
- conn1 = cf1.createConnection();
- conn2 = cf2.createConnection();
-
+ conn1 = cf.createConnection();
+ conn2 = cf.createConnection();
+
+ log.info("Created connections");
+
+ checkConnectionsDifferentServers(conn, conn1, conn2);
+
conn1.setClientID("wib1");
conn2.setClientID("wib1");
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java 2006-12-08 02:12:13 UTC (rev 1724)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java 2006-12-08 12:15:43 UTC (rev 1725)
@@ -61,8 +61,8 @@
protected Topic topic2;
protected ConnectionFactory cf;
- protected ConnectionFactory cf1;
- protected ConnectionFactory cf2;
+ // protected ConnectionFactory cf1;
+ // protected ConnectionFactory cf2;
// Constructors --------------------------------------------------
@@ -99,10 +99,10 @@
ic1 = new InitialContext(ServerManagement.getJNDIEnvironment(1));
ic2 = new InitialContext(ServerManagement.getJNDIEnvironment(2));
+ //We only need to lookup one connection factory since it will be a clustered cf
+ //so we will actually create connections on different servers (round robin)
cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
- cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
- cf2 = (ConnectionFactory)ic2.lookup("/ConnectionFactory");
-
+
queue = (Queue)ic.lookup("queue/testDistributedQueue");
queue1 = (Queue)ic1.lookup("queue/testDistributedQueue");
queue2 = (Queue)ic2.lookup("queue/testDistributedQueue");
@@ -124,15 +124,15 @@
{
try
{
- ServerManagement.log(ServerManagement.INFO,"Undeploying Server 0");
+ ServerManagement.log(ServerManagement.INFO, "Undeploying Server 0");
ServerManagement.undeployQueue("testDistributedQueue", 0);
ServerManagement.undeployTopic("testDistributedTopic", 0);
- ServerManagement.log(ServerManagement.INFO,"Undeploying Server 1");
+ ServerManagement.log(ServerManagement.INFO, "Undeploying Server 1");
ServerManagement.undeployQueue("testDistributedQueue", 1);
ServerManagement.undeployTopic("testDistributedTopic", 1);
- ServerManagement.log(ServerManagement.INFO,"Undeploying Server 2");
+ ServerManagement.log(ServerManagement.INFO, "Undeploying Server 2");
ServerManagement.undeployQueue("testDistributedQueue", 2);
ServerManagement.undeployTopic("testDistributedTopic", 2);
@@ -157,9 +157,11 @@
try
{
+ //Since the cf is clustered, this will create connections on 3 different nodes
+ //(round robin)
conn = cf.createConnection();
- conn1 = cf1.createConnection();
- conn2 = cf2.createConnection();
+ conn1 = cf.createConnection();
+ conn2 = cf.createConnection();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -178,18 +180,21 @@
do
{
msg = cons.receive(1000);
+ log.info("1 Drained message " + msg);
}
while (msg != null);
do
{
msg = cons1.receive(1000);
+ log.info("2 Drained message " + msg);
}
while (msg != null);
do
{
msg = cons2.receive(1000);
+ log.info("3 Drained message " + msg);
}
while (msg != null);
}
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java 2006-12-08 02:12:13 UTC (rev 1724)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java 2006-12-08 12:15:43 UTC (rev 1725)
@@ -256,7 +256,7 @@
{
try
{
- log.info(" Server peer ID ........... " + serverPeerID);
+ log.info(" Server peer ID ........... " + serverPeerID + " clustered: " + clustered);
log.debug("creating ServerPeer instance");
More information about the jboss-cvs-commits
mailing list