Author: clebert.suconic(a)jboss.com
Date: 2009-12-04 23:41:10 -0500 (Fri, 04 Dec 2009)
New Revision: 8574
Modified:
trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java
Log:
Out of order issue during failover fix.
Modified: trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2009-12-05 02:06:10 UTC (rev 8573)
+++ trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2009-12-05 04:41:10 UTC (rev 8574)
@@ -68,6 +68,8 @@
private final Object sendBlockingLock = new Object();
+ private final Object replayLock = new Object();
+
private boolean failingOver;
private final int confWindowSize;
@@ -108,7 +110,7 @@
{
return lock;
}
-
+
public int getConfirmationWindowSize()
{
return confWindowSize;
@@ -145,7 +147,7 @@
// This must never called by more than one thread concurrently
public void send(final Packet packet, final boolean flush)
- {
+ {
synchronized (sendLock)
{
packet.setChannelID(id);
@@ -329,15 +331,20 @@
public void replayCommands(final int otherLastReceivedCommandID, final long newChannelID)
{
- if (resendCache != null)
+ // need to make sure we won't clear any packets while replaying or we could
+ // break order eventually
+ synchronized (replayLock)
{
- clearUpTo(otherLastReceivedCommandID);
-
- for (final Packet packet : resendCache)
+ if (resendCache != null)
{
- packet.setChannelID(newChannelID);
-
- doWrite(packet);
+ clearUpTo(otherLastReceivedCommandID);
+
+ for (final Packet packet : resendCache)
+ {
+ packet.setChannelID(newChannelID);
+
+ doWrite(packet);
+ }
}
}
}
@@ -387,7 +394,7 @@
{
lastReceivedCommandID++;
- receivedBytes += packet.getPacketSize();
+ receivedBytes += packet.getPacketSize();
if (receivedBytes >= confWindowSize)
{
@@ -467,39 +474,42 @@
private void clearUpTo(final int lastReceivedCommandID)
{
- final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;
-
- if (numberToClear == -1)
+ synchronized (replayLock)
{
- throw new IllegalArgumentException("Invalid lastReceivedCommandID: " + lastReceivedCommandID);
- }
+ final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;
- int sizeToFree = 0;
+ if (numberToClear == -1)
+ {
+ throw new IllegalArgumentException("Invalid lastReceivedCommandID: " + lastReceivedCommandID);
+ }
- for (int i = 0; i < numberToClear; i++)
- {
- final Packet packet = resendCache.poll();
+ int sizeToFree = 0;
- if (packet == null)
+ for (int i = 0; i < numberToClear; i++)
{
- log.warn("Can't find packet to clear: " + " last received command id " +
- lastReceivedCommandID +
- " first stored command id " +
- firstStoredCommandID);
- return;
- }
+ final Packet packet = resendCache.poll();
- if (packet.getType() != PACKETS_CONFIRMED)
- {
- sizeToFree += packet.getPacketSize();
+ if (packet == null)
+ {
+ log.warn("Can't find packet to clear: " + " last received command id " +
+ lastReceivedCommandID +
+ " first stored command id " +
+ firstStoredCommandID);
+ return;
+ }
+
+ if (packet.getType() != PACKETS_CONFIRMED)
+ {
+ sizeToFree += packet.getPacketSize();
+ }
+
+ if (commandConfirmationHandler != null)
+ {
+ commandConfirmationHandler.commandConfirmed(packet);
+ }
}
- if (commandConfirmationHandler != null)
- {
- commandConfirmationHandler.commandConfirmed(packet);
- }
+ firstStoredCommandID += numberToClear;
}
-
- firstStoredCommandID += numberToClear;
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java 2009-12-05 02:06:10 UTC (rev 8573)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java 2009-12-05 04:41:10 UTC (rev 8574)
@@ -216,6 +216,7 @@
for (int its = 0; its < numIts; its++)
{
+ log.info("####" + this.getName() + " iteration #" + its);
start();
ClientSessionFactoryImpl sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
Show replies by date