[hornetq-commits] JBoss hornetq SVN: r8651 - in trunk: examples/core/perf/server0 and 6 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Dec 9 15:29:24 EST 2009


Author: timfox
Date: 2009-12-09 15:29:24 -0500 (Wed, 09 Dec 2009)
New Revision: 8651

Modified:
   trunk/build-hornetq.xml
   trunk/examples/core/perf/server0/hornetq-configuration.xml
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
   trunk/src/main/org/hornetq/core/remoting/Channel.java
   trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
   trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReattachSessionMessage.java
   trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReattachSessionResponseMessage.java
   trunk/src/main/org/hornetq/core/server/HornetQServer.java
   trunk/src/main/org/hornetq/core/server/ServerConsumer.java
   trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
   trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
Log:
fixed re-attach ordering issue

Modified: trunk/build-hornetq.xml
===================================================================
--- trunk/build-hornetq.xml	2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/build-hornetq.xml	2009-12-09 20:29:24 UTC (rev 8651)
@@ -1373,8 +1373,7 @@
                     haltonerror="${junit.batchtest.haltonerror}">
             <formatter type="plain" usefile="${junit.formatter.usefile}"/>
             <fileset dir="${test.classes.dir}">
-               <include name="**/org/hornetq/tests/stress/**/*${test-mask}.class"/>
-               <exclude name="**/org/hornetq/tests/stress/failover/MultiThreadRandomReattachStressTest.class"/>
+               <include name="**/org/hornetq/tests/stress/**/*${test-mask}.class"/>               
             </fileset>
          </batchtest>
       </junit>

Modified: trunk/examples/core/perf/server0/hornetq-configuration.xml
===================================================================
--- trunk/examples/core/perf/server0/hornetq-configuration.xml	2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/examples/core/perf/server0/hornetq-configuration.xml	2009-12-09 20:29:24 UTC (rev 8651)
@@ -17,7 +17,7 @@
    
    <persistence-enabled>true</persistence-enabled>
 
-   <journal-sync-non-transactional>true</journal-sync-non-transactional>
+   <journal-sync-non-transactional>false</journal-sync-non-transactional>
    <journal-sync-transactional>true</journal-sync-transactional>
    <journal-type>ASYNCIO</journal-type>
    <journal-min-files>20</journal-min-files>

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2009-12-09 20:29:24 UTC (rev 8651)
@@ -846,7 +846,6 @@
       // We lock the channel to prevent any packets to be added to the resend
       // cache during the failover process
       channel.lock();
-
       try
       {
          channel.transferConnection(backupConnection);
@@ -854,9 +853,11 @@
          backupConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
 
          remotingConnection = backupConnection;
+         
+         int lcid = channel.getLastConfirmedCommandID();
+         
+         Packet request = new ReattachSessionMessage(name, lcid);
 
-         Packet request = new ReattachSessionMessage(name, channel.getLastReceivedCommandID());
-
          Channel channel1 = backupConnection.getChannel(1, -1);
 
          ReattachSessionResponseMessage response = (ReattachSessionResponseMessage)channel1.sendBlocking(request);
@@ -865,10 +866,11 @@
          {
             // The session was found on the server - we reattached transparently ok
 
-            channel.replayCommands(response.getLastReceivedCommandID(), channel.getID());
+            channel.replayCommands(response.getLastConfirmedCommandID(), channel.getID());                        
          }
          else
          {
+            
             // The session wasn't found on the server - probably we're failing over onto a backup server where the
             // session won't exist or the target server has been restarted - in this case the session will need to be
             // recreated,
@@ -994,6 +996,7 @@
             channel.returnBlocking();
          }
 
+         channel.setTransferring(false);         
       }
       catch (Throwable t)
       {

Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java	2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java	2009-12-09 20:29:24 UTC (rev 8651)
@@ -899,26 +899,20 @@
          Connection tc = null;
 
          try
-         {
-            if (connector == null)
-            {
-               DelegatingBufferHandler handler = new DelegatingBufferHandler();
+         {            
+            DelegatingBufferHandler handler = new DelegatingBufferHandler();
 
-               connector = connectorFactory.createConnector(transportParams,
-                                                            handler,
-                                                            this,
-                                                            closeExecutor,
-                                                            threadPool,
-                                                            scheduledThreadPool);
+            connector = connectorFactory.createConnector(transportParams,
+                                                         handler,
+                                                         this,
+                                                         closeExecutor,
+                                                         threadPool,
+                                                         scheduledThreadPool);
 
-               if (connector != null)
-               {
-                  connector.start();
-               }
-            }
-
             if (connector != null)
             {
+               connector.start();
+            
                tc = connector.createConnection();
 
                if (tc == null)

Modified: trunk/src/main/org/hornetq/core/remoting/Channel.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/Channel.java	2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/src/main/org/hornetq/core/remoting/Channel.java	2009-12-09 20:29:24 UTC (rev 8651)
@@ -39,9 +39,9 @@
 
    void transferConnection(RemotingConnection newConnection);
 
-   void replayCommands(int lastReceivedCommandID, final long newID);
+   void replayCommands(int lastConfirmedCommandID, final long newID);
 
-   int getLastReceivedCommandID();
+   int getLastConfirmedCommandID();
 
    void lock();
 
@@ -64,4 +64,6 @@
    void clearCommands();
 
    int getConfirmationWindowSize();
+   
+   void setTransferring(boolean transferring);
 }

Modified: trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java	2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java	2009-12-09 20:29:24 UTC (rev 8651)
@@ -50,7 +50,7 @@
 
    private volatile int firstStoredCommandID;
 
-   private volatile int lastReceivedCommandID = -1;
+   private volatile int lastConfirmedCommandID = -1;
 
    private volatile RemotingConnection connection;
 
@@ -73,7 +73,9 @@
    private int receivedBytes;
 
    private CommandConfirmationHandler commandConfirmationHandler;
-
+      
+   private volatile boolean transferring;
+ 
    public ChannelImpl(final RemotingConnection connection, final long id, final int confWindowSize)
    {
       this.connection = connection;
@@ -97,9 +99,9 @@
       return id;
    }
 
-   public int getLastReceivedCommandID()
+   public int getLastConfirmedCommandID()
    {
-      return lastReceivedCommandID;
+      return lastConfirmedCommandID;
    }
 
    public Lock getLock()
@@ -140,6 +142,11 @@
    {
       send(packet, false);
    }
+   
+   public void setTransferring(boolean transferring)
+   {
+      this.transferring = transferring;
+   }
 
    // This must never called by more than one thread concurrently
    public void send(final Packet packet, final boolean flush)
@@ -147,11 +154,11 @@
       synchronized (sendLock)
       {
          packet.setChannelID(id);
-
+         
          final HornetQBuffer buffer = packet.encode(connection);
 
          lock.lock();
-
+                  
          try
          {
             while (failingOver)
@@ -165,6 +172,13 @@
                {
                }
             }
+            
+            //Sanity check
+            if (transferring)
+            {
+               throw new IllegalStateException("Cannot send a packet while channel is doing failover");
+            }
+            
 
             if (resendCache != null && packet.isRequiresConfirmations())
             {
@@ -197,7 +211,7 @@
       synchronized (sendBlockingLock)
       {
          packet.setChannelID(id);
-
+         
          final HornetQBuffer buffer = packet.encode(connection);
 
          lock.lock();
@@ -306,7 +320,7 @@
 
       closed = true;
    }
-
+   
    public void transferConnection(final RemotingConnection newConnection)
    {
       // Needs to synchronize on the connection to make sure no packets from
@@ -322,19 +336,21 @@
          rnewConnection.putChannel(id, this);
 
          connection = rnewConnection;
+
+         transferring = true;
       }
    }
 
-   public void replayCommands(final int otherLastReceivedCommandID, final long newChannelID)
+   public void replayCommands(final int otherLastConfirmedCommandID, final long newChannelID)
    {
       if (resendCache != null)
       {
-         clearUpTo(otherLastReceivedCommandID);
+         clearUpTo(otherLastConfirmedCommandID);
 
          for (final Packet packet : resendCache)
          {
             packet.setChannelID(newChannelID);
-
+            
             doWrite(packet);
          }
       }
@@ -372,7 +388,7 @@
       {
          receivedBytes = 0;
 
-         final Packet confirmed = new PacketsConfirmedMessage(lastReceivedCommandID);
+         final Packet confirmed = new PacketsConfirmedMessage(lastConfirmedCommandID);
 
          confirmed.setChannelID(id);
 
@@ -384,7 +400,7 @@
    {
       if (resendCache != null && packet.isRequiresConfirmations())
       {
-         lastReceivedCommandID++;
+         lastConfirmedCommandID++;
 
          receivedBytes += packet.getPacketSize();
 
@@ -392,7 +408,7 @@
          {
             receivedBytes = 0;
 
-            final Packet confirmed = new PacketsConfirmedMessage(lastReceivedCommandID);
+            final Packet confirmed = new PacketsConfirmedMessage(lastConfirmedCommandID);
 
             confirmed.setChannelID(id);
 
@@ -405,14 +421,14 @@
    {
       if (resendCache != null)
       {
-         lastReceivedCommandID = -1;
+         lastConfirmedCommandID = -1;
 
          firstStoredCommandID = 0;
 
          resendCache.clear();
       }
    }
-
+   
    public void handlePacket(final Packet packet)
    {
       if (packet.getType() == PacketImpl.PACKETS_CONFIRMED)

Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReattachSessionMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReattachSessionMessage.java	2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReattachSessionMessage.java	2009-12-09 20:29:24 UTC (rev 8651)
@@ -30,19 +30,19 @@
 
    private String name;
 
-   private int lastReceivedCommandID;
+   private int lastConfirmedCommandID;
 
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public ReattachSessionMessage(final String name, final int lastReceivedCommandID)
+   public ReattachSessionMessage(final String name, final int lastConfirmedCommandID)
    {
       super(PacketImpl.REATTACH_SESSION);
 
       this.name = name;
 
-      this.lastReceivedCommandID = lastReceivedCommandID;
+      this.lastConfirmedCommandID = lastConfirmedCommandID;
    }
 
    public ReattachSessionMessage()
@@ -57,23 +57,23 @@
       return name;
    }
 
-   public int getLastReceivedCommandID()
+   public int getLastConfirmedCommandID()
    {
-      return lastReceivedCommandID;
+      return lastConfirmedCommandID;
    }
 
    @Override
    public void encodeRest(final HornetQBuffer buffer)
    {
       buffer.writeString(name);
-      buffer.writeInt(lastReceivedCommandID);
+      buffer.writeInt(lastConfirmedCommandID);
    }
 
    @Override
    public void decodeRest(final HornetQBuffer buffer)
    {
       name = buffer.readString();
-      lastReceivedCommandID = buffer.readInt();
+      lastConfirmedCommandID = buffer.readInt();
    }
 
    @Override

Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReattachSessionResponseMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReattachSessionResponseMessage.java	2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReattachSessionResponseMessage.java	2009-12-09 20:29:24 UTC (rev 8651)
@@ -28,7 +28,7 @@
 
    // Attributes ----------------------------------------------------
 
-   private int lastReceivedCommandID;
+   private int lastConfirmedCommandID;
 
    private boolean reattached;
 
@@ -36,11 +36,11 @@
 
    // Constructors --------------------------------------------------
 
-   public ReattachSessionResponseMessage(final int lastReceivedCommandID, final boolean reattached)
+   public ReattachSessionResponseMessage(final int lastConfirmedCommandID, final boolean reattached)
    {
       super(PacketImpl.REATTACH_SESSION_RESP);
 
-      this.lastReceivedCommandID = lastReceivedCommandID;
+      this.lastConfirmedCommandID = lastConfirmedCommandID;
 
       this.reattached = reattached;
    }
@@ -52,9 +52,9 @@
 
    // Public --------------------------------------------------------
 
-   public int getLastReceivedCommandID()
+   public int getLastConfirmedCommandID()
    {
-      return lastReceivedCommandID;
+      return lastConfirmedCommandID;
    }
 
    public boolean isReattached()
@@ -65,14 +65,14 @@
    @Override
    public void encodeRest(final HornetQBuffer buffer)
    {
-      buffer.writeInt(lastReceivedCommandID);
+      buffer.writeInt(lastConfirmedCommandID);
       buffer.writeBoolean(reattached);
    }
 
    @Override
    public void decodeRest(final HornetQBuffer buffer)
    {
-      lastReceivedCommandID = buffer.readInt();
+      lastConfirmedCommandID = buffer.readInt();
       reattached = buffer.readBoolean();
    }
 
@@ -92,7 +92,7 @@
 
       ReattachSessionResponseMessage r = (ReattachSessionResponseMessage)other;
 
-      return super.equals(other) && lastReceivedCommandID == r.lastReceivedCommandID;
+      return super.equals(other) && lastConfirmedCommandID == r.lastConfirmedCommandID;
    }
 
    @Override

Modified: trunk/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/HornetQServer.java	2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/src/main/org/hornetq/core/server/HornetQServer.java	2009-12-09 20:29:24 UTC (rev 8651)
@@ -70,7 +70,7 @@
 
    void unregisterActivateCallback(ActivateCallback callback);
 
-   ReattachSessionResponseMessage reattachSession(RemotingConnection connection, String name, int lastReceivedCommandID) throws Exception;
+   ReattachSessionResponseMessage reattachSession(RemotingConnection connection, String name, int lastConfirmedCommandID) throws Exception;
 
    /** The journal at the backup server has to be equivalent as the journal used on the live node. 
     *  Or else the backup node is out of sync. */

Modified: trunk/src/main/org/hornetq/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerConsumer.java	2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/src/main/org/hornetq/core/server/ServerConsumer.java	2009-12-09 20:29:24 UTC (rev 8651)
@@ -42,5 +42,9 @@
 
    void acknowledge(boolean autoCommitAcks, Transaction tx, long messageID) throws Exception;
 
-   void forceDelivery(long sequence);
+   void forceDelivery(long sequence);   
+   
+   void setTransferring(boolean transferring);
 }
+
+

Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java	2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java	2009-12-09 20:29:24 UTC (rev 8651)
@@ -150,7 +150,7 @@
 
       try
       {
-         response = server.reattachSession(connection, request.getName(), request.getLastReceivedCommandID());
+         response = server.reattachSession(connection, request.getName(), request.getLastConfirmedCommandID());
       }
       catch (Exception e)
       {

Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-12-09 20:29:24 UTC (rev 8651)
@@ -539,7 +539,7 @@
 
    public ReattachSessionResponseMessage reattachSession(final RemotingConnection connection,
                                                          final String name,
-                                                         final int lastReceivedCommandID) throws Exception
+                                                         final int lastConfirmedCommandID) throws Exception
    {
       if (!started)
       {
@@ -581,9 +581,9 @@
          else
          {
             // Reconnect the channel to the new connection
-            int serverLastReceivedCommandID = session.transferConnection(connection, lastReceivedCommandID);
+            int serverLastConfirmedCommandID = session.transferConnection(connection, lastConfirmedCommandID);
 
-            return new ReattachSessionResponseMessage(serverLastReceivedCommandID, true);
+            return new ReattachSessionResponseMessage(serverLastConfirmedCommandID, true);
          }
       }
    }

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-12-09 20:29:24 UTC (rev 8651)
@@ -120,7 +120,10 @@
    private final ManagementService managementService;
 
    private final Binding binding;
+   
+   private boolean transferring = false;
 
+
    // Constructors ---------------------------------------------------------------------------------
 
    public ServerConsumerImpl(final long id,
@@ -197,8 +200,8 @@
          // If the consumer is stopped then we don't accept the message, it
          // should go back into the
          // queue for delivery later.
-         if (!started)
-         {
+         if (!started || transferring)
+         {            
             return HandleStatus.BUSY;
          }
 
@@ -413,10 +416,27 @@
          promptDelivery(true);
       }
    }
-
+   
+   public void setTransferring(final boolean transferring)
+   {
+      lock.lock();
+      try
+      {
+         this.transferring = transferring;
+      }
+      finally
+      {
+         lock.unlock();
+      }
+      
+      if (!transferring)
+      {
+         promptDelivery(true);
+      }
+   }
+   
    public void receiveCredits(final int credits) throws Exception
    {
-
       if (credits == -1)
       {
          // No flow control

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-12-09 20:29:24 UTC (rev 8651)
@@ -1596,14 +1596,14 @@
    }
 
    public int transferConnection(final RemotingConnection newConnection, final int lastReceivedCommandID)
-   {
-      boolean wasStarted = started;
+   { 
+      //We need to disable delivery on all the consumers while the transfer is occurring- otherwise packets might get delivered
+      //after the channel has transferred but *before* packets have been replayed - this will give the client the wrong
+      //sequence of packets.
+      //It is not sufficient to just stop the session, since right after stopping the session, another session start might be executed
+      //before we have transferred the connection, leaving it in a started state      
+      setTransferring(true);
 
-      if (wasStarted)
-      {
-         setStarted(false);
-      }
-
       remotingConnection.removeFailureListener(this);
       remotingConnection.removeCloseListener(this);
 
@@ -1613,9 +1613,9 @@
       // the replicating connection will cause the outstanding responses to be be replayed on the live server,
       // if these reach the client who then subsequently fails over, on reconnection to backup, it will have
       // received responses that the backup did not know about.
-
+            
       channel.transferConnection(newConnection);
-
+      
       newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
 
       remotingConnection = newConnection;
@@ -1623,15 +1623,14 @@
       remotingConnection.addFailureListener(this);
       remotingConnection.addCloseListener(this);
 
-      int serverLastReceivedCommandID = channel.getLastReceivedCommandID();
+      int serverLastReceivedCommandID = channel.getLastConfirmedCommandID();
 
       channel.replayCommands(lastReceivedCommandID, id);
+       
+      channel.setTransferring(false);
+      
+      setTransferring(false);
 
-      if (wasStarted)
-      {
-         setStarted(true);
-      }
-
       return serverLastReceivedCommandID;
    }
 
@@ -1807,7 +1806,17 @@
 
       started = s;
    }
+   
+   private void setTransferring(final boolean transferring)
+   {
+      Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
 
+      for (ServerConsumer consumer : consumersClone)
+      {
+         consumer.setTransferring(transferring);
+      }
+   }   
+
    /**
     * We need to create the LargeMessage before replicating the packet, or else we won't know how to extract the destination,
     * which is stored on the header



More information about the hornetq-commits mailing list