[jboss-cvs] JBoss Messaging SVN: r1913 - in trunk: . src/etc/server/default/deploy src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/state src/main/org/jboss/jms/server src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/tx src/main/org/jboss/messaging/core src/main/org/jboss/messaging/core/plugin/postoffice/cluster src/main/org/jboss/messaging/core/plugin/postoffice/cluster/jchannelfactory tests/src/org/jboss/test/messaging/jms/clustering tests/src/org/jboss/test/messaging/tools/jmx/rmi

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sat Jan 6 04:18:33 EST 2007


Author: ovidiu.feodorov at jboss.com
Date: 2007-01-06 04:18:24 -0500 (Sat, 06 Jan 2007)
New Revision: 1913

Modified:
   trunk/messaging.ipr
   trunk/src/etc/server/default/deploy/multiplexer-service.xml
   trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java
   trunk/src/main/org/jboss/jms/client/state/SessionState.java
   trunk/src/main/org/jboss/jms/server/ServerPeer.java
   trunk/src/main/org/jboss/jms/server/endpoint/DeliveryRecovery.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
   trunk/src/main/org/jboss/jms/tx/ClientTransaction.java
   trunk/src/main/org/jboss/messaging/core/SimpleDelivery.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/jchannelfactory/MultiplexorJChannelFactory.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
   trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
Log:
Added extra session failover tests (http://jira.jboss.org/jira/browse/JBMESSAGING-707)
and also fixed a few bugs in the process.

Modified: trunk/messaging.ipr
===================================================================
--- trunk/messaging.ipr	2007-01-06 06:45:30 UTC (rev 1912)
+++ trunk/messaging.ipr	2007-01-06 09:18:24 UTC (rev 1913)
@@ -259,7 +259,9 @@
         <root url="jar://$PROJECT_DIR$/thirdparty/jgroups/lib/jgroups.jar!/" />
       </CLASSES>
       <JAVADOC />
-      <SOURCES />
+      <SOURCES>
+        <root url="file://C:/work/src/JGroups-2.4.1.src/src" />
+      </SOURCES>
     </library>
     <library name="junit">
       <CLASSES>

Modified: trunk/src/etc/server/default/deploy/multiplexer-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/multiplexer-service.xml	2007-01-06 06:45:30 UTC (rev 1912)
+++ trunk/src/etc/server/default/deploy/multiplexer-service.xml	2007-01-06 09:18:24 UTC (rev 1913)
@@ -1,9 +1,5 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<!DOCTYPE server
-        PUBLIC "-//JBoss//DTD MBean Service 3.2//EN"
-        "http://www.jboss.org/j2ee/dtd/jboss-service_3_2.dtd">
 
-
 <server>
 
 

Modified: trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java	2007-01-06 06:45:30 UTC (rev 1912)
+++ trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java	2007-01-06 09:18:24 UTC (rev 1913)
@@ -164,22 +164,29 @@
 
       // attempt to grab the reader's lock and go forward
 
+      boolean exempt = false;
+
       try
       {
-         boolean acquired = false;
+         exempt = isInvocationExempt(methodName);
 
-         while(!acquired)
+         if (!exempt)
          {
-            try
+            boolean acquired = false;
+
+            while(!acquired)
             {
-               acquired = readLock.attempt(500);
+               try
+               {
+                  acquired = readLock.attempt(500);
+               }
+               catch(InterruptedException e)
+               {
+                  // OK
+               }
+
+               if (trace && !acquired ) { log.trace(methodName + "() trying to pass through " + this); }
             }
-            catch(InterruptedException e)
-            {
-               // OK
-            }
-
-            if (trace && !acquired ) { log.trace(methodName + "() trying to pass through " + this); }
          }
 
          synchronized(this)
@@ -191,13 +198,17 @@
             }
          }
 
-         if(trace) { log.trace(this + " has let " + methodName + "() pass through"); }
+         if (trace) { log.trace(this + " allowed " + (exempt ? "exempt" : "") + " method " + methodName + "() to pass through"); }
 
          return invocation.invokeNext();
       }
       finally
       {
-         readLock.release();
+         if (!exempt)
+         {
+            readLock.release();
+         }
+
          synchronized(this)
          {
             activeThreadsCount--;
@@ -206,6 +217,7 @@
                activeMethods.remove(methodName);
             }
          }
+
       }
    }
 
@@ -225,5 +237,10 @@
 
    // Private --------------------------------------------------------------------------------------
 
+   private boolean isInvocationExempt(String methodName)
+   {
+      return "recoverDeliveries".equals(methodName);
+   }
+
    // Inner classes --------------------------------------------------------------------------------
 }

Modified: trunk/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/SessionState.java	2007-01-06 06:45:30 UTC (rev 1912)
+++ trunk/src/main/org/jboss/jms/client/state/SessionState.java	2007-01-06 09:18:24 UTC (rev 1913)
@@ -238,9 +238,9 @@
       if (!isTransacted() || (isXA() && getCurrentTxId() == null))
       {
          // Non transacted session or an XA session with no transaction set (it falls back
-         // to auto_ack)
+         // to AUTO_ACKNOWLEDGE)
 
-         log.debug(this + " is not transacted (or XA with no tx set), " +
+         log.debug(this + " is not transacted (or XA with no transaction set), " +
             "retrieving deliveries from session state");
 
          // We remove any unacked non-persistent messages - this is because we don't want to ack
@@ -284,31 +284,27 @@
       else
       {
          // Transacted session - we need to get the acks from the resource manager. BTW we have
-         // kept the old resource manager
+         // kept the old resource manager.
 
          ackInfos = rm.getDeliveriesForSession(getSessionID());
       }
 
       if (!ackInfos.isEmpty())
       {
-         SessionDelegate nd = (SessionDelegate)getDelegate();
-
          List recoveryInfos = new ArrayList();
-
          for (Iterator i = ackInfos.iterator(); i.hasNext(); )
          {
-            DeliveryInfo info = (DeliveryInfo)i.next();
-
+            DeliveryInfo del = (DeliveryInfo)i.next();
             DeliveryRecovery recInfo =
-               new DeliveryRecovery(info.getMessageProxy().getDeliveryId(),
-                                    info.getMessageProxy().getMessage().getMessageID(),
-                                    info.getChannelId());
+               new DeliveryRecovery(del.getMessageProxy().getDeliveryId(),
+                                    del.getMessageProxy().getMessage().getMessageID(),
+                                    del.getChannelId());
 
             recoveryInfos.add(recInfo);
          }
 
          log.debug(this + " sending delivery recovery " + recoveryInfos + " on failover");
-         nd.recoverDeliveries(recoveryInfos);
+         newDelegate.recoverDeliveries(recoveryInfos);
       }
       else
       {

Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java	2007-01-06 06:45:30 UTC (rev 1912)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java	2007-01-06 09:18:24 UTC (rev 1913)
@@ -835,7 +835,7 @@
       // This node may be failing over for another node - in which case we must wait for that to be
       // complete.
       
-      log.info(this + " waiting for server-side failover for failed node " + failedNodeID + " to complete");
+      log.debug(this + " waiting for server-side failover for failed node " + failedNodeID + " to complete");
       
       Replicator replicator = getReplicator();
 
@@ -863,12 +863,12 @@
                   
                   if (status.isFailedOverForNode(failedNodeID))
                   {
-                     log.info(this + ": failover is complete on node " + nid);
+                     log.debug(this + ": failover is complete on node " + nid);
                      return nid.intValue();
                   }
                   else if (status.isFailingOverForNode(failedNodeID))
                   {
-                     log.info(this + ": fail over is in progress on node " + nid);
+                     log.debug(this + ": fail over is in progress on node " + nid);
                      
                      // A server has started failing over for the failed node, but not completed.
                      // If it's not this node then we immediately return so the connection can be
@@ -883,7 +883,7 @@
                      if (completeToWait <= 0)
                      {
                         // Give up now
-                        log.info(this + " already waited long enough for failover to complete, giving up");
+                        log.debug(this + " already waited long enough for failover to complete, giving up");
                         return -1;
                      }
                      
@@ -919,7 +919,7 @@
                if (startToWait <= 0)
                {
                   // Don't want to wait again
-                  log.info(this + " already waited long enough for failover to start, giving up");
+                  log.debug(this + " already waited long enough for failover to start, giving up");
                   return -1;
                }
                

Modified: trunk/src/main/org/jboss/jms/server/endpoint/DeliveryRecovery.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/DeliveryRecovery.java	2007-01-06 06:45:30 UTC (rev 1912)
+++ trunk/src/main/org/jboss/jms/server/endpoint/DeliveryRecovery.java	2007-01-06 09:18:24 UTC (rev 1913)
@@ -39,56 +39,74 @@
  *
  */
 public class DeliveryRecovery implements Streamable
-{   
-   private long deliveryId;
-   
-   private long messageId;
-   
-   private long channelId;
-   
+{
+   // Constants ------------------------------------------------------------------------------------
+
+   // Static ---------------------------------------------------------------------------------------
+
+   // Attributes -----------------------------------------------------------------------------------
+
+   private long deliveryID;
+   private long messageID;
+   private long channelID;
+
+   // Constructors ---------------------------------------------------------------------------------
+
    public DeliveryRecovery()
-   {      
+   {
    }
-   
-   public DeliveryRecovery(long deliveryId, long messageId, long channelId)
+
+   public DeliveryRecovery(long deliveryID, long messageID, long channelID)
    {
-      this.deliveryId = deliveryId;
-      
-      this.messageId = messageId;
-      
-      this.channelId = channelId;
+      this.deliveryID = deliveryID;
+      this.messageID = messageID;
+      this.channelID = channelID;
    }
-   
-   public long getDeliveryId()
+
+   // Streamable implementation --------------------------------------------------------------------
+
+   public void read(DataInputStream in) throws Exception
    {
-      return deliveryId;
+      deliveryID = in.readLong();
+      messageID = in.readLong();
+      channelID = in.readLong();
    }
-   
-   public long getMessageId()
+
+   public void write(DataOutputStream out) throws Exception
    {
-      return messageId;
+      out.writeLong(deliveryID);
+      out.writeLong(messageID);
+      out.writeLong(channelID);
    }
-   
-   public long getChannelId()
+
+   // Public ---------------------------------------------------------------------------------------
+
+   public long getDeliveryID()
    {
-      return channelId;
+      return deliveryID;
    }
 
-   public void read(DataInputStream in) throws Exception
+   public long getMessageID()
    {
-      deliveryId = in.readLong();
-      
-      messageId = in.readLong();
-      
-      channelId = in.readLong();
+      return messageID;
    }
 
-   public void write(DataOutputStream out) throws Exception
+   public long getChannelID()
    {
-      out.writeLong(deliveryId);
-      
-      out.writeLong(messageId);
-      
-      out.writeLong(channelId);
+      return channelID;
    }
+
+   public String toString()
+   {
+      return "DeliveryRecovery[ID=" + deliveryID + ", MID=" + messageID + ", CID=" + channelID + "]";
+   }
+
+   // Package protected ----------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+
+   // Private --------------------------------------------------------------------------------------
+
+   // Inner classes --------------------------------------------------------------------------------
+
 }

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-01-06 06:45:30 UTC (rev 1912)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-01-06 09:18:24 UTC (rev 1913)
@@ -411,7 +411,7 @@
          {
             DeliveryRecovery deliveryInfo = (DeliveryRecovery)iter.next();
                 
-            Long channelId = new Long(deliveryInfo.getChannelId());
+            Long channelId = new Long(deliveryInfo.getChannelID());
             
             List acks = (List)ackMap.get(channelId);
             
@@ -449,7 +449,7 @@
             {
                DeliveryRecovery info = (DeliveryRecovery)iter2.next();
                
-               ids.add(new Long(info.getMessageId()));
+               ids.add(new Long(info.getMessageID()));
             }
             
             Queue queue = binding.getQueue();
@@ -466,7 +466,7 @@
                
                DeliveryRecovery info = (DeliveryRecovery)iter3.next();
                
-               long deliveryId = info.getDeliveryId();
+               long deliveryId = info.getDeliveryID();
                
                maxDeliveryId = Math.max(maxDeliveryId, deliveryId);
                
@@ -849,7 +849,7 @@
    
    void acknowledgeTransactionally(List acks, Transaction tx) throws Throwable
    {
-      if (trace) { log.trace(this + " acknowledging transactionally " + acks.size() + " for tx: " + tx); }
+      if (trace) { log.trace(this + " acknowledging transactionally " + acks.size() + " messages for " + tx); }
       
       DeliveryCallback deliveryCallback = (DeliveryCallback)tx.getCallback(this);
       
@@ -859,14 +859,10 @@
          tx.addCallback(deliveryCallback, this);
       }
             
-      Iterator iter = acks.iterator();
-      
-      while (iter.hasNext())
+      for(Iterator i = acks.iterator(); i.hasNext(); )
       {
-         Ack ack = (Ack)iter.next();
-         
+         Ack ack = (Ack)i.next();
          Long id = new Long(ack.getDeliveryID());
-           
          DeliveryRecord rec = (DeliveryRecord)deliveries.get(id);
          
          if (rec == null)
@@ -875,7 +871,6 @@
          }
                            
          deliveryCallback.addDeliveryId(id);
-         
          rec.del.acknowledge(tx);
       }      
    }

Modified: trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java	2007-01-06 06:45:30 UTC (rev 1912)
+++ trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java	2007-01-06 09:18:24 UTC (rev 1913)
@@ -126,7 +126,8 @@
    void send(JBossMessage message) throws JMSException;
    
    /**
-    * Send delivery info to the server so the delivery lists can be repopulated used at failover
+    * Send delivery info to the server so the delivery lists can be repopulated. Used only in
+    * failover.
     */
    void recoverDeliveries(List createInfos) throws JMSException;
 }

Modified: trunk/src/main/org/jboss/jms/tx/ClientTransaction.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ClientTransaction.java	2007-01-06 06:45:30 UTC (rev 1912)
+++ trunk/src/main/org/jboss/jms/tx/ClientTransaction.java	2007-01-06 09:18:24 UTC (rev 1913)
@@ -35,6 +35,7 @@
 import org.jboss.jms.server.endpoint.DefaultAck;
 import org.jboss.jms.server.endpoint.DeliveryInfo;
 import org.jboss.messaging.core.message.MessageFactory;
+import org.jboss.logging.Logger;
 
 /**
  * Holds the state of a transaction on the client side
@@ -42,47 +43,47 @@
  * @author <a href="mailto:tim.fox at jboss.com>Tim Fox </a>
  */
 public class ClientTransaction
-{  
+{
    // Constants -----------------------------------------------------
-   
+
+   private static final Logger log = Logger.getLogger(ClientTransaction.class);
+
    public final static byte TX_OPEN = 0;
-   
    public final static byte TX_ENDED = 1;
-   
    public final static byte TX_PREPARED = 2;
-   
    public final static byte TX_COMMITED = 3;
-   
    public final static byte TX_ROLLEDBACK = 4;
-   
+
+   private static boolean trace = log.isTraceEnabled();
+
    // Attributes ----------------------------------------------------
-   
+
    private byte state = TX_OPEN;
-   
-   //Maintained on the client side
+
+   // Map<Integer(sessionID) - SessionTxState> maintained on the client side
    private Map sessionStatesMap;
-   
-   //Read from on the server side
+
+   // Read from on the server side
    private List sessionStatesList;
-   
+
    private boolean clientSide;
-   
+
    // Static --------------------------------------------------------
-   
+
    // Constructors --------------------------------------------------
-   
+
    public ClientTransaction()
-   {                  
+   {
       clientSide = true;
    }
-   
+
    // Public --------------------------------------------------------
-   
+
    public byte getState()
    {
       return state;
    }
-   
+
    public void addMessage(int sessionId, JBossMessage msg)
    {
       if (!clientSide)
@@ -90,10 +91,10 @@
          throw new IllegalStateException("Cannot call this method on the server side");
       }
       SessionTxState sessionTxState = getSessionTxState(sessionId);
-      
+
       sessionTxState.addMessage(msg);
    }
-   
+
    public void addAck(int sessionId, DeliveryInfo info)
    {
       if (!clientSide)
@@ -101,32 +102,29 @@
          throw new IllegalStateException("Cannot call this method on the server side");
       }
       SessionTxState sessionTxState = getSessionTxState(sessionId);
-      
+
       sessionTxState.addAck(info);
-   }      
-   
+   }
+
    public void clearMessages()
    {
       if (!clientSide)
       {
          throw new IllegalStateException("Cannot call this method on the server side");
       }
-      
+
       if (sessionStatesMap != null)
       {
-         //This can be null if the tx was recreated on the client side due to recovery
-         
-         Iterator iter = sessionStatesMap.values().iterator();
-         
-         while (iter.hasNext())
+         // This can be null if the tx was recreated on the client side due to recovery
+
+         for(Iterator i = sessionStatesMap.values().iterator(); i.hasNext(); )
          {
-            SessionTxState sessionTxState = (SessionTxState)iter.next();
-            
+            SessionTxState sessionTxState = (SessionTxState)i.next();
             sessionTxState.clearMessages();
          }
       }
    }
-   
+
    public void setState(byte state)
    {
       if (!clientSide)
@@ -135,7 +133,7 @@
       }
       this.state = state;
    }
-   
+
    public List getSessionStates()
    {
       if (sessionStatesList != null)
@@ -144,15 +142,16 @@
       }
       else
       {
-         return sessionStatesMap == null ? Collections.EMPTY_LIST : new ArrayList(sessionStatesMap.values());
+         return sessionStatesMap == null ?
+            Collections.EMPTY_LIST : new ArrayList(sessionStatesMap.values());
       }
-   }   
-   
+   }
+
    /*
-    * Substitute newSessionID for oldSessionID
-    */
+   * Substitute newSessionID for oldSessionID
+   */
    public void handleFailover(int newServerID, int oldSessionID, int newSessionID)
-   {    
+   {
       if (!clientSide)
       {
          throw new IllegalStateException("Cannot call this method on the server side");
@@ -160,15 +159,30 @@
 
       // Note we have to do this in one go since there may be overlap between old and new session
       // IDs and we don't want to overwrite keys in the map.
-      
+
+      Map tmpMap = null;
+
       if (sessionStatesMap != null)
-      {                          
+      {
          for(Iterator i = sessionStatesMap.values().iterator(); i.hasNext();)
          {
+
             SessionTxState state = (SessionTxState)i.next();
             state.handleFailover(newServerID, oldSessionID, newSessionID);
+
+            if (tmpMap == null)
+            {
+               tmpMap = new LinkedHashMap();
+            }
+            tmpMap.put(new Integer(newSessionID), state);
          }
       }
+
+      if (tmpMap != null)
+      {
+         // swap
+         sessionStatesMap = tmpMap;
+      }
    }
 
    /**
@@ -182,7 +196,7 @@
       }
 
       SessionTxState state = getSessionTxState(sessionID);
-      
+
       if (state != null)
       {
          return state.getAcks();
@@ -192,9 +206,9 @@
          return Collections.EMPTY_LIST;
       }
    }
-   
+
    // Streamable implementation ---------------------------------
-   
+
    public void write(DataOutputStream out) throws Exception
    {
       out.writeByte(state);
@@ -246,86 +260,84 @@
          }
       }
    }
-    
-   
+
+
    public void read(DataInputStream in) throws Exception
    {
       clientSide = false;
-      
+
       state = in.readByte();
-      
+
       int numSessions = in.readInt();
-      
+
       //Read in as a list since we don't want the extra overhead of putting into a map
       //which won't be used on the server side
       sessionStatesList = new ArrayList(numSessions);
-      
+
       for (int i = 0; i < numSessions; i++)
       {
          int sessionId = in.readInt();
-         
+
          SessionTxState sessionState = new SessionTxState(sessionId);
-         
+
          sessionStatesList.add(sessionState);
-         
+
          int numMsgs = in.readInt();
-         
-         for (int j = 0; j < numMsgs; j++)            
+
+         for (int j = 0; j < numMsgs; j++)
          {
             byte type = in.readByte();
-            
+
             JBossMessage msg = (JBossMessage)MessageFactory.createMessage(type);
-            
+
             msg.read(in);
-            
+
             sessionState.addMessage(msg);
          }
-         
+
          int numAcks = in.readInt();
-         
-         for (int j = 0; j < numAcks; j++)            
+
+         for (int j = 0; j < numAcks; j++)
          {
             long ack = in.readLong();
-            
+
             sessionState.addAck(new DefaultAck(ack));
          }
-      }      
+      }
    }
-   
+
    // Protected -----------------------------------------------------
-   
+
    // Package Private -----------------------------------------------
-   
+
    // Private -------------------------------------------------------
-   
-   private SessionTxState getSessionTxState(int sessionId)
-   {                  
+
+   private SessionTxState getSessionTxState(int sessionID)
+   {
       if (sessionStatesMap == null)
       {
          sessionStatesMap = new LinkedHashMap();
       }
-      
-      SessionTxState sessionTxState = (SessionTxState)sessionStatesMap.get(new Integer(sessionId));
-      
+
+      SessionTxState sessionTxState = (SessionTxState)sessionStatesMap.get(new Integer(sessionID));
+
       if (sessionTxState == null)
       {
-         sessionTxState = new SessionTxState(sessionId);
-         
-         sessionStatesMap.put(new Integer(sessionId), sessionTxState);
+         sessionTxState = new SessionTxState(sessionID);
+         sessionStatesMap.put(new Integer(sessionID), sessionTxState);
       }
-      
+
       return sessionTxState;
    }
-         
-      
+
    // Inner Classes -------------------------------------------------
-   
+
    public class SessionTxState
    {
       private int sessionID;
 
-      // We record the server id when doing failover to avoid overwriting the sesion ID again
-      // if multiple connections fail on the same resource mamanger but fail onto old values of the
+      // We record the server id when doing failover to avoid overwriting the sesion ID again if
+      // multiple connections fail on the same resource mamanger but fail onto old values of the
       // session ID. This prevents the ID being failed over more than once for the same server.
       private int serverID = -1;
 
@@ -336,58 +348,58 @@
       {
          this.sessionID = sessionID;
       }
-      
+
       void addMessage(JBossMessage msg)
       {
          msgs.add(msg);
       }
-      
+
       void addAck(Ack ack)
       {
          acks.add(ack);
       }
-      
+
       public List getMsgs()
       {
          return msgs;
       }
-      
+
       public List getAcks()
       {
          return acks;
       }
-      
+
       public int getSessionId()
       {
          return sessionID;
       }
-      
+
       void handleFailover(int newServerID, int oldSessionID, int newSessionID)
       {
          if (sessionID == oldSessionID && serverID != newServerID)
-         {            
+         {
             sessionID = newSessionID;
             serverID = newServerID;
-            
+
             // Remove any non persistent acks
             for(Iterator i = acks.iterator(); i.hasNext(); )
             {
                DeliveryInfo di = (DeliveryInfo)i.next();
-               
+
                if (!di.getMessageProxy().getMessage().isReliable())
                {
+                  if (trace) { log.trace(this + " discarded non-persistent " + di + " on failover"); }
                   i.remove();
                }
             }
          }
+      }
 
-      }
-      
       void clearMessages()
       {
          msgs.clear();
       }
-      
+
    }
-      
+
 }

Modified: trunk/src/main/org/jboss/messaging/core/SimpleDelivery.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/SimpleDelivery.java	2007-01-06 06:45:30 UTC (rev 1912)
+++ trunk/src/main/org/jboss/messaging/core/SimpleDelivery.java	2007-01-06 09:18:24 UTC (rev 1913)
@@ -119,7 +119,7 @@
 
    public void acknowledge(Transaction tx) throws Throwable
    {        
-      if (trace) { log.trace(this + " acknowledging delivery in tx:" + tx); }
+      if (trace) { log.trace(this + " acknowledging delivery in " + tx); }
       
       observer.acknowledge(this, tx);
 

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2007-01-06 06:45:30 UTC (rev 1912)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2007-01-06 09:18:24 UTC (rev 1913)
@@ -479,7 +479,6 @@
          {
             binding = super.getBindingforChannelId(channelId);
          }
-         log.info("Returned " + binding);
          return binding;
       }
       finally
@@ -2144,7 +2143,7 @@
                }
                else
                {
-                  log.info(this + " has already a " + queueName + " queue so adding to failed map");
+                  log.debug(this + " has already a " + queueName + " queue so adding to failed map");
                }
 
                // Create a new binding

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/jchannelfactory/MultiplexorJChannelFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/jchannelfactory/MultiplexorJChannelFactory.java	2007-01-06 06:45:30 UTC (rev 1912)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/jchannelfactory/MultiplexorJChannelFactory.java	2007-01-06 09:18:24 UTC (rev 1913)
@@ -31,8 +31,8 @@
  *
  * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
  * @version <tt>$Revision$</tt>
- *          <p/>
- *          $Id$
+ * 
+ * $Id$
  */
 public class MultiplexorJChannelFactory implements JChannelFactory
 {

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java	2007-01-06 06:45:30 UTC (rev 1912)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java	2007-01-06 09:18:24 UTC (rev 1913)
@@ -1047,11 +1047,275 @@
       }
    }
 
-   // TODO add a test whose session has NON-TRANSACTED, PERSISTENT and NON-PERSISTENT ACKS.
+   public void testSessionWithAcknowledgmentsFailover() throws Exception
+   {
+      Connection conn = null;
 
-   // TODO add a test whose session has TRANSACTED, PERSISTENT and NON-PERSISTENT ACKS.
+      try
+      {
+         // skip connection to node 0
+         conn = cf.createConnection();
+         conn.close();
 
+         // create a connection to node 1
+         conn = cf.createConnection();
 
+         conn.start();
+
+         assertEquals(1, ((JBossConnection)conn).getServerID());
+
+         Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+         // send 2 messages (one persistent and one non-persistent)
+
+         MessageProducer prod = session.createProducer(queue[1]);
+
+         prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+         prod.send(session.createTextMessage("clik-persistent"));
+         prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         prod.send(session.createTextMessage("clak-non-persistent"));
+
+         // close the producer
+         prod.close();
+
+         // create a consumer and receive messages, but don't acknowledge
+
+         MessageConsumer cons = session.createConsumer(queue[1]);
+         TextMessage clik = (TextMessage)cons.receive(2000);
+         assertEquals("clik-persistent", clik.getText());
+         TextMessage clak = (TextMessage)cons.receive(2000);
+         assertEquals("clak-non-persistent", clak.getText());
+
+         // register a failover listener
+         SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+         ((JBossConnection)conn).registerFailoverListener(failoverListener);
+
+         log.debug("killing node 1 ....");
+
+         ServerManagement.kill(1);
+
+         log.info("########");
+         log.info("######## KILLED NODE 1");
+         log.info("########");
+
+         // wait for the client-side failover to complete
+
+         while(true)
+         {
+            FailoverEvent event = failoverListener.getEvent(120000);
+            if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+            {
+               break;
+            }
+            if (event == null)
+            {
+               fail("Did not get expected FAILOVER_COMPLETED event");
+            }
+         }
+
+         // failover complete
+         log.info("failover completed");
+
+         assertEquals(0, ((JBossConnection)conn).getServerID());
+
+         // acknowledge the messages
+         clik.acknowledge();
+         clak.acknowledge();
+
+         // make sure no messages are left in the queue
+         Message m = cons.receive(1000);
+         assertNull(m);
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+
+   public void testTransactedSessionWithAcknowledgmentsCommitOnFailover() throws Exception
+   {
+      Connection conn = null;
+
+      try
+      {
+         // skip connection to node 0
+         conn = cf.createConnection();
+         conn.close();
+
+         // create a connection to node 1
+         conn = cf.createConnection();
+
+         conn.start();
+
+         assertEquals(1, ((JBossConnection)conn).getServerID());
+
+         Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+         // send 2 messages (one persistent and one non-persistent)
+
+         MessageProducer prod = session.createProducer(queue[1]);
+
+         prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+         prod.send(session.createTextMessage("clik-persistent"));
+         prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         prod.send(session.createTextMessage("clak-non-persistent"));
+
+         session.commit();
+
+         // close the producer
+         prod.close();
+
+         // create a consumer and receive messages, but don't acknowledge
+
+         MessageConsumer cons = session.createConsumer(queue[1]);
+         TextMessage clik = (TextMessage)cons.receive(2000);
+         assertEquals("clik-persistent", clik.getText());
+         TextMessage clak = (TextMessage)cons.receive(2000);
+         assertEquals("clak-non-persistent", clak.getText());
+
+         // register a failover listener
+         SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+         ((JBossConnection)conn).registerFailoverListener(failoverListener);
+
+         log.debug("killing node 1 ....");
+
+         ServerManagement.kill(1);
+
+         log.info("########");
+         log.info("######## KILLED NODE 1");
+         log.info("########");
+
+         // wait for the client-side failover to complete
+
+         while(true)
+         {
+            FailoverEvent event = failoverListener.getEvent(120000);
+            if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+            {
+               break;
+            }
+            if (event == null)
+            {
+               fail("Did not get expected FAILOVER_COMPLETED event");
+            }
+         }
+
+         // failover complete
+         log.info("failover completed");
+
+         assertEquals(0, ((JBossConnection)conn).getServerID());
+
+         // acknowledge the messages
+         session.commit();
+
+         // make sure no messages are left in the queue
+         Message m = cons.receive(1000);
+         assertNull(m);
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+
+   public void testTransactedSessionWithAcknowledgmentsRollbackOnFailover() throws Exception
+   {
+      Connection conn = null;
+
+      try
+      {
+         // skip connection to node 0
+         conn = cf.createConnection();
+         conn.close();
+
+         // create a connection to node 1
+         conn = cf.createConnection();
+
+         conn.start();
+
+         assertEquals(1, ((JBossConnection)conn).getServerID());
+
+         Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+         // send 2 messages (one persistent and one non-persistent)
+
+         MessageProducer prod = session.createProducer(queue[1]);
+
+         prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+         prod.send(session.createTextMessage("clik-persistent"));
+         prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         prod.send(session.createTextMessage("clak-non-persistent"));
+
+         session.commit();
+
+         // close the producer
+         prod.close();
+
+         // create a consumer and receive messages, but don't acknowledge
+
+         MessageConsumer cons = session.createConsumer(queue[1]);
+         TextMessage clik = (TextMessage)cons.receive(2000);
+         assertEquals("clik-persistent", clik.getText());
+         TextMessage clak = (TextMessage)cons.receive(2000);
+         assertEquals("clak-non-persistent", clak.getText());
+
+         // register a failover listener
+         SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+         ((JBossConnection)conn).registerFailoverListener(failoverListener);
+
+         log.debug("killing node 1 ....");
+
+         ServerManagement.kill(1);
+
+         log.info("########");
+         log.info("######## KILLED NODE 1");
+         log.info("########");
+
+         // wait for the client-side failover to complete
+
+         while(true)
+         {
+            FailoverEvent event = failoverListener.getEvent(120000);
+            if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+            {
+               break;
+            }
+            if (event == null)
+            {
+               fail("Did not get expected FAILOVER_COMPLETED event");
+            }
+         }
+
+         // failover complete
+         log.info("failover completed");
+
+         assertEquals(0, ((JBossConnection)conn).getServerID());
+
+         session.rollback();
+
+         TextMessage m = (TextMessage)cons.receive(2000);
+         assertNotNull(m);
+         assertEquals("clik-persistent", m.getText());
+
+         m = (TextMessage)cons.receive(2000);
+         assertNull(m);
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+
+
    public void testFailoverListener() throws Exception
    {
       Connection conn = null;

Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java	2007-01-06 06:45:30 UTC (rev 1912)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java	2007-01-06 09:18:24 UTC (rev 1913)
@@ -317,7 +317,7 @@
                log.info("Could find multiplexer config");
             }
 
-            MBeanConfigurationElement multiplexerConfig = (MBeanConfigurationElement) services.iterator().next();
+            MBeanConfigurationElement multiplexerConfig = (MBeanConfigurationElement)services.iterator().next();
             ObjectName nameMultiplexer = sc.registerAndConfigureService(multiplexerConfig);
             sc.invoke(nameMultiplexer,"create", new Object[0], new String[0]);
             sc.invoke(nameMultiplexer,"start", new Object[0], new String[0]);




More information about the jboss-cvs-commits mailing list