[hornetq-commits] JBoss hornetq SVN: r8651 - in trunk: examples/core/perf/server0 and 6 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Dec 9 15:29:24 EST 2009
Author: timfox
Date: 2009-12-09 15:29:24 -0500 (Wed, 09 Dec 2009)
New Revision: 8651
Modified:
trunk/build-hornetq.xml
trunk/examples/core/perf/server0/hornetq-configuration.xml
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
trunk/src/main/org/hornetq/core/remoting/Channel.java
trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReattachSessionMessage.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReattachSessionResponseMessage.java
trunk/src/main/org/hornetq/core/server/HornetQServer.java
trunk/src/main/org/hornetq/core/server/ServerConsumer.java
trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
Log:
fixed re-attach ordering issue
Modified: trunk/build-hornetq.xml
===================================================================
--- trunk/build-hornetq.xml 2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/build-hornetq.xml 2009-12-09 20:29:24 UTC (rev 8651)
@@ -1373,8 +1373,7 @@
haltonerror="${junit.batchtest.haltonerror}">
<formatter type="plain" usefile="${junit.formatter.usefile}"/>
<fileset dir="${test.classes.dir}">
- <include name="**/org/hornetq/tests/stress/**/*${test-mask}.class"/>
- <exclude name="**/org/hornetq/tests/stress/failover/MultiThreadRandomReattachStressTest.class"/>
+ <include name="**/org/hornetq/tests/stress/**/*${test-mask}.class"/>
</fileset>
</batchtest>
</junit>
Modified: trunk/examples/core/perf/server0/hornetq-configuration.xml
===================================================================
--- trunk/examples/core/perf/server0/hornetq-configuration.xml 2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/examples/core/perf/server0/hornetq-configuration.xml 2009-12-09 20:29:24 UTC (rev 8651)
@@ -17,7 +17,7 @@
<persistence-enabled>true</persistence-enabled>
- <journal-sync-non-transactional>true</journal-sync-non-transactional>
+ <journal-sync-non-transactional>false</journal-sync-non-transactional>
<journal-sync-transactional>true</journal-sync-transactional>
<journal-type>ASYNCIO</journal-type>
<journal-min-files>20</journal-min-files>
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-12-09 20:29:24 UTC (rev 8651)
@@ -846,7 +846,6 @@
// We lock the channel to prevent any packets to be added to the resend
// cache during the failover process
channel.lock();
-
try
{
channel.transferConnection(backupConnection);
@@ -854,9 +853,11 @@
backupConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
remotingConnection = backupConnection;
+
+ int lcid = channel.getLastConfirmedCommandID();
+
+ Packet request = new ReattachSessionMessage(name, lcid);
- Packet request = new ReattachSessionMessage(name, channel.getLastReceivedCommandID());
-
Channel channel1 = backupConnection.getChannel(1, -1);
ReattachSessionResponseMessage response = (ReattachSessionResponseMessage)channel1.sendBlocking(request);
@@ -865,10 +866,11 @@
{
// The session was found on the server - we reattached transparently ok
- channel.replayCommands(response.getLastReceivedCommandID(), channel.getID());
+ channel.replayCommands(response.getLastConfirmedCommandID(), channel.getID());
}
else
{
+
// The session wasn't found on the server - probably we're failing over onto a backup server where the
// session won't exist or the target server has been restarted - in this case the session will need to be
// recreated,
@@ -994,6 +996,7 @@
channel.returnBlocking();
}
+ channel.setTransferring(false);
}
catch (Throwable t)
{
Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2009-12-09 20:29:24 UTC (rev 8651)
@@ -899,26 +899,20 @@
Connection tc = null;
try
- {
- if (connector == null)
- {
- DelegatingBufferHandler handler = new DelegatingBufferHandler();
+ {
+ DelegatingBufferHandler handler = new DelegatingBufferHandler();
- connector = connectorFactory.createConnector(transportParams,
- handler,
- this,
- closeExecutor,
- threadPool,
- scheduledThreadPool);
+ connector = connectorFactory.createConnector(transportParams,
+ handler,
+ this,
+ closeExecutor,
+ threadPool,
+ scheduledThreadPool);
- if (connector != null)
- {
- connector.start();
- }
- }
-
if (connector != null)
{
+ connector.start();
+
tc = connector.createConnection();
if (tc == null)
Modified: trunk/src/main/org/hornetq/core/remoting/Channel.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/Channel.java 2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/src/main/org/hornetq/core/remoting/Channel.java 2009-12-09 20:29:24 UTC (rev 8651)
@@ -39,9 +39,9 @@
void transferConnection(RemotingConnection newConnection);
- void replayCommands(int lastReceivedCommandID, final long newID);
+ void replayCommands(int lastConfirmedCommandID, final long newID);
- int getLastReceivedCommandID();
+ int getLastConfirmedCommandID();
void lock();
@@ -64,4 +64,6 @@
void clearCommands();
int getConfirmationWindowSize();
+
+ void setTransferring(boolean transferring);
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2009-12-09 20:29:24 UTC (rev 8651)
@@ -50,7 +50,7 @@
private volatile int firstStoredCommandID;
- private volatile int lastReceivedCommandID = -1;
+ private volatile int lastConfirmedCommandID = -1;
private volatile RemotingConnection connection;
@@ -73,7 +73,9 @@
private int receivedBytes;
private CommandConfirmationHandler commandConfirmationHandler;
-
+
+ private volatile boolean transferring;
+
public ChannelImpl(final RemotingConnection connection, final long id, final int confWindowSize)
{
this.connection = connection;
@@ -97,9 +99,9 @@
return id;
}
- public int getLastReceivedCommandID()
+ public int getLastConfirmedCommandID()
{
- return lastReceivedCommandID;
+ return lastConfirmedCommandID;
}
public Lock getLock()
@@ -140,6 +142,11 @@
{
send(packet, false);
}
+
+ public void setTransferring(boolean transferring)
+ {
+ this.transferring = transferring;
+ }
// This must never called by more than one thread concurrently
public void send(final Packet packet, final boolean flush)
@@ -147,11 +154,11 @@
synchronized (sendLock)
{
packet.setChannelID(id);
-
+
final HornetQBuffer buffer = packet.encode(connection);
lock.lock();
-
+
try
{
while (failingOver)
@@ -165,6 +172,13 @@
{
}
}
+
+ //Sanity check
+ if (transferring)
+ {
+ throw new IllegalStateException("Cannot send a packet while channel is doing failover");
+ }
+
if (resendCache != null && packet.isRequiresConfirmations())
{
@@ -197,7 +211,7 @@
synchronized (sendBlockingLock)
{
packet.setChannelID(id);
-
+
final HornetQBuffer buffer = packet.encode(connection);
lock.lock();
@@ -306,7 +320,7 @@
closed = true;
}
-
+
public void transferConnection(final RemotingConnection newConnection)
{
// Needs to synchronize on the connection to make sure no packets from
@@ -322,19 +336,21 @@
rnewConnection.putChannel(id, this);
connection = rnewConnection;
+
+ transferring = true;
}
}
- public void replayCommands(final int otherLastReceivedCommandID, final long newChannelID)
+ public void replayCommands(final int otherLastConfirmedCommandID, final long newChannelID)
{
if (resendCache != null)
{
- clearUpTo(otherLastReceivedCommandID);
+ clearUpTo(otherLastConfirmedCommandID);
for (final Packet packet : resendCache)
{
packet.setChannelID(newChannelID);
-
+
doWrite(packet);
}
}
@@ -372,7 +388,7 @@
{
receivedBytes = 0;
- final Packet confirmed = new PacketsConfirmedMessage(lastReceivedCommandID);
+ final Packet confirmed = new PacketsConfirmedMessage(lastConfirmedCommandID);
confirmed.setChannelID(id);
@@ -384,7 +400,7 @@
{
if (resendCache != null && packet.isRequiresConfirmations())
{
- lastReceivedCommandID++;
+ lastConfirmedCommandID++;
receivedBytes += packet.getPacketSize();
@@ -392,7 +408,7 @@
{
receivedBytes = 0;
- final Packet confirmed = new PacketsConfirmedMessage(lastReceivedCommandID);
+ final Packet confirmed = new PacketsConfirmedMessage(lastConfirmedCommandID);
confirmed.setChannelID(id);
@@ -405,14 +421,14 @@
{
if (resendCache != null)
{
- lastReceivedCommandID = -1;
+ lastConfirmedCommandID = -1;
firstStoredCommandID = 0;
resendCache.clear();
}
}
-
+
public void handlePacket(final Packet packet)
{
if (packet.getType() == PacketImpl.PACKETS_CONFIRMED)
Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReattachSessionMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReattachSessionMessage.java 2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReattachSessionMessage.java 2009-12-09 20:29:24 UTC (rev 8651)
@@ -30,19 +30,19 @@
private String name;
- private int lastReceivedCommandID;
+ private int lastConfirmedCommandID;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public ReattachSessionMessage(final String name, final int lastReceivedCommandID)
+ public ReattachSessionMessage(final String name, final int lastConfirmedCommandID)
{
super(PacketImpl.REATTACH_SESSION);
this.name = name;
- this.lastReceivedCommandID = lastReceivedCommandID;
+ this.lastConfirmedCommandID = lastConfirmedCommandID;
}
public ReattachSessionMessage()
@@ -57,23 +57,23 @@
return name;
}
- public int getLastReceivedCommandID()
+ public int getLastConfirmedCommandID()
{
- return lastReceivedCommandID;
+ return lastConfirmedCommandID;
}
@Override
public void encodeRest(final HornetQBuffer buffer)
{
buffer.writeString(name);
- buffer.writeInt(lastReceivedCommandID);
+ buffer.writeInt(lastConfirmedCommandID);
}
@Override
public void decodeRest(final HornetQBuffer buffer)
{
name = buffer.readString();
- lastReceivedCommandID = buffer.readInt();
+ lastConfirmedCommandID = buffer.readInt();
}
@Override
Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReattachSessionResponseMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReattachSessionResponseMessage.java 2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReattachSessionResponseMessage.java 2009-12-09 20:29:24 UTC (rev 8651)
@@ -28,7 +28,7 @@
// Attributes ----------------------------------------------------
- private int lastReceivedCommandID;
+ private int lastConfirmedCommandID;
private boolean reattached;
@@ -36,11 +36,11 @@
// Constructors --------------------------------------------------
- public ReattachSessionResponseMessage(final int lastReceivedCommandID, final boolean reattached)
+ public ReattachSessionResponseMessage(final int lastConfirmedCommandID, final boolean reattached)
{
super(PacketImpl.REATTACH_SESSION_RESP);
- this.lastReceivedCommandID = lastReceivedCommandID;
+ this.lastConfirmedCommandID = lastConfirmedCommandID;
this.reattached = reattached;
}
@@ -52,9 +52,9 @@
// Public --------------------------------------------------------
- public int getLastReceivedCommandID()
+ public int getLastConfirmedCommandID()
{
- return lastReceivedCommandID;
+ return lastConfirmedCommandID;
}
public boolean isReattached()
@@ -65,14 +65,14 @@
@Override
public void encodeRest(final HornetQBuffer buffer)
{
- buffer.writeInt(lastReceivedCommandID);
+ buffer.writeInt(lastConfirmedCommandID);
buffer.writeBoolean(reattached);
}
@Override
public void decodeRest(final HornetQBuffer buffer)
{
- lastReceivedCommandID = buffer.readInt();
+ lastConfirmedCommandID = buffer.readInt();
reattached = buffer.readBoolean();
}
@@ -92,7 +92,7 @@
ReattachSessionResponseMessage r = (ReattachSessionResponseMessage)other;
- return super.equals(other) && lastReceivedCommandID == r.lastReceivedCommandID;
+ return super.equals(other) && lastConfirmedCommandID == r.lastConfirmedCommandID;
}
@Override
Modified: trunk/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/HornetQServer.java 2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/src/main/org/hornetq/core/server/HornetQServer.java 2009-12-09 20:29:24 UTC (rev 8651)
@@ -70,7 +70,7 @@
void unregisterActivateCallback(ActivateCallback callback);
- ReattachSessionResponseMessage reattachSession(RemotingConnection connection, String name, int lastReceivedCommandID) throws Exception;
+ ReattachSessionResponseMessage reattachSession(RemotingConnection connection, String name, int lastConfirmedCommandID) throws Exception;
/** The journal at the backup server has to be equivalent as the journal used on the live node.
* Or else the backup node is out of sync. */
Modified: trunk/src/main/org/hornetq/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerConsumer.java 2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/src/main/org/hornetq/core/server/ServerConsumer.java 2009-12-09 20:29:24 UTC (rev 8651)
@@ -42,5 +42,9 @@
void acknowledge(boolean autoCommitAcks, Transaction tx, long messageID) throws Exception;
- void forceDelivery(long sequence);
+ void forceDelivery(long sequence);
+
+ void setTransferring(boolean transferring);
}
+
+
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java 2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java 2009-12-09 20:29:24 UTC (rev 8651)
@@ -150,7 +150,7 @@
try
{
- response = server.reattachSession(connection, request.getName(), request.getLastReceivedCommandID());
+ response = server.reattachSession(connection, request.getName(), request.getLastConfirmedCommandID());
}
catch (Exception e)
{
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-12-09 20:29:24 UTC (rev 8651)
@@ -539,7 +539,7 @@
public ReattachSessionResponseMessage reattachSession(final RemotingConnection connection,
final String name,
- final int lastReceivedCommandID) throws Exception
+ final int lastConfirmedCommandID) throws Exception
{
if (!started)
{
@@ -581,9 +581,9 @@
else
{
// Reconnect the channel to the new connection
- int serverLastReceivedCommandID = session.transferConnection(connection, lastReceivedCommandID);
+ int serverLastConfirmedCommandID = session.transferConnection(connection, lastConfirmedCommandID);
- return new ReattachSessionResponseMessage(serverLastReceivedCommandID, true);
+ return new ReattachSessionResponseMessage(serverLastConfirmedCommandID, true);
}
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-12-09 20:29:24 UTC (rev 8651)
@@ -120,7 +120,10 @@
private final ManagementService managementService;
private final Binding binding;
+
+ private boolean transferring = false;
+
// Constructors ---------------------------------------------------------------------------------
public ServerConsumerImpl(final long id,
@@ -197,8 +200,8 @@
// If the consumer is stopped then we don't accept the message, it
// should go back into the
// queue for delivery later.
- if (!started)
- {
+ if (!started || transferring)
+ {
return HandleStatus.BUSY;
}
@@ -413,10 +416,27 @@
promptDelivery(true);
}
}
-
+
+ public void setTransferring(final boolean transferring)
+ {
+ lock.lock();
+ try
+ {
+ this.transferring = transferring;
+ }
+ finally
+ {
+ lock.unlock();
+ }
+
+ if (!transferring)
+ {
+ promptDelivery(true);
+ }
+ }
+
public void receiveCredits(final int credits) throws Exception
{
-
if (credits == -1)
{
// No flow control
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-12-09 20:29:24 UTC (rev 8651)
@@ -1596,14 +1596,14 @@
}
public int transferConnection(final RemotingConnection newConnection, final int lastReceivedCommandID)
- {
- boolean wasStarted = started;
+ {
+ //We need to disable delivery on all the consumers while the transfer is occurring- otherwise packets might get delivered
+ //after the channel has transferred but *before* packets have been replayed - this will give the client the wrong
+ //sequence of packets.
+ //It is not sufficient to just stop the session, since right after stopping the session, another session start might be executed
+ //before we have transferred the connection, leaving it in a started state
+ setTransferring(true);
- if (wasStarted)
- {
- setStarted(false);
- }
-
remotingConnection.removeFailureListener(this);
remotingConnection.removeCloseListener(this);
@@ -1613,9 +1613,9 @@
// the replicating connection will cause the outstanding responses to be be replayed on the live server,
// if these reach the client who then subsequently fails over, on reconnection to backup, it will have
// received responses that the backup did not know about.
-
+
channel.transferConnection(newConnection);
-
+
newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
remotingConnection = newConnection;
@@ -1623,15 +1623,14 @@
remotingConnection.addFailureListener(this);
remotingConnection.addCloseListener(this);
- int serverLastReceivedCommandID = channel.getLastReceivedCommandID();
+ int serverLastReceivedCommandID = channel.getLastConfirmedCommandID();
channel.replayCommands(lastReceivedCommandID, id);
+
+ channel.setTransferring(false);
+
+ setTransferring(false);
- if (wasStarted)
- {
- setStarted(true);
- }
-
return serverLastReceivedCommandID;
}
@@ -1807,7 +1806,17 @@
started = s;
}
+
+ private void setTransferring(final boolean transferring)
+ {
+ Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
+ for (ServerConsumer consumer : consumersClone)
+ {
+ consumer.setTransferring(transferring);
+ }
+ }
+
/**
* We need to create the LargeMessage before replicating the packet, or else we won't know how to extract the destination,
* which is stored on the header
More information about the hornetq-commits
mailing list