[hornetq-commits] JBoss hornetq SVN: r8574 - in trunk: tests/src/org/hornetq/tests/integration/cluster/reattach and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Dec 4 23:41:10 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-12-04 23:41:10 -0500 (Fri, 04 Dec 2009)
New Revision: 8574

Modified:
   trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java
Log:
Fix out-of-order issue during failover.

Modified: trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java	2009-12-05 02:06:10 UTC (rev 8573)
+++ trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java	2009-12-05 04:41:10 UTC (rev 8574)
@@ -68,6 +68,8 @@
 
    private final Object sendBlockingLock = new Object();
 
+   private final Object replayLock = new Object();
+
    private boolean failingOver;
 
    private final int confWindowSize;
@@ -108,7 +110,7 @@
    {
       return lock;
    }
-   
+
    public int getConfirmationWindowSize()
    {
       return confWindowSize;
@@ -145,7 +147,7 @@
 
    // This must never called by more than one thread concurrently
    public void send(final Packet packet, final boolean flush)
-   {      
+   {
       synchronized (sendLock)
       {
          packet.setChannelID(id);
@@ -329,15 +331,20 @@
 
    public void replayCommands(final int otherLastReceivedCommandID, final long newChannelID)
    {
-      if (resendCache != null)
+      // need to make sure we won't clear any packets while replaying or we could 
+      // break order eventually
+      synchronized (replayLock)
       {
-         clearUpTo(otherLastReceivedCommandID);
-
-         for (final Packet packet : resendCache)
+         if (resendCache != null)
          {
-            packet.setChannelID(newChannelID);
-
-            doWrite(packet);
+            clearUpTo(otherLastReceivedCommandID);
+   
+            for (final Packet packet : resendCache)
+            {
+               packet.setChannelID(newChannelID);
+   
+               doWrite(packet);
+            }
          }
       }
    }
@@ -387,7 +394,7 @@
       {
          lastReceivedCommandID++;
 
-         receivedBytes += packet.getPacketSize();         
+         receivedBytes += packet.getPacketSize();
 
          if (receivedBytes >= confWindowSize)
          {
@@ -467,39 +474,42 @@
 
    private void clearUpTo(final int lastReceivedCommandID)
    {
-      final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;
-
-      if (numberToClear == -1)
+      synchronized (replayLock)
       {
-         throw new IllegalArgumentException("Invalid lastReceivedCommandID: " + lastReceivedCommandID);
-      }
+         final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;
 
-      int sizeToFree = 0;
+         if (numberToClear == -1)
+         {
+            throw new IllegalArgumentException("Invalid lastReceivedCommandID: " + lastReceivedCommandID);
+         }
 
-      for (int i = 0; i < numberToClear; i++)
-      {
-         final Packet packet = resendCache.poll();
+         int sizeToFree = 0;
 
-         if (packet == null)
+         for (int i = 0; i < numberToClear; i++)
          {
-            log.warn("Can't find packet to clear: " + " last received command id " +
-                     lastReceivedCommandID +
-                     " first stored command id " +
-                     firstStoredCommandID);
-            return;
-         }
+            final Packet packet = resendCache.poll();
 
-         if (packet.getType() != PACKETS_CONFIRMED)
-         {
-            sizeToFree += packet.getPacketSize();
+            if (packet == null)
+            {
+               log.warn("Can't find packet to clear: " + " last received command id " +
+                        lastReceivedCommandID +
+                        " first stored command id " +
+                        firstStoredCommandID);
+               return;
+            }
+
+            if (packet.getType() != PACKETS_CONFIRMED)
+            {
+               sizeToFree += packet.getPacketSize();
+            }
+
+            if (commandConfirmationHandler != null)
+            {
+               commandConfirmationHandler.commandConfirmed(packet);
+            }
          }
 
-         if (commandConfirmationHandler != null)
-         {
-            commandConfirmationHandler.commandConfirmed(packet);
-         }
+         firstStoredCommandID += numberToClear;
       }
-
-      firstStoredCommandID += numberToClear;
    }
 }

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java	2009-12-05 02:06:10 UTC (rev 8573)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java	2009-12-05 04:41:10 UTC (rev 8574)
@@ -216,6 +216,7 @@
 
       for (int its = 0; its < numIts; its++)
       {
+         log.info("####" + this.getName() + " iteration #" + its);
          start();
 
          ClientSessionFactoryImpl sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));



More information about the hornetq-commits mailing list