[jboss-cvs] JBoss Messaging SVN: r1725 - in branches/Branch_Client_Failover_Experiment: src/main/org/jboss/jms/client/container src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/bin tests/src/org/jboss/test/messaging/jms/clustering tests/src/org/jboss/test/messaging/jms/clustering/base tests/src/org/jboss/test/messaging/tools/jmx/rmi

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Dec 8 07:15:53 EST 2006


Author: timfox
Date: 2006-12-08 07:15:43 -0500 (Fri, 08 Dec 2006)
New Revision: 1725

Modified:
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java
   branches/Branch_Client_Failover_Experiment/tests/bin/runtest
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/ManualClusteringTest.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
Log:
Get ManualClusteringTest to run



Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java	2006-12-08 02:12:13 UTC (rev 1724)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java	2006-12-08 12:15:43 UTC (rev 1725)
@@ -81,6 +81,8 @@
    {
       if (getServers(invocation) != null)
       {         
+         log.info("Clustered createConnection");
+                  
          //In a clustered configuration we create connections in a round-robin fashion
          //from the available servers
          
@@ -168,7 +170,7 @@
       
       ConnectionState state = (ConnectionState)((DelegateSupport)connDelegate).getState();
       
-      state.getRemotingConnection().getInvokingClient().addConnectionListener(new Listener(connDelegate));
+      //state.getRemotingConnection().getInvokingClient().addConnectionListener(new Listener(connDelegate));
       
       return connDelegate;   
    }
@@ -427,6 +429,8 @@
       Listener(ClientConnectionDelegate connection)
       {
          this.connection = connection;
+         
+         log.info("************* CREATING LISTENER");
       }
       
       public void handleConnectionException(Throwable throwable, Client client)
@@ -439,6 +443,7 @@
          }
          catch (Throwable e)
          {
+            log.error("Caught exception in handling failure", e);
             e.printStackTrace();
          }
       }

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-12-08 02:12:13 UTC (rev 1724)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-12-08 12:15:43 UTC (rev 1725)
@@ -40,6 +40,8 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+
+import javax.jms.TextMessage;
 import javax.sql.DataSource;
 import javax.transaction.TransactionManager;
 import org.jboss.jms.server.QueuedExecutorPool;
@@ -417,6 +419,18 @@
          log.trace(this.currentNodeId + " Routing " + ref + " with condition " + condition + " and transaction " + tx);
       }
       
+      //debug
+      try
+      {
+         TextMessage tm = (TextMessage)ref.getMessage();
+         
+         log.info(this.currentNodeId + " *********** Routing ref: " + tm.getText() + " with condition " + condition + " and transaction " + tx);
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+      
       if (ref == null)
       {
          throw new IllegalArgumentException("Message reference is null");
@@ -488,9 +502,13 @@
                   if (trace)
                   {
                      log.trace(this.currentNodeId + " Routing message to queue or stub:" + queue.getName() + " on node " +
-                               queue.getNodeId() +" local:" + queue.isLocal());
+                               queue.getNodeId() + " local:" + queue.isLocal());
+                     
                   }
                   
+                  log.info(this.currentNodeId + " Routing message to queue or stub:" + queue.getName() + " on node " +
+                           queue.getNodeId() + " local:" + queue.isLocal());
+                  
                   if (router.numberOfReceivers() > 1)
                   {
                      //We have now chosen which one will receive the message so we need to add this
@@ -820,6 +838,11 @@
          log.trace(this.currentNodeId + " routing from cluster, message: " + message + " routing key " +
                   routingKey + " map " + queueNameNodeIdMap);
       }
+      
+      log.info(this.currentNodeId + " routing from cluster, message: " + message + " routing key " +
+               routingKey + " map " + queueNameNodeIdMap);
+      
+      
             
       lock.readLock().acquire();  
 

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java	2006-12-08 02:12:13 UTC (rev 1724)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java	2006-12-08 12:15:43 UTC (rev 1725)
@@ -21,6 +21,12 @@
  */
 package org.jboss.messaging.core.plugin.postoffice.cluster;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.jms.TextMessage;
+
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.Delivery;
 import org.jboss.messaging.core.DeliveryObserver;
@@ -28,10 +34,6 @@
 import org.jboss.messaging.core.Receiver;
 import org.jboss.messaging.core.tx.Transaction;
 
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
 /**
  * 
  * A DefaultRouter
@@ -166,15 +168,15 @@
    {
       if (trace) { log.trace(this + " routing ref " + reference); }
 
-      //Favour the local queue or the failedOver queue in round robbin
+      //Favour the local queue or the failedOver queue in round robin
 
       if (!failedOverQueues.isEmpty())
       {
-
-         if (trace) { log.trace("Round robbing on FailedOver queue, currentTarget=" + target);}
+         if (trace) { log.trace("Round robin on FailedOver queue, currentTarget=" + target);}
+         
          LocalClusteredQueue queueToUse = null;
 
-         if (target==-1)
+         if (target == -1)
          {
             queueToUse = (LocalClusteredQueue)this.localQueue; 
          }
@@ -185,18 +187,30 @@
 
          incTargetFailedOver();
          
+         log.info("***************** Routing to failed over queue");
          Delivery del = queueToUse.handle(observer, reference, tx);
 
          if (trace) { log.trace(this+" routed to failed queue, using failedOver round robbing, returned " + del); }
          
          return del;
       }
-      else
-      if (localQueue != null)
+      else if (localQueue != null)
       {
          //The only time the local queue won't accept is if the selector doesn't
          //match - in which case it won't match at any other nodes too so no point
          //in trying them
+         
+         //debug
+         try
+         {
+            TextMessage tm = (TextMessage)reference.getMessage();
+            
+            log.info("*********** Routing to local queue: " + tm.getText() + " id:" + System.identityHashCode(localQueue) );
+         }
+         catch (Exception e)
+         {
+            e.printStackTrace();
+         }
 
          Delivery del = localQueue.handle(observer, reference, tx);
 
@@ -215,6 +229,7 @@
 
             queue = (ClusteredQueue)nonLocalQueues.get(target);
 
+            log.info("************ Routing to non local queue");
             Delivery del = queue.handle(observer, reference, tx);
 
             if (trace) { log.trace(this + " routed to remote queue, it returned " + del); }

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java	2006-12-08 02:12:13 UTC (rev 1724)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java	2006-12-08 12:15:43 UTC (rev 1725)
@@ -148,6 +148,8 @@
    {
       if (trace) { log.trace("Handling ref from cluster: " + ref); }
       
+      log.info("********** Handling ref from cluster: " + ref);
+      
       if (filter != null && !filter.accept(ref))
       {
          Delivery del = new SimpleDelivery(this, ref, true, false);

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java	2006-12-08 02:12:13 UTC (rev 1724)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java	2006-12-08 12:15:43 UTC (rev 1725)
@@ -107,6 +107,8 @@
       {
          Delivery del = new SimpleDelivery(this, reference, false, false);
          
+         log.info("********** doesn't match filter");
+         
          return del;
       }
       
@@ -126,6 +128,8 @@
             return null;
          }
       }
+      
+      log.info("*********** accepting message:" + reference);
   
       return new SimpleDelivery(this, reference, false);      
    }

Modified: branches/Branch_Client_Failover_Experiment/tests/bin/runtest
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/bin/runtest	2006-12-08 02:12:13 UTC (rev 1724)
+++ branches/Branch_Client_Failover_Experiment/tests/bin/runtest	2006-12-08 12:15:43 UTC (rev 1725)
@@ -42,8 +42,6 @@
    TEST_CLUSTERED=$ENV_TEST_CLUSTERED
 fi
 
-TEST_DATABASE=mysql
-
 #
 # We should use the same test execution classpath as the ant <junit> task, so we run ant to get
 # it from there.

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/ManualClusteringTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/ManualClusteringTest.java	2006-12-08 02:12:13 UTC (rev 1724)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/ManualClusteringTest.java	2006-12-08 12:15:43 UTC (rev 1725)
@@ -21,10 +21,20 @@
  */
 package org.jboss.test.messaging.jms.clustering;
 
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.jboss.jms.client.JBossConnection;
+import org.jboss.jms.client.delegate.DelegateSupport;
+import org.jboss.jms.client.state.ConnectionState;
 import org.jboss.test.messaging.jms.clustering.base.ClusteringTestBase;
 
-import javax.jms.*;
-
 /**
  * 
  * A ManualClusteringTest
@@ -57,8 +67,7 @@
 
    public void testClusteredQueueLocalConsumerNonPersistent() throws Exception
    {
-      System.out.println("xxxxxxxxxxxx");
-//      clusteredQueueLocalConsumer(false);
+      clusteredQueueLocalConsumer(false);
    }
 
    public void testClusteredQueueLocalConsumerPersistent() throws Exception
@@ -127,8 +136,34 @@
 
    protected void tearDown() throws Exception
    {
-         super.tearDown();
+      super.tearDown();
    }
+   
+   private void checkConnectionsDifferentServers(Connection conn, Connection conn1, Connection conn2)
+   {
+      ConnectionState state = (ConnectionState)(((DelegateSupport)((JBossConnection)conn).getDelegate()).getState());
+      
+      ConnectionState state1 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn1).getDelegate()).getState());
+      
+      ConnectionState state2 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn2).getDelegate()).getState());
+      
+      int serverID = state.getServerID();
+      
+      int serverID1 = state1.getServerID();
+      
+      int serverID2 = state2.getServerID();
+      
+      log.info("Server id:" + serverID);
+      
+      log.info("Server id1:" + serverID1);
+      
+      log.info("Server id2:" + serverID2);
+      
+      
+      assertTrue(serverID != serverID1);
+      
+      assertTrue(serverID1 != serverID2);
+   }
 
    /*
     * Create a consumer on each queue on each node.
@@ -143,17 +178,27 @@
 
       try
       {
+         //This will create 3 different connection on 3 different nodes, since
+         //the cf is clustered
          conn = cf.createConnection();
-         conn1 = cf1.createConnection();
-         conn2 = cf2.createConnection();
+         conn1 = cf.createConnection();
+         conn2 = cf.createConnection();
+         
+         log.info("Created connections");
+         
+         checkConnectionsDifferentServers(conn, conn1, conn2);
 
          Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
          Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
          Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         log.info("Created sessions");
 
          MessageConsumer cons = sess.createConsumer(queue);
          MessageConsumer cons1 = sess1.createConsumer(queue1);
          MessageConsumer cons2 = sess2.createConsumer(queue2);
+         
+         log.info("Created consumers");
 
          conn.start();
          conn1.start();
@@ -173,15 +218,19 @@
 
             prod.send(tm);
          }
+         
+         log.info("Sent messages");
 
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
             TextMessage tm = (TextMessage)cons.receive(1000);
 
             assertNotNull(tm);
-
+            
+            log.info("1 GOT MESSAGE: " + tm.getText());
+            
             assertEquals("message" + i, tm.getText());
-         }
+         }                 
 
          Message m = cons1.receive(2000);
 
@@ -283,9 +332,15 @@
       Connection conn2 = null;
       try
       {
+         //This will create 3 different connection on 3 different nodes, since
+         //the cf is clustered
          conn = cf.createConnection();
-         conn1 = cf1.createConnection();
-         conn2 = cf2.createConnection();
+         conn1 = cf.createConnection();
+         conn2 = cf.createConnection();
+         
+         log.info("Created connections");
+         
+         checkConnectionsDifferentServers(conn, conn1, conn2);
 
          Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
          Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -395,9 +450,15 @@
 
       try
       {
+         //This will create 3 different connection on 3 different nodes, since
+         //the cf is clustered
          conn = cf.createConnection();
-         conn1 = cf1.createConnection();
-         conn2 = cf2.createConnection();
+         conn1 = cf.createConnection();
+         conn2 = cf.createConnection();
+         
+         log.info("Created connections");
+         
+         checkConnectionsDifferentServers(conn, conn1, conn2);
 
          Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
          Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -529,9 +590,15 @@
       Connection conn2 = null;
       try
       {
+         //This will create 3 different connection on 3 different nodes, since
+         //the cf is clustered
          conn = cf.createConnection();
-         conn1 = cf1.createConnection();
-         conn2 = cf2.createConnection();
+         conn1 = cf.createConnection();
+         conn2 = cf.createConnection();
+         
+         log.info("Created connections");
+         
+         checkConnectionsDifferentServers(conn, conn1, conn2);
 
          conn.setClientID("wib1");
          conn1.setClientID("wib1");
@@ -683,10 +750,15 @@
       try
 
       {
+         //This will create 3 different connection on 3 different nodes, since
+         //the cf is clustered
          conn = cf.createConnection();
-         conn1 = cf1.createConnection();
-         conn2 = cf2.createConnection();
-
+         conn1 = cf.createConnection();
+         conn2 = cf.createConnection();
+         
+         log.info("Created connections");
+         
+         checkConnectionsDifferentServers(conn, conn1, conn2);
          conn.setClientID("wib1");
          conn1.setClientID("wib1");
          conn2.setClientID("wib1");
@@ -855,10 +927,16 @@
 
       try
       {
+         //This will create 3 different connection on 3 different nodes, since
+         //the cf is clustered
          conn = cf.createConnection();
-         conn1 = cf1.createConnection();
-         conn2 = cf2.createConnection();
-
+         conn1 = cf.createConnection();
+         conn2 = cf.createConnection();
+         
+         log.info("Created connections");
+         
+         checkConnectionsDifferentServers(conn, conn1, conn2);
+         
          conn1.setClientID("wib1");
          conn2.setClientID("wib1");
 

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java	2006-12-08 02:12:13 UTC (rev 1724)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java	2006-12-08 12:15:43 UTC (rev 1725)
@@ -61,8 +61,8 @@
    protected Topic topic2;
 
    protected ConnectionFactory cf;
-   protected ConnectionFactory cf1;
-   protected ConnectionFactory cf2;
+  // protected ConnectionFactory cf1;
+  // protected ConnectionFactory cf2;
 
    // Constructors --------------------------------------------------
 
@@ -99,10 +99,10 @@
          ic1 = new InitialContext(ServerManagement.getJNDIEnvironment(1));
          ic2 = new InitialContext(ServerManagement.getJNDIEnvironment(2));
 
+         //We only need to lookup one connection factory since it will be a clustered cf
+         //so we will actually create connections on different servers (round robin)
          cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
-         cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
-         cf2 = (ConnectionFactory)ic2.lookup("/ConnectionFactory");
-
+                  
          queue = (Queue)ic.lookup("queue/testDistributedQueue");
          queue1 = (Queue)ic1.lookup("queue/testDistributedQueue");
          queue2 = (Queue)ic2.lookup("queue/testDistributedQueue");
@@ -124,15 +124,15 @@
    {
       try
       {
-         ServerManagement.log(ServerManagement.INFO,"Undeploying Server 0");
+         ServerManagement.log(ServerManagement.INFO, "Undeploying Server 0");
          ServerManagement.undeployQueue("testDistributedQueue", 0);
          ServerManagement.undeployTopic("testDistributedTopic", 0);
 
-         ServerManagement.log(ServerManagement.INFO,"Undeploying Server 1");
+         ServerManagement.log(ServerManagement.INFO, "Undeploying Server 1");
          ServerManagement.undeployQueue("testDistributedQueue", 1);
          ServerManagement.undeployTopic("testDistributedTopic", 1);
 
-         ServerManagement.log(ServerManagement.INFO,"Undeploying Server 2");
+         ServerManagement.log(ServerManagement.INFO, "Undeploying Server 2");
          ServerManagement.undeployQueue("testDistributedQueue", 2);
          ServerManagement.undeployTopic("testDistributedTopic", 2);
 
@@ -157,9 +157,11 @@
 
       try
       {
+         //Since the cf is clustered, this will create connections on 3 different nodes
+         //(round robin)
          conn = cf.createConnection();
-         conn1 = cf1.createConnection();
-         conn2 = cf2.createConnection();
+         conn1 = cf.createConnection();
+         conn2 = cf.createConnection();
 
          Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
          Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -178,18 +180,21 @@
          do
          {
             msg = cons.receive(1000);
+            log.info("1 Drained message " + msg);
          }
          while (msg != null);
 
          do
          {
             msg = cons1.receive(1000);
+            log.info("2 Drained message " + msg);
          }
          while (msg != null);
 
          do
          {
             msg = cons2.receive(1000);
+            log.info("3 Drained message " + msg);
          }
          while (msg != null);
       }

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java	2006-12-08 02:12:13 UTC (rev 1724)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java	2006-12-08 12:15:43 UTC (rev 1725)
@@ -256,7 +256,7 @@
    {
       try
       {
-         log.info(" Server peer ID ........... " + serverPeerID);
+         log.info(" Server peer ID ........... " + serverPeerID + " clustered: " + clustered);
          
          log.debug("creating ServerPeer instance");
    




More information about the jboss-cvs-commits mailing list