[jboss-cvs] JBoss Messaging SVN: r1751 - in branches/Branch_Client_Failover_Experiment: src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/server/endpoint/advised src/main/org/jboss/jms/tx src/main/org/jboss/messaging/core src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/jms/clustering tests/src/org/jboss/test/messaging/tools/jmx/rmi

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sat Dec 9 18:29:11 EST 2006


Author: timfox
Date: 2006-12-09 18:28:59 -0500 (Sat, 09 Dec 2006)
New Revision: 1751

Modified:
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryEndpoint.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/ResourceManager.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/TxState.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/PagingChannelSupport.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/SimpleClusteringTest.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java
Log:
More progress on failover



Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java	2006-12-09 20:58:24 UTC (rev 1750)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java	2006-12-09 23:28:59 UTC (rev 1751)
@@ -130,23 +130,17 @@
       if (delegates == null)
       {
          log.info("Looking for delegates");
-         SimpleMetaData metaData = invocation.getMetaData();
 
-         //delegates = (ClientConnectionFactoryDelegate[])metaData.getMetaData(MetaDataConstants.JMS, MetaDataConstants.CF_DELEGATES);
-
          MethodInvocation methodInvoke = (MethodInvocation)invocation;
-         //delegates = (ClientConnectionFactoryDelegate[])metaData.getMetaData(MetaDataConstants.JMS, MetaDataConstants.CF_DELEGATES);
 
-
-         // this is a hack, but I couldn't get this working through metaData
          // TODO: FIX THIS! metaData should contain CF_DELEGATES
          Object target = methodInvoke.getTargetObject();
+         
          if (target instanceof ClusteredClientConnectionFactoryDelegate)
          {
             delegates = ((ClusteredClientConnectionFactoryDelegate)target).getDelegates();
          }
 
-
          if (delegates != null)
          {
             //TODO: Fix this! metadata should contain CF_FAILOVER_INDEXES
@@ -191,6 +185,12 @@
       //TODO implement client side valve to prevent invocations occurring whilst failover is occurring
       
       ConnectionState state = (ConnectionState)((DelegateSupport)failedConnection).getState();
+      
+      log.info("*** about to failover - waiting for server to finish");
+      
+      //FIXME - this is only temporarily necessary since we need to give the server enough time to
+      //failover before we reconnect - this would be handled by the server side valve
+      Thread.sleep(10000);
 
       log.info("Creating new connection");
       ClientConnectionDelegate newConnection = createConnection(newCF, state.getUser(), state.getPassword());
@@ -357,6 +357,8 @@
                                          int oldServerID)
       throws JMSException
    {
+      log.info("Failing over consumer");
+      
       ClientConsumerDelegate failedConsumerDelegate =
          (ClientConsumerDelegate)failedConsumerState.getDelegate();
 
@@ -379,10 +381,17 @@
       ConsumerState newState = (ConsumerState)newConsumerDelegate.getState();
       failedConsumerState.copy(newState);
 
-      failedConnectionState.getResourceManager().
-         handleFailover(failedSessionState.getCurrentTxId(),
-                        oldConsumerID,
-                        failedConsumerState.getConsumerID());
+      if (failedSessionState.isTransacted())
+      {
+         //Replace the old consumer id with the new consumer id
+         
+         //TODO what about XA?? - may have done work in many transactions - so need to replace all
+         
+         failedConnectionState.getResourceManager().
+            handleFailover(failedSessionState.getCurrentTxId(),
+                           oldConsumerID,
+                           failedConsumerState.getConsumerID());
+      }
 
       CallbackManager cm = failedConnectionState.getRemotingConnection().getCallbackManager();
 
@@ -392,7 +401,10 @@
       cm.registerHandler(failedConnectionState.getServerID(),
                          failedConsumerState.getConsumerID(),
                          handler);
+      
       failedSessionState.addCallbackHandler(handler);
+      
+      log.info("failed over consumer");
 
    }
 
@@ -444,11 +456,6 @@
          {
             log.info("********* EXCEPTION DETECTED");
             
-            log.info("handleConnectionException: ", throwable);
-            
-            
-            log.info("Caught connection exception for connection: " + connection);
-            
             handleFailure(connection);
          }
          catch (Throwable e)

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2006-12-09 20:58:24 UTC (rev 1750)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2006-12-09 23:28:59 UTC (rev 1751)
@@ -149,11 +149,6 @@
       throw new IllegalStateException("This invocation should not be handled here!");
    }
 
-   public int getFailoverNode(int node) throws JMSException
-   {
-      throw new IllegalStateException("This invocation should not be handled here!");
-   }
-
    // Public --------------------------------------------------------
 
    public synchronized Object invoke(Invocation invocation) throws Throwable

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryEndpoint.java	2006-12-09 20:58:24 UTC (rev 1750)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryEndpoint.java	2006-12-09 23:28:59 UTC (rev 1751)
@@ -43,9 +43,5 @@
    byte[] getClientAOPConfig() throws JMSException;
 
    IdBlock getIdBlock(int size) throws JMSException;
-
-   /** Return the node that should take over a given node */
-   int getFailoverNode(int node) throws JMSException;
-
 }
 

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java	2006-12-09 20:58:24 UTC (rev 1750)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java	2006-12-09 23:28:59 UTC (rev 1751)
@@ -137,24 +137,6 @@
       }
    }
 
-
-   public int getFailoverNode(int node) throws JMSException
-   {
-      try
-      {
-         //ServerPeer peer = (ServerPeer )serverPeer.getInstance();
-         //return peer.getReplicator().getFailoverNodeID(node);
-
-         // not implemented yet
-         return node;
-      }
-      catch (Throwable t)
-      {
-         throw ExceptionUtil.handleJMSInvocation(t, this + " getFailoverNode");
-      }
-   }
-
-
    public byte[] getClientAOPConfig() throws JMSException
    {
       try

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java	2006-12-09 20:58:24 UTC (rev 1750)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java	2006-12-09 23:28:59 UTC (rev 1751)
@@ -73,11 +73,6 @@
       return endpoint.getIdBlock(size);
    }
 
-   public int getFailoverNode(int node) throws JMSException
-   {
-      return endpoint.getFailoverNode(node);
-   }
-
    // AdvisedSupport override ---------------------------------------
 
    public Object getEndpoint()

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/ResourceManager.java	2006-12-09 20:58:24 UTC (rev 1750)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/ResourceManager.java	2006-12-09 23:28:59 UTC (rev 1751)
@@ -121,14 +121,15 @@
    }
 
     /**
-     * Navigate on ACK and change clientIDs on every ACK not sent yet.
+     * Navigate on ACK and change consumer ids on every ACK not sent yet.
      */
-    public void handleFailover(Object xid, int oldClientId, int newClientId)
+    public void handleFailover(Object xid, int oldConsumerID, int newConsumerID)
     {
-        if (trace) { log.trace("handleFailover:: Transfering clientIds on ACKs from  " + oldClientId + " to " + newClientId); }
+        if (trace) { log.trace("handleFailover:: Transfering consumer id on ACKs from  " + oldConsumerID + " to " + newConsumerID); }
 
         TxState tx = getTx(xid);
-        tx.handleFailover(oldClientId, newClientId);
+        
+        tx.handleFailover(oldConsumerID, newConsumerID);
     }
 
    

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/TxState.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/TxState.java	2006-12-09 20:58:24 UTC (rev 1750)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/TxState.java	2006-12-09 23:28:59 UTC (rev 1751)
@@ -97,15 +97,16 @@
    }
 
 
-   /** Navigate on ACK and change clientIDs on every ACK not sent yet */
-   public void handleFailover(int oldClientId, int newClientId)
+   /** Navigate on ACK and change consumer ids on every ACK not sent yet */
+   public void handleFailover(int oldConsumerID, int newConsumerID)
    {
-       for (Iterator ackIterator = acks.iterator();ackIterator.hasNext();)
+       for (Iterator ackIterator = acks.iterator(); ackIterator.hasNext(); )
        {
            AckInfo ackInfo = (AckInfo)ackIterator.next();
-           if (ackInfo.getConsumerID()==oldClientId)
+           
+           if (ackInfo.getConsumerID() == oldConsumerID)
            {
-               ackInfo.setConsumerID(newClientId);
+               ackInfo.setConsumerID(newConsumerID);
            }
        }
    }

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/PagingChannelSupport.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/PagingChannelSupport.java	2006-12-09 20:58:24 UTC (rev 1750)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/PagingChannelSupport.java	2006-12-09 23:28:59 UTC (rev 1751)
@@ -235,7 +235,7 @@
          
          //Load the unpaged references
          InitialLoadInfo ili = pm.getInitialReferenceInfos(channelID, fullSize);
-
+         
          if (ili.getMaxPageOrdering() != null)            
          {
             firstPagingOrder = ili.getMinPageOrdering().longValue();
@@ -248,67 +248,25 @@
          {
             firstPagingOrder = nextPagingOrder = 0;
          }
-
-          pushReferences(ili);
-
-          //Maybe we need to load some paged refs
          
+         log.info("Channel " + this.channelID + " loading " + ili.getRefInfos().size() + " references");
+         
+         Map refMap = processReferences(ili.getRefInfos());
+         
+         Iterator iter = ili.getRefInfos().iterator();
+         while (iter.hasNext())
+         {
+            ReferenceInfo info = (ReferenceInfo)iter.next();
+            
+            addFromRefInfo(info, refMap);
+         }
+         
+         //Maybe we need to load some paged refs
+         
          while (checkLoad()) {}
       }
    }
-
-    /** We extracted this as a method fro mload, as transferChannel (for HA recovery) also needs the same routine. */
-    private Map pushReferences(InitialLoadInfo ili) throws Exception {
-        Map refMap = processReferences(ili.getRefInfos());
-
-        Iterator iter = ili.getRefInfos().iterator();
-        while (iter.hasNext())
-        {
-           ReferenceInfo info = (ReferenceInfo)iter.next();
-
-           addFromRefInfo(info, refMap);
-        }
-        return refMap;
-    }
-
-    /** Transfer messages for an old channel to a new channel.
-     *  This is used during HA failoever when a connection fail and messages will need to be transfered to a new node */
-    public void transferChannel(long oldchannelID) throws Exception
-    {
-        log.info("Transfering state from " + oldchannelID +" into " + this.getChannelID());
-        synchronized (refLock)
-        {
-            while(true)
-            {
-                InitialLoadInfo ili =pm.getInitialReferenceInfos(oldchannelID,fullSize);
-                if (ili.getRefInfos().size()==0)
-                {
-                    break;
-                }
-
-                log.info("got " + ili.getRefInfos().size() + " references to move");
-
-
-
-                Map refMap = pushReferences(ili);
-                Iterator referencesIterator = ili.getRefInfos().iterator();
-                while (referencesIterator.hasNext())
-                {
-                    ReferenceInfo info = (ReferenceInfo)referencesIterator.next();
-                    log.info("transfering reference " + info.getMessageId() + " from " + oldchannelID + " into " + this.getChannelID());
-                    MessageReference messageReference = (MessageReference )refMap.get(new Long(info.getMessageId()));
-
-                    ///// BIG TODOS:
-                    ///// What to do with transaction here?
-                    ///// Do we need to remove from old channel? (Consider the case of the Old Server coming back... I guess we should.. bu we have to check this)
-                    pm.addReference(this.getChannelID(),messageReference,null);
-                    pm.removeReference(oldchannelID,messageReference, null);
-                }
-            }
-        }
-        log.info("transfer state done");
-    }
-    
+   
       
    public void unload() throws Exception
    {

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-12-09 20:58:24 UTC (rev 1750)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-12-09 23:28:59 UTC (rev 1751)
@@ -1357,8 +1357,6 @@
          }
       }
    }
-
-
    
    /**
     * This method fails over all the queues from node <nodeId> onto this node
@@ -1424,6 +1422,8 @@
             namesToRemove.add(entry);
          }
 
+         log.info("Deleting " + namesToRemove.size() + " bindings from old node");
+         
          for (Iterator iterNames = namesToRemove.iterator(); iterNames.hasNext();)
          {
             Map.Entry entry = (Map.Entry)iterNames.next();
@@ -1439,6 +1439,8 @@
 
             //Then deleted from the database
             this.deleteBinding(nodeId, queueName);
+            
+            log.info("deleted binding for " + queueName);
 
             //Then an unbind request is sent - this cause other nodes to also remove it from the in memory
             //condition and name maps
@@ -1461,11 +1463,17 @@
             {
                log.info("The current node didn't have a queue " + queueName + " so it's assuming the queue as a regular queue");
             }
+            else
+            {
+               log.info("There is already a queue with that name so adding to failed map");
+            }
            
             //Create a new binding
             Binding newBinding = this.createBinding(this.currentNodeId, binding.getCondition(),
                                                     stub.getName(), stub.getChannelID(),
                                                     stub.getFilter(), stub.isRecoverable(), failed);
+            
+            log.info("Created new binding");
 
             //Insert it into the database
             insertBinding(newBinding);
@@ -1476,6 +1484,8 @@
             clusteredQueue.load();
             clusteredQueue.activate();
             
+            log.info("Loaded queue");
+            
             //Add the new binding in memory
             addBinding(newBinding);
             
@@ -2223,11 +2233,11 @@
 
       //Need to evaluate this before we regenerate the failover map
       boolean isFailover = isFailoverNodeForNode(theNodeId.intValue());
+      
+      log.info("Am I failover node for node " + theNodeId + "? " + isFailover);
+      
+      log.info("Crashed: " + crashed);
 
-      //Now we recalculate the failover mapping - this needs to be done before removeDataForNode is called
-      //since that may cause connection factories to be rebound
-      //generateFailoverMap(currentView);
-
       //Remove any replicant data and non durable bindings for the node - again we need to do this
       //irrespective of whether we crashed
       //This will notify any listeners which will recalculate the connection factory delegates and failover delegates
@@ -2341,8 +2351,6 @@
       
       public void viewAccepted(View newView)
       {
-         //if (trace) { log.trace(DefaultClusteredPostOffice.this  + " got new view: " + newView
-         // + DefaultClusteredPostOffice.this.getOfficeName()); }
          //TODO: (by Clebert) Most JBoss Services use info on viewAccepted,
          //TODO:     can't we do the same since this is pretty useful?
          log.info(currentNodeId  + " got new view: " + newView + " postOffice:"

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java	2006-12-09 20:58:24 UTC (rev 1750)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java	2006-12-09 23:28:59 UTC (rev 1751)
@@ -74,349 +74,349 @@
     * Test that connections created using a clustered connection factory are created round robin on
     * different servers
     */
-//   public void testRoundRobinConnectionCreation() throws Exception
-//   {
-//      JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
-//      
-//      ClusteredClientConnectionFactoryDelegate delegate =
-//         (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-//      
-//      log.info ("number of delegates = " + delegate.getDelegates().length);
-//      log.info ("number of servers = " + ServerManagement.getServer(0).getNumberOfNodesOnCluster());
-//      
-//      assertEquals(3, delegate.getDelegates().length);
-//      
-//      ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-//      
-//      ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-//      
-//      ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
-//      
-//      assertEquals(0, cf1.getServerId());
-//      
-//      assertEquals(1, cf2.getServerId());
-//      
-//      assertEquals(2, cf3.getServerId());
-//      
-//      assertEquals(3, ServerManagement.getServer(0).getNumberOfNodesOnCluster());
-//      
-//      Connection conn1 = null;
-//      
-//      Connection conn2 = null;
-//      
-//      Connection conn3 = null;
-//      
-//      Connection conn4 = null;
-//      
-//      Connection conn5 = null;
-//      
-//      try
-//      {         
-//         conn1 = factory.createConnection();
-//         
-//         conn2 = factory.createConnection();
-//         
-//         conn3 = factory.createConnection();
-//         
-//         conn4 = factory.createConnection();
-//         
-//         conn5 = factory.createConnection();
-//         
-//         ConnectionState state1 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn1).getDelegate()).getState());
-//         
-//         ConnectionState state2 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn2).getDelegate()).getState());
-//         
-//         ConnectionState state3 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn3).getDelegate()).getState());
-//         
-//         ConnectionState state4 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn4).getDelegate()).getState());
-//         
-//         ConnectionState state5 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn5).getDelegate()).getState());
-//         
-//         int serverID1 = state1.getServerID();
-//         
-//         int serverID2 = state2.getServerID();
-//         
-//         int serverID3 = state3.getServerID();
-//         
-//         int serverID4 = state4.getServerID();
-//         
-//         int serverID5 = state5.getServerID();
-//         
-//         log.info("server id 1: " + serverID1);
-//         
-//         log.info("server id 2: " + serverID2);
-//         
-//         log.info("server id 3: " + serverID3);
-//         
-//         log.info("server id 4: " + serverID4);
-//         
-//         log.info("server id 5: " + serverID5);
-//         
-//         assertEquals(0, serverID1);
-//         
-//         assertEquals(1, serverID2);
-//         
-//         assertEquals(2, serverID3);
-//         
-//         assertEquals(0, serverID4);
-//         
-//         assertEquals(1, serverID5);
-//      }
-//      finally
-//      {
-//         if (conn1 != null)
-//         {
-//            conn1.close();
-//         }
-//         
-//         if (conn2 != null)
-//         {
-//            conn2.close();
-//         }
-//         
-//         if (conn3 != null)
-//         {
-//            conn3.close();
-//         }
-//         
-//         if (conn4 != null)
-//         {
-//            conn4.close();
-//         }
-//         
-//         if (conn5 != null)
-//         {
-//            conn5.close();
-//         }
-//      }
-//      
-//   }
-// 
-//   /*
-//    * Test that the failover mapping is created correctly and updated properly when nodes leave
-//    * or join
-//    */
-//   public void testDefaultFailoverMap() throws Exception
-//   {     
-//      {
-//         JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
-//         
-//         ClusteredClientConnectionFactoryDelegate delegate =
-//            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-//         
-//         assertEquals(3, ServerManagement.getServer(0).getNumberOfNodesOnCluster());
-//         
-//         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-//         
-//         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-//         
-//         ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-//         
-//         ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
-//         
-//         //The order here depends on the order the servers were started in
-//         
-//         //If any servers get stopped and then started then the order will change
-//    
-//         log.info("cf1 serverid=" + cf1.getServerId());
-//         
-//         log.info("cf2 serverid=" + cf2.getServerId());
-//         
-//         log.info("cf3 serverid=" + cf3.getServerId());
-//         
-//         
-//         assertEquals(0, cf1.getServerId());
-//         
-//         assertEquals(1, cf2.getServerId());
-//         
-//         assertEquals(2, cf3.getServerId());
-//         
-//         Map failoverMap = delegate.getFailoverMap();
-//         
-//         assertEquals(3, delegates.length);
-//         
-//         assertEquals(3, failoverMap.size());
-//         
-//         // Default failover policy just chooses the node to the right
-//         
-//         assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-//         
-//         assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
-//         
-//         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
-//      }
-//      
-//      //Now cleanly stop one of the servers
-//      
-//      
-//      log.info("************** STOPPING SERVER 0");
-//      ServerManagement.stop(0, true);
-//      
-//      log.info("server stopped");
-//      
-//      assertEquals(2, ServerManagement.getServer(1).getNumberOfNodesOnCluster());
-//      
-//      {         
-//         //Lookup another connection factory
-//         
-//         JBossConnectionFactory factory =  (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
-//         
-//         log.info("Got connection factory");
-//         
-//         ClusteredClientConnectionFactoryDelegate delegate =
-//            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-//         
-//         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-//         
-//         Map failoverMap = delegate.getFailoverMap();
-//         
-//         log.info("Got failover map");
-//         
-//         assertEquals(2, delegates.length);
-//         
-//         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-//         
-//         ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-//         
-//         //Order here depends on order servers were started in
-//         
-//         log.info("cf1 serverid=" + cf1.getServerId());
-//         
-//         log.info("cf2 serverid=" + cf2.getServerId());
-//         
-//         assertEquals(1, cf1.getServerId());
-//         
-//         assertEquals(2, cf2.getServerId());
-//         
-//         
-//         assertEquals(2, failoverMap.size());
-//         
-//         assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-//         
-//         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
-//      }
-//      
-//      //Cleanly stop another server
-//      
-//      log.info("Server 1 is started: " + ServerManagement.getServer(1).isServerPeerStarted());
-//      
-//      ServerManagement.stop(1, true);
-//      
-//      assertEquals(1, ServerManagement.getServer(2).getNumberOfNodesOnCluster());
-//      
-//      {         
-//         //Lookup another connection factory
-//         
-//         JBossConnectionFactory factory =  (JBossConnectionFactory )ic2.lookup("/ConnectionFactory");
-//         
-//         ClusteredClientConnectionFactoryDelegate delegate =
-//            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-//         
-//         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-//         
-//         Map failoverMap = delegate.getFailoverMap();
-//         
-//         assertEquals(1, delegates.length);
-//         
-//         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-//         
-//         assertEquals(2, cf1.getServerId());
-//         
-//         
-//         assertEquals(1, failoverMap.size());
-//         
-//         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-//      }
-//            
-//      //Restart server 0
-//      
-//      ServerManagement.start("all", 0);
-//      
-//      {
-//         JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
-//         
-//         log.info("Got connection factory");
-//         
-//         ClusteredClientConnectionFactoryDelegate delegate =
-//            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-//         
-//         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-//         
-//         Map failoverMap = delegate.getFailoverMap();
-//         
-//         log.info("Got failover map");
-//         
-//         assertEquals(2, delegates.length);
-//         
-//         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-//         
-//         ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-//         
-//         log.info("cf1 serverid=" + cf1.getServerId());
-//         
-//         log.info("cf2 serverid=" + cf2.getServerId());
-//         
-//         assertEquals(2, cf1.getServerId());
-//         
-//         assertEquals(0, cf2.getServerId());
-//         
-//         
-//         assertEquals(2, failoverMap.size());
-//         
-//         assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-//         
-//         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
-//      }
-//      
-//      
-//      //Restart server 1
-//      
-//      ServerManagement.start("all", 1);
-//      
-//      {
-//         JBossConnectionFactory factory =  (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
-//         
-//         log.info("Got connection factory");
-//         
-//         ClusteredClientConnectionFactoryDelegate delegate =
-//            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-//         
-//         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-//         
-//         Map failoverMap = delegate.getFailoverMap();
-//         
-//         log.info("Got failover map");
-//         
-//         assertEquals(3, delegates.length);
-//         
-//         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-//         
-//         ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-//         
-//         ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
-//         
-//         log.info("cf1 serverid=" + cf1.getServerId());
-//         
-//         log.info("cf2 serverid=" + cf2.getServerId());
-//         
-//         log.info("cf3 serverid=" + cf3.getServerId());
-//         
-//         assertEquals(2, cf1.getServerId());
-//         
-//         assertEquals(0, cf2.getServerId());
-//         
-//         assertEquals(1, cf3.getServerId());
-//         
-//         
-//         assertEquals(3, failoverMap.size());
-//         
-//         assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-//         
-//         assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
-//         
-//         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
-//      }            
-//   }
+   public void testRoundRobinConnectionCreation() throws Exception
+   {
+      JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+      
+      ClusteredClientConnectionFactoryDelegate delegate =
+         (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+      
+      log.info ("number of delegates = " + delegate.getDelegates().length);
+      log.info ("number of servers = " + ServerManagement.getServer(0).getNumberOfNodesOnCluster());
+      
+      assertEquals(3, delegate.getDelegates().length);
+      
+      ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+      
+      ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+      
+      ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
+      
+      assertEquals(0, cf1.getServerId());
+      
+      assertEquals(1, cf2.getServerId());
+      
+      assertEquals(2, cf3.getServerId());
+      
+      assertEquals(3, ServerManagement.getServer(0).getNumberOfNodesOnCluster());
+      
+      Connection conn1 = null;
+      
+      Connection conn2 = null;
+      
+      Connection conn3 = null;
+      
+      Connection conn4 = null;
+      
+      Connection conn5 = null;
+      
+      try
+      {         
+         conn1 = factory.createConnection();
+         
+         conn2 = factory.createConnection();
+         
+         conn3 = factory.createConnection();
+         
+         conn4 = factory.createConnection();
+         
+         conn5 = factory.createConnection();
+         
+         ConnectionState state1 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn1).getDelegate()).getState());
+         
+         ConnectionState state2 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn2).getDelegate()).getState());
+         
+         ConnectionState state3 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn3).getDelegate()).getState());
+         
+         ConnectionState state4 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn4).getDelegate()).getState());
+         
+         ConnectionState state5 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn5).getDelegate()).getState());
+         
+         int serverID1 = state1.getServerID();
+         
+         int serverID2 = state2.getServerID();
+         
+         int serverID3 = state3.getServerID();
+         
+         int serverID4 = state4.getServerID();
+         
+         int serverID5 = state5.getServerID();
+         
+         log.info("server id 1: " + serverID1);
+         
+         log.info("server id 2: " + serverID2);
+         
+         log.info("server id 3: " + serverID3);
+         
+         log.info("server id 4: " + serverID4);
+         
+         log.info("server id 5: " + serverID5);
+         
+         assertEquals(0, serverID1);
+         
+         assertEquals(1, serverID2);
+         
+         assertEquals(2, serverID3);
+         
+         assertEquals(0, serverID4);
+         
+         assertEquals(1, serverID5);
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+         
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+         
+         if (conn3 != null)
+         {
+            conn3.close();
+         }
+         
+         if (conn4 != null)
+         {
+            conn4.close();
+         }
+         
+         if (conn5 != null)
+         {
+            conn5.close();
+         }
+      }
+      
+   }
+ 
+   /*
+    * Test that the failover mapping is created correctly and updated properly when nodes leave
+    * or join
+    */
+   public void testDefaultFailoverMap() throws Exception
+   {     
+      {
+         JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+         
+         ClusteredClientConnectionFactoryDelegate delegate =
+            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+         
+         assertEquals(3, ServerManagement.getServer(0).getNumberOfNodesOnCluster());
+         
+         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+         
+         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+         
+         ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+         
+         ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
+         
+         //The order here depends on the order the servers were started in
+         
+         //If any servers get stopped and then started then the order will change
+    
+         log.info("cf1 serverid=" + cf1.getServerId());
+         
+         log.info("cf2 serverid=" + cf2.getServerId());
+         
+         log.info("cf3 serverid=" + cf3.getServerId());
+         
+         
+         assertEquals(0, cf1.getServerId());
+         
+         assertEquals(1, cf2.getServerId());
+         
+         assertEquals(2, cf3.getServerId());
+         
+         Map failoverMap = delegate.getFailoverMap();
+         
+         assertEquals(3, delegates.length);
+         
+         assertEquals(3, failoverMap.size());
+         
+         // Default failover policy just chooses the node to the right
+         
+         assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+         
+         assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
+         
+         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
+      }
+      
+      //Now cleanly stop one of the servers
+      
+      
+      log.info("************** STOPPING SERVER 0");
+      ServerManagement.stop(0, true);
+      
+      log.info("server stopped");
+      
+      assertEquals(2, ServerManagement.getServer(1).getNumberOfNodesOnCluster());
+      
+      {         
+         //Lookup another connection factory
+         
+         JBossConnectionFactory factory =  (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
+         
+         log.info("Got connection factory");
+         
+         ClusteredClientConnectionFactoryDelegate delegate =
+            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+         
+         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+         
+         Map failoverMap = delegate.getFailoverMap();
+         
+         log.info("Got failover map");
+         
+         assertEquals(2, delegates.length);
+         
+         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+         
+         ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+         
+         //Order here depends on order servers were started in
+         
+         log.info("cf1 serverid=" + cf1.getServerId());
+         
+         log.info("cf2 serverid=" + cf2.getServerId());
+         
+         assertEquals(1, cf1.getServerId());
+         
+         assertEquals(2, cf2.getServerId());
+         
+         
+         assertEquals(2, failoverMap.size());
+         
+         assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+         
+         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
+      }
+      
+      //Cleanly stop another server
+      
+      log.info("Server 1 is started: " + ServerManagement.getServer(1).isServerPeerStarted());
+      
+      ServerManagement.stop(1, true);
+      
+      assertEquals(1, ServerManagement.getServer(2).getNumberOfNodesOnCluster());
+      
+      {         
+         //Lookup another connection factory
+         
+         JBossConnectionFactory factory =  (JBossConnectionFactory )ic2.lookup("/ConnectionFactory");
+         
+         ClusteredClientConnectionFactoryDelegate delegate =
+            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+         
+         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+         
+         Map failoverMap = delegate.getFailoverMap();
+         
+         assertEquals(1, delegates.length);
+         
+         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+         
+         assertEquals(2, cf1.getServerId());
+         
+         
+         assertEquals(1, failoverMap.size());
+         
+         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+      }
+            
+      //Restart server 0
+      
+      ServerManagement.start("all", 0);
+      
+      {
+         JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+         
+         log.info("Got connection factory");
+         
+         ClusteredClientConnectionFactoryDelegate delegate =
+            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+         
+         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+         
+         Map failoverMap = delegate.getFailoverMap();
+         
+         log.info("Got failover map");
+         
+         assertEquals(2, delegates.length);
+         
+         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+         
+         ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+         
+         log.info("cf1 serverid=" + cf1.getServerId());
+         
+         log.info("cf2 serverid=" + cf2.getServerId());
+         
+         assertEquals(2, cf1.getServerId());
+         
+         assertEquals(0, cf2.getServerId());
+         
+         
+         assertEquals(2, failoverMap.size());
+         
+         assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+         
+         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
+      }
+      
+      
+      //Restart server 1
+      
+      ServerManagement.start("all", 1);
+      
+      {
+         JBossConnectionFactory factory =  (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
+         
+         log.info("Got connection factory");
+         
+         ClusteredClientConnectionFactoryDelegate delegate =
+            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+         
+         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+         
+         Map failoverMap = delegate.getFailoverMap();
+         
+         log.info("Got failover map");
+         
+         assertEquals(3, delegates.length);
+         
+         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+         
+         ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+         
+         ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
+         
+         log.info("cf1 serverid=" + cf1.getServerId());
+         
+         log.info("cf2 serverid=" + cf2.getServerId());
+         
+         log.info("cf3 serverid=" + cf3.getServerId());
+         
+         assertEquals(2, cf1.getServerId());
+         
+         assertEquals(0, cf2.getServerId());
+         
+         assertEquals(1, cf3.getServerId());
+         
+         
+         assertEquals(3, failoverMap.size());
+         
+         assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+         
+         assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
+         
+         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
+      }            
+   }
    
    public void testSimpleFailover() throws Exception
    {
@@ -463,13 +463,13 @@
          int initialServerID = state.getServerID();
          
          assertEquals(0, initialServerID);
-         
-         
-         
+                           
          Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
          
          MessageProducer prod = sess.createProducer(queue0);
          
+         MessageConsumer cons = sess.createConsumer(queue0);
+         
          final int NUM_MESSAGES = 100;
          
          for (int i = 0; i < NUM_MESSAGES; i++)
@@ -489,7 +489,7 @@
          
          log.info("killed server, now waiting");
          
-         Thread.sleep(30000);
+         Thread.sleep(25000);
          
          log.info("done wait");
          
@@ -505,10 +505,6 @@
          
          log.info("here 2");
          
-         //I should now be able to create a consumer on the same connection and consume the messages
-         
-         MessageConsumer cons = sess.createConsumer(queue0);
-         
          conn.start();
          
          log.info("here 3");
@@ -517,6 +513,8 @@
          {
             TextMessage tm = (TextMessage)cons.receive(1000);
             
+            log.info("message is " + tm);
+            
             assertNotNull(tm);
             
             assertEquals("message:" + i, tm.getText());
@@ -533,14 +531,51 @@
             }
             catch (Exception e)
             {
-               //Ignore
+               e.printStackTrace();
             }
          }
       }
       
    }
    
+   public void testEvenSimplerFailover() throws Exception
+   {
+      JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+                  
+      Connection conn = null;
+      
+      try
+      {
+         conn = factory.createConnection();
+                           
+         log.info("************ KILLING (CRASHING) SERVER 0");
+         
+         ServerManagement.getServer(0).destroy();
+         
+         log.info("killed server, now waiting");
+         
+         Thread.sleep(25000);
+         
+         log.info("done wait");                
+      }
+      finally
+      {         
+         if (conn != null)
+         {
+            try
+            {
+               conn.close();
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+            }
+         }
+      }
+      
+   }
    
+   
 // public void testConnectionFactoryConnect() throws Exception
 // {
 // try

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/SimpleClusteringTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/SimpleClusteringTest.java	2006-12-09 20:58:24 UTC (rev 1750)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/SimpleClusteringTest.java	2006-12-09 23:28:59 UTC (rev 1751)
@@ -106,8 +106,6 @@
       ServerManagement.getServer(0).destroy();
    }
 
-
-
    public void testDistributedTopic() throws Exception
    {
       Connection conn = null;

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java	2006-12-09 20:58:24 UTC (rev 1750)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java	2006-12-09 23:28:59 UTC (rev 1751)
@@ -222,15 +222,10 @@
 
    public synchronized void destroy() throws Exception
    {
-      server.destroy();
-
-      registry.unbind(RMI_SERVER_PREFIX + index);
-      registry.unbind(NAMING_SERVER_PREFIX + index);
+      //Kill the server without doing any graceful shutdown
       
-      // Now shutdown the process. The registry will be taken out with the last RMI server standing
+      //For graceful shutdown use stop()
       
-      //TODO - we should shutdown cleanly - let main() exit - not kill the process
-      
       new Thread(new VMKiller(), "VM Killer").start();
    }
 




More information about the jboss-cvs-commits mailing list