[jboss-cvs] JBoss Messaging SVN: r1510 - in branches/Branch_Client_Failover_Experiment: src/main/org/jboss/jms/client src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/client/state src/main/org/jboss/jms/message src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/server/endpoint/advised src/main/org/jboss/jms/tx src/main/org/jboss/jms/util src/main/org/jboss/messaging/core src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/core/ha
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Oct 20 20:34:08 EDT 2006
Author: clebert.suconic at jboss.com
Date: 2006-10-20 20:33:54 -0400 (Fri, 20 Oct 2006)
New Revision: 1510
Added:
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/ReconnectClusteredTest.java
Removed:
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/ReconnectTest.java
Modified:
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossMessageConsumer.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossSession.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConnectionAspect.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConsumerAspect.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/ConnectionState.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/message/MessageIdGenerator.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/AckInfo.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/ResourceManager.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/TxState.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/util/XMLUtil.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/ChannelSupport.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/PagingChannelSupport.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/HATestBase.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-519 - Server-side failover, first iteration
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossConnectionConsumer.java 2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossConnectionConsumer.java 2006-10-21 00:33:54 UTC (rev 1510)
@@ -141,7 +141,7 @@
// call pre or postDeliver so messages won't be acked, or stored in session/tx
sess = conn.createSessionDelegate(false, Session.CLIENT_ACKNOWLEDGE, false);
- cons = sess.createConsumerDelegate(dest, messageSelector, false, subName, true);
+ cons = sess.createConsumerDelegate(dest, messageSelector, false, subName, true,-1l);
}
finally
{
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossMessageConsumer.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossMessageConsumer.java 2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossMessageConsumer.java 2006-10-21 00:33:54 UTC (rev 1510)
@@ -40,7 +40,7 @@
*
* $Id$
*/
-class JBossMessageConsumer implements MessageConsumer, QueueReceiver, TopicSubscriber, Serializable
+public class JBossMessageConsumer implements MessageConsumer, QueueReceiver, TopicSubscriber, Serializable
{
// Constants -----------------------------------------------------
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossSession.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossSession.java 2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossSession.java 2006-10-21 00:33:54 UTC (rev 1510)
@@ -252,7 +252,7 @@
tccc.set(getClass().getClassLoader());
ConsumerDelegate consumerDelegate = delegate.
- createConsumerDelegate((JBossDestination)d, messageSelector, noLocal, null, false);
+ createConsumerDelegate((JBossDestination)d, messageSelector, noLocal, null, false,-1l);
return new JBossMessageConsumer(consumerDelegate);
}
@@ -305,7 +305,7 @@
tccc.set(getClass().getClassLoader());
ConsumerDelegate consumerDelegate =
- delegate.createConsumerDelegate((JBossTopic)topic, null, false, name, false);
+ delegate.createConsumerDelegate((JBossTopic)topic, null, false, name, false,-1l);
return new JBossMessageConsumer(consumerDelegate);
}
@@ -339,7 +339,7 @@
messageSelector = null;
}
ConsumerDelegate consumerDelegate =
- delegate.createConsumerDelegate((JBossTopic)topic, messageSelector, noLocal, name, false);
+ delegate.createConsumerDelegate((JBossTopic)topic, messageSelector, noLocal, name, false,-1l);
return new JBossMessageConsumer(consumerDelegate);
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConnectionAspect.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConnectionAspect.java 2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConnectionAspect.java 2006-10-21 00:33:54 UTC (rev 1510)
@@ -283,17 +283,19 @@
private void handleFailoverOnConsumer(ConnectionState connectionState,SessionState sessionState, ConsumerState consumerState, ClientSessionDelegate sessionDelegate, int oldServerId) throws JMSException
{
+ ClientConsumerDelegate currentConsumerDelegate = (ClientConsumerDelegate)consumerState.getDelegate();
+
if (trace)
{
log.trace("handleFailoverOnConsumer: creating alternate consumer");
}
- ClientConsumerDelegate newConsumerDelegate = (ClientConsumerDelegate)sessionDelegate.createConsumerDelegate((JBossDestination) consumerState.getDestination(),consumerState.getSelector(),consumerState.isNoLocal(),consumerState.getSubscriptionName(),false);
+ ClientConsumerDelegate newConsumerDelegate = (ClientConsumerDelegate)sessionDelegate.createConsumerDelegate((JBossDestination) consumerState.getDestination(),consumerState.getSelector(),consumerState.isNoLocal(),consumerState.getSubscriptionName(),false, currentConsumerDelegate.getChannelId());
if (trace)
{
log.trace("handleFailoverOnConsumer: alternate consumer created");
}
- ClientConsumerDelegate currentConsumerDelegate = (ClientConsumerDelegate)consumerState.getDelegate();
+
currentConsumerDelegate.transferHAState(newConsumerDelegate);
int oldConsumerId = consumerState.getConsumerID();
@@ -301,6 +303,8 @@
ConsumerState newState = (ConsumerState)newConsumerDelegate.getState();
consumerState.failOver(newState);
+ connectionState.getResourceManager().handleFailover(sessionState.getCurrentTxId(),oldConsumerId,consumerState.getConsumerID());
+
CallbackManager cm = connectionState.getRemotingConnection().getCallbackManager();
MessageCallbackHandler handler = cm.unregisterHandler(oldServerId,oldConsumerId);
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2006-10-21 00:33:54 UTC (rev 1510)
@@ -92,7 +92,7 @@
//Now we have finished creating the client consumer, we can tell the SCD
//we are ready
consumerDelegate.more();
-
+
return consumerDelegate;
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2006-10-21 00:33:54 UTC (rev 1510)
@@ -52,20 +52,28 @@
protected int bufferSize;
+ protected long channelId;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public ClientConsumerDelegate(int objectID, int bufferSize)
+ public ClientConsumerDelegate(int objectID, long channelId, int bufferSize)
{
super(objectID);
this.bufferSize = bufferSize;
+ this.channelId = channelId;
}
public ClientConsumerDelegate()
{
}
+ public long getChannelId()
+ {
+ return channelId;
+ }
+
// ConsumerDelegate implementation -------------------------------
/**
@@ -171,7 +179,7 @@
public String toString()
{
- return "ConsumerDelegate[" + id + "]";
+ return "ConsumerDelegate[" + id + "](ChannelId=" + this.channelId+")" ;
}
// Protected -----------------------------------------------------
@@ -184,9 +192,12 @@
}
+
+
public void transferHAState(DelegateSupport copyFrom)
{
super.transferHAState(copyFrom);
+ this.channelId = ((ClientConsumerDelegate)copyFrom).channelId;
this.getMetaData().removeMetaData(MetaDataConstants.JMS, MetaDataConstants.CONSUMER_ID);
this.getMetaData().addMetaData(MetaDataConstants.JMS, MetaDataConstants.CONSUMER_ID,copyFrom.getMetaData().getMetaData(MetaDataConstants.JMS, MetaDataConstants.CONSUMER_ID), PayloadKey.TRANSIENT);
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2006-10-21 00:33:54 UTC (rev 1510)
@@ -186,7 +186,8 @@
*/
public ConsumerDelegate createConsumerDelegate(JBossDestination destination, String selector,
boolean noLocal, String subscriptionName,
- boolean connectionConsumer) throws JMSException
+ boolean connectionConsumer,
+ long oldChannelID) throws JMSException
{
throw new IllegalStateException("This invocation should not be handled here!");
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/ConnectionState.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/ConnectionState.java 2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/ConnectionState.java 2006-10-21 00:33:54 UTC (rev 1510)
@@ -196,6 +196,7 @@
public void failOver(ConnectionState newState)
{
this.serverID = newState.serverID;
+ this.idGenerator = newState.idGenerator;
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/message/MessageIdGenerator.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/message/MessageIdGenerator.java 2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/message/MessageIdGenerator.java 2006-10-21 00:33:54 UTC (rev 1510)
@@ -55,6 +55,11 @@
protected ConnectionFactoryDelegate cfd;
+ public ConnectionFactoryDelegate getDelegate()
+ {
+ return cfd;
+ }
+
// Constructors --------------------------------------------------
public MessageIdGenerator(ConnectionFactoryDelegate cfd, int blockSize) throws JMSException
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-10-21 00:33:54 UTC (rev 1510)
@@ -141,7 +141,7 @@
String selectorString,
boolean noLocal,
String subscriptionName,
- boolean isCC) throws JMSException
+ boolean isCC, long oldchannelID) throws JMSException
{
try
{
@@ -382,14 +382,18 @@
}
int prefetchSize = connectionEndpoint.getPrefetchSize();
-
+
+ if (oldchannelID>=0)
+ {
+ ((PagingFilteredQueue)binding.getQueue()).transferChannel(oldchannelID);
+ }
ServerConsumerEndpoint ep =
new ServerConsumerEndpoint(consumerID, (PagingFilteredQueue)binding.getQueue(), binding.getQueue().getName(),
this, selectorString, noLocal, jmsDestination, prefetchSize);
JMSDispatcher.instance.registerTarget(new Integer(consumerID), new ConsumerAdvised(ep));
- ClientConsumerDelegate stub = new ClientConsumerDelegate(consumerID, prefetchSize);
+ ClientConsumerDelegate stub = new ClientConsumerDelegate(consumerID, binding.getQueue().getChannelID(), prefetchSize);
putConsumerEndpoint(consumerID, ep); // caching consumer locally
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java 2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java 2006-10-21 00:33:54 UTC (rev 1510)
@@ -47,7 +47,7 @@
{
ConsumerDelegate createConsumerDelegate(JBossDestination destination, String selector,
boolean noLocal, String subscriptionName,
- boolean connectionConsumer) throws JMSException;
+ boolean connectionConsumer, long oldchannelID) throws JMSException;
BrowserDelegate createBrowserDelegate(JBossDestination queue, String messageSelector)
throws JMSException;
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java 2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java 2006-10-21 00:33:54 UTC (rev 1510)
@@ -85,9 +85,9 @@
public ConsumerDelegate createConsumerDelegate(JBossDestination destination, String selector,
boolean noLocal, String subscriptionName,
- boolean connectionConsumer) throws JMSException
+ boolean connectionConsumer,long oldchannelID) throws JMSException
{
- return endpoint.createConsumerDelegate(destination, selector, noLocal, subscriptionName, connectionConsumer);
+ return endpoint.createConsumerDelegate(destination, selector, noLocal, subscriptionName, connectionConsumer, oldchannelID);
}
public BrowserDelegate createBrowserDelegate(JBossDestination queue, String messageSelector)
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/AckInfo.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/AckInfo.java 2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/AckInfo.java 2006-10-21 00:33:54 UTC (rev 1510)
@@ -96,6 +96,12 @@
{
return consumerID;
}
+
+ /** Used to change the consumer ID of this ACK during failover. */
+ public void setConsumerID(int consumerID)
+ {
+ this.consumerID=consumerID;
+ }
public MessageProxy getMessage()
{
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/ResourceManager.java 2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/ResourceManager.java 2006-10-21 00:33:54 UTC (rev 1510)
@@ -119,6 +119,16 @@
tx.getMessages().add(m);
}
+
+ /** Looks up the transaction for the given xid and changes the consumer ID, from the old one to the new one, on every ACK not sent yet. */
+ public void handleFailover(Object xid, int oldClientId, int newClientId)
+ {
+ if (trace) { log.trace("handleFailover:: Transfering clientIds on ACKs from " + oldClientId + " to " + newClientId); }
+
+ TxState tx = getTx(xid);
+ tx.handleFailover(oldClientId, newClientId);
+ }
+
/**
* Add an acknowledgement to the transaction
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/TxState.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/TxState.java 2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/TxState.java 2006-10-21 00:33:54 UTC (rev 1510)
@@ -95,6 +95,21 @@
{
this.state = state;
}
+
+
+ /** Iterates over the ACKs of this transaction and replaces the old consumer ID with the new one on every ACK not sent yet. */
+ public void handleFailover(int oldClientId, int newClientId)
+ {
+ Iterator ackIterator = acks.iterator();
+ while (ackIterator.hasNext())
+ {
+ AckInfo ackInfo = (AckInfo)ackIterator.next();
+ if (ackInfo.getConsumerID()==oldClientId)
+ {
+ ackInfo.setConsumerID(newClientId);
+ }
+ }
+ }
// Streamable implementation ---------------------------------
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/util/XMLUtil.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/util/XMLUtil.java 2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/util/XMLUtil.java 2006-10-21 00:33:54 UTC (rev 1510)
@@ -64,7 +64,7 @@
if (Node.CDATA_SECTION_NODE == type)
{
- return "<![CDATA[" + n.getNodeValue() + "]]>";
+ return n.getNodeValue();
}
if (name.startsWith("#"))
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-10-21 00:33:54 UTC (rev 1510)
@@ -532,7 +532,7 @@
ListIterator iter = null;
MessageReference ref = null;
-
+
while (true)
{
synchronized (refLock)
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/PagingChannelSupport.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/PagingChannelSupport.java 2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/PagingChannelSupport.java 2006-10-21 00:33:54 UTC (rev 1510)
@@ -245,22 +245,66 @@
{
firstPagingOrder = nextPagingOrder = 0;
}
-
- Map refMap = processReferences(ili.getRefInfos());
-
- Iterator iter = ili.getRefInfos().iterator();
- while (iter.hasNext())
- {
- ReferenceInfo info = (ReferenceInfo)iter.next();
- addFromRefInfo(info, refMap);
- }
+ pushReferences(ili);
+
+ //Maybe we need to load some paged refs
- //Maybe we need to load some paged refs
-
while (checkLoad()) {}
}
- }
+ }
+
+ /** Extracted as a method from load(), as transferChannel (for HA recovery) also needs the same routine. */
+ private Map pushReferences(InitialLoadInfo ili) throws Exception {
+ Map refMap = processReferences(ili.getRefInfos());
+
+ Iterator iter = ili.getRefInfos().iterator();
+ while (iter.hasNext())
+ {
+ ReferenceInfo info = (ReferenceInfo)iter.next();
+
+ addFromRefInfo(info, refMap);
+ }
+ return refMap;
+ }
+
+ /** Transfers messages from an old channel to this channel.
+ * This is used during HA failover, when a connection fails and its messages need to be transferred to a new node. */
+ public void transferChannel(long oldchannelID) throws Exception
+ {
+ log.info("Transfering state from " + oldchannelID +" into " + this.getChannelID());
+ synchronized (refLock)
+ {
+ while(true)
+ {
+ InitialLoadInfo ili =pm.getInitialReferenceInfos(oldchannelID,fullSize);
+ if (ili.getRefInfos().size()==0)
+ {
+ break;
+ }
+
+ log.info("got " + ili.getRefInfos().size() + " references to move");
+
+
+
+ Map refMap = pushReferences(ili);
+ Iterator referencesIterator = ili.getRefInfos().iterator();
+ while (referencesIterator.hasNext())
+ {
+ ReferenceInfo info = (ReferenceInfo)referencesIterator.next();
+ log.info("transfering reference " + info.getMessageId() + " from " + oldchannelID + " into " + this.getChannelID());
+ MessageReference messageReference = (MessageReference )refMap.get(new Long(info.getMessageId()));
+
+ ///// BIG TODOS:
+ ///// What to do with the transaction here?
+ ///// Do we need to remove from the old channel? (Consider the case of the old server coming back... I guess we should, but we have to check this.)
+ pm.addReference(this.getChannelID(),messageReference,null);
+ pm.removeReference(oldchannelID,messageReference, null);
+ }
+ }
+ }
+ log.info("transfer state done");
+ }
public void unload() throws Exception
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-10-21 00:33:54 UTC (rev 1510)
@@ -1486,6 +1486,8 @@
public void viewAccepted(View view)
{
if (trace) { log.trace(nodeId + " Got new view, size=" + view.size()); }
+
+ log.info("JBoss Messaging DefaultClusteredPostOffice Accepted new view:" + view);;
if (currentView != null)
{
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/HATestBase.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/HATestBase.java 2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/HATestBase.java 2006-10-21 00:33:54 UTC (rev 1510)
@@ -50,8 +50,8 @@
protected Context ctx2;
- protected String NODE1 =System.getProperty("NODE1","localhost");
- protected String NODE2 =System.getProperty("NODE2","localhost");
+ protected String NODE1 =System.getProperty("NODE1","localhost:1199");
+ protected String NODE2 =System.getProperty("NODE2","localhost:1299");
public void setUp() throws Exception
{
@@ -76,7 +76,7 @@
contextProperties.put(Context.INITIAL_CONTEXT_FACTORY,
jndiProviderClass);
contextProperties.put(Context.PROVIDER_URL,
- "jnp://"+ host + ":1099");
+ "jnp://"+ host);
return new InitialContext(contextProperties);
}
Added: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/ReconnectClusteredTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/ReconnectClusteredTest.java 2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/ReconnectClusteredTest.java 2006-10-21 00:33:54 UTC (rev 1510)
@@ -0,0 +1,356 @@
+package org.jboss.test.messaging.core.ha;
+
+import org.jboss.jms.client.JBossConnection;
+import org.jboss.jms.client.JBossSession;
+import org.jboss.jms.client.JBossMessageConsumer;
+import org.jboss.jms.client.remoting.JMSRemotingConnection;
+import org.jboss.jms.client.state.ConnectionState;
+import org.jboss.jms.client.state.SessionState;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.delegate.ClientSessionDelegate;
+import org.jboss.jms.message.TextMessageProxy;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+
+import javax.jms.*;
+import javax.management.ObjectName;
+
+/** Start two JBoss instances (clustered) to run these tests.
+ * */
+public class ReconnectClusteredTest extends HATestBase
+{
+
+ public void testSimpleReconnect() throws Exception
+ {
+ JBossConnection conn = (JBossConnection)this.factoryServer1.createConnection();
+ ClientConnectionDelegate delegate = (ClientConnectionDelegate)conn.getDelegate();
+ ConnectionState state = (ConnectionState)delegate.getState();
+
+ assertFalse(state.isStarted());
+ conn.start();
+ assertTrue(state.isStarted());
+
+ JBossConnection conn2 = (JBossConnection)this.factoryServer1.createConnection();
+ conn.getDelegate().failOver(conn2.getDelegate());
+
+ conn.stop();
+ assertFalse(state.isStarted());
+
+ }
+
+ public void testSimpleReconnectWithClientID() throws Exception
+ {
+ JBossConnection conn = (JBossConnection)this.factoryServer1.createConnection();
+ conn.setClientID("someClient");
+ ClientConnectionDelegate delegate = (ClientConnectionDelegate)conn.getDelegate();
+ ConnectionState state = (ConnectionState)delegate.getState();
+
+ assertFalse(state.isStarted());
+ conn.start();
+ assertTrue(state.isStarted());
+
+ JBossConnection conn2 = (JBossConnection)this.factoryServer1.createConnection();
+ conn.getDelegate().failOver(conn2.getDelegate());
+
+ // force recovering the clientID from server
+ state.setClientID(null);
+ assertEquals ("someClient",conn.getClientID());
+
+ conn.stop();
+ assertFalse(state.isStarted());
+
+ }
+
+ public void testWithSession() throws Exception
+ {
+ JBossConnection conn = (JBossConnection)this.factoryServer1.createConnection();
+ Session session = conn.createSession(true,Session.AUTO_ACKNOWLEDGE);
+
+ ClientConnectionDelegate delegate = (ClientConnectionDelegate)conn.getDelegate();
+ ConnectionState state = (ConnectionState)delegate.getState();
+
+ JBossConnection conn2 = (JBossConnection)this.factoryServer1.createConnection();
+ conn.getDelegate().failOver(conn2.getDelegate());
+ }
+
+ public void testSimpleWithOneProducerOnTopic() throws Exception
+ {
+ JBossConnection conn = (JBossConnection)this.factoryServer1.createConnection();
+ Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ Destination destination = (Destination)getCtx1().lookup("topic/testTopic");
+ MessageProducer producer = session.createProducer(destination);
+
+ Message message = session.createTextMessage("Hello Before");
+ producer.send(message);
+
+ ClientConnectionDelegate delegate = (ClientConnectionDelegate)conn.getDelegate();
+ ConnectionState state = (ConnectionState)delegate.getState();
+
+ JBossConnection conn2 = (JBossConnection)this.factoryServer2.createConnection();
+ conn.getDelegate().failOver(conn2.getDelegate());
+
+ message = session.createTextMessage("Hello After");
+ producer.send(message);
+ }
+
+ public void testSimpleWithOneProducerOnQueue() throws Exception
+ {
+ JBossConnection conn = (JBossConnection)this.factoryServer1.createConnection();
+ Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ Destination destination = (Destination)getCtx1().lookup("queue/testQueue");
+ MessageProducer producer = session.createProducer(destination);
+
+ Message message = session.createTextMessage("Hello Before");
+ producer.send(message);
+
+ ClientConnectionDelegate delegate = (ClientConnectionDelegate)conn.getDelegate();
+ ConnectionState state = (ConnectionState)delegate.getState();
+
+ JBossConnection conn2 = (JBossConnection)this.factoryServer2.createConnection();
+ conn.getDelegate().failOver(conn2.getDelegate());
+
+ message = session.createTextMessage("Hello After");
+ producer.send(message);
+ }
+
+ public void testTopicCluster() throws Exception
+ {
+ log.info("++testTopicCluster");
+
+ log.info(">>Lookup Queue");
+ Destination destination = (Destination)getCtx1().lookup("topic/testDistributedTopic");
+
+ JBossConnection connFirstServer = (JBossConnection)this.factoryServer1.createConnection();
+ connFirstServer.start();
+ JBossSession sessionFirstServer = (JBossSession)connFirstServer.createSession(false,Session.AUTO_ACKNOWLEDGE);
+
+
+ JBossConnection connSecondServer = (JBossConnection)this.factoryServer2.createConnection();
+ connSecondServer.start();
+ JBossSession sessionSecondServer = (JBossSession)connSecondServer.createSession(false,Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer = sessionFirstServer.createProducer(destination);
+
+ MessageConsumer consumer = sessionSecondServer.createConsumer(destination);
+
+ producer.send(sessionFirstServer.createTextMessage("Hello"));
+
+ assertNotNull(consumer.receive(2000));
+ }
+
+ public void testDurableTopicCluster() throws Exception
+ {
+ log.info("++testDurableTopicCluster");
+
+ log.info(">>Lookup Queue");
+ Destination destination = (Destination)getCtx1().lookup("topic/testDistributedTopic");
+
+ JBossConnection connFirstServer = (JBossConnection)this.factoryServer1.createConnection("guest","guest");
+ connFirstServer.setClientID("test");
+ connFirstServer.start();
+ JBossSession sessionFirstServer = (JBossSession)connFirstServer.createSession(true,Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer consumer = sessionFirstServer.createDurableSubscriber((Topic)destination,"test");
+
+ MessageProducer producer = sessionFirstServer.createProducer(destination);
+
+ for (int i=0;i<10;i++)
+ {
+ producer.send(sessionFirstServer.createTextMessage("Test" + i));
+ }
+
+ Object objectReceived=consumer.receive(5000);
+ if (objectReceived!=null)
+ {
+ System.out.println("Object received=" + objectReceived);
+ }
+ assertNull(objectReceived);
+
+ sessionFirstServer.commit();
+
+
+ for (int i=0;i<5;i++)
+ {
+ assertNotNull(consumer.receive(1000));
+ }
+
+ sessionFirstServer.rollback();
+ connFirstServer.close();
+
+
+ JBossConnection connectionSecondServer = (JBossConnection)this.factoryServer1.createConnection("guest","guest");
+ connectionSecondServer.setClientID("test");
+ connectionSecondServer.start();
+
+ JBossSession sessionSecondServer = (JBossSession)connectionSecondServer.createSession(true,Session.AUTO_ACKNOWLEDGE);
+
+ consumer = sessionSecondServer.createDurableSubscriber((Topic)destination,"test");
+
+ for (int i=0;i<10;i++)
+ {
+ assertNotNull(consumer.receive(1000));
+ }
+
+ assertNull(consumer.receive(1000));
+ }
+
+ /**
+  * Fails a transacted session that owns a durable subscriber over from a
+  * factoryServer1 connection to a factoryServer2 connection while a transaction is
+  * open.
+  *
+  * The test checks that the remoting connection is replaced, that the client-side
+  * transaction id is unchanged by the fail-over, and that the two messages sent in
+  * that transaction reach both a plain consumer created through factoryServer2 and
+  * the durable subscriber once the transaction commits.
+  *
+  * NOTE(review): the durable subscription "T1" is left in place when the test ends.
+  */
+ public void testTopicSubscriber() throws Exception
+ {
+    log.info("++testTopicSubscriber");
+
+    log.info(">>Lookup Topic");
+    Destination destination = (Destination)getCtx1().lookup("topic/testDistributedTopic");
+
+    log.info("Creating connection server1");
+    JBossConnection conn = (JBossConnection)this.factoryServer1.createConnection();
+    JBossConnection connSecondServer = null;
+    try
+    {
+       conn.setClientID("testClient");
+       conn.start();
+
+       log.info("ConnectionCreated=" + conn);
+       log.info(">>Creating Sessions");
+
+       JBossSession session = (JBossSession)conn.createSession(true,Session.AUTO_ACKNOWLEDGE);
+       ClientSessionDelegate clientSessionDelegate = (ClientSessionDelegate)session.getDelegate();
+       SessionState sessionState = (SessionState)clientSessionDelegate.getState();
+
+       MessageConsumer consumerHA = session.createDurableSubscriber((Topic)destination,"T1");
+
+       log.info(">>Creating Producer");
+       MessageProducer producer = session.createProducer(destination);
+       log.info(">>creating Message");
+       Message message = session.createTextMessage("Hello Before");
+       log.info(">>sending Message");
+       producer.send(message);
+       session.commit();
+
+       assertNotNull(consumerHA.receive(3000));
+
+       // Remembered so it can be compared with the id seen after the fail-over.
+       Object txID = sessionState.getCurrentTxId();
+
+       producer.send(session.createTextMessage("Hello again before failover"));
+
+       ClientConnectionDelegate delegate = (ClientConnectionDelegate)conn.getDelegate();
+       JMSRemotingConnection originalRemoting = delegate.getRemotingConnection();
+
+       log.info(">>Creating alternate connection");
+       // NOTE(review): conn2 is not closed separately because its delegate is handed
+       // to conn by failOver(); who owns it afterwards is not visible here - confirm.
+       JBossConnection conn2 = (JBossConnection)this.factoryServer2.createConnection();
+       log.info("NewConnectionCreated=" + conn2);
+
+       log.info(">>Failling over");
+       assertSame(originalRemoting,delegate.getRemotingConnection());
+       conn.getDelegate().failOver(conn2.getDelegate());
+
+       try
+       {
+          originalRemoting.stop();
+       }
+       catch (Throwable throwable)
+       {
+          // Best effort: the old remoting connection is no longer used by conn.
+          log.info("Ignoring failure while stopping the original remoting connection: " + throwable);
+       }
+
+       assertNotSame(originalRemoting,delegate.getRemotingConnection());
+
+       message = session.createTextMessage("Hello After");
+       log.info(">>Sending new message");
+       producer.send(message);
+
+       assertEquals(txID,sessionState.getCurrentTxId());
+       log.info("TransactionID on client = " + txID);
+       log.info(">>Final commit");
+
+       // The plain consumer is created before the commit so it sees both messages.
+       connSecondServer = (JBossConnection)this.factoryServer2.createConnection();
+       connSecondServer.start();
+       JBossSession sessionSecondServer = (JBossSession)connSecondServer.createSession(false,Session.AUTO_ACKNOWLEDGE);
+       MessageConsumer consumerSecondServer = sessionSecondServer.createConsumer(destination);
+
+       session.commit();
+
+       assertNotNull(consumerSecondServer.receive(3000));
+       assertNotNull(consumerSecondServer.receive(3000));
+       assertNull(consumerSecondServer.receive(3000));
+
+       log.info("Calling alternate receiver");
+       assertNotNull(consumerHA.receive(1000));
+       assertNotNull(consumerHA.receive(1000));
+       assertNull(consumerHA.receive(1000));
+
+       // Acknowledge what the durable subscriber consumed; without this commit the
+       // two messages stay pending on the subscription after the test.
+       session.commit();
+    }
+    finally
+    {
+       // Cleanup is best effort: a failure to close must not hide the test outcome.
+       try
+       {
+          if (connSecondServer != null)
+          {
+             connSecondServer.close();
+          }
+       }
+       catch (Throwable t)
+       {
+          log.info("Failed to close connSecondServer: " + t);
+       }
+       try
+       {
+          conn.close();
+       }
+       catch (Throwable t)
+       {
+          log.info("Failed to close conn: " + t);
+       }
+    }
+ }
+
+ /**
+  * Checks that a message published to a durable subscription is still delivered
+  * after the connection is closed and re-opened with the same client ID and
+  * subscription name, then creates a second durable subscription through
+  * factoryServer2.
+  */
+ public void testSimplestDurableSubscription() throws Exception
+ {
+    ConnectionFactory cf = factoryServer1;
+    Topic topic = (Topic)getCtx1().lookup("topic/testDistributedTopic");
+
+    Connection conn = cf.createConnection();
+    try
+    {
+       conn.setClientID("brookeburke");
+
+       conn.start();
+
+       Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+       MessageProducer prod = s.createProducer(topic);
+       prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+       log.info(" ****************** s.createDurableSubscriber(topic, \"monicabelucci\") ****************************");
+       MessageConsumer firstconsumer = s.createDurableSubscriber(topic, "monicabelucci");
+
+       log.info("\n\n**************************************** consumer=" + firstconsumer.toString() + "\n\n");
+       Object message=firstconsumer.receive(2000);
+
+       log.info(" \n\n******************************* message Received=" + message + "\n\n");
+
+       // Published while the subscriber is active, consumed only after reconnecting.
+       prod.send(s.createTextMessage("k"));
+    }
+    finally
+    {
+       // Always release the client ID "brookeburke" before it is reused below.
+       conn.close();
+    }
+
+    conn = cf.createConnection();
+    Connection conn2 = null;
+    try
+    {
+       conn.setClientID("brookeburke");
+
+       log.info(" ****************** conn.createSession(...); ****************************");
+       Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+       log.info(" ****************** s.createDurableSubscriber(topic, \"monicabelucci\") ****************************");
+       MessageConsumer durable = s.createDurableSubscriber(topic, "monicabelucci");
+
+       log.info(" ****************** conn.start(); ****************************");
+       conn.start();
+
+       // A bounded wait: an unbounded receive() would hang the whole test run if the
+       // message were lost.
+       log.info(" ****************** TextMessage tm = (TextMessage)durable.receive(5000); ****************************");
+       TextMessage tm = (TextMessage)durable.receive(5000);
+       assertNotNull("no message delivered to the durable subscription after reconnect", tm);
+       assertEquals("k", tm.getText());
+
+       log.info(" ****************** Message m = durable.receive(1000); (expected to be null) ****************************");
+       Message m = durable.receive(1000);
+       assertNull("unexpected extra message: " + m, m);
+
+       MessageProducer prod = s.createProducer(topic);
+       prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+       prod.send(s.createTextMessage("Hello"));
+
+       conn2 = factoryServer2.createConnection();
+       conn2.setClientID("clientTest");
+
+       Session session = conn2.createSession(false,Session.AUTO_ACKNOWLEDGE);
+       conn2.start();
+
+       MessageConsumer consumer2 = session.createDurableSubscriber(topic,"spongebog");
+       log.info("Consumer2=" + consumer2);
+    }
+    finally
+    {
+       try
+       {
+          if (conn2 != null)
+          {
+             conn2.close();
+          }
+       }
+       finally
+       {
+          conn.close();
+       }
+    }
+ }
+
+
+}
Deleted: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/ReconnectTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/ReconnectTest.java 2006-10-20 22:34:19 UTC (rev 1509)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/ReconnectTest.java 2006-10-21 00:33:54 UTC (rev 1510)
@@ -1,382 +0,0 @@
-package org.jboss.test.messaging.core.ha;
-
-import org.jboss.jms.client.JBossConnection;
-import org.jboss.jms.client.JBossSession;
-import org.jboss.jms.client.remoting.JMSRemotingConnection;
-import org.jboss.jms.client.state.ConnectionState;
-import org.jboss.jms.client.state.SessionState;
-import org.jboss.jms.client.delegate.ClientConnectionDelegate;
-import org.jboss.jms.client.delegate.ClientSessionDelegate;
-import org.jboss.jms.message.JBossMessage;
-import org.jboss.jms.message.TextMessageProxy;
-
-import javax.jms.*;
-
-
-/** Start two JBoss instances (non clustered) to run these tests.
- *
- * The tests open connections through factoryServer1 and factoryServer2 and, in most
- * cases, call failOver(...) by hand on a connection delegate, then check the
- * client-side state or which consumers receive the messages.
- * */
-public class ReconnectTest extends HATestBase
-{
- /**
-  * Fails a started connection over to a second factoryServer1 connection and checks
-  * that the connection state reports started after start() and not started after stop().
-  */
- public void testSimpleReconnect() throws Exception
- {
- JBossConnection conn = (JBossConnection)this.factoryServer1.createConnection();
- ClientConnectionDelegate delegate = (ClientConnectionDelegate)conn.getDelegate();
- ConnectionState state = (ConnectionState)delegate.getState();
-
- assertFalse(state.isStarted());
- conn.start();
- assertTrue(state.isStarted());
-
- JBossConnection conn2 = (JBossConnection)this.factoryServer1.createConnection();
- conn.getDelegate().failOver(conn2.getDelegate());
-
- conn.stop();
- assertFalse(state.isStarted());
-
- }
- /**
-  * Same as testSimpleReconnect, but with the client ID "someClient" set before the
-  * fail-over; afterwards the client ID held in the state is cleared and
-  * getClientID() must still return "someClient".
-  */
- public void testSimpleReconnectWithClientID() throws Exception
- {
- JBossConnection conn = (JBossConnection)this.factoryServer1.createConnection();
- conn.setClientID("someClient");
- ClientConnectionDelegate delegate = (ClientConnectionDelegate)conn.getDelegate();
- ConnectionState state = (ConnectionState)delegate.getState();
-
- assertFalse(state.isStarted());
- conn.start();
- assertTrue(state.isStarted());
-
- JBossConnection conn2 = (JBossConnection)this.factoryServer1.createConnection();
- conn.getDelegate().failOver(conn2.getDelegate());
-
- // force recovering the clientID from server
- state.setClientID(null);
- assertEquals ("someClient",conn.getClientID());
-
- conn.stop();
- assertFalse(state.isStarted());
-
- }
- /**
-  * Fails over a connection that already owns a transacted session. The test makes
-  * no assertions; it only requires that failOver(...) does not throw.
-  */
- public void testWithSession() throws Exception
- {
- JBossConnection conn = (JBossConnection)this.factoryServer1.createConnection();
- Session session = conn.createSession(true,Session.AUTO_ACKNOWLEDGE);
-
- ClientConnectionDelegate delegate = (ClientConnectionDelegate)conn.getDelegate();
- ConnectionState state = (ConnectionState)delegate.getState();
-
- JBossConnection conn2 = (JBossConnection)this.factoryServer1.createConnection();
- conn.getDelegate().failOver(conn2.getDelegate());
- }
- /**
-  * Sends one message to topic/testTopic, fails the connection over to a
-  * factoryServer2 connection, waits ten seconds and sends a second message with the
-  * same producer. There are no assertions; the sends only have to succeed.
-  */
- public void testSimpleWithOneProducerOnTopic() throws Exception
- {
- JBossConnection conn = (JBossConnection)this.factoryServer1.createConnection();
- Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
- Destination destination = (Destination)getCtx1().lookup("topic/testTopic");
- MessageProducer producer = session.createProducer(destination);
-
- Message message = session.createTextMessage("Hello Before");
- producer.send(message);
-
- ClientConnectionDelegate delegate = (ClientConnectionDelegate)conn.getDelegate();
- ConnectionState state = (ConnectionState)delegate.getState();
-
- JBossConnection conn2 = (JBossConnection)this.factoryServer2.createConnection();
- conn.getDelegate().failOver(conn2.getDelegate());
-
- System.out.println("Kill server1");
- Thread.sleep(10000); // presumably a window for stopping server1 by hand - confirm
-
-
- message = session.createTextMessage("Hello After");
- producer.send(message);
- }
- /**
-  * Queue variant of testSimpleWithOneProducerOnTopic: the same steps against
-  * queue/testQueue.
-  */
- public void testSimpleWithOneProducerOnQueue() throws Exception
- {
- JBossConnection conn = (JBossConnection)this.factoryServer1.createConnection();
- Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
- Destination destination = (Destination)getCtx1().lookup("queue/testQueue");
- MessageProducer producer = session.createProducer(destination);
-
- Message message = session.createTextMessage("Hello Before");
- producer.send(message);
-
- ClientConnectionDelegate delegate = (ClientConnectionDelegate)conn.getDelegate();
- ConnectionState state = (ConnectionState)delegate.getState();
-
- JBossConnection conn2 = (JBossConnection)this.factoryServer2.createConnection();
- conn.getDelegate().failOver(conn2.getDelegate());
-
- System.out.println("Kill server1");
- Thread.sleep(10000); // presumably a window for stopping server1 by hand - confirm
-
-
- message = session.createTextMessage("Hello After");
- producer.send(message);
- }
- /**
-  * Fails a transacted producer session over from a factoryServer1 connection to a
-  * factoryServer2 connection in the middle of a transaction.
-  *
-  * Two plain consumers, one per server, observe topic/testTopic. The test asserts
-  * that uncommitted messages are not received by either consumer, that the first
-  * commit is received through the first-server consumer only, that the client-side
-  * transaction id is unchanged by the fail-over, and that the two messages of the
-  * second transaction are received through the second-server consumer only.
-  */
- public void testSimpleWithOneProducerTransacted() throws Exception
- {
- log.info("++testSimpleWithOneProducerTransacted");
-
- log.info(">>Lookup Queue");
- Destination destination = (Destination)getCtx1().lookup("topic/testTopic");
-
- log.info("Creating connections used for assertion (not failed over)");
- JBossConnection connSecondServer = (JBossConnection)this.factoryServer2.createConnection();
- connSecondServer.start();
- JBossSession sessionSecondServer = (JBossSession)connSecondServer.createSession(false,Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumerSecondServer = sessionSecondServer.createConsumer(destination);
-
- JBossConnection connFirstServer = (JBossConnection)this.factoryServer1.createConnection();
- connFirstServer.start();
- JBossSession sessionFirstServer = (JBossSession)connFirstServer.createSession(false,Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumerFirstServer = sessionFirstServer.createConsumer(destination);
-
-
- log.info("Creating connection server1");
- JBossConnection conn = (JBossConnection)this.factoryServer1.createConnection();
-
- log.info("ConnectionCreated=" + conn);
- log.info(">>Creating Sessions");
-
- JBossSession session = (JBossSession)conn.createSession(true,Session.AUTO_ACKNOWLEDGE);
- ClientSessionDelegate clientSessionDelegate = (ClientSessionDelegate)session.getDelegate();
- SessionState sessionState = (SessionState)clientSessionDelegate.getState();
- log.info(">>Creating Producer");
- MessageProducer producer = session.createProducer(destination);
- log.info(">>Creating Producer - ");
- log.info(">>creating Message");
- Message message = session.createTextMessage("Hello Before");
- log.info(">>sending Message");
- producer.send(message);
-
- assertNull(consumerFirstServer.receive(1000));
- assertNull(consumerSecondServer.receive(1000));
-
- log.info("sending first commit");
- session.commit();
- Object txID = sessionState.getCurrentTxId(); // id read after the first commit; compared again after the fail-over
-
- assertNotNull(consumerFirstServer.receive(2000));
- assertNull(consumerFirstServer.receive(1000));
- assertNull(consumerSecondServer.receive(1000));
-
- producer.send(session.createTextMessage("Hello again before failover"));
- assertNull(consumerFirstServer.receive(1000));
- assertNull(consumerSecondServer.receive(1000));
-
- ClientConnectionDelegate delegate = (ClientConnectionDelegate)conn.getDelegate();
-
- JMSRemotingConnection originalRemoting = delegate.getRemotingConnection();
- ConnectionState state = (ConnectionState)delegate.getState();
-
- log.info(">>Creating alternate connection");
- JBossConnection conn2 = (JBossConnection)this.factoryServer2.createConnection();
- log.info("NewConnectionCreated=" + conn2);
-
- log.info(">>Failling over");
- assertSame(originalRemoting,delegate.getRemotingConnection());
- conn.getDelegate().failOver(conn2.getDelegate());
- try {
- originalRemoting.stop();
- } catch (Throwable throwable) {
- throwable.printStackTrace(); // best effort: a failure to stop the old remoting connection is only printed
- }
-
- assertNotSame(originalRemoting,delegate.getRemotingConnection());
-
- //System.out.println("Kill server1"); Thread.sleep(10000);
-
- message = session.createTextMessage("Hello After");
- log.info(">>Sending new message");
- producer.send(message);
-
- assertNull(consumerFirstServer.receive(1000));
- assertNull(consumerSecondServer.receive(1000));
-
- assertEquals(txID,sessionState.getCurrentTxId());
- System.out.println("TransactionID on client = " + txID);
- log.info(">>Final commit");
- session.commit();
-
- log.info("Checking receive on second server");
- assertNotNull(consumerSecondServer.receive(1000));
- assertNotNull(consumerSecondServer.receive(1000));
- log.info("Checking receive on first server");
- assertNull(consumerFirstServer.receive(1000));
- assertNull(consumerSecondServer.receive(1000));
-
- }
- /**
-  * Variant of testSimpleWithOneProducerTransacted without any fail-over: the
-  * transacted session is created directly on a factoryServer2 connection and the
-  * first commit is commented out, so all three messages are sent in one transaction.
-  * After the commit the second-server consumer must receive three messages and the
-  * first-server consumer none.
-  */
- public void testSimpleWithOneProducerTransactedWithoutHA() throws Exception
- {
- log.info("++testSimpleWithOneProducerTransacted");
-
- log.info(">>Lookup Queue");
- Destination destination = (Destination)getCtx1().lookup("topic/testTopic");
-
- log.info("Creating connections used for assertion (not failed over)");
- JBossConnection connSecondServer = (JBossConnection)this.factoryServer2.createConnection();
- connSecondServer.start();
- JBossSession sessionSecondServer = (JBossSession)connSecondServer.createSession(false,Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumerSecondServer = sessionSecondServer.createConsumer(destination);
-
- JBossConnection connFirstServer = (JBossConnection)this.factoryServer1.createConnection();
- connFirstServer.start();
- JBossSession sessionFirstServer = (JBossSession)connFirstServer.createSession(false,Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumerFirstServer = sessionFirstServer.createConsumer(destination);
-
-
- log.info("Creating connection server1");
- JBossConnection conn = (JBossConnection)this.factoryServer2.createConnection(); // note: factoryServer2, although the log line above says server1
-
- log.info("ConnectionCreated=" + conn);
- log.info(">>Creating Sessions");
-
- JBossSession session = (JBossSession)conn.createSession(true,Session.AUTO_ACKNOWLEDGE);
- ClientSessionDelegate clientSessionDelegate = (ClientSessionDelegate)session.getDelegate();
- SessionState sessionState = (SessionState)clientSessionDelegate.getState();
- System.out.println("Size of callbackHandlers=" + sessionState.getCallbackHandlers().size());
- Object txID = sessionState.getCurrentTxId();
- log.info(">>Creating Producer");
- MessageProducer producer = session.createProducer(destination);
- log.info(">>Creating Producer - ");
- log.info(">>creating Message");
- Message message = session.createTextMessage("Hello Before");
- log.info(">>sending Message");
- producer.send(message);
-
- assertNull(consumerFirstServer.receive(1000));
- assertNull(consumerSecondServer.receive(1000));
-
- log.info("sending first commit");
- //session.commit();
-
- assertNull(consumerFirstServer.receive(2000));
- assertNull(consumerFirstServer.receive(1000));
- assertNull(consumerSecondServer.receive(1000));
-
- TextMessageProxy messagetxt = (TextMessageProxy)session.createTextMessage("Hello again before failover");
- producer.send(messagetxt);
- System.out.println("Id=" + messagetxt.getMessage().getConnectionID());
- assertNull(consumerFirstServer.receive(1000));
- assertNull(consumerSecondServer.receive(1000));
-
- ClientConnectionDelegate delegate = (ClientConnectionDelegate)conn.getDelegate();
-
- JMSRemotingConnection originalRemoting = delegate.getRemotingConnection();
- ConnectionState state = (ConnectionState)delegate.getState();
-
- log.info(">>Failling over");
- //System.out.println("Kill server1"); Thread.sleep(10000);
-
- message = session.createTextMessage("Hello After");
- log.info(">>Sending new message");
- producer.send(message);
-
- assertNull(consumerFirstServer.receive(1000));
- assertNull(consumerSecondServer.receive(1000));
-
- assertEquals(txID,sessionState.getCurrentTxId());
- System.out.println("TransactionID on client = " + txID);
- log.info(">>Final commit");
- session.commit();
-
- log.info("Checking receive on second server");
- assertNull(consumerFirstServer.receive(1000));
- assertNotNull(consumerSecondServer.receive(3000));
- assertNotNull(consumerSecondServer.receive(1000));
- assertNotNull(consumerSecondServer.receive(1000));
- log.info("Checking receive on first server");
- assertNull(consumerSecondServer.receive(1000));
-
- }
- /**
-  * Fails over a transacted session that owns both a producer and a consumer
-  * (consumerHA) on topic/testTopic.
-  *
-  * Before the fail-over the committed message must be received by consumerHA and by
-  * the first-server consumer. After the fail-over to a factoryServer2 connection the
-  * transaction id must be unchanged, and the two messages committed afterwards must
-  * be received by the second-server consumer and by consumerHA, but not by the
-  * first-server consumer.
-  */
- public void testTopicSubscriber() throws Exception
- {
- log.info("++testSimpleWithOneProducerTransacted");
-
- log.info(">>Lookup Queue");
- Destination destination = (Destination)getCtx1().lookup("topic/testTopic");
-
- log.info("Creating connections used for assertion (not failed over)");
- JBossConnection connSecondServer = (JBossConnection)this.factoryServer2.createConnection();
- connSecondServer.start();
- JBossSession sessionSecondServer = (JBossSession)connSecondServer.createSession(false,Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumerSecondServer = sessionSecondServer.createConsumer(destination);
-
- JBossConnection connFirstServer = (JBossConnection)this.factoryServer1.createConnection();
- connFirstServer.start();
- JBossSession sessionFirstServer = (JBossSession)connFirstServer.createSession(false,Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumerFirstServer = sessionFirstServer.createConsumer(destination);
-
-
- log.info("Creating connection server1");
- JBossConnection conn = (JBossConnection)this.factoryServer1.createConnection();
- conn.start();
-
- log.info("ConnectionCreated=" + conn);
- log.info(">>Creating Sessions");
-
- JBossSession session = (JBossSession)conn.createSession(true,Session.AUTO_ACKNOWLEDGE);
- ClientSessionDelegate clientSessionDelegate = (ClientSessionDelegate)session.getDelegate();
- SessionState sessionState = (SessionState)clientSessionDelegate.getState();
- MessageConsumer consumerHA = session.createConsumer(destination);
- log.info(">>Creating Producer");
- MessageProducer producer = session.createProducer(destination);
- log.info(">>creating Message");
- Message message = session.createTextMessage("Hello Before");
- log.info(">>sending Message");
- producer.send(message);
- log.info("sending first commit");
- session.commit();
- Object txID = sessionState.getCurrentTxId(); // id read after the first commit; compared again after the fail-over
-
- assertNotNull(consumerHA.receive(1000));
- assertNotNull(consumerFirstServer.receive(2000));
- assertNull(consumerSecondServer.receive(1000));
- assertNull(consumerHA.receive(1000));
-
- producer.send(session.createTextMessage("Hello again before failover"));
-
- ClientConnectionDelegate delegate = (ClientConnectionDelegate)conn.getDelegate();
-
- JMSRemotingConnection originalRemoting = delegate.getRemotingConnection();
-
- ConnectionState state = (ConnectionState)delegate.getState();
-
- log.info(">>Creating alternate connection");
- JBossConnection conn2 = (JBossConnection)this.factoryServer2.createConnection();
- log.info("NewConnectionCreated=" + conn2);
-
- log.info(">>Failling over");
- assertSame(originalRemoting,delegate.getRemotingConnection());
- conn.getDelegate().failOver(conn2.getDelegate());
- assertNotSame(originalRemoting,delegate.getRemotingConnection());
-
- //System.out.println("Kill server1"); Thread.sleep(10000);
-
- message = session.createTextMessage("Hello After");
- log.info(">>Sending new message");
- producer.send(message);
-
- assertEquals(txID,sessionState.getCurrentTxId());
- System.out.println("TransactionID on client = " + txID);
- log.info(">>Final commit");
- session.commit();
-
- assertNull(consumerFirstServer.receive(1000));
- assertNotNull(consumerSecondServer.receive(1000));
- assertNotNull(consumerSecondServer.receive(1000));
- assertNull(consumerSecondServer.receive(1000));
-
- assertNotNull(consumerHA.receive(1000));
- assertNotNull(consumerHA.receive(1000));
-
- }
-}
More information about the jboss-cvs-commits
mailing list