[jboss-cvs] JBoss Messaging SVN: r1556 - in branches/Branch_Client_Failover_Experiment: src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/util src/main/org/jboss/messaging/core/local src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/core/ha

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Nov 9 17:57:10 EST 2006


Author: clebert.suconic at jboss.com
Date: 2006-11-09 17:57:01 -0500 (Thu, 09 Nov 2006)
New Revision: 1556

Modified:
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/util/MessagingJMSException.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/local/PagingFilteredQueue.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/ReconnectClusteredTest.java
Log:
After tests with Queues + round-robin of failed queues

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-11-03 23:27:52 UTC (rev 1555)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-11-09 22:57:01 UTC (rev 1556)
@@ -21,18 +21,7 @@
   */
 package org.jboss.jms.server.endpoint;
 
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.jms.IllegalStateException;
-import javax.jms.InvalidDestinationException;
-import javax.jms.JMSException;
-
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 import org.jboss.jms.client.delegate.ClientBrowserDelegate;
 import org.jboss.jms.client.delegate.ClientConsumerDelegate;
 import org.jboss.jms.delegate.BrowserDelegate;
@@ -68,7 +57,10 @@
 import org.jboss.messaging.core.tx.TransactionRepository;
 import org.jboss.util.id.GUID;
 
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import java.util.*;
 
 /**
  * Concrete implementation of SessionEndpoint.
@@ -150,8 +142,18 @@
    {
       try
       {
-         ((ClusteredPostOffice)topicPostOffice).failOver(nodeId);
 
+         // this code needs to be transfered to PostOffices, JGroups fail detection
+         if (jmsDestination.isTopic())
+         {
+            ((ClusteredPostOffice)topicPostOffice).failOver(nodeId);
+         }
+         else
+         if (jmsDestination.isQueue())
+         {
+            ((ClusteredPostOffice)queuePostOffice).failOver(nodeId);
+         }
+
          // fail over channel
          PostOffice postOfficeToUse = null;
          if (jmsDestination.isTopic())

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/util/MessagingJMSException.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/util/MessagingJMSException.java	2006-11-03 23:27:52 UTC (rev 1555)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/util/MessagingJMSException.java	2006-11-09 22:57:01 UTC (rev 1556)
@@ -72,6 +72,8 @@
          {
             setLinkedException(new Exception(cause));
          }
+
+         this.initCause(cause);
       }
    }
 

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/local/PagingFilteredQueue.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/local/PagingFilteredQueue.java	2006-11-03 23:27:52 UTC (rev 1555)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/local/PagingFilteredQueue.java	2006-11-09 22:57:01 UTC (rev 1556)
@@ -21,20 +21,13 @@
   */
 package org.jboss.messaging.core.local;
 
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 import org.jboss.logging.Logger;
-import org.jboss.messaging.core.Delivery;
-import org.jboss.messaging.core.DeliveryObserver;
-import org.jboss.messaging.core.Filter;
-import org.jboss.messaging.core.MessageReference;
-import org.jboss.messaging.core.PagingChannelSupport;
-import org.jboss.messaging.core.Queue;
-import org.jboss.messaging.core.SimpleDelivery;
+import org.jboss.messaging.core.*;
 import org.jboss.messaging.core.plugin.contract.MessageStore;
 import org.jboss.messaging.core.plugin.contract.PersistenceManager;
 import org.jboss.messaging.core.tx.Transaction;
 
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
 /**
  * 
  * A PagingFilteredQueue
@@ -137,7 +130,7 @@
    
    public String toString()
    {
-      return "Queue[" + getChannelID() + "]";
+      return "Queue[" + getChannelID() + "/" + this.getName() +  "]";
    }
 
    // Package protected ---------------------------------------------

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java	2006-11-03 23:27:52 UTC (rev 1555)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java	2006-11-09 22:57:01 UTC (rev 1556)
@@ -21,11 +21,11 @@
  */
 package org.jboss.messaging.core.plugin.postoffice.cluster;
 
+import org.jboss.messaging.core.Receiver;
+import org.jboss.messaging.core.Router;
+
 import java.util.List;
 
-import org.jboss.messaging.core.Router;
-import org.jboss.messaging.core.Receiver;
-
 /**
  * A ClusterRouter
  *
@@ -39,6 +39,8 @@
 {
    List getQueues();
 
+   List getFailedQueues();
+
    ClusteredQueue getLocalQueue();
 
    boolean add(Receiver receiver, boolean failedOver);

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-11-03 23:27:52 UTC (rev 1555)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-11-09 22:57:01 UTC (rev 1556)
@@ -21,18 +21,10 @@
  */
 package org.jboss.messaging.core.plugin.postoffice.cluster;
 
-import java.io.*;
-import java.util.*;
-
-import javax.sql.DataSource;
-import javax.transaction.TransactionManager;
-
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 import org.jboss.jms.server.QueuedExecutorPool;
 import org.jboss.logging.Logger;
-import org.jboss.messaging.core.Delivery;
-import org.jboss.messaging.core.Filter;
-import org.jboss.messaging.core.FilterFactory;
-import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.*;
 import org.jboss.messaging.core.Queue;
 import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
 import org.jboss.messaging.core.plugin.contract.MessageStore;
@@ -43,20 +35,19 @@
 import org.jboss.messaging.core.tx.Transaction;
 import org.jboss.messaging.core.tx.TransactionRepository;
 import org.jboss.messaging.util.StreamUtils;
-import org.jgroups.Address;
+import org.jgroups.*;
 import org.jgroups.Channel;
-import org.jgroups.JChannel;
-import org.jgroups.MembershipListener;
 import org.jgroups.Message;
-import org.jgroups.MessageListener;
 import org.jgroups.Receiver;
-import org.jgroups.View;
 import org.jgroups.blocks.GroupRequest;
 import org.jgroups.blocks.MessageDispatcher;
 import org.jgroups.blocks.RequestHandler;
 import org.w3c.dom.Element;
 
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+import java.io.*;
+import java.util.*;
 
 /**
  * 
@@ -1108,6 +1099,11 @@
       {
          log.info("Preparing failover against node " + nodeId);
          Map subMaps = (Map)nameMaps.get(new Integer(nodeId));
+         if (subMaps==null || subMaps.size()==0)
+         {
+            log.warn("Couldn't find any binding to failOver from serverId=" +nodeId);
+            return;
+         }
          ArrayList namesToRemove = new ArrayList();
          for (Iterator iterNames = subMaps.entrySet().iterator();iterNames.hasNext();)
          {
@@ -1157,7 +1153,6 @@
             clusteredQueue.load();
             clusteredQueue.activate();
             addBinding(newBinding);
-            System.out.println("**** sending binding on " + binding.getQueue().getName() + " with condition=" + binding.getCondition());
             sendBindRequest(binding.getCondition(), clusteredQueue,newBinding);
          }
       }
@@ -1218,6 +1213,7 @@
       PrintWriter out = new PrintWriter(buffer);
       out.print(super.printBindingInformation());
 
+      out.println("<br>FailOver bindings");
       out.println("<table border=1><tr><td>Node</td><td>ChannelID</td><td>Binding</td>");
 
       for (Iterator iter = this.failedBindings.entrySet().iterator(); iter.hasNext();)
@@ -1245,21 +1241,44 @@
 
       out.println("<br>Router Information");
 
+      out.println("<table border=1><tr><td>Queue Route</td><td>Local Queue</td><td>Elements</td></tr>");
+
       for (Iterator iterRouter = routerMap.entrySet().iterator();iterRouter.hasNext();)
       {
          Map.Entry entry = (Map.Entry)iterRouter.next();
          ClusterRouter router = (ClusterRouter)entry.getValue();
-         out.println("<br> queue " + entry.getKey() + " being routed to:");
-         out.println("<br>    LocalQueue = " + router.getLocalQueue());
+         out.println("<tr><td>" + entry.getKey() + "</td><td>" + router.getLocalQueue() + "</td>");
 
+         out.println("<td>");
+
+         out.println("<table border=1>");
+
+         if (!router.getFailedQueues().isEmpty())
+         {
+            out.println("<tr><td><b>Failed Over Queues</b></td><</tr>");
+            for (Iterator queuesIterator = router.getFailedQueues().iterator();queuesIterator.hasNext();)
+            {
+               Object queue = queuesIterator.next();
+               out.println("<tr><td>" + queue + "</td></tr>");
+            }
+         }
+
+         out.println("<tr><td><b>Queues</b></td><</tr>");
+
          for (Iterator queuesIterator = router.getQueues().iterator();queuesIterator.hasNext();)
          {
             Object queueRouted = queuesIterator.next();
-            out.println("<br>  RoutedQueue=" + queueRouted + " class=" + queueRouted.getClass().getName());
+            out.println("<tr><td>" + queueRouted + "</td></tr>");
          }
 
+         out.println("</table>");
+
+         out.println("</td></tr>");
+
       }
 
+      out.println("</table>");
+
       return buffer.toString();
    }
 

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java	2006-11-03 23:27:52 UTC (rev 1555)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java	2006-11-09 22:57:01 UTC (rev 1556)
@@ -21,10 +21,6 @@
  */
 package org.jboss.messaging.core.plugin.postoffice.cluster;
 
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.Delivery;
 import org.jboss.messaging.core.DeliveryObserver;
@@ -32,6 +28,10 @@
 import org.jboss.messaging.core.Receiver;
 import org.jboss.messaging.core.tx.Transaction;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
 /**
  * 
  * A DefaultRouter
@@ -166,8 +166,32 @@
    {
       if (trace) { log.trace(this + " routing ref " + reference); }
 
-      //Favour the local queue
+      //Favour the local queue or the failedOver queue in round robbin
 
+      if (!failedOverQueues.isEmpty())
+      {
+
+         if (trace) { log.trace("Round robbing on FailedOver queue, currentTarget=" + target);}
+         LocalClusteredQueue queueToUse = null;
+
+         if (target==-1)
+         {
+            queueToUse = (LocalClusteredQueue)this.localQueue; 
+         }
+         else
+         {
+            queueToUse = (LocalClusteredQueue)failedOverQueues.get(target);
+         }
+
+         incTargetFailedOver();
+         
+         Delivery del = queueToUse.handle(observer, reference, tx);
+
+         if (trace) { log.trace(this+" routed to failed queue, using failedOver round robbing, returned " + del); }
+         
+         return del;
+      }
+      else
       if (localQueue != null)
       {
          //The only time the local queue won't accept is if the selector doesn't
@@ -207,6 +231,17 @@
       return null;
    }
 
+   private void incTargetFailedOver()
+   {
+      target++;
+
+      if (target == failedOverQueues.size())
+      {
+         target = -1; // use the local queue
+      }
+   }
+
+
    private void incTarget()
    {
       target++;
@@ -217,6 +252,12 @@
       }
    }
 
+
+   public java.util.List getFailedQueues()
+   {
+      return failedOverQueues;
+   }
+
    public List getQueues()
    {
       List queues = new ArrayList();

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java	2006-11-03 23:27:52 UTC (rev 1555)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java	2006-11-09 22:57:01 UTC (rev 1556)
@@ -21,17 +21,9 @@
  */
 package org.jboss.messaging.core.plugin.postoffice.cluster;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 import org.jboss.logging.Logger;
-import org.jboss.messaging.core.Delivery;
-import org.jboss.messaging.core.Filter;
-import org.jboss.messaging.core.Message;
-import org.jboss.messaging.core.MessageReference;
-import org.jboss.messaging.core.SimpleDelivery;
+import org.jboss.messaging.core.*;
 import org.jboss.messaging.core.local.PagingFilteredQueue;
 import org.jboss.messaging.core.plugin.contract.MessageStore;
 import org.jboss.messaging.core.plugin.contract.PersistenceManager;
@@ -40,8 +32,6 @@
 import org.jboss.messaging.core.tx.TransactionRepository;
 import org.jboss.messaging.util.Future;
 
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
 /**
  * 
  * A LocalClusteredQueue
@@ -141,6 +131,12 @@
    {
       return nodeId;
    }
+
+
+   public String toString()
+   {
+      return "LocalClusteredQueue[" + this.getChannelID() + "/" + this.getName() +"]"; 
+   }
    
    /*
     * This is the same as the normal handle() method on the Channel except it doesn't

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/ReconnectClusteredTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/ReconnectClusteredTest.java	2006-11-03 23:27:52 UTC (rev 1555)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/ReconnectClusteredTest.java	2006-11-09 22:57:01 UTC (rev 1556)
@@ -1,22 +1,17 @@
 package org.jboss.test.messaging.core.ha;
 
 import org.jboss.jms.client.JBossConnection;
+import org.jboss.jms.client.JBossMessageConsumer;
 import org.jboss.jms.client.JBossSession;
-import org.jboss.jms.client.JBossMessageConsumer;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.delegate.ClientSessionDelegate;
 import org.jboss.jms.client.remoting.JMSRemotingConnection;
 import org.jboss.jms.client.state.ConnectionState;
+import org.jboss.jms.client.state.ConsumerState;
 import org.jboss.jms.client.state.SessionState;
-import org.jboss.jms.client.state.ConsumerState;
-import org.jboss.jms.client.delegate.ClientConnectionDelegate;
-import org.jboss.jms.client.delegate.ClientSessionDelegate;
-import org.jboss.jms.message.TextMessageProxy;
-import org.jboss.jms.message.JBossMessage;
 import org.jboss.jms.message.MessageProxy;
-import org.jboss.test.messaging.tools.ServerManagement;
 
-
 import javax.jms.*;
-import javax.management.ObjectName;
 
 /** Start two JBoss instances (clustered) to run these tests.
  *  */
@@ -141,62 +136,6 @@
         assertNotNull(consumer.receive(2000));
     }
 
-    public void testDurableTopicCluster() throws Exception
-    {
-        log.info("++testDurableTopicCluster");
-
-        log.info(">>Lookup Queue");
-        Destination destination = (Destination)getCtx1().lookup("topic/testDistributedTopic");
-
-        JBossConnection connFirstServer = (JBossConnection)this.factoryServer1.createConnection("guest","guest");
-        connFirstServer.setClientID("test");
-        connFirstServer.start();
-        JBossSession sessionFirstServer = (JBossSession)connFirstServer.createSession(true,Session.AUTO_ACKNOWLEDGE);
-
-        MessageConsumer consumer = sessionFirstServer.createDurableSubscriber((Topic)destination,"test");
-
-        MessageProducer producer = sessionFirstServer.createProducer(destination);
-
-        for (int i=0;i<10;i++)
-        {
-            producer.send(sessionFirstServer.createTextMessage("Test"  + i));
-        }
-
-        Object objectReceived=consumer.receive(5000);
-        if (objectReceived!=null)
-        {
-            System.out.println("Object received=" + objectReceived);
-        }
-        assertNull(objectReceived);
-
-        sessionFirstServer.commit();
-
-
-        for (int i=0;i<5;i++)
-        {
-            assertNotNull(consumer.receive(1000));
-        }
-
-        sessionFirstServer.rollback();
-        connFirstServer.close();
-
-
-        JBossConnection connectionSecondServer = (JBossConnection)this.factoryServer1.createConnection("guest","guest");
-        connectionSecondServer.setClientID("test");
-        connectionSecondServer.start();
-
-        JBossSession sessionSecondServer = (JBossSession)connectionSecondServer.createSession(true,Session.AUTO_ACKNOWLEDGE);
-
-        consumer = sessionSecondServer.createDurableSubscriber((Topic)destination,"test");
-
-        for (int i=0;i<10;i++)
-        {
-            assertNotNull(consumer.receive(1000));
-        }
-
-        assertNull(consumer.receive(1000));
-    }
-
     public void testTopicSubscriber() throws Exception
     {
         log.info("++testTopicSubscriber");
@@ -232,6 +171,9 @@
 
         receiveMessage("consumerHA",consumerHA,true,false);
 
+        session.commit();
+        //if (true) return;
+
         Object txID = sessionState.getCurrentTxId();
 
         producer.send(session.createTextMessage("Hello again before failover"));
@@ -267,26 +209,117 @@
         System.out.println("TransactionID on client = " + txID);
         log.info(">>Final commit");
 
-        JBossConnection connSecondServer = (JBossConnection)this.factoryServer2.createConnection();
+       /* JBossConnection connSecondServer = (JBossConnection)this.factoryServer2.createConnection();
         connSecondServer.start();
         JBossSession sessionSecondServer = (JBossSession)connSecondServer.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumerSecondServer = sessionSecondServer.createConsumer(destination);
+        MessageConsumer consumerSecondServer = sessionSecondServer.createConsumer(destination); */
 
         session.commit();
 
+        /* receiveMessage("consumerSecondServer",consumerSecondServer,true,false);
         receiveMessage("consumerSecondServer",consumerSecondServer,true,false);
-        receiveMessage("consumerSecondServer",consumerSecondServer,true,false);
-        receiveMessage("consumerSecondServer",consumerSecondServer,true,true);
+        receiveMessage("consumerSecondServer",consumerSecondServer,true,true); */
 
         log.info("Calling alternate receiver");
         receiveMessage("consumerHA",consumerHA,true,false);
         receiveMessage("consumerHA",consumerHA,true,false);
         receiveMessage("consumerHA",consumerHA,true,true);
 
+
         session.commit();
 
     }
 
+   public void testQueueHA() throws Exception
+   {
+       log.info("++testTopicSubscriber");
+
+       log.info(">>Lookup Queue");
+       Destination destination = (Destination)getCtx1().lookup("queue/testDistributedQueue");
+
+       log.info("Creating connection server1");
+       JBossConnection  conn = (JBossConnection)this.factoryServer1.createConnection();
+       conn.setClientID("testClient");
+       conn.start();
+
+       JBossSession session = (JBossSession)conn.createSession(true,Session.AUTO_ACKNOWLEDGE);
+       ClientSessionDelegate clientSessionDelegate = (ClientSessionDelegate)session.getDelegate();
+       SessionState sessionState = (SessionState)clientSessionDelegate.getState();
+
+       MessageConsumer consumerHA = session.createConsumer(destination);
+       JBossMessageConsumer jbossConsumerHA =(JBossMessageConsumer)consumerHA;
+
+       org.jboss.jms.client.delegate.ClientConsumerDelegate clientDelegate = (org.jboss.jms.client.delegate.ClientConsumerDelegate)jbossConsumerHA.getDelegate();
+       ConsumerState consumerState = (ConsumerState)clientDelegate.getState();
+
+       log.info("subscriptionName=" + consumerState.getSubscriptionName());
+
+
+       log.info(">>Creating Producer");
+       MessageProducer producer = session.createProducer(destination);
+       log.info(">>creating Message");
+       Message message = session.createTextMessage("Hello Before");
+       log.info(">>sending Message");
+       producer.send(message);
+       session.commit();
+
+       session.commit();
+       //if (true) return;
+
+       Object txID = sessionState.getCurrentTxId();
+
+       ClientConnectionDelegate delegate = (ClientConnectionDelegate)conn.getDelegate();
+
+       JMSRemotingConnection originalRemoting = delegate.getRemotingConnection();
+
+       log.info(">>Creating alternate connection");
+       JBossConnection conn2 = (JBossConnection)this.factoryServer2.createConnection();
+       log.info("NewConnectionCreated=" + conn2);
+
+       log.info(">>Failling over");
+       assertSame(originalRemoting,delegate.getRemotingConnection());
+       conn.getDelegate().failOver(conn2.getDelegate());
+
+       try {
+           originalRemoting.stop();
+       } catch (Throwable throwable) {
+           throwable.printStackTrace();
+       }
+
+
+       assertNotSame(originalRemoting,delegate.getRemotingConnection());
+
+       //System.out.println("Kill server1"); Thread.sleep(10000);
+       assertEquals(txID,sessionState.getCurrentTxId());
+       System.out.println("TransactionID on client = " + txID);
+       log.info(">>Final commit");
+
+       session.commit();
+
+       log.info("Calling alternate receiver");
+       receiveMessage("consumerHA",consumerHA,true,false);
+       receiveMessage("consumerHA",consumerHA,true,true);
+
+       session.commit();
+
+       for (int i=0;i<30;i++)
+       {
+          log.info("Message Sent " + i);
+          producer.send(session.createTextMessage("Message " + i));
+       }
+      session.commit();
+
+      Thread.sleep(5000);
+
+       TextMessage messageLoop = null;
+       while (!((messageLoop = (TextMessage) consumerHA.receive(5000)) == null))
+       {
+          log.info("Message received = " + messageLoop.getText());
+       }
+
+   }
+
+
     private void receiveMessage(String text, MessageConsumer consumer, boolean shouldAssert, boolean shouldBeNull) throws Exception
     {
         MessageProxy message = (MessageProxy)consumer.receive(3000);




More information about the jboss-cvs-commits mailing list