[jboss-cvs] JBoss Messaging SVN: r1769 - in branches/Branch_Client_Failover_Experiment: src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/client/remoting src/main/org/jboss/jms/client/state src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/server/endpoint/advised src/main/org/jboss/jms/server/remoting src/main/org/jboss/jms/tx src/main/org/jboss/messaging/core src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests tests/src/org/jboss/test/messaging/core tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/jms/clustering
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Dec 12 04:11:54 EST 2006
Author: timfox
Date: 2006-12-12 04:11:39 -0500 (Tue, 12 Dec 2006)
New Revision: 1769
Modified:
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/SessionState.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/AckInfo.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/Channel.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/ChannelSupport.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java
branches/Branch_Client_Failover_Experiment/tests/build.xml
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/SimpleChannel.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicyTest.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
Log:
Final commit on this branch
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java 2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java 2006-12-12 09:11:39 UTC (rev 1769)
@@ -49,6 +49,7 @@
import org.jboss.jms.client.state.SessionState;
import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.server.endpoint.CreateConnectionResult;
+import org.jboss.jms.tx.AckInfo;
import org.jboss.logging.Logger;
import org.jboss.remoting.Client;
import org.jboss.remoting.ConnectionListener;
@@ -328,8 +329,23 @@
failedSessionDelegate.copyState(newSessionDelegate);
log.info("copied state");
+
+ //Now we remove any unacked np messages - this is because we don't want to ack them
+ //since the server won't know about them and will barf
+ Iterator iter = failedSessionState.getToAck().iterator();
+
+ while (iter.hasNext())
+ {
+ AckInfo info = (AckInfo)iter.next();
+
+ if (!info.getMessage().getMessage().isReliable())
+ {
+ iter.remove();
+ }
+ }
+
+ //TODO remove any unacked from the resource manager
-
if (trace) { log.trace("replacing session (" + failedSessionDelegate + ") with a new failover session " + newSessionDelegate); }
//TODO Clebert please add comment as to why this clone is necessary
@@ -359,8 +375,32 @@
handleFailoverOnBrowser((BrowserState)sessionChild, newSessionDelegate);
}
}
+
+ /* Now we must sent the list of unacked AckInfos to the server - so the consumers
+ * delivery lists can be repopulated
+ */
+ List ackInfos = null;
+
+ if (!failedSessionState.isTransacted())
+ {
+ //Get the ack infos from the list in the session state
+ ackInfos = failedSessionState.getToAck();
+ }
+ else
+ {
+ //Transacted session - we need to get the acks
+ //TODO
+ }
+
+ //TODO for a transacted session the ackinfos will be in the resource manager!!
+
+ if (!ackInfos.isEmpty())
+ {
+ newSessionDelegate.sendUnackedAckInfos(ackInfos);
+ }
+
}
-
+
//We must not start the connection until the end
if (failedState.isStarted())
{
@@ -369,6 +409,8 @@
log.info("Failover done");
}
+
+
private void handleFailoverOnConsumer(ClientConnectionDelegate connectionDelegate,
ConnectionState failedConnectionState,
@@ -387,11 +429,11 @@
ClientConsumerDelegate newConsumerDelegate = (ClientConsumerDelegate)failedSessionDelegate.
failOverConsumer((JBossDestination)failedConsumerState.getDestination(),
- failedConsumerState.getSelector(),
- failedConsumerState.isNoLocal(),
- failedConsumerState.getSubscriptionName(),
- failedConsumerState.isConnectionConsumer(),
- failedConsumerDelegate.getChannelId());
+ failedConsumerState.getSelector(),
+ failedConsumerState.isNoLocal(),
+ failedConsumerState.getSubscriptionName(),
+ failedConsumerState.isConnectionConsumer(),
+ failedConsumerDelegate.getChannelId());
if (trace) { log.trace("handleFailoverOnConsumer: alternate consumer created"); }
@@ -418,6 +460,9 @@
MessageCallbackHandler handler = cm.unregisterHandler(oldServerID, oldConsumerID);
handler.setConsumerId(failedConsumerState.getConsumerID());
+
+ //Clear the buffer of the handler
+ handler.clearBuffer();
cm.registerHandler(failedConnectionState.getServerID(),
failedConsumerState.getConsumerID(),
@@ -426,8 +471,8 @@
failedSessionState.addCallbackHandler(handler);
log.info("failed over consumer");
-
}
+
private void handleFailoverOnProducer(ProducerState failedProducerState,
ClientSessionDelegate failedSessionDelegate)
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2006-12-12 09:11:39 UTC (rev 1769)
@@ -441,6 +441,15 @@
throw new IllegalStateException("This invocation should not be handled here!");
}
+ /**
+ * This invocation should either be handled by the client-side interceptor chain or by the
+ * server-side endpoint.
+ */
+ public void sendUnackedAckInfos(List ackInfos) throws JMSException
+ {
+ throw new IllegalStateException("This invocation should not be handled here!");
+ }
+
// Public --------------------------------------------------------
@@ -457,6 +466,7 @@
return ((ConnectionState)state.getParent()).getRemotingConnection().getInvokingClient();
}
+
// Package Private -----------------------------------------------
// Private -------------------------------------------------------
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2006-12-12 09:11:39 UTC (rev 1769)
@@ -514,6 +514,11 @@
}
}
+ public void clearBuffer()
+ {
+ buffer.clear();
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/SessionState.java 2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/SessionState.java 2006-12-12 09:11:39 UTC (rev 1769)
@@ -48,7 +48,6 @@
*/
public class SessionState extends HierarchicalStateSupport
{
-
protected static Logger log = Logger.getLogger(SessionState.class);
private int acknowledgeMode;
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java 2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java 2006-12-12 09:11:39 UTC (rev 1769)
@@ -31,9 +31,9 @@
import org.jboss.jms.selector.Selector;
import org.jboss.jms.server.remoting.JMSDispatcher;
import org.jboss.logging.Logger;
+import org.jboss.messaging.core.Channel;
import org.jboss.messaging.core.Filter;
import org.jboss.messaging.core.Routable;
-import org.jboss.messaging.core.local.PagingFilteredQueue;
/**
* Concrete implementation of BrowserEndpoint.
@@ -66,7 +66,7 @@
// Constructors --------------------------------------------------
protected ServerBrowserEndpoint(ServerSessionEndpoint session, int id,
- PagingFilteredQueue destination, String messageSelector)
+ Channel destination, String messageSelector)
throws JMSException
{
this.session = session;
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-12-12 09:11:39 UTC (rev 1769)
@@ -42,6 +42,7 @@
import org.jboss.jms.server.remoting.MessagingMarshallable;
import org.jboss.jms.util.ExceptionUtil;
import org.jboss.logging.Logger;
+import org.jboss.messaging.core.Channel;
import org.jboss.messaging.core.Delivery;
import org.jboss.messaging.core.DeliveryObserver;
import org.jboss.messaging.core.MessageReference;
@@ -49,7 +50,6 @@
import org.jboss.messaging.core.Receiver;
import org.jboss.messaging.core.Routable;
import org.jboss.messaging.core.SimpleDelivery;
-import org.jboss.messaging.core.local.PagingFilteredQueue;
import org.jboss.messaging.core.plugin.contract.PostOffice;
import org.jboss.messaging.core.plugin.postoffice.Binding;
import org.jboss.messaging.core.tx.Transaction;
@@ -87,7 +87,7 @@
private int id;
- private PagingFilteredQueue messageQueue;
+ private Channel messageQueue;
private String queueName;
@@ -131,7 +131,7 @@
// Constructors --------------------------------------------------
- protected ServerConsumerEndpoint(int id, PagingFilteredQueue messageQueue, String queueName,
+ protected ServerConsumerEndpoint(int id, Channel messageQueue, String queueName,
ServerSessionEndpoint sessionEndpoint,
String selector, boolean noLocal, JBossDestination dest,
int prefetchSize, Queue dlq)
@@ -633,7 +633,24 @@
}
}
-
+
+ protected void createDeliveries(List messageIds) throws Throwable
+ {
+ List dels = messageQueue.createDeliveries(messageIds);
+
+ synchronized (lock)
+ {
+ Iterator iter = dels.iterator();
+
+ while (iter.hasNext())
+ {
+ Delivery del = (Delivery)iter.next();
+
+ deliveries.put(new Long(del.getReference().getMessageID()), del);
+ }
+ }
+ }
+
protected void cancelDelivery(Long messageID, int deliveryCount) throws Throwable
{
Delivery del = (Delivery)deliveries.remove(messageID);
@@ -653,7 +670,7 @@
throw new IllegalStateException("Cannot find delivery to cancel:" + id);
}
}
-
+
protected void start()
{
synchronized (lock)
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-12-12 09:11:39 UTC (rev 1769)
@@ -27,6 +27,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -186,11 +187,11 @@
int prefetchSize = connectionEndpoint.getPrefetchSize();
ServerConsumerEndpoint ep =
- new ServerConsumerEndpoint(consumerID, (PagingFilteredQueue)binding.getQueue(),
+
+ new ServerConsumerEndpoint(consumerID, binding.getQueue(),
binding.getQueue().getName(), this, selectorString, noLocal,
jmsDestination, prefetchSize, dlq);
-
JMSDispatcher.instance.registerTarget(new Integer(consumerID), new ConsumerAdvised(ep));
ClientConsumerDelegate stub =
@@ -755,6 +756,56 @@
throw ExceptionUtil.handleJMSInvocation(t, this + " cancelDeliveries");
}
}
+
+ public void sendUnackedAckInfos(List ackInfos) throws JMSException
+ {
+ try
+ {
+ //Sort into different list for each consumer
+ Map ackMap = new HashMap();
+
+ for (int i = ackInfos.size() - 1; i >= 0; i--)
+ {
+ AckInfo ack = (AckInfo)ackInfos.get(i);
+
+ ServerConsumerEndpoint consumer =
+ this.connectionEndpoint.getConsumerEndpoint(ack.getConsumerID());
+
+ if (consumer == null)
+ {
+ throw new IllegalArgumentException("Cannot find consumer id: " + ack.getConsumerID());
+ }
+
+ LinkedList acks = (LinkedList)ackMap.get(consumer);
+
+ if (acks == null)
+ {
+ acks = new LinkedList();
+
+ ackMap.put(consumer, acks);
+ }
+
+ acks.addFirst(new Long(ack.getMessageID()));
+ }
+
+ Iterator iter = ackMap.entrySet().iterator();
+
+ while (iter.hasNext())
+ {
+ Map.Entry entry = (Map.Entry)iter.next();
+
+ ServerConsumerEndpoint consumer = (ServerConsumerEndpoint)entry.getKey();
+
+ List acks = (List)entry.getValue();
+
+ consumer.createDeliveries(acks);
+ }
+ }
+ catch (Throwable t)
+ {
+ throw ExceptionUtil.handleJMSInvocation(t, this + " sendUnackedAckInfos");
+ }
+ }
public void addTemporaryDestination(JBossDestination dest) throws JMSException
{
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java 2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java 2006-12-12 09:11:39 UTC (rev 1769)
@@ -120,5 +120,14 @@
* @param ackInfos
*/
void cancelDeliveries(List ackInfos) throws JMSException;
+
+
+ /**
+ * Send a list of unacked ackInfos to the server so the delivery lists can be repopulated
+ * used at failover
+ * @param ackInfos
+ * @throws JMSException
+ */
+ void sendUnackedAckInfos(List ackInfos) throws JMSException;
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java 2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java 2006-12-12 09:11:39 UTC (rev 1769)
@@ -147,6 +147,11 @@
{
endpoint.cancelDeliveries(ackInfos);
}
+
+ public void sendUnackedAckInfos(List ackInfos) throws JMSException
+ {
+ endpoint.sendUnackedAckInfos(ackInfos);
+ }
// AdvisedSupport overrides --------------------------------------
@@ -161,6 +166,7 @@
return "SessionAdvised->" + endpoint;
}
+
// Public --------------------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java 2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java 2006-12-12 09:11:39 UTC (rev 1769)
@@ -37,8 +37,8 @@
import org.jboss.aop.Dispatcher;
import org.jboss.aop.joinpoint.MethodInvocation;
+import org.jboss.jms.client.remoting.CallbackManager;
import org.jboss.jms.client.remoting.HandleMessageResponse;
-import org.jboss.jms.client.remoting.CallbackManager;
import org.jboss.jms.message.JBossMessage;
import org.jboss.jms.server.ServerPeer;
import org.jboss.jms.server.Version;
@@ -95,6 +95,7 @@
protected static final byte MORE = 5;
protected static final byte SEND_TRANSACTION = 6;
protected static final byte GET_ID_BLOCK = 7;
+ protected static final byte UNACKED_ACKINFOS = 8;
// The response codes - start from 100
@@ -312,6 +313,28 @@
if (trace) { log.trace("wrote cancelDeliveries()"); }
}
+ else if ("sendUnackedAckInfos".equals(methodName) && mi.getArguments() != null)
+ {
+ dos.writeByte(UNACKED_ACKINFOS);
+
+ writeHeader(mi, dos);
+
+ List ids = (List)mi.getArguments()[0];
+
+ dos.writeInt(ids.size());
+
+ Iterator iter = ids.iterator();
+
+ while (iter.hasNext())
+ {
+ AckInfo ack = (AckInfo)iter.next();
+ ack.write(dos);
+ }
+
+ dos.flush();
+
+ if (trace) { log.trace("wrote sendUnackedAckInfos()"); }
+ }
else
{
dos.write(SERIALIZED);
@@ -700,6 +723,35 @@
return request;
}
+ case UNACKED_ACKINFOS:
+ {
+ MethodInvocation mi = readHeader(dis);
+
+ int size = dis.readInt();
+
+ List acks = new ArrayList(size);
+
+ for (int i = 0; i < size; i++)
+ {
+ AckInfo ack = new AckInfo();
+
+ ack.read(dis);
+
+ acks.add(ack);
+ }
+
+ Object[] args = new Object[] {acks};
+
+ mi.setArguments(args);
+
+ InvocationRequest request =
+ new InvocationRequest(null, ServerPeer.REMOTING_JMS_SUBSYSTEM,
+ new MessagingMarshallable(version, mi), null, null, null);
+
+ if (trace) { log.trace("read unackedAckInfos()"); }
+
+ return request;
+ }
case ID_BLOCK_RESPONSE:
{
IdBlock block = new IdBlock();
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/AckInfo.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/AckInfo.java 2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/AckInfo.java 2006-12-12 09:11:39 UTC (rev 1769)
@@ -86,7 +86,7 @@
/** Used to change ack's id during failover */
public void setConsumerID(int consumerID)
{
- this.consumerID=consumerID;
+ this.consumerID = consumerID;
}
public MessageProxy getMessage()
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/Channel.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/Channel.java 2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/Channel.java 2006-12-12 09:11:39 UTC (rev 1769)
@@ -145,6 +145,8 @@
void deactivate();
boolean isActive();
+
+ List createDeliveries(List messageIds);
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-12-12 09:11:39 UTC (rev 1769)
@@ -491,6 +491,58 @@
}
}
}
+
+ public List createDeliveries(List messageIds)
+ {
+ //debug
+ Iterator iter = messageIds.iterator();
+
+ log.info("***** createdeliveries");
+ while (iter.hasNext())
+ {
+ Long l = (Long)iter.next();
+
+ log.info("Creating delivery for " + l);
+ }
+ log.info("**** end dump");
+
+ iter = messageIds.iterator();
+
+ List dels = new ArrayList();
+
+ synchronized (refLock)
+ {
+ ListIterator liter = messageRefs.iterator();
+
+ while (iter.hasNext())
+ {
+ Long id = (Long)iter.next();
+
+ if (!liter.hasNext())
+ {
+ throw new IllegalStateException("Cannot find ref in queue! (Might be paged!)");
+ }
+
+ MessageReference ref = (MessageReference)liter.next();
+
+ if (ref.getMessageID() == id.longValue())
+ {
+ liter.remove();
+
+ Delivery del = new SimpleDelivery(this, ref);
+
+ dels.add(del);
+
+ this.deliveries.add(del);
+
+ }
+ }
+ }
+
+ //TODO we need to look in paging state too - currently not supported
+
+ return dels;
+ }
// Public --------------------------------------------------------
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java 2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java 2006-12-12 09:11:39 UTC (rev 1769)
@@ -272,4 +272,9 @@
{
return "RemoteQueueStub(node=" + this.nodeId + " name=" + this.name + " channelId=" + this.id + ")";
}
+
+ public List createDeliveries(List messageIds)
+ {
+ throw new UnsupportedOperationException();
+ }
}
Modified: branches/Branch_Client_Failover_Experiment/tests/build.xml
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/build.xml 2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/tests/build.xml 2006-12-12 09:11:39 UTC (rev 1769)
@@ -745,7 +745,7 @@
haltonerror="${junit.batchtest.haltonerror}">
<formatter type="plain" usefile="${junit.formatter.usefile}"/>
<fileset dir="${build.tests.classes}">
- <include name="**/jms/clustering/*Test.class"/>
+ <include name="**/jms/clustering/HATest.class"/>
<!--
<include name="**/jms/clustering/SimpleClusteringTest.class"/>
-->
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/SimpleChannel.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/SimpleChannel.java 2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/SimpleChannel.java 2006-12-12 09:11:39 UTC (rev 1769)
@@ -216,6 +216,11 @@
throw new UnsupportedOperationException();
}
+ public List createDeliveries(List messageIds)
+ {
+ throw new UnsupportedOperationException();
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicyTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicyTest.java 2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicyTest.java 2006-12-12 09:11:39 UTC (rev 1769)
@@ -369,6 +369,12 @@
// TODO Auto-generated method stub
return false;
}
+
+ public List createDeliveries(List messageIds)
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java 2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java 2006-12-12 09:11:39 UTC (rev 1769)
@@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+
import org.jboss.messaging.core.Delivery;
import org.jboss.messaging.core.DeliveryObserver;
import org.jboss.messaging.core.Filter;
@@ -608,6 +609,12 @@
// TODO Auto-generated method stub
return false;
}
+
+ public List createDeliveries(List messageIds)
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2006-12-12 09:11:39 UTC (rev 1769)
@@ -26,6 +26,7 @@
import java.util.Set;
import javax.jms.Connection;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
@@ -69,355 +70,661 @@
}
// Public --------------------------------------------------------
+//
+// /*
+// * Test that connections created using a clustered connection factory are created round robin on
+// * different servers
+// */
+// public void testRoundRobinConnectionCreation() throws Exception
+// {
+// JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+//
+// ClusteredClientConnectionFactoryDelegate delegate =
+// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+//
+// log.info ("number of delegates = " + delegate.getDelegates().length);
+// log.info ("number of servers = " + ServerManagement.getServer(0).getNodeIDView().size());
+//
+// assertEquals(3, delegate.getDelegates().length);
+//
+// ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+//
+// ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+//
+// ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
+//
+// assertEquals(0, cf1.getServerId());
+//
+// assertEquals(1, cf2.getServerId());
+//
+// assertEquals(2, cf3.getServerId());
+//
+// assertEquals(3, ServerManagement.getServer(0).getNodeIDView().size());
+//
+// Connection conn1 = null;
+//
+// Connection conn2 = null;
+//
+// Connection conn3 = null;
+//
+// Connection conn4 = null;
+//
+// Connection conn5 = null;
+//
+// try
+// {
+// conn1 = factory.createConnection();
+//
+// conn2 = factory.createConnection();
+//
+// conn3 = factory.createConnection();
+//
+// conn4 = factory.createConnection();
+//
+// conn5 = factory.createConnection();
+//
+// ConnectionState state1 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn1).getDelegate()).getState());
+//
+// ConnectionState state2 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn2).getDelegate()).getState());
+//
+// ConnectionState state3 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn3).getDelegate()).getState());
+//
+// ConnectionState state4 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn4).getDelegate()).getState());
+//
+// ConnectionState state5 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn5).getDelegate()).getState());
+//
+// int serverID1 = state1.getServerID();
+//
+// int serverID2 = state2.getServerID();
+//
+// int serverID3 = state3.getServerID();
+//
+// int serverID4 = state4.getServerID();
+//
+// int serverID5 = state5.getServerID();
+//
+// log.info("server id 1: " + serverID1);
+//
+// log.info("server id 2: " + serverID2);
+//
+// log.info("server id 3: " + serverID3);
+//
+// log.info("server id 4: " + serverID4);
+//
+// log.info("server id 5: " + serverID5);
+//
+// assertEquals(0, serverID1);
+//
+// assertEquals(1, serverID2);
+//
+// assertEquals(2, serverID3);
+//
+// assertEquals(0, serverID4);
+//
+// assertEquals(1, serverID5);
+// }
+// finally
+// {
+// if (conn1 != null)
+// {
+// conn1.close();
+// }
+//
+// if (conn2 != null)
+// {
+// conn2.close();
+// }
+//
+// if (conn3 != null)
+// {
+// conn3.close();
+// }
+//
+// if (conn4 != null)
+// {
+// conn4.close();
+// }
+//
+// if (conn5 != null)
+// {
+// conn5.close();
+// }
+// }
+//
+// }
+//
+// /*
+// * Test that the failover mapping is created correctly and updated properly when nodes leave
+// * or join
+// */
+// public void testDefaultFailoverMap() throws Exception
+// {
+// {
+// JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+//
+// ClusteredClientConnectionFactoryDelegate delegate =
+// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+//
+// assertEquals(3, ServerManagement.getServer(0).getNodeIDView().size());
+//
+// ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+//
+// ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+//
+// ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+//
+// ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
+//
+// //The order here depends on the order the servers were started in
+//
+// //If any servers get stopped and then started then the order will change
+//
+// log.info("cf1 serverid=" + cf1.getServerId());
+//
+// log.info("cf2 serverid=" + cf2.getServerId());
+//
+// log.info("cf3 serverid=" + cf3.getServerId());
+//
+//
+// assertEquals(0, cf1.getServerId());
+//
+// assertEquals(1, cf2.getServerId());
+//
+// assertEquals(2, cf3.getServerId());
+//
+// Map failoverMap = delegate.getFailoverMap();
+//
+// assertEquals(3, delegates.length);
+//
+// assertEquals(3, failoverMap.size());
+//
+// // Default failover policy just chooses the node to the right
+//
+// assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+//
+// assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
+//
+// assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
+// }
+//
+// //Now cleanly stop one of the servers
+//
+// log.info("************** STOPPING SERVER 0");
+// ServerManagement.stop(0);
+//
+// log.info("server stopped");
+//
+// assertEquals(2, ServerManagement.getServer(1).getNodeIDView().size());
+//
+// {
+// //Lookup another connection factory
+//
+// JBossConnectionFactory factory = (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
+//
+// log.info("Got connection factory");
+//
+// ClusteredClientConnectionFactoryDelegate delegate =
+// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+//
+// ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+//
+// Map failoverMap = delegate.getFailoverMap();
+//
+// log.info("Got failover map");
+//
+// assertEquals(2, delegates.length);
+//
+// ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+//
+// ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+//
+// //Order here depends on order servers were started in
+//
+// log.info("cf1 serverid=" + cf1.getServerId());
+//
+// log.info("cf2 serverid=" + cf2.getServerId());
+//
+// assertEquals(1, cf1.getServerId());
+//
+// assertEquals(2, cf2.getServerId());
+//
+//
+// assertEquals(2, failoverMap.size());
+//
+// assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+//
+// assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
+// }
+//
+// //Cleanly stop another server
+//
+// log.info("Server 1 is started: " + ServerManagement.getServer(1).isServerPeerStarted());
+//
+// ServerManagement.stop(1);
+//
+// assertEquals(1, ServerManagement.getServer(2).getNodeIDView().size());
+//
+// {
+// //Lookup another connection factory
+//
+// JBossConnectionFactory factory = (JBossConnectionFactory )ic2.lookup("/ConnectionFactory");
+//
+// ClusteredClientConnectionFactoryDelegate delegate =
+// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+//
+// ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+//
+// Map failoverMap = delegate.getFailoverMap();
+//
+// assertEquals(1, delegates.length);
+//
+// ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+//
+// assertEquals(2, cf1.getServerId());
+//
+//
+// assertEquals(1, failoverMap.size());
+//
+// assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+// }
+//
+// //Restart server 0
+//
+// ServerManagement.start("all", 0);
+//
+// {
+// JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+//
+// log.info("Got connection factory");
+//
+// ClusteredClientConnectionFactoryDelegate delegate =
+// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+//
+// ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+//
+// Map failoverMap = delegate.getFailoverMap();
+//
+// log.info("Got failover map");
+//
+// assertEquals(2, delegates.length);
+//
+// ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+//
+// ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+//
+// log.info("cf1 serverid=" + cf1.getServerId());
+//
+// log.info("cf2 serverid=" + cf2.getServerId());
+//
+// assertEquals(2, cf1.getServerId());
+//
+// assertEquals(0, cf2.getServerId());
+//
+//
+// assertEquals(2, failoverMap.size());
+//
+// assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+//
+// assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
+// }
+//
+//
+// //Restart server 1
+//
+// ServerManagement.start("all", 1);
+//
+// {
+// JBossConnectionFactory factory = (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
+//
+// log.info("Got connection factory");
+//
+// ClusteredClientConnectionFactoryDelegate delegate =
+// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+//
+// ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+//
+// Map failoverMap = delegate.getFailoverMap();
+//
+// log.info("Got failover map");
+//
+// assertEquals(3, delegates.length);
+//
+// ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+//
+// ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+//
+// ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
+//
+// log.info("cf1 serverid=" + cf1.getServerId());
+//
+// log.info("cf2 serverid=" + cf2.getServerId());
+//
+// log.info("cf3 serverid=" + cf3.getServerId());
+//
+// assertEquals(2, cf1.getServerId());
+//
+// assertEquals(0, cf2.getServerId());
+//
+// assertEquals(1, cf3.getServerId());
+//
+//
+// assertEquals(3, failoverMap.size());
+//
+// assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+//
+// assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
+//
+// assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
+// }
+// }
+//
+// public void testSimpleFailover() throws Exception
+// {
+// JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+//
+// ClusteredClientConnectionFactoryDelegate delegate =
+// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+//
+// Set nodeIDView = ServerManagement.getServer(0).getNodeIDView();
+// assertEquals(3, nodeIDView.size());
+//
+// ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+//
+// ClientConnectionFactoryDelegate cf1 = delegates[0];
+//
+// ClientConnectionFactoryDelegate cf2 = delegates[1];
+//
+// ClientConnectionFactoryDelegate cf3 = delegates[2];
+//
+// int server0Id = cf1.getServerId();
+//
+// int server1Id = cf2.getServerId();
+//
+// int server2Id = cf3.getServerId();
+//
+// log.info("server 0 id: " + server0Id);
+//
+// log.info("server 1 id: " + server1Id);
+//
+// log.info("server 2 id: " + server2Id);
+//
+// Map failoverMap = delegate.getFailoverMap();
+//
+// log.info(failoverMap.get(new Integer(server0Id)));
+// log.info(failoverMap.get(new Integer(server1Id)));
+// log.info(failoverMap.get(new Integer(server2Id)));
+//
+// int server1FailoverId = ((Integer)failoverMap.get(new Integer(server1Id))).intValue();
+//
+// // server 1 should failover onto server 2
+//
+// assertEquals(server2Id, server1FailoverId);
+//
+// Connection conn = null;
+//
+// try
+// {
+//
+// //Get a connection on server 1
+// conn = factory.createConnection(); //connection on server 0
+//
+// conn.close();
+//
+// conn = factory.createConnection(); //connection on server 1
+//
+// JBossConnection jbc = (JBossConnection)conn;
+//
+// ClientConnectionDelegate del = (ClientConnectionDelegate)jbc.getDelegate();
+//
+// ConnectionState state = (ConnectionState)del.getState();
+//
+// int initialServerID = state.getServerID();
+//
+// assertEquals(1, initialServerID);
+//
+// Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//
+// MessageProducer prod = sess.createProducer(queue1);
+//
+// MessageConsumer cons = sess.createConsumer(queue1);
+//
+// final int NUM_MESSAGES = 100;
+//
+// for (int i = 0; i < NUM_MESSAGES; i++)
+// {
+// TextMessage tm = sess.createTextMessage("message:" + i);
+//
+// prod.send(tm);
+// }
+//
+// //So now, messages should be in queue1 on server 1
+// //So we now kill server 1
+// //Which should cause transparent failover of connection conn onto server 1
+//
+// log.info("************ KILLING (CRASHING) SERVER 1");
+//
+// ServerManagement.getServer(1).destroy();
+//
+// log.info("killed server, now waiting");
+//
+// Thread.sleep(5000);
+//
+// log.info("done wait");
+//
+// state = (ConnectionState)del.getState();
+//
+// int finalServerID = state.getServerID();
+//
+// log.info("final server id= " + finalServerID);
+//
+// //server id should now be 2
+//
+// assertEquals(2, finalServerID);
+//
+// conn.start();
+//
+// for (int i = 0; i < NUM_MESSAGES; i++)
+// {
+// TextMessage tm = (TextMessage)cons.receive(1000);
+//
+// log.info("message is " + tm);
+//
+// assertNotNull(tm);
+//
+// assertEquals("message:" + i, tm.getText());
+// }
+// log.info("done");
+// }
+// finally
+// {
+// if (conn != null)
+// {
+// try
+// {
+// conn.close();
+// }
+// catch (Exception e)
+// {
+// e.printStackTrace();
+// }
+// }
+// }
+//
+// }
- /*
- * Test that connections created using a clustered connection factory are created round robin on
- * different servers
- */
- public void testRoundRobinConnectionCreation() throws Exception
- {
- JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
-
- ClusteredClientConnectionFactoryDelegate delegate =
- (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-
- log.info ("number of delegates = " + delegate.getDelegates().length);
- log.info ("number of servers = " + ServerManagement.getServer(0).getNodeIDView().size());
-
- assertEquals(3, delegate.getDelegates().length);
-
- ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-
- ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-
- ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
-
- assertEquals(0, cf1.getServerId());
-
- assertEquals(1, cf2.getServerId());
-
- assertEquals(2, cf3.getServerId());
-
- assertEquals(3, ServerManagement.getServer(0).getNodeIDView().size());
-
- Connection conn1 = null;
-
- Connection conn2 = null;
-
- Connection conn3 = null;
-
- Connection conn4 = null;
-
- Connection conn5 = null;
-
- try
- {
- conn1 = factory.createConnection();
-
- conn2 = factory.createConnection();
-
- conn3 = factory.createConnection();
-
- conn4 = factory.createConnection();
-
- conn5 = factory.createConnection();
-
- ConnectionState state1 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn1).getDelegate()).getState());
-
- ConnectionState state2 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn2).getDelegate()).getState());
-
- ConnectionState state3 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn3).getDelegate()).getState());
-
- ConnectionState state4 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn4).getDelegate()).getState());
-
- ConnectionState state5 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn5).getDelegate()).getState());
-
- int serverID1 = state1.getServerID();
-
- int serverID2 = state2.getServerID();
-
- int serverID3 = state3.getServerID();
-
- int serverID4 = state4.getServerID();
-
- int serverID5 = state5.getServerID();
-
- log.info("server id 1: " + serverID1);
-
- log.info("server id 2: " + serverID2);
-
- log.info("server id 3: " + serverID3);
-
- log.info("server id 4: " + serverID4);
-
- log.info("server id 5: " + serverID5);
-
- assertEquals(0, serverID1);
-
- assertEquals(1, serverID2);
-
- assertEquals(2, serverID3);
-
- assertEquals(0, serverID4);
-
- assertEquals(1, serverID5);
- }
- finally
- {
- if (conn1 != null)
- {
- conn1.close();
- }
-
- if (conn2 != null)
- {
- conn2.close();
- }
-
- if (conn3 != null)
- {
- conn3.close();
- }
-
- if (conn4 != null)
- {
- conn4.close();
- }
-
- if (conn5 != null)
- {
- conn5.close();
- }
- }
-
- }
-
- /*
- * Test that the failover mapping is created correctly and updated properly when nodes leave
- * or join
- */
- public void testDefaultFailoverMap() throws Exception
- {
- {
- JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
-
- ClusteredClientConnectionFactoryDelegate delegate =
- (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-
- assertEquals(3, ServerManagement.getServer(0).getNodeIDView().size());
-
- ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-
- ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-
- ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-
- ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
-
- //The order here depends on the order the servers were started in
-
- //If any servers get stopped and then started then the order will change
-
- log.info("cf1 serverid=" + cf1.getServerId());
-
- log.info("cf2 serverid=" + cf2.getServerId());
-
- log.info("cf3 serverid=" + cf3.getServerId());
-
-
- assertEquals(0, cf1.getServerId());
-
- assertEquals(1, cf2.getServerId());
-
- assertEquals(2, cf3.getServerId());
-
- Map failoverMap = delegate.getFailoverMap();
-
- assertEquals(3, delegates.length);
-
- assertEquals(3, failoverMap.size());
-
- // Default failover policy just chooses the node to the right
-
- assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-
- assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
-
- assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
- }
-
- //Now cleanly stop one of the servers
-
- log.info("************** STOPPING SERVER 0");
- ServerManagement.stop(0);
-
- log.info("server stopped");
-
- assertEquals(2, ServerManagement.getServer(1).getNodeIDView().size());
-
- {
- //Lookup another connection factory
-
- JBossConnectionFactory factory = (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
-
- log.info("Got connection factory");
-
- ClusteredClientConnectionFactoryDelegate delegate =
- (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-
- ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-
- Map failoverMap = delegate.getFailoverMap();
-
- log.info("Got failover map");
-
- assertEquals(2, delegates.length);
-
- ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-
- ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-
- //Order here depends on order servers were started in
-
- log.info("cf1 serverid=" + cf1.getServerId());
-
- log.info("cf2 serverid=" + cf2.getServerId());
-
- assertEquals(1, cf1.getServerId());
-
- assertEquals(2, cf2.getServerId());
-
-
- assertEquals(2, failoverMap.size());
-
- assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-
- assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
- }
-
- //Cleanly stop another server
-
- log.info("Server 1 is started: " + ServerManagement.getServer(1).isServerPeerStarted());
-
- ServerManagement.stop(1);
-
- assertEquals(1, ServerManagement.getServer(2).getNodeIDView().size());
-
- {
- //Lookup another connection factory
-
- JBossConnectionFactory factory = (JBossConnectionFactory )ic2.lookup("/ConnectionFactory");
-
- ClusteredClientConnectionFactoryDelegate delegate =
- (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-
- ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-
- Map failoverMap = delegate.getFailoverMap();
-
- assertEquals(1, delegates.length);
-
- ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-
- assertEquals(2, cf1.getServerId());
-
-
- assertEquals(1, failoverMap.size());
-
- assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
- }
-
- //Restart server 0
-
- ServerManagement.start("all", 0);
-
- {
- JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
-
- log.info("Got connection factory");
-
- ClusteredClientConnectionFactoryDelegate delegate =
- (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-
- ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-
- Map failoverMap = delegate.getFailoverMap();
-
- log.info("Got failover map");
-
- assertEquals(2, delegates.length);
-
- ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-
- ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-
- log.info("cf1 serverid=" + cf1.getServerId());
-
- log.info("cf2 serverid=" + cf2.getServerId());
-
- assertEquals(2, cf1.getServerId());
-
- assertEquals(0, cf2.getServerId());
-
-
- assertEquals(2, failoverMap.size());
-
- assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-
- assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
- }
-
-
- //Restart server 1
-
- ServerManagement.start("all", 1);
-
- {
- JBossConnectionFactory factory = (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
-
- log.info("Got connection factory");
-
- ClusteredClientConnectionFactoryDelegate delegate =
- (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-
- ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-
- Map failoverMap = delegate.getFailoverMap();
-
- log.info("Got failover map");
-
- assertEquals(3, delegates.length);
-
- ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-
- ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-
- ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
-
- log.info("cf1 serverid=" + cf1.getServerId());
-
- log.info("cf2 serverid=" + cf2.getServerId());
-
- log.info("cf3 serverid=" + cf3.getServerId());
-
- assertEquals(2, cf1.getServerId());
-
- assertEquals(0, cf2.getServerId());
-
- assertEquals(1, cf3.getServerId());
-
-
- assertEquals(3, failoverMap.size());
-
- assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-
- assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
-
- assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
- }
- }
- public void testSimpleFailover() throws Exception
+// public void testFailoverWithUnackedMessagesClientAcknowledge() throws Exception
+// {
+// JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+//
+// ClusteredClientConnectionFactoryDelegate delegate =
+// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+//
+// Set nodeIDView = ServerManagement.getServer(0).getNodeIDView();
+// assertEquals(3, nodeIDView.size());
+//
+// ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+//
+// ClientConnectionFactoryDelegate cf1 = delegates[0];
+//
+// ClientConnectionFactoryDelegate cf2 = delegates[1];
+//
+// ClientConnectionFactoryDelegate cf3 = delegates[2];
+//
+// int server0Id = cf1.getServerId();
+//
+// int server1Id = cf2.getServerId();
+//
+// int server2Id = cf3.getServerId();
+//
+// log.info("server 0 id: " + server0Id);
+//
+// log.info("server 1 id: " + server1Id);
+//
+// log.info("server 2 id: " + server2Id);
+//
+// Map failoverMap = delegate.getFailoverMap();
+//
+// log.info(failoverMap.get(new Integer(server0Id)));
+// log.info(failoverMap.get(new Integer(server1Id)));
+// log.info(failoverMap.get(new Integer(server2Id)));
+//
+// int server1FailoverId = ((Integer)failoverMap.get(new Integer(server1Id))).intValue();
+//
+// // server 1 should failover onto server 2
+//
+// assertEquals(server2Id, server1FailoverId);
+//
+// Connection conn = null;
+//
+// try
+// {
+// //Get a connection on server 1
+// conn = factory.createConnection(); //connection on server 0
+//
+// conn.close();
+//
+// conn = factory.createConnection(); //connection on server 1
+//
+// JBossConnection jbc = (JBossConnection)conn;
+//
+// ClientConnectionDelegate del = (ClientConnectionDelegate)jbc.getDelegate();
+//
+// ConnectionState state = (ConnectionState)del.getState();
+//
+// int initialServerID = state.getServerID();
+//
+// assertEquals(1, initialServerID);
+//
+// Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+//
+// MessageProducer prod = sess.createProducer(queue1);
+//
+// MessageConsumer cons = sess.createConsumer(queue1);
+//
+// final int NUM_MESSAGES = 100;
+//
+// for (int i = 0; i < NUM_MESSAGES; i++)
+// {
+// TextMessage tm = sess.createTextMessage("message:" + i);
+//
+// prod.send(tm);
+// }
+//
+// conn.start();
+//
+// //Now consume half of the messages but don't ack them these will end up in
+// //client side toAck list
+//
+// for (int i = 0; i < NUM_MESSAGES / 2; i++)
+// {
+// TextMessage tm = (TextMessage)cons.receive(500);
+//
+// assertNotNull(tm);
+//
+// assertEquals("message:" + i, tm.getText());
+// }
+//
+// //So now, messages should be in queue1 on server 1
+// //So we now kill server 1
+// //Which should cause transparent failover of connection conn onto server 1
+//
+// log.info("************ KILLING (CRASHING) SERVER 1");
+//
+// ServerManagement.getServer(1).destroy();
+//
+// log.info("killed server, now waiting");
+//
+// Thread.sleep(5000);
+//
+// log.info("done wait");
+//
+// state = (ConnectionState)del.getState();
+//
+// int finalServerID = state.getServerID();
+//
+// log.info("final server id= " + finalServerID);
+//
+// //server id should now be 2
+//
+// assertEquals(2, finalServerID);
+//
+// conn.start();
+//
+// //Now should be able to consume the rest of the messages
+//
+// log.info("here1");
+//
+// TextMessage tm = null;
+//
+// for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
+// {
+// tm = (TextMessage)cons.receive(500);
+//
+// log.info("message is " + tm.getText());
+//
+// assertNotNull(tm);
+//
+// assertEquals("message:" + i, tm.getText());
+// }
+//
+// log.info("here2");
+//
+// //Now should be able to acknowledge them
+//
+// tm.acknowledge();
+//
+// //Now check there are no more messages there
+// sess.close();
+//
+// sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//
+// cons = sess.createConsumer(queue1);
+//
+// Message m = cons.receive(500);
+//
+// assertNull(m);
+//
+// log.info("got to end of test");
+// }
+// finally
+// {
+// if (conn != null)
+// {
+// try
+// {
+// conn.close();
+// }
+// catch (Exception e)
+// {
+// e.printStackTrace();
+// }
+// }
+// }
+//
+// }
+
+ public void testFailoverWithUnackedMessagesTransactional() throws Exception
{
JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
@@ -462,8 +769,7 @@
Connection conn = null;
try
- {
-
+ {
//Get a connection on server 1
conn = factory.createConnection(); //connection on server 0
@@ -481,7 +787,7 @@
assertEquals(1, initialServerID);
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer prod = sess.createProducer(queue1);
@@ -496,6 +802,22 @@
prod.send(tm);
}
+ sess.commit();
+
+ conn.start();
+
+ //Now consume half of the messages but don't commit them these will end up in
+ //client side resource manager
+
+ for (int i = 0; i < NUM_MESSAGES / 2; i++)
+ {
+ TextMessage tm = (TextMessage)cons.receive(500);
+
+ assertNotNull(tm);
+
+ assertEquals("message:" + i, tm.getText());
+ }
+
//So now, messages should be in queue1 on server 1
//So we now kill server 1
//Which should cause transparent failover of connection conn onto server 1
@@ -522,17 +844,41 @@
conn.start();
- for (int i = 0; i < NUM_MESSAGES; i++)
+ //Now should be able to consume the rest of the messages
+
+ log.info("here1");
+
+ TextMessage tm = null;
+
+ for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
{
- TextMessage tm = (TextMessage)cons.receive(1000);
+ tm = (TextMessage)cons.receive(500);
+
+ log.info("message is " + tm.getText());
- log.info("message is " + tm);
-
assertNotNull(tm);
assertEquals("message:" + i, tm.getText());
}
- log.info("done");
+
+ log.info("here2");
+
+ //Now should be able to commit them
+
+ sess.commit();
+
+ //Now check there are no more messages there
+ sess.close();
+
+ sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ cons = sess.createConsumer(queue1);
+
+ Message m = cons.receive(500);
+
+ assertNull(m);
+
+ log.info("got to end of test");
}
finally
{
@@ -551,6 +897,9 @@
}
+
+
+
// public void testEvenSimplerFailover() throws Exception
// {
// JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
More information about the jboss-cvs-commits
mailing list