Author: clebert.suconic(a)jboss.com
Date: 2009-12-05 01:32:26 -0500 (Sat, 05 Dec 2009)
New Revision: 8575
Modified:
trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
Log:
Reverting last commit on ordering & failover since it didn't fix the issue
Modified: trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2009-12-05 04:41:10 UTC (rev 8574)
+++ trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2009-12-05 06:32:26 UTC (rev 8575)
@@ -68,8 +68,6 @@
private final Object sendBlockingLock = new Object();
- private final Object replayLock = new Object();
-
private boolean failingOver;
private final int confWindowSize;
@@ -110,7 +108,7 @@
{
return lock;
}
-
+
public int getConfirmationWindowSize()
{
return confWindowSize;
@@ -147,7 +145,7 @@
// This must never called by more than one thread concurrently
public void send(final Packet packet, final boolean flush)
- {
+ {
synchronized (sendLock)
{
packet.setChannelID(id);
@@ -331,20 +329,15 @@
public void replayCommands(final int otherLastReceivedCommandID, final long newChannelID)
{
- // need to make sure we won't clear any packets while replaying or we could
- // break order eventually
- synchronized (replayLock)
+ if (resendCache != null)
{
- if (resendCache != null)
+ clearUpTo(otherLastReceivedCommandID);
+
+ for (final Packet packet : resendCache)
{
- clearUpTo(otherLastReceivedCommandID);
-
- for (final Packet packet : resendCache)
- {
- packet.setChannelID(newChannelID);
-
- doWrite(packet);
- }
+ packet.setChannelID(newChannelID);
+
+ doWrite(packet);
}
}
}
@@ -394,7 +387,7 @@
{
lastReceivedCommandID++;
- receivedBytes += packet.getPacketSize();
+ receivedBytes += packet.getPacketSize();
if (receivedBytes >= confWindowSize)
{
@@ -474,42 +467,39 @@
private void clearUpTo(final int lastReceivedCommandID)
{
- synchronized (replayLock)
+ final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;
+
+ if (numberToClear == -1)
{
- final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;
+ throw new IllegalArgumentException("Invalid lastReceivedCommandID: " + lastReceivedCommandID);
+ }
- if (numberToClear == -1)
- {
- throw new IllegalArgumentException("Invalid lastReceivedCommandID: " + lastReceivedCommandID);
- }
+ int sizeToFree = 0;
- int sizeToFree = 0;
+ for (int i = 0; i < numberToClear; i++)
+ {
+ final Packet packet = resendCache.poll();
- for (int i = 0; i < numberToClear; i++)
+ if (packet == null)
{
- final Packet packet = resendCache.poll();
+ log.warn("Can't find packet to clear: " + " last received command id " +
+ lastReceivedCommandID +
+ " first stored command id " +
+ firstStoredCommandID);
+ return;
+ }
- if (packet == null)
- {
- log.warn("Can't find packet to clear: " + " last received command id " +
- lastReceivedCommandID +
- " first stored command id " +
- firstStoredCommandID);
- return;
- }
-
- if (packet.getType() != PACKETS_CONFIRMED)
- {
- sizeToFree += packet.getPacketSize();
- }
-
- if (commandConfirmationHandler != null)
- {
- commandConfirmationHandler.commandConfirmed(packet);
- }
+ if (packet.getType() != PACKETS_CONFIRMED)
+ {
+ sizeToFree += packet.getPacketSize();
}
- firstStoredCommandID += numberToClear;
+ if (commandConfirmationHandler != null)
+ {
+ commandConfirmationHandler.commandConfirmed(packet);
+ }
}
+
+ firstStoredCommandID += numberToClear;
}
}