[jboss-cvs] JBoss Messaging SVN: r5291 - in trunk: src/main/org/jboss/messaging/core/remoting and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Nov 6 12:10:18 EST 2008
Author: timfox
Date: 2008-11-06 12:10:18 -0500 (Thu, 06 Nov 2008)
New Revision: 5291
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailBackupServerTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
trunk/src/main/org/jboss/messaging/core/remoting/Channel.java
trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketsConfirmedMessage.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
Log:
Cope with backup server dying
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2008-11-06 14:57:06 UTC (rev 5290)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2008-11-06 17:10:18 UTC (rev 5291)
@@ -609,7 +609,7 @@
sessions.add(session);
- ChannelHandler handler = new ClientSessionPacketHandler(session);
+ ChannelHandler handler = new ClientSessionPacketHandler(session, sessionChannel);
sessionChannel.setHandler(handler);
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java 2008-11-06 14:57:06 UTC (rev 5290)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java 2008-11-06 17:10:18 UTC (rev 5291)
@@ -26,6 +26,7 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_MSG;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.ChannelHandler;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
@@ -43,10 +44,14 @@
private static final Logger log = Logger.getLogger(ClientSessionPacketHandler.class);
private final ClientSessionInternal clientSession;
+
+ private final Channel channel;
- public ClientSessionPacketHandler(final ClientSessionInternal clientSesssion)
+ public ClientSessionPacketHandler(final ClientSessionInternal clientSesssion, final Channel channel)
{
this.clientSession = clientSesssion;
+
+ this.channel = channel;
}
public void handlePacket(final Packet packet)
@@ -85,5 +90,7 @@
{
log.error("Failed to handle packet", e);
}
+
+ channel.confirm(packet);
}
}
\ No newline at end of file
Modified: trunk/src/main/org/jboss/messaging/core/remoting/Channel.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/Channel.java 2008-11-06 14:57:06 UTC (rev 5290)
+++ trunk/src/main/org/jboss/messaging/core/remoting/Channel.java 2008-11-06 17:10:18 UTC (rev 5291)
@@ -56,23 +56,7 @@
RemotingConnection getConnection();
-// //debug only
-// Queue<Command> getSentCommands();
-//
-// Queue<Command> getReceivedCommands();
-//
-// // For debug only
-// static class Command
-// {
-// public final int commandID;
-//
-// public final Packet packet;
-//
-// public Command(final int commandID, final Packet packet)
-// {
-// this.commandID = commandID;
-//
-// this.packet = packet;
-// }
-// }
+ void replicatingChannelDead();
+
+ void confirm(Packet packet);
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java 2008-11-06 14:57:06 UTC (rev 5290)
+++ trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java 2008-11-06 17:10:18 UTC (rev 5291)
@@ -51,4 +51,6 @@
void activate();
void freeze();
+
+ RemotingConnection getReplicatingConnection();
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-11-06 14:57:06 UTC (rev 5290)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-11-06 17:10:18 UTC (rev 5291)
@@ -266,6 +266,11 @@
this.replicatingConnection = replicatingConnection;
+ if (replicatingConnection != null)
+ {
+ replicatingConnection.addFailureListener(new ReplicatingConnectionFailureListener());
+ }
+
this.active = active;
this.pingPeriod = pingPeriod;
@@ -345,10 +350,14 @@
return transportConnection.createBuffer(size);
}
+ public RemotingConnection getReplicatingConnection()
+ {
+ return replicatingConnection;
+ }
+
/*
* This can be called concurrently by more than one thread so needs to be locked
*/
-
public void fail(final MessagingException me)
{
synchronized (failLock)
@@ -853,31 +862,18 @@
this.id = id;
- if (connection.replicatingConnection != null)
+ if (connection.replicatingConnection != null && id != 0)
{
- // Don't want to send confirmations if replicating to backup
- this.windowSize = -1;
-
- this.confWindowSize = -1;
-
// We don't redirect the ping channel
- if (id != 0)
- {
- replicatingChannel = connection.replicatingConnection.getChannel(id, -1, false);
+ replicatingChannel = connection.replicatingConnection.getChannel(id, -1, false);
- replicatingChannel.setHandler(new ReplicatedPacketsConfirmedChannelHandler());
- }
+ replicatingChannel.setHandler(new ReplicatedPacketsConfirmedChannelHandler());
}
- else
- {
- this.windowSize = windowSize;
+ this.windowSize = windowSize;
- this.confWindowSize = (int)(0.75 * windowSize);
+ this.confWindowSize = (int)(0.75 * windowSize);
- replicatingChannel = null;
- }
-
if (this.windowSize != -1)
{
resendCache = new ConcurrentLinkedQueue<Packet>();
@@ -936,13 +932,13 @@
synchronized (sendLock)
{
packet.setChannelID(id);
-
+
final MessagingBuffer buffer = connection.transportConnection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
-
+
int size = packet.encode(buffer);
-
+
// Must block on semaphore outside the main lock or this can prevent failover from occurring
- if (sendSemaphore != null)
+ if (sendSemaphore != null && packet.getType() != PACKETS_CONFIRMED)
{
try
{
@@ -953,9 +949,9 @@
throw new IllegalStateException("Semaphore interrupted");
}
}
-
+
lock.lock();
-
+
try
{
while (failingOver)
@@ -969,12 +965,12 @@
{
}
}
-
+
if (resendCache != null && packet.isRequiresConfirmations())
{
resendCache.add(packet);
}
-
+
if (connection.active || packet.isWriteAlways())
{
connection.transportConnection.write(buffer);
@@ -1088,21 +1084,19 @@
}
}
- public DelayedResult replicatePacket(final Packet packet)
+ // Must be synchronized since it can be called both by incoming session commands and by deliveries
+ // Also needs to be synchronized with respect to replicatingChannelDead
+ public synchronized DelayedResult replicatePacket(final Packet packet)
{
if (replicatingChannel != null)
{
- // Must be synchronized since can be called by incoming session commands but also by deliveries
- synchronized (this)
- {
- DelayedResult result = new DelayedResult();
+ DelayedResult result = new DelayedResult();
- responseActions.add(result);
+ responseActions.add(result);
- replicatingChannel.send(packet);
+ replicatingChannel.send(packet);
- return result;
- }
+ return result;
}
else
{
@@ -1110,6 +1104,28 @@
}
}
+ // The replicating connection has died (backup has died)
+ public synchronized void replicatingChannelDead()
+ {
+ replicatingChannel = null;
+
+ // Execute all the response actions now
+
+ while (true)
+ {
+ DelayedResult result = responseActions.poll();
+
+ if (result != null)
+ {
+ result.replicated();
+ }
+ else
+ {
+ break;
+ }
+ }
+ }
+
public void replicateComplete()
{
if (!connection.active)
@@ -1125,16 +1141,31 @@
}
// This will never get called concurrently by more than one thread
+
+ // TODO it's not ideal synchronizing this since it forms a contention point with replication
+ // but we need to do this to protect it w.r.t. the check on replicatingChannel
public void replicateResponseReceived()
{
- DelayedResult result = responseActions.poll();
+ DelayedResult result = null;
- if (result == null)
+ synchronized (this)
{
- throw new IllegalStateException("Cannot find response action");
+ if (replicatingChannel != null)
+ {
+ result = responseActions.poll();
+
+ if (result == null)
+ {
+ throw new IllegalStateException("Cannot find response action");
+ }
+ }
}
- result.replicated();
+ //Must execute outside of lock
+ if (result != null)
+ {
+ result.replicated();
+ }
}
public void setHandler(final ChannelHandler handler)
@@ -1278,7 +1309,7 @@
{
response = packet;
- checkConfirmation(packet);
+ confirm(packet);
lock.lock();
@@ -1293,14 +1324,8 @@
}
else if (handler != null)
{
- checkConfirmation(packet);
-
handler.handlePacket(packet);
}
- else
- {
- checkConfirmation(packet);
- }
}
}
@@ -1313,7 +1338,7 @@
connection.transportConnection.write(buffer);
}
- private void checkConfirmation(final Packet packet)
+ public void confirm(final Packet packet)
{
if (resendCache != null && packet.isRequiresConfirmations())
{
@@ -1351,7 +1376,6 @@
if (packet == null)
{
- // report();
throw new IllegalStateException(System.identityHashCode(this) + " Can't find packet to clear: " +
" last received command id " +
lastReceivedCommandID +
@@ -1367,7 +1391,10 @@
connection.createdActive);
}
- sizeToFree += packet.getPacketSize();
+ if (packet.getType() != PACKETS_CONFIRMED)
+ {
+ sizeToFree += packet.getPacketSize();
+ }
}
firstStoredCommandID += numberToClear;
@@ -1461,4 +1488,18 @@
}
}
}
+
+ private class ReplicatingConnectionFailureListener implements FailureListener
+ {
+ public void connectionFailed(final MessagingException me)
+ {
+ synchronized (RemotingConnectionImpl.this)
+ {
+ for (Channel channel : channels.values())
+ {
+ channel.replicatingChannelDead();
+ }
+ }
+ }
+ }
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketsConfirmedMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketsConfirmedMessage.java 2008-11-06 14:57:06 UTC (rev 5290)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketsConfirmedMessage.java 2008-11-06 17:10:18 UTC (rev 5291)
@@ -95,11 +95,6 @@
return false;
}
- public boolean isWriteAlways()
- {
- return true;
- }
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-11-06 14:57:06 UTC (rev 5290)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-11-06 17:10:18 UTC (rev 5291)
@@ -224,6 +224,8 @@
response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
}
}
+
+ channel.confirm(packet);
channel.send(response);
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-11-06 14:57:06 UTC (rev 5290)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-11-06 17:10:18 UTC (rev 5291)
@@ -453,6 +453,8 @@
response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
}
}
+
+ channel.confirm(packet);
channel.send(response);
}
@@ -558,7 +560,9 @@
}
}
- channel.send(response);
+ channel.confirm(packet);
+
+ channel.send(response);
}
@@ -677,6 +681,8 @@
}
}
+ channel.confirm(packet);
+
channel.send(response);
}
@@ -749,6 +755,8 @@
}
}
+ channel.confirm(packet);
+
channel.send(response);
}
@@ -816,6 +824,8 @@
}
}
+ channel.confirm(packet);
+
channel.send(response);
}
@@ -849,9 +859,7 @@
}
public void doHandleCreateProducer(final SessionCreateProducerMessage packet)
- {
- SimpleString address = packet.getAddress();
-
+ {
int maxRate = packet.getMaxRate();
boolean autoGroupID = packet.isAutoGroupId();
@@ -889,6 +897,8 @@
response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
}
}
+
+ channel.confirm(packet);
channel.send(response);
}
@@ -961,6 +971,8 @@
}
}
+ channel.confirm(packet);
+
if (response != null)
{
channel.send(response);
@@ -1015,7 +1027,9 @@
{
tx = new TransactionImpl(storageManager, postOffice);
}
-
+
+ channel.confirm(packet);
+
channel.send(response);
}
@@ -1064,6 +1078,8 @@
}
}
+ channel.confirm(packet);
+
channel.send(response);
}
@@ -1146,6 +1162,8 @@
}
}
+ channel.confirm(packet);
+
channel.send(response);
}
@@ -1239,6 +1257,8 @@
}
}
+ channel.confirm(packet);
+
channel.send(response);
}
@@ -1270,6 +1290,8 @@
Packet response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ channel.confirm(packet);
+
channel.send(response);
}
@@ -1339,6 +1361,8 @@
response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
}
}
+
+ channel.confirm(packet);
channel.send(response);
}
@@ -1421,6 +1445,8 @@
}
}
+ channel.confirm(packet);
+
channel.send(response);
}
@@ -1503,6 +1529,8 @@
}
}
+ channel.confirm(packet);
+
channel.send(response);
}
@@ -1572,6 +1600,8 @@
response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
}
}
+
+ channel.confirm(packet);
channel.send(response);
}
@@ -1641,6 +1671,8 @@
}
}
+ channel.confirm(packet);
+
channel.send(response);
}
@@ -1726,6 +1758,8 @@
response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
}
}
+
+ channel.confirm(packet);
channel.send(response);
}
@@ -1755,6 +1789,8 @@
{
Packet response = new SessionXAGetInDoubtXidsResponseMessage(resourceManager.getPreparedTransactions());
+ channel.confirm(packet);
+
channel.send(response);
}
@@ -1783,6 +1819,8 @@
{
Packet response = new SessionXAGetTimeoutResponseMessage(resourceManager.getTimeoutSeconds());
+ channel.confirm(packet);
+
channel.send(response);
}
@@ -1811,6 +1849,8 @@
{
Packet response = new SessionXASetTimeoutResponseMessage(resourceManager.setTimeoutSeconds(packet.getTimeoutSeconds()));
+ channel.confirm(packet);
+
channel.send(response);
}
@@ -1894,6 +1934,8 @@
}
}
+ channel.confirm(packet);
+
channel.send(response);
}
@@ -1952,6 +1994,8 @@
}
}
+ channel.confirm(packet);
+
channel.send(response);
}
@@ -1984,10 +2028,11 @@
//is being processed.
//Otherwise we can end up with start/stop being processed in different order on backup to live.
//Which can result in, say, a delivery arriving at backup, but it's still not started!
+ DelayedResult result = null;
try
{
- channel.replicatePacket(packet);
-
+ result = channel.replicatePacket(packet);
+
//note we process start before response is back from the backup
setStarted(true);
}
@@ -1998,6 +2043,22 @@
unlockConsumers();
}
}
+
+ if (result == null)
+ {
+ channel.confirm(packet);
+ }
+ else
+ {
+ //Don't process until result has come back from backup
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ channel.confirm(packet);
+ }
+ });
+ }
}
//TODO try removing the lock consumers and see what happens!!
@@ -2022,6 +2083,7 @@
if (result == null)
{
+ channel.confirm(packet);
// Not clustered - just send now
channel.send(response);
}
@@ -2031,6 +2093,8 @@
{
public void run()
{
+ channel.confirm(packet);
+
channel.send(response);
}
});
@@ -2109,6 +2173,8 @@
response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
}
}
+
+ channel.confirm(packet);
channel.send(response);
@@ -2135,54 +2201,9 @@
ServerConsumer consumer = consumers.get(packet.getConsumerID());
- consumer.handleClose(packet);
-
-// DelayedResult result = channel.replicatePacket(packet);
-//
-// if (result == null)
-// {
-// doHandleCloseConsumer(packet);
-// }
-// else
-// {
-// //Don't process until result has come back from backup
-// result.setResultRunner(new Runnable()
-// {
-// public void run()
-// {
-// doHandleCloseConsumer(packet);
-// }
-// });
-// }
+ consumer.handleClose(packet);
}
- public void doHandleCloseConsumer(final SessionConsumerCloseMessage packet)
- {
- Packet response = null;
-
- try
- {
- consumers.get(packet.getConsumerID()).close();
-
- response = new NullResponseMessage();
- }
- catch (Exception e)
- {
- log.error("Failed to close", e);
-
- if (e instanceof MessagingException)
- {
- response = new MessagingExceptionMessage((MessagingException)e);
- }
- else
- {
- response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
- }
- }
-
- channel.send(response);
- }
-
public void handleCloseProducer(final SessionProducerCloseMessage packet)
{
DelayedResult result = channel.replicatePacket(packet);
@@ -2227,13 +2248,15 @@
response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
}
}
+
+ channel.confirm(packet);
channel.send(response);
}
public void handleReceiveConsumerCredits(final SessionConsumerFlowCreditMessage packet)
{
- channel.replicatePacket(packet);
+ DelayedResult result = channel.replicatePacket(packet);
try
{
@@ -2244,6 +2267,21 @@
{
log.error("Failed to receive credits", e);
}
+
+ if (result == null)
+ {
+ channel.confirm(packet);
+ }
+ else
+ {
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ channel.confirm(packet);
+ }
+ });
+ }
}
public void handleSendProducerMessage(final SessionSendMessage packet)
@@ -2327,6 +2365,8 @@
}
}
+ channel.confirm(packet);
+
if (response != null)
{
channel.send(response);
@@ -2409,6 +2449,8 @@
}
}
}
+
+ channel.confirm(packet);
if (response != null)
{
@@ -2493,6 +2535,8 @@
{
log.error("Failed to send management message", e);
}
+
+ channel.confirm(packet);
}
public void handleReplicatedDelivery(final SessionReplicateDeliveryMessage packet)
Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailBackupServerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailBackupServerTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailBackupServerTest.java 2008-11-06 17:10:18 UTC (rev 5291)
@@ -0,0 +1,222 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ *
+ * A FailBackupServerTest
+ *
+ * Make sure the live server continues ok if the backup server fails
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 6 Nov 2008 11:27:17
+ *
+ *
+ */
+public class FailBackupServerTest extends TestCase
+{
+ private static final Logger log = Logger.getLogger(FailBackupServerTest.class);
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+
+ private MessagingService liveService;
+
+ private MessagingService backupService;
+
+ private final Map<String, Object> backupParams = new HashMap<String, Object>();
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testFailBackup() throws Exception
+ {
+ ClientSessionFactoryInternal sf1 = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+ new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ backupParams));
+
+ sf1.setSendWindowSize(32 * 1024);
+
+ ClientSession session1 = sf1.createSession(false, true, true, false);
+
+ session1.createQueue(ADDRESS, ADDRESS, null, false, false);
+
+ ClientProducer producer = session1.createProducer(ADDRESS);
+
+ final int numMessages = 1000;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session1.createClientMessage(JBossTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.putIntProperty(new SimpleString("count"), i);
+ message.getBody().putString("aardvarks");
+ message.getBody().flip();
+ producer.send(message);
+ }
+
+ ClientConsumer consumer1 = session1.createConsumer(ADDRESS);
+
+ session1.start();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(1000);
+
+ assertNotNull(message);
+
+ assertEquals("aardvarks", message.getBody().getString());
+
+ assertEquals(i, message.getProperty(new SimpleString("count")));
+
+ if (i == 0)
+ {
+ // Fail all the replicating connections - this simulates the backup server crashing
+
+ Set<RemotingConnection> conns = liveService.getServer().getRemotingService().getConnections();
+
+ for (RemotingConnection conn : conns)
+ {
+ log.info("Failing replicating connection");
+ conn.getReplicatingConnection().fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
+ }
+ }
+
+ message.acknowledge();
+ }
+
+ ClientMessage message = consumer1.receive(1000);
+
+ assertNull(message);
+
+ // Send some more
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ message = session1.createClientMessage(JBossTextMessage.TYPE, false, 0, System.currentTimeMillis(), (byte)1);
+ message.putIntProperty(new SimpleString("count"), i);
+ message.getBody().putString("aardvarks");
+ message.getBody().flip();
+ producer.send(message);
+ }
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ message = consumer1.receive(1000);
+
+ assertNotNull(message);
+
+ assertEquals("aardvarks", message.getBody().getString());
+
+ assertEquals(i, message.getProperty(new SimpleString("count")));
+
+ message.acknowledge();
+ }
+
+ message = consumer1.receive(1000);
+
+ assertNull(message);
+
+ session1.close();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ Configuration backupConf = new ConfigurationImpl();
+ backupConf.setSecurityEnabled(false);
+ backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+ backupConf.getAcceptorConfigurations()
+ .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+ backupParams));
+ backupConf.setBackup(true);
+ backupService = MessagingServiceImpl.newNullStorageMessagingServer(backupConf);
+ backupService.start();
+
+ Configuration liveConf = new ConfigurationImpl();
+ liveConf.setSecurityEnabled(false);
+ liveConf.getAcceptorConfigurations()
+ .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
+ liveConf.setBackupConnectorConfiguration(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ backupParams));
+ liveService = MessagingServiceImpl.newNullStorageMessagingServer(liveConf);
+ liveService.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ assertEquals(0, backupService.getServer().getRemotingService().getConnections().size());
+
+ backupService.stop();
+
+ assertEquals(0, liveService.getServer().getRemotingService().getConnections().size());
+
+ liveService.stop();
+
+ assertEquals(0, InVMRegistry.instance.size());
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
More information about the jboss-cvs-commits
mailing list