[hornetq-commits] JBoss hornetq SVN: r8575 - trunk/src/main/org/hornetq/core/remoting/impl.

do-not-reply at jboss.org do-not-reply at jboss.org
Sat Dec 5 01:32:26 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-12-05 01:32:26 -0500 (Sat, 05 Dec 2009)
New Revision: 8575

Modified:
   trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
Log:
Reverting last commit on ordering & failover since it didn't fix the issue

Modified: trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java	2009-12-05 04:41:10 UTC (rev 8574)
+++ trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java	2009-12-05 06:32:26 UTC (rev 8575)
@@ -68,8 +68,6 @@
 
    private final Object sendBlockingLock = new Object();
 
-   private final Object replayLock = new Object();
-
    private boolean failingOver;
 
    private final int confWindowSize;
@@ -110,7 +108,7 @@
    {
       return lock;
    }
-
+   
    public int getConfirmationWindowSize()
    {
       return confWindowSize;
@@ -147,7 +145,7 @@
 
    // This must never called by more than one thread concurrently
    public void send(final Packet packet, final boolean flush)
-   {
+   {      
       synchronized (sendLock)
       {
          packet.setChannelID(id);
@@ -331,20 +329,15 @@
 
    public void replayCommands(final int otherLastReceivedCommandID, final long newChannelID)
    {
-      // need to make sure we won't clear any packets while replaying or we could 
-      // break order eventually
-      synchronized (replayLock)
+      if (resendCache != null)
       {
-         if (resendCache != null)
+         clearUpTo(otherLastReceivedCommandID);
+
+         for (final Packet packet : resendCache)
          {
-            clearUpTo(otherLastReceivedCommandID);
-   
-            for (final Packet packet : resendCache)
-            {
-               packet.setChannelID(newChannelID);
-   
-               doWrite(packet);
-            }
+            packet.setChannelID(newChannelID);
+
+            doWrite(packet);
          }
       }
    }
@@ -394,7 +387,7 @@
       {
          lastReceivedCommandID++;
 
-         receivedBytes += packet.getPacketSize();
+         receivedBytes += packet.getPacketSize();         
 
          if (receivedBytes >= confWindowSize)
          {
@@ -474,42 +467,39 @@
 
    private void clearUpTo(final int lastReceivedCommandID)
    {
-      synchronized (replayLock)
+      final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;
+
+      if (numberToClear == -1)
       {
-         final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;
+         throw new IllegalArgumentException("Invalid lastReceivedCommandID: " + lastReceivedCommandID);
+      }
 
-         if (numberToClear == -1)
-         {
-            throw new IllegalArgumentException("Invalid lastReceivedCommandID: " + lastReceivedCommandID);
-         }
+      int sizeToFree = 0;
 
-         int sizeToFree = 0;
+      for (int i = 0; i < numberToClear; i++)
+      {
+         final Packet packet = resendCache.poll();
 
-         for (int i = 0; i < numberToClear; i++)
+         if (packet == null)
          {
-            final Packet packet = resendCache.poll();
+            log.warn("Can't find packet to clear: " + " last received command id " +
+                     lastReceivedCommandID +
+                     " first stored command id " +
+                     firstStoredCommandID);
+            return;
+         }
 
-            if (packet == null)
-            {
-               log.warn("Can't find packet to clear: " + " last received command id " +
-                        lastReceivedCommandID +
-                        " first stored command id " +
-                        firstStoredCommandID);
-               return;
-            }
-
-            if (packet.getType() != PACKETS_CONFIRMED)
-            {
-               sizeToFree += packet.getPacketSize();
-            }
-
-            if (commandConfirmationHandler != null)
-            {
-               commandConfirmationHandler.commandConfirmed(packet);
-            }
+         if (packet.getType() != PACKETS_CONFIRMED)
+         {
+            sizeToFree += packet.getPacketSize();
          }
 
-         firstStoredCommandID += numberToClear;
+         if (commandConfirmationHandler != null)
+         {
+            commandConfirmationHandler.commandConfirmed(packet);
+         }
       }
+
+      firstStoredCommandID += numberToClear;
    }
 }



More information about the hornetq-commits mailing list