[jboss-cvs] JBoss Messaging SVN: r1510 - in branches/Branch_Client_Failover_Experiment: src/main/org/jboss/jms/client src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/client/state src/main/org/jboss/jms/message src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/server/endpoint/advised src/main/org/jboss/jms/tx src/main/org/jboss/jms/util src/main/org/jboss/messaging/core src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/core/ha

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Oct 20 20:34:08 EDT 2006


Author: clebert.suconic at jboss.com
Date: 2006-10-20 20:33:54 -0400 (Fri, 20 Oct 2006)
New Revision: 1510

Added:
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/ReconnectClusteredTest.java
Removed:
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/ReconnectTest.java
Modified:
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossMessageConsumer.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossSession.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConnectionAspect.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConsumerAspect.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/ConnectionState.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/message/MessageIdGenerator.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/AckInfo.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/ResourceManager.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/TxState.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/util/XMLUtil.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/ChannelSupport.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/PagingChannelSupport.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/HATestBase.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-519 -ServerSide failover first iteration

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossConnectionConsumer.java	2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossConnectionConsumer.java	2006-10-21 00:33:54 UTC (rev 1510)
@@ -141,7 +141,7 @@
          // call pre or postDeliver so messages won't be acked, or stored in session/tx
          sess = conn.createSessionDelegate(false, Session.CLIENT_ACKNOWLEDGE, false);
 
-         cons = sess.createConsumerDelegate(dest, messageSelector, false, subName, true);
+         cons = sess.createConsumerDelegate(dest, messageSelector, false, subName, true,-1l);
       }
       finally
       {

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossMessageConsumer.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossMessageConsumer.java	2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossMessageConsumer.java	2006-10-21 00:33:54 UTC (rev 1510)
@@ -40,7 +40,7 @@
  *
  * $Id$
  */
-class JBossMessageConsumer implements MessageConsumer, QueueReceiver, TopicSubscriber, Serializable
+public class JBossMessageConsumer implements MessageConsumer, QueueReceiver, TopicSubscriber, Serializable
 {   
    // Constants -----------------------------------------------------  
    

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossSession.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossSession.java	2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossSession.java	2006-10-21 00:33:54 UTC (rev 1510)
@@ -252,7 +252,7 @@
          tccc.set(getClass().getClassLoader());
 
          ConsumerDelegate consumerDelegate = delegate.
-            createConsumerDelegate((JBossDestination)d, messageSelector, noLocal, null, false);
+            createConsumerDelegate((JBossDestination)d, messageSelector, noLocal, null, false,-1l);
          
          return new JBossMessageConsumer(consumerDelegate);
       }
@@ -305,7 +305,7 @@
          tccc.set(getClass().getClassLoader());
 
          ConsumerDelegate consumerDelegate =
-            delegate.createConsumerDelegate((JBossTopic)topic, null, false, name, false);
+            delegate.createConsumerDelegate((JBossTopic)topic, null, false, name, false,-1l);
 
          return new JBossMessageConsumer(consumerDelegate);
       }
@@ -339,7 +339,7 @@
          messageSelector = null;
       }
       ConsumerDelegate consumerDelegate =
-         delegate.createConsumerDelegate((JBossTopic)topic, messageSelector, noLocal, name, false);
+         delegate.createConsumerDelegate((JBossTopic)topic, messageSelector, noLocal, name, false,-1l);
       return new JBossMessageConsumer(consumerDelegate);
    }
 

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConnectionAspect.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConnectionAspect.java	2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConnectionAspect.java	2006-10-21 00:33:54 UTC (rev 1510)
@@ -283,17 +283,19 @@
 
     private void handleFailoverOnConsumer(ConnectionState connectionState,SessionState sessionState, ConsumerState consumerState, ClientSessionDelegate sessionDelegate, int oldServerId) throws JMSException
     {
+        ClientConsumerDelegate currentConsumerDelegate = (ClientConsumerDelegate)consumerState.getDelegate();
+
         if (trace)
         {
             log.trace("handleFailoverOnConsumer: creating alternate consumer");
         }
-        ClientConsumerDelegate newConsumerDelegate = (ClientConsumerDelegate)sessionDelegate.createConsumerDelegate((JBossDestination) consumerState.getDestination(),consumerState.getSelector(),consumerState.isNoLocal(),consumerState.getSubscriptionName(),false);
+        ClientConsumerDelegate newConsumerDelegate = (ClientConsumerDelegate)sessionDelegate.createConsumerDelegate((JBossDestination) consumerState.getDestination(),consumerState.getSelector(),consumerState.isNoLocal(),consumerState.getSubscriptionName(),false, currentConsumerDelegate.getChannelId());
         if (trace)
         {
             log.trace("handleFailoverOnConsumer: alternate consumer created");
         }
 
-        ClientConsumerDelegate currentConsumerDelegate = (ClientConsumerDelegate)consumerState.getDelegate();
+
         currentConsumerDelegate.transferHAState(newConsumerDelegate);
 
         int oldConsumerId = consumerState.getConsumerID();
@@ -301,6 +303,8 @@
         ConsumerState newState = (ConsumerState)newConsumerDelegate.getState();
         consumerState.failOver(newState);
 
+        connectionState.getResourceManager().handleFailover(sessionState.getCurrentTxId(),oldConsumerId,consumerState.getConsumerID());
+
         CallbackManager cm = connectionState.getRemotingConnection().getCallbackManager();
 
         MessageCallbackHandler handler = cm.unregisterHandler(oldServerId,oldConsumerId);

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2006-10-21 00:33:54 UTC (rev 1510)
@@ -92,7 +92,7 @@
       //Now we have finished creating the client consumer, we can tell the SCD
       //we are ready
       consumerDelegate.more();
-      
+
       return consumerDelegate;
    }
    

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java	2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java	2006-10-21 00:33:54 UTC (rev 1510)
@@ -52,20 +52,28 @@
    
    protected int bufferSize;
 
+   protected long channelId;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public ClientConsumerDelegate(int objectID, int bufferSize)
+   public ClientConsumerDelegate(int objectID, long channelId, int bufferSize)
    {
       super(objectID);
       this.bufferSize = bufferSize;
+      this.channelId = channelId;
    }
    
    public ClientConsumerDelegate()
    {      
    }
 
+   public long getChannelId()
+   {
+       return channelId;
+   }
+
    // ConsumerDelegate implementation -------------------------------
 
    /**
@@ -171,7 +179,7 @@
 
    public String toString()
    {
-      return "ConsumerDelegate[" + id + "]";
+      return "ConsumerDelegate[" + id + "](ChannelId=" + this.channelId+")" ;
    }
 
    // Protected -----------------------------------------------------
@@ -184,9 +192,12 @@
    }
 
 
+
+
     public void transferHAState(DelegateSupport copyFrom)
     {
         super.transferHAState(copyFrom);
+        this.channelId = ((ClientConsumerDelegate)copyFrom).channelId;
         this.getMetaData().removeMetaData(MetaDataConstants.JMS, MetaDataConstants.CONSUMER_ID);
         this.getMetaData().addMetaData(MetaDataConstants.JMS, MetaDataConstants.CONSUMER_ID,copyFrom.getMetaData().getMetaData(MetaDataConstants.JMS, MetaDataConstants.CONSUMER_ID), PayloadKey.TRANSIENT);
     }

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2006-10-21 00:33:54 UTC (rev 1510)
@@ -186,7 +186,8 @@
     */
    public ConsumerDelegate createConsumerDelegate(JBossDestination destination, String selector,
                                                   boolean noLocal, String subscriptionName,
-                                                  boolean connectionConsumer) throws JMSException
+                                                  boolean connectionConsumer,
+                                                  long oldChannelID) throws JMSException
    {
       throw new IllegalStateException("This invocation should not be handled here!");
    }

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/ConnectionState.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/ConnectionState.java	2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/ConnectionState.java	2006-10-21 00:33:54 UTC (rev 1510)
@@ -196,6 +196,7 @@
     public void failOver(ConnectionState newState)
     {
         this.serverID = newState.serverID;
+        this.idGenerator = newState.idGenerator;
     }
 
 

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/message/MessageIdGenerator.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/message/MessageIdGenerator.java	2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/message/MessageIdGenerator.java	2006-10-21 00:33:54 UTC (rev 1510)
@@ -55,6 +55,11 @@
 
    protected ConnectionFactoryDelegate cfd;
 
+   public ConnectionFactoryDelegate getDelegate()
+   {
+       return cfd;
+   }
+
    // Constructors --------------------------------------------------
 
    public MessageIdGenerator(ConnectionFactoryDelegate cfd, int blockSize)  throws JMSException

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-10-21 00:33:54 UTC (rev 1510)
@@ -141,7 +141,7 @@
                                                   String selectorString,
                                                   boolean noLocal,
                                                   String subscriptionName,
-                                                  boolean isCC) throws JMSException
+                                                  boolean isCC, long oldchannelID) throws JMSException
    {
       try
       {
@@ -382,14 +382,18 @@
          }
          
          int prefetchSize = connectionEndpoint.getPrefetchSize();
-         
+
+         if (oldchannelID>=0)
+         {
+             ((PagingFilteredQueue)binding.getQueue()).transferChannel(oldchannelID);
+         }
          ServerConsumerEndpoint ep =
             new ServerConsumerEndpoint(consumerID, (PagingFilteredQueue)binding.getQueue(), binding.getQueue().getName(),
                                        this, selectorString, noLocal, jmsDestination, prefetchSize);
           
          JMSDispatcher.instance.registerTarget(new Integer(consumerID), new ConsumerAdvised(ep));
                      
-         ClientConsumerDelegate stub = new ClientConsumerDelegate(consumerID, prefetchSize);
+         ClientConsumerDelegate stub = new ClientConsumerDelegate(consumerID, binding.getQueue().getChannelID(), prefetchSize);
                        
          putConsumerEndpoint(consumerID, ep); // caching consumer locally
          

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java	2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java	2006-10-21 00:33:54 UTC (rev 1510)
@@ -47,7 +47,7 @@
 { 
    ConsumerDelegate createConsumerDelegate(JBossDestination destination, String selector,
                                            boolean noLocal, String subscriptionName,
-                                           boolean connectionConsumer) throws JMSException;   
+                                           boolean connectionConsumer, long oldchannelID) throws JMSException;   
    
    BrowserDelegate createBrowserDelegate(JBossDestination queue, String messageSelector)
       throws JMSException;

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java	2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java	2006-10-21 00:33:54 UTC (rev 1510)
@@ -85,9 +85,9 @@
    
    public ConsumerDelegate createConsumerDelegate(JBossDestination destination, String selector,
                                                   boolean noLocal, String subscriptionName,
-                                                  boolean connectionConsumer) throws JMSException
+                                                  boolean connectionConsumer,long oldchannelID) throws JMSException
    {
-      return endpoint.createConsumerDelegate(destination, selector, noLocal, subscriptionName, connectionConsumer);
+      return endpoint.createConsumerDelegate(destination, selector, noLocal, subscriptionName, connectionConsumer, oldchannelID);
    }
 
    public BrowserDelegate createBrowserDelegate(JBossDestination queue, String messageSelector)

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/AckInfo.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/AckInfo.java	2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/AckInfo.java	2006-10-21 00:33:54 UTC (rev 1510)
@@ -96,6 +96,12 @@
    {
       return consumerID;
    }
+
+   /** Used to change the ACK's consumer ID during failover. */
+   public void setConsumerID(int consumerID)
+   {
+       this.consumerID=consumerID;
+   }
    
    public MessageProxy getMessage()
    {

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/ResourceManager.java	2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/ResourceManager.java	2006-10-21 00:33:54 UTC (rev 1510)
@@ -119,6 +119,16 @@
       
       tx.getMessages().add(m);
    }
+
+    /** Looks up the given transaction and replaces the old consumer ID with the new one on every ACK not sent yet. */
+    public void handleFailover(Object xid, int oldClientId, int newClientId)
+    {
+        if (trace) { log.trace("handleFailover:: Transfering clientIds on ACKs from  " + oldClientId + " to " + newClientId); }
+
+        TxState tx = getTx(xid);
+        tx.handleFailover(oldClientId, newClientId);
+    }
+
    
    /**
     * Add an acknowledgement to the transaction

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/TxState.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/TxState.java	2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/TxState.java	2006-10-21 00:33:54 UTC (rev 1510)
@@ -95,6 +95,21 @@
    {
       this.state = state;
    }
+
+
+   /** Iterates over the ACKs and replaces the old consumer ID with the new one on every ACK not sent yet. */
+   public void handleFailover(int oldClientId, int newClientId)
+   {
+       Iterator ackIterator = acks.iterator();
+       while (ackIterator.hasNext())
+       {
+           AckInfo ackInfo = (AckInfo)ackIterator.next();
+           if (ackInfo.getConsumerID()==oldClientId)
+           {
+               ackInfo.setConsumerID(newClientId);
+           }
+       }
+   }
     
    // Streamable implementation ---------------------------------
    

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/util/XMLUtil.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/util/XMLUtil.java	2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/util/XMLUtil.java	2006-10-21 00:33:54 UTC (rev 1510)
@@ -64,7 +64,7 @@
 
       if (Node.CDATA_SECTION_NODE == type)
       {
-         return "<![CDATA[" + n.getNodeValue() + "]]>";
+         return  n.getNodeValue();
       }
 
       if (name.startsWith("#"))

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-10-21 00:33:54 UTC (rev 1510)
@@ -532,7 +532,7 @@
          ListIterator iter = null;
          
          MessageReference ref = null;
-         
+
          while (true)
          {           
             synchronized (refLock)

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/PagingChannelSupport.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/PagingChannelSupport.java	2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/PagingChannelSupport.java	2006-10-21 00:33:54 UTC (rev 1510)
@@ -245,22 +245,66 @@
          {
             firstPagingOrder = nextPagingOrder = 0;
          }
-         
-         Map refMap = processReferences(ili.getRefInfos()); 
-        
-         Iterator iter = ili.getRefInfos().iterator();
-         while (iter.hasNext())
-         {
-            ReferenceInfo info = (ReferenceInfo)iter.next();
 
-            addFromRefInfo(info, refMap);
-         }                    
+          pushReferences(ili);
+
+          //Maybe we need to load some paged refs
          
-         //Maybe we need to load some paged refs
-         
          while (checkLoad()) {}
       }
-   }      
+   }
+
+    /** We extracted this as a method from load, as transferChannel (for HA recovery) also needs the same routine. */
+    private Map pushReferences(InitialLoadInfo ili) throws Exception {
+        Map refMap = processReferences(ili.getRefInfos());
+
+        Iterator iter = ili.getRefInfos().iterator();
+        while (iter.hasNext())
+        {
+           ReferenceInfo info = (ReferenceInfo)iter.next();
+
+           addFromRefInfo(info, refMap);
+        }
+        return refMap;
+    }
+
+    /** Transfers messages from an old channel to this channel.
+     *  This is used during HA failover, when a connection fails and its messages need to be transferred to a new node. */
+    public void transferChannel(long oldchannelID) throws Exception
+    {
+        log.info("Transfering state from " + oldchannelID +" into " + this.getChannelID());
+        synchronized (refLock)
+        {
+            while(true)
+            {
+                InitialLoadInfo ili =pm.getInitialReferenceInfos(oldchannelID,fullSize);
+                if (ili.getRefInfos().size()==0)
+                {
+                    break;
+                }
+
+                log.info("got " + ili.getRefInfos().size() + " references to move");
+
+
+
+                Map refMap = pushReferences(ili);
+                Iterator referencesIterator = ili.getRefInfos().iterator();
+                while (referencesIterator.hasNext())
+                {
+                    ReferenceInfo info = (ReferenceInfo)referencesIterator.next();
+                    log.info("transfering reference " + info.getMessageId() + " from " + oldchannelID + " into " + this.getChannelID());
+                    MessageReference messageReference = (MessageReference )refMap.get(new Long(info.getMessageId()));
+
+                    ///// BIG TODOS:
+                    ///// What to do with the transaction here?
+                    ///// Do we need to remove from the old channel? (Consider the case of the old server coming back... I guess we should, but we have to check this)
+                    pm.addReference(this.getChannelID(),messageReference,null);
+                    pm.removeReference(oldchannelID,messageReference, null);
+                }
+            }
+        }
+        log.info("transfer state done");
+    }
     
       
    public void unload() throws Exception

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-10-21 00:33:54 UTC (rev 1510)
@@ -1486,6 +1486,8 @@
       public void viewAccepted(View view)
       {
          if (trace) { log.trace(nodeId + " Got new view, size=" + view.size()); }
+
+         log.info("JBoss Messaging DefaultClusteredPostOffice Accepted new view:" + view);;
          
          if (currentView != null)
          {

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/HATestBase.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/HATestBase.java	2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/HATestBase.java	2006-10-21 00:33:54 UTC (rev 1510)
@@ -50,8 +50,8 @@
     protected Context ctx2;
 
 
-    protected String NODE1 =System.getProperty("NODE1","localhost");
-    protected String NODE2 =System.getProperty("NODE2","localhost");
+    protected String NODE1 =System.getProperty("NODE1","localhost:1199");
+    protected String NODE2 =System.getProperty("NODE2","localhost:1299");
 
     public void setUp() throws Exception
     {
@@ -76,7 +76,7 @@
         contextProperties.put(Context.INITIAL_CONTEXT_FACTORY,
             jndiProviderClass);
         contextProperties.put(Context.PROVIDER_URL,
-            "jnp://"+ host + ":1099");
+            "jnp://"+ host);
         return new InitialContext(contextProperties);
 
     }

Added: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/ReconnectClusteredTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/ReconnectClusteredTest.java	2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/ReconnectClusteredTest.java	2006-10-21 00:33:54 UTC (rev 1510)
@@ -0,0 +1,356 @@
+package org.jboss.test.messaging.core.ha;
+
+import org.jboss.jms.client.JBossConnection;
+import org.jboss.jms.client.JBossSession;
+import org.jboss.jms.client.JBossMessageConsumer;
+import org.jboss.jms.client.remoting.JMSRemotingConnection;
+import org.jboss.jms.client.state.ConnectionState;
+import org.jboss.jms.client.state.SessionState;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.delegate.ClientSessionDelegate;
+import org.jboss.jms.message.TextMessageProxy;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+
+import javax.jms.*;
+import javax.management.ObjectName;
+
+/** Requires two clustered JBoss instances to be running before these tests are started.
+ */
+public class ReconnectClusteredTest extends HATestBase
+{
+
+    public void testSimpleReconnect() throws Exception
+    {
+        JBossConnection  conn = (JBossConnection)this.factoryServer1.createConnection();
+        ClientConnectionDelegate delegate = (ClientConnectionDelegate)conn.getDelegate();
+        ConnectionState state = (ConnectionState)delegate.getState();
+
+        assertFalse(state.isStarted());
+        conn.start();
+        assertTrue(state.isStarted());
+
+        JBossConnection conn2 = (JBossConnection)this.factoryServer1.createConnection();
+        conn.getDelegate().failOver(conn2.getDelegate());
+
+        conn.stop();
+        assertFalse(state.isStarted());
+
+    }
+
+    public void testSimpleReconnectWithClientID() throws Exception
+    {
+        JBossConnection  conn = (JBossConnection)this.factoryServer1.createConnection();
+        conn.setClientID("someClient");
+        ClientConnectionDelegate delegate = (ClientConnectionDelegate)conn.getDelegate();
+        ConnectionState state = (ConnectionState)delegate.getState();
+
+        assertFalse(state.isStarted());
+        conn.start();
+        assertTrue(state.isStarted());
+
+        JBossConnection conn2 = (JBossConnection)this.factoryServer1.createConnection();
+        conn.getDelegate().failOver(conn2.getDelegate());
+
+        // force recovering the clientID from server
+        state.setClientID(null);
+        assertEquals ("someClient",conn.getClientID());
+
+        conn.stop();
+        assertFalse(state.isStarted());
+
+    }
+
+    public void testWithSession() throws Exception
+    {
+        JBossConnection  conn = (JBossConnection)this.factoryServer1.createConnection();
+        Session session = conn.createSession(true,Session.AUTO_ACKNOWLEDGE);
+
+        ClientConnectionDelegate delegate = (ClientConnectionDelegate)conn.getDelegate();
+        ConnectionState state = (ConnectionState)delegate.getState();
+
+        JBossConnection conn2 = (JBossConnection)this.factoryServer1.createConnection();
+        conn.getDelegate().failOver(conn2.getDelegate());
+    }
+
+    public void testSimpleWithOneProducerOnTopic() throws Exception
+    {
+        JBossConnection  conn = (JBossConnection)this.factoryServer1.createConnection();
+        Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
+        Destination destination = (Destination)getCtx1().lookup("topic/testTopic");
+        MessageProducer producer = session.createProducer(destination);
+
+        Message message = session.createTextMessage("Hello Before");
+        producer.send(message);
+
+        ClientConnectionDelegate delegate = (ClientConnectionDelegate)conn.getDelegate();
+        ConnectionState state = (ConnectionState)delegate.getState();
+
+        JBossConnection conn2 = (JBossConnection)this.factoryServer2.createConnection();
+        conn.getDelegate().failOver(conn2.getDelegate());
+
+        message = session.createTextMessage("Hello After");
+        producer.send(message);
+    }
+
+    public void testSimpleWithOneProducerOnQueue() throws Exception
+    {
+        JBossConnection  conn = (JBossConnection)this.factoryServer1.createConnection();
+        Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
+        Destination destination = (Destination)getCtx1().lookup("queue/testQueue");
+        MessageProducer producer = session.createProducer(destination);
+
+        Message message = session.createTextMessage("Hello Before");
+        producer.send(message);
+
+        ClientConnectionDelegate delegate = (ClientConnectionDelegate)conn.getDelegate();
+        ConnectionState state = (ConnectionState)delegate.getState();
+
+        JBossConnection conn2 = (JBossConnection)this.factoryServer2.createConnection();
+        conn.getDelegate().failOver(conn2.getDelegate());
+
+        message = session.createTextMessage("Hello After");
+        producer.send(message);
+    }
+
+    public void testTopicCluster() throws Exception
+    {
+        log.info("++testTopicCluster");
+
+        log.info(">>Lookup Queue");
+        Destination destination = (Destination)getCtx1().lookup("topic/testDistributedTopic");
+
+        JBossConnection connFirstServer = (JBossConnection)this.factoryServer1.createConnection();
+        connFirstServer.start();
+        JBossSession sessionFirstServer = (JBossSession)connFirstServer.createSession(false,Session.AUTO_ACKNOWLEDGE);
+
+
+        JBossConnection connSecondServer = (JBossConnection)this.factoryServer2.createConnection();
+        connSecondServer.start();
+        JBossSession sessionSecondServer = (JBossSession)connSecondServer.createSession(false,Session.AUTO_ACKNOWLEDGE);
+
+        MessageProducer producer = sessionFirstServer.createProducer(destination);
+
+        MessageConsumer consumer = sessionSecondServer.createConsumer(destination);
+
+        producer.send(sessionFirstServer.createTextMessage("Hello"));
+
+        assertNotNull(consumer.receive(2000));
+    }
+
+    public void testDurableTopicCluster() throws Exception
+    {
+        log.info("++testDurableTopicCluster");
+
+        log.info(">>Lookup Queue");
+        Destination destination = (Destination)getCtx1().lookup("topic/testDistributedTopic");
+
+        JBossConnection connFirstServer = (JBossConnection)this.factoryServer1.createConnection("guest","guest");
+        connFirstServer.setClientID("test");
+        connFirstServer.start();
+        JBossSession sessionFirstServer = (JBossSession)connFirstServer.createSession(true,Session.AUTO_ACKNOWLEDGE);
+
+        MessageConsumer consumer = sessionFirstServer.createDurableSubscriber((Topic)destination,"test");
+
+        MessageProducer producer = sessionFirstServer.createProducer(destination);
+
+        for (int i=0;i<10;i++)
+        {
+            producer.send(sessionFirstServer.createTextMessage("Test"  + i));
+        }
+
+        Object objectReceived=consumer.receive(5000);
+        if (objectReceived!=null)
+        {
+            System.out.println("Object received=" + objectReceived);
+        }
+        assertNull(objectReceived);
+
+        sessionFirstServer.commit();
+
+
+        for (int i=0;i<5;i++)
+        {
+            assertNotNull(consumer.receive(1000));
+        }
+
+        sessionFirstServer.rollback();
+        connFirstServer.close();
+
+
+        JBossConnection connectionSecondServer = (JBossConnection)this.factoryServer1.createConnection("guest","guest");
+        connectionSecondServer.setClientID("test");
+        connectionSecondServer.start();
+
+        JBossSession sessionSecondServer = (JBossSession)connectionSecondServer.createSession(true,Session.AUTO_ACKNOWLEDGE);
+
+        consumer = sessionSecondServer.createDurableSubscriber((Topic)destination,"test");
+
+        for (int i=0;i<10;i++)
+        {
+            assertNotNull(consumer.receive(1000));
+        }
+
+        assertNull(consumer.receive(1000));
+    }
+
+    public void testTopicSubscriber() throws Exception
+    {
+        log.info("++testSimpleWithOneProducerTransacted");
+
+        log.info(">>Lookup Queue");
+        Destination destination = (Destination)getCtx1().lookup("topic/testDistributedTopic");
+
+        JBossConnection connFirstServer = (JBossConnection)this.factoryServer1.createConnection();
+        connFirstServer.start();
+        JBossSession sessionFirstServer = (JBossSession)connFirstServer.createSession(false,Session.AUTO_ACKNOWLEDGE);
+
+
+        log.info("Creating connection server1");
+        JBossConnection  conn = (JBossConnection)this.factoryServer1.createConnection();
+        conn.setClientID("testClient");
+        conn.start();
+
+        log.info("ConnectionCreated=" + conn);
+        log.info(">>Creating Sessions");
+
+        JBossSession session = (JBossSession)conn.createSession(true,Session.AUTO_ACKNOWLEDGE);
+        ClientSessionDelegate clientSessionDelegate = (ClientSessionDelegate)session.getDelegate();
+        SessionState sessionState = (SessionState)clientSessionDelegate.getState();
+        //MessageConsumer consumerHA = session.createConsumer(destination);
+
+        MessageConsumer consumerHA = session.createDurableSubscriber((Topic)destination,"T1");
+
+        log.info(">>Creating Producer");
+        MessageProducer producer = session.createProducer(destination);
+        log.info(">>creating Message");
+        Message message = session.createTextMessage("Hello Before");
+        log.info(">>sending Message");
+        producer.send(message);
+        session.commit();
+
+        assertNotNull(consumerHA.receive(3000));
+
+        Object txID = sessionState.getCurrentTxId();
+
+        producer.send(session.createTextMessage("Hello again before failover"));
+
+        ClientConnectionDelegate delegate = (ClientConnectionDelegate)conn.getDelegate();
+
+        JMSRemotingConnection originalRemoting = delegate.getRemotingConnection();
+
+        ConnectionState state = (ConnectionState)delegate.getState();
+
+        log.info(">>Creating alternate connection");
+        JBossConnection conn2 = (JBossConnection)this.factoryServer2.createConnection();
+        log.info("NewConnectionCreated=" + conn2);
+
+        log.info(">>Failling over");
+        assertSame(originalRemoting,delegate.getRemotingConnection());
+        conn.getDelegate().failOver(conn2.getDelegate());
+
+        try {
+            originalRemoting.stop();
+        } catch (Throwable throwable) {
+            throwable.printStackTrace();
+        }
+
+
+        assertNotSame(originalRemoting,delegate.getRemotingConnection());
+
+        //System.out.println("Kill server1"); Thread.sleep(10000);
+
+        message = session.createTextMessage("Hello After");
+        log.info(">>Sending new message");
+        producer.send(message);
+
+        assertEquals(txID,sessionState.getCurrentTxId());
+        System.out.println("TransactionID on client = " + txID);
+        log.info(">>Final commit");
+
+        JBossConnection connSecondServer = (JBossConnection)this.factoryServer2.createConnection();
+        connSecondServer.start();
+        JBossSession sessionSecondServer = (JBossSession)connSecondServer.createSession(false,Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumerSecondServer = sessionSecondServer.createConsumer(destination);
+
+        session.commit();
+
+        //assertNotNull(consumerSecondServer.receive(3000));
+        assertNotNull(consumerSecondServer.receive(3000));
+        assertNotNull(consumerSecondServer.receive(3000));
+        assertNull(consumerSecondServer.receive(3000));
+
+        log.info("Calling alternate receiver");
+        //assertNotNull(consumerHA.receive(1000));
+        assertNotNull(consumerHA.receive(1000));
+        assertNotNull(consumerHA.receive(1000));
+        assertNull(consumerHA.receive(1000));
+
+    }
+
+   public void testSimplestDurableSubscription() throws Exception
+   {
+      ConnectionFactory cf = factoryServer1;
+      Topic topic = (Topic)getCtx1().lookup("topic/testDistributedTopic");
+
+      Connection conn = cf.createConnection();
+
+      conn.setClientID("brookeburke");
+
+      conn.start();
+
+      Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageProducer prod = s.createProducer(topic);
+      prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+      log.info("      ******************                       s.createDurableSubscriber(topic, \"monicabelucci\")                 ****************************");
+      MessageConsumer firstconsumer = (MessageConsumer)s.createDurableSubscriber(topic, "monicabelucci");
+
+
+      System.out.println("\n\n****************************************   consumer=" + firstconsumer.toString() + "\n\n");
+      Object message=firstconsumer.receive(2000);
+
+      log.info(" \n\n*******************************                          message Received=" + message + "\n\n");
+
+      prod.send(s.createTextMessage("k"));
+
+      conn.close();
+
+      conn = cf.createConnection();
+      conn.setClientID("brookeburke");
+
+      log.info("      ******************                       conn.createSession(...);                                                       ****************************");
+      s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+
+      log.info("      ******************                       s.createDurableSubscriber(topic, \"monicabelucci\")                 ****************************");
+      MessageConsumer durable = s.createDurableSubscriber(topic, "monicabelucci");
+
+       log.info("      ******************                       conn.start();                                                       ****************************");
+       conn.start();
+
+      log.info("      ******************                       TextMessage tm = (TextMessage)durable.receive();                    ****************************");
+      TextMessage tm = (TextMessage)durable.receive();
+      assertEquals("k", tm.getText());
+
+      log.info("      ******************                       Message m = durable.receive(1000); (expected to be null)            ****************************");
+      Message m = durable.receive(1000);
+
+       prod = s.createProducer(topic);
+       prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+       prod.send(s.createTextMessage("Hello"));
+
+       Connection conn2 = factoryServer2.createConnection();
+       conn2.setClientID("clientTest");
+
+       Session session = conn2.createSession(false,Session.AUTO_ACKNOWLEDGE);
+       conn2.start();
+
+       MessageConsumer consumer2 = session.createDurableSubscriber(topic,"spongebog");
+       System.out.println("Consumer2=" + consumer2);
+
+
+   }
+
+
+}

Deleted: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/ReconnectTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/ReconnectTest.java	2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/ReconnectTest.java	2006-10-21 00:33:54 UTC (rev 1510)
@@ -1,382 +0,0 @@
-package org.jboss.test.messaging.core.ha;
-
-import org.jboss.jms.client.JBossConnection;
-import org.jboss.jms.client.JBossSession;
-import org.jboss.jms.client.remoting.JMSRemotingConnection;
-import org.jboss.jms.client.state.ConnectionState;
-import org.jboss.jms.client.state.SessionState;
-import org.jboss.jms.client.delegate.ClientConnectionDelegate;
-import org.jboss.jms.client.delegate.ClientSessionDelegate;
-import org.jboss.jms.message.JBossMessage;
-import org.jboss.jms.message.TextMessageProxy;
-
-import javax.jms.*;
-
-
-/** Start two JBoss instances (non clustered) to run these tests.
- *  */
-public class ReconnectTest extends HATestBase
-{
-
-    public void testSimpleReconnect() throws Exception
-    {
-        JBossConnection  conn = (JBossConnection)this.factoryServer1.createConnection();
-        ClientConnectionDelegate delegate = (ClientConnectionDelegate)conn.getDelegate();
-        ConnectionState state = (ConnectionState)delegate.getState();
-
-        assertFalse(state.isStarted());
-        conn.start();
-        assertTrue(state.isStarted());
-
-        JBossConnection conn2 = (JBossConnection)this.factoryServer1.createConnection();
-        conn.getDelegate().failOver(conn2.getDelegate());
-
-        conn.stop();
-        assertFalse(state.isStarted());
-
-    }
-
-    public void testSimpleReconnectWithClientID() throws Exception
-    {
-        JBossConnection  conn = (JBossConnection)this.factoryServer1.createConnection();
-        conn.setClientID("someClient");
-        ClientConnectionDelegate delegate = (ClientConnectionDelegate)conn.getDelegate();
-        ConnectionState state = (ConnectionState)delegate.getState();
-
-        assertFalse(state.isStarted());
-        conn.start();
-        assertTrue(state.isStarted());
-
-        JBossConnection conn2 = (JBossConnection)this.factoryServer1.createConnection();
-        conn.getDelegate().failOver(conn2.getDelegate());
-
-        // force recovering the clientID from server
-        state.setClientID(null);
-        assertEquals ("someClient",conn.getClientID());
-
-        conn.stop();
-        assertFalse(state.isStarted());
-
-    }
-
-    public void testWithSession() throws Exception
-    {
-        JBossConnection  conn = (JBossConnection)this.factoryServer1.createConnection();
-        Session session = conn.createSession(true,Session.AUTO_ACKNOWLEDGE);
-
-        ClientConnectionDelegate delegate = (ClientConnectionDelegate)conn.getDelegate();
-        ConnectionState state = (ConnectionState)delegate.getState();
-
-        JBossConnection conn2 = (JBossConnection)this.factoryServer1.createConnection();
-        conn.getDelegate().failOver(conn2.getDelegate());
-    }
-
-    public void testSimpleWithOneProducerOnTopic() throws Exception
-    {
-        JBossConnection  conn = (JBossConnection)this.factoryServer1.createConnection();
-        Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        Destination destination = (Destination)getCtx1().lookup("topic/testTopic");
-        MessageProducer producer = session.createProducer(destination);
-
-        Message message = session.createTextMessage("Hello Before");
-        producer.send(message);
-
-        ClientConnectionDelegate delegate = (ClientConnectionDelegate)conn.getDelegate();
-        ConnectionState state = (ConnectionState)delegate.getState();
-
-        JBossConnection conn2 = (JBossConnection)this.factoryServer2.createConnection();
-        conn.getDelegate().failOver(conn2.getDelegate());
-
-        System.out.println("Kill server1");
-        Thread.sleep(10000);
-
-
-        message = session.createTextMessage("Hello After");
-        producer.send(message);
-    }
-
-    public void testSimpleWithOneProducerOnQueue() throws Exception
-    {
-        JBossConnection  conn = (JBossConnection)this.factoryServer1.createConnection();
-        Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        Destination destination = (Destination)getCtx1().lookup("queue/testQueue");
-        MessageProducer producer = session.createProducer(destination);
-
-        Message message = session.createTextMessage("Hello Before");
-        producer.send(message);
-
-        ClientConnectionDelegate delegate = (ClientConnectionDelegate)conn.getDelegate();
-        ConnectionState state = (ConnectionState)delegate.getState();
-
-        JBossConnection conn2 = (JBossConnection)this.factoryServer2.createConnection();
-        conn.getDelegate().failOver(conn2.getDelegate());
-
-        System.out.println("Kill server1");
-        Thread.sleep(10000);
-
-
-        message = session.createTextMessage("Hello After");
-        producer.send(message);
-    }
-
-    public void testSimpleWithOneProducerTransacted() throws Exception
-    {
-        log.info("++testSimpleWithOneProducerTransacted");
-
-        log.info(">>Lookup Queue");
-        Destination destination = (Destination)getCtx1().lookup("topic/testTopic");
-
-        log.info("Creating connections used for assertion (not failed over)");
-        JBossConnection connSecondServer = (JBossConnection)this.factoryServer2.createConnection();
-        connSecondServer.start();
-        JBossSession sessionSecondServer = (JBossSession)connSecondServer.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumerSecondServer = sessionSecondServer.createConsumer(destination);
-
-        JBossConnection connFirstServer = (JBossConnection)this.factoryServer1.createConnection();
-        connFirstServer.start();
-        JBossSession sessionFirstServer = (JBossSession)connFirstServer.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumerFirstServer = sessionFirstServer.createConsumer(destination);
-
-
-        log.info("Creating connection server1");
-        JBossConnection  conn = (JBossConnection)this.factoryServer1.createConnection();
-
-        log.info("ConnectionCreated=" + conn);
-        log.info(">>Creating Sessions");
-
-        JBossSession session = (JBossSession)conn.createSession(true,Session.AUTO_ACKNOWLEDGE);
-        ClientSessionDelegate clientSessionDelegate = (ClientSessionDelegate)session.getDelegate();
-        SessionState sessionState = (SessionState)clientSessionDelegate.getState();
-        log.info(">>Creating Producer");
-        MessageProducer producer = session.createProducer(destination);
-        log.info(">>Creating Producer - ");
-        log.info(">>creating Message");
-        Message message = session.createTextMessage("Hello Before");
-        log.info(">>sending Message");
-        producer.send(message);
-
-        assertNull(consumerFirstServer.receive(1000));
-        assertNull(consumerSecondServer.receive(1000));
-
-        log.info("sending first commit");
-        session.commit();
-        Object txID = sessionState.getCurrentTxId();
-
-        assertNotNull(consumerFirstServer.receive(2000));
-        assertNull(consumerFirstServer.receive(1000));
-        assertNull(consumerSecondServer.receive(1000));
-
-        producer.send(session.createTextMessage("Hello again before failover"));
-        assertNull(consumerFirstServer.receive(1000));
-        assertNull(consumerSecondServer.receive(1000));
-
-        ClientConnectionDelegate delegate = (ClientConnectionDelegate)conn.getDelegate();
-
-        JMSRemotingConnection originalRemoting = delegate.getRemotingConnection();
-        ConnectionState state = (ConnectionState)delegate.getState();
-
-        log.info(">>Creating alternate connection");
-        JBossConnection conn2 = (JBossConnection)this.factoryServer2.createConnection();
-        log.info("NewConnectionCreated=" + conn2);
-
-        log.info(">>Failling over");
-        assertSame(originalRemoting,delegate.getRemotingConnection());
-        conn.getDelegate().failOver(conn2.getDelegate());
-        try {
-            originalRemoting.stop();
-        } catch (Throwable throwable) {
-            throwable.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-        }
-
-        assertNotSame(originalRemoting,delegate.getRemotingConnection());
-
-        //System.out.println("Kill server1"); Thread.sleep(10000);
-
-        message = session.createTextMessage("Hello After");
-        log.info(">>Sending new message");
-        producer.send(message);
-
-        assertNull(consumerFirstServer.receive(1000));
-        assertNull(consumerSecondServer.receive(1000));
-
-        assertEquals(txID,sessionState.getCurrentTxId());
-        System.out.println("TransactionID on client = " + txID);
-        log.info(">>Final commit");
-        session.commit();
-
-        log.info("Checking receive on second server");
-        assertNotNull(consumerSecondServer.receive(1000));
-        assertNotNull(consumerSecondServer.receive(1000));
-        log.info("Checking receive on first server");
-        assertNull(consumerFirstServer.receive(1000));
-        assertNull(consumerSecondServer.receive(1000));
-
-    }
-
-    public void testSimpleWithOneProducerTransactedWithoutHA() throws Exception
-    {
-        log.info("++testSimpleWithOneProducerTransacted");
-
-        log.info(">>Lookup Queue");
-        Destination destination = (Destination)getCtx1().lookup("topic/testTopic");
-
-        log.info("Creating connections used for assertion (not failed over)");
-        JBossConnection connSecondServer = (JBossConnection)this.factoryServer2.createConnection();
-        connSecondServer.start();
-        JBossSession sessionSecondServer = (JBossSession)connSecondServer.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumerSecondServer = sessionSecondServer.createConsumer(destination);
-
-        JBossConnection connFirstServer = (JBossConnection)this.factoryServer1.createConnection();
-        connFirstServer.start();
-        JBossSession sessionFirstServer = (JBossSession)connFirstServer.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumerFirstServer = sessionFirstServer.createConsumer(destination);
-
-
-        log.info("Creating connection server1");
-        JBossConnection  conn = (JBossConnection)this.factoryServer2.createConnection();
-
-        log.info("ConnectionCreated=" + conn);
-        log.info(">>Creating Sessions");
-
-        JBossSession session = (JBossSession)conn.createSession(true,Session.AUTO_ACKNOWLEDGE);
-        ClientSessionDelegate clientSessionDelegate = (ClientSessionDelegate)session.getDelegate();
-        SessionState sessionState = (SessionState)clientSessionDelegate.getState();
-        System.out.println("Size of callbackHandlers=" + sessionState.getCallbackHandlers().size());
-        Object txID = sessionState.getCurrentTxId();
-        log.info(">>Creating Producer");
-        MessageProducer producer = session.createProducer(destination);
-        log.info(">>Creating Producer - ");
-        log.info(">>creating Message");
-        Message message = session.createTextMessage("Hello Before");
-        log.info(">>sending Message");
-        producer.send(message);
-
-        assertNull(consumerFirstServer.receive(1000));
-        assertNull(consumerSecondServer.receive(1000));
-
-        log.info("sending first commit");
-        //session.commit();
-
-        assertNull(consumerFirstServer.receive(2000));
-        assertNull(consumerFirstServer.receive(1000));
-        assertNull(consumerSecondServer.receive(1000));
-
-        TextMessageProxy messagetxt = (TextMessageProxy)session.createTextMessage("Hello again before failover");
-        producer.send(messagetxt);
-        System.out.println("Id=" + messagetxt.getMessage().getConnectionID());
-        assertNull(consumerFirstServer.receive(1000));
-        assertNull(consumerSecondServer.receive(1000));
-
-        ClientConnectionDelegate delegate = (ClientConnectionDelegate)conn.getDelegate();
-
-        JMSRemotingConnection originalRemoting = delegate.getRemotingConnection();
-        ConnectionState state = (ConnectionState)delegate.getState();
-
-        log.info(">>Failling over");
-        //System.out.println("Kill server1"); Thread.sleep(10000);
-
-        message = session.createTextMessage("Hello After");
-        log.info(">>Sending new message");
-        producer.send(message);
-
-        assertNull(consumerFirstServer.receive(1000));
-        assertNull(consumerSecondServer.receive(1000));
-
-        assertEquals(txID,sessionState.getCurrentTxId());
-        System.out.println("TransactionID on client = " + txID);
-        log.info(">>Final commit");
-        session.commit();
-
-        log.info("Checking receive on second server");
-        assertNull(consumerFirstServer.receive(1000));
-        assertNotNull(consumerSecondServer.receive(3000));
-        assertNotNull(consumerSecondServer.receive(1000));
-        assertNotNull(consumerSecondServer.receive(1000));
-        log.info("Checking receive on first server");
-        assertNull(consumerSecondServer.receive(1000));
-
-    }
-
-    public void testTopicSubscriber() throws Exception
-    {
-        log.info("++testSimpleWithOneProducerTransacted");
-
-        log.info(">>Lookup Queue");
-        Destination destination = (Destination)getCtx1().lookup("topic/testTopic");
-
-        log.info("Creating connections used for assertion (not failed over)");
-        JBossConnection connSecondServer = (JBossConnection)this.factoryServer2.createConnection();
-        connSecondServer.start();
-        JBossSession sessionSecondServer = (JBossSession)connSecondServer.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumerSecondServer = sessionSecondServer.createConsumer(destination);
-
-        JBossConnection connFirstServer = (JBossConnection)this.factoryServer1.createConnection();
-        connFirstServer.start();
-        JBossSession sessionFirstServer = (JBossSession)connFirstServer.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumerFirstServer = sessionFirstServer.createConsumer(destination);
-
-
-        log.info("Creating connection server1");
-        JBossConnection  conn = (JBossConnection)this.factoryServer1.createConnection();
-        conn.start();
-
-        log.info("ConnectionCreated=" + conn);
-        log.info(">>Creating Sessions");
-
-        JBossSession session = (JBossSession)conn.createSession(true,Session.AUTO_ACKNOWLEDGE);
-        ClientSessionDelegate clientSessionDelegate = (ClientSessionDelegate)session.getDelegate();
-        SessionState sessionState = (SessionState)clientSessionDelegate.getState();
-        MessageConsumer consumerHA = session.createConsumer(destination);
-        log.info(">>Creating Producer");
-        MessageProducer producer = session.createProducer(destination);
-        log.info(">>creating Message");
-        Message message = session.createTextMessage("Hello Before");
-        log.info(">>sending Message");
-        producer.send(message);
-        log.info("sending first commit");
-        session.commit();
-        Object txID = sessionState.getCurrentTxId();
-
-        assertNotNull(consumerHA.receive(1000));
-        assertNotNull(consumerFirstServer.receive(2000));
-        assertNull(consumerSecondServer.receive(1000));
-        assertNull(consumerHA.receive(1000));
-
-        producer.send(session.createTextMessage("Hello again before failover"));
-
-        ClientConnectionDelegate delegate = (ClientConnectionDelegate)conn.getDelegate();
-
-        JMSRemotingConnection originalRemoting = delegate.getRemotingConnection();
-
-        ConnectionState state = (ConnectionState)delegate.getState();
-
-        log.info(">>Creating alternate connection");
-        JBossConnection conn2 = (JBossConnection)this.factoryServer2.createConnection();
-        log.info("NewConnectionCreated=" + conn2);
-
-        log.info(">>Failling over");
-        assertSame(originalRemoting,delegate.getRemotingConnection());
-        conn.getDelegate().failOver(conn2.getDelegate());
-        assertNotSame(originalRemoting,delegate.getRemotingConnection());
-
-        //System.out.println("Kill server1"); Thread.sleep(10000);
-
-        message = session.createTextMessage("Hello After");
-        log.info(">>Sending new message");
-        producer.send(message);
-
-        assertEquals(txID,sessionState.getCurrentTxId());
-        System.out.println("TransactionID on client = " + txID);
-        log.info(">>Final commit");
-        session.commit();
-
-        assertNull(consumerFirstServer.receive(1000));
-        assertNotNull(consumerSecondServer.receive(1000));
-        assertNotNull(consumerSecondServer.receive(1000));
-        assertNull(consumerSecondServer.receive(1000));
-
-        assertNotNull(consumerHA.receive(1000));
-        assertNotNull(consumerHA.receive(1000));
-        
-    }
-}




More information about the jboss-cvs-commits mailing list