[jboss-cvs] JBoss Messaging SVN: r1556 - in branches/Branch_Client_Failover_Experiment: src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/util src/main/org/jboss/messaging/core/local src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/core/ha
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Nov 9 17:57:10 EST 2006
Author: clebert.suconic at jboss.com
Date: 2006-11-09 17:57:01 -0500 (Thu, 09 Nov 2006)
New Revision: 1556
Modified:
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/util/MessagingJMSException.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/local/PagingFilteredQueue.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/ReconnectClusteredTest.java
Log:
After tests with Queues + round robin of failed queues
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-11-03 23:27:52 UTC (rev 1555)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-11-09 22:57:01 UTC (rev 1556)
@@ -21,18 +21,7 @@
*/
package org.jboss.jms.server.endpoint;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.jms.IllegalStateException;
-import javax.jms.InvalidDestinationException;
-import javax.jms.JMSException;
-
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
import org.jboss.jms.client.delegate.ClientBrowserDelegate;
import org.jboss.jms.client.delegate.ClientConsumerDelegate;
import org.jboss.jms.delegate.BrowserDelegate;
@@ -68,7 +57,10 @@
import org.jboss.messaging.core.tx.TransactionRepository;
import org.jboss.util.id.GUID;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import java.util.*;
/**
* Concrete implementation of SessionEndpoint.
@@ -150,8 +142,18 @@
{
try
{
- ((ClusteredPostOffice)topicPostOffice).failOver(nodeId);
+ // this code needs to be transfered to PostOffices, JGroups fail detection
+ if (jmsDestination.isTopic())
+ {
+ ((ClusteredPostOffice)topicPostOffice).failOver(nodeId);
+ }
+ else
+ if (jmsDestination.isQueue())
+ {
+ ((ClusteredPostOffice)queuePostOffice).failOver(nodeId);
+ }
+
// fail over channel
PostOffice postOfficeToUse = null;
if (jmsDestination.isTopic())
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/util/MessagingJMSException.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/util/MessagingJMSException.java 2006-11-03 23:27:52 UTC (rev 1555)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/util/MessagingJMSException.java 2006-11-09 22:57:01 UTC (rev 1556)
@@ -72,6 +72,8 @@
{
setLinkedException(new Exception(cause));
}
+
+ this.initCause(cause);
}
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/local/PagingFilteredQueue.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/local/PagingFilteredQueue.java 2006-11-03 23:27:52 UTC (rev 1555)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/local/PagingFilteredQueue.java 2006-11-09 22:57:01 UTC (rev 1556)
@@ -21,20 +21,13 @@
*/
package org.jboss.messaging.core.local;
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
import org.jboss.logging.Logger;
-import org.jboss.messaging.core.Delivery;
-import org.jboss.messaging.core.DeliveryObserver;
-import org.jboss.messaging.core.Filter;
-import org.jboss.messaging.core.MessageReference;
-import org.jboss.messaging.core.PagingChannelSupport;
-import org.jboss.messaging.core.Queue;
-import org.jboss.messaging.core.SimpleDelivery;
+import org.jboss.messaging.core.*;
import org.jboss.messaging.core.plugin.contract.MessageStore;
import org.jboss.messaging.core.plugin.contract.PersistenceManager;
import org.jboss.messaging.core.tx.Transaction;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
/**
*
* A PagingFilteredQueue
@@ -137,7 +130,7 @@
public String toString()
{
- return "Queue[" + getChannelID() + "]";
+ return "Queue[" + getChannelID() + "/" + this.getName() + "]";
}
// Package protected ---------------------------------------------
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java 2006-11-03 23:27:52 UTC (rev 1555)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java 2006-11-09 22:57:01 UTC (rev 1556)
@@ -21,11 +21,11 @@
*/
package org.jboss.messaging.core.plugin.postoffice.cluster;
+import org.jboss.messaging.core.Receiver;
+import org.jboss.messaging.core.Router;
+
import java.util.List;
-import org.jboss.messaging.core.Router;
-import org.jboss.messaging.core.Receiver;
-
/**
* A ClusterRouter
*
@@ -39,6 +39,8 @@
{
List getQueues();
+ List getFailedQueues();
+
ClusteredQueue getLocalQueue();
boolean add(Receiver receiver, boolean failedOver);
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-11-03 23:27:52 UTC (rev 1555)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-11-09 22:57:01 UTC (rev 1556)
@@ -21,18 +21,10 @@
*/
package org.jboss.messaging.core.plugin.postoffice.cluster;
-import java.io.*;
-import java.util.*;
-
-import javax.sql.DataSource;
-import javax.transaction.TransactionManager;
-
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
import org.jboss.jms.server.QueuedExecutorPool;
import org.jboss.logging.Logger;
-import org.jboss.messaging.core.Delivery;
-import org.jboss.messaging.core.Filter;
-import org.jboss.messaging.core.FilterFactory;
-import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.*;
import org.jboss.messaging.core.Queue;
import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
import org.jboss.messaging.core.plugin.contract.MessageStore;
@@ -43,20 +35,19 @@
import org.jboss.messaging.core.tx.Transaction;
import org.jboss.messaging.core.tx.TransactionRepository;
import org.jboss.messaging.util.StreamUtils;
-import org.jgroups.Address;
+import org.jgroups.*;
import org.jgroups.Channel;
-import org.jgroups.JChannel;
-import org.jgroups.MembershipListener;
import org.jgroups.Message;
-import org.jgroups.MessageListener;
import org.jgroups.Receiver;
-import org.jgroups.View;
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.MessageDispatcher;
import org.jgroups.blocks.RequestHandler;
import org.w3c.dom.Element;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+import java.io.*;
+import java.util.*;
/**
*
@@ -1108,6 +1099,11 @@
{
log.info("Preparing failover against node " + nodeId);
Map subMaps = (Map)nameMaps.get(new Integer(nodeId));
+ if (subMaps==null || subMaps.size()==0)
+ {
+ log.warn("Couldn't find any binding to failOver from serverId=" +nodeId);
+ return;
+ }
ArrayList namesToRemove = new ArrayList();
for (Iterator iterNames = subMaps.entrySet().iterator();iterNames.hasNext();)
{
@@ -1157,7 +1153,6 @@
clusteredQueue.load();
clusteredQueue.activate();
addBinding(newBinding);
- System.out.println("**** sending binding on " + binding.getQueue().getName() + " with condition=" + binding.getCondition());
sendBindRequest(binding.getCondition(), clusteredQueue,newBinding);
}
}
@@ -1218,6 +1213,7 @@
PrintWriter out = new PrintWriter(buffer);
out.print(super.printBindingInformation());
+ out.println("<br>FailOver bindings");
out.println("<table border=1><tr><td>Node</td><td>ChannelID</td><td>Binding</td>");
for (Iterator iter = this.failedBindings.entrySet().iterator(); iter.hasNext();)
@@ -1245,21 +1241,44 @@
out.println("<br>Router Information");
+ out.println("<table border=1><tr><td>Queue Route</td><td>Local Queue</td><td>Elements</td></tr>");
+
for (Iterator iterRouter = routerMap.entrySet().iterator();iterRouter.hasNext();)
{
Map.Entry entry = (Map.Entry)iterRouter.next();
ClusterRouter router = (ClusterRouter)entry.getValue();
- out.println("<br> queue " + entry.getKey() + " being routed to:");
- out.println("<br> LocalQueue = " + router.getLocalQueue());
+ out.println("<tr><td>" + entry.getKey() + "</td><td>" + router.getLocalQueue() + "</td>");
+ out.println("<td>");
+
+ out.println("<table border=1>");
+
+ if (!router.getFailedQueues().isEmpty())
+ {
+ out.println("<tr><td><b>Failed Over Queues</b></td><</tr>");
+ for (Iterator queuesIterator = router.getFailedQueues().iterator();queuesIterator.hasNext();)
+ {
+ Object queue = queuesIterator.next();
+ out.println("<tr><td>" + queue + "</td></tr>");
+ }
+ }
+
+ out.println("<tr><td><b>Queues</b></td><</tr>");
+
for (Iterator queuesIterator = router.getQueues().iterator();queuesIterator.hasNext();)
{
Object queueRouted = queuesIterator.next();
- out.println("<br> RoutedQueue=" + queueRouted + " class=" + queueRouted.getClass().getName());
+ out.println("<tr><td>" + queueRouted + "</td></tr>");
}
+ out.println("</table>");
+
+ out.println("</td></tr>");
+
}
+ out.println("</table>");
+
return buffer.toString();
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java 2006-11-03 23:27:52 UTC (rev 1555)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java 2006-11-09 22:57:01 UTC (rev 1556)
@@ -21,10 +21,6 @@
*/
package org.jboss.messaging.core.plugin.postoffice.cluster;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
import org.jboss.logging.Logger;
import org.jboss.messaging.core.Delivery;
import org.jboss.messaging.core.DeliveryObserver;
@@ -32,6 +28,10 @@
import org.jboss.messaging.core.Receiver;
import org.jboss.messaging.core.tx.Transaction;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
/**
*
* A DefaultRouter
@@ -166,8 +166,32 @@
{
if (trace) { log.trace(this + " routing ref " + reference); }
- //Favour the local queue
+ //Favour the local queue or the failedOver queue in round robbin
+ if (!failedOverQueues.isEmpty())
+ {
+
+ if (trace) { log.trace("Round robbing on FailedOver queue, currentTarget=" + target);}
+ LocalClusteredQueue queueToUse = null;
+
+ if (target==-1)
+ {
+ queueToUse = (LocalClusteredQueue)this.localQueue;
+ }
+ else
+ {
+ queueToUse = (LocalClusteredQueue)failedOverQueues.get(target);
+ }
+
+ incTargetFailedOver();
+
+ Delivery del = queueToUse.handle(observer, reference, tx);
+
+ if (trace) { log.trace(this+" routed to failed queue, using failedOver round robbing, returned " + del); }
+
+ return del;
+ }
+ else
if (localQueue != null)
{
//The only time the local queue won't accept is if the selector doesn't
@@ -207,6 +231,17 @@
return null;
}
+ private void incTargetFailedOver()
+ {
+ target++;
+
+ if (target == failedOverQueues.size())
+ {
+ target = -1; // use the local queue
+ }
+ }
+
+
private void incTarget()
{
target++;
@@ -217,6 +252,12 @@
}
}
+
+ public java.util.List getFailedQueues()
+ {
+ return failedOverQueues;
+ }
+
public List getQueues()
{
List queues = new ArrayList();
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java 2006-11-03 23:27:52 UTC (rev 1555)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java 2006-11-09 22:57:01 UTC (rev 1556)
@@ -21,17 +21,9 @@
*/
package org.jboss.messaging.core.plugin.postoffice.cluster;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
import org.jboss.logging.Logger;
-import org.jboss.messaging.core.Delivery;
-import org.jboss.messaging.core.Filter;
-import org.jboss.messaging.core.Message;
-import org.jboss.messaging.core.MessageReference;
-import org.jboss.messaging.core.SimpleDelivery;
+import org.jboss.messaging.core.*;
import org.jboss.messaging.core.local.PagingFilteredQueue;
import org.jboss.messaging.core.plugin.contract.MessageStore;
import org.jboss.messaging.core.plugin.contract.PersistenceManager;
@@ -40,8 +32,6 @@
import org.jboss.messaging.core.tx.TransactionRepository;
import org.jboss.messaging.util.Future;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
/**
*
* A LocalClusteredQueue
@@ -141,6 +131,12 @@
{
return nodeId;
}
+
+
+ public String toString()
+ {
+ return "LocalClusteredQueue[" + this.getChannelID() + "/" + this.getName() +"]";
+ }
/*
* This is the same as the normal handle() method on the Channel except it doesn't
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/ReconnectClusteredTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/ReconnectClusteredTest.java 2006-11-03 23:27:52 UTC (rev 1555)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/ReconnectClusteredTest.java 2006-11-09 22:57:01 UTC (rev 1556)
@@ -1,22 +1,17 @@
package org.jboss.test.messaging.core.ha;
import org.jboss.jms.client.JBossConnection;
+import org.jboss.jms.client.JBossMessageConsumer;
import org.jboss.jms.client.JBossSession;
-import org.jboss.jms.client.JBossMessageConsumer;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.delegate.ClientSessionDelegate;
import org.jboss.jms.client.remoting.JMSRemotingConnection;
import org.jboss.jms.client.state.ConnectionState;
+import org.jboss.jms.client.state.ConsumerState;
import org.jboss.jms.client.state.SessionState;
-import org.jboss.jms.client.state.ConsumerState;
-import org.jboss.jms.client.delegate.ClientConnectionDelegate;
-import org.jboss.jms.client.delegate.ClientSessionDelegate;
-import org.jboss.jms.message.TextMessageProxy;
-import org.jboss.jms.message.JBossMessage;
import org.jboss.jms.message.MessageProxy;
-import org.jboss.test.messaging.tools.ServerManagement;
-
import javax.jms.*;
-import javax.management.ObjectName;
/** Start two JBoss instances (clustered) to run these tests.
* */
@@ -141,62 +136,6 @@
assertNotNull(consumer.receive(2000));
}
- public void testDurableTopicCluster() throws Exception
- {
- log.info("++testDurableTopicCluster");
-
- log.info(">>Lookup Queue");
- Destination destination = (Destination)getCtx1().lookup("topic/testDistributedTopic");
-
- JBossConnection connFirstServer = (JBossConnection)this.factoryServer1.createConnection("guest","guest");
- connFirstServer.setClientID("test");
- connFirstServer.start();
- JBossSession sessionFirstServer = (JBossSession)connFirstServer.createSession(true,Session.AUTO_ACKNOWLEDGE);
-
- MessageConsumer consumer = sessionFirstServer.createDurableSubscriber((Topic)destination,"test");
-
- MessageProducer producer = sessionFirstServer.createProducer(destination);
-
- for (int i=0;i<10;i++)
- {
- producer.send(sessionFirstServer.createTextMessage("Test" + i));
- }
-
- Object objectReceived=consumer.receive(5000);
- if (objectReceived!=null)
- {
- System.out.println("Object received=" + objectReceived);
- }
- assertNull(objectReceived);
-
- sessionFirstServer.commit();
-
-
- for (int i=0;i<5;i++)
- {
- assertNotNull(consumer.receive(1000));
- }
-
- sessionFirstServer.rollback();
- connFirstServer.close();
-
-
- JBossConnection connectionSecondServer = (JBossConnection)this.factoryServer1.createConnection("guest","guest");
- connectionSecondServer.setClientID("test");
- connectionSecondServer.start();
-
- JBossSession sessionSecondServer = (JBossSession)connectionSecondServer.createSession(true,Session.AUTO_ACKNOWLEDGE);
-
- consumer = sessionSecondServer.createDurableSubscriber((Topic)destination,"test");
-
- for (int i=0;i<10;i++)
- {
- assertNotNull(consumer.receive(1000));
- }
-
- assertNull(consumer.receive(1000));
- }
-
public void testTopicSubscriber() throws Exception
{
log.info("++testTopicSubscriber");
@@ -232,6 +171,9 @@
receiveMessage("consumerHA",consumerHA,true,false);
+ session.commit();
+ //if (true) return;
+
Object txID = sessionState.getCurrentTxId();
producer.send(session.createTextMessage("Hello again before failover"));
@@ -267,26 +209,117 @@
System.out.println("TransactionID on client = " + txID);
log.info(">>Final commit");
- JBossConnection connSecondServer = (JBossConnection)this.factoryServer2.createConnection();
+ /* JBossConnection connSecondServer = (JBossConnection)this.factoryServer2.createConnection();
connSecondServer.start();
JBossSession sessionSecondServer = (JBossSession)connSecondServer.createSession(false,Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumerSecondServer = sessionSecondServer.createConsumer(destination);
+ MessageConsumer consumerSecondServer = sessionSecondServer.createConsumer(destination); */
session.commit();
+ /* receiveMessage("consumerSecondServer",consumerSecondServer,true,false);
receiveMessage("consumerSecondServer",consumerSecondServer,true,false);
- receiveMessage("consumerSecondServer",consumerSecondServer,true,false);
- receiveMessage("consumerSecondServer",consumerSecondServer,true,true);
+ receiveMessage("consumerSecondServer",consumerSecondServer,true,true); */
log.info("Calling alternate receiver");
receiveMessage("consumerHA",consumerHA,true,false);
receiveMessage("consumerHA",consumerHA,true,false);
receiveMessage("consumerHA",consumerHA,true,true);
+
session.commit();
}
+ public void testQueueHA() throws Exception
+ {
+ log.info("++testTopicSubscriber");
+
+ log.info(">>Lookup Queue");
+ Destination destination = (Destination)getCtx1().lookup("queue/testDistributedQueue");
+
+ log.info("Creating connection server1");
+ JBossConnection conn = (JBossConnection)this.factoryServer1.createConnection();
+ conn.setClientID("testClient");
+ conn.start();
+
+ JBossSession session = (JBossSession)conn.createSession(true,Session.AUTO_ACKNOWLEDGE);
+ ClientSessionDelegate clientSessionDelegate = (ClientSessionDelegate)session.getDelegate();
+ SessionState sessionState = (SessionState)clientSessionDelegate.getState();
+
+ MessageConsumer consumerHA = session.createConsumer(destination);
+ JBossMessageConsumer jbossConsumerHA =(JBossMessageConsumer)consumerHA;
+
+ org.jboss.jms.client.delegate.ClientConsumerDelegate clientDelegate = (org.jboss.jms.client.delegate.ClientConsumerDelegate)jbossConsumerHA.getDelegate();
+ ConsumerState consumerState = (ConsumerState)clientDelegate.getState();
+
+ log.info("subscriptionName=" + consumerState.getSubscriptionName());
+
+
+ log.info(">>Creating Producer");
+ MessageProducer producer = session.createProducer(destination);
+ log.info(">>creating Message");
+ Message message = session.createTextMessage("Hello Before");
+ log.info(">>sending Message");
+ producer.send(message);
+ session.commit();
+
+ session.commit();
+ //if (true) return;
+
+ Object txID = sessionState.getCurrentTxId();
+
+ ClientConnectionDelegate delegate = (ClientConnectionDelegate)conn.getDelegate();
+
+ JMSRemotingConnection originalRemoting = delegate.getRemotingConnection();
+
+ log.info(">>Creating alternate connection");
+ JBossConnection conn2 = (JBossConnection)this.factoryServer2.createConnection();
+ log.info("NewConnectionCreated=" + conn2);
+
+ log.info(">>Failling over");
+ assertSame(originalRemoting,delegate.getRemotingConnection());
+ conn.getDelegate().failOver(conn2.getDelegate());
+
+ try {
+ originalRemoting.stop();
+ } catch (Throwable throwable) {
+ throwable.printStackTrace();
+ }
+
+
+ assertNotSame(originalRemoting,delegate.getRemotingConnection());
+
+ //System.out.println("Kill server1"); Thread.sleep(10000);
+ assertEquals(txID,sessionState.getCurrentTxId());
+ System.out.println("TransactionID on client = " + txID);
+ log.info(">>Final commit");
+
+ session.commit();
+
+ log.info("Calling alternate receiver");
+ receiveMessage("consumerHA",consumerHA,true,false);
+ receiveMessage("consumerHA",consumerHA,true,true);
+
+ session.commit();
+
+ for (int i=0;i<30;i++)
+ {
+ log.info("Message Sent " + i);
+ producer.send(session.createTextMessage("Message " + i));
+ }
+ session.commit();
+
+ Thread.sleep(5000);
+
+ TextMessage messageLoop = null;
+ while (!((messageLoop = (TextMessage) consumerHA.receive(5000)) == null))
+ {
+ log.info("Message received = " + messageLoop.getText());
+ }
+
+ }
+
+
private void receiveMessage(String text, MessageConsumer consumer, boolean shouldAssert, boolean shouldBeNull) throws Exception
{
MessageProxy message = (MessageProxy)consumer.receive(3000);
More information about the jboss-cvs-commits
mailing list