[jboss-cvs] JBoss Messaging SVN: r1981 - in trunk: src/etc/xmdesc and 10 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jan 18 11:12:55 EST 2007
Author: clebert.suconic at jboss.com
Date: 2007-01-18 11:12:55 -0500 (Thu, 18 Jan 2007)
New Revision: 1981
Added:
trunk/src/main/org/jboss/jms/client/plugin/LoadBalancingFactory.java
trunk/src/main/org/jboss/jms/client/plugin/RoundRobbinLoadBalancingFactory.java
trunk/src/main/org/jboss/jms/client/remoting/CallbackHandler.java
trunk/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java
trunk/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryUpdateMessage.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringAspectInternalTest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/ConnectionFactoryUpdateTest.java
Modified:
trunk/src/etc/server/default/deploy/connection-factories-service.xml
trunk/src/etc/xmdesc/ConnectionFactory-xmbean.xml
trunk/src/main/org/jboss/jms/client/container/ClusteringAspect.java
trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
trunk/src/main/org/jboss/jms/client/plugin/LoadBalancingPolicy.java
trunk/src/main/org/jboss/jms/client/plugin/RoundRobinLoadBalancingPolicy.java
trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
trunk/src/main/org/jboss/jms/server/ConnectionFactoryManager.java
trunk/src/main/org/jboss/jms/server/ConnectionManager.java
trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-674
Modified: trunk/src/etc/server/default/deploy/connection-factories-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/connection-factories-service.xml 2007-01-18 12:10:23 UTC (rev 1980)
+++ trunk/src/etc/server/default/deploy/connection-factories-service.xml 2007-01-18 16:12:55 UTC (rev 1981)
@@ -25,7 +25,7 @@
</attribute>
<attribute name="Clustered">true</attribute>
- <attribute name="LoadBalancingPolicy">org.jboss.jms.client.plugin.RoundRobinLoadBalancingPolicy</attribute>
+ <attribute name="LoadBalancingFactory">org.jboss.jms.client.plugin.RoundRobbinLoadBalancingFactory</attribute>
</mbean>
</server>
\ No newline at end of file
Modified: trunk/src/etc/xmdesc/ConnectionFactory-xmbean.xml
===================================================================
--- trunk/src/etc/xmdesc/ConnectionFactory-xmbean.xml 2007-01-18 12:10:23 UTC (rev 1980)
+++ trunk/src/etc/xmdesc/ConnectionFactory-xmbean.xml 2007-01-18 16:12:55 UTC (rev 1981)
@@ -88,10 +88,10 @@
<type>boolean</type>
</attribute>
- <attribute access="read-write" getMethod="getLoadBalancingPolicy" setMethod="setLoadBalancingPolicy">
- <description>The pluggable load balancing policy that is used to decide the next cluster node to create a clustered connection to</description>
- <name>LoadBalancingPolicy</name>
- <type>org.jboss.jms.client.plugin.LoadBalancingPolicy</type>
+ <attribute access="read-write" getMethod="getLoadBalancingFactory" setMethod="setLoadBalancingFactory">
+ <description>The pluggable load balancing policy factory that is used to decide the next cluster node to create a clustered connection to</description>
+ <name>LoadBalancingFactory</name>
+ <type>org.jboss.jms.client.plugin.LoadBalancingFactory</type>
</attribute>
<!-- Managed operations -->
Modified: trunk/src/main/org/jboss/jms/client/container/ClusteringAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClusteringAspect.java 2007-01-18 12:10:23 UTC (rev 1980)
+++ trunk/src/main/org/jboss/jms/client/container/ClusteringAspect.java 2007-01-18 16:12:55 UTC (rev 1981)
@@ -71,14 +71,6 @@
if (clusteredDelegate == null)
{
clusteredDelegate = (ClientClusteredConnectionFactoryDelegate)invocation.getTargetObject();
-
- // TODO JBMESSAGING-674 - for the time being we assume that the cluster view is static, so
- // we only initialize the load balancing policy here. This is obviously not true,
- // we'll need to review this when working on
- // http://jira.jboss.org/jira/browse/JBMESSAGING-674
-
- clusteredDelegate.getLoadBalancingPolicy().
- updateView(Arrays.asList(clusteredDelegate.getDelegates()));
}
// the method handles both the case of a first connection creation attempt and a retry during
@@ -215,6 +207,15 @@
Map failoverMap = clusteredDelegate.getFailoverMap();
Integer failoverNodeID = (Integer)failoverMap.get(nodeID);
+ // failoverNodeID is not in the map, which means the ConnectionFactory was updated
+ // by another connection on another server, so we have to guess the failover ID
+ // by numeric order. If we guess the new server wrongly, we will have to rely on
+ // the redirect from failover.
+ if (failoverNodeID == null)
+ {
+ failoverNodeID = guessFailoverID(failoverMap, nodeID);
+ }
+
for (int i = 0; i < delegates.length; i++)
{
if (delegates[i].getServerID() == failoverNodeID.intValue())
@@ -226,6 +227,37 @@
return null;
}
+ /** Guesses a failover node for nodeID when failoverMap has no entry for it (the
+ * ConnectionFactory was updated by another connection on another server): returns the
+ * smallest key greater than nodeID, else the smallest key. A wrong guess relies on the
+ * redirect from failover. Throws ArrayIndexOutOfBoundsException on an empty failoverMap.
+ * (NOTE:
+ * There is a testcase that uses reflection to validate this method at
+ * org.jboss.test.messaging.jms.clustering.ClusteringAspectInternalTest
+ * Modify that testcase in case you decide to refactor this method)
+ */
+ private static Integer guessFailoverID(Map failoverMap, Integer nodeID)
+ {
+ Integer failoverNodeID = null;
+ Integer[] nodes = (Integer[]) failoverMap.keySet().toArray(new Integer[failoverMap.size()]);
+ // Sort ascending so the scan below stops at the smallest key greater than nodeID
+ Arrays.sort(nodes);
+ for (int i = 0; i < nodes.length; i++)
+ {
+ if (nodeID.intValue() < nodes[i].intValue())
+ {
+ failoverNodeID = nodes[i];
+ break;
+ }
+ }
+ // No key is greater than nodeID: wrap around to the first (smallest) node
+ if (failoverNodeID == null)
+ {
+ failoverNodeID = nodes[0];
+ }
+ return failoverNodeID;
+ }
+
private synchronized ClientConnectionFactoryDelegate getDelegateForNode(int nodeID)
{
ClientConnectionFactoryDelegate[] delegates = clusteredDelegate.getDelegates();
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java 2007-01-18 12:10:23 UTC (rev 1980)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java 2007-01-18 16:12:55 UTC (rev 1981)
@@ -132,6 +132,7 @@
public void setDelegates(ClientConnectionFactoryDelegate[] dels)
{
this.delegates = dels;
+ getLoadBalancingPolicy().updateView(dels);
}
public Map getFailoverMap()
@@ -149,6 +150,20 @@
return loadBalancingPolicy;
}
+ /** Replaces delegates and failoverMap on a view change, after calling init() on each new delegate. */
+ public synchronized void updateFailoverInfo(ClientConnectionFactoryDelegate[] delegates,
+ Map failoverMap)
+ {
+ for (int i = 0; i < delegates.length; i++)
+ {
+ delegates[i].init();
+ }
+ this.delegates = delegates;
+ this.failoverMap = failoverMap;
+ getLoadBalancingPolicy().updateView(delegates);
+ }
+
+
public String toString()
{
StringBuffer sb = new StringBuffer("ClusteredConnectionFactoryDelegate[");
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2007-01-18 12:10:23 UTC (rev 1980)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2007-01-18 16:12:55 UTC (rev 1981)
@@ -69,7 +69,7 @@
//This data is needed in order to create a connection
private String serverLocatorURI;
-
+
private Version serverVersion;
private int serverID;
@@ -284,6 +284,8 @@
connectionDelegate.setRemotingConnection(remotingConnection);
connectionDelegate.setVersionToUse(version);
+
+ remotingConnection.getCallbackManager().setConnectionDelegate(connectionDelegate);
}
else
{
@@ -326,7 +328,7 @@
{
return serverVersion;
}
-
+
public void synchronizeWith(DelegateSupport newDelegate) throws Exception
{
super.synchronizeWith(newDelegate);
Added: trunk/src/main/org/jboss/jms/client/plugin/LoadBalancingFactory.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/plugin/LoadBalancingFactory.java (rev 0)
+++ trunk/src/main/org/jboss/jms/client/plugin/LoadBalancingFactory.java 2007-01-18 16:12:55 UTC (rev 1981)
@@ -0,0 +1,69 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.jms.client.plugin;
+
+import org.jboss.jms.delegate.ConnectionFactoryDelegate;
+import java.io.Serializable;
+
+/** Serializable abstract factory that creates LoadBalancingPolicy instances for a cluster view.
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ * $Id$
+ */
+public abstract class LoadBalancingFactory implements Serializable
+{
+
+ // Constants ------------------------------------------------------------------------------------
+
+ private static final long serialVersionUID = -22502384102786964L;
+
+ // Attributes -----------------------------------------------------------------------------------
+
+ // Static ---------------------------------------------------------------------------------------
+ // Shared default factory handed out by getDefaultFactory(); a round-robin factory.
+ static LoadBalancingFactory theInstance = new RoundRobbinLoadBalancingFactory();
+
+ // Constructors ---------------------------------------------------------------------------------
+
+ // Public ---------------------------------------------------------------------------------------
+
+ /**
+ * Creates the LoadBalancingPolicy that will work on top of the given ConnectionFactoryDelegate view
+ */
+ public abstract LoadBalancingPolicy createLoadBalancingPolicy(ConnectionFactoryDelegate[] view);
+ /** Returns the shared default factory instance. */
+ public static LoadBalancingFactory getDefaultFactory()
+ {
+ return theInstance;
+ }
+
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ // Private --------------------------------------------------------------------------------------
+
+ // Inner classes --------------------------------------------------------------------------------
+
+}
Property changes on: trunk/src/main/org/jboss/jms/client/plugin/LoadBalancingFactory.java
___________________________________________________________________
Name: svn:keywords
+ Id LastChangedDate Author Revision
Modified: trunk/src/main/org/jboss/jms/client/plugin/LoadBalancingPolicy.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/plugin/LoadBalancingPolicy.java 2007-01-18 12:10:23 UTC (rev 1980)
+++ trunk/src/main/org/jboss/jms/client/plugin/LoadBalancingPolicy.java 2007-01-18 16:12:55 UTC (rev 1981)
@@ -9,7 +9,6 @@
import org.jboss.jms.delegate.ConnectionFactoryDelegate;
import java.io.Serializable;
-import java.util.List;
/**
* The interface that must be implemented by any load balancing policy plugin.
@@ -23,11 +22,13 @@
static final long serialVersionUID = 328573973957394573L;
ConnectionFactoryDelegate getNext();
-
+
/**
+ * This method should be called when updating the LoadBalancingFactory
* @param delegates - a List<ConnectionFactoryDelegate> representing the lastest cluster view
* to chose delegates from
*/
- void updateView(List delegates);
+ void updateView(ConnectionFactoryDelegate[] delegates);
+
}
Added: trunk/src/main/org/jboss/jms/client/plugin/RoundRobbinLoadBalancingFactory.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/plugin/RoundRobbinLoadBalancingFactory.java (rev 0)
+++ trunk/src/main/org/jboss/jms/client/plugin/RoundRobbinLoadBalancingFactory.java 2007-01-18 16:12:55 UTC (rev 1981)
@@ -0,0 +1,61 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.jms.client.plugin;
+
+import org.jboss.jms.delegate.ConnectionFactoryDelegate;
+
+/** LoadBalancingFactory that creates RoundRobinLoadBalancingPolicy instances.
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ * <p/>
+ * $Id$
+ */
+public class RoundRobbinLoadBalancingFactory extends LoadBalancingFactory
+{
+
+ // Constants ------------------------------------------------------------------------------------
+
+ private static final long serialVersionUID = -6304457304470491564L;
+
+ // Attributes -----------------------------------------------------------------------------------
+
+ // Static ---------------------------------------------------------------------------------------
+
+ // Constructors ---------------------------------------------------------------------------------
+
+ // Public ---------------------------------------------------------------------------------------
+ /** Returns a new RoundRobinLoadBalancingPolicy constructed with the given view. */
+ public LoadBalancingPolicy createLoadBalancingPolicy(ConnectionFactoryDelegate[] view)
+ {
+ return new RoundRobinLoadBalancingPolicy(view);
+ }
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ // Private --------------------------------------------------------------------------------------
+
+ // Inner classes --------------------------------------------------------------------------------
+
+}
Property changes on: trunk/src/main/org/jboss/jms/client/plugin/RoundRobbinLoadBalancingFactory.java
___________________________________________________________________
Name: svn:keywords
+ Id LastChangedDate Author Revision
Modified: trunk/src/main/org/jboss/jms/client/plugin/RoundRobinLoadBalancingPolicy.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/plugin/RoundRobinLoadBalancingPolicy.java 2007-01-18 12:10:23 UTC (rev 1980)
+++ trunk/src/main/org/jboss/jms/client/plugin/RoundRobinLoadBalancingPolicy.java 2007-01-18 16:12:55 UTC (rev 1981)
@@ -21,6 +21,8 @@
{
// Constants ------------------------------------------------------------------------------------
+ private static final long serialVersionUID = 5215940403016586462L;
+
// Static ---------------------------------------------------------------------------------------
// Attributes -----------------------------------------------------------------------------------
@@ -28,27 +30,31 @@
// The index of the next delegate to be used
private int next;
- // List<ConnectionFactoryDelegate>
- private List view;
+ private ConnectionFactoryDelegate[] delegates;
// Constructors ---------------------------------------------------------------------------------
+ public RoundRobinLoadBalancingPolicy(ConnectionFactoryDelegate[] delegates)
+ {
+ this.delegates = delegates;
+ }
+
// LoadBalancingPolicy implementation -----------------------------------------------------------
public synchronized ConnectionFactoryDelegate getNext()
{
- if (next >= view.size())
+ if (next >= delegates.length)
{
next = 0;
}
- return (ConnectionFactoryDelegate)view.get(next++);
+ return delegates[next++];
}
- public synchronized void updateView(List delegates)
+ public synchronized void updateView(ConnectionFactoryDelegate[] delegates)
{
- view = new ArrayList(delegates);
next = 0;
+ this.delegates = delegates;
}
// Public ---------------------------------------------------------------------------------------
Added: trunk/src/main/org/jboss/jms/client/remoting/CallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/CallbackHandler.java (rev 0)
+++ trunk/src/main/org/jboss/jms/client/remoting/CallbackHandler.java 2007-01-18 16:12:55 UTC (rev 1981)
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.jms.client.remoting;
+
+import org.jboss.jms.message.MessageProxy;
+
+/** Handler of a message object received by the client through a callback.
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public interface CallbackHandler
+{
+ /** Handles one received message; implementations cast it to the type they expect. */
+ void handleMessage(Object message);
+
+}
Property changes on: trunk/src/main/org/jboss/jms/client/remoting/CallbackHandler.java
___________________________________________________________________
Name: svn:keywords
+ Id LastChangedDate Author Revision
Modified: trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java 2007-01-18 12:10:23 UTC (rev 1980)
+++ trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java 2007-01-18 16:12:55 UTC (rev 1981)
@@ -26,6 +26,7 @@
import org.jboss.jms.message.MessageProxy;
import org.jboss.jms.server.endpoint.ClientDelivery;
import org.jboss.jms.server.remoting.MessagingMarshallable;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
import org.jboss.logging.Logger;
import org.jboss.remoting.callback.Callback;
import org.jboss.remoting.callback.HandleCallbackException;
@@ -73,6 +74,7 @@
// Map<Long(lookup)-MessageCallbackHandler>
protected Map callbackHandlers;
+ protected ConnectionFactoryCallbackHandler connectionfactoryCallbackHandler;
// Constructors ---------------------------------------------------------------------------------
@@ -85,23 +87,33 @@
public void handleCallback(Callback callback) throws HandleCallbackException
{
- MessagingMarshallable mm = (MessagingMarshallable)callback.getParameter();
- ClientDelivery dr = (ClientDelivery)mm.getLoad();
- MessageProxy msg = dr.getMessage();
+ if (callback.getParameter() instanceof MessagingMarshallable)
+ {
+ MessagingMarshallable mm = (MessagingMarshallable)callback.getParameter();
+ ClientDelivery dr = (ClientDelivery)mm.getLoad();
+ MessageProxy msg = dr.getMessage();
- MessageCallbackHandler handler =
- (MessageCallbackHandler)callbackHandlers.get(new Integer(dr.getConsumerId()));
+ MessageCallbackHandler handler =
+ (MessageCallbackHandler)callbackHandlers.get(new Integer(dr.getConsumerId()));
- if (handler == null)
+ if (handler == null)
+ {
+ //This is OK and can happen if the callback handler is deregistered on consumer close,
+ //but there are messages still in transit which arrive later.
+ //In this case it is just safe to ignore the message
+ if (trace) { log.trace(this + " callback handler not found, message arrived after consumer is closed"); }
+ return;
+ }
+
+ handler.handleMessage(msg);
+ } else
{
- //This is OK and can happen if the callback handler is deregistered on consumer close,
- //but there are messages still in transit which arrive later.
- //In this case it is just safe to ignore the message
- if (trace) { log.trace(this + " callback handler not found, message arrived after consumer is closed"); }
- return;
+ log.trace("Receiving connectionFactoryUpdateMessage - " + callback.getParameter());
+ if (connectionfactoryCallbackHandler != null)
+ {
+ connectionfactoryCallbackHandler.handleMessage(callback.getParameter());
+ }
}
-
- handler.handleMessage(msg);
}
// Public ---------------------------------------------------------------------------------------
@@ -111,6 +123,12 @@
callbackHandlers.put(new Integer(consumerID), handler);
}
+ public void setConnectionDelegate (ClientConnectionDelegate connectionDelegate)
+ {
+ this.connectionfactoryCallbackHandler =
+ new ConnectionFactoryCallbackHandler(connectionDelegate);
+ }
+
public MessageCallbackHandler unregisterHandler(int consumerID)
{
return (MessageCallbackHandler)callbackHandlers.remove(new Integer(consumerID));
Added: trunk/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java (rev 0)
+++ trunk/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java 2007-01-18 16:12:55 UTC (rev 1981)
@@ -0,0 +1,99 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.jms.client.remoting;
+
+import org.jboss.jms.client.delegate.ClientClusteredConnectionFactoryDelegate;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.state.ConnectionState;
+import org.jboss.jms.server.endpoint.ConnectionFactoryUpdateMessage;
+import org.jboss.logging.Logger;
+
+/**
+ * Applies ConnectionFactoryUpdateMessage callbacks to the connection's clustered factory delegate.
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ * <p/>
+ * $Id$
+ */
+public class ConnectionFactoryCallbackHandler implements CallbackHandler
+{
+
+ // Constants ------------------------------------------------------------------------------------
+
+ // Attributes -----------------------------------------------------------------------------------
+
+ ClientConnectionDelegate connectionDelegate;
+ ConnectionState state;
+
+ // Static ---------------------------------------------------------------------------------------
+
+ protected static final Logger log = Logger.getLogger(ConnectionFactoryCallbackHandler.class);
+ private static boolean trace = log.isTraceEnabled();
+
+ // Constructors ---------------------------------------------------------------------------------
+ /** Stores the delegate and its current state; getState() re-reads the state if it is still null. */
+ public ConnectionFactoryCallbackHandler(ClientConnectionDelegate connectionDelegate)
+ {
+ this.connectionDelegate = connectionDelegate;
+ this.state = (ConnectionState)connectionDelegate.getState();
+ }
+
+
+ // Implementation of CallbackHandler ------------------------------------------------------------
+ public void handleMessage(Object message)
+ {
+ ConnectionFactoryUpdateMessage updateMessage = (ConnectionFactoryUpdateMessage) message;
+ // The unchecked cast above throws ClassCastException for any other message type.
+ // Only a ClientClusteredConnectionFactoryDelegate is updated; anything else is ignored.
+ if (getState().getClusteredConnectionFactoryDelegate() != null &&
+ getState().getClusteredConnectionFactoryDelegate()
+ instanceof ClientClusteredConnectionFactoryDelegate)
+ {
+ ClientClusteredConnectionFactoryDelegate delegate =
+ (ClientClusteredConnectionFactoryDelegate) getState().
+ getClusteredConnectionFactoryDelegate();
+
+ delegate.updateFailoverInfo(updateMessage.getDelegates(), updateMessage.getFailoverMap());
+ }
+
+ }
+
+ // Public ---------------------------------------------------------------------------------------
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+ /** Returns the connection state, fetching it from the connection delegate if not set yet. */
+ protected ConnectionState getState()
+ {
+ if (state==null)
+ {
+ this.state = (ConnectionState)connectionDelegate.getState();
+ }
+ return this.state;
+ }
+
+ // Private --------------------------------------------------------------------------------------
+
+ // Inner classes --------------------------------------------------------------------------------
+}
Property changes on: trunk/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java
___________________________________________________________________
Name: svn:keywords
+ Id LastChangedDate Author Revision
Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2007-01-18 12:10:23 UTC (rev 1980)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2007-01-18 16:12:55 UTC (rev 1981)
@@ -51,7 +51,7 @@
*
* $Id$
*/
-public class MessageCallbackHandler
+public class MessageCallbackHandler implements CallbackHandler
{
// Constants -----------------------------------------------------
@@ -250,10 +250,12 @@
/**
* Handles a message sent from the server
- * @param msg The message
+ * @param message The message
*/
- public void handleMessage(MessageProxy msg)
- {
+ public void handleMessage(Object message)
+ {
+ MessageProxy msg = (MessageProxy) message;
+
if (trace) { log.trace("Receiving message " + msg + " from the remoting layer"); }
synchronized (mainLock)
Modified: trunk/src/main/org/jboss/jms/server/ConnectionFactoryManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ConnectionFactoryManager.java 2007-01-18 12:10:23 UTC (rev 1980)
+++ trunk/src/main/org/jboss/jms/server/ConnectionFactoryManager.java 2007-01-18 16:12:55 UTC (rev 1981)
@@ -22,7 +22,7 @@
package org.jboss.jms.server;
import org.jboss.jms.server.connectionfactory.JNDIBindings;
-import org.jboss.jms.client.plugin.LoadBalancingPolicy;
+import org.jboss.jms.client.plugin.LoadBalancingFactory;
import org.jboss.messaging.core.plugin.contract.MessagingComponent;
/**
@@ -45,7 +45,7 @@
int defaultTempQueuePageSize,
int defaultTempQueueDownCacheSize,
boolean clustered,
- LoadBalancingPolicy loadBalancingPolicy) throws Exception;
+ LoadBalancingFactory loadBalancingPolicy) throws Exception;
void unregisterConnectionFactory(String uniqueName, boolean clustered) throws Exception;
}
Modified: trunk/src/main/org/jboss/jms/server/ConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ConnectionManager.java 2007-01-18 12:10:23 UTC (rev 1980)
+++ trunk/src/main/org/jboss/jms/server/ConnectionManager.java 2007-01-18 16:12:55 UTC (rev 1981)
@@ -23,6 +23,7 @@
import org.jboss.jms.server.endpoint.ConnectionEndpoint;
import org.jboss.messaging.core.plugin.contract.MessagingComponent;
+import java.util.List;
/**
@@ -44,4 +45,11 @@
void handleClientFailure(String remotingSessionID);
boolean containsSession(String remotingClientSessionID);
+
+ /**
+ * Returns a list of active connections on this Manager.
+ * The implementation should make a copy of the list to avoid ConcurrentModificationException
+ * @return a copy of the list of active connections on this Manager
+ */
+ List getActiveConnectionsList();
}
Modified: trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java 2007-01-18 12:10:23 UTC (rev 1980)
+++ trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java 2007-01-18 16:12:55 UTC (rev 1981)
@@ -12,8 +12,7 @@
import org.jboss.jms.server.ConnectorManager;
import org.jboss.jms.server.ServerPeer;
import org.jboss.jms.util.ExceptionUtil;
-import org.jboss.jms.client.plugin.LoadBalancingPolicy;
-import org.jboss.jms.client.plugin.RoundRobinLoadBalancingPolicy;
+import org.jboss.jms.client.plugin.LoadBalancingFactory;
import org.jboss.remoting.InvokerLocator;
import org.jboss.system.ServiceMBeanSupport;
import org.w3c.dom.Element;
@@ -39,7 +38,7 @@
protected JNDIBindings jndiBindings;
protected int prefetchSize = 150;
protected boolean clustered;
- protected LoadBalancingPolicy loadBalancingPolicy;
+ protected LoadBalancingFactory loadBalancingFactory;
protected int defaultTempQueueFullSize = 75000;
protected int defaultTempQueuePageSize = 2000;
@@ -67,7 +66,7 @@
this.clientID = clientID;
// by default, a clustered connection uses a round-robin load balancing policy
- this.loadBalancingPolicy = new RoundRobinLoadBalancingPolicy();
+ this.loadBalancingFactory = LoadBalancingFactory.getDefaultFactory();
}
// ServiceMBeanSupport overrides ---------------------------------
@@ -123,7 +122,7 @@
locatorURI, enablePing, prefetchSize,
defaultTempQueueFullSize, defaultTempQueuePageSize,
defaultTempQueueDownCacheSize, clustered,
- loadBalancingPolicy);
+ loadBalancingFactory);
InvokerLocator locator = new InvokerLocator(locatorURI);
@@ -276,19 +275,19 @@
this.clustered = clustered;
}
- public LoadBalancingPolicy getLoadBalancingPolicy()
+ public LoadBalancingFactory getLoadBalancingFactory()
{
- return loadBalancingPolicy;
+ return loadBalancingFactory;
}
- public void setLoadBalancingPolicy(LoadBalancingPolicy loadBalancingPolicy)
+ public void setLoadBalancingFactory(LoadBalancingFactory loadBalancingFactory)
{
if (started)
{
log.warn("Load balancing policy can only be changed when connection factory is stopped");
return;
}
- this.loadBalancingPolicy = loadBalancingPolicy;
+ this.loadBalancingFactory = loadBalancingFactory;
}
// JMX managed operations ----------------------------------------
Modified: trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2007-01-18 12:10:23 UTC (rev 1980)
+++ trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2007-01-18 16:12:55 UTC (rev 1981)
@@ -38,12 +38,15 @@
import org.jboss.jms.client.JBossConnectionFactory;
import org.jboss.jms.client.plugin.LoadBalancingPolicy;
+import org.jboss.jms.client.plugin.LoadBalancingFactory;
import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
import org.jboss.jms.client.delegate.ClientClusteredConnectionFactoryDelegate;
import org.jboss.jms.server.ConnectionFactoryManager;
import org.jboss.jms.server.ServerPeer;
import org.jboss.jms.server.Version;
import org.jboss.jms.server.endpoint.ServerConnectionFactoryEndpoint;
+import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
+import org.jboss.jms.server.endpoint.ConnectionFactoryUpdateMessage;
import org.jboss.jms.server.endpoint.advised.ConnectionFactoryAdvised;
import org.jboss.jms.server.remoting.JMSDispatcher;
import org.jboss.jms.util.JNDIUtil;
@@ -53,6 +56,7 @@
import org.jboss.messaging.core.plugin.contract.Replicator;
import org.jboss.messaging.core.plugin.contract.FailoverMapper;
import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
+import org.jboss.remoting.callback.Callback;
/**
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
@@ -103,7 +107,7 @@
// ConnectionFactoryManager implementation -----------------------
/**
- * @param loadBalancingPolicy - ignored for non-clustered connection factories.
+ * @param loadBalancingFactory - ignored for non-clustered connection factories.
*/
public synchronized void registerConnectionFactory(String uniqueName,
String clientID,
@@ -115,7 +119,7 @@
int defaultTempQueuePageSize,
int defaultTempQueueDownCacheSize,
boolean clustered,
- LoadBalancingPolicy loadBalancingPolicy)
+ LoadBalancingFactory loadBalancingFactory)
throws Exception
{
log.debug(this + " registering connection factory '" + uniqueName +
@@ -170,7 +174,7 @@
// Create a clustered delegate
Map localDelegates = replicator.get(CF_PREFIX + uniqueName);
- delegate = createClusteredDelegate(localDelegates.values(), loadBalancingPolicy);
+ delegate = createClusteredDelegate(localDelegates.values(), loadBalancingFactory);
log.debug(this + " created clustered delegate " + delegate);
}
@@ -324,7 +328,8 @@
String uniqueName = sKey.substring(CF_PREFIX.length());
- log.debug(this + " received '" + uniqueName + "' connection factory update " + updatedReplicantMap);
+ log.debug(this + " received '" + uniqueName +
+ "' connection factory update " + updatedReplicantMap);
ClientClusteredConnectionFactoryDelegate del =
(ClientClusteredConnectionFactoryDelegate)delegates.get(uniqueName);
@@ -351,6 +356,8 @@
}
rebindConnectionFactory(initialContext, endpoint.getJNDIBindings(), del);
+
+ endpoint.updateClusteredClients(delArr, failoverMap);
}
}
catch (Exception e)
@@ -397,7 +404,7 @@
* @param localDelegates - Collection<ClientConnectionFactoryDelegate>
*/
private ClientClusteredConnectionFactoryDelegate
- createClusteredDelegate(Collection localDelegates, LoadBalancingPolicy loadBalancingPolicy)
+ createClusteredDelegate(Collection localDelegates, LoadBalancingFactory loadBalancingFactory)
throws Exception
{
log.trace(this + " creating a clustered ConnectionFactoryDelegate based on " + localDelegates);
@@ -425,7 +432,8 @@
return new ClientClusteredConnectionFactoryDelegate(delegates,
failoverMap,
- loadBalancingPolicy);
+ loadBalancingFactory.
+ createLoadBalancingPolicy(delegates));
}
private void rebindConnectionFactory(Context ic, JNDIBindings jndiBindings,
Modified: trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java 2007-01-18 12:10:23 UTC (rev 1980)
+++ trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java 2007-01-18 16:12:55 UTC (rev 1981)
@@ -27,6 +27,8 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
import javax.jms.JMSException;
@@ -58,7 +60,9 @@
protected Map jmsClients;
protected Map sessions;
-
+
+ protected Set activeConnections;
+
// Constructors --------------------------------------------------
public SimpleConnectionManager()
@@ -66,6 +70,8 @@
jmsClients = new HashMap();
sessions = new HashMap();
+
+ activeConnections = new HashSet();
}
// ConnectionManager ---------------------------------------------
@@ -83,6 +89,8 @@
endpoints.put(remotingClientSessionID, endpoint);
sessions.put(remotingClientSessionID, jmsClientVMId);
+
+ activeConnections.add(endpoint);
log.debug("registered connection " + endpoint + " as " +
Util.guidToString(remotingClientSessionID));
@@ -95,7 +103,12 @@
if (endpoints != null)
{
ConnectionEndpoint e = (ConnectionEndpoint)endpoints.remove(remotingClientSessionID);
-
+
+ if (e != null)
+ {
+ // The endpoint was added to activeConnections when the connection was registered, so
+ // it has to be dropped from that set here. (endpoints is keyed by remoting session ID,
+ // removing the endpoint object from it would be a no-op and the set would leak.)
+ activeConnections.remove(e);
+ }
+
log.debug("unregistered connection " + e + " with remoting session ID " +
Util.guidToString(remotingClientSessionID));
@@ -174,7 +187,15 @@
{
return sessions.containsKey(remotingClientSessionID);
}
-
+
+ public synchronized List getActiveConnectionsList()
+ {
+ // Hand back a snapshot rather than the live set, so the caller can iterate over it
+ // without hitting a ConcurrentModificationException
+ List snapshot = new ArrayList(activeConnections);
+ return snapshot;
+ }
+
/*
* Used in testing only
*/
Added: trunk/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryUpdateMessage.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryUpdateMessage.java (rev 0)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryUpdateMessage.java 2007-01-18 16:12:55 UTC (rev 1981)
@@ -0,0 +1,90 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.jms.server.endpoint;
+
+import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
+import java.util.Map;
+import java.io.Serializable;
+
+/**
+ * Serializable holder for the updated information about a connection factory: the array of
+ * connection factory delegates and the failover map.
+ *
+ * Both values are kept by reference; neither the constructor nor the accessors copy them.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public class ConnectionFactoryUpdateMessage implements Serializable
+{
+
+
+ // Constants ------------------------------------------------------------------------------------
+
+ static final long serialVersionUID = 7978093036163402989L;
+
+ // Attributes -----------------------------------------------------------------------------------
+
+ // the updated connection factory delegates
+ private ClientConnectionFactoryDelegate[] delegates;
+ // the updated failover map (NOTE(review): presumably node ID -> failover node ID - confirm)
+ private Map failoverMap;
+
+ // Static ---------------------------------------------------------------------------------------
+
+ // Constructors ---------------------------------------------------------------------------------
+
+ /**
+ * @param delegates - the updated connection factory delegates, stored as-is (not copied)
+ * @param failoverMap - the updated failover map, stored as-is (not copied)
+ */
+ public ConnectionFactoryUpdateMessage(ClientConnectionFactoryDelegate[] delegates,
+ Map failoverMap)
+ {
+ this.delegates = delegates;
+ this.failoverMap = failoverMap;
+ }
+
+ // Public ---------------------------------------------------------------------------------------
+
+ /**
+ * @return the delegates array held by this message (the internal array, not a copy)
+ */
+ public ClientConnectionFactoryDelegate[] getDelegates()
+ {
+ return delegates;
+ }
+
+ /**
+ * @param delegates - replaces the delegates array held by this message
+ */
+ public void setDelegates(ClientConnectionFactoryDelegate[] delegates)
+ {
+ this.delegates = delegates;
+ }
+
+ /**
+ * @return the failover map held by this message (the internal map, not a copy)
+ */
+ public Map getFailoverMap()
+ {
+ return failoverMap;
+ }
+
+ /**
+ * @param failoverMap - replaces the failover map held by this message
+ */
+ public void setFailoverMap(Map failoverMap)
+ {
+ this.failoverMap = failoverMap;
+ }
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ // Private --------------------------------------------------------------------------------------
+
+ // Inner classes --------------------------------------------------------------------------------
+
+}
Property changes on: trunk/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryUpdateMessage.java
___________________________________________________________________
Name: svn:keywords
+ Id LastChangedDate Author Revision
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-01-18 12:10:23 UTC (rev 1980)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-01-18 16:12:55 UTC (rev 1981)
@@ -114,6 +114,7 @@
private int defaultTempQueueFullSize;
private int defaultTempQueuePageSize;
private int defaultTempQueueDownCacheSize;
+ private ServerConnectionFactoryEndpoint cfendpoint;
private byte usingVersion;
@@ -126,14 +127,17 @@
* @param failedNodeID - zero or positive values mean connection creation attempt is result of
* failover. Negative values are ignored (mean regular connection creation attempt).
*/
- protected ServerConnectionEndpoint(ServerPeer serverPeer, String clientID,
+ public ServerConnectionEndpoint(ServerPeer serverPeer, String clientID,
String username, String password, int prefetchSize,
int defaultTempQueueFullSize,
int defaultTempQueuePageSize,
int defaultTempQueueDownCacheSize,
- int failedNodeID) throws Exception
+ int failedNodeID,
+ ServerConnectionFactoryEndpoint cfendpoint) throws Exception
{
this.serverPeer = serverPeer;
+
+ this.cfendpoint = cfendpoint;
sm = serverPeer.getSecurityManager();
tr = serverPeer.getTxRepository();
@@ -162,7 +166,7 @@
this.failedNodeID = new Integer(failedNodeID);
}
}
-
+
// ConnectionDelegate implementation ------------------------------------------------------------
public SessionDelegate createSessionDelegate(boolean transacted,
@@ -203,7 +207,7 @@
log.debug("created and registered " + ep);
ClientSessionDelegate d = new ClientSessionDelegate(sessionID);
-
+
log.debug("created " + d);
return d;
@@ -346,7 +350,7 @@
cm.unregisterConnection(jmsClientVMId, remotingClientSessionId);
JMSDispatcher.instance.unregisterTarget(new Integer(id));
-
+
closed = true;
}
catch (Throwable t)
@@ -483,7 +487,12 @@
"must be using pull callbacks");
}
}
-
+
+ /**
+ * NOTE(review): presumably null until a callback handler has been set on this
+ * endpoint - confirm before dereferencing the result.
+ *
+ * @return the callback handler currently held by this connection endpoint
+ */
+ public ServerInvokerCallbackHandler getCallbackHandler()
+ {
+ return callbackHandler;
+ }
+
// IOC
public void setRemotingInformation(String jmsClientVMId, String remotingClientSessionId)
{
@@ -504,7 +513,12 @@
{
return serverPeer;
}
-
+
+ /**
+ * @return the connection factory endpoint that was passed to this endpoint's constructor
+ */
+ public ServerConnectionFactoryEndpoint getConnectionFactoryEndpoint()
+ {
+ return cfendpoint;
+ }
+
public String toString()
{
return "ConnectionEndpoint[" + id + "]";
@@ -537,11 +551,6 @@
return defaultTempQueueDownCacheSize;
}
- ServerInvokerCallbackHandler getCallbackHandler()
- {
- return callbackHandler;
- }
-
int getConnectionID()
{
return id;
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java 2007-01-18 12:10:23 UTC (rev 1980)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java 2007-01-18 16:12:55 UTC (rev 1981)
@@ -23,6 +23,7 @@
import javax.jms.JMSException;
import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
import org.jboss.jms.delegate.ConnectionDelegate;
import org.jboss.jms.server.ServerPeer;
import org.jboss.jms.server.connectionfactory.JNDIBindings;
@@ -31,6 +32,12 @@
import org.jboss.jms.util.ExceptionUtil;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.plugin.IDBlock;
+import org.jboss.remoting.callback.Callback;
+import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
+import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
+import java.util.List;
+import java.util.Map;
+import java.util.Iterator;
/**
* Concrete implementation of ConnectionFactoryEndpoint
@@ -151,7 +158,7 @@
int failedNodeID)
throws Exception
{
- log.debug("creating a new connection for user " + username);
+ log.debug("creating a new connection for user " + username);
// authenticate the user
serverPeer.getSecurityManager().authenticate(username, password);
@@ -173,7 +180,7 @@
ServerConnectionEndpoint endpoint =
new ServerConnectionEndpoint(serverPeer, clientID, username, password, prefetchSize,
defaultTempQueueFullSize, defaultTempQueuePageSize,
- defaultTempQueueDownCacheSize, failedNodeID);
+ defaultTempQueueDownCacheSize, failedNodeID, this);
int connectionID = endpoint.getConnectionID();
@@ -209,7 +216,6 @@
}
}
-
// Public ---------------------------------------------------------------------------------------
public int getID()
@@ -222,6 +228,42 @@
return jndiBindings;
}
+
+ /**
+ * Pushes the updated connection factory delegates and failover map to the active
+ * connections, by handing each of them a ConnectionFactoryUpdateMessage wrapped in a
+ * remoting Callback.
+ *
+ * The method lives on this endpoint because, should we decide to lock the endpoint while
+ * updates are being sent, the write locks would have to be taken here.
+ *
+ * A failure to deliver the callback to one connection is logged and does not prevent the
+ * remaining connections from being updated.
+ *
+ * NOTE(review): the list is obtained from the ConnectionManager, so it looks like every
+ * active connection is notified, not only those created through this factory - confirm
+ * this is intended.
+ *
+ * @param delegates - the updated connection factory delegates
+ * @param failoverMap - the updated failover map
+ */
+ public void updateClusteredClients(ClientConnectionFactoryDelegate[] delegates, Map failoverMap)
+ throws Exception
+ {
+ // Open question: should the CFEndpoint be locked while this is going on, with regard to
+ // new connections coming in?
+ List activeConnections = serverPeer.getConnectionManager().getActiveConnectionsList();
+
+ log.info("Sending update list to active connections. It got " +
+ activeConnections.size() + " elements");
+
+ // one callback instance is shared by all the connections
+ Callback update = new Callback(new ConnectionFactoryUpdateMessage(delegates, failoverMap));
+
+ Iterator connections = activeConnections.iterator();
+ while (connections.hasNext())
+ {
+ ServerConnectionEndpoint target = (ServerConnectionEndpoint) connections.next();
+ log.trace("Updating connection " + target);
+ try
+ {
+ target.getCallbackHandler().handleCallback(update);
+ }
+ catch (Exception failure)
+ {
+ // best effort: report the problem and move on to the next connection
+ log.error("Callback failed on connection " + target, failure);
+ }
+ }
+ }
+
public String toString()
{
return "ConnectionFactoryEndpoint[" + id + "]";
Modified: trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java 2007-01-18 12:10:23 UTC (rev 1980)
+++ trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java 2007-01-18 16:12:55 UTC (rev 1981)
@@ -162,7 +162,7 @@
handleVersion(obj, dos);
try
- {
+ {
if (obj instanceof InvocationRequest)
{
if (trace) { log.trace("writing InvocationRequest"); }
@@ -180,8 +180,15 @@
if (params != null && params.length > 0 && params[0] instanceof Callback)
{
Callback callback = (Callback) params[0];
- MessagingMarshallable mm = (MessagingMarshallable)callback.getParameter();
- param = mm.getLoad();
+ if (callback.getParameter() instanceof MessagingMarshallable)
+ {
+ MessagingMarshallable mm = (MessagingMarshallable)callback.getParameter();
+ param = mm.getLoad();
+ }
+ else
+ {
+ param = callback.getParameter();
+ }
}
else
{
Added: trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringAspectInternalTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringAspectInternalTest.java (rev 0)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringAspectInternalTest.java 2007-01-18 16:12:55 UTC (rev 1981)
@@ -0,0 +1,105 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.messaging.jms.clustering;
+
+import org.jboss.test.messaging.MessagingTestCase;
+import org.jboss.jms.client.container.ClusteringAspect;
+import java.util.Map;
+import java.lang.reflect.Method;
+
+/**
+ * This class tests internal methods of ClusteringAspect
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public class ClusteringAspectInternalTest extends MessagingTestCase
+{
+
+ // Constants ------------------------------------------------------------------------------------
+
+ // Attributes -----------------------------------------------------------------------------------
+
+ // Static ---------------------------------------------------------------------------------------
+
+ // Constructors ---------------------------------------------------------------------------------
+
+ public ClusteringAspectInternalTest(String name)
+ {
+ super(name);
+ }
+
+ // Public ---------------------------------------------------------------------------------------
+
+ public void testGuessFailoverMap() throws Exception
+ {
+ Map failoverMapTest = new java.util.HashMap();
+ failoverMapTest.put(new Integer(1), new Integer(3));
+ failoverMapTest.put(new Integer(3), new Integer(4));
+ failoverMapTest.put(new Integer(4), new Integer(1));
+
+ assertEquals(new Integer(1), callGuessFailoverID(failoverMapTest, new Integer(0)));
+ assertEquals(new Integer(3), callGuessFailoverID(failoverMapTest, new Integer(2)));
+ assertEquals(new Integer(1), callGuessFailoverID(failoverMapTest, new Integer(5)));
+ assertEquals(new Integer(3), callGuessFailoverID(failoverMapTest, new Integer(1)));
+
+ }
+
+
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ protected void setUp() throws Exception
+ {
+ super.setUp(); //To change body of overridden methods use File | Settings | File Templates.
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown(); //To change body of overridden methods use File | Settings | File Templates.
+ }
+
+ // Private --------------------------------------------------------------------------------------
+
+ /**
+ * guessFailoverID is a private method that I want to test.
+ * This method will use reflection to call that method instead of making it public
+ * @param map
+ * @param value
+ */
+ private Integer callGuessFailoverID(Map map, Integer value) throws Exception
+ {
+ Method method = ClusteringAspect.class.getDeclaredMethod("guessFailoverID",
+ new Class[]{Map.class, Integer.class});
+
+ method.setAccessible(true);
+
+ return (Integer) method.invoke(null, new Object[]{map, value});
+ }
+ // Inner classes --------------------------------------------------------------------------------
+
+}
Property changes on: trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringAspectInternalTest.java
___________________________________________________________________
Name: svn:keywords
+ Id LastChangedDate Author Revision
Added: trunk/tests/src/org/jboss/test/messaging/jms/clustering/ConnectionFactoryUpdateTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/ConnectionFactoryUpdateTest.java (rev 0)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/ConnectionFactoryUpdateTest.java 2007-01-18 16:12:55 UTC (rev 1981)
@@ -0,0 +1,156 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.messaging.jms.clustering;
+
+import org.jboss.test.messaging.jms.clustering.base.ClusteringTestBase;
+import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.client.state.ConnectionState;
+import org.jboss.jms.client.delegate.ClientClusteredConnectionFactoryDelegate;
+import javax.jms.Connection;
+import javax.jms.Session;
+
+/**
+ * Tests that a clustered connection factory held by a client reflects the death of a node:
+ * the tests start with three nodes, kill node 1 and check that the factory's array of
+ * delegates shrinks from three to two.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public class ConnectionFactoryUpdateTest extends ClusteringTestBase
+{
+
+ // Constants ------------------------------------------------------------------------------------
+
+ // Attributes -----------------------------------------------------------------------------------
+
+ // Static ---------------------------------------------------------------------------------------
+
+ // Constructors ---------------------------------------------------------------------------------
+
+ public ConnectionFactoryUpdateTest(String name)
+ {
+ super(name);
+ }
+
+ // Public ---------------------------------------------------------------------------------------
+ /**
+ * Kills node 1 while a connection to it is open, then verifies that the connection factory
+ * delegate ends up with two delegates and that the connection is no longer on node 1.
+ */
+ public void testUpdateConnectionFactory() throws Exception
+ {
+ Connection conn = cf.createConnection();
+ JBossConnectionFactory jbcf = (JBossConnectionFactory) cf;
+ ClientClusteredConnectionFactoryDelegate cfDelegate =
+ (ClientClusteredConnectionFactoryDelegate) jbcf.getDelegate();
+ assertEquals(3, cfDelegate.getDelegates().length);
+
+
+ Connection conn1 = cf.createConnection();
+
+ assertEquals(1, getServerId(conn1));
+
+ ServerManagement.killAndWait(1);
+
+ // presumably gives the update time to reach the client - TODO confirm the delay needed
+ Thread.sleep(5000);
+
+ // first part of the test, verifies if the CF was updated
+ assertEquals(2, cfDelegate.getDelegates().length);
+ conn.close();
+
+ // presumably gives failover of conn1 time to complete - TODO confirm the delay needed
+ Thread.sleep(25000);
+
+ // Second part, verifies a possible race condition on failoverMap and handleFailover
+
+ log.info("ServerId=" + getServerId(conn1));
+ assertTrue(1 != getServerId(conn1));
+
+ //Session sess = conn1.createSession(true, Session.SESSION_TRANSACTED);
+ conn1.close();
+
+ }
+
+ /**
+ * Test if an update on failoverMap on the connectionFactory would
+ * cause any problems during failover.
+ * Here the connection listener is removed from the connection to node 1 before the node is
+ * killed, and failover is provoked afterwards by using the connection.
+ */
+ public void testUpdateConnectionFactoryRaceCondition() throws Exception
+ {
+ // This connection needs to be opened, as we need the callback to update CF from this conn
+ Connection conn = cf.createConnection();
+ JBossConnectionFactory jbcf = (JBossConnectionFactory) cf;
+ ClientClusteredConnectionFactoryDelegate cfDelegate =
+ (ClientClusteredConnectionFactoryDelegate) jbcf.getDelegate();
+ assertEquals(3, cfDelegate.getDelegates().length);
+
+ Connection conn1 = cf.createConnection();
+
+ Connection conn2 = cf.createConnection();
+
+ assertEquals(2, getServerId(conn2));
+
+ assertEquals(1, getServerId(conn1));
+
+ ConnectionState state = this.getConnectionState(conn1);
+
+ // Disable Leasing for Failover
+ state.getRemotingConnection().removeConnectionListener();
+
+ ServerManagement.killAndWait(1);
+
+ // presumably gives the update time to reach the client - TODO confirm the delay needed
+ Thread.sleep(15000);
+
+ // This will force Failover from Valve to kick in
+ Session sess = conn1.createSession(true, Session.SESSION_TRANSACTED);
+
+ // first part of the test, verifies if the CF was updated
+ assertEquals(2, cfDelegate.getDelegates().length);
+
+ log.info("ServerId=" + getServerId(conn1));
+ assertTrue(1 != getServerId(conn1));
+
+ conn.close();
+ conn1.close();
+ conn2.close();
+
+ }
+
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ protected void setUp() throws Exception
+ {
+ // the tests need a three node cluster; nodeCount is set before super.setUp() runs
+ nodeCount = 3;
+ super.setUp();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ // Private --------------------------------------------------------------------------------------
+
+ // Inner classes --------------------------------------------------------------------------------
+
+}
Property changes on: trunk/tests/src/org/jboss/test/messaging/jms/clustering/ConnectionFactoryUpdateTest.java
___________________________________________________________________
Name: svn:keywords
+ Id LastChangedDate Author Revision
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java 2007-01-18 12:10:23 UTC (rev 1980)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java 2007-01-18 16:12:55 UTC (rev 1981)
@@ -1635,6 +1635,9 @@
}
conn.start();
+ // Disable Lease for this test.. as the ValveAspect should capture this
+ getConnectionState(conn).getRemotingConnection().removeConnectionListener();
+
// make sure we're connecting to node 1
int nodeID = ((ConnectionState)((DelegateSupport)((JBossConnection)conn).
More information about the jboss-cvs-commits
mailing list