[jboss-cvs] JBoss Messaging SVN: r1486 - in branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client: container remoting state
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Oct 17 12:41:07 EDT 2006
Author: clebert.suconic at jboss.com
Date: 2006-10-17 12:41:04 -0400 (Tue, 17 Oct 2006)
New Revision: 1486
Modified:
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConnectionAspect.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/remoting/CallbackManager.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/ConsumerState.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-519 - MessageConsumers
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConnectionAspect.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConnectionAspect.java 2006-10-17 14:49:19 UTC (rev 1485)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConnectionAspect.java 2006-10-17 16:41:04 UTC (rev 1486)
@@ -27,6 +27,8 @@
import org.jboss.aop.joinpoint.Invocation;
import org.jboss.aop.joinpoint.MethodInvocation;
import org.jboss.jms.client.JBossConnectionMetaData;
+import org.jboss.jms.client.remoting.CallbackManager;
+import org.jboss.jms.client.remoting.MessageCallbackHandler;
import org.jboss.jms.client.delegate.ClientConnectionDelegate;
import org.jboss.jms.client.delegate.ClientSessionDelegate;
import org.jboss.jms.client.delegate.ClientProducerDelegate;
@@ -40,6 +42,7 @@
import org.jboss.remoting.ConnectionListener;
import java.util.Iterator;
+import java.util.ArrayList;
/**
* Handles operations related to the connection
@@ -60,6 +63,7 @@
// Static --------------------------------------------------------
private static final Logger log = Logger.getLogger(ConnectionAspect.class);
+ private static boolean trace = log.isTraceEnabled();
// Attributes ----------------------------------------------------
@@ -212,24 +216,31 @@
}
ClientConnectionDelegate currentDelegate = ((ClientConnectionDelegate)invocation.getTargetObject());
- ConnectionState currentState = (ConnectionState)currentDelegate.getState();
+ ConnectionState currentConnectionState = (ConnectionState)currentDelegate.getState();
+ int oldServerId = currentConnectionState.getServerID();
ClientConnectionDelegate otherConnection = (ClientConnectionDelegate)((MethodInvocation)invocation).getArguments()[0];
- ConnectionState otherConnectionState = (ConnectionState)((ClientConnectionDelegate)otherConnection).getState();
+ ConnectionState newConnectionState = (ConnectionState)((ClientConnectionDelegate)otherConnection).getState();
- currentState.failOver(otherConnectionState);
+ currentConnectionState.failOver(newConnectionState);
- if (currentState.getClientID()!=null)
+ if (currentConnectionState.getClientID()!=null)
{
- otherConnection.setClientID(currentState.getClientID());
+ otherConnection.setClientID(currentConnectionState.getClientID());
}
// Transferring state from newDelegate to currentDelegate
currentDelegate.transferHAState(otherConnection);
- Iterator sessionsIterator = currentState.getChildren().iterator();
+ if (currentConnectionState.isStarted())
+ {
+ currentDelegate.start();
+ }
+
+ Iterator sessionsIterator = currentConnectionState.getChildren().iterator();
+
while(sessionsIterator.hasNext())
{
SessionState sessionState = (SessionState)sessionsIterator.next();
@@ -246,7 +257,10 @@
log.trace("Replacing session (" + currentSessionDelegate + ") by a new session created on the new failed over connection (" + newSessionDelegate + ")");
}
- Iterator sessionObjectsIterator = sessionState.getChildren().iterator();
+
+ ArrayList children = new ArrayList();
+ children.addAll(sessionState.getChildren());
+ Iterator sessionObjectsIterator = children.iterator();
while (sessionObjectsIterator.hasNext())
{
HierarchicalStateSupport sessionChild = (HierarchicalStateSupport)sessionObjectsIterator.next();
@@ -257,7 +271,7 @@
}
else if (sessionChild instanceof ConsumerState)
{
- handleFailoverOnConsumer((ConsumerState)sessionChild,currentSessionDelegate);
+ handleFailoverOnConsumer(currentConnectionState,sessionState,(ConsumerState)sessionChild,currentSessionDelegate,oldServerId);
}
}
}
@@ -267,16 +281,34 @@
return null;
}
- private void handleFailoverOnConsumer(ConsumerState consumerState, ClientSessionDelegate sessionDelegate) throws JMSException
+ private void handleFailoverOnConsumer(ConnectionState connectionState,SessionState sessionState, ConsumerState consumerState, ClientSessionDelegate sessionDelegate, int oldServerId) throws JMSException
{
+ if (trace)
+ {
+ log.trace("handleFailoverOnConsumer: creating alternate consumer");
+ }
ClientConsumerDelegate newConsumerDelegate = (ClientConsumerDelegate)sessionDelegate.createConsumerDelegate((JBossDestination) consumerState.getDestination(),consumerState.getSelector(),consumerState.isNoLocal(),consumerState.getSubscriptionName(),false);
+ if (trace)
+ {
+ log.trace("handleFailoverOnConsumer: alternate consumer created");
+ }
ClientConsumerDelegate currentConsumerDelegate = (ClientConsumerDelegate)consumerState.getDelegate();
currentConsumerDelegate.transferHAState(newConsumerDelegate);
+ int oldConsumerId = consumerState.getConsumerID();
+
ConsumerState newState = (ConsumerState)newConsumerDelegate.getState();
- consumerState.setConsumerID(newState.getConsumerID());
+ consumerState.failOver(newState);
+ CallbackManager cm = connectionState.getRemotingConnection().getCallbackManager();
+
+ MessageCallbackHandler handler = cm.unregisterHandler(oldServerId,oldConsumerId);
+ handler.setConsumerId(consumerState.getConsumerID());
+
+ cm.registerHandler(connectionState.getServerID(),consumerState.getConsumerID(),handler);
+ sessionState.addCallbackHandler(handler);
+
}
private void handleFailoverOnProducer(ProducerState producerState, ClientSessionDelegate sessionDelegate) throws JMSException
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/remoting/CallbackManager.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/remoting/CallbackManager.java 2006-10-17 14:49:19 UTC (rev 1485)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/remoting/CallbackManager.java 2006-10-17 16:41:04 UTC (rev 1486)
@@ -63,11 +63,11 @@
callbackHandlers.put(lookup, handler);
}
- public void unregisterHandler(int serverId, int consumerId)
+ public MessageCallbackHandler unregisterHandler(int serverId, int consumerId)
{
Long lookup = calcLookup(serverId, consumerId);
- callbackHandlers.remove(lookup);
+ return (MessageCallbackHandler)callbackHandlers.remove(lookup);
}
private Long calcLookup(int serverId, int consumerId)
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2006-10-17 14:49:19 UTC (rev 1485)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2006-10-17 16:41:04 UTC (rev 1486)
@@ -506,6 +506,11 @@
{
return consumerID;
}
+
+ public void setConsumerId(int consumerId)
+ {
+ this.consumerID=consumerId;
+ }
public void addToFrontOfBuffer(MessageProxy proxy)
{
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/ConsumerState.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/ConsumerState.java 2006-10-17 14:49:19 UTC (rev 1485)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/ConsumerState.java 2006-10-17 16:41:04 UTC (rev 1486)
@@ -107,11 +107,6 @@
return consumerID;
}
- public void setConsumerID(int consumerID)
- {
- this.consumerID=consumerID;
- }
-
public boolean isConnectionConsumer()
{
return isConnectionConsumer;
@@ -151,6 +146,11 @@
public void setSubscriptionName(String subscriptionName) {
this.subscriptionName = subscriptionName;
}
+
+ public void failOver(ConsumerState newState)
+ {
+ this.consumerID=newState.consumerID;
+ }
}
More information about the jboss-cvs-commits
mailing list