[jboss-cvs] JBoss Messaging SVN: r1516 - in branches/Branch_Client_Failover_Experiment: src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/client/state src/main/org/jboss/jms/delegate src/main/org/jboss/jms/tx tests/src/org/jboss/test/messaging/jms util
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Oct 23 23:35:50 EDT 2006
Author: ovidiu.feodorov at jboss.com
Date: 2006-10-23 23:35:43 -0400 (Mon, 23 Oct 2006)
New Revision: 1516
Modified:
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConnectionAspect.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/BrowserState.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/ConnectionState.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/ConsumerState.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/delegate/ConnectionDelegate.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/ResourceManager.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/ManualClusteringTest.java
branches/Branch_Client_Failover_Experiment/util/do-not-distribute.properties
Log:
various coding convention-related modifications; added a simple test that fails
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConnectionAspect.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConnectionAspect.java 2006-10-21 04:03:42 UTC (rev 1515)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConnectionAspect.java 2006-10-24 03:35:43 UTC (rev 1516)
@@ -43,6 +43,7 @@
import java.util.Iterator;
import java.util.ArrayList;
+import java.util.List;
/**
* Handles operations related to the connection
@@ -208,127 +209,78 @@
}
- public Object handleFailover(Invocation invocation) throws Throwable
- {
- if (log.isTraceEnabled())
- {
- log.trace("Calling handleFailover");
- }
+ public Object handleFailover(Invocation invocation) throws Throwable
+ {
+ if (trace) { log.trace("calling handleFailover"); }
- ClientConnectionDelegate currentDelegate = ((ClientConnectionDelegate)invocation.getTargetObject());
- ConnectionState currentConnectionState = (ConnectionState)currentDelegate.getState();
+ ClientConnectionDelegate failedDelegate =
+ ((ClientConnectionDelegate)invocation.getTargetObject());
+ ConnectionState failedState = (ConnectionState)failedDelegate.getState();
- int oldServerId = currentConnectionState.getServerID();
+ int oldServerID = failedState.getServerID();
- ClientConnectionDelegate otherConnection = (ClientConnectionDelegate)((MethodInvocation)invocation).getArguments()[0];
- ConnectionState newConnectionState = (ConnectionState)((ClientConnectionDelegate)otherConnection).getState();
+ ClientConnectionDelegate newDelegate =
+ (ClientConnectionDelegate)((MethodInvocation)invocation).getArguments()[0];
+ ConnectionState newState = (ConnectionState)newDelegate.getState();
- currentConnectionState.failOver(newConnectionState);
+ failedState.copy(newState);
- if (currentConnectionState.getClientID()!=null)
- {
- otherConnection.setClientID(currentConnectionState.getClientID());
- }
+ if (failedState.getClientID() != null)
+ {
+ newDelegate.setClientID(failedState.getClientID());
+ }
- // Transfering state from newDelegate to currentDelegate
- currentDelegate.transferHAState(otherConnection);
+ // Transfering state from newDelegate to currentDelegate
+ failedDelegate.copyState(newDelegate);
- if (currentConnectionState.isStarted())
- {
- currentDelegate.start();
- }
+ if (failedState.isStarted())
+ {
+ failedDelegate.start();
+ }
+ for(Iterator i = failedState.getChildren().iterator(); i.hasNext(); )
+ {
+ SessionState failedSessionState = (SessionState)i.next();
- Iterator sessionsIterator = currentConnectionState.getChildren().iterator();
+ ClientSessionDelegate newSessionDelegate = (ClientSessionDelegate)newDelegate.
+ createSessionDelegate(failedSessionState.isTransacted(),
+ failedSessionState.getAcknowledgeMode(),
+ failedSessionState.isXA());
- while(sessionsIterator.hasNext())
- {
- SessionState sessionState = (SessionState)sessionsIterator.next();
+ ClientSessionDelegate failedSessionDelegate =
+ (ClientSessionDelegate)failedSessionState.getDelegate();
- ClientSessionDelegate newSessionDelegate=(ClientSessionDelegate)otherConnection.createSessionDelegate(sessionState.isTransacted(),sessionState.getAcknowledgeMode(),sessionState.isXA());
+ failedSessionDelegate.copyState(newSessionDelegate);
- ClientSessionDelegate currentSessionDelegate = (ClientSessionDelegate)sessionState.getDelegate();
+ if (trace) { log.trace("replacing session (" + failedSessionDelegate + ") with a new failover session " + newSessionDelegate); }
- currentSessionDelegate.transferHAState(newSessionDelegate);
+ List children = new ArrayList();
+ children.addAll(failedSessionState.getChildren());
+ for(Iterator j = children.iterator(); j.hasNext(); )
+ {
+ HierarchicalStateSupport sessionChild = (HierarchicalStateSupport)j.next();
- if (log.isTraceEnabled())
+ if (sessionChild instanceof ProducerState)
{
- log.trace("Replacing session (" + currentSessionDelegate + ") by a new session created on the new failed over connection (" + newSessionDelegate + ")");
+ handleFailoverOnProducer((ProducerState)sessionChild, failedSessionDelegate);
}
-
-
- ArrayList children = new ArrayList();
- children.addAll(sessionState.getChildren());
- Iterator sessionObjectsIterator = children.iterator();
- while (sessionObjectsIterator.hasNext())
+ else if (sessionChild instanceof ConsumerState)
{
- HierarchicalStateSupport sessionChild = (HierarchicalStateSupport)sessionObjectsIterator.next();
-
- if (sessionChild instanceof ProducerState)
- {
- handleFailoverOnProducer((ProducerState)sessionChild, currentSessionDelegate);
- }
- else if (sessionChild instanceof ConsumerState)
- {
- handleFailoverOnConsumer(currentConnectionState,sessionState,(ConsumerState)sessionChild,currentSessionDelegate,oldServerId);
- }
+ handleFailoverOnConsumer(failedState,
+ failedSessionState,
+ (ConsumerState)sessionChild,
+ failedSessionDelegate,
+ oldServerID);
}
- }
+ }
+ }
+ return null;
+ }
+ // ConnectionListener implementation -----------------------------------------------------------
- return null;
- }
-
- private void handleFailoverOnConsumer(ConnectionState connectionState,SessionState sessionState, ConsumerState consumerState, ClientSessionDelegate sessionDelegate, int oldServerId) throws JMSException
- {
- ClientConsumerDelegate currentConsumerDelegate = (ClientConsumerDelegate)consumerState.getDelegate();
-
- if (trace)
- {
- log.trace("handleFailoverOnConsumer: creating alternate consumer");
- }
- ClientConsumerDelegate newConsumerDelegate = (ClientConsumerDelegate)sessionDelegate.createConsumerDelegate((JBossDestination) consumerState.getDestination(),consumerState.getSelector(),consumerState.isNoLocal(),consumerState.getSubscriptionName(),false, currentConsumerDelegate.getChannelId());
- if (trace)
- {
- log.trace("handleFailoverOnConsumer: alternate consumer created");
- }
-
-
- currentConsumerDelegate.transferHAState(newConsumerDelegate);
-
- int oldConsumerId = consumerState.getConsumerID();
-
- ConsumerState newState = (ConsumerState)newConsumerDelegate.getState();
- consumerState.failOver(newState);
-
- connectionState.getResourceManager().handleFailover(sessionState.getCurrentTxId(),oldConsumerId,consumerState.getConsumerID());
-
- CallbackManager cm = connectionState.getRemotingConnection().getCallbackManager();
-
- MessageCallbackHandler handler = cm.unregisterHandler(oldServerId,oldConsumerId);
- handler.setConsumerId(consumerState.getConsumerID());
-
- cm.registerHandler(connectionState.getServerID(),consumerState.getConsumerID(),handler);
- sessionState.addCallbackHandler(handler);
-
- }
-
- private void handleFailoverOnProducer(ProducerState producerState, ClientSessionDelegate sessionDelegate) throws JMSException
- {
- ClientProducerDelegate newProducerDelegate = (ClientProducerDelegate)sessionDelegate.createProducerDelegate((JBossDestination)producerState.getDestination());
- ClientProducerDelegate currentProducerDelegate = (ClientProducerDelegate)producerState.getDelegate();
- currentProducerDelegate.transferHAState(newProducerDelegate);
-
- if (log.isTraceEnabled())
- {
- log.trace("Handling failingOver on producerDelegate on currentDelegate=" + currentProducerDelegate + " destination=" + producerState.getDestination());
- }
- }
-
- // ConnectionListener implementation -----------------------------------------------------------
-
public void handleConnectionException(Throwable t, Client c)
{
log.error("Caught exception from connection", t);
@@ -368,16 +320,78 @@
// Private -------------------------------------------------------
- private ConnectionState getConnectionState(Invocation invocation) {
+ private ConnectionState getConnectionState(Invocation invocation)
+ {
+ if (state==null)
+ {
+ ClientConnectionDelegate currentDelegate =
+ ((ClientConnectionDelegate)invocation.getTargetObject());
- if (state==null)
- {
- ClientConnectionDelegate currentDelegate = ((ClientConnectionDelegate)invocation.getTargetObject());
- state = (ConnectionState)currentDelegate.getState();
- }
- return state;
- }
+ state = (ConnectionState)currentDelegate.getState();
+ }
+ return state;
+ }
+ private void handleFailoverOnConsumer(ConnectionState failedConnectionState,
+ SessionState failedSessionState,
+ ConsumerState failedConsumerState,
+ ClientSessionDelegate failedSessionDelegate,
+ int oldServerID)
+ throws JMSException
+ {
+ ClientConsumerDelegate failedConsumerDelegate =
+ (ClientConsumerDelegate)failedConsumerState.getDelegate();
+ if (trace) { log.trace("handleFailoverOnConsumer: creating alternate consumer"); }
+
+ ClientConsumerDelegate newConsumerDelegate = (ClientConsumerDelegate)failedSessionDelegate.
+ createConsumerDelegate((JBossDestination)failedConsumerState.getDestination(),
+ failedConsumerState.getSelector(),
+ failedConsumerState.isNoLocal(),
+ failedConsumerState.getSubscriptionName(),
+ false,
+ failedConsumerDelegate.getChannelId());
+
+ if (trace) { log.trace("handleFailoverOnConsumer: alternate consumer created"); }
+
+ failedConsumerDelegate.copyState(newConsumerDelegate);
+
+ int oldConsumerID = failedConsumerState.getConsumerID();
+
+ ConsumerState newState = (ConsumerState)newConsumerDelegate.getState();
+ failedConsumerState.copy(newState);
+
+ failedConnectionState.getResourceManager().
+ handleFailover(failedSessionState.getCurrentTxId(),
+ oldConsumerID,
+ failedConsumerState.getConsumerID());
+
+ CallbackManager cm = failedConnectionState.getRemotingConnection().getCallbackManager();
+
+ MessageCallbackHandler handler = cm.unregisterHandler(oldServerID, oldConsumerID);
+ handler.setConsumerId(failedConsumerState.getConsumerID());
+
+ cm.registerHandler(failedConnectionState.getServerID(),
+ failedConsumerState.getConsumerID(),
+ handler);
+ failedSessionState.addCallbackHandler(handler);
+
+ }
+
+ private void handleFailoverOnProducer(ProducerState failedProducerState,
+ ClientSessionDelegate failedSessionDelegate)
+ throws JMSException
+ {
+ ClientProducerDelegate newProducerDelegate = (ClientProducerDelegate)failedSessionDelegate.
+ createProducerDelegate((JBossDestination)failedProducerState.getDestination());
+
+ ClientProducerDelegate failedProducerDelegate =
+ (ClientProducerDelegate)failedProducerState.getDelegate();
+
+ failedProducerDelegate.copyState(newProducerDelegate);
+
+ if (trace) { log.trace("handling fail over on producerDelegate " + failedProducerDelegate + " destination=" + failedProducerState.getDestination()); }
+ }
+
// Inner classes -------------------------------------------------
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java 2006-10-21 04:03:42 UTC (rev 1515)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java 2006-10-24 03:35:43 UTC (rev 1516)
@@ -206,10 +206,10 @@
throw new IllegalStateException("This invocation should not be handled here!");
}
- /** @see org.jboss.jms.client.container.ConnectionAspect#handleFailover(org.jboss.aop.joinpoint.Invocation) */
- public void failOver(ConnectionDelegate newConnection)
+ /** @see org.jboss.jms.client.container.ConnectionAspect#handleFailover(org.jboss.aop.joinpoint.Invocation) */
+ public void failOver(ConnectionDelegate newConnection)
{
- throw new IllegalStateException("This invocation should not be handled here!");
+ throw new IllegalStateException("This invocation should not be handled here!");
}
// Public --------------------------------------------------------
@@ -240,10 +240,10 @@
return ((ConnectionState)state).getRemotingConnection().getInvokingClient();
}
- public void transferHAState(DelegateSupport copyFrom) {
- super.transferHAState(copyFrom);
- ClientConnectionDelegate other = (ClientConnectionDelegate)copyFrom;
- this.setRemotingConnection(other.getRemotingConnection());
+ public void copyState(DelegateSupport newDelegate)
+ {
+ super.copyState(newDelegate);
+ setRemotingConnection(((ClientConnectionDelegate)newDelegate).getRemotingConnection());
}
// Package Private -----------------------------------------------
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2006-10-21 04:03:42 UTC (rev 1515)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2006-10-24 03:35:43 UTC (rev 1516)
@@ -191,18 +191,20 @@
getInvokingClient();
}
+ public void copyState(DelegateSupport newDelegate)
+ {
+ super.copyState(newDelegate);
+ this.channelId = ((ClientConsumerDelegate)newDelegate).channelId;
+ this.getMetaData().removeMetaData(MetaDataConstants.JMS, MetaDataConstants.CONSUMER_ID);
+ this.getMetaData().addMetaData(MetaDataConstants.JMS,
+ MetaDataConstants.CONSUMER_ID,
+ newDelegate.getMetaData().
+ getMetaData(MetaDataConstants.JMS,
+ MetaDataConstants.CONSUMER_ID),
+ PayloadKey.TRANSIENT);
+ }
-
- public void transferHAState(DelegateSupport copyFrom)
- {
- super.transferHAState(copyFrom);
- this.channelId = ((ClientConsumerDelegate)copyFrom).channelId;
- this.getMetaData().removeMetaData(MetaDataConstants.JMS, MetaDataConstants.CONSUMER_ID);
- this.getMetaData().addMetaData(MetaDataConstants.JMS, MetaDataConstants.CONSUMER_ID,copyFrom.getMetaData().getMetaData(MetaDataConstants.JMS, MetaDataConstants.CONSUMER_ID), PayloadKey.TRANSIENT);
- }
-
-
// Package Private -----------------------------------------------
// Private -------------------------------------------------------
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/DelegateSupport.java 2006-10-21 04:03:42 UTC (rev 1515)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/DelegateSupport.java 2006-10-24 03:35:43 UTC (rev 1516)
@@ -141,12 +141,14 @@
return id;
}
- /** During HA events, a new object on the new server is created and all the state on that new object
- * has to be transfered to this actual object.
- * For example, the newConnection will have to assume the new ObjectID and the new RemotingConnection. */
- public void transferHAState(DelegateSupport copyFrom)
+ /**
+ * During HA events, a new object is created on the new server and the state on that new object
+ * has to be transfered to this actual object. For example, a Connection will have to assume the
+ * ObjectID of the new connection endpoint and the new RemotingConnection.
+ */
+ public void copyState(DelegateSupport newDelegate)
{
- this.id=copyFrom.getID();
+ id = newDelegate.getID();
}
// Package protected ---------------------------------------------
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/BrowserState.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/BrowserState.java 2006-10-21 04:03:42 UTC (rev 1515)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/BrowserState.java 2006-10-24 03:35:43 UTC (rev 1516)
@@ -46,31 +46,28 @@
super(parent, (DelegateSupport)delegate);
}
+ public DelegateSupport getDelegate()
+ {
+ return (DelegateSupport)delegate;
+ }
+ public void setDelegate(DelegateSupport delegate)
+ {
+ this.delegate=(BrowserDelegate)delegate;
+ }
-
- public DelegateSupport getDelegate()
- {
- return (DelegateSupport)delegate;
- }
- public void setDelegate(DelegateSupport delegate)
- {
- this.delegate=(BrowserDelegate)delegate;
- }
-
-
public Version getVersionToUse()
{
return parent.getVersionToUse();
}
- public void setParent(HierarchicalState parent)
- {
- this.parent=(SessionState)parent;
- }
- public HierarchicalState getParent()
- {
- return parent;
- }
+ public void setParent(HierarchicalState parent)
+ {
+ this.parent=(SessionState)parent;
+ }
+ public HierarchicalState getParent()
+ {
+ return parent;
+ }
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/ConnectionState.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/ConnectionState.java 2006-10-21 04:03:42 UTC (rev 1515)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/ConnectionState.java 2006-10-24 03:35:43 UTC (rev 1516)
@@ -147,43 +147,53 @@
{
}
- public boolean isStarted() {
+ public boolean isStarted()
+ {
return started;
}
- public void setStarted(boolean started) {
+ public void setStarted(boolean started)
+ {
this.started = started;
}
- public String getClientID() {
+ public String getClientID()
+ {
return clientID;
}
- public void setClientID(String clientID) {
+ public void setClientID(String clientID)
+ {
this.clientID = clientID;
}
- public ExceptionListener getExceptionListener() {
+ public ExceptionListener getExceptionListener()
+ {
return exceptionListener;
}
- public void setExceptionListener(ExceptionListener exceptionListener) {
+ public void setExceptionListener(ExceptionListener exceptionListener)
+ {
this.exceptionListener = exceptionListener;
}
- public boolean isJustCreated() {
+ public boolean isJustCreated()
+ {
return justCreated;
}
- public void setJustCreated(boolean justCreated) {
+ public void setJustCreated(boolean justCreated)
+ {
this.justCreated = justCreated;
}
- public boolean isListenerAdded() {
+ public boolean isListenerAdded()
+ {
return listenerAdded;
}
- public void setListenerAdded(boolean listenerAdded) {
+ public void setListenerAdded(boolean listenerAdded)
+ {
this.listenerAdded = listenerAdded;
}
@@ -193,7 +203,7 @@
return null;
}
- public void failOver(ConnectionState newState)
+ public void copy(ConnectionState newState)
{
this.serverID = newState.serverID;
this.idGenerator = newState.idGenerator;
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/ConsumerState.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/ConsumerState.java 2006-10-21 04:03:42 UTC (rev 1515)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/ConsumerState.java 2006-10-24 03:35:43 UTC (rev 1516)
@@ -42,19 +42,19 @@
public class ConsumerState extends HierarchicalStateSupport
{
private Destination destination;
-
+
private String selector;
String subscriptionName;
-
+
private boolean noLocal;
-
+
private int consumerID;
-
+
private boolean isConnectionConsumer;
-
+
private MessageCallbackHandler messageCallbackHandler;
-
+
private int prefetchSize;
private SessionState parent;
@@ -62,46 +62,46 @@
private ConsumerDelegate delegate;
public ConsumerState(SessionState parent, ConsumerDelegate delegate, Destination dest,
- String selector, boolean noLocal, String subscriptionName, int consumerID, boolean isCC,
- int prefetchSize)
+ String selector, boolean noLocal, String subscriptionName,
+ int consumerID, boolean isCC, int prefetchSize)
{
super(parent, (DelegateSupport)delegate);
children = Collections.EMPTY_SET;
this.destination = dest;
this.selector = selector;
- this.noLocal = noLocal;
+ this.noLocal = noLocal;
this.consumerID = consumerID;
this.isConnectionConsumer = isCC;
this.prefetchSize = prefetchSize;
}
- public DelegateSupport getDelegate()
- {
- return (DelegateSupport)delegate;
- }
+ public DelegateSupport getDelegate()
+ {
+ return (DelegateSupport)delegate;
+ }
- public void setDelegate(DelegateSupport delegate)
- {
- this.delegate = (ConsumerDelegate)delegate;
- }
+ public void setDelegate(DelegateSupport delegate)
+ {
+ this.delegate = (ConsumerDelegate)delegate;
+ }
public Destination getDestination()
{
return destination;
}
-
+
public String getSelector()
{
return selector;
}
-
+
public boolean isNoLocal()
{
return noLocal;
}
-
+
public int getConsumerID()
{
return consumerID;
@@ -111,46 +111,51 @@
{
return isConnectionConsumer;
}
-
+
public void setMessageCallbackHandler(MessageCallbackHandler handler)
{
this.messageCallbackHandler = handler;
}
-
+
public MessageCallbackHandler getMessageCallbackHandler()
{
return messageCallbackHandler;
}
-
+
public Version getVersionToUse()
{
return parent.getVersionToUse();
}
-
+
public int getPrefetchSize()
{
return prefetchSize;
}
- public HierarchicalState getParent() {
- return parent;
- }
- public void setParent(HierarchicalState parent) {
- this.parent=(SessionState)parent;
- }
+ public HierarchicalState getParent()
+ {
+ return parent;
+ }
- public String getSubscriptionName() {
- return subscriptionName;
- }
+ public void setParent(HierarchicalState parent)
+ {
+ this.parent=(SessionState)parent;
+ }
- public void setSubscriptionName(String subscriptionName) {
- this.subscriptionName = subscriptionName;
- }
+ public String getSubscriptionName()
+ {
+ return subscriptionName;
+ }
- public void failOver(ConsumerState newState)
- {
- this.consumerID=newState.consumerID;
- }
+ public void setSubscriptionName(String subscriptionName)
+ {
+ this.subscriptionName = subscriptionName;
+ }
+
+ public void copy(ConsumerState newState)
+ {
+ this.consumerID = newState.consumerID;
+ }
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/delegate/ConnectionDelegate.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/delegate/ConnectionDelegate.java 2006-10-21 04:03:42 UTC (rev 1515)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/delegate/ConnectionDelegate.java 2006-10-24 03:35:43 UTC (rev 1516)
@@ -54,8 +54,9 @@
ServerSessionPool sessionPool,
int maxMessages) throws JMSException;
- /** Reconnects the current connection hierarchy using newConnection's properties */
+ /**
+ * Reconnects the current connection hierarchy using newConnection's properties.
+ */
void failOver(ConnectionDelegate newConnection);
-
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/ResourceManager.java 2006-10-21 04:03:42 UTC (rev 1515)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/ResourceManager.java 2006-10-24 03:35:43 UTC (rev 1516)
@@ -120,7 +120,9 @@
tx.getMessages().add(m);
}
- /** Navigate on ACK and change clientIDs on every ACK not sent yet */
+ /**
+ * Navigate on ACK and change clientIDs on every ACK not sent yet.
+ */
public void handleFailover(Object xid, int oldClientId, int newClientId)
{
if (trace) { log.trace("handleFailover:: Transfering clientIds on ACKs from " + oldClientId + " to " + newClientId); }
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/ManualClusteringTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/ManualClusteringTest.java 2006-10-21 04:03:42 UTC (rev 1515)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/ManualClusteringTest.java 2006-10-24 03:35:43 UTC (rev 1516)
@@ -38,6 +38,7 @@
import javax.naming.InitialContext;
import org.jboss.test.messaging.MessagingTestCase;
+import org.jboss.jms.client.JBossConnection;
/**
*
@@ -45,6 +46,8 @@
*
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ *
* @version <tt>$Revision: 1.1 $</tt>
*
* $Id$
@@ -53,29 +56,29 @@
public class ManualClusteringTest extends MessagingTestCase
{
protected Context ic1;
-
+
protected Context ic2;
-
+
protected Context ic3;
-
+
protected Queue queue1;
-
+
protected Topic topic1;
-
+
protected Queue queue2;
-
+
protected Topic topic2;
-
+
protected Queue queue3;
-
+
protected Topic topic3;
-
+
protected ConnectionFactory cf1;
-
+
protected ConnectionFactory cf2;
-
+
protected ConnectionFactory cf3;
-
+
public ManualClusteringTest(String name)
{
super(name);
@@ -84,23 +87,23 @@
protected void setUp() throws Exception
{
super.setUp();
-
+
Properties props1 = new Properties();
-
+
props1.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
props1.put(Context.PROVIDER_URL, "jnp://localhost:1199");
props1.put(Context.URL_PKG_PREFIXES, "org.jnp.interfaces");
-
+
ic1 = new InitialContext(props1);
-
+
Properties props2 = new Properties();
-
+
props2.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
props2.put(Context.PROVIDER_URL, "jnp://localhost:1299");
props2.put(Context.URL_PKG_PREFIXES, "org.jnp.interfaces");
-
+
ic2 = new InitialContext(props2);
-
+
// Properties props3 = new Properties();
//
// props3.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
@@ -108,23 +111,23 @@
// props3.put(Context.URL_PKG_PREFIXES, "org.jnp.interfaces");
//
// ic3 = new InitialContext(props3);
-
+
queue1 = (Queue)ic1.lookup("queue/testDistributedQueue");
-
+
queue2 = (Queue)ic2.lookup("queue/testDistributedQueue");
-
+
//queue3 = (Queue)ic3.lookup("queue/ClusteredQueue1");
-
+
topic1 = (Topic)ic1.lookup("topic/testDistributedTopic");
-
+
topic2 = (Topic)ic2.lookup("topic/testDistributedTopic");
-
+
//topic3 = (Topic)ic3.lookup("topic/ClusteredTopic1");
-
+
cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
-
+
cf2 = (ConnectionFactory)ic2.lookup("/ConnectionFactory");
-
+
//cf3 = (ConnectionFactory)ic3.lookup("/ConnectionFactory");
log.info("setup done");
@@ -133,157 +136,157 @@
protected void tearDown() throws Exception
{
super.tearDown();
-
+
ic1.close();
-
+
ic2.close();
}
-
+
/*
- * Each node had consumers, send message at node, make sure local consumer gets message
- */
+ * Each node had consumers, send message at node, make sure local consumer gets message
+ */
public void testClusteredQueueLocalConsumerNonPersistent() throws Exception
{
log.info("starting test");
Connection conn1 = null;
-
+
Connection conn2 = null;
try
{
conn1 = cf1.createConnection();
-
+
conn2 = cf2.createConnection();
-
+
Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
MessageConsumer cons1 = sess1.createConsumer(queue1);
-
+
MessageConsumer cons2 = sess2.createConsumer(queue2);
-
+
conn1.start();
-
+
conn2.start();
-
+
MessageProducer prod1 = sess1.createProducer(queue1);
-
+
prod1.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
+
final int NUM_MESSAGES = 100;
-
+
for (int i = 0; i < NUM_MESSAGES; i++)
{
TextMessage tm = sess1.createTextMessage("message" + i);
-
+
prod1.send(tm);
}
-
+
log.info("sent messages");
-
+
for (int i = 0; i < NUM_MESSAGES; i++)
{
log.info("i is " + i);
-
+
TextMessage tm = (TextMessage)cons1.receive(1000);
-
+
assertNotNull(tm);
-
+
log.info("Got message:" + tm);
-
+
assertEquals("message" + i, tm.getText());
}
-
+
Message m = cons2.receive(2000);
-
+
assertNull(m);
}
finally
- {
+ {
try
{
if (conn1 != null) conn1.close();
-
+
if (conn2 != null) conn2.close();
}
catch (Exception ignore)
{
-
+
}
}
}
-
+
public void testClusteredQueueLocalConsumerPersistent() throws Exception
{
Connection conn1 = null;
-
+
Connection conn2 = null;
try
{
conn1 = cf1.createConnection();
-
+
conn2 = cf2.createConnection();
-
+
Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
MessageConsumer cons1 = sess1.createConsumer(queue1);
-
+
MessageConsumer cons2 = sess2.createConsumer(queue2);
-
+
conn1.start();
-
+
conn2.start();
-
+
MessageProducer prod1 = sess1.createProducer(queue1);
-
+
prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
-
+
final int NUM_MESSAGES = 100;
-
+
for (int i = 0; i < NUM_MESSAGES; i++)
{
TextMessage tm = sess1.createTextMessage("message" + i);
-
+
prod1.send(tm);
}
-
+
log.info("sent messages");
-
+
for (int i = 0; i < NUM_MESSAGES; i++)
{
log.info("i is " + i);
-
+
TextMessage tm = (TextMessage)cons1.receive(1000);
-
+
assertNotNull(tm);
-
+
log.info("Got message:" + tm);
-
+
assertEquals("message" + i, tm.getText());
}
-
+
Message m = cons2.receive(2000);
-
+
assertNull(m);
}
finally
- {
+ {
try
{
if (conn1 != null) conn1.close();
-
+
if (conn2 != null) conn2.close();
}
catch (Exception ignore)
{
-
+
}
}
}
-
+
// /*
// * No consumer on local node, send message at node, make sure remote consumer gets messages
// */
@@ -423,499 +426,551 @@
public void testClusteredTopicNonDurableNonPersistent() throws Exception
{
Connection conn1 = null;
-
+
Connection conn2 = null;
try
{
conn1 = cf1.createConnection();
-
+
conn2 = cf2.createConnection();
-
+
Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
MessageConsumer cons1 = sess1.createConsumer(topic1);
-
+
MessageConsumer cons2 = sess2.createConsumer(topic2);
-
+
conn1.start();
-
+
conn2.start();
-
+
MessageProducer prod1 = sess1.createProducer(topic1);
-
+
prod1.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
+
final int NUM_MESSAGES = 100;
-
+
for (int i = 0; i < NUM_MESSAGES; i++)
{
TextMessage tm = sess1.createTextMessage("message" + i);
-
+
prod1.send(tm);
}
-
+
log.info("sent messages");
-
+
for (int i = 0; i < NUM_MESSAGES; i++)
{
log.info("i is " + i);
-
+
TextMessage tm = (TextMessage)cons1.receive(1000);
-
+
assertNotNull(tm);
-
+
log.info("Got message:" + tm);
-
+
assertEquals("message" + i, tm.getText());
}
-
+
for (int i = 0; i < NUM_MESSAGES; i++)
{
log.info("i is " + i);
-
+
TextMessage tm = (TextMessage)cons2.receive(1000);
-
+
assertNotNull(tm);
-
+
log.info("Got message:" + tm);
-
+
assertEquals("message" + i, tm.getText());
}
-
+
}
finally
- {
+ {
try
{
if (conn1 != null) conn1.close();
-
+
if (conn2 != null) conn2.close();
}
catch (Exception ignore)
{
-
+
}
}
}
-
-
+
+
public void testClusteredTopicNonDurablePersistent() throws Exception
{
Connection conn1 = null;
-
+
Connection conn2 = null;
try
{
conn1 = cf1.createConnection();
-
+
conn2 = cf2.createConnection();
-
+
Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
MessageConsumer cons1 = sess1.createConsumer(topic1);
-
+
MessageConsumer cons2 = sess2.createConsumer(topic2);
-
+
conn1.start();
-
+
conn2.start();
-
+
MessageProducer prod1 = sess1.createProducer(topic1);
-
+
prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
-
+
final int NUM_MESSAGES = 100;
-
+
for (int i = 0; i < NUM_MESSAGES; i++)
{
TextMessage tm = sess1.createTextMessage("message" + i);
-
+
prod1.send(tm);
}
-
+
log.info("sent messages");
-
+
for (int i = 0; i < NUM_MESSAGES; i++)
{
log.info("i is " + i);
-
+
TextMessage tm = (TextMessage)cons1.receive(1000);
-
+
assertNotNull(tm);
-
+
log.info("Got message:" + tm);
-
+
assertEquals("message" + i, tm.getText());
}
-
+
for (int i = 0; i < NUM_MESSAGES; i++)
{
log.info("i is " + i);
-
+
TextMessage tm = (TextMessage)cons2.receive(1000);
-
+
assertNotNull(tm);
-
+
log.info("Got message:" + tm);
-
+
assertEquals("message" + i, tm.getText());
}
-
+
}
finally
- {
+ {
try
{
if (conn1 != null) conn1.close();
-
+
if (conn2 != null) conn2.close();
}
catch (Exception ignore)
{
-
+
}
}
}
-
-
+
+
public void testClusteredTopicDurableNonPersistentLocal() throws Exception
{
Connection conn1 = null;
-
+
Connection conn2 = null;
try
{
conn1 = cf1.createConnection();
-
+
conn1.setClientID("id1");
-
+
conn2 = cf2.createConnection();
-
+
conn2.setClientID("id1");
-
+
Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
MessageConsumer durable1 = sess1.createDurableSubscriber(topic1, "sub1");
-
+
MessageConsumer durable2 = sess2.createDurableSubscriber(topic2, "sub1");
-
+
conn1.start();
-
+
conn2.start();
-
+
MessageProducer prod1 = sess1.createProducer(topic1);
-
+
prod1.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
+
final int NUM_MESSAGES = 100;
-
+
for (int i = 0; i < NUM_MESSAGES; i++)
{
TextMessage tm = sess1.createTextMessage("message" + i);
-
+
prod1.send(tm);
}
-
+
log.info("sent messages");
-
+
//All the messages should be on the local sub
-
+
for (int i = 0; i < NUM_MESSAGES; i++)
{
log.info("i is " + i);
-
+
TextMessage tm = (TextMessage)durable1.receive(1000);
-
+
assertNotNull(tm);
-
+
log.info("Got message:" + tm);
-
+
assertEquals("message" + i, tm.getText());
}
-
+
Message m = durable2.receive(2000);
-
+
assertNull(m);
-
+
durable1.close();
-
+
durable2.close();
-
+
sess1.unsubscribe("sub1");
-
+
sess2.unsubscribe("sub1");
}
finally
- {
+ {
try
{
if (conn1 != null) conn1.close();
-
+
if (conn2 != null) conn2.close();
}
catch (Exception ignore)
{
-
+
}
}
}
-
+
public void testClusteredTopicDurablePersistentLocal() throws Exception
{
Connection conn1 = null;
-
+
Connection conn2 = null;
try
{
conn1 = cf1.createConnection();
-
+
conn1.setClientID("id1");
-
+
conn2 = cf2.createConnection();
-
+
conn2.setClientID("id1");
-
+
Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
try
{
sess1.unsubscribe("sub1");
-
+
sess2.unsubscribe("sub1");
}
catch (Exception ignore)
- {
+ {
}
-
+
MessageConsumer durable1 = sess1.createDurableSubscriber(topic1, "sub1");
-
+
MessageConsumer durable2 = sess2.createDurableSubscriber(topic2, "sub1");
-
+
conn1.start();
-
+
conn2.start();
-
+
MessageProducer prod1 = sess1.createProducer(topic1);
-
+
prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
-
+
final int NUM_MESSAGES = 100;
-
+
for (int i = 0; i < NUM_MESSAGES; i++)
{
TextMessage tm = sess1.createTextMessage("message" + i);
-
+
prod1.send(tm);
}
-
+
log.info("sent messages");
-
+
//All the messages should be on the local sub
-
+
for (int i = 0; i < NUM_MESSAGES; i++)
{
log.info("i is " + i);
-
+
TextMessage tm = (TextMessage)durable1.receive(1000);
-
+
assertNotNull(tm);
-
+
log.info("Got message:" + tm);
-
+
assertEquals("message" + i, tm.getText());
}
-
+
Message m = durable2.receive(2000);
-
+
assertNull(m);
-
+
sess1.unsubscribe("sub1");
-
+
sess2.unsubscribe("sub1");
-
+
}
finally
- {
+ {
try
{
if (conn1 != null) conn1.close();
-
+
if (conn2 != null) conn2.close();
}
catch (Exception ignore)
{
-
+
}
}
}
-
-
+
+
public void testClusteredTopicDurableNonPersistentNotLocal() throws Exception
{
Connection conn1 = null;
-
+
Connection conn2 = null;
try
{
conn1 = cf1.createConnection();
-
+
conn1.setClientID("id1");
-
+
conn2 = cf2.createConnection();
-
+
conn2.setClientID("id1");
-
+
Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
MessageConsumer durable2 = sess2.createDurableSubscriber(topic2, "sub1");
-
+
conn1.start();
-
+
conn2.start();
-
+
MessageProducer prod1 = sess1.createProducer(topic1);
-
+
prod1.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
+
final int NUM_MESSAGES = 100;
-
+
for (int i = 0; i < NUM_MESSAGES; i++)
{
TextMessage tm = sess1.createTextMessage("message" + i);
-
+
prod1.send(tm);
}
-
+
log.info("sent messages");
-
+
//All the messages should be on the non local sub
-
+
for (int i = 0; i < NUM_MESSAGES; i++)
{
log.info("i is " + i);
-
+
TextMessage tm = (TextMessage)durable2.receive(1000);
-
+
assertNotNull(tm);
-
+
log.info("Got message:" + tm);
-
+
assertEquals("message" + i, tm.getText());
}
-
+
durable2.close();
-
+
sess2.unsubscribe("sub1");
-
+
}
finally
- {
+ {
try
{
if (conn1 != null) conn1.close();
-
+
if (conn2 != null) conn2.close();
}
catch (Exception ignore)
{
-
+
}
}
}
-
+
public void testClusteredTopicDurablePersistentNotLocal() throws Exception
{
Connection conn1 = null;
-
+
Connection conn2 = null;
try
{
conn1 = cf1.createConnection();
-
+
conn1.setClientID("id1");
-
+
conn2 = cf2.createConnection();
-
+
conn2.setClientID("id1");
-
+
Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
MessageConsumer durable2 = sess2.createDurableSubscriber(topic2, "sub1");
-
+
conn1.start();
-
+
conn2.start();
-
+
MessageProducer prod1 = sess1.createProducer(topic1);
-
+
prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
-
+
final int NUM_MESSAGES = 100;
-
+
for (int i = 0; i < NUM_MESSAGES; i++)
{
TextMessage tm = sess1.createTextMessage("message" + i);
-
+
prod1.send(tm);
}
-
+
log.info("sent messages");
-
+
//All the messages should be on the non local sub
-
+
for (int i = 0; i < NUM_MESSAGES; i++)
{
log.info("i is " + i);
-
+
TextMessage tm = (TextMessage)durable2.receive(1000);
-
+
assertNotNull(tm);
-
+
log.info("Got message:" + tm);
-
+
assertEquals("message" + i, tm.getText());
}
-
+
durable2.close();
-
+
sess2.unsubscribe("sub1");
-
+
}
finally
- {
+ {
try
{
if (conn1 != null) conn1.close();
-
+
if (conn2 != null) conn2.close();
}
catch (Exception ignore)
{
-
+
}
}
}
-
+ /**
+ * Fails a connection from cf1 over, by hand, to a connection from cf2 and checks that a
+ * consumer created before the failover still receives a message produced through cf2.
+ */
+ public void testSimpleFailover() throws Exception
+ {
+ Connection conn = null;
+ Connection conn2 = null;
+
+ try
+ {
+ // Create a consumer on a distributed queue
+ conn = cf1.createConnection();
+ conn.setClientID("cid");
+
+ Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer c = s.createConsumer(queue1);
+
+ conn.start();
+
+ // Fail it over: create by hand the "failover" connection
+ Connection failOverConn = cf2.createConnection();
+ ((JBossConnection)conn).getDelegate().
+ failOver(((JBossConnection)failOverConn).getDelegate());
+
+ // Send a message to the distributed queue, specifically on the "valid" node, so the
+ // producing session must come from conn2 (created from cf2), not from conn.
+ conn2 = cf2.createConnection();
+ Session s2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer p = s2.createProducer(queue1);
+ p.send(s2.createTextMessage("test0"));
+
+ // I should be able to receive the message using the first connection
+ TextMessage tm = (TextMessage)c.receive(2000);
+ // Fail with an assertion, not a NullPointerException, if receive() times out
+ assertNotNull(tm);
+ assertEquals("test0", tm.getText());
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+ }
+
+
class MyListener implements MessageListener
{
private int i;
-
+
MyListener(int i)
{
this.i = i;
@@ -926,7 +981,7 @@
try
{
int count = m.getIntProperty("count");
-
+
log.info("Listener " + i + " received message " + count);
}
catch (Exception e)
@@ -934,7 +989,7 @@
e.printStackTrace();
}
}
-
+
}
-
+
}
Modified: branches/Branch_Client_Failover_Experiment/util/do-not-distribute.properties
===================================================================
--- branches/Branch_Client_Failover_Experiment/util/do-not-distribute.properties 2006-10-21 04:03:42 UTC (rev 1515)
+++ branches/Branch_Client_Failover_Experiment/util/do-not-distribute.properties 2006-10-24 03:35:43 UTC (rev 1516)
@@ -2,7 +2,7 @@
# This file provides values for in-workarea example testing.
# DO NOT DISTRIBUTE!
#
-jboss.home=/home/clebert/workspaces/jboss-4.0-tmp/build/output/jboss-4.0.5.GA
+#jboss.home=/home/clebert/workspaces/jboss-4.0-tmp/build/output/jboss-4.0.5.GA
messaging.config.name=messaging
main.artifact.location=../output/lib
auxiliary.artifacts.location=../src/etc/server/default/deploy
More information about the jboss-cvs-commits
mailing list