[jboss-cvs] JBoss Messaging SVN: r7515 - in branches/Branch_MultiThreaded_Replication: src/main/org/jboss/messaging/core/remoting and 11 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jul 2 12:25:40 EDT 2009
Author: timfox
Date: 2009-07-02 12:25:39 -0400 (Thu, 02 Jul 2009)
New Revision: 7515
Modified:
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/Channel.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMRegistry.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionMessage.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionResponseMessage.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateLockSequenceMessage.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/ServerSession.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueReplicationPacketHandler.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/Replicator.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareAtomicLong.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareReadWriteLock.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java
branches/Branch_MultiThreaded_Replication/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTest.java
branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/SimpleAutomaticFailoverTest.java
Log:
mt replication
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-07-02 16:25:39 UTC (rev 7515)
@@ -727,15 +727,17 @@
remotingConnection = backupConnection;
- Packet request = new ReattachSessionMessage(name, channel.getLastReceivedCommandID());
+ Packet request = new ReattachSessionMessage(name, channel.getLastConfirmedCommandID());
Channel channel1 = backupConnection.getChannel(1, -1, false);
ReattachSessionResponseMessage response = (ReattachSessionResponseMessage)channel1.sendBlocking(request);
+
+ log.info("Got a response with a last received command id " + response.getLastConfirmedCommandID());
if (!response.isRemoved())
{
- channel.replayCommands(response.getLastReceivedCommandID());
+ channel.replayCommands(response.getLastConfirmedCommandID());
ok = true;
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/Channel.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/Channel.java 2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/Channel.java 2009-07-02 16:25:39 UTC (rev 7515)
@@ -41,9 +41,9 @@
void transferConnection(RemotingConnection newConnection);
- void replayCommands(int lastReceivedCommandID);
+ void replayCommands(int lastConfirmedCommandID);
- int getLastReceivedCommandID();
+ int getLastConfirmedCommandID();
void lock();
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java 2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java 2009-07-02 16:25:39 UTC (rev 7515)
@@ -68,7 +68,7 @@
private volatile int firstStoredCommandID;
- private volatile int lastReceivedCommandID = -1;
+ private volatile int lastConfirmedCommandID = -1;
private volatile RemotingConnection connection;
@@ -140,9 +140,9 @@
return id;
}
- public int getLastReceivedCommandID()
+ public int getLastConfirmedCommandID()
{
- return lastReceivedCommandID;
+ return lastConfirmedCommandID;
}
public Lock getLock()
@@ -515,13 +515,14 @@
}
}
- public void replayCommands(final int otherLastReceivedCommandID)
+ public void replayCommands(final int otherLastConfirmedCommandID)
{
- clearUpTo(otherLastReceivedCommandID);
+ clearUpTo(otherLastConfirmedCommandID);
for (final Packet packet : resendCache)
{
- log.info("Replaying command " + packet);
+ //log.info("Replaying command " + packet);
+
doWrite(packet);
}
}
@@ -557,7 +558,7 @@
{
receivedBytes = 0;
- final Packet confirmed = new PacketsConfirmedMessage(lastReceivedCommandID);
+ final Packet confirmed = new PacketsConfirmedMessage(lastConfirmedCommandID);
confirmed.setChannelID(id);
@@ -569,7 +570,7 @@
{
if (resendCache != null && packet.isRequiresConfirmations())
{
- lastReceivedCommandID++;
+ lastConfirmedCommandID++;
receivedBytes += packet.getPacketSize();
@@ -579,7 +580,8 @@
if (connection.isActive())
{
- final Packet confirmed = new PacketsConfirmedMessage(lastReceivedCommandID);
+ // log.info("sending packet confirmed message " + lastConfirmedCommandID);
+ final Packet confirmed = new PacketsConfirmedMessage(lastConfirmedCommandID);
confirmed.setChannelID(id);
@@ -731,13 +733,16 @@
connection.getTransportConnection().write(buffer);
}
- private void clearUpTo(final int lastReceivedCommandID)
+ private void clearUpTo(final int lastConfirmedCommandID)
{
- final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;
+ //log.info("client " + connection.isClient() + " clearing up to " + lastConfirmedCommandID);
+
+ final int numberToClear = 1 + lastConfirmedCommandID - firstStoredCommandID;
- if (numberToClear == -1)
+ if (numberToClear < 0)
{
- throw new IllegalArgumentException("Invalid lastReceivedCommandID: " + lastReceivedCommandID);
+ throw new IllegalArgumentException("Invalid lastConfirmedCommandID: " + lastConfirmedCommandID +
+ " firstStoredCommandID " + firstStoredCommandID + " client " + connection.isClient());
}
int sizeToFree = 0;
@@ -750,7 +755,7 @@
{
throw new IllegalStateException(System.identityHashCode(this) + " Can't find packet to clear: " +
" last received command id " +
- lastReceivedCommandID +
+ lastConfirmedCommandID +
" first stored command id " +
firstStoredCommandID);
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMRegistry.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMRegistry.java 2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMRegistry.java 2009-07-02 16:25:39 UTC (rev 7515)
@@ -42,6 +42,7 @@
public void registerAcceptor(final int id, final InVMAcceptor acceptor)
{
+ //log.info("Registering acceptor with id " + id, new Exception());
if (acceptors.putIfAbsent(id, acceptor) != null)
{
throw new IllegalArgumentException("Acceptor with id " + id + " already registered");
@@ -68,6 +69,11 @@
public int size()
{
+ log.info("** remaining acceptors");
+ for (Integer i: acceptors.keySet())
+ {
+ log.info("id: " + i);
+ }
return this.acceptors.size();
}
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionMessage.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionMessage.java 2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionMessage.java 2009-07-02 16:25:39 UTC (rev 7515)
@@ -40,19 +40,19 @@
private String name;
- private int lastReceivedCommandID;
+ private int lastConfirmedCommandID;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public ReattachSessionMessage(final String name, final int lastReceivedCommandID)
+ public ReattachSessionMessage(final String name, final int lastConfirmedCommandID)
{
super(REATTACH_SESSION);
this.name = name;
- this.lastReceivedCommandID = lastReceivedCommandID;
+ this.lastConfirmedCommandID = lastConfirmedCommandID;
}
public ReattachSessionMessage()
@@ -67,9 +67,9 @@
return name;
}
- public int getLastReceivedCommandID()
+ public int getLastConfirmedCommandID()
{
- return lastReceivedCommandID;
+ return lastConfirmedCommandID;
}
public int getRequiredBufferSize()
@@ -81,13 +81,13 @@
public void encodeBody(final MessagingBuffer buffer)
{
buffer.writeString(name);
- buffer.writeInt(lastReceivedCommandID);
+ buffer.writeInt(lastConfirmedCommandID);
}
public void decodeBody(final MessagingBuffer buffer)
{
name = buffer.readString();
- lastReceivedCommandID = buffer.readInt();
+ lastConfirmedCommandID = buffer.readInt();
}
public boolean equals(Object other)
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionResponseMessage.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionResponseMessage.java 2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionResponseMessage.java 2009-07-02 16:25:39 UTC (rev 7515)
@@ -38,7 +38,7 @@
// Attributes ----------------------------------------------------
- private int lastReceivedCommandID;
+ private int lastConfirmedCommandID;
//Is this flag really necessary - try removing it
private boolean removed;
@@ -47,11 +47,11 @@
// Constructors --------------------------------------------------
- public ReattachSessionResponseMessage(final int lastReceivedCommandID, final boolean removed)
+ public ReattachSessionResponseMessage(final int lastConfirmedCommandID, final boolean removed)
{
super(REATTACH_SESSION_RESP);
- this.lastReceivedCommandID = lastReceivedCommandID;
+ this.lastConfirmedCommandID = lastConfirmedCommandID;
this.removed = removed;
}
@@ -63,9 +63,9 @@
// Public --------------------------------------------------------
- public int getLastReceivedCommandID()
+ public int getLastConfirmedCommandID()
{
- return lastReceivedCommandID;
+ return lastConfirmedCommandID;
}
public boolean isRemoved()
@@ -81,13 +81,13 @@
public void encodeBody(final MessagingBuffer buffer)
{
- buffer.writeInt(lastReceivedCommandID);
+ buffer.writeInt(lastConfirmedCommandID);
buffer.writeBoolean(removed);
}
public void decodeBody(final MessagingBuffer buffer)
{
- lastReceivedCommandID = buffer.readInt();
+ lastConfirmedCommandID = buffer.readInt();
removed = buffer.readBoolean();
}
@@ -105,7 +105,7 @@
ReattachSessionResponseMessage r = (ReattachSessionResponseMessage)other;
- return super.equals(other) && this.lastReceivedCommandID == r.lastReceivedCommandID;
+ return super.equals(other) && this.lastConfirmedCommandID == r.lastConfirmedCommandID;
}
public final boolean isRequiresConfirmations()
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateLockSequenceMessage.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateLockSequenceMessage.java 2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateLockSequenceMessage.java 2009-07-02 16:25:39 UTC (rev 7515)
@@ -35,19 +35,15 @@
private List<Long> sequences;
- private boolean requiresResponse;
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public ReplicateLockSequenceMessage(final List<Long> sequences, final boolean requiresResponse)
+ public ReplicateLockSequenceMessage(final List<Long> sequences)
{
super(REPLICATE_LOCK_SEQUENCES);
this.sequences = sequences;
-
- this.requiresResponse = requiresResponse;
}
// Public --------------------------------------------------------
@@ -59,10 +55,7 @@
public int getRequiredBufferSize()
{
- return BASIC_PACKET_SIZE + DataConstants.SIZE_INT +
- sequences.size() *
- DataConstants.SIZE_LONG +
- DataConstants.SIZE_BOOLEAN;
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_INT + sequences.size() * DataConstants.SIZE_LONG;
}
@Override
@@ -73,7 +66,6 @@
{
buffer.writeLong(sequence);
}
- buffer.writeBoolean(requiresResponse);
}
@Override
@@ -85,18 +77,12 @@
{
sequences.add(buffer.readLong());
}
- requiresResponse = buffer.readBoolean();
}
public List<Long> getSequences()
{
return sequences;
}
-
- public boolean isRequiresResponse()
- {
- return requiresResponse;
- }
// Package protected ---------------------------------------------
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java 2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java 2009-07-02 16:25:39 UTC (rev 7515)
@@ -235,6 +235,8 @@
for (Acceptor acceptor : acceptors)
{
acceptor.stop();
+
+ log.info("Stopping acceptor " + acceptor);
}
acceptors.clear();
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java 2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java 2009-07-02 16:25:39 UTC (rev 7515)
@@ -67,7 +67,7 @@
void unregisterActivateCallback(ActivateCallback callback);
- ReattachSessionResponseMessage reattachSession(RemotingConnection connection, String name, int lastReceivedCommandID) throws Exception;
+ ReattachSessionResponseMessage reattachSession(RemotingConnection connection, String name, int lastConfirmedCommandID) throws Exception;
CreateSessionResponseMessage createSession(String name,
long channelID,
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/ServerSession.java 2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/ServerSession.java 2009-07-02 16:25:39 UTC (rev 7515)
@@ -140,7 +140,7 @@
void handleClose(Packet packet);
- int transferConnection(RemotingConnection newConnection, int lastReceivedCommandID);
+ int transferConnection(RemotingConnection newConnection, int lastConfirmedCommandID);
Channel getChannel();
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-07-02 16:25:39 UTC (rev 7515)
@@ -286,7 +286,9 @@
// We start the remoting service here - if the server is a backup remoting service needs to be started
// so it can be initialised by the live node
remotingService.start();
-
+
+ started = true;
+
log.info("JBoss Messaging Server version " + getVersion().getFullVersion() + " started");
}
@@ -309,6 +311,8 @@
session.getChannel().flushConfirmations();
}
+ log.info("Stopping remoting service on backup " + configuration.isBackup());
+
remotingService.stop();
// Stop the deployers
@@ -333,18 +337,35 @@
managementService.stop();
- storageManager.stop();
+ if (storageManager != null)
+ {
+ storageManager.stop();
+ }
if (securityManager != null)
{
securityManager.stop();
}
- resourceManager.stop();
+ if (resourceManager != null)
+ {
+ resourceManager.stop();
+ }
- clusterQueueStateManager.stop();
+ if (clusterQueueStateManager != null)
+ {
+ clusterQueueStateManager.stop();
+ }
- postOffice.stop();
+ if (postOffice != null)
+ {
+ postOffice.stop();
+ }
+
+ if (replicatingConnectionManager != null)
+ {
+ replicatingConnectionManager.close();
+ }
// Need to shutdown pools before shutting down paging manager to make sure everything is written ok
@@ -371,7 +392,10 @@
scheduledPool = null;
threadPool = null;
- pagingManager.stop();
+ if (pagingManager != null)
+ {
+ pagingManager.stop();
+ }
pagingManager = null;
securityStore = null;
@@ -386,6 +410,9 @@
sessions.clear();
started = false;
+
+ log.info(System.identityHashCode(this) + " called stop ");
+
initialised = false;
uuid = null;
nodeID = null;
@@ -462,7 +489,7 @@
public ReattachSessionResponseMessage reattachSession(final RemotingConnection connection,
final String name,
- final int lastReceivedCommandID) throws Exception
+ final int lastConfirmedCommandID) throws Exception
{
ServerSession session = sessions.get(name);
@@ -480,11 +507,11 @@
else
{
// Reconnect the channel to the new connection
- int serverLastReceivedCommandID = session.transferConnection(connection, lastReceivedCommandID);
-
- log.info("Reattached session ok");
+ int serverLastConfirmedCommandID = session.transferConnection(connection, lastConfirmedCommandID);
+
+ log.info("Reattached session ok, server last received command id is " + serverLastConfirmedCommandID);
- return new ReattachSessionResponseMessage(serverLastReceivedCommandID, false);
+ return new ReattachSessionResponseMessage(serverLastConfirmedCommandID, false);
}
}
@@ -940,28 +967,29 @@
// can't find message in queue since active was delivered immediately
private void freezeBackupConnection()
{
- // Sanity check
- // All replicated sessions should be on the same connection
- RemotingConnection replConnection = null;
+// // Sanity check
+// // All replicated sessions should be on the same connection
+// RemotingConnection replConnection = null;
for (ServerSession session : sessions.values())
{
RemotingConnection rc = session.getChannel().getConnection();
- if (replConnection == null)
- {
- replConnection = rc;
- }
- else if (replConnection != rc)
- {
- throw new IllegalStateException("More than one replicating connection!");
- }
+// if (replConnection == null)
+// {
+// replConnection = rc;
+// }
+// else if (replConnection != rc)
+// {
+// throw new IllegalStateException("More than one replicating connection!");
+// }
+ rc.freeze();
}
-
- if (replConnection != null)
- {
- replConnection.freeze();
- }
+//
+// if (replConnection != null)
+// {
+// replConnection.freeze();
+// }
}
private void initialisePart1() throws Exception
@@ -1162,7 +1190,7 @@
initialised = true;
- started = true;
+ log.info(System.identityHashCode(this) + " called initialise part2");
}
private void deployQueuesFromConfiguration() throws Exception
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2009-07-02 16:25:39 UTC (rev 7515)
@@ -61,7 +61,7 @@
private volatile List<Long> sequences;
- private volatile boolean requiresReplicationResponse;
+ // private volatile boolean requiresReplicationResponse;
public MessagingServerPacketHandler(final MessagingServer server,
final Channel channel1,
@@ -95,7 +95,7 @@
sequences = msg.getSequences();
- requiresReplicationResponse = msg.isRequiresResponse();
+ // requiresReplicationResponse = msg.isRequiresResponse();
return;
}
@@ -161,7 +161,7 @@
{
if (replicator != null)
{
- replicator.execute(action);
+ replicator.execute(action, null);
}
else
{
@@ -188,7 +188,7 @@
// send the response message
- if (server.getConfiguration().isBackup() && requiresReplicationResponse || type == REPLICATE_STARTUP_INFO)
+ if (server.getConfiguration().isBackup() || type == REPLICATE_STARTUP_INFO)
{
channel1.send(new ReplicationResponseMessage());
}
@@ -236,7 +236,7 @@
try
{
- response = server.reattachSession(connection, request.getName(), request.getLastReceivedCommandID());
+ response = server.reattachSession(connection, request.getName(), request.getLastConfirmedCommandID());
}
catch (Exception e)
{
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-07-02 16:25:39 UTC (rev 7515)
@@ -1714,7 +1714,7 @@
do
{
- replicator.execute(action);
+ replicator.execute(action, null);
handled = action.getResult();
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueReplicationPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueReplicationPacketHandler.java 2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueReplicationPacketHandler.java 2009-07-02 16:25:39 UTC (rev 7515)
@@ -56,7 +56,7 @@
private volatile List<Long> sequences;
- private volatile boolean requiresReplicationResponse;
+ //private volatile boolean requiresReplicationResponse;
private final PostOffice postOffice;
@@ -81,7 +81,7 @@
sequences = msg.getSequences();
- requiresReplicationResponse = msg.isRequiresResponse();
+ //requiresReplicationResponse = msg.isRequiresResponse();
break;
}
@@ -114,10 +114,7 @@
// log.info("*** delivered message on backup");
// }
- if (this.requiresReplicationResponse)
- {
- channel.send(new ReplicationResponseMessage());
- }
+ channel.send(new ReplicationResponseMessage());
thread.setNoReplayOrRecord();
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-07-02 16:25:39 UTC (rev 7515)
@@ -491,7 +491,7 @@
if ((flowControl && availableCredits <= 0) || !started)
{
- log.info("busy");
+ //log.info("busy");
return HandleStatus.BUSY;
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-07-02 16:25:39 UTC (rev 7515)
@@ -351,7 +351,7 @@
{
setStarted(true);
- channel.confirm(packet);
+ ////channel.confirm(packet);
}
public void handleStop(final Packet packet)
@@ -360,7 +360,7 @@
setStarted(false);
- channel.confirm(packet);
+ ////channel.confirm(packet);
channel.send(response);
}
@@ -430,7 +430,7 @@
}
}
- channel.confirm(packet);
+ ////channel.confirm(packet);
channel.send(response);
}
@@ -549,7 +549,7 @@
}
}
- channel.confirm(packet);
+ ////channel.confirm(packet);
channel.send(response);
}
@@ -625,7 +625,7 @@
}
}
- channel.confirm(packet);
+ ////channel.confirm(packet);
channel.send(response);
}
@@ -663,7 +663,7 @@
}
}
- channel.confirm(packet);
+ ////channel.confirm(packet);
channel.send(response);
}
@@ -716,7 +716,7 @@
}
}
- channel.confirm(packet);
+ ////channel.confirm(packet);
channel.send(response);
}
@@ -762,7 +762,7 @@
}
}
- channel.confirm(packet);
+ ////channel.confirm(packet);
channel.send(response);
}
@@ -799,7 +799,7 @@
}
}
- channel.confirm(packet);
+ ////channel.confirm(packet);
if (response != null)
{
@@ -824,7 +824,7 @@
log.error("Failed to acknowledge", e);
}
- channel.confirm(packet);
+ //channel.confirm(packet);
}
public void handleCommit(final Packet packet)
@@ -855,7 +855,7 @@
tx = new TransactionImpl(storageManager);
}
- channel.confirm(packet);
+ //channel.confirm(packet);
channel.send(response);
}
@@ -884,7 +884,7 @@
}
}
- channel.confirm(packet);
+ //channel.confirm(packet);
channel.send(response);
}
@@ -947,7 +947,7 @@
}
}
- channel.confirm(packet);
+ //channel.confirm(packet);
channel.send(response);
}
@@ -1021,7 +1021,7 @@
}
}
- channel.confirm(packet);
+ //channel.confirm(packet);
channel.send(response);
}
@@ -1033,7 +1033,7 @@
Packet response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
- channel.confirm(packet);
+ //channel.confirm(packet);
channel.send(response);
}
@@ -1084,7 +1084,7 @@
}
}
- channel.confirm(packet);
+ //channel.confirm(packet);
channel.send(response);
}
@@ -1146,7 +1146,7 @@
}
}
- channel.confirm(packet);
+ //channel.confirm(packet);
channel.send(response);
}
@@ -1209,7 +1209,7 @@
}
}
- channel.confirm(packet);
+ //channel.confirm(packet);
channel.send(response);
}
@@ -1260,7 +1260,7 @@
}
}
- channel.confirm(packet);
+ //channel.confirm(packet);
channel.send(response);
}
@@ -1309,7 +1309,7 @@
}
}
- channel.confirm(packet);
+ //channel.confirm(packet);
channel.send(response);
}
@@ -1369,7 +1369,7 @@
}
}
- channel.confirm(packet);
+ //channel.confirm(packet);
channel.send(response);
}
@@ -1378,7 +1378,7 @@
{
Packet response = new SessionXAGetInDoubtXidsResponseMessage(resourceManager.getPreparedTransactions());
- channel.confirm(packet);
+ //channel.confirm(packet);
channel.send(response);
}
@@ -1387,7 +1387,7 @@
{
Packet response = new SessionXAGetTimeoutResponseMessage(resourceManager.getTimeoutSeconds());
- channel.confirm(packet);
+ //channel.confirm(packet);
channel.send(response);
}
@@ -1396,7 +1396,7 @@
{
Packet response = new SessionXASetTimeoutResponseMessage(resourceManager.setTimeoutSeconds(packet.getTimeoutSeconds()));
- channel.confirm(packet);
+ //channel.confirm(packet);
channel.send(response);
}
@@ -1446,7 +1446,7 @@
}
}
- channel.confirm(packet);
+ //channel.confirm(packet);
if (response != null)
{
@@ -1503,7 +1503,7 @@
}
}
- channel.confirm(packet);
+ //channel.confirm(packet);
if (response != null)
{
@@ -1535,7 +1535,7 @@
}
}
- channel.confirm(packet);
+ //channel.confirm(packet);
// We flush the confirmations to make sure any send confirmations get handled on the client side
channel.flushConfirmations();
@@ -1545,7 +1545,7 @@
channel.close();
}
- public int transferConnection(final RemotingConnection newConnection, final int lastReceivedCommandID)
+ public int transferConnection(final RemotingConnection newConnection, final int lastConfirmedCommandID)
{
boolean wasStarted = this.started;
@@ -1577,10 +1577,10 @@
remotingConnection.addFailureListener(this);
remotingConnection.addCloseListener(this);
- int serverLastReceivedCommandID = channel.getLastReceivedCommandID();
+ int serverLastConfirmedCommandID = channel.getLastConfirmedCommandID();
log.info("replaying commands");
- channel.replayCommands(lastReceivedCommandID);
+ channel.replayCommands(lastConfirmedCommandID);
if (wasStarted)
{
@@ -1589,7 +1589,7 @@
log.info("Transferred connection");
- return serverLastReceivedCommandID;
+ return serverLastConfirmedCommandID;
}
public Channel getChannel()
@@ -1698,7 +1698,7 @@
{
Packet response = null;
- channel.confirm(packet);
+ //channel.confirm(packet);
if (response != null)
{
@@ -1721,7 +1721,7 @@
{
log.error("Failed to receive credits", e);
}
- channel.confirm(packet);
+ //channel.confirm(packet);
}
private void doSendLargeMessage(final SessionSendLargeMessage packet)
@@ -1737,7 +1737,7 @@
log.error("Failed to send message", e);
}
- channel.confirm(packet);
+ //channel.confirm(packet);
}
private void handleManagementMessage(final ServerMessage message) throws Exception
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2009-07-02 16:25:39 UTC (rev 7515)
@@ -104,7 +104,7 @@
private volatile List<Long> sequences;
- private volatile boolean requiresReplicationResponse;
+ // private volatile boolean requiresReplicationResponse;
private final Channel channel;
@@ -151,10 +151,12 @@
handlePacket();
// send the response message
-
- if (packet.getType() != PacketImpl.REPLICATE_LOCK_SEQUENCES && this.requiresReplicationResponse)
+
+ if (packet.getType() != PacketImpl.REPLICATE_LOCK_SEQUENCES)
{
- log.info("sending back replication response");
+ channel.confirm(packet);
+
+ //log.info("sending back replication response");
channel.send(new ReplicationResponseMessage());
}
@@ -164,11 +166,20 @@
{
if (replicator != null)
{
- replicator.execute(this);
+ replicator.execute(this,
+ new Runnable()
+ {
+ public void run()
+ {
+ channel.confirm(packet);
+ }
+ });
}
else
{
handlePacket();
+
+ channel.confirm(packet);
}
}
}
@@ -195,7 +206,7 @@
sequences = msg.getSequences();
- this.requiresReplicationResponse = msg.isRequiresResponse();
+ // this.requiresReplicationResponse = msg.isRequiresResponse();
// dumpSequences(sequences);
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/Replicator.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/Replicator.java 2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/Replicator.java 2009-07-02 16:25:39 UTC (rev 7515)
@@ -35,13 +35,9 @@
*/
public interface Replicator
{
- void execute(ReplicableAction action);
+ void execute(ReplicableAction action, Runnable postReplicateAction);
void registerWaitingChannel(Channel channel);
- // boolean isResponseReceived();
-
void replicationResponseReceived();
-
- // long getReplicateSequence();
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareAtomicLong.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareAtomicLong.java 2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareAtomicLong.java 2009-07-02 16:25:39 UTC (rev 7515)
@@ -69,7 +69,10 @@
{
long sequence = al.getAndIncrement();
- thread.addSequence(sequence);
+ if (thread.isRecording())
+ {
+ thread.addSequence(sequence);
+ }
return sequence;
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareReadWriteLock.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareReadWriteLock.java 2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareReadWriteLock.java 2009-07-02 16:25:39 UTC (rev 7515)
@@ -144,7 +144,10 @@
if (ok)
{
- thread.addSequence(counter.getAndIncrement());
+ if (thread.isRecording())
+ {
+ thread.addSequence(counter.getAndIncrement());
+ }
addOwner(thread);
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java 2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java 2009-07-02 16:25:39 UTC (rev 7515)
@@ -48,14 +48,17 @@
private final Channel replicatingChannel;
- private final Queue<Set<Channel>> waitingChannelsQueue = new ConcurrentLinkedQueue<Set<Channel>>();
+ private final Queue<WaitingChannelsHolder> waitingChannelsQueue = new ConcurrentLinkedQueue<WaitingChannelsHolder>();
private Set<Channel> currentChannels;
- // private long responseSequence;
-
- // private long replicateSequence;
+ private static class WaitingChannelsHolder
+ {
+ Runnable postReplicateAction;
+ Set<Channel> channels;
+ }
+
public ReplicatorImpl(final Channel replicatingChannel)
{
this.replicatingChannel = replicatingChannel;
@@ -68,17 +71,20 @@
public void replicationResponseReceived()
{
- //long sequence = responseSequence++;
-
- Set<Channel> waitingChannels = waitingChannelsQueue.remove();
+ WaitingChannelsHolder waitingChannelsHolder = waitingChannelsQueue.remove();
- for (Channel channel : waitingChannels)
+ for (Channel channel : waitingChannelsHolder.channels)
{
channel.replicationResponseReceived(this);
+ }
+
+ if (waitingChannelsHolder.postReplicateAction != null)
+ {
+ waitingChannelsHolder.postReplicateAction.run();
}
}
- public void execute(final ReplicableAction action)
+ public void execute(final ReplicableAction action, final Runnable postReplicateAction)
{
// First we execute the action
@@ -98,12 +104,13 @@
// We then send the sequences to the backup
- if (!currentChannels.isEmpty())
- {
- waitingChannelsQueue.add(currentChannels);
- }
+ WaitingChannelsHolder holder = new WaitingChannelsHolder();
+ holder.channels = currentChannels;
+ holder.postReplicateAction = postReplicateAction;
+
+ waitingChannelsQueue.add(holder);
- Packet packet = new ReplicateLockSequenceMessage(sequences, !currentChannels.isEmpty());
+ Packet packet = new ReplicateLockSequenceMessage(sequences);
replicatingChannel.send(packet);
Modified: branches/Branch_MultiThreaded_Replication/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTest.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTest.java 2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTest.java 2009-07-02 16:25:39 UTC (rev 7515)
@@ -356,6 +356,7 @@
TextMessage m = session.createTextMessage("message one");
+
prod.send(m);
}
Modified: branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/SimpleAutomaticFailoverTest.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/SimpleAutomaticFailoverTest.java 2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/SimpleAutomaticFailoverTest.java 2009-07-02 16:25:39 UTC (rev 7515)
@@ -85,140 +85,122 @@
public void testReplication1() throws Exception
{
- for (int j = 0; j < 5000; j++)
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+ sf.setProducerWindowSize(32 * 1024);
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(ADDRESS, ADDRESS, null, false);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ final int numMessages = 100;
+
+ long start = System.currentTimeMillis();
+
+ for (int i = 0; i < numMessages; i++)
{
- log.info("Iteration " + j);
-
- ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
-
- sf.setProducerWindowSize(32 * 1024);
-
- ClientSession session = sf.createSession(false, true, true);
-
- session.createQueue(ADDRESS, ADDRESS, null, false);
-
- ClientProducer producer = session.createProducer(ADDRESS);
-
- final int numMessages = 100;
-
- long start = System.currentTimeMillis();
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
- message.putIntProperty(new SimpleString("count"), i);
- message.getBody().writeString("aardvarks");
- producer.send(message);
- }
-
- //Thread.sleep(500);
-
- ClientConsumer consumer = session.createConsumer(ADDRESS);
-
- log.info("sent messages");
-
- session.start();
-
- log.info("Started session");
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message2 = consumer.receive();
-
- assertEquals("aardvarks", message2.getBody().readString());
- assertEquals(i, message2.getProperty(new SimpleString("count")));
-
- message2.acknowledge();
- }
-
- long end = System.currentTimeMillis();
-
- log.info("That took " + (end - start));
-
- ClientMessage message3 = consumer.receive(250);
-
- assertNull(message3);
-
- session.close();
-
- tearDown();
-
- setUp();
+ ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.putIntProperty(new SimpleString("count"), i);
+ message.getBody().writeString("aardvarks");
+ producer.send(message);
}
+
+ // Thread.sleep(500);
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ log.info("sent messages");
+
+ session.start();
+
+ log.info("Started session");
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message2 = consumer.receive();
+
+ assertEquals("aardvarks", message2.getBody().readString());
+ assertEquals(i, message2.getProperty(new SimpleString("count")));
+
+ message2.acknowledge();
+ }
+
+ long end = System.currentTimeMillis();
+
+ log.info("That took " + (end - start));
+
+ ClientMessage message3 = consumer.receive(250);
+
+ assertNull(message3);
+
+ session.close();
}
-
+
public void testReplication2() throws Exception
{
- for (int j = 0; j < 5000; j++)
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+ sf.setProducerWindowSize(32 * 1024);
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(ADDRESS, ADDRESS, null, false);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ log.info("sent messages");
+
+ session.start();
+
+ // Thread.sleep(500);
+
+ final int numMessages = 1000;
+
+ long start = System.currentTimeMillis();
+
+ for (int i = 0; i < numMessages; i++)
{
- log.info("Iteration " + j);
-
- ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
-
- sf.setProducerWindowSize(32 * 1024);
-
- ClientSession session = sf.createSession(false, true, true);
-
- session.createQueue(ADDRESS, ADDRESS, null, false);
-
- ClientProducer producer = session.createProducer(ADDRESS);
-
- ClientConsumer consumer = session.createConsumer(ADDRESS);
-
- log.info("sent messages");
-
- session.start();
-
- //Thread.sleep(500);
-
- final int numMessages = 1000;
-
- long start = System.currentTimeMillis();
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
- message.putIntProperty(new SimpleString("count"), i);
- message.getBody().writeString("aardvarks");
- producer.send(message);
- }
-
- //Thread.sleep(500);
-
- log.info("Started session");
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message2 = consumer.receive();
-
- assertEquals("aardvarks", message2.getBody().readString());
- assertEquals(i, message2.getProperty(new SimpleString("count")));
-
- message2.acknowledge();
- }
-
- long end = System.currentTimeMillis();
-
- log.info("That took " + (end - start));
-
- ClientMessage message3 = consumer.receive(250);
-
- assertNull(message3);
-
- session.close();
-
- tearDown();
-
- setUp();
+ ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.putIntProperty(new SimpleString("count"), i);
+ message.getBody().writeString("aardvarks");
+ producer.send(message);
}
+
+ // Thread.sleep(500);
+
+ log.info("Started session");
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message2 = consumer.receive();
+
+ assertEquals("aardvarks", message2.getBody().readString());
+ assertEquals(i, message2.getProperty(new SimpleString("count")));
+
+ message2.acknowledge();
+ }
+
+ long end = System.currentTimeMillis();
+
+ log.info("That took " + (end - start));
+
+ ClientMessage message3 = consumer.receive(250);
+
+ assertNull(message3);
+
+ session.close();
}
public void testFailoverSameConnectionFactory() throws Exception
@@ -248,7 +230,7 @@
message.getBody().writeString("aardvarks");
producer.send(message);
}
-
+
RemotingConnection conn1 = ((ClientSessionImpl)session).getConnection();
// Simulate failure on connection
@@ -684,8 +666,7 @@
ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
backupParams));
-
-
+
sf.setFailoverOnServerShutdown(true);
sf.setRetryInterval(retryInterval);
sf.setRetryIntervalMultiplier(retryMultiplier);
@@ -780,17 +761,14 @@
public void testFailoverOnCreateSession() throws Exception
{
- stopServers();
-
for (int j = 0; j < 10; j++)
{
- startServers();
-
+ log.info("Iteration " + j);
+
ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
backupParams));
-
-
+
sf.setFailoverOnServerShutdown(true);
sf.setRetryInterval(100);
sf.setRetryIntervalMultiplier(1);
@@ -854,8 +832,10 @@
assertEquals(0, sf.numConnections());
sf.close();
-
- stopServers();
+
+ tearDown();
+
+ setUp();
}
}
@@ -952,16 +932,16 @@
// We fail on the replicating connection and the client connection
MessagingException me = new MessagingException(MessagingException.NOT_CONNECTED);
-
- //Note we call the remoting service impl handler which is what would happen in event
- //of real connection failure
-
+
+ // Note we call the remoting service impl handler which is what would happen in event
+ // of real connection failure
+
RemotingConnection serverSideReplicatingConnection = backupService.getRemotingService()
.getServerSideReplicatingConnection();
-
-
- ((ConnectionLifeCycleListener)backupService.getRemotingService()).connectionException(serverSideReplicatingConnection.getID(), me);
+ ((ConnectionLifeCycleListener)backupService.getRemotingService()).connectionException(serverSideReplicatingConnection.getID(),
+ me);
+
conn1.fail(new MessagingException(MessagingException.NOT_CONNECTED));
ClientConsumer consumer = session.createConsumer(ADDRESS);
@@ -1046,6 +1026,10 @@
{
backupService.stop();
}
+ else
+ {
+ log.info("*** not stopping backup server since not started");
+ }
if (liveService.isStarted())
{
More information about the jboss-cvs-commits
mailing list