[jboss-cvs] JBoss Messaging SVN: r5291 - in trunk: src/main/org/jboss/messaging/core/remoting and 4 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Nov 6 12:10:18 EST 2008


Author: timfox
Date: 2008-11-06 12:10:18 -0500 (Thu, 06 Nov 2008)
New Revision: 5291

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailBackupServerTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/remoting/Channel.java
   trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketsConfirmedMessage.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
Log:
Cope with backup server dying


Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2008-11-06 14:57:06 UTC (rev 5290)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2008-11-06 17:10:18 UTC (rev 5291)
@@ -609,7 +609,7 @@
                            
                   sessions.add(session);
 
-                  ChannelHandler handler = new ClientSessionPacketHandler(session);
+                  ChannelHandler handler = new ClientSessionPacketHandler(session, sessionChannel);
          
                   sessionChannel.setHandler(handler);
          

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java	2008-11-06 14:57:06 UTC (rev 5290)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java	2008-11-06 17:10:18 UTC (rev 5291)
@@ -26,6 +26,7 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_MSG;
 
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.ChannelHandler;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
@@ -43,10 +44,14 @@
    private static final Logger log = Logger.getLogger(ClientSessionPacketHandler.class);
 
    private final ClientSessionInternal clientSession;
+   
+   private final Channel channel;
 
-   public ClientSessionPacketHandler(final ClientSessionInternal clientSesssion)
+   public ClientSessionPacketHandler(final ClientSessionInternal clientSesssion, final Channel channel)
    {     
       this.clientSession = clientSesssion;
+      
+      this.channel = channel;
    }
       
    public void handlePacket(final Packet packet)
@@ -85,5 +90,7 @@
       {
          log.error("Failed to handle packet", e);
       }
+      
+      channel.confirm(packet);
    }
 }
\ No newline at end of file

Modified: trunk/src/main/org/jboss/messaging/core/remoting/Channel.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/Channel.java	2008-11-06 14:57:06 UTC (rev 5290)
+++ trunk/src/main/org/jboss/messaging/core/remoting/Channel.java	2008-11-06 17:10:18 UTC (rev 5291)
@@ -56,23 +56,7 @@
    
    RemotingConnection getConnection();
    
-//   //debug only
-//   Queue<Command> getSentCommands();
-//   
-//   Queue<Command> getReceivedCommands();
-//   
-//   // For debug only
-//   static class Command
-//   {
-//      public final int commandID;
-//
-//      public final Packet packet;
-//
-//      public Command(final int commandID, final Packet packet)
-//      {
-//         this.commandID = commandID;
-//
-//         this.packet = packet;
-//      }
-//   }
+   void replicatingChannelDead();
+   
+   void confirm(Packet packet);
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java	2008-11-06 14:57:06 UTC (rev 5290)
+++ trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java	2008-11-06 17:10:18 UTC (rev 5291)
@@ -51,4 +51,6 @@
    void activate();
    
    void freeze();
+   
+   RemotingConnection getReplicatingConnection();
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-11-06 14:57:06 UTC (rev 5290)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-11-06 17:10:18 UTC (rev 5291)
@@ -266,6 +266,11 @@
 
       this.replicatingConnection = replicatingConnection;
 
+      if (replicatingConnection != null)
+      {
+         replicatingConnection.addFailureListener(new ReplicatingConnectionFailureListener());
+      }
+
       this.active = active;
 
       this.pingPeriod = pingPeriod;
@@ -345,10 +350,14 @@
       return transportConnection.createBuffer(size);
    }
 
+   public RemotingConnection getReplicatingConnection()
+   {
+      return replicatingConnection;
+   }
+
    /*
     * This can be called concurrently by more than one thread so needs to be locked
     */
-
    public void fail(final MessagingException me)
    {
       synchronized (failLock)
@@ -853,31 +862,18 @@
 
          this.id = id;
 
-         if (connection.replicatingConnection != null)
+         if (connection.replicatingConnection != null && id != 0)
          {
-            // Don't want to send confirmations if replicating to backup
-            this.windowSize = -1;
-
-            this.confWindowSize = -1;
-
             // We don't redirect the ping channel
 
-            if (id != 0)
-            {
-               replicatingChannel = connection.replicatingConnection.getChannel(id, -1, false);
+            replicatingChannel = connection.replicatingConnection.getChannel(id, -1, false);
 
-               replicatingChannel.setHandler(new ReplicatedPacketsConfirmedChannelHandler());
-            }
+            replicatingChannel.setHandler(new ReplicatedPacketsConfirmedChannelHandler());
          }
-         else
-         {
-            this.windowSize = windowSize;
+         this.windowSize = windowSize;
 
-            this.confWindowSize = (int)(0.75 * windowSize);
+         this.confWindowSize = (int)(0.75 * windowSize);
 
-            replicatingChannel = null;
-         }
-
          if (this.windowSize != -1)
          {
             resendCache = new ConcurrentLinkedQueue<Packet>();
@@ -936,13 +932,13 @@
          synchronized (sendLock)
          {
             packet.setChannelID(id);
-   
+
             final MessagingBuffer buffer = connection.transportConnection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
-   
+
             int size = packet.encode(buffer);
-   
+
             // Must block on semaphore outside the main lock or this can prevent failover from occurring
-            if (sendSemaphore != null)
+            if (sendSemaphore != null && packet.getType() != PACKETS_CONFIRMED)
             {
                try
                {
@@ -953,9 +949,9 @@
                   throw new IllegalStateException("Semaphore interrupted");
                }
             }
-   
+
             lock.lock();
-   
+
             try
             {
                while (failingOver)
@@ -969,12 +965,12 @@
                   {
                   }
                }
-   
+
                if (resendCache != null && packet.isRequiresConfirmations())
                {
                   resendCache.add(packet);
                }
-   
+
                if (connection.active || packet.isWriteAlways())
                {
                   connection.transportConnection.write(buffer);
@@ -1088,21 +1084,19 @@
          }
       }
 
-      public DelayedResult replicatePacket(final Packet packet)
+      // Must be synchronized since can be called by incoming session commands but also by deliveries
+      // Also needs to be synchronized with respect to replicatingChannelDead
+      public synchronized DelayedResult replicatePacket(final Packet packet)
       {
          if (replicatingChannel != null)
          {
-            // Must be synchronized since can be called by incoming session commands but also by deliveries
-            synchronized (this)
-            {
-               DelayedResult result = new DelayedResult();
+            DelayedResult result = new DelayedResult();
 
-               responseActions.add(result);
+            responseActions.add(result);
 
-               replicatingChannel.send(packet);
+            replicatingChannel.send(packet);
 
-               return result;
-            }
+            return result;
          }
          else
          {
@@ -1110,6 +1104,28 @@
          }
       }
 
+      // The replicating connection has died (backup has died)
+      public synchronized void replicatingChannelDead()
+      {
+         replicatingChannel = null;
+
+         // Execute all the response actions now
+
+         while (true)
+         {
+            DelayedResult result = responseActions.poll();
+
+            if (result != null)
+            {
+               result.replicated();
+            }
+            else
+            {
+               break;
+            }
+         }
+      }
+
       public void replicateComplete()
       {
          if (!connection.active)
@@ -1125,16 +1141,31 @@
       }
 
       // This will never get called concurrently by more than one thread
+
+      // TODO it's not ideal synchronizing this since it forms a contention point with replication
+      // but we need to do this to protect it w.r.t. the check on replicatingChannel
       public void replicateResponseReceived()
       {
-         DelayedResult result = responseActions.poll();
+         DelayedResult result = null;
 
-         if (result == null)
+         synchronized (this)
          {
-            throw new IllegalStateException("Cannot find response action");
+            if (replicatingChannel != null)
+            {
+               result = responseActions.poll();
+
+               if (result == null)
+               {
+                  throw new IllegalStateException("Cannot find response action");
+               }
+            }
          }
 
-         result.replicated();
+         //Must execute outside of lock
+         if (result != null)
+         {
+            result.replicated();
+         }
       }
 
       public void setHandler(final ChannelHandler handler)
@@ -1278,7 +1309,7 @@
             {
                response = packet;
 
-               checkConfirmation(packet);
+               confirm(packet);
 
                lock.lock();
 
@@ -1293,14 +1324,8 @@
             }
             else if (handler != null)
             {
-               checkConfirmation(packet);
-
                handler.handlePacket(packet);
             }
-            else
-            {
-               checkConfirmation(packet);
-            }
          }
       }
 
@@ -1313,7 +1338,7 @@
          connection.transportConnection.write(buffer);
       }
 
-      private void checkConfirmation(final Packet packet)
+      public void confirm(final Packet packet)
       {
          if (resendCache != null && packet.isRequiresConfirmations())
          {
@@ -1351,7 +1376,6 @@
 
             if (packet == null)
             {
-               // report();
                throw new IllegalStateException(System.identityHashCode(this) + " Can't find packet to clear: " +
                                                " last received command id " +
                                                lastReceivedCommandID +
@@ -1367,7 +1391,10 @@
                                                connection.createdActive);
             }
 
-            sizeToFree += packet.getPacketSize();
+            if (packet.getType() != PACKETS_CONFIRMED)
+            {
+               sizeToFree += packet.getPacketSize();
+            }
          }
 
          firstStoredCommandID += numberToClear;
@@ -1461,4 +1488,18 @@
          }
       }
    }
+
+   private class ReplicatingConnectionFailureListener implements FailureListener
+   {
+      public void connectionFailed(final MessagingException me)
+      {
+         synchronized (RemotingConnectionImpl.this)
+         {
+            for (Channel channel : channels.values())
+            {
+               channel.replicatingChannelDead();
+            }
+         }
+      }
+   }
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketsConfirmedMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketsConfirmedMessage.java	2008-11-06 14:57:06 UTC (rev 5290)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketsConfirmedMessage.java	2008-11-06 17:10:18 UTC (rev 5291)
@@ -95,11 +95,6 @@
       return false;
    }
    
-   public boolean isWriteAlways()
-   {
-      return true;
-   }
-   
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-11-06 14:57:06 UTC (rev 5290)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-11-06 17:10:18 UTC (rev 5291)
@@ -224,6 +224,8 @@
             response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
          }
       }
+      
+      channel.confirm(packet);
    
       channel.send(response);
    }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-11-06 14:57:06 UTC (rev 5290)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-11-06 17:10:18 UTC (rev 5291)
@@ -453,6 +453,8 @@
             response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
          }
       }
+      
+      channel.confirm(packet);
 
       channel.send(response);
    }
@@ -558,7 +560,9 @@
          }
       }
       
-      channel.send(response);
+      channel.confirm(packet);
+      
+      channel.send(response);            
    }
       
 
@@ -677,6 +681,8 @@
          }
       }
       
+      channel.confirm(packet);
+      
       channel.send(response);
    }
       
@@ -749,6 +755,8 @@
          }
       }
 
+      channel.confirm(packet);
+      
       channel.send(response);
    }
    
@@ -816,6 +824,8 @@
          }
       }
       
+      channel.confirm(packet);
+      
       channel.send(response);
    }
 
@@ -849,9 +859,7 @@
    }
    
    public void doHandleCreateProducer(final SessionCreateProducerMessage packet)
-   {
-      SimpleString address = packet.getAddress();
-
+   {      
       int maxRate = packet.getMaxRate();
 
       boolean autoGroupID = packet.isAutoGroupId();
@@ -889,6 +897,8 @@
             response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
          }
       }
+      
+      channel.confirm(packet);
 
       channel.send(response);
    }
@@ -961,6 +971,8 @@
          }
       }
 
+      channel.confirm(packet);
+      
       if (response != null)
       {
          channel.send(response);
@@ -1015,7 +1027,9 @@
       {
          tx = new TransactionImpl(storageManager, postOffice);
       }
-
+      
+      channel.confirm(packet);
+      
       channel.send(response);
    }
    
@@ -1064,6 +1078,8 @@
          }
       }
 
+      channel.confirm(packet);
+      
       channel.send(response);
    }
 
@@ -1146,6 +1162,8 @@
          }
       }
 
+      channel.confirm(packet);
+      
       channel.send(response);
    }
    
@@ -1239,6 +1257,8 @@
          }
       }
 
+      channel.confirm(packet);
+      
       channel.send(response);
    }
    
@@ -1270,6 +1290,8 @@
 
       Packet response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
 
+      channel.confirm(packet);
+      
       channel.send(response);
    }
 
@@ -1339,6 +1361,8 @@
             response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
          }
       }
+      
+      channel.confirm(packet);
 
       channel.send(response);
    }
@@ -1421,6 +1445,8 @@
          }
       }
 
+      channel.confirm(packet);
+      
       channel.send(response);
    }
    
@@ -1503,6 +1529,8 @@
          }
       }
 
+      channel.confirm(packet);
+      
       channel.send(response);
    }
 
@@ -1572,6 +1600,8 @@
             response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
          }
       }
+      
+      channel.confirm(packet);
 
       channel.send(response);
    }
@@ -1641,6 +1671,8 @@
          }
       }
 
+      channel.confirm(packet);
+      
       channel.send(response);
    }
 
@@ -1726,6 +1758,8 @@
             response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
          }
       }
+      
+      channel.confirm(packet);
 
       channel.send(response);
    }
@@ -1755,6 +1789,8 @@
    {
       Packet response = new SessionXAGetInDoubtXidsResponseMessage(resourceManager.getPreparedTransactions());
 
+      channel.confirm(packet);
+      
       channel.send(response);
    }
 
@@ -1783,6 +1819,8 @@
    {
       Packet response = new SessionXAGetTimeoutResponseMessage(resourceManager.getTimeoutSeconds());
 
+      channel.confirm(packet);
+      
       channel.send(response);
    }
 
@@ -1811,6 +1849,8 @@
    {
       Packet response = new SessionXASetTimeoutResponseMessage(resourceManager.setTimeoutSeconds(packet.getTimeoutSeconds()));
 
+      channel.confirm(packet);
+      
       channel.send(response);
    }
    
@@ -1894,6 +1934,8 @@
          }
       }
 
+      channel.confirm(packet);
+      
       channel.send(response);
    }
    
@@ -1952,6 +1994,8 @@
          }
       }
 
+      channel.confirm(packet);
+      
       channel.send(response);
    }
 
@@ -1984,10 +2028,11 @@
       //is being processed.
       //Otherwise we can end up with start/stop being processed in different order on backup to live.
       //Which can result in, say, a delivery arriving at backup, but it's still not started!
+      DelayedResult result = null;
       try
       {
-         channel.replicatePacket(packet);
- 
+         result = channel.replicatePacket(packet);
+          
          //note we process start before response is back from the backup
          setStarted(true);         
       }
@@ -1998,6 +2043,22 @@
             unlockConsumers();
          }
       }
+      
+      if (result == null)
+      {
+         channel.confirm(packet);
+      }
+      else
+      {
+         //Don't confirm the packet until result has come back from backup
+         result.setResultRunner(new Runnable()
+         {
+            public void run()
+            {
+               channel.confirm(packet);
+            }
+         });
+      }
    }
 
    //TODO try removing the lock consumers and see what happens!!
@@ -2022,6 +2083,7 @@
 
          if (result == null)
          {
+            channel.confirm(packet);
             // Not clustered - just send now
             channel.send(response);
          }
@@ -2031,6 +2093,8 @@
             {
                public void run()
                {
+                  channel.confirm(packet);
+                  
                   channel.send(response);
                }
             });
@@ -2109,6 +2173,8 @@
             response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
          }
       }
+      
+      channel.confirm(packet);
 
       channel.send(response);
       
@@ -2135,54 +2201,9 @@
       
       ServerConsumer consumer = consumers.get(packet.getConsumerID());    
       
-      consumer.handleClose(packet);
-      
-//      DelayedResult result = channel.replicatePacket(packet);
-//      
-//      if (result == null)
-//      {
-//         doHandleCloseConsumer(packet);
-//      }
-//      else
-//      {
-//         //Don't process until result has come back from backup
-//         result.setResultRunner(new Runnable()
-//         {
-//            public void run()
-//            {
-//               doHandleCloseConsumer(packet);
-//            }
-//         });
-//      }
+      consumer.handleClose(packet);      
    }
 
-   public void doHandleCloseConsumer(final SessionConsumerCloseMessage packet)
-   {
-      Packet response = null;
-
-      try
-      {
-         consumers.get(packet.getConsumerID()).close();
-   
-         response = new NullResponseMessage();
-      }
-      catch (Exception e)
-      {
-         log.error("Failed to close", e);
-   
-         if (e instanceof MessagingException)
-         {
-            response = new MessagingExceptionMessage((MessagingException)e);
-         }
-         else
-         {
-            response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
-         }
-      }
-   
-      channel.send(response);
-   }
-
    public void handleCloseProducer(final SessionProducerCloseMessage packet)
    {
       DelayedResult result = channel.replicatePacket(packet);
@@ -2227,13 +2248,15 @@
             response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
          }
       }
+      
+      channel.confirm(packet);
 
       channel.send(response);
    }
 
    public void handleReceiveConsumerCredits(final SessionConsumerFlowCreditMessage packet)
    {
-      channel.replicatePacket(packet);
+      DelayedResult result = channel.replicatePacket(packet);
 
       try
       {
@@ -2244,6 +2267,21 @@
       {
          log.error("Failed to receive credits", e);
       }
+      
+      if (result == null)
+      {
+         channel.confirm(packet);
+      }
+      else
+      {
+         result.setResultRunner(new Runnable()
+         {
+            public void run()
+            {
+               channel.confirm(packet);
+            }
+         });
+      }
    }
 
    public void handleSendProducerMessage(final SessionSendMessage packet)
@@ -2327,6 +2365,8 @@
          }
       }
       
+      channel.confirm(packet);
+      
       if (response != null)
       {
          channel.send(response);
@@ -2409,6 +2449,8 @@
             }
          }
       }
+      
+      channel.confirm(packet);
 
       if (response != null)
       {
@@ -2493,6 +2535,8 @@
       {
          log.error("Failed to send management message", e);        
       }
+      
+      channel.confirm(packet);
    }
 
    public void handleReplicatedDelivery(final SessionReplicateDeliveryMessage packet)

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailBackupServerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailBackupServerTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailBackupServerTest.java	2008-11-06 17:10:18 UTC (rev 5291)
@@ -0,0 +1,222 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ * 
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ * 
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * 
+ * A FailBackupServerTest
+ * 
+ * Make sure live server continues ok if backup server fails
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 6 Nov 2008 11:27:17
+ *
+ *
+ */
+public class FailBackupServerTest extends TestCase
+{
+   private static final Logger log = Logger.getLogger(FailBackupServerTest.class);
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+
+   private MessagingService liveService;
+
+   private MessagingService backupService;
+
+   private final Map<String, Object> backupParams = new HashMap<String, Object>();
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testFailBackup() throws Exception
+   {
+      ClientSessionFactoryInternal sf1 = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                                                      new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                                                 backupParams));
+
+      sf1.setSendWindowSize(32 * 1024);
+
+      ClientSession session1 = sf1.createSession(false, true, true, false);
+
+      session1.createQueue(ADDRESS, ADDRESS, null, false, false);
+
+      ClientProducer producer = session1.createProducer(ADDRESS);
+
+      final int numMessages = 1000;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session1.createClientMessage(JBossTextMessage.TYPE,
+                                                              false,
+                                                              0,
+                                                              System.currentTimeMillis(),
+                                                              (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         message.getBody().putString("aardvarks");
+         message.getBody().flip();
+         producer.send(message);
+      }
+
+      ClientConsumer consumer1 = session1.createConsumer(ADDRESS);
+
+      session1.start();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = consumer1.receive(1000);
+
+         assertNotNull(message);
+
+         assertEquals("aardvarks", message.getBody().getString());
+
+         assertEquals(i, message.getProperty(new SimpleString("count")));
+
+         if (i == 0)
+         {
+            // Fail all the replicating connections - this simulates the backup server crashing
+
+            Set<RemotingConnection> conns = liveService.getServer().getRemotingService().getConnections();
+
+            for (RemotingConnection conn : conns)
+            {
+               log.info("Failing replicating connection");
+               conn.getReplicatingConnection().fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
+            }
+         }
+
+         message.acknowledge();
+      }
+
+      ClientMessage message = consumer1.receive(1000);
+
+      assertNull(message);
+
+      // Send some more
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         message = session1.createClientMessage(JBossTextMessage.TYPE, false, 0, System.currentTimeMillis(), (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         message.getBody().putString("aardvarks");
+         message.getBody().flip();
+         producer.send(message);
+      }
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         message = consumer1.receive(1000);
+
+         assertNotNull(message);
+
+         assertEquals("aardvarks", message.getBody().getString());
+
+         assertEquals(i, message.getProperty(new SimpleString("count")));
+
+         message.acknowledge();
+      }
+
+      message = consumer1.receive(1000);
+
+      assertNull(message);
+
+      session1.close();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      Configuration backupConf = new ConfigurationImpl();
+      backupConf.setSecurityEnabled(false);
+      backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+      backupConf.getAcceptorConfigurations()
+                .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+                                                backupParams));
+      backupConf.setBackup(true);
+      backupService = MessagingServiceImpl.newNullStorageMessagingServer(backupConf);
+      backupService.start();
+
+      Configuration liveConf = new ConfigurationImpl();
+      liveConf.setSecurityEnabled(false);
+      liveConf.getAcceptorConfigurations()
+              .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
+      liveConf.setBackupConnectorConfiguration(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                          backupParams));
+      liveService = MessagingServiceImpl.newNullStorageMessagingServer(liveConf);
+      liveService.start();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      assertEquals(0, backupService.getServer().getRemotingService().getConnections().size());
+
+      backupService.stop();
+
+      assertEquals(0, liveService.getServer().getRemotingService().getConnections().size());
+
+      liveService.stop();
+
+      assertEquals(0, InVMRegistry.instance.size());
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}




More information about the jboss-cvs-commits mailing list