[jboss-cvs] JBoss Messaging SVN: r5062 - in trunk: src/main/org/jboss/messaging/core/remoting and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Oct 3 08:24:02 EDT 2008
Author: timfox
Date: 2008-10-03 08:24:02 -0400 (Fri, 03 Oct 2008)
New Revision: 5062
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/RandomFailoverTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleManualFailoverTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SystematicFailoverTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/Channel.java
trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionMessage.java
trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
Log:
More failover and session replication malarkey
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2008-10-03 10:06:40 UTC (rev 5061)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2008-10-03 12:24:02 UTC (rev 5062)
@@ -474,7 +474,7 @@
autoCommitSends,
autoCommitAcks);
- Channel channel1 = connection.getChannel(1, false, -1, true);
+ Channel channel1 = connection.getChannel(1, false, -1, false, true);
try
{
@@ -499,9 +499,16 @@
CreateSessionResponseMessage response = (CreateSessionResponseMessage)pResponse;
+ int packetConfirmationBatchSize = response.getPacketConfirmationBatchSize();
+
+ //TODO - don't need packet confirmationbatch size on the client side - it's only really
+ //needed on the server side
+ //since confirmations are only sent from server to client
+
Channel sessionChannel = connection.getChannel(sessionChannelID,
false,
- response.getPacketConfirmationBatchSize(),
+ -1,
+ packetConfirmationBatchSize != -1,
!hasBackup);
ClientSessionInternal session = new ClientSessionImpl(this,
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-10-03 10:06:40 UTC (rev 5061)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-10-03 12:24:02 UTC (rev 5062)
@@ -735,9 +735,9 @@
remotingConnection = backupConnection;
- Packet request = new ReattachSessionMessage(name, channel.getLastReceivedCommandID());
+ Packet request = new ReattachSessionMessage(name);
- Channel channel1 = backupConnection.getChannel(1, false, -1, true);
+ Channel channel1 = backupConnection.getChannel(1, false, -1, false, true);
ReattachSessionResponseMessage response = (ReattachSessionResponseMessage)channel1.sendBlocking(request);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/Channel.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/Channel.java 2008-10-03 10:06:40 UTC (rev 5061)
+++ trunk/src/main/org/jboss/messaging/core/remoting/Channel.java 2008-10-03 12:24:02 UTC (rev 5062)
@@ -42,7 +42,7 @@
void transferConnection(RemotingConnection newConnection);
- int replayCommands(int lastReceivedCommandID);
+ void replayCommands(int lastReceivedCommandID);
int getLastReceivedCommandID();
Modified: trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java 2008-10-03 10:06:40 UTC (rev 5061)
+++ trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java 2008-10-03 12:24:02 UTC (rev 5062)
@@ -26,14 +26,11 @@
{
Object getID();
- Channel getChannel(long channelID, boolean ordered, int packetConfirmationBatchSize, boolean interruptBlockOnFailure);
+ Channel getChannel(long channelID, boolean ordered, int packetConfirmationBatchSize,
+ boolean hasResendCache, boolean interruptBlockOnFailure);
long generateChannelID();
- public void setReplicating(boolean backup);
-
- boolean isReplicating();
-
void addFailureListener(FailureListener listener);
boolean removeFailureListener(FailureListener listener);
@@ -51,4 +48,6 @@
void syncIDGeneratorSequence(long id);
long getIDGeneratorSequence();
+
+ void activate();
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java 2008-10-03 10:06:40 UTC (rev 5061)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java 2008-10-03 12:24:02 UTC (rev 5062)
@@ -103,7 +103,7 @@
pingExecutor,
null,
null,
- true);
+ true);
handler.conn = connection;
@@ -146,7 +146,7 @@
pingExecutor,
null,
null,
- true);
+ true);
handler.conn = connection;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-10-03 10:06:40 UTC (rev 5061)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-10-03 12:24:02 UTC (rev 5062)
@@ -203,12 +203,8 @@
private final RemotingConnection replicatingConnection;
- private volatile boolean replicating;
+ private volatile boolean active;
- private final boolean client;
-
- private boolean writePackets;
-
private final long pingPeriod;
private final ScheduledExecutorService pingExecutor;
@@ -230,7 +226,7 @@
final ScheduledExecutorService pingExecutor,
final List<Interceptor> interceptors,
final RemotingConnection replicatingConnection,
- final boolean client)
+ final boolean active)
{
this.transportConnection = transportConnection;
@@ -250,16 +246,14 @@
this.replicatingConnection = replicatingConnection;
- this.client = client;
-
- writePackets = client; // Gets changed when setReplicating is called
-
+ this.active = active;
+
this.pingPeriod = pingPeriod;
this.pingExecutor = pingExecutor;
// Channel zero is reserved for pinging
- pingChannel = getChannel(0, false, -1, false);
+ pingChannel = getChannel(0, false, -1, false, false);
final ChannelHandler ppHandler = new PingPongHandler();
@@ -293,13 +287,14 @@
public synchronized Channel getChannel(final long channelID,
final boolean ordered,
final int packetConfirmationBatchSize,
+ final boolean hasResendCache,
final boolean interruptBlockOnFailure)
{
ChannelImpl channel = channels.get(channelID);
if (channel == null)
{
- channel = new ChannelImpl(this, channelID, ordered, packetConfirmationBatchSize, interruptBlockOnFailure);
+ channel = new ChannelImpl(this, channelID, ordered, packetConfirmationBatchSize, hasResendCache, interruptBlockOnFailure);
channels.put(channelID, channel);
}
@@ -307,19 +302,6 @@
return channel;
}
- // This is a bit hacky - can we somehow do this in the constructor?
- public void setReplicating(final boolean replicating)
- {
- this.replicating = replicating;
-
- writePackets = client || !replicating;
- }
-
- public boolean isReplicating()
- {
- return replicating;
- }
-
public void addFailureListener(final FailureListener listener)
{
if (listener == null)
@@ -442,6 +424,11 @@
}
}
}
+
+ public void activate()
+ {
+ active = true;
+ }
// Package protected
// ----------------------------------------------------------------------------
@@ -844,11 +831,18 @@
private volatile boolean closed;
private final boolean interruptBlockOnFailure;
+
+ private final Object waitLock = new Object();
+ private Thread blockThread;
+
+ private ResponseNotifier responseNotifier;
+
private ChannelImpl(final RemotingConnectionImpl connection,
final long id,
final boolean ordered,
final int packetConfirmationBatchSize,
+ final boolean hasResendCache,
final boolean interruptBlockOnFailure)
{
this.connection = connection;
@@ -863,10 +857,9 @@
{
executor = null;
}
+
- this.packetConfirmationBatchSize = packetConfirmationBatchSize;
-
- if (packetConfirmationBatchSize != -1 && ((connection.client && !connection.replicating) || (!connection.client && connection.replicatingConnection == null)))
+ if (hasResendCache)
{
resendCache = new ConcurrentLinkedQueue<Packet>();
@@ -879,12 +872,17 @@
if (connection.replicatingConnection != null)
{
- replicatingChannel = connection.replicatingConnection.getChannel(id, ordered, -1, interruptBlockOnFailure);
+ //Don't want to send confirmations if replicating to backup
+ this.packetConfirmationBatchSize = -1;
+
+ replicatingChannel = connection.replicatingConnection.getChannel(id, ordered, -1, false, interruptBlockOnFailure);
replicatingChannel.setHandler(new ReplicatedPacketsConfirmedChannelHandler());
}
else
{
+ this.packetConfirmationBatchSize = packetConfirmationBatchSize;
+
replicatingChannel = null;
}
@@ -905,36 +903,27 @@
// This must never called by more than one thread concurrently
public void send(final Packet packet)
{
- synchronized (this)
+ if (connection.active || packet.isWriteAlways())
{
- packet.setChannelID(id);
-
- lock.lock();
- try
+ synchronized (this)
{
- if (resendCache != null)
+ packet.setChannelID(id);
+
+ lock.lock();
+ try
{
- addToCache(packet);
+ addToCache(packet);
}
+ finally
+ {
+ lock.unlock();
+ }
+
+ connection.doWrite(packet);
}
- finally
- {
- lock.unlock();
- }
-
- if (connection.writePackets || packet.isWriteAlways())
- {
- connection.doWrite(packet);
- }
}
}
-
- private final Object waitLock = new Object();
-
- private Thread blockThread;
-
- private ResponseNotifier responseNotifier;
-
+
public Executor getExecutor()
{
return executor;
@@ -967,10 +956,7 @@
lock.lock();
try
{
- if (resendCache != null)
- {
- addToCache(packet);
- }
+ addToCache(packet);
}
finally
{
@@ -1149,7 +1135,7 @@
}
}
- public int replayCommands(final int otherLastReceivedCommandID)
+ public void replayCommands(final int otherLastReceivedCommandID)
{
clearUpTo(otherLastReceivedCommandID);
@@ -1157,8 +1143,6 @@
{
connection.doWrite(packet);
}
-
- return lastReceivedCommandID;
}
public void lock()
@@ -1287,16 +1271,12 @@
}
}
- private volatile Packet lastReceivedPacket;
-
private void checkConfirmation(final Packet packet)
{
- if (resendCache != null && packet.isRequiresConfirmations())
+ if (packet.isRequiresConfirmations() && packetConfirmationBatchSize != -1)
{
lastReceivedCommandID++;
- lastReceivedPacket = packet;
-
if (lastReceivedCommandID == nextConfirmation)
{
final Packet confirmed = new PacketsConfirmedMessage(lastReceivedCommandID);
@@ -1308,12 +1288,14 @@
connection.doWrite(confirmed);
}
}
-
}
private void addToCache(final Packet packet)
{
- resendCache.add(packet);
+ if (resendCache != null)
+ {
+ resendCache.add(packet);
+ }
}
private void clearUpTo(final int lastReceivedCommandID)
@@ -1331,8 +1313,7 @@
if (packet == null)
{
- throw new IllegalStateException("Can't find packet to clear, client: " + connection.client +
- " replicating: " + connection.replicating +
+ throw new IllegalStateException("Can't find packet to clear: " +
" last received command id " + lastReceivedCommandID +
" first stored command id " + firstStoredCommandID +
" channel id " + id);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java 2008-10-03 10:06:40 UTC (rev 5061)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java 2008-10-03 12:24:02 UTC (rev 5062)
@@ -239,12 +239,10 @@
null,
interceptors,
replicatingConnection,
- false);
+ !backup);
- rc.setReplicating(backup);
+ Channel channel1 = rc.getChannel(1, false, -1, false, true);
- Channel channel1 = rc.getChannel(1, false, -1, true);
-
ChannelHandler handler = new MessagingServerPacketHandler(server, channel1, rc);
channel1.setHandler(handler);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionMessage.java 2008-10-03 10:06:40 UTC (rev 5061)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionMessage.java 2008-10-03 12:24:02 UTC (rev 5062)
@@ -39,19 +39,15 @@
private String name;
- private int lastReceivedCommandID;
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public ReattachSessionMessage(final String name, final int lastReceivedCommandID)
+ public ReattachSessionMessage(final String name)
{
super(REATTACH_SESSION);
this.name = name;
-
- this.lastReceivedCommandID = lastReceivedCommandID;
}
public ReattachSessionMessage()
@@ -66,21 +62,14 @@
return name;
}
- public int getLastReceivedCommandID()
- {
- return lastReceivedCommandID;
- }
-
public void encodeBody(final MessagingBuffer buffer)
{
buffer.putString(name);
- buffer.putInt(lastReceivedCommandID);
}
public void decodeBody(final MessagingBuffer buffer)
{
name = buffer.getString();
- lastReceivedCommandID = buffer.getInt();
}
public boolean equals(Object other)
@@ -92,8 +81,7 @@
ReattachSessionMessage r = (ReattachSessionMessage)other;
- return super.equals(other) && this.name.equals(r.name) &&
- this.lastReceivedCommandID == r.lastReceivedCommandID;
+ return super.equals(other) && this.name.equals(r.name);
}
public final boolean isRequiresConfirmations()
Modified: trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java 2008-10-03 10:06:40 UTC (rev 5061)
+++ trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java 2008-10-03 12:24:02 UTC (rev 5062)
@@ -61,7 +61,7 @@
Version getVersion();
- ReattachSessionResponseMessage reattachSession(RemotingConnection connection, String name, int lastReceivedCommandID) throws Exception;
+ ReattachSessionResponseMessage reattachSession(RemotingConnection connection, String name) throws Exception;
CreateSessionResponseMessage createSession(String name,
long channelID,
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-10-03 10:06:40 UTC (rev 5061)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-10-03 12:24:02 UTC (rev 5062)
@@ -125,10 +125,8 @@
void browserReset(long browserID) throws Exception;
- void transferConnection(RemotingConnection newConnection);
+ int transferConnection(RemotingConnection newConnection);
- int replayCommands(int lastReceivedCommandID);
-
void handleManagementMessage(SessionSendManagementMessage message) throws Exception;
void failedOver() throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-10-03 10:06:40 UTC (rev 5061)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-10-03 12:24:02 UTC (rev 5062)
@@ -384,8 +384,7 @@
}
public ReattachSessionResponseMessage reattachSession(final RemotingConnection connection,
- final String name,
- final int lastReceivedCommandID) throws Exception
+ final String name) throws Exception
{
ServerSession session = sessions.get(name);
@@ -395,25 +394,16 @@
}
// Reconnect the channel to the new connection
- session.transferConnection(connection);
+ int serverLastReceivedCommandID = session.transferConnection(connection);
- // This is necessary for invm since the replicating connection will be the
- // same connection
- // as the original replicating connection since the key is the same in the
- // registry, and that connection
- // won't have any resend buffer etc
- connection.setReplicating(false);
+ connection.activate();
- int serverLastReceivedCommandID = session.replayCommands(lastReceivedCommandID);
-
postOffice.activate();
configuration.setBackup(false);
remotingService.setBackup(false);
- connection.setReplicating(false);
-
session.failedOver();
return new ReattachSessionResponseMessage(serverLastReceivedCommandID);
@@ -448,7 +438,9 @@
securityStore.authenticate(username, password);
- Channel channel = connection.getChannel(channelID, true, configuration.getPacketConfirmationBatchSize(), false);
+ //Server side connections never have a resend cache
+
+ Channel channel = connection.getChannel(channelID, true, configuration.getPacketConfirmationBatchSize(), false, false);
final ServerSessionImpl session = new ServerSessionImpl(name,
channelID,
@@ -505,10 +497,7 @@
RemotingConnection replicatingConnection = ConnectionRegistryImpl.instance.getConnectionNoCache(backupConnectorFactory,
backupConnectorParams,
-1,
- 30000);
-
- replicatingConnection.setReplicating(true);
-
+ 30000);
return replicatingConnection;
}
else
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2008-10-03 10:06:40 UTC (rev 5061)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2008-10-03 12:24:02 UTC (rev 5062)
@@ -87,7 +87,7 @@
{
ReattachSessionMessage request = (ReattachSessionMessage)packet;
- response = server.reattachSession(connection, request.getName(), request.getLastReceivedCommandID());
+ response = server.reattachSession(connection, request.getName());
break;
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-10-03 10:06:40 UTC (rev 5061)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-10-03 12:24:02 UTC (rev 5062)
@@ -1075,7 +1075,7 @@
producers.get(producerID).send(message);
}
- public void transferConnection(final RemotingConnection newConnection)
+ public int transferConnection(final RemotingConnection newConnection)
{
remotingConnection.removeFailureListener(this);
@@ -1089,13 +1089,14 @@
remotingConnection = newConnection;
remotingConnection.addFailureListener(this);
+
+ int lastReceivedCommandID = channel.getLastReceivedCommandID();
+
+ //TODO resend any dup responses
+
+ return lastReceivedCommandID;
}
- public int replayCommands(final int lastReceivedCommandID)
- {
- return channel.replayCommands(lastReceivedCommandID);
- }
-
public void handleManagementMessage(final SessionSendManagementMessage message) throws Exception
{
ServerMessage serverMessage = message.getServerMessage();
Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/RandomFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/RandomFailoverTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/RandomFailoverTest.java 2008-10-03 12:24:02 UTC (rev 5062)
@@ -0,0 +1,1300 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
+ * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
+ * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
+ * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
+ * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.ConnectionRegistryImpl;
+import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A RandomFailoverTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */
+public class RandomFailoverTest extends TestCase
+{
+ private static final Logger log = Logger.getLogger(RandomFailoverTest.class); // logger for this test class (was mistakenly created for SimpleAutomaticFailoverTest)
+
+ // Constants -----------------------------------------------------
+
+ private static final int RECEIVE_TIMEOUT = 5000;
+
+ // Attributes ----------------------------------------------------
+
+ private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+
+ private MessagingService liveService;
+
+ private MessagingService backupService;
+
+ private final Map<String, Object> backupParams = new HashMap<String, Object>();
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ //private volatile Thread failThread;
+
+ private Timer timer = new Timer();
+
+ private volatile Failer failer;
+
+ private void startFailer(final long time) // creates a new Failer and schedules it on the shared timer
+ {
+ failer = new Failer();
+
+ timer.schedule(failer, (long)(time * Math.random()), Long.MAX_VALUE); // first run after a random delay in [0, time) ms; NOTE(review): with a Long.MAX_VALUE period a run that finds no session is presumably not retried any time soon - confirm against java.util.Timer
+ }
+
+ private static class Failer extends TimerTask // timer task that fails the connection of the session it has been handed
+ {
+ volatile ClientSession session; // session whose connection will be failed; assigned by testA, reset to null by run()
+
+ public void run()
+ {
+ if (session != null) // does nothing when no session has been assigned yet
+ {
+ log.info("** Failing connection");
+
+ RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionImpl)session).getConnection();
+
+ conn.fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah")); // report a NOT_CONNECTED failure on the session's connection
+
+ log.info("** Fail complete");
+
+ session = null; // testFailureA loops on this field, so clearing it signals that the failure has been injected
+
+ cancel(); // this task fails at most one connection; startFailer creates a new task each time
+ }
+ }
+ }
+
+ public void testFailureA() throws Exception // 1000 iterations of: start(), arm a Failer, run testA until the Failer has fired, stop()
+ {
+ for (int its = 0; its < 1000; its++)
+ {
+ start();
+
+ startFailer(3000); // failure is scheduled at a random point within 3000 ms
+
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+ new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ backupParams));
+
+ do
+ {
+ testA(sf);
+ }
+ while (failer.session != null); // Failer.run() nulls the field once it has failed the connection; until then run testA again
+
+ stop();
+ }
+ }
+
+ public void testA(final ClientSessionFactory sf) throws Exception // one pass: 50 consuming sessions, each with its own queue on ADDRESS, must each receive the 100 sent messages in order
+ {
+ long start = System.currentTimeMillis();
+
+ ClientSession s = sf.createSession(false, false, false, false);
+
+ failer.session = s; // hand this session to the Failer so that its connection is the one that gets failed
+
+ final int numMessages = 100;
+
+ final int numSessions = 50;
+
+ Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+ Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+ for (int i = 0; i < numSessions; i++)
+ {
+ SimpleString subName = new SimpleString("sub" + i);
+
+ ClientSession sessConsume = sf.createSession(false, true, true, false);
+
+ sessConsume.start();
+
+ sessConsume.createQueue(ADDRESS, subName, null, false, false);
+
+ ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+ consumers.add(consumer);
+
+ sessions.add(sessConsume);
+ }
+
+ ClientSession sessSend = sf.createSession(false, true, true, false);
+
+ ClientProducer producer = sessSend.createProducer(ADDRESS);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.putIntProperty(new SimpleString("count"), i); // sequence number checked by the handlers below
+ message.getBody().flip();
+ producer.send(message);
+ }
+
+ // log.info("sent messages");
+
+ class MyHandler implements MessageHandler
+ {
+ final CountDownLatch latch = new CountDownLatch(1); // released once numMessages have been received
+
+ volatile int count;
+
+ public void onMessage(ClientMessage message)
+ {
+ if (count == numMessages)
+ {
+ fail("Too many messages"); // NOTE(review): if onMessage runs on a delivery thread rather than the JUnit thread, this failure may not fail the test - confirm
+ }
+
+ assertEquals(count, message.getProperty(new SimpleString("count"))); // the "count" property must match the number of messages received so far
+
+ count++;
+
+ if (count == numMessages)
+ {
+ latch.countDown();
+ }
+ }
+ }
+
+ Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+ for (ClientConsumer consumer : consumers)
+ {
+ MyHandler handler = new MyHandler();
+
+ consumer.setMessageHandler(handler);
+
+ handlers.add(handler);
+ }
+
+ for (MyHandler handler : handlers)
+ {
+ boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS); // wait up to 10 s per handler
+
+ assertTrue(ok);
+ }
+
+ sessSend.close();
+ for (ClientSession session: sessions)
+ {
+ session.close();
+ }
+
+ for (int i = 0; i < numSessions; i++)
+ {
+ SimpleString subName = new SimpleString("sub" + i);
+
+ s.deleteQueue(subName); // remove the queues created above; testFailureA may call this method again with the same names
+ }
+
+ s.close();
+
+ long end = System.currentTimeMillis();
+
+ log.info("duration " + (end - start));
+
+ }
+
+ public void testB() throws Exception // 100 consuming sessions must each receive 1000 messages in order; sessions are started only after all messages are sent; no Failer is armed here
+ {
+ start();
+
+ long start = System.currentTimeMillis();
+
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+ new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ backupParams));
+
+ final int numMessages = 1000;
+
+ final int numSessions = 100;
+
+ Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+ Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+ for (int i = 0; i < numSessions; i++)
+ {
+ SimpleString subName = new SimpleString("sub" + i);
+
+ ClientSession sessConsume = sf.createSession(false, true, true, false);
+
+ sessConsume.createQueue(ADDRESS, subName, null, false, false);
+
+ ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+ consumers.add(consumer);
+
+ sessions.add(sessConsume);
+ }
+
+ ClientSession sessSend = sf.createSession(false, true, true, false);
+
+ ClientProducer producer = sessSend.createProducer(ADDRESS);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.putIntProperty(new SimpleString("count"), i); // sequence number checked by the handlers below
+ message.getBody().flip();
+ producer.send(message);
+ }
+
+ for (ClientSession session: sessions)
+ {
+ session.start(); // unlike testA, the consuming sessions are started only after the whole batch has been sent
+ }
+
+ class MyHandler implements MessageHandler
+ {
+ final CountDownLatch latch = new CountDownLatch(1); // released once numMessages have been received
+
+ volatile int count;
+
+ public void onMessage(ClientMessage message)
+ {
+ if (count == numMessages)
+ {
+ fail("Too many messages"); // NOTE(review): if onMessage runs on a delivery thread rather than the JUnit thread, this failure may not fail the test - confirm
+ }
+
+ assertEquals(count, message.getProperty(new SimpleString("count"))); // the "count" property must match the number of messages received so far
+
+ count++;
+
+ if (count == numMessages)
+ {
+ latch.countDown();
+ }
+ }
+ }
+
+ Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+ for (ClientConsumer consumer : consumers)
+ {
+ MyHandler handler = new MyHandler();
+
+ consumer.setMessageHandler(handler);
+
+ handlers.add(handler);
+ }
+
+ for (MyHandler handler : handlers)
+ {
+ boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS); // wait up to 10 s per handler
+
+ assertTrue(ok);
+ }
+
+ sessSend.close();
+
+ for (ClientSession session: sessions)
+ {
+ session.close();
+ }
+
+ long end = System.currentTimeMillis();
+
+ log.info("duration " + (end - start));
+
+ // this.waitForFailThread();
+
+ stop();
+ }
+
+ public void testC() throws Exception // send/rollback/resend/commit on the producer side, then receive/rollback/receive-again/commit on 100 consuming sessions; no Failer is armed here
+ {
+ start();
+
+ long start = System.currentTimeMillis();
+
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+ new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ backupParams));
+
+ final int numMessages = 1000;
+
+ final int numSessions = 100;
+
+ Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+ Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+ for (int i = 0; i < numSessions; i++)
+ {
+ SimpleString subName = new SimpleString("sub" + i);
+
+ ClientSession sessConsume = sf.createSession(false, false, false, false);
+
+ sessConsume.start();
+
+ sessConsume.createQueue(ADDRESS, subName, null, false, false);
+
+ ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+ consumers.add(consumer);
+
+ sessions.add(sessConsume);
+ }
+
+ ClientSession sessSend = sf.createSession(false, true, true, false);
+
+ ClientProducer producer = sessSend.createProducer(ADDRESS);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.putIntProperty(new SimpleString("count"), i);
+ message.getBody().flip();
+ producer.send(message);
+ }
+
+ sessSend.rollback(); // first batch is rolled back, then an identical batch is sent and committed; NOTE(review): sessSend uses the same creation flags as the sending session in testB - confirm rollback() really discards the first batch for such a session
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.putIntProperty(new SimpleString("count"), i); // sequence number checked by the handlers below
+ message.getBody().flip();
+ producer.send(message);
+ }
+
+ sessSend.commit();
+
+ class MyHandler implements MessageHandler
+ {
+ final CountDownLatch latch = new CountDownLatch(1); // released once numMessages have been received
+
+ volatile int count;
+
+ public void onMessage(ClientMessage message)
+ {
+ if (count == numMessages)
+ {
+ fail("Too many messages"); // NOTE(review): if onMessage runs on a delivery thread rather than the JUnit thread, this failure may not fail the test - confirm
+ }
+
+ assertEquals(count, message.getProperty(new SimpleString("count"))); // the "count" property must match the number of messages received so far
+
+ count++;
+
+ if (count == numMessages)
+ {
+ latch.countDown();
+ }
+ }
+ }
+
+ Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+ for (ClientConsumer consumer : consumers)
+ {
+ MyHandler handler = new MyHandler();
+
+ consumer.setMessageHandler(handler);
+
+ handlers.add(handler);
+ }
+
+ for (MyHandler handler : handlers)
+ {
+ boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS); // wait up to 10 s per handler for the first delivery
+
+ assertTrue(ok);
+ }
+
+ handlers.clear();
+
+ //New handlers
+ for (ClientConsumer consumer : consumers)
+ {
+ MyHandler handler = new MyHandler(); // fresh counter and latch for the second delivery
+
+ consumer.setMessageHandler(handler);
+
+ handlers.add(handler);
+ }
+
+ for (ClientSession session: sessions)
+ {
+ session.rollback(); // roll back each consuming session; the new handlers then wait for the full sequence again
+ }
+
+ for (MyHandler handler : handlers)
+ {
+ boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS); // wait up to 10 s per handler for the second delivery
+
+ assertTrue(ok);
+ }
+
+ for (ClientSession session: sessions)
+ {
+ session.commit();
+ }
+
+ sessSend.close();
+ for (ClientSession session: sessions)
+ {
+ session.close();
+ }
+
+ long end = System.currentTimeMillis();
+
+ log.info("duration " + (end - start));
+
+ // this.waitForFailThread();
+
+ stop();
+ }
+
+   /**
+    * Asynchronous (MessageHandler) consumption where the consuming sessions are started only after
+    * the producer has finished sending.
+    * <p>
+    * Ten sessions each create a queue "sub&lt;i&gt;" via createQueue(ADDRESS, ...) and a consumer on it.
+    * One hundred messages are sent and the send session rolled back, then one hundred more are sent
+    * and the send session committed. Each consumer's handler must see the "count" property run
+    * 0..99 in order; the consuming sessions are then rolled back and a fresh set of handlers must see
+    * the same sequence again before the sessions are committed and closed.
+    */
+   public void testD() throws Exception
+   {
+      start();
+
+      final long begin = System.currentTimeMillis();
+
+      final String connectorFactory = "org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory";
+      ClientSessionFactory factory = new ClientSessionFactoryImpl(new TransportConfiguration(connectorFactory),
+                                                                  new TransportConfiguration(connectorFactory,
+                                                                                             backupParams));
+
+      final int numMessages = 100;
+      final int numSessions = 10;
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+      // One session, one queue and one consumer per subscription; the sessions are NOT started yet
+      for (int s = 0; s < numSessions; s++)
+      {
+         SimpleString queueName = new SimpleString("sub" + s);
+         ClientSession consumerSession = factory.createSession(false, false, false, false);
+         consumerSession.createQueue(ADDRESS, queueName, null, false, false);
+         consumers.add(consumerSession.createConsumer(queueName));
+         sessions.add(consumerSession);
+      }
+
+      // NOTE(review): this send session is created with (false, true, true, false) yet is rolled back
+      // and committed below, while testG/testH use (false, false, false, false) for the same pattern -
+      // confirm which flags are intended.
+      ClientSession sendSession = factory.createSession(false, true, true, false);
+      ClientProducer producer = sendSession.createProducer(ADDRESS);
+
+      // Batch 0 is rolled back, batch 1 is committed
+      for (int batch = 0; batch < 2; batch++)
+      {
+         for (int m = 0; m < numMessages; m++)
+         {
+            ClientMessage outgoing = sendSession.createClientMessage(JBossTextMessage.TYPE, false, 0,
+                                                                     System.currentTimeMillis(), (byte)1);
+            outgoing.putIntProperty(new SimpleString("count"), m);
+            outgoing.getBody().flip();
+            producer.send(outgoing);
+         }
+
+         if (batch == 0)
+         {
+            sendSession.rollback();
+         }
+         else
+         {
+            sendSession.commit();
+         }
+      }
+
+      // Consumption only begins here, after the producer has committed
+      for (ClientSession session : sessions)
+      {
+         session.start();
+      }
+
+      // Checks that "count" arrives as 0, 1, 2, ... and releases its latch on the numMessages-th message
+      class CountingHandler implements MessageHandler
+      {
+         final CountDownLatch done = new CountDownLatch(1);
+
+         volatile int received;
+
+         public void onMessage(ClientMessage message)
+         {
+            if (received == numMessages)
+            {
+               fail("Too many messages");
+            }
+
+            assertEquals(received, message.getProperty(new SimpleString("count")));
+
+            received++;
+
+            if (received == numMessages)
+            {
+               done.countDown();
+            }
+         }
+      }
+
+      Set<CountingHandler> handlers = new HashSet<CountingHandler>();
+
+      // First round of handlers
+      for (ClientConsumer consumer : consumers)
+      {
+         CountingHandler handler = new CountingHandler();
+         consumer.setMessageHandler(handler);
+         handlers.add(handler);
+      }
+
+      for (CountingHandler handler : handlers)
+      {
+         assertTrue(handler.done.await(10000, TimeUnit.MILLISECONDS));
+      }
+
+      handlers.clear();
+
+      // Second round: install fresh handlers, then roll the consuming sessions back
+      for (ClientConsumer consumer : consumers)
+      {
+         CountingHandler handler = new CountingHandler();
+         consumer.setMessageHandler(handler);
+         handlers.add(handler);
+      }
+
+      for (ClientSession session : sessions)
+      {
+         session.rollback();
+      }
+
+      for (CountingHandler handler : handlers)
+      {
+         assertTrue(handler.done.await(10000, TimeUnit.MILLISECONDS));
+      }
+
+      for (ClientSession session : sessions)
+      {
+         session.commit();
+      }
+
+      sendSession.close();
+      for (ClientSession session : sessions)
+      {
+         session.close();
+      }
+
+      log.info("duration " + (System.currentTimeMillis() - begin));
+
+      // this.waitForFailThread();
+
+      stop();
+   }
+
+ // Now with synchronous receive()
+
+
+   /**
+    * Synchronous receive() variant in which every consuming session is started before its queue and
+    * consumer are created.
+    * <p>
+    * One hundred sessions each own a queue "sub&lt;i&gt;" and a consumer. One thousand messages are sent,
+    * after which every consumer must receive them with "count" running 0..999, and a subsequent
+    * receiveImmediate() on every consumer must return null.
+    */
+   public void testE() throws Exception
+   {
+      start();
+
+      final long begin = System.currentTimeMillis();
+
+      final String connectorFactory = "org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory";
+      ClientSessionFactory factory = new ClientSessionFactoryImpl(new TransportConfiguration(connectorFactory),
+                                                                  new TransportConfiguration(connectorFactory,
+                                                                                             backupParams));
+
+      final int numMessages = 1000;
+      final int numSessions = 100;
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+      for (int s = 0; s < numSessions; s++)
+      {
+         SimpleString queueName = new SimpleString("sub" + s);
+         ClientSession consumerSession = factory.createSession(false, true, true, false);
+
+         // Started up front, before the queue and consumer exist
+         consumerSession.start();
+         consumerSession.createQueue(ADDRESS, queueName, null, false, false);
+         consumers.add(consumerSession.createConsumer(queueName));
+         sessions.add(consumerSession);
+      }
+
+      ClientSession sendSession = factory.createSession(false, true, true, false);
+      ClientProducer producer = sendSession.createProducer(ADDRESS);
+
+      for (int m = 0; m < numMessages; m++)
+      {
+         ClientMessage outgoing = sendSession.createClientMessage(JBossTextMessage.TYPE, false, 0,
+                                                                  System.currentTimeMillis(), (byte)1);
+         outgoing.putIntProperty(new SimpleString("count"), m);
+         outgoing.getBody().flip();
+         producer.send(outgoing);
+      }
+
+      // Message m must be the m-th message seen by every consumer
+      for (int m = 0; m < numMessages; m++)
+      {
+         for (ClientConsumer consumer : consumers)
+         {
+            ClientMessage incoming = consumer.receive(RECEIVE_TIMEOUT);
+            assertNotNull(incoming);
+            assertEquals(m, incoming.getProperty(new SimpleString("count")));
+            incoming.processed();
+         }
+      }
+
+      // Nothing further may be available on any consumer
+      for (int m = 0; m < numMessages; m++)
+      {
+         for (ClientConsumer consumer : consumers)
+         {
+            assertNull(consumer.receiveImmediate());
+         }
+      }
+
+      sendSession.close();
+      for (ClientSession session : sessions)
+      {
+         session.close();
+      }
+
+      log.info("duration " + (System.currentTimeMillis() - begin));
+
+      // this.waitForFailThread();
+
+      stop();
+   }
+
+   /**
+    * Synchronous receive() variant in which the consuming sessions are started only after all
+    * messages have been sent.
+    * <p>
+    * One hundred sessions each own a queue "sub&lt;i&gt;" and a consumer. One thousand messages are sent,
+    * the sessions are started, every consumer must then receive the messages with "count" running
+    * 0..999, and a subsequent receiveImmediate() on every consumer must return null.
+    */
+   public void testF() throws Exception
+   {
+      start();
+
+      final long begin = System.currentTimeMillis();
+
+      final String connectorFactory = "org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory";
+      ClientSessionFactory factory = new ClientSessionFactoryImpl(new TransportConfiguration(connectorFactory),
+                                                                  new TransportConfiguration(connectorFactory,
+                                                                                             backupParams));
+
+      final int numMessages = 1000;
+      final int numSessions = 100;
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+      // Sessions are created here but deliberately left stopped until sending is finished
+      for (int s = 0; s < numSessions; s++)
+      {
+         SimpleString queueName = new SimpleString("sub" + s);
+         ClientSession consumerSession = factory.createSession(false, true, true, false);
+         consumerSession.createQueue(ADDRESS, queueName, null, false, false);
+         consumers.add(consumerSession.createConsumer(queueName));
+         sessions.add(consumerSession);
+      }
+
+      ClientSession sendSession = factory.createSession(false, true, true, false);
+      ClientProducer producer = sendSession.createProducer(ADDRESS);
+
+      for (int m = 0; m < numMessages; m++)
+      {
+         ClientMessage outgoing = sendSession.createClientMessage(JBossTextMessage.TYPE, false, 0,
+                                                                  System.currentTimeMillis(), (byte)1);
+         outgoing.putIntProperty(new SimpleString("count"), m);
+         outgoing.getBody().flip();
+         producer.send(outgoing);
+      }
+
+      for (ClientSession session : sessions)
+      {
+         session.start();
+      }
+
+      // Message m must be the m-th message seen by every consumer
+      for (int m = 0; m < numMessages; m++)
+      {
+         for (ClientConsumer consumer : consumers)
+         {
+            ClientMessage incoming = consumer.receive(RECEIVE_TIMEOUT);
+            assertNotNull(incoming);
+            assertEquals(m, incoming.getProperty(new SimpleString("count")));
+            incoming.processed();
+         }
+      }
+
+      // Nothing further may be available on any consumer
+      for (int m = 0; m < numMessages; m++)
+      {
+         for (ClientConsumer consumer : consumers)
+         {
+            assertNull(consumer.receiveImmediate());
+         }
+      }
+
+      sendSession.close();
+      for (ClientSession session : sessions)
+      {
+         session.close();
+      }
+
+      log.info("duration " + (System.currentTimeMillis() - begin));
+
+      // this.waitForFailThread();
+
+      stop();
+   }
+
+   /**
+    * Synchronous receive() variant with rollback and commit on both the sending and the consuming
+    * side; consuming sessions are started before their queue and consumer are created.
+    * <p>
+    * One hundred messages are sent and rolled back, then sent again and committed. Every one of the
+    * ten consumers must receive "count" 0..99 in order and then have nothing more available; the
+    * consuming sessions are rolled back, the same sequence must be received a second time, and the
+    * sessions are finally committed and closed.
+    */
+   public void testG() throws Exception
+   {
+      start();
+
+      final long begin = System.currentTimeMillis();
+
+      final String connectorFactory = "org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory";
+      ClientSessionFactory factory = new ClientSessionFactoryImpl(new TransportConfiguration(connectorFactory),
+                                                                  new TransportConfiguration(connectorFactory,
+                                                                                             backupParams));
+
+      final int numMessages = 100;
+      final int numSessions = 10;
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+      for (int s = 0; s < numSessions; s++)
+      {
+         SimpleString queueName = new SimpleString("sub" + s);
+         ClientSession consumerSession = factory.createSession(false, false, false, false);
+
+         // Started up front, before the queue and consumer exist
+         consumerSession.start();
+         consumerSession.createQueue(ADDRESS, queueName, null, false, false);
+         consumers.add(consumerSession.createConsumer(queueName));
+         sessions.add(consumerSession);
+      }
+
+      ClientSession sendSession = factory.createSession(false, false, false, false);
+      ClientProducer producer = sendSession.createProducer(ADDRESS);
+
+      // Batch 0 is rolled back, batch 1 is committed
+      for (int batch = 0; batch < 2; batch++)
+      {
+         for (int m = 0; m < numMessages; m++)
+         {
+            ClientMessage outgoing = sendSession.createClientMessage(JBossTextMessage.TYPE, false, 0,
+                                                                     System.currentTimeMillis(), (byte)1);
+            outgoing.putIntProperty(new SimpleString("count"), m);
+            outgoing.getBody().flip();
+            producer.send(outgoing);
+         }
+
+         if (batch == 0)
+         {
+            sendSession.rollback();
+         }
+         else
+         {
+            sendSession.commit();
+         }
+      }
+
+      // Pass 0 consumes everything and rolls back; pass 1 consumes everything again and commits
+      for (int pass = 0; pass < 2; pass++)
+      {
+         for (int m = 0; m < numMessages; m++)
+         {
+            for (ClientConsumer consumer : consumers)
+            {
+               ClientMessage incoming = consumer.receive(RECEIVE_TIMEOUT);
+               assertNotNull(incoming);
+               assertEquals(m, incoming.getProperty(new SimpleString("count")));
+               incoming.processed();
+            }
+         }
+
+         // Nothing further may be available on any consumer
+         for (int m = 0; m < numMessages; m++)
+         {
+            for (ClientConsumer consumer : consumers)
+            {
+               assertNull(consumer.receiveImmediate());
+            }
+         }
+
+         for (ClientSession session : sessions)
+         {
+            if (pass == 0)
+            {
+               session.rollback();
+            }
+            else
+            {
+               session.commit();
+            }
+         }
+      }
+
+      sendSession.close();
+      for (ClientSession session : sessions)
+      {
+         session.close();
+      }
+
+      log.info("duration " + (System.currentTimeMillis() - begin));
+
+      // this.waitForFailThread();
+
+      stop();
+   }
+
+
+   /**
+    * Synchronous receive() variant with rollback and commit on both the sending and the consuming
+    * side; consuming sessions are started only after the producer has committed.
+    * <p>
+    * One hundred messages are sent and rolled back, then sent again and committed. The ten consuming
+    * sessions are then started; every consumer must receive "count" 0..99 in order and then have
+    * nothing more available. The consuming sessions are rolled back, the same sequence must be
+    * received a second time, and the sessions are finally committed and closed.
+    */
+   public void testH() throws Exception
+   {
+      start();
+
+      final long begin = System.currentTimeMillis();
+
+      final String connectorFactory = "org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory";
+      ClientSessionFactory factory = new ClientSessionFactoryImpl(new TransportConfiguration(connectorFactory),
+                                                                  new TransportConfiguration(connectorFactory,
+                                                                                             backupParams));
+
+      final int numMessages = 100;
+      final int numSessions = 10;
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+      // Sessions are created here but deliberately left stopped until sending is finished
+      for (int s = 0; s < numSessions; s++)
+      {
+         SimpleString queueName = new SimpleString("sub" + s);
+         ClientSession consumerSession = factory.createSession(false, false, false, false);
+         consumerSession.createQueue(ADDRESS, queueName, null, false, false);
+         consumers.add(consumerSession.createConsumer(queueName));
+         sessions.add(consumerSession);
+      }
+
+      ClientSession sendSession = factory.createSession(false, false, false, false);
+      ClientProducer producer = sendSession.createProducer(ADDRESS);
+
+      // Batch 0 is rolled back, batch 1 is committed
+      for (int batch = 0; batch < 2; batch++)
+      {
+         for (int m = 0; m < numMessages; m++)
+         {
+            ClientMessage outgoing = sendSession.createClientMessage(JBossTextMessage.TYPE, false, 0,
+                                                                     System.currentTimeMillis(), (byte)1);
+            outgoing.putIntProperty(new SimpleString("count"), m);
+            outgoing.getBody().flip();
+            producer.send(outgoing);
+         }
+
+         if (batch == 0)
+         {
+            sendSession.rollback();
+         }
+         else
+         {
+            sendSession.commit();
+         }
+      }
+
+      for (ClientSession session : sessions)
+      {
+         session.start();
+      }
+
+      // Pass 0 consumes everything and rolls back; pass 1 consumes everything again and commits
+      for (int pass = 0; pass < 2; pass++)
+      {
+         for (int m = 0; m < numMessages; m++)
+         {
+            for (ClientConsumer consumer : consumers)
+            {
+               ClientMessage incoming = consumer.receive(RECEIVE_TIMEOUT);
+               assertNotNull(incoming);
+               assertEquals(m, incoming.getProperty(new SimpleString("count")));
+               incoming.processed();
+            }
+         }
+
+         // Nothing further may be available on any consumer
+         for (int m = 0; m < numMessages; m++)
+         {
+            for (ClientConsumer consumer : consumers)
+            {
+               assertNull(consumer.receiveImmediate());
+            }
+         }
+
+         for (ClientSession session : sessions)
+         {
+            if (pass == 0)
+            {
+               session.rollback();
+            }
+            else
+            {
+               session.commit();
+            }
+         }
+      }
+
+      sendSession.close();
+      for (ClientSession session : sessions)
+      {
+         session.close();
+      }
+
+      log.info("duration " + (System.currentTimeMillis() - begin));
+
+      // this.waitForFailThread();
+
+      stop();
+   }
+
+   /**
+    * Repeatedly (1000 times) creates a queue, sends and receives a single message on a second
+    * session, then deletes the queue and closes both sessions, so that no session outlives an
+    * iteration.
+    */
+   public void testI() throws Exception
+   {
+      start();
+
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                                             new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                                        backupParams));
+
+      final int numIts = 1000;
+
+      for (int i = 0; i < numIts; i++)
+      {
+         //log.info("iteration " + i);
+
+         // A dedicated session creates the queue and later deletes it
+         ClientSession sessCreate = sf.createSession(false, true, true, false);
+
+         sessCreate.createQueue(ADDRESS, ADDRESS, null, false, false);
+
+         // A second session both produces and consumes the single message
+         ClientSession sess = sf.createSession(false, true, true, false);
+
+         sess.start();
+
+         ClientConsumer consumer = sess.createConsumer(ADDRESS);
+
+         ClientProducer producer = sess.createProducer(ADDRESS);
+
+         ClientMessage message = sess.createClientMessage(JBossTextMessage.TYPE,
+                                                          false,
+                                                          0,
+                                                          System.currentTimeMillis(),
+                                                          (byte)1);
+         message.getBody().flip();
+
+         producer.send(message);
+
+         ClientMessage message2 = consumer.receive(RECEIVE_TIMEOUT);
+
+         assertNotNull(message2);
+
+         message2.processed();
+
+         // The consuming session is closed before the queue is deleted
+         sess.close();
+
+         sessCreate.deleteQueue(ADDRESS);
+
+         sessCreate.close();
+      }
+
+      // this.waitForFailThread();
+
+      stop();
+   }
+
+   /**
+    * Same create-queue / send / receive / delete-queue cycle as testI (1000 iterations), except that
+    * one extra session is opened before the loop and only closed after it, so a session is always
+    * open while the per-iteration sessions come and go.
+    */
+   public void testJ() throws Exception
+   {
+      start();
+
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                                             new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                                        backupParams));
+
+      // Held open for the whole loop; never used for messaging
+      ClientSession sessHold = sf.createSession(false, true, true, false);
+
+      final int numIts = 1000;
+
+      for (int i = 0; i < numIts; i++)
+      {
+         //log.info("iteration " + i);
+
+         // A dedicated session creates the queue and later deletes it
+         ClientSession sessCreate = sf.createSession(false, true, true, false);
+
+         sessCreate.createQueue(ADDRESS, ADDRESS, null, false, false);
+
+         // A second session both produces and consumes the single message
+         ClientSession sess = sf.createSession(false, true, true, false);
+
+         sess.start();
+
+         ClientConsumer consumer = sess.createConsumer(ADDRESS);
+
+         ClientProducer producer = sess.createProducer(ADDRESS);
+
+         ClientMessage message = sess.createClientMessage(JBossTextMessage.TYPE,
+                                                          false,
+                                                          0,
+                                                          System.currentTimeMillis(),
+                                                          (byte)1);
+         message.getBody().flip();
+
+         producer.send(message);
+
+         ClientMessage message2 = consumer.receive(RECEIVE_TIMEOUT);
+
+         assertNotNull(message2);
+
+         message2.processed();
+
+         // The consuming session is closed before the queue is deleted
+         sess.close();
+
+         sessCreate.deleteQueue(ADDRESS);
+
+         sessCreate.close();
+      }
+
+      sessHold.close();
+
+      // this.waitForFailThread();
+
+      stop();
+   }
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ private void start() throws Exception
+ {
+ Configuration backupConf = new ConfigurationImpl();
+ backupConf.setSecurityEnabled(false);
+ backupConf.setPacketConfirmationBatchSize(10);
+ backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+ backupConf.getAcceptorConfigurations()
+ .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+ backupParams));
+ backupConf.setBackup(true);
+ backupService = MessagingServiceImpl.newNullStorageMessagingServer(backupConf);
+ backupService.start();
+
+ Configuration liveConf = new ConfigurationImpl();
+ liveConf.setSecurityEnabled(false);
+ liveConf.setPacketConfirmationBatchSize(10);
+ liveConf.getAcceptorConfigurations()
+ .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
+ liveConf.setBackupConnectorConfiguration(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ backupParams));
+ liveService = MessagingServiceImpl.newNullStorageMessagingServer(liveConf);
+ liveService.start();
+ }
+
+   /**
+    * Verifies that the test left nothing behind and shuts both servers down.
+    * <p>
+    * Dumps the client connection registry, asserts that it is empty, asserts that each server has no
+    * remaining connections immediately before stopping it (backup first, then live), and finally
+    * asserts that the in-VM registry is empty.
+    */
+   private void stop() throws Exception
+   {
+      ConnectionRegistryImpl.instance.dump();
+      assertEquals(0, ConnectionRegistryImpl.instance.size());
+
+      // Backup server: check for leaked connections, then stop
+      int backupConnections = backupService.getServer().getRemotingService().getConnections().size();
+      assertEquals(0, backupConnections);
+      backupService.stop();
+
+      // Live server: same check, made only after the backup has been stopped
+      int liveConnections = liveService.getServer().getRemotingService().getConnections().size();
+      assertEquals(0, liveConnections);
+      liveService.stop();
+
+      assertEquals(0, InVMRegistry.instance.size());
+   }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleManualFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleManualFailoverTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleManualFailoverTest.java 2008-10-03 12:24:02 UTC (rev 5062)
@@ -0,0 +1,212 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.FailureListener;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.ConnectionRegistryImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ *
+ * A SimpleManualFailoverTest
+ *
+ * Starts two independent in-VM servers (server 0 with a default acceptor, server 1 with
+ * SERVER_ID_PROP_NAME = 1), sends and consumes on server 0, simulates a connection failure and then
+ * manually creates a new session factory against server 1 and repeats the send/consume cycle.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class SimpleManualFailoverTest extends TestCase
+{
+   private static final Logger log = Logger.getLogger(SimpleManualFailoverTest.class);
+
+   // Constants -----------------------------------------------------
+
+   /** Upper bound, in milliseconds, on the wait for each expected message. */
+   private static final long RECEIVE_TIMEOUT = 5000;
+
+   // Attributes ----------------------------------------------------
+
+   private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+
+   private MessagingService server0Service;
+
+   private MessagingService server1Service;
+
+   private Map<String, Object> server1Params = new HashMap<String, Object>();
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Creates a session and a queue on ADDRESS, sends 1000 messages with body "aardvarks" and an
+    * increasing "count" property, consumes them all in order and checks that nothing else arrives.
+    *
+    * @param sf factory used to create the session
+    * @return the still-open session; the caller is responsible for closing it
+    */
+   private ClientSession sendAndConsume(final ClientSessionFactory sf) throws Exception
+   {
+      ClientSession session = sf.createSession(false, true, true, false);
+
+      session.createQueue(ADDRESS, ADDRESS, null, false, false);
+
+      ClientProducer producer = session.createProducer(ADDRESS);
+
+      final int numMessages = 1000;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
+                                                             false,
+                                                             0,
+                                                             System.currentTimeMillis(),
+                                                             (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         message.getBody().putString("aardvarks");
+         message.getBody().flip();
+         producer.send(message);
+      }
+
+      ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+      session.start();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         // Bounded wait: an unbounded receive() would hang the whole test run if a message were lost
+         ClientMessage message2 = consumer.receive(RECEIVE_TIMEOUT);
+
+         assertNotNull("message " + i + " not received within " + RECEIVE_TIMEOUT + " ms", message2);
+
+         assertEquals("aardvarks", message2.getBody().getString());
+
+         assertEquals(i, message2.getProperty(new SimpleString("count")));
+
+         message2.processed();
+      }
+
+      // Short wait to confirm that no unexpected extra message is delivered
+      ClientMessage message3 = consumer.receive(250);
+
+      assertNull(message3);
+
+      return session;
+   }
+
+   /**
+    * Sends and consumes on server 0, fails the underlying connection and waits for the failure
+    * listener to fire, closes the session, then sends and consumes again through a new factory that
+    * points at server 1.
+    */
+   public void testFailover() throws Exception
+   {
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+      ClientSession session = sendAndConsume(sf);
+
+      final CountDownLatch latch = new CountDownLatch(1);
+
+      class MyListener implements FailureListener
+      {
+         public void connectionFailed(MessagingException me)
+         {
+            latch.countDown();
+         }
+      }
+
+      RemotingConnection conn = ((ClientSessionImpl)session).getConnection();
+
+      conn.addFailureListener(new MyListener());
+
+      // Simulate failure on connection
+      conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+
+      // Wait to be informed of failure
+
+      boolean ok = latch.await(1000, TimeUnit.MILLISECONDS);
+
+      assertTrue(ok);
+
+      log.info("closing session");
+      session.close();
+      log.info("closed session");
+
+      // Manual failover: the client explicitly builds a new factory for server 1
+      sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                   server1Params));
+
+      session = sendAndConsume(sf);
+
+      session.close();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   /**
+    * Starts server 1 (in-VM acceptor with SERVER_ID_PROP_NAME = 1) and then server 0 (default in-VM
+    * acceptor), both with null storage, security disabled and a packet confirmation batch size of 1.
+    */
+   protected void setUp() throws Exception
+   {
+      Configuration server1Conf = new ConfigurationImpl();
+      server1Conf.setSecurityEnabled(false);
+      server1Conf.setPacketConfirmationBatchSize(1);
+      server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+      server1Conf.getAcceptorConfigurations()
+                 .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+                                                 server1Params));
+      server1Service = MessagingServiceImpl.newNullStorageMessagingServer(server1Conf);
+      server1Service.start();
+
+      Configuration server0Conf = new ConfigurationImpl();
+      server0Conf.setSecurityEnabled(false);
+      server0Conf.setPacketConfirmationBatchSize(1);
+      server0Conf.getAcceptorConfigurations()
+                 .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
+      server0Service = MessagingServiceImpl.newNullStorageMessagingServer(server0Conf);
+      server0Service.start();
+   }
+
+   /**
+    * Checks that no client or server connections were leaked and stops both servers.
+    * <p>
+    * The checks and stops run in the same order as before on the success path, but each server is
+    * now stopped in a finally block: a failed leak assertion no longer leaves in-VM servers running
+    * where they could break every test that follows.
+    */
+   protected void tearDown() throws Exception
+   {
+      try
+      {
+         try
+         {
+            assertEquals(0, ConnectionRegistryImpl.instance.size());
+         }
+         finally
+         {
+            checkNoConnectionsAndStop(server1Service);
+         }
+      }
+      finally
+      {
+         checkNoConnectionsAndStop(server0Service);
+      }
+
+      assertEquals(0, InVMRegistry.instance.size());
+   }
+
+   // Private -------------------------------------------------------
+
+   /** Asserts that the given server has no remaining connections, then stops it even if that assertion failed. */
+   private void checkNoConnectionsAndStop(final MessagingService service) throws Exception
+   {
+      try
+      {
+         assertEquals(0, service.getServer().getRemotingService().getConnections().size());
+      }
+      finally
+      {
+         service.stop();
+      }
+   }
+
+   // Inner classes -------------------------------------------------
+}
Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SystematicFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SystematicFailoverTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SystematicFailoverTest.java 2008-10-03 12:24:02 UTC (rev 5062)
@@ -0,0 +1,574 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
+ * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
+ * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
+ * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
+ * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.ConnectionRegistryImpl;
+import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * Repeatedly exercises the core client API against a live server that is configured with a backup
+ * server, both using the in-VM transport.
+ * <p>
+ * Every iteration starts a fresh backup/live pair, runs a scenario through a session factory that
+ * knows both connectors, and then checks that no connections are left registered before stopping
+ * the servers. {@link #testExerciseAPI()} additionally fails the client connection from a
+ * background thread after a random delay of up to 300ms while the scenario is running.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */
+public class SystematicFailoverTest extends TestCase
+{
+   // Constants -----------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(SystematicFailoverTest.class);
+
+   private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+
+   /** Name of the int property carrying each message's sequence number. */
+   private static final SimpleString COUNT_PROP = new SimpleString("count");
+
+   /** Body written into, and expected back from, every test message. */
+   private static final String BODY = "aardvarks";
+
+   private static final String CONNECTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory";
+
+   private static final String ACCEPTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory";
+
+   /** How long to wait for an asynchronous handler to see all of its messages. */
+   private static final long HANDLER_TIMEOUT_MILLIS = 1000;
+
+   // Attributes ----------------------------------------------------
+
+   private MessagingService liveService;
+
+   private MessagingService backupService;
+
+   private final Map<String, Object> backupParams = new HashMap<String, Object>();
+
+   /** Thread started by {@link #setFailAfterRandomTime(RemotingConnection)}, or null if none was started. */
+   private volatile Thread failThread;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Runs 1000 iterations of a scenario covering non-transacted send/receive, transacted
+    * send/receive with rollback and commit, destination add/remove, and synchronous as well as
+    * handler-based consumption on ten queues, while the connection of the first session is failed
+    * at a random point.
+    *
+    * @throws Exception if any client or server operation fails
+    */
+   public void testExerciseAPI() throws Exception
+   {
+      final int numMessages = 1000;
+
+      final int numConsumers = 10;
+
+      for (int its = 0; its < 1000; its++)
+      {
+         start();
+
+         long startTime = System.currentTimeMillis();
+
+         ClientSessionFactory sf = createSessionFactory();
+
+         // Non transacted
+         ClientSession session1 = sf.createSession(false, true, true, false);
+
+         RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionImpl)session1).getConnection();
+
+         setFailAfterRandomTime(conn);
+
+         session1.createQueue(ADDRESS, ADDRESS, null, false, false);
+
+         ClientProducer producer1 = session1.createProducer(ADDRESS);
+
+         sendMessages(session1, producer1, numMessages);
+
+         ClientSession session2 = sf.createSession(false, true, true, false);
+
+         session2.start();
+
+         ClientConsumer consumer1 = session2.createConsumer(ADDRESS);
+
+         receiveMessages(consumer1, numMessages);
+
+         producer1.close();
+
+         // Second producer on session1; its messages are created through session2
+         ClientProducer producer2 = session1.createProducer(ADDRESS);
+
+         sendMessages(session2, producer2, numMessages);
+
+         receiveMessages(consumer1, numMessages);
+
+         consumer1.close();
+
+         // Transacted
+         ClientSession session3 = sf.createSession(false, false, false, false);
+
+         ClientProducer producer3 = session3.createProducer(ADDRESS);
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            // The first send is rolled back, so only the second copy may reach the queue
+            producer3.send(createMessage(session3, i));
+
+            session3.rollback();
+
+            producer3.send(createMessage(session3, i));
+
+            session3.commit();
+         }
+
+         ClientConsumer consumer2 = session3.createConsumer(ADDRESS);
+
+         session3.start();
+
+         receiveMessages(consumer2, numMessages);
+
+         // Rolling back the acknowledgements must make the same messages available again
+         session3.rollback();
+
+         receiveMessages(consumer2, numMessages);
+
+         session3.commit();
+
+         session1.close();
+
+         session2.close();
+
+         Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+
+         final ClientSession session4 = sf.createSession(false, true, true, false);
+
+         session4.start();
+
+         for (int i = 0; i < numConsumers; i++)
+         {
+            SimpleString wibble = new SimpleString("wibble");
+
+            session4.addDestination(wibble, false, false);
+
+            session4.removeDestination(wibble, false);
+
+            SimpleString subName = new SimpleString("sub" + i);
+
+            session4.createQueue(ADDRESS, subName, null, false, false);
+
+            consumers.add(session4.createConsumer(subName));
+         }
+
+         ClientProducer producer4 = session4.createProducer(ADDRESS);
+
+         sendMessages(session4, producer4, numMessages);
+
+         // Consume synchronously
+         for (ClientConsumer consumer : consumers)
+         {
+            receiveMessages(consumer, numMessages);
+         }
+
+         // Consume asynchronously: handlers are installed first, then a second batch is sent
+         Set<CountingHandler> handlers = attachHandlers(consumers, numMessages, true);
+
+         sendMessages(session4, producer4, numMessages);
+
+         awaitHandlers(handlers);
+
+         session3.close();
+
+         session4.close();
+
+         long end = System.currentTimeMillis();
+
+         log.info("iteration" + its + " duration " + (end - startTime));
+
+         waitForFailThread();
+
+         stop();
+      }
+   }
+
+   /**
+    * Runs 100 iterations in which 100 messages are sent to an address with ten queues bound to it
+    * and then consumed through message handlers that do not acknowledge. No connection failure is
+    * injected in this test.
+    *
+    * @throws Exception if any client or server operation fails
+    */
+   public void testReplicateWithHandlers() throws Exception
+   {
+      final int numMessages = 100;
+
+      final int numConsumers = 10;
+
+      for (int its = 0; its < 100; its++)
+      {
+         log.info("Starting iteration " + its);
+
+         start();
+
+         long startTime = System.currentTimeMillis();
+
+         ClientSessionFactory sf = createSessionFactory();
+
+         Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+
+         ClientSession sessConsume = sf.createSession(false, true, true, false);
+
+         sessConsume.start();
+
+         for (int i = 0; i < numConsumers; i++)
+         {
+            SimpleString subName = new SimpleString("sub" + i);
+
+            sessConsume.createQueue(ADDRESS, subName, null, false, false);
+
+            consumers.add(sessConsume.createConsumer(subName));
+         }
+
+         ClientSession sessSend = sf.createSession(false, true, true, false);
+
+         ClientProducer producer = sessSend.createProducer(ADDRESS);
+
+         // Messages are sent before any handler is installed
+         sendMessages(sessSend, producer, numMessages);
+
+         Set<CountingHandler> handlers = attachHandlers(consumers, numMessages, false);
+
+         awaitHandlers(handlers);
+
+         sessSend.close();
+
+         sessConsume.close();
+
+         long end = System.currentTimeMillis();
+
+         log.info("duration " + (end - startTime));
+
+         stop();
+      }
+   }
+
+   // TODO: a "hang on create session" test used to be sketched here as commented-out code. It
+   // created 50 sessions per iteration and depended on a commented-out
+   // RemotingConnectionImpl.failOnPacketType(byte) call; restore it from revision 5062 if and when
+   // that hook exists.
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   /**
+    * Starts a background thread that sleeps for a random time below 300ms and then fails the
+    * given connection with a NOT_CONNECTED exception. The thread is remembered in
+    * {@link #failThread} so that {@link #waitForFailThread()} can join it.
+    *
+    * @param conn the connection to fail
+    */
+   private void setFailAfterRandomTime(final RemotingConnection conn)
+   {
+      failThread = new Thread()
+      {
+         public void run()
+         {
+            try
+            {
+               Thread.sleep((long)(300 * Math.random()));
+
+               log.info("Failing");
+
+               conn.fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
+            }
+            catch (InterruptedException e)
+            {
+               // Interrupted before the failure was injected: give up, but keep the flag set
+               Thread.currentThread().interrupt();
+            }
+         }
+      };
+
+      failThread.start();
+   }
+
+   /**
+    * Blocks until the thread started by {@link #setFailAfterRandomTime(RemotingConnection)} has
+    * finished; returns immediately if no such thread was ever started.
+    *
+    * @throws Exception if the calling thread is interrupted while waiting
+    */
+   private void waitForFailThread() throws Exception
+   {
+      Thread thread = failThread;
+
+      if (thread != null)
+      {
+         thread.join();
+      }
+   }
+
+   /**
+    * Creates a session factory whose first connector has no parameters and whose second connector
+    * uses {@link #backupParams}, matching the live and backup acceptors set up in {@link #start()}.
+    */
+   private ClientSessionFactory createSessionFactory() throws Exception
+   {
+      return new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY),
+                                          new TransportConfiguration(CONNECTOR_FACTORY, backupParams));
+   }
+
+   /**
+    * Builds one text message with the standard body and the given sequence number in
+    * {@link #COUNT_PROP}.
+    *
+    * @param session the session used to create the message
+    * @param count sequence number to store on the message
+    */
+   private static ClientMessage createMessage(final ClientSession session, final int count) throws Exception
+   {
+      ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
+                                                          false,
+                                                          0,
+                                                          System.currentTimeMillis(),
+                                                          (byte)1);
+      message.putIntProperty(COUNT_PROP, count);
+      message.getBody().putString(BODY);
+      message.getBody().flip();
+      return message;
+   }
+
+   /**
+    * Sends {@code numMessages} messages numbered from 0 through the given producer.
+    *
+    * @param messageFactory the session used to create the messages (not necessarily the producer's)
+    * @param producer the producer to send with
+    * @param numMessages how many messages to send
+    */
+   private static void sendMessages(final ClientSession messageFactory,
+                                    final ClientProducer producer,
+                                    final int numMessages) throws Exception
+   {
+      for (int i = 0; i < numMessages; i++)
+      {
+         producer.send(createMessage(messageFactory, i));
+      }
+   }
+
+   /**
+    * Synchronously receives {@code numMessages} messages, checks body and sequence number of each,
+    * and marks each as processed.
+    *
+    * @param consumer the consumer to receive from
+    * @param numMessages how many messages are expected, numbered from 0
+    */
+   private static void receiveMessages(final ClientConsumer consumer, final int numMessages) throws Exception
+   {
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = consumer.receive();
+
+         // Fail with a readable message instead of a NullPointerException on the next line
+         assertNotNull("no message received at position " + i, message);
+
+         assertEquals(BODY, message.getBody().getString());
+         assertEquals(i, message.getProperty(COUNT_PROP));
+
+         message.processed();
+      }
+   }
+
+   /**
+    * Installs a new {@link CountingHandler} on every consumer.
+    *
+    * @param consumers the consumers to switch to handler-based delivery
+    * @param expectedMessages number of messages each handler should wait for
+    * @param acknowledge whether the handlers call {@code processed()} on each message
+    * @return the installed handlers
+    */
+   private static Set<CountingHandler> attachHandlers(final Set<ClientConsumer> consumers,
+                                                      final int expectedMessages,
+                                                      final boolean acknowledge) throws Exception
+   {
+      Set<CountingHandler> handlers = new HashSet<CountingHandler>();
+
+      for (ClientConsumer consumer : consumers)
+      {
+         CountingHandler handler = new CountingHandler(expectedMessages, acknowledge);
+
+         consumer.setMessageHandler(handler);
+
+         handlers.add(handler);
+      }
+
+      return handlers;
+   }
+
+   /**
+    * Waits up to {@link #HANDLER_TIMEOUT_MILLIS} per handler for it to have seen all of its
+    * messages, and fails the test if one has not.
+    */
+   private static void awaitHandlers(final Set<CountingHandler> handlers) throws Exception
+   {
+      for (CountingHandler handler : handlers)
+      {
+         boolean ok = handler.latch.await(HANDLER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+         assertTrue("handler saw only " + handler.count + " messages before the timeout", ok);
+      }
+   }
+
+   /** Creates a configuration with the settings shared by the live and the backup server. */
+   private static Configuration createBaseConfiguration()
+   {
+      Configuration conf = new ConfigurationImpl();
+      conf.setSecurityEnabled(false);
+      conf.setPacketConfirmationBatchSize(10);
+      return conf;
+   }
+
+   /**
+    * Starts the backup server and then the live server. The backup's acceptor uses
+    * {@link #backupParams} (with {@code SERVER_ID_PROP_NAME} set to 1) and the live server is given
+    * a backup connector with the same parameters.
+    *
+    * @throws Exception if either server fails to start
+    */
+   private void start() throws Exception
+   {
+      Configuration backupConf = createBaseConfiguration();
+      backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+      backupConf.getAcceptorConfigurations().add(new TransportConfiguration(ACCEPTOR_FACTORY, backupParams));
+      backupConf.setBackup(true);
+      backupService = MessagingServiceImpl.newNullStorageMessagingServer(backupConf);
+      // NOTE(review): the backup is started first, presumably so the live server can reach it - confirm
+      backupService.start();
+
+      Configuration liveConf = createBaseConfiguration();
+      liveConf.getAcceptorConfigurations().add(new TransportConfiguration(ACCEPTOR_FACTORY));
+      liveConf.setBackupConnectorConfiguration(new TransportConfiguration(CONNECTOR_FACTORY, backupParams));
+      liveService = MessagingServiceImpl.newNullStorageMessagingServer(liveConf);
+      liveService.start();
+   }
+
+   /**
+    * Checks that the iteration left no connections behind and stops both servers.
+    * <p>
+    * The checks run in the original order (client connection registry, backup connections, stop
+    * backup, live connections, stop live, in-VM registry). Each server is stopped in a
+    * {@code finally} block so that a failing check can no longer leave servers running for the
+    * next iteration or test.
+    *
+    * @throws Exception if stopping a server fails
+    */
+   private void stop() throws Exception
+   {
+      try
+      {
+         try
+         {
+            ConnectionRegistryImpl.instance.dump();
+
+            assertEquals(0, ConnectionRegistryImpl.instance.size());
+
+            assertEquals(0, backupService.getServer().getRemotingService().getConnections().size());
+         }
+         finally
+         {
+            backupService.stop();
+         }
+
+         assertEquals(0, liveService.getServer().getRemotingService().getConnections().size());
+      }
+      finally
+      {
+         liveService.stop();
+      }
+
+      // Only meaningful once both servers have been stopped
+      assertEquals(0, InVMRegistry.instance.size());
+   }
+
+   // Inner classes -------------------------------------------------
+
+   /**
+    * Message handler that expects messages numbered 0, 1, 2, ... in {@link #COUNT_PROP} and
+    * releases its latch once the expected number has been seen.
+    * <p>
+    * The counter is only incremented from {@code onMessage}; it is volatile so that the test
+    * thread can read it for the failure message.
+    * NOTE(review): assumes each consumer delivers to its handler sequentially - confirm.
+    */
+   private static final class CountingHandler implements MessageHandler
+   {
+      final CountDownLatch latch = new CountDownLatch(1);
+
+      private final int expectedMessages;
+
+      private final boolean acknowledge;
+
+      private volatile int count;
+
+      CountingHandler(final int expectedMessages, final boolean acknowledge)
+      {
+         this.expectedMessages = expectedMessages;
+
+         this.acknowledge = acknowledge;
+      }
+
+      public void onMessage(final ClientMessage message)
+      {
+         assertEquals(count, message.getProperty(COUNT_PROP));
+
+         if (acknowledge)
+         {
+            try
+            {
+               message.processed();
+            }
+            catch (MessagingException me)
+            {
+               fail("processed() failed for message " + count + ": " + me);
+            }
+         }
+
+         count++;
+
+         if (count == expectedMessages)
+         {
+            latch.countDown();
+         }
+      }
+   }
+}
More information about the jboss-cvs-commits
mailing list