[jboss-cvs] JBoss Messaging SVN: r1619 - in branches/Branch_Client_Failover_Experiment: src/etc/xmdesc src/main/org/jboss/messaging/core/plugin/postoffice src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/jms/clustering tests/src/org/jboss/test/messaging/tools tests/src/org/jboss/test/messaging/tools/jmx/rmi

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Nov 22 18:26:44 EST 2006


Author: clebert.suconic at jboss.com
Date: 2006-11-22 18:26:37 -0500 (Wed, 22 Nov 2006)
New Revision: 1619

Added:
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LeaveClusterRequest.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringBase.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
Modified:
   branches/Branch_Client_Failover_Experiment/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/ManualClusteringTest.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/Server.java
Log:
Adding new HA automated Tests + LeaveClusterRequest message

Modified: branches/Branch_Client_Failover_Experiment/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml	2006-11-22 03:46:36 UTC (rev 1618)
+++ branches/Branch_Client_Failover_Experiment/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml	2006-11-22 23:26:37 UTC (rev 1619)
@@ -142,6 +142,16 @@
    </operation>
 
    <operation>
+      <description></description>
+      <name>stop</name>
+      <parameter>
+        <description>Should we send a notification about leaving cluster</description>
+        <name>sendNotification</name>
+        <type>boolean</type>
+      </parameter>
+   </operation>
+
+   <operation>
       <description>JBoss Service lifecycle operation</description>
       <name>destroy</name>
    </operation>

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java	2006-11-22 03:46:36 UTC (rev 1618)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java	2006-11-22 23:26:37 UTC (rev 1619)
@@ -21,30 +21,21 @@
  */
 package org.jboss.messaging.core.plugin.postoffice;
 
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
+import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.Types;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-
+import java.util.*;
 import javax.sql.DataSource;
 import javax.transaction.TransactionManager;
-
 import org.jboss.jms.server.QueuedExecutorPool;
 import org.jboss.logging.Logger;
-import org.jboss.messaging.core.Delivery;
-import org.jboss.messaging.core.Filter;
-import org.jboss.messaging.core.FilterFactory;
-import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.*;
 import org.jboss.messaging.core.Queue;
 import org.jboss.messaging.core.local.PagingFilteredQueue;
 import org.jboss.messaging.core.plugin.JDBCSupport;
@@ -54,10 +45,6 @@
 import org.jboss.messaging.core.tx.Transaction;
 import org.jboss.messaging.core.tx.TransactionRepository;
 
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
-import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
-
 /**
  * 
  * A DefaultPostOffice
@@ -143,9 +130,14 @@
       
       if (trace) { log.trace(this + " started"); }
    }
-   
+
    public void stop() throws Exception
    {
+      stop(true);
+   }
+
+   public void stop(boolean sendNotification) throws Exception
+   {
       if (trace) { log.trace(this + " stopping"); }
       
       super.stop();

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java	2006-11-22 03:46:36 UTC (rev 1618)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java	2006-11-22 23:26:37 UTC (rev 1619)
@@ -23,7 +23,6 @@
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-
 import org.jboss.messaging.util.Streamable;
 
 /**
@@ -99,6 +98,9 @@
             request = new RollbackPullRequest();
             break;
          }
+         case LeaveClusterRequest.TYPE:
+            request = new LeaveClusterRequest();
+            break;
          default:
          {
             throw new IllegalArgumentException("Invalid type: " + type);

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-11-22 03:46:36 UTC (rev 1618)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-11-22 23:26:37 UTC (rev 1619)
@@ -296,16 +296,17 @@
       if (trace) { log.trace("Started " + this + " with sync address " + syncAddress +
                              " async address " + asyncAddress); }
    }
-
-   public synchronized void stop() throws Exception
+   
+   public synchronized void stop(boolean sendNotification) throws Exception
    {
       if (!started)
       {
          log.warn("Attempt to stop() but " + this + " is not started");
       }
+
+      syncSendRequest(new LeaveClusterRequest(this.getNodeId()));
+      super.stop(sendNotification);
       
-      super.stop();
-      
       statsSender.stop();
          
       syncChannel.close();
@@ -1322,7 +1323,7 @@
 
       NodeAddressInfo info[] = getClusterNodes();
 
-      out.println("<table><tr><td>Node</td><td>AsyncChannel</td><td>SyncChannel</td></tr>");
+      out.println("<table border=1><tr><td>Node</td><td>AsyncChannel</td><td>SyncChannel</td></tr>");
       for (int i = 0; i < info.length; i++)
       {
          out.println("<tr><td>" + info[i].getNodeId() + "</td><td>" + info[i].getAsyncChannelAddress() + "</td><td>" + info[i].getSyncChannelAddress() + "</td>");
@@ -1539,9 +1540,12 @@
          lock.readLock().release();
       }
    }
-       
-   private void removeBindingsForAddress(Integer nodeId) throws Exception
+
+
+   public void removeBindingsForAddress(int parameterNodeId) throws Exception
    {
+      log.info("Node " + parameterNodeId + " requested to leave cluster");
+      Integer nodeId = new Integer(parameterNodeId);
       lock.writeLock().acquire();
 
       try
@@ -1838,22 +1842,23 @@
                      try
                      {
                         Integer nodeId = getNodeIdForSyncAddress(address);
-                        
-                        if (nodeId == null)
+
+                        // nodeId==null means the server left the cluster without a problem. It sent a message before
+                        // leaving the cluster
+                        if (nodeId != null)
                         {
-                           throw new IllegalStateException("Cannot find node id for address: " + address);
+                           if (trace) { log.trace(DefaultClusteredPostOffice.this.nodeId + " Performing cleanup for node " + nodeId); }
+
+                           //Perform a check - the member might have crashed and left uncommitted transactions
+                           //we need to resolve this
+                           check(nodeId);
+
+                           //removeBindingsForAddress(nodeId);
+                           failOver(nodeId.intValue());
+
+                           if (trace) { log.trace(DefaultClusteredPostOffice.this.nodeId + " cleanup complete"); }
                         }
-                        
-                        if (trace) { log.trace(DefaultClusteredPostOffice.this.nodeId + " Performing cleanup for node " + nodeId); }
-                        
-                        //Perform a check - the member might have crashed and left uncommitted transactions
-                        //we need to resolve this
-                        check(nodeId);
-                                                                        
-                        removeBindingsForAddress(nodeId);
-                        
-                        if (trace) { log.trace(DefaultClusteredPostOffice.this.nodeId + " cleanup complete"); }
-                     }               
+                     }
                      catch (Throwable e)
                      {
                         log.error("Caught Exception in MembershipListener", e);

Added: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LeaveClusterRequest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LeaveClusterRequest.java	2006-11-22 03:46:36 UTC (rev 1618)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LeaveClusterRequest.java	2006-11-22 23:26:37 UTC (rev 1619)
@@ -0,0 +1,51 @@
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision:$</tt>
+ *          <p/>
+ *          $Id:$
+ */
+public class LeaveClusterRequest extends ClusterRequest
+{
+   static final int TYPE = 11;
+
+   private int nodeId;
+
+   public LeaveClusterRequest(int nodeId)
+   {
+      this.nodeId=nodeId;
+   }
+
+   /**
+    * This constructor only exist because it's an Streamable requirement.
+    * @see ClusterRequest#createFromStream(java.io.DataInputStream)  
+    */
+   public LeaveClusterRequest()
+   {
+   }
+
+   Object execute(PostOfficeInternal office) throws Throwable
+   {
+      office.removeBindingsForAddress(nodeId);
+      return null;
+   }
+
+   byte getType()
+   {
+      return TYPE;
+   }
+
+   public void write(DataOutputStream out) throws Exception
+   {
+      out.writeInt(nodeId);
+   }
+
+   public void read(DataInputStream in) throws Exception
+   {
+      nodeId = in.readInt();
+   }
+}

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java	2006-11-22 03:46:36 UTC (rev 1618)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java	2006-11-22 23:26:37 UTC (rev 1619)
@@ -23,7 +23,6 @@
 
 import java.util.List;
 import java.util.Map;
-
 import org.jboss.messaging.core.Message;
 import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
 
@@ -48,6 +47,8 @@
    
    void removeBindingFromCluster(int nodeId, String queueName)
       throws Exception;
+
+   void removeBindingsForAddress(int nodeId) throws Exception;
    
    void handleAddressNodeMapping(NodeAddressInfo info, int nodeId)
       throws Exception;

Added: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringBase.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringBase.java	2006-11-22 03:46:36 UTC (rev 1618)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringBase.java	2006-11-22 23:26:37 UTC (rev 1619)
@@ -0,0 +1,196 @@
+package org.jboss.test.messaging.jms.clustering;
+
+import javax.jms.*;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import org.jboss.test.messaging.MessagingTestCase;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision:$</tt>
+ *          <p/>
+ *          $Id:$
+ */
+public class ClusteringBase extends MessagingTestCase
+{
+
+   protected Context ic1;
+
+   protected Context ic2;
+
+   protected Context ic3;
+
+   protected Queue queue1;
+
+   protected Topic topic1;
+
+   protected Queue queue2;
+
+   protected Topic topic2;
+
+   protected Queue queue3;
+
+   protected Topic topic3;
+
+   protected ConnectionFactory cf1;
+
+   protected ConnectionFactory cf2;
+
+   protected ConnectionFactory cf3;
+
+   public ClusteringBase(String name)
+   {
+      super(name);
+   }
+
+
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      try
+      {
+
+         ServerManagement.start("all", 0, true);
+
+         ServerManagement.start("all", 1, true);
+
+         ServerManagement.start("all", 2, true);
+
+         ServerManagement.deployClusteredQueue("testDistributedQueue", 0);
+         ServerManagement.deployClusteredTopic("testDistributedTopic", 0);
+
+         ServerManagement.deployClusteredQueue("testDistributedQueue", 1);
+         ServerManagement.deployClusteredTopic("testDistributedTopic", 1);
+
+         ServerManagement.deployClusteredQueue("testDistributedQueue", 2);
+         ServerManagement.deployClusteredTopic("testDistributedTopic", 2);
+
+         ic1 = new InitialContext(ServerManagement.getJNDIEnvironment(0));
+
+         ic2 = new InitialContext(ServerManagement.getJNDIEnvironment(1));
+
+         ic3 = new InitialContext(ServerManagement.getJNDIEnvironment(2));
+
+         queue1 = (Queue)ic1.lookup("queue/testDistributedQueue");
+
+         queue2 = (Queue)ic2.lookup("queue/testDistributedQueue");
+
+         queue3 = (Queue)ic3.lookup("queue/testDistributedQueue");
+
+         topic1 = (Topic)ic1.lookup("topic/testDistributedTopic");
+
+         topic2 = (Topic)ic2.lookup("topic/testDistributedTopic");
+
+         topic3 = (Topic)ic3.lookup("topic/testDistributedTopic");
+
+         cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
+
+         cf2 = (ConnectionFactory)ic2.lookup("/ConnectionFactory");
+
+         cf3 = (ConnectionFactory)ic3.lookup("/ConnectionFactory");
+
+         drainQueues();
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+         throw e;
+      }
+   }
+
+   protected void tearDown() throws Exception
+   {
+      try
+      {
+         super.tearDown();
+
+         ServerManagement.undeployQueue("testDistributedQueue", 0);
+         ServerManagement.undeployTopic("testDistributedTopic", 0);
+
+         ServerManagement.undeployQueue("testDistributedQueue", 1);
+         ServerManagement.undeployTopic("testDistributedTopic", 1);
+
+         ServerManagement.undeployQueue("testDistributedQueue", 2);
+         ServerManagement.undeployTopic("testDistributedTopic", 2);
+
+         ic1.close();
+
+         ic2.close();
+
+         ic3.close();
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+         throw e;
+      }
+   }
+
+   protected void drainQueues() throws Exception
+   {
+      Connection conn1 = null;
+
+      Connection conn2 = null;
+
+      Connection conn3 = null;
+
+      try
+      {
+         conn1 = cf1.createConnection();
+
+         conn2 = cf2.createConnection();
+
+         conn3 = cf3.createConnection();
+
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageConsumer cons1 = sess1.createConsumer(queue1);
+
+         MessageConsumer cons2 = sess2.createConsumer(queue2);
+
+         MessageConsumer cons3 = sess3.createConsumer(queue3);
+
+         conn1.start();
+
+         conn2.start();
+
+         conn3.start();
+
+         Message msg = null;
+
+         do
+         {
+            msg = cons1.receive(1000);
+         }
+         while (msg != null);
+
+         do
+         {
+            msg = cons2.receive(1000);
+         }
+         while (msg != null);
+
+         do
+         {
+            msg = cons3.receive(1000);
+         }
+         while (msg != null);
+      }
+      finally
+      {
+         if (conn1 != null) conn1.close();
+
+         if (conn2 != null) conn2.close();
+
+         if (conn3 != null) conn3.close();
+      }
+   }
+
+
+}

Added: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java	2006-11-22 03:46:36 UTC (rev 1618)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java	2006-11-22 23:26:37 UTC (rev 1619)
@@ -0,0 +1,282 @@
+package org.jboss.test.messaging.jms.clustering;
+
+import javax.jms.*;
+import org.jboss.jms.client.JBossConnection;
+import org.jboss.jms.client.JBossMessageConsumer;
+import org.jboss.jms.client.JBossSession;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.delegate.ClientSessionDelegate;
+import org.jboss.jms.client.remoting.JMSRemotingConnection;
+import org.jboss.jms.client.state.ConsumerState;
+import org.jboss.jms.client.state.SessionState;
+import org.jboss.jms.message.MessageProxy;
+import org.jboss.logging.Logger;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision:$</tt>
+ *          <p/>
+ *          $Id:$
+ */
+public class HATest extends ClusteringBase
+{
+   protected Logger log = Logger.getLogger(getClass());
+
+   public HATest(String name)
+   {
+      super(name);
+   }
+
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+   }
+
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+   }
+
+   public void testLeaveFailover() throws Exception
+   {
+      try
+      {
+         System.out.println("Number of nodes on Cluster = " + ServerManagement.getServer(0).getNumberOfNodesOnCluster());
+         assertEquals(3,ServerManagement.getServer(0).getNumberOfNodesOnCluster());
+         assertEquals(3,ServerManagement.getServer(1).getNumberOfNodesOnCluster());
+         ServerManagement.stop(0,true);
+         System.out.println("Number of nodes on Cluster = " + ServerManagement.getServer(1).getNumberOfNodesOnCluster());
+         assertEquals(2,ServerManagement.getServer(0).getNumberOfNodesOnCluster());
+      }
+      finally
+      {
+         ServerManagement.start("all", 0, true); // tear down needs the server up
+      }
+   }
+
+
+   public void testTopicSubscriber() throws Exception
+   {
+      try
+      {
+         log.info("++testTopicSubscriber");
+
+         log.info(">>Lookup Queue");
+         Destination destination = (Destination) ic1.lookup("topic/testDistributedTopic");
+
+         log.info("Creating connection server1");
+         JBossConnection conn = (JBossConnection) cf1.createConnection();
+         conn.setClientID("testClient");
+         conn.start();
+
+         JBossSession session = (JBossSession) conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
+         ClientSessionDelegate clientSessionDelegate = (ClientSessionDelegate) session.getDelegate();
+         SessionState sessionState = (SessionState) clientSessionDelegate.getState();
+
+         MessageConsumer consumerHA = session.createDurableSubscriber((Topic) destination, "T1");
+         JBossMessageConsumer jbossConsumerHA = (JBossMessageConsumer) consumerHA;
+
+         org.jboss.jms.client.delegate.ClientConsumerDelegate clientDelegate = (org.jboss.jms.client.delegate.ClientConsumerDelegate) jbossConsumerHA.getDelegate();
+         ConsumerState consumerState = (ConsumerState) clientDelegate.getState();
+
+         log.info("subscriptionName=" + consumerState.getSubscriptionName());
+
+
+         log.info(">>Creating Producer");
+         MessageProducer producer = session.createProducer(destination);
+         log.info(">>creating Message");
+         Message message = session.createTextMessage("Hello Before");
+         log.info(">>sending Message");
+         producer.send(message);
+         session.commit();
+
+         receiveMessage("consumerHA", consumerHA, true, false);
+
+         session.commit();
+         //if (true) return;
+
+         Object txID = sessionState.getCurrentTxId();
+
+         producer.send(session.createTextMessage("Hello again before failover"));
+
+         ClientConnectionDelegate delegate = (ClientConnectionDelegate) conn.getDelegate();
+
+         JMSRemotingConnection originalRemoting = delegate.getRemotingConnection();
+
+         log.info(">>Creating alternate connection");
+         JBossConnection conn2 = (JBossConnection) cf2.createConnection();
+         log.info("NewConnectionCreated=" + conn2);
+
+         log.info(">>Failling over");
+         assertSame(originalRemoting, delegate.getRemotingConnection());
+         conn.getDelegate().failOver(conn2.getDelegate());
+
+
+
+         try
+         {
+            originalRemoting.stop();
+         } catch (Throwable throwable)
+         {
+            throwable.printStackTrace();
+         }
+
+         ServerManagement.stop(0, false);
+
+         assertNotSame(originalRemoting, delegate.getRemotingConnection());
+
+         //System.out.println("Kill server1"); Thread.sleep(10000);
+
+         message = session.createTextMessage("Hello After");
+         log.info(">>Sending new message");
+         producer.send(message);
+
+         assertEquals(txID, sessionState.getCurrentTxId());
+         System.out.println("TransactionID on client = " + txID);
+         log.info(">>Final commit");
+
+         /* JBossConnection connSecondServer = (JBossConnection)this.factoryServer2.createConnection();
+          connSecondServer.start();
+          JBossSession sessionSecondServer = (JBossSession)connSecondServer.createSession(false,Session.AUTO_ACKNOWLEDGE);
+          MessageConsumer consumerSecondServer = sessionSecondServer.createConsumer(destination); */
+
+         session.commit();
+
+         /* receiveMessage("consumerSecondServer",consumerSecondServer,true,false);
+        receiveMessage("consumerSecondServer",consumerSecondServer,true,false);
+        receiveMessage("consumerSecondServer",consumerSecondServer,true,true); */
+
+         log.info("Calling alternate receiver");
+         receiveMessage("consumerHA", consumerHA, true, false);
+         receiveMessage("consumerHA", consumerHA, true, false);
+         receiveMessage("consumerHA", consumerHA, true, true);
+
+
+         session.commit();
+
+      }
+      finally
+      {
+         // restart the server as it was probably stopped (tearDown will need that)
+         ServerManagement.start("all", 0, true);
+      }
+   }
+
+   public void testQueueHA() throws Exception
+   {
+      log.info("++testTopicSubscriber");
+
+      log.info(">>Lookup Queue");
+      Destination destination = (Destination) ic1.lookup("queue/testDistributedQueue");
+
+      log.info("Creating connection server1");
+      JBossConnection conn = (JBossConnection) cf1.createConnection();
+      conn.setClientID("testClient");
+      conn.start();
+
+      JBossSession session = (JBossSession) conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
+      ClientSessionDelegate clientSessionDelegate = (ClientSessionDelegate) session.getDelegate();
+      SessionState sessionState = (SessionState) clientSessionDelegate.getState();
+
+      MessageConsumer consumerHA = session.createConsumer(destination);
+      JBossMessageConsumer jbossConsumerHA = (JBossMessageConsumer) consumerHA;
+
+      org.jboss.jms.client.delegate.ClientConsumerDelegate clientDelegate = (org.jboss.jms.client.delegate.ClientConsumerDelegate) jbossConsumerHA.getDelegate();
+      ConsumerState consumerState = (ConsumerState) clientDelegate.getState();
+
+      log.info("subscriptionName=" + consumerState.getSubscriptionName());
+
+
+      log.info(">>Creating Producer");
+      MessageProducer producer = session.createProducer(destination);
+      log.info(">>creating Message");
+      Message message = session.createTextMessage("Hello Before");
+      log.info(">>sending Message");
+      producer.send(message);
+      session.commit();
+
+      session.commit();
+      //if (true) return;
+
+      Object txID = sessionState.getCurrentTxId();
+
+      ClientConnectionDelegate delegate = (ClientConnectionDelegate) conn.getDelegate();
+
+      JMSRemotingConnection originalRemoting = delegate.getRemotingConnection();
+
+      log.info(">>Creating alternate connection");
+      JBossConnection conn2 = (JBossConnection) cf2.createConnection();
+      log.info("NewConnectionCreated=" + conn2);
+
+      log.info(">>Failling over");
+      assertSame(originalRemoting, delegate.getRemotingConnection());
+      conn.getDelegate().failOver(conn2.getDelegate());
+
+      try
+      {
+         originalRemoting.stop();
+      } catch (Throwable throwable)
+      {
+         throwable.printStackTrace();
+      }
+
+
+      assertNotSame(originalRemoting, delegate.getRemotingConnection());
+
+      //System.out.println("Kill server1"); Thread.sleep(10000);
+      assertEquals(txID, sessionState.getCurrentTxId());
+      System.out.println("TransactionID on client = " + txID);
+      log.info(">>Final commit");
+
+      session.commit();
+
+      log.info("Calling alternate receiver");
+      receiveMessage("consumerHA", consumerHA, true, false);
+      receiveMessage("consumerHA", consumerHA, true, true);
+
+      session.commit();
+
+      for (int i = 0; i < 30; i++)
+      {
+         log.info("Message Sent " + i);
+         producer.send(session.createTextMessage("Message " + i));
+      }
+      session.commit();
+
+      Thread.sleep(5000);
+
+      TextMessage messageLoop = null;
+      while (!((messageLoop = (TextMessage) consumerHA.receive(5000)) == null))
+      {
+         log.info("Message received = " + messageLoop.getText());
+      }
+
+   }
+
+
+   private void receiveMessage(String text, MessageConsumer consumer, boolean shouldAssert, boolean shouldBeNull) throws Exception
+   {
+      MessageProxy message = (MessageProxy) consumer.receive(3000);
+      TextMessage txtMessage = (TextMessage) message;
+      if (message != null)
+      {
+         log.info(text + ": messageID from messageReceived=" + message.getMessage().getMessageID() + " message = " + message + " content=" + txtMessage.getText());
+      } else
+      {
+         log.info(text + ": Message received was null");
+      }
+      if (shouldAssert)
+      {
+         if (shouldBeNull)
+         {
+            assertNull(message);
+         } else
+         {
+            assertNotNull(message);
+         }
+      }
+   }
+
+
+}

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/ManualClusteringTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/ManualClusteringTest.java	2006-11-22 03:46:36 UTC (rev 1618)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/ManualClusteringTest.java	2006-11-22 23:26:37 UTC (rev 1619)
@@ -21,23 +21,8 @@
  */
 package org.jboss.test.messaging.jms.clustering;
 
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.naming.Context;
-import javax.naming.InitialContext;
+import javax.jms.*;
 
-import org.jboss.test.messaging.MessagingTestCase;
-import org.jboss.test.messaging.tools.ServerManagement;
-
 /**
  * 
  * A ManualClusteringTest
@@ -50,32 +35,8 @@
  * $Id$
  *
  */
-public class ManualClusteringTest extends MessagingTestCase
+public class ManualClusteringTest extends ClusteringBase
 {
-   protected Context ic1;
-   
-   protected Context ic2;
-   
-   protected Context ic3;
-   
-   protected Queue queue1;
-   
-   protected Topic topic1;
-   
-   protected Queue queue2;
-   
-   protected Topic topic2;
-   
-   protected Queue queue3;
-   
-   protected Topic topic3;
-   
-   protected ConnectionFactory cf1;
-   
-   protected ConnectionFactory cf2;
-   
-   protected ConnectionFactory cf3;
-     
    public ManualClusteringTest(String name)
    {
       super(name);
@@ -85,151 +46,13 @@
    protected void setUp() throws Exception
    {
       super.setUp();
-      
-      try
-      {
-                     
-         ServerManagement.start("all", 0, true);
-         
-         ServerManagement.start("all", 1, true);
-         
-         ServerManagement.start("all", 2, true);
-         
-         ServerManagement.deployClusteredQueue("testDistributedQueue", 0);         
-         ServerManagement.deployClusteredTopic("testDistributedTopic", 0);
-         
-         ServerManagement.deployClusteredQueue("testDistributedQueue", 1);         
-         ServerManagement.deployClusteredTopic("testDistributedTopic", 1);
-         
-         ServerManagement.deployClusteredQueue("testDistributedQueue", 2);         
-         ServerManagement.deployClusteredTopic("testDistributedTopic", 2);
-               
-         ic1 = new InitialContext(ServerManagement.getJNDIEnvironment(0));
-         
-         ic2 = new InitialContext(ServerManagement.getJNDIEnvironment(1));
-         
-         ic3 = new InitialContext(ServerManagement.getJNDIEnvironment(2));
-               
-         queue1 = (Queue)ic1.lookup("queue/testDistributedQueue");
-         
-         queue2 = (Queue)ic2.lookup("queue/testDistributedQueue");
-         
-         queue3 = (Queue)ic3.lookup("queue/testDistributedQueue");
-               
-         topic1 = (Topic)ic1.lookup("topic/testDistributedTopic");
-         
-         topic2 = (Topic)ic2.lookup("topic/testDistributedTopic");
-         
-         topic3 = (Topic)ic3.lookup("topic/testDistributedTopic");
-         
-         cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
-         
-         cf2 = (ConnectionFactory)ic2.lookup("/ConnectionFactory");
-         
-         cf3 = (ConnectionFactory)ic3.lookup("/ConnectionFactory");
-         
-         drainQueues();
-      }
-      catch (Exception e)
-      {
-         e.printStackTrace();
-         throw e;
-      }
    }
 
    protected void tearDown() throws Exception
    {
-      try
-      {
          super.tearDown();
-         
-         ServerManagement.undeployQueue("testDistributedQueue", 0);         
-         ServerManagement.undeployTopic("testDistributedTopic", 0);
-         
-         ServerManagement.undeployQueue("testDistributedQueue", 1);         
-         ServerManagement.undeployTopic("testDistributedTopic", 1);
-         
-         ServerManagement.undeployQueue("testDistributedQueue", 2);         
-         ServerManagement.undeployTopic("testDistributedTopic", 2);
-         
-         ic1.close();
-         
-         ic2.close();
-         
-         ic3.close();                  
-      }
-      catch (Exception e)
-      {
-         e.printStackTrace();
-         throw e;
-      }
    }
    
-   protected void drainQueues() throws Exception
-   {
-      Connection conn1 = null;
-      
-      Connection conn2 = null;
-      
-      Connection conn3 = null;
-            
-      try
-      {
-         conn1 = cf1.createConnection();
-         
-         conn2 = cf2.createConnection();
-         
-         conn3 = cf3.createConnection();
-           
-         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
-         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
-         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
-         MessageConsumer cons1 = sess1.createConsumer(queue1);
-         
-         MessageConsumer cons2 = sess2.createConsumer(queue2);
-         
-         MessageConsumer cons3 = sess3.createConsumer(queue3);
-         
-         conn1.start();
-         
-         conn2.start();
-         
-         conn3.start();
-         
-         Message msg = null;
-         
-         do
-         {
-            msg = cons1.receive(1000);
-         }
-         while (msg != null);
-         
-         do
-         {
-            msg = cons2.receive(1000);
-         }
-         while (msg != null);
-         
-         do
-         {
-            msg = cons3.receive(1000);
-         }
-         while (msg != null);
-      }
-      finally
-      {      
-         if (conn1 != null) conn1.close();
-         
-         if (conn2 != null) conn2.close();
-         
-         if (conn3 != null) conn3.close();
-      }
-   }
-    
-   
    public void testClusteredQueueLocalConsumerNonPersistent() throws Exception
    {
       clusteredQueueLocalConsumer(false);

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/ServerManagement.java	2006-11-22 03:46:36 UTC (rev 1618)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/ServerManagement.java	2006-11-22 23:26:37 UTC (rev 1619)
@@ -165,10 +165,17 @@
    public static synchronized void stop() throws Exception
    {
       insureStarted();
-      
-      servers[0].stop();      
+
+      servers[0].stop();
    }
 
+   public static synchronized void stop(int index, boolean sendNotification) throws Exception
+   {
+      insureStarted();
+
+      servers[index].stop(sendNotification);
+   }
+
    public static synchronized void destroy() throws Exception
    {
       stop();

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java	2006-11-22 03:46:36 UTC (rev 1618)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java	2006-11-22 23:26:37 UTC (rev 1619)
@@ -38,6 +38,7 @@
 import org.jboss.messaging.core.plugin.contract.MessageStore;
 import org.jboss.messaging.core.plugin.contract.PersistenceManager;
 import org.jboss.messaging.core.plugin.contract.PostOffice;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
 import org.jboss.remoting.ServerInvocationHandler;
 import org.jboss.test.messaging.tools.ServerManagement;
 import org.jboss.test.messaging.tools.jboss.MBeanConfigurationElement;
@@ -129,12 +130,17 @@
 
    public synchronized void stop() throws Exception
    {
+      stop(true);
+   }
+
+   public synchronized void stop(boolean sendNotification) throws Exception
+   {
       if (!isStarted())
       {
          return;
       }
 
-      stopServerPeer();
+      stopServerPeer(sendNotification);
 
       log.debug("stopping service container");
 
@@ -360,6 +366,11 @@
 
    public void stopServerPeer() throws Exception
    {
+      stopServerPeer(true);
+   }
+   
+   public void stopServerPeer(boolean sendNotification) throws Exception
+   {
       try
       {
          // if we don't find a ServerPeer instance registered under the serverPeerObjectName
@@ -421,7 +432,7 @@
    
          try
          {
-            sc.invoke(serverPeerObjectName, "stop", new Object[0], new String[0]);
+            sc.invoke(serverPeerObjectName, "stop", new Object[]{new Boolean(sendNotification)}, new String[]{"boolean"});
             sc.invoke(serverPeerObjectName, "destroy", new Object[0], new String[0]);
             sc.unregisterService(serverPeerObjectName);
          }
@@ -566,7 +577,13 @@
       return (PostOffice)sc.
          getAttribute(queuePostOfficeObjectName, "Instance");
    }
-   
+
+   public PostOffice internalGetQueuePostOffice() throws Exception
+   {
+      return (PostOffice)sc.
+         getAttribute(queuePostOfficeObjectName, "Instance");
+   }
+
    public PostOffice getTopicPostOffice() throws Exception
    {
       return (PostOffice)sc.
@@ -810,6 +827,15 @@
       return sc.getUserTransaction();
    }
 
+
+   public int getNumberOfNodesOnCluster() throws Exception
+   {
+      log.info("getNumberOfNodesOnCluster being called:: sc=" + sc);
+      DefaultClusteredPostOffice postOffice = (DefaultClusteredPostOffice)
+            sc.getAttribute(queuePostOfficeObjectName, "Instance");
+      return postOffice.getClusterNodes().length;
+   }
+
    // Public --------------------------------------------------------
 
    // Package protected ---------------------------------------------

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java	2006-11-22 03:46:36 UTC (rev 1618)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java	2006-11-22 23:26:37 UTC (rev 1619)
@@ -330,6 +330,12 @@
       server.startServerPeer(serverPeerID, defaultQueueJNDIContext, defaultTopicJNDIContext, clustered);
    }
 
+   public void stop(boolean sendNotification) throws Exception
+   {
+      server.stop(sendNotification);
+      namingDelegate.reset();
+   }
+
    public void stop() throws Exception
    {
       server.stop();
@@ -372,8 +378,15 @@
       return server.getUserTransaction();
    }
 
+
+   public int getNumberOfNodesOnCluster() throws Exception
+   {
+      return server.getNumberOfNodesOnCluster();
+   }
+
    private RMINamingDelegate getNamingDelegate()
    {
       return namingDelegate;
    }
+
 }

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/Server.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/Server.java	2006-11-22 03:46:36 UTC (rev 1618)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/Server.java	2006-11-22 23:26:37 UTC (rev 1619)
@@ -43,6 +43,7 @@
 public interface Server extends Remote
 {
    void start(String containerConfig, boolean clustered) throws Exception;
+   void stop(boolean sendNotification) throws Exception;
    void stop() throws Exception;
    void destroy() throws Exception;
 
@@ -217,4 +218,7 @@
 
    UserTransaction getUserTransaction() throws Exception;
 
+   /** use only on Clustering tests */
+   int getNumberOfNodesOnCluster() throws Exception;
+
 }




More information about the jboss-cvs-commits mailing list