[jboss-cvs] JBoss Messaging SVN: r3019 - in trunk: src/main/org/jboss/jms/client/delegate and 14 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Aug 21 00:19:49 EDT 2007
Author: clebert.suconic at jboss.com
Date: 2007-08-21 00:19:48 -0400 (Tue, 21 Aug 2007)
New Revision: 3019
Added:
trunk/src/main/org/jboss/jms/delegate/TopologyResult.java
trunk/src/main/org/jboss/jms/wireformat/CallbackRequestSupport.java
trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryAddCallbackRequest.java
trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryGetTopologyRequest.java
trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryGetTopologyResponse.java
trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryRemoveCallbackRequest.java
Modified:
trunk/build-messaging.xml
trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
trunk/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java
trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
trunk/src/main/org/jboss/jms/delegate/ConnectionFactoryEndpoint.java
trunk/src/main/org/jboss/jms/delegate/SessionEndpoint.java
trunk/src/main/org/jboss/jms/exception/MessagingJMSException.java
trunk/src/main/org/jboss/jms/exception/MessagingNetworkFailureException.java
trunk/src/main/org/jboss/jms/server/ConnectionManager.java
trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java
trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java
trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryCreateConnectionDelegateRequest.java
trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryUpdate.java
trunk/src/main/org/jboss/jms/wireformat/PacketSupport.java
trunk/src/main/org/jboss/messaging/util/ConcurrentHashSet.java
trunk/tests/build.xml
trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusterViewUpdateTest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusteredConnectionFactoryTest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringTestBase.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-1027 and http://jira.jboss.org/jira/browse/JBMESSAGING-1038
Modified: trunk/build-messaging.xml
===================================================================
--- trunk/build-messaging.xml 2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/build-messaging.xml 2007-08-21 04:19:48 UTC (rev 3019)
@@ -123,8 +123,8 @@
This module is based on Java 1.4
-->
- <property name="javac.target" value="1.4"/>
- <property name="javac.source" value="1.4"/>
+ <property name="javac.target" value="1.5"/>
+ <property name="javac.source" value="1.5"/>
<property name="javac.debug" value="true"/>
<property name="javac.optimize" value="false"/>
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java 2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java 2007-08-21 04:19:48 UTC (rev 3019)
@@ -27,11 +27,20 @@
import javax.jms.JMSException;
import org.jboss.jms.client.plugin.LoadBalancingPolicy;
+import org.jboss.jms.client.remoting.JMSRemotingConnection;
+import org.jboss.jms.client.remoting.ConnectionFactoryCallbackHandler;
+import org.jboss.jms.client.container.JMSClientVMIdentifier;
import org.jboss.jms.delegate.ConnectionFactoryDelegate;
import org.jboss.jms.delegate.CreateConnectionResult;
import org.jboss.jms.delegate.IDBlock;
+import org.jboss.jms.delegate.TopologyResult;
import org.jboss.jms.exception.MessagingNetworkFailureException;
+import org.jboss.jms.wireformat.ConnectionFactoryAddCallbackRequest;
+import org.jboss.jms.wireformat.ConnectionFactoryGetTopologyRequest;
+import org.jboss.jms.wireformat.ConnectionFactoryGetTopologyResponse;
+import org.jboss.jms.wireformat.ConnectionFactoryRemoveCallbackRequest;
import org.jboss.logging.Logger;
+import org.jboss.messaging.util.Version;
/**
* A ClientClusteredConnectionFactoryDelegate.
@@ -48,7 +57,7 @@
*
* $Id$
*/
-public class ClientClusteredConnectionFactoryDelegate
+public class ClientClusteredConnectionFactoryDelegate extends DelegateSupport
implements Serializable, ConnectionFactoryDelegate
{
// Constants ------------------------------------------------------------------------------------
@@ -57,11 +66,126 @@
private static final Logger log =
Logger.getLogger(ClientClusteredConnectionFactoryDelegate.class);
+ private static boolean trace = log.isTraceEnabled();
+ // Serialization and CallbackHandler code -------------------------------------------------------
+
+ private transient JMSRemotingConnection remoting;
+ private transient ClientConnectionFactoryDelegate currentDelegate;
+
+ private void readObject(java.io.ObjectInputStream s)
+ throws java.io.IOException, ClassNotFoundException
+ {
+ s.defaultReadObject();
+ establishCallback();
+ }
+
+ public synchronized void establishCallback()
+ {
+
+ log.debug(" Establishing CFCallback\n");
+
+ for (int server = delegates.length - 1; server >= 0; server--)
+ {
+ if (trace) log.trace("Closing current callback");
+ closeCallback();
+
+ if (trace) log.trace("Trying communication on server(" + server + ")=" + delegates[server].getServerLocatorURI());
+ try
+ {
+ remoting = new JMSRemotingConnection(delegates[server].getServerLocatorURI(), true);
+ remoting.start();
+ currentDelegate = delegates[server];
+ if (trace) log.trace("Adding callback");
+ addCallback(delegates[server]);
+ if (trace) log.trace("Getting topology");
+ TopologyResult topology = getTopology();
+ if (trace) log.trace("delegates.size = " + topology.getDelegates().length);
+
+ break;
+ }
+ catch (Throwable e)
+ {
+ log.warn("Server communication to server[" + server + "] (" +
+ delegates[server].getServerLocatorURI() + ") during establishCallback was broken, " +
+ "trying the next one", e);
+ if (remoting != null)
+ {
+ remoting.stop();
+ remoting = null;
+ currentDelegate = null;
+ }
+ }
+ }
+ }
+
+ private void addCallback(ClientConnectionFactoryDelegate delegate) throws Throwable
+ {
+ remoting.getCallbackManager().setConnectionfactoryCallbackHandler(new ConnectionFactoryCallbackHandler(this, remoting));
+
+ ConnectionFactoryAddCallbackRequest request =
+ new ConnectionFactoryAddCallbackRequest (JMSClientVMIdentifier.instance,
+ remoting.getRemotingClient().getSessionId(),
+ delegate.getID(),
+ Version.instance().getProviderIncrementingVersion());
+
+ remoting.getRemotingClient().invoke(request, null);
+
+ }
+
+ private void removeCallback() throws Throwable
+ {
+ ConnectionFactoryRemoveCallbackRequest request =
+ new ConnectionFactoryRemoveCallbackRequest (JMSClientVMIdentifier.instance,
+ remoting.getRemotingClient().getSessionId(),
+ currentDelegate.getID(),
+ Version.instance().getProviderIncrementingVersion());
+
+ remoting.getRemotingClient().invoke(request, null);
+ }
+
+ protected void finalize() throws Throwable
+ {
+ super.finalize();
+ closeCallback();
+
+ }
+
+ public void closeCallback()
+ {
+ if (remoting != null)
+ {
+ try
+ {
+ removeCallback();
+ }
+ catch (Throwable warn)
+ {
+ log.warn(warn, warn);
+ }
+
+ try
+ {
+ remoting.removeConnectionListener();
+ remoting.stop();
+ currentDelegate = null;
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ remoting = null;
+ }
+ }
+ // Serialization and CallbackHandler code -------------------------------------------------------
+
+
// Static ---------------------------------------------------------------------------------------
// Attributes -----------------------------------------------------------------------------------
+ private String uniqueName;
+
private ClientConnectionFactoryDelegate[] delegates;
// Map <Integer(nodeID)->Integer(failoverNodeID)>
@@ -75,11 +199,13 @@
// Constructors ---------------------------------------------------------------------------------
- public ClientClusteredConnectionFactoryDelegate(ClientConnectionFactoryDelegate[] delegates,
+ public ClientClusteredConnectionFactoryDelegate(String uniqueName,
+ ClientConnectionFactoryDelegate[] delegates,
Map failoverMap,
LoadBalancingPolicy loadBalancingPolicy,
boolean supportsFailover)
{
+ this.uniqueName = uniqueName;
this.delegates = delegates;
this.failoverMap = failoverMap;
this.loadBalancingPolicy = loadBalancingPolicy;
@@ -167,7 +293,36 @@
{
return supportsFailover;
}
-
+
+ public String getUniqueName()
+ {
+ return uniqueName;
+ }
+
+
+ public TopologyResult getTopology() throws JMSException
+ {
+
+ try
+ {
+ ConnectionFactoryGetTopologyRequest request =
+ new ConnectionFactoryGetTopologyRequest(currentDelegate.getID());
+
+ ConnectionFactoryGetTopologyResponse response = (ConnectionFactoryGetTopologyResponse)remoting.getRemotingClient().invoke(request, null);
+
+
+ TopologyResult topology = (TopologyResult)response.getResponse();
+
+ updateFailoverInfo(topology.getDelegates(), topology.getFailoverMap());
+
+ return topology;
+ }
+ catch (Throwable e)
+ {
+ throw handleThrowable(e);
+ }
+ }
+
//Only used in testing
public void setSupportsFailover(boolean failover)
{
@@ -186,10 +341,7 @@
failoverMap.putAll(newFailoverMap);
- if (supportsLoadBalancing)
- {
- loadBalancingPolicy.updateView(delegates);
- }
+ loadBalancingPolicy.updateView(delegates);
}
public String toString()
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java 2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java 2007-08-21 04:19:48 UTC (rev 3019)
@@ -132,11 +132,6 @@
// There is one RM per server, so we need to merge the rms if necessary
ResourceManagerFactory.instance.handleFailover(serverID, newDelegate.getServerID());
-
- // The remoting connection was replaced by a new one..
- // we have to set the connection Delegate on the CallbackManager to avoid leaks
- remotingConnection.getCallbackManager().setConnectionDelegate(this);
-
client = thisState.getRemotingConnection().getRemotingClient();
serverID = newDelegate.getServerID();
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2007-08-21 04:19:48 UTC (rev 3019)
@@ -33,6 +33,7 @@
import org.jboss.jms.client.remoting.JMSRemotingConnection;
import org.jboss.jms.delegate.ConnectionFactoryDelegate;
import org.jboss.jms.delegate.CreateConnectionResult;
+import org.jboss.jms.delegate.TopologyResult;
import org.jboss.jms.exception.MessagingNetworkFailureException;
import org.jboss.jms.server.ServerPeer;
import org.jboss.jms.wireformat.ConnectionFactoryCreateConnectionDelegateRequest;
@@ -63,6 +64,9 @@
// Attributes -----------------------------------------------------------------------------------
//This data is needed in order to create a connection
+
+ private String uniqueName;
+
private String serverLocatorURI;
private Version serverVersion;
@@ -99,11 +103,12 @@
// Constructors ---------------------------------------------------------------------------------
- public ClientConnectionFactoryDelegate(String objectID, int serverID, String serverLocatorURI,
+ public ClientConnectionFactoryDelegate(String uniqueName, String objectID, int serverID, String serverLocatorURI,
Version serverVersion, boolean clientPing)
{
super(objectID);
-
+
+ this.uniqueName = uniqueName;
this.serverID = serverID;
this.serverLocatorURI = serverLocatorURI;
this.serverVersion = serverVersion;
@@ -115,7 +120,7 @@
}
// ConnectionFactoryDelegate implementation -----------------------------------------------------
-
+
public CreateConnectionResult createConnectionDelegate(String username,
String password,
int failedNodeID)
@@ -186,8 +191,6 @@
connectionDelegate.setRemotingConnection(remotingConnection);
connectionDelegate.setVersionToUse(version);
-
- remotingConnection.getCallbackManager().setConnectionDelegate(connectionDelegate);
}
else
{
@@ -226,7 +229,12 @@
return (byte[])doInvoke(theClient, req);
}
-
+
+ public TopologyResult getTopology() throws JMSException
+ {
+ throw new IllegalStateException("This invocation should not be handled here!");
+ }
+
// Public ---------------------------------------------------------------------------------------
public String toString()
Modified: trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java 2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java 2007-08-21 04:19:48 UTC (rev 3019)
@@ -62,12 +62,9 @@
private static boolean trace = log.isTraceEnabled();
- protected static CallbackManager theManager;
-
// Attributes -----------------------------------------------------------------------------------
- // Map<Long(lookup)-ClientConsumer>
- protected Map callbackHandlers;
+ protected Map<String, ClientConsumer> callbackHandlers;
protected ConnectionFactoryCallbackHandler connectionfactoryCallbackHandler;
// Constructors ---------------------------------------------------------------------------------
@@ -115,12 +112,17 @@
}
else if (parameter instanceof ConnectionFactoryUpdate)
{
- ConnectionFactoryUpdate viewChange = (ConnectionFactoryUpdate)parameter;
- if (trace) { log.trace(this + " receiving cluster view change " + viewChange); }
+ if (connectionfactoryCallbackHandler == null)
+ {
+ log.warn("ConnectionFactoryUpdate was received but there is no callbackHandler set");
+ }
+ else
+ {
+ ConnectionFactoryUpdate viewChange = (ConnectionFactoryUpdate)parameter;
- if (connectionfactoryCallbackHandler != null)
- {
+ if (trace) { log.trace(this + " receiving cluster view change " + viewChange); }
+
connectionfactoryCallbackHandler.handleMessage(viewChange);
}
}
@@ -137,17 +139,22 @@
callbackHandlers.put(consumerID, handler);
}
- public void setConnectionDelegate(ClientConnectionDelegate connectionDelegate)
- {
- this.connectionfactoryCallbackHandler =
- new ConnectionFactoryCallbackHandler(connectionDelegate);
- }
-
public ClientConsumer unregisterHandler(String consumerID)
{
return (ClientConsumer)callbackHandlers.remove(consumerID);
}
+
+ public ConnectionFactoryCallbackHandler getConnectionfactoryCallbackHandler()
+ {
+ return connectionfactoryCallbackHandler;
+ }
+
+ public void setConnectionfactoryCallbackHandler(ConnectionFactoryCallbackHandler connectionfactoryCallbackHandler)
+ {
+ this.connectionfactoryCallbackHandler = connectionfactoryCallbackHandler;
+ }
+
public String toString()
{
return "CallbackManager[" + Integer.toHexString(hashCode()) + "]";
Modified: trunk/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java 2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java 2007-08-21 04:19:48 UTC (rev 3019)
@@ -23,10 +23,11 @@
package org.jboss.jms.client.remoting;
import org.jboss.jms.client.delegate.ClientClusteredConnectionFactoryDelegate;
-import org.jboss.jms.client.delegate.ClientConnectionDelegate;
-import org.jboss.jms.client.state.ConnectionState;
import org.jboss.jms.wireformat.ConnectionFactoryUpdate;
import org.jboss.logging.Logger;
+import org.jboss.remoting.ConnectionListener;
+import org.jboss.remoting.Client;
+import java.lang.ref.WeakReference;
/**
* This class will manage ConnectionFactory messages updates
@@ -43,8 +44,9 @@
// Attributes -----------------------------------------------------------------------------------
- private ClientConnectionDelegate connectionDelegate;
- private ConnectionState state;
+ // Without a WeakReference here, the CF would never be released!
+ private WeakReference<ClientClusteredConnectionFactoryDelegate> delegateRef;
+ private JMSRemotingConnection remotingConnection;
// Static ---------------------------------------------------------------------------------------
@@ -52,9 +54,12 @@
// Constructors ---------------------------------------------------------------------------------
- public ConnectionFactoryCallbackHandler(ClientConnectionDelegate connectionDelegate)
+ public ConnectionFactoryCallbackHandler(ClientClusteredConnectionFactoryDelegate cfDelegate,
+ JMSRemotingConnection remotingConnection)
{
- this.connectionDelegate = connectionDelegate;
+ this.delegateRef = new WeakReference<ClientClusteredConnectionFactoryDelegate>(cfDelegate);
+ this.remotingConnection = remotingConnection;
+ this.remotingConnection.addPlainConnectionListener(new CallbackConnectionListener());
}
// Public ---------------------------------------------------------------------------------------
@@ -65,44 +70,42 @@
ConnectionFactoryUpdate viewChange = (ConnectionFactoryUpdate)message;
- ConnectionState state = getState();
-
- if (state != null)
- {
- Object d = state.getClusteredConnectionFactoryDelegate();
-
- if (d instanceof ClientClusteredConnectionFactoryDelegate)
- {
- ClientClusteredConnectionFactoryDelegate clusteredDelegate =
- (ClientClusteredConnectionFactoryDelegate)d;
+ ClientClusteredConnectionFactoryDelegate delegate = delegateRef.get();
- clusteredDelegate.updateFailoverInfo(viewChange.getDelegates(),
- viewChange.getFailoverMap());
- }
- }
+ if (delegate!=null)
+ {
+ delegate.updateFailoverInfo(viewChange.getTopology().getDelegates(),
+ viewChange.getTopology().getFailoverMap());
+ }
}
+
public String toString()
{
- return "ConnectionFactoryCallbackHandler[" + connectionDelegate + "]";
+ return "ConnectionFactoryCallbackHandler[" + delegateRef.get() + "]";
}
// Package protected ----------------------------------------------------------------------------
// Protected ------------------------------------------------------------------------------------
- // When ConnectionFactoryCallbackHandler is created, is not guaranteed that state is set
- // as this could be later initialized by the aop stack
- protected ConnectionState getState()
+ // Private --------------------------------------------------------------------------------------
+
+ // Inner classes --------------------------------------------------------------------------------
+
+ class CallbackConnectionListener implements ConnectionListener
{
- if (state==null)
+
+ public void handleConnectionException(Throwable throwable, Client client)
{
- this.state = (ConnectionState)connectionDelegate.getState();
+ ClientClusteredConnectionFactoryDelegate delegate = delegateRef.get();
+
+ if (delegate!=null)
+ {
+ delegate.establishCallback();
+ }
+
+ //remotingConnection.removePlainConnectionListener(this);
}
- return this.state;
}
-
- // Private --------------------------------------------------------------------------------------
-
- // Inner classes --------------------------------------------------------------------------------
}
Modified: trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2007-08-21 04:19:48 UTC (rev 3019)
@@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.Map;
+import java.net.MalformedURLException;
import org.jboss.jms.server.ServerPeer;
import org.jboss.jms.wireformat.JMSWireFormat;
@@ -31,6 +32,7 @@
import org.jboss.remoting.Client;
import org.jboss.remoting.InvokerLocator;
import org.jboss.remoting.ServerInvoker;
+import org.jboss.remoting.ConnectionListener;
import org.jboss.remoting.callback.CallbackPoller;
import org.jboss.remoting.callback.InvokerCallbackHandler;
import org.jboss.remoting.transport.bisocket.Bisocket;
@@ -244,7 +246,7 @@
// Constructors ---------------------------------------------------------------------------------
- public JMSRemotingConnection(String serverLocatorURI, boolean clientPing) throws Throwable
+ public JMSRemotingConnection(String serverLocatorURI, boolean clientPing) throws Exception
{
serverLocator = new InvokerLocator(serverLocatorURI);
this.clientPing = clientPing;
@@ -382,6 +384,16 @@
return true;
}
+ public synchronized void addPlainConnectionListener(ConnectionListener listener)
+ {
+ client.addConnectionListener(listener);
+ }
+
+ public synchronized void removePlainConnectionListener(ConnectionListener listener)
+ {
+ client.removeConnectionListener(listener);
+ }
+
public synchronized ConsolidatedRemotingConnectionListener getConnectionListener()
{
return remotingConnectionListener;
Modified: trunk/src/main/org/jboss/jms/delegate/ConnectionFactoryEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/ConnectionFactoryEndpoint.java 2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/delegate/ConnectionFactoryEndpoint.java 2007-08-21 04:19:48 UTC (rev 3019)
@@ -43,7 +43,10 @@
String password,
int failedNodeID)
throws JMSException;
+
+ /** Get the current Cluster topology associated with a ClusteredConnectionFactory */
+ TopologyResult getTopology() throws JMSException;
- byte[] getClientAOPStack() throws JMSException;
+ byte[] getClientAOPStack() throws JMSException;
}
Modified: trunk/src/main/org/jboss/jms/delegate/SessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/SessionEndpoint.java 2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/delegate/SessionEndpoint.java 2007-08-21 04:19:48 UTC (rev 3019)
@@ -43,18 +43,10 @@
*/
public interface SessionEndpoint extends Closeable
{
- /**
- * @param failoverChannelID - the ID of the channel for which there is a failover process in
- * progress. -1 means regular (non-failover) consumer delegate creation.
- */
ConsumerDelegate createConsumerDelegate(JBossDestination destination, String selector,
boolean noLocal, String subscriptionName,
boolean connectionConsumer, boolean autoFlowControl) throws JMSException;
- /**
- * @param failoverChannelID - the ID of the channel for which there is a failover process in
- * progress. -1 means regular (non-failover) browser delegate creation.
- */
BrowserDelegate createBrowserDelegate(JBossDestination queue, String messageSelector) throws JMSException;
/**
Added: trunk/src/main/org/jboss/jms/delegate/TopologyResult.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/TopologyResult.java (rev 0)
+++ trunk/src/main/org/jboss/jms/delegate/TopologyResult.java 2007-08-21 04:19:48 UTC (rev 3019)
@@ -0,0 +1,196 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.jms.delegate;
+
+import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
+import org.jboss.jms.wireformat.PacketSupport;
+import org.jboss.messaging.util.Streamable;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.io.DataOutputStream;
+import java.io.DataInputStream;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ * $Id$
+ */
+public class TopologyResult implements Streamable
+{
+
+ // Constants ------------------------------------------------------------------------------------
+
+ // Attributes -----------------------------------------------------------------------------------
+ private ClientConnectionFactoryDelegate[] delegates;
+
+ private Map failoverMap;
+
+ String uniqueName;
+
+ // Static ---------------------------------------------------------------------------------------
+
+ // Constructors ---------------------------------------------------------------------------------
+
+
+ public TopologyResult()
+ {
+ }
+
+ public TopologyResult(String uniqueName, ClientConnectionFactoryDelegate[] delegates,
+ Map failoverMap)
+ {
+ this.uniqueName = uniqueName;
+
+ this.delegates = delegates;
+
+ this.failoverMap = failoverMap;
+ }
+
+
+
+ // Public ---------------------------------------------------------------------------------------
+
+
+ public ClientConnectionFactoryDelegate[] getDelegates()
+ {
+ return delegates;
+ }
+
+ public void setDelegates(ClientConnectionFactoryDelegate[] delegates)
+ {
+ this.delegates = delegates;
+ }
+
+ public Map getFailoverMap()
+ {
+ return failoverMap;
+ }
+
+ public void setFailoverMap(Map failoverMap)
+ {
+ this.failoverMap = failoverMap;
+ }
+
+ public String getUniqueName()
+ {
+ return uniqueName;
+ }
+
+ public void setUniqueName(String uniqueName)
+ {
+ this.uniqueName = uniqueName;
+ }
+
+ public void read(DataInputStream is) throws Exception
+ {
+ uniqueName = is.readUTF();
+
+ int len = is.readInt();
+
+ delegates = new ClientConnectionFactoryDelegate[len];
+
+ for (int i = 0; i < len; i++)
+ {
+ delegates[i] = new ClientConnectionFactoryDelegate();
+
+ delegates[i].read(is);
+ }
+
+ len = is.readInt();
+
+ failoverMap = new HashMap(len);
+
+ for (int c = 0; c < len; c++)
+ {
+ Integer i = new Integer(is.readInt());
+
+ Integer j = new Integer(is.readInt());
+
+ failoverMap.put(i, j);
+ }
+ }
+
+ public void write(DataOutputStream os) throws Exception
+ {
+ os.writeUTF(uniqueName);
+
+ int len = delegates.length;
+
+ os.writeInt(len);
+
+ for (int i = 0; i < len; i++)
+ {
+ delegates[i].write(os);
+ }
+
+ os.writeInt(failoverMap.size());
+
+ Iterator iter = failoverMap.entrySet().iterator();
+
+ while (iter.hasNext())
+ {
+ Map.Entry entry = (Map.Entry)iter.next();
+
+ Integer i = (Integer)entry.getKey();
+
+ Integer j = (Integer)entry.getValue();
+
+ os.writeInt(i.intValue());
+
+ os.writeInt(j.intValue());
+ }
+
+ }
+
+
+ public String toString()
+ {
+ StringBuffer sb = new StringBuffer("UpdateConnectionFactoryResult[");
+
+ if (delegates != null)
+ {
+ for(int i = 0; i < delegates.length; i++)
+ {
+ sb.append(delegates[i]);
+ if (i < delegates.length - 1)
+ {
+ sb.append(',');
+ }
+ }
+ }
+
+ sb.append("]");
+
+ return sb.toString();
+
+ }
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ // Private --------------------------------------------------------------------------------------
+
+ // Inner classes --------------------------------------------------------------------------------
+}
Property changes on: trunk/src/main/org/jboss/jms/delegate/TopologyResult.java
___________________________________________________________________
Name: svn:keywords
+ Id LastChangedDate Author Revision
Modified: trunk/src/main/org/jboss/jms/exception/MessagingJMSException.java
===================================================================
--- trunk/src/main/org/jboss/jms/exception/MessagingJMSException.java 2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/exception/MessagingJMSException.java 2007-08-21 04:19:48 UTC (rev 3019)
@@ -46,7 +46,7 @@
this(reason, null, null);
}
- public MessagingJMSException(Exception cause) {
+ public MessagingJMSException(Throwable cause) {
this(null, null, cause);
}
Modified: trunk/src/main/org/jboss/jms/exception/MessagingNetworkFailureException.java
===================================================================
--- trunk/src/main/org/jboss/jms/exception/MessagingNetworkFailureException.java 2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/exception/MessagingNetworkFailureException.java 2007-08-21 04:19:48 UTC (rev 3019)
@@ -34,7 +34,7 @@
{
private static final long serialVersionUID = 1255764532063281353L;
- public MessagingNetworkFailureException(Exception cause)
+ public MessagingNetworkFailureException(Throwable cause)
{
super(cause);
}
Modified: trunk/src/main/org/jboss/jms/server/ConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ConnectionManager.java 2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/server/ConnectionManager.java 2007-08-21 04:19:48 UTC (rev 3019)
@@ -25,6 +25,7 @@
import org.jboss.jms.delegate.ConnectionEndpoint;
import org.jboss.messaging.core.contract.MessagingComponent;
+import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
/**
@@ -59,6 +60,12 @@
*/
List getActiveConnections();
+ void addConnectionFactoryCallback(String uniqueName, String JVMID, ServerInvokerCallbackHandler handler);
+
+ void removeConnectionFactoryCallback(String uniqueName, String JVMID, ServerInvokerCallbackHandler handler);
+
+ ServerInvokerCallbackHandler[] getConnectionFactoryCallback(String uniqueName);
+
/**
* @param clientToServer - true if the failure has been detected on a direct connection from
* client to this server, false if the failure has been detected while trying to send a
Modified: trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2007-08-21 04:19:48 UTC (rev 3019)
@@ -86,7 +86,7 @@
private Map delegates;
private Replicator replicator;
-
+
// Constructors ---------------------------------------------------------------------------------
public ConnectionFactoryJNDIMapper(ServerPeer serverPeer) throws Exception
@@ -118,7 +118,7 @@
{
log.debug(this + " registering connection factory '" + uniqueName +
"', bindings: " + jndiBindings);
-
+
// Sanity check
if (delegates.containsKey(uniqueName))
{
@@ -126,11 +126,14 @@
"registered with name " + uniqueName);
}
- String id = GUIDGenerator.generateGUID();
+ // See http://www.jboss.com/index.html?module=bb&op=viewtopic&p=4076040#4076040
+ //String id = GUIDGenerator.generateGUID();
+ String id = uniqueName;
+
Version version = serverPeer.getVersion();
ServerConnectionFactoryEndpoint endpoint =
- new ServerConnectionFactoryEndpoint(id, serverPeer, clientID,
+ new ServerConnectionFactoryEndpoint(uniqueName, id, serverPeer, clientID,
jndiBindings, prefetchSize,
defaultTempQueueFullSize,
defaultTempQueuePageSize,
@@ -145,13 +148,13 @@
{
setupReplicator();
}
-
+
if (supportsFailover && replicator == null)
{
log.warn("supportsFailover attribute is true on connection factory: " + uniqueName + " but post office is non clustered. " +
"So connection factory will *not* support failover");
}
-
+
if (supportsLoadBalancing && replicator == null)
{
log.warn("supportsLoadBalancing attribute is true on connection factory: " + uniqueName + " but post office is non clustered. " +
@@ -161,7 +164,7 @@
boolean creatingClustered = (supportsFailover || supportsLoadBalancing) && replicator != null;
ClientConnectionFactoryDelegate localDelegate =
- new ClientConnectionFactoryDelegate(id, serverPeer.getServerPeerID(),
+ new ClientConnectionFactoryDelegate(uniqueName, id, serverPeer.getServerPeerID(),
locatorURI, version, clientPing);
log.debug(this + " created local delegate " + localDelegate);
@@ -172,18 +175,18 @@
// connection factory list. This will happen locally too, so we will get the replication
// message locally - to avoid updating it again we can ignore any "add" replication updates
// that originate from the current node.
-
+
if (creatingClustered)
{
// Create a clustered delegate
-
+
if (!supportsLoadBalancing)
{
loadBalancingFactory = new NoLoadBalancingLoadBalancingFactory(localDelegate);
}
Map localDelegates = replicator.get(Replicator.CF_PREFIX + uniqueName);
- delegate = createClusteredDelegate(localDelegates.values(), loadBalancingFactory, supportsFailover);
+ delegate = createClusteredDelegate(uniqueName, localDelegates.values(), loadBalancingFactory, endpoint, supportsFailover);
log.debug(this + " created clustered delegate " + delegate);
}
@@ -198,13 +201,13 @@
// Now bind it in JNDI
rebindConnectionFactory(initialContext, jndiBindings, delegate);
-
+
ConnectionFactoryAdvised advised;
-
+
// Need to synchronized to prevent a deadlock
// See http://jira.jboss.com/jira/browse/JBMESSAGING-797
synchronized (AspectManager.instance())
- {
+ {
advised = new ConnectionFactoryAdvised(endpoint);
}
@@ -213,8 +216,8 @@
Dispatcher.instance.registerTarget(id, advised);
// Replicate the change - we will ignore this locally
-
- if (replicator != null) replicator.put(Replicator.CF_PREFIX + uniqueName, localDelegate);
+
+ if (replicator != null) replicator.put(Replicator.CF_PREFIX + uniqueName, localDelegate);
}
public synchronized void unregisterConnectionFactory(String uniqueName, boolean supportsFailover, boolean supportsLoadBalancing)
@@ -254,11 +257,11 @@
if (replicator != null)
{
if (!replicator.remove(Replicator.CF_PREFIX + uniqueName))
- {
+ {
throw new IllegalStateException("Cannot find replicant to remove: " +
Replicator.CF_PREFIX + uniqueName);
}
- }
+ }
Dispatcher.instance.unregisterTarget(endpoint.getID(), endpoint);
}
@@ -286,7 +289,7 @@
log.debug(this + " received notification from node " + notification.nodeID );
try
- {
+ {
if (notification.type == ClusterNotification.TYPE_NODE_JOIN || notification.type == ClusterNotification.TYPE_NODE_LEAVE)
{
// We respond to changes in the node-address mapping. This will be replicated whan a
@@ -318,22 +321,25 @@
else if ((notification.type == ClusterNotification.TYPE_REPLICATOR_PUT || notification.type == ClusterNotification.TYPE_REPLICATOR_REMOVE) &&
(notification.data instanceof String) && ((String)notification.data).startsWith(Replicator.CF_PREFIX))
{
+
+ log.debug("Updating CF information for " + notification.data);
// A connection factory has been deployed / undeployed
// NOTE! All connection factories MUST be deployed on all nodes!
// Otherwise the server might failover onto a node which doesn't have that connection factory deployed
// so the connection won't be able to recconnect.
-
+
String key = (String)notification.data;
-
+
String uniqueName = key.substring(Replicator.CF_PREFIX.length());
log.debug(this + " received '" + uniqueName + "' connection factory deploy / undeploy");
ConnectionFactoryDelegate cfd = (ConnectionFactoryDelegate)delegates.get(uniqueName);
-
+
if (cfd == null)
{
+ log.info("cfd == null");
//This is ok - connection factory a might be deployed on node A before being deployed on node B so
//node B might get the notification before it has deployed a itself
}
@@ -341,23 +347,24 @@
{
if (cfd instanceof ClientConnectionFactoryDelegate)
{
- //Non clustered - ignore
-
+ log.info("Non Clustered!!!");
+ //Non clustered - ignore
+
//We still replicate non clustered connection factories since the ClusterPullConnectionFactory
//is non clustered but needs to be available across the cluster
}
else
{
ClientClusteredConnectionFactoryDelegate del = (ClientClusteredConnectionFactoryDelegate)cfd;
-
+
Map updatedReplicantMap = replicator.get(key);
-
+
List newDels = sortDelegatesOnServerID(updatedReplicantMap.values());
-
+
ClientConnectionFactoryDelegate[] delArr =
(ClientConnectionFactoryDelegate[])newDels.
toArray(new ClientConnectionFactoryDelegate[newDels.size()]);
-
+
Map failoverMap = serverPeer.getPostOfficeInstance().getFailoverMap();
del.setDelegates(delArr);
@@ -365,12 +372,12 @@
ServerConnectionFactoryEndpoint endpoint =
(ServerConnectionFactoryEndpoint)endpoints.get(uniqueName);
-
+
if (endpoint == null)
{
throw new IllegalStateException("Cannot find endpoint with name " + uniqueName);
}
-
+
rebindConnectionFactory(initialContext, endpoint.getJNDIBindings(), del);
endpoint.updateClusteredClients(delArr, failoverMap);
@@ -410,8 +417,9 @@
/**
* @param localDelegates - Collection<ClientConnectionFactoryDelegate>
*/
- private ClientClusteredConnectionFactoryDelegate createClusteredDelegate(Collection localDelegates, LoadBalancingFactory loadBalancingFactory,
- boolean supportsFailover)
+ private ClientClusteredConnectionFactoryDelegate createClusteredDelegate(String uniqueName, Collection localDelegates, LoadBalancingFactory loadBalancingFactory,
+ ServerConnectionFactoryEndpoint endpoint,
+ boolean supportsFailover)
throws Exception
{
log.trace(this + " creating a clustered ConnectionFactoryDelegate based on " + localDelegates);
@@ -426,8 +434,10 @@
Map failoverMap = serverPeer.getPostOfficeInstance().getFailoverMap();
LoadBalancingPolicy lbp = loadBalancingFactory.createLoadBalancingPolicy(delegates);
-
- return new ClientClusteredConnectionFactoryDelegate(delegates, failoverMap, lbp, supportsFailover);
+
+ endpoint.updateTopology(delegates, failoverMap);
+
+ return new ClientClusteredConnectionFactoryDelegate(uniqueName, delegates, failoverMap, lbp, supportsFailover);
}
private void rebindConnectionFactory(Context ic, JNDIBindings jndiBindings,
Modified: trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java 2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java 2007-08-21 04:19:48 UTC (rev 3019)
@@ -25,10 +25,10 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import javax.jms.JMSException;
@@ -39,9 +39,11 @@
import org.jboss.messaging.core.contract.ClusterNotificationListener;
import org.jboss.messaging.core.contract.Replicator;
import org.jboss.messaging.util.Util;
+import org.jboss.messaging.util.ConcurrentHashSet;
import org.jboss.remoting.Client;
import org.jboss.remoting.ClientDisconnectedException;
import org.jboss.remoting.ConnectionListener;
+import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
/**
* @author <a href="tim.fox at jboss.com">Tim Fox</a>
@@ -62,14 +64,15 @@
// Attributes -----------------------------------------------------------------------------------
- // Map<jmsClientVMID<String> - Map<remotingClientSessionID<String> - ConnectionEndpoint>>
- private Map jmsClients;
+ private Map</** VMID */String, Map</** RemoteSessionID */String, ConnectionEndpoint>> jmsClients;
// Map<remotingClientSessionID<String> - jmsClientVMID<String>
private Map remotingSessions;
// Set<ConnectionEndpoint>
private Set activeConnectionEndpoints;
+
+ private Map</** CFUniqueName*/ String, ConnectionFactoryCallbackInformation> cfCallbackInfo;
private Replicator replicator;
@@ -80,6 +83,7 @@
jmsClients = new HashMap();
remotingSessions = new HashMap();
activeConnectionEndpoints = new HashSet();
+ cfCallbackInfo = new ConcurrentHashMap<String, ConnectionFactoryCallbackInformation>();
}
// ConnectionManager implementation -------------------------------------------------------------
@@ -88,7 +92,7 @@
String remotingClientSessionID,
ConnectionEndpoint endpoint)
{
- Map endpoints = (Map)jmsClients.get(jmsClientVMID);
+ Map<String, ConnectionEndpoint> endpoints = jmsClients.get(jmsClientVMID);
if (endpoints == null)
{
@@ -109,11 +113,11 @@
public synchronized ConnectionEndpoint unregisterConnection(String jmsClientVMId,
String remotingClientSessionID)
{
- Map endpoints = (Map)jmsClients.get(jmsClientVMId);
+ Map<String, ConnectionEndpoint> endpoints = jmsClients.get(jmsClientVMId);
if (endpoints != null)
{
- ConnectionEndpoint e = (ConnectionEndpoint)endpoints.remove(remotingClientSessionID);
+ ConnectionEndpoint e = endpoints.remove(remotingClientSessionID);
if (e != null)
{
@@ -148,7 +152,7 @@
{
String jmsClientID = (String)remotingSessions.get(remotingSessionID);
- if (jmsClientID != null)
+ if (jmsClientID == null)
{
log.warn(this + " cannot look up remoting session ID " + remotingSessionID);
}
@@ -191,7 +195,25 @@
handleClientFailure(remotingSessionID, true);
}
}
-
+
+ /** Synchronized is not really needed.. just to be safe as this is not supposed to be highly contended */
+ public synchronized void addConnectionFactoryCallback(String uniqueName, String JVMID, ServerInvokerCallbackHandler handler)
+ {
+ getCFInfo(uniqueName).addClient(JVMID, handler);
+ }
+
+ /** Synchronized is not really needed.. just to be safe as this is not supposed to be highly contended */
+ public synchronized void removeConnectionFactoryCallback(String uniqueName, String JVMID, ServerInvokerCallbackHandler handler)
+ {
+ getCFInfo(uniqueName).removeHandler(JVMID, handler);
+ }
+
+ /** Synchronized is not really needed.. just to be safe as this is not supposed to be highly contended */
+ public synchronized ServerInvokerCallbackHandler[] getConnectionFactoryCallback(String uniqueName)
+ {
+ return getCFInfo(uniqueName).getAllHandlers();
+ }
+
// ClusterNotificationListener implementation ---------------------------------------------------
@@ -292,22 +314,34 @@
// Protected ------------------------------------------------------------------------------------
// Private --------------------------------------------------------------------------------------
-
+
+ private ConnectionFactoryCallbackInformation getCFInfo(String uniqueName)
+ {
+ ConnectionFactoryCallbackInformation callback = cfCallbackInfo.get(uniqueName);
+ if (callback == null)
+ {
+ callback = new ConnectionFactoryCallbackInformation(uniqueName);
+ cfCallbackInfo.put(uniqueName, callback);
+ callback = cfCallbackInfo.get(uniqueName);
+ }
+ return callback;
+ }
+
+
private synchronized void closeConsumersForClientVMID(String jmsClientID)
{
// Remoting only provides one pinger per invoker, not per connection therefore when the pinger
// dies we must close ALL connections corresponding to that jms client ID.
- Map endpoints = (Map)jmsClients.get(jmsClientID);
+ Map<String, ConnectionEndpoint> endpoints = jmsClients.get(jmsClientID);
if (endpoints != null)
{
- List sces = new ArrayList();
+ List<ConnectionEndpoint> sces = new ArrayList();
- for(Iterator i = endpoints.entrySet().iterator(); i.hasNext(); )
+ for(Map.Entry<String, ConnectionEndpoint> entry: endpoints.entrySet())
{
- Map.Entry entry = (Map.Entry)i.next();
- ConnectionEndpoint sce = (ConnectionEndpoint)entry.getValue();
+ ConnectionEndpoint sce = entry.getValue();
sces.add(sce);
}
@@ -315,13 +349,11 @@
// to remove the data from the jmsClients and sessions maps.
// Note we do this outside the loop to prevent ConcurrentModificationException
- for(Iterator i = sces.iterator(); i.hasNext(); )
+ for(ConnectionEndpoint sce: sces )
{
- ConnectionEndpoint sce = (ConnectionEndpoint)i.next();
-
try
{
- log.debug("clearing up state for connection " + sce);
+ log.debug("clPearing up state for connection " + sce);
sce.closing();
sce.close();
log.debug("cleared up state for connection " + sce);
@@ -332,8 +364,94 @@
}
}
}
+
+ for (ConnectionFactoryCallbackInformation cfInfo: cfCallbackInfo.values())
+ {
+ ServerInvokerCallbackHandler[] handlers = cfInfo.getAllHandlers(jmsClientID);
+ for (ServerInvokerCallbackHandler handler: handlers)
+ {
+ try
+ {
+ handler.getCallbackClient().disconnect();
+ }
+ catch (Throwable e)
+ {
+ log.warn (e, e);
+ }
+
+ try
+ {
+ handler.destroy();
+ }
+ catch (Throwable e)
+ {
+ log.warn (e, e);
+ }
+
+ cfInfo.removeHandler(jmsClientID, handler);
+ }
+
+ }
+
}
// Inner classes --------------------------------------------------------------------------------
+ /** Class used to organize Callbacks on ClusteredConnectionFactories */
+ static class ConnectionFactoryCallbackInformation
+ {
+
+ // We keep two lists, one containing all clients a CF will have to maintain and another
+ // organized by JVMId as we will need that organization when cleaning up dead clients
+ String uniqueName;
+ Map</**VMID */ String , /** Active clients*/ConcurrentHashSet<ServerInvokerCallbackHandler>> clientHandlersByVM;
+ ConcurrentHashSet<ServerInvokerCallbackHandler> clientHandlers;
+
+
+ public ConnectionFactoryCallbackInformation(String uniqueName)
+ {
+ this.uniqueName = uniqueName;
+ this.clientHandlersByVM = new ConcurrentHashMap<String, ConcurrentHashSet<ServerInvokerCallbackHandler>>();
+ this.clientHandlers = new ConcurrentHashSet<ServerInvokerCallbackHandler>();
+ }
+
+ public void addClient(String vmID, ServerInvokerCallbackHandler handler)
+ {
+ clientHandlers.add(handler);
+ getHandlersList(vmID).add(handler);
+ }
+
+ public ServerInvokerCallbackHandler[] getAllHandlers(String vmID)
+ {
+ Set<ServerInvokerCallbackHandler> list = getHandlersList(vmID);
+ ServerInvokerCallbackHandler[] array = new ServerInvokerCallbackHandler[list.size()];
+ return (ServerInvokerCallbackHandler[])list.toArray(array);
+ }
+
+ public ServerInvokerCallbackHandler[] getAllHandlers()
+ {
+ ServerInvokerCallbackHandler[] array = new ServerInvokerCallbackHandler[clientHandlers.size()];
+ return (ServerInvokerCallbackHandler[])clientHandlers.toArray(array);
+ }
+
+ public void removeHandler(String vmID, ServerInvokerCallbackHandler handler)
+ {
+ clientHandlers.remove(handler);
+ getHandlersList(vmID).remove(handler);
+ }
+
+ private ConcurrentHashSet<ServerInvokerCallbackHandler> getHandlersList(String vmID)
+ {
+ ConcurrentHashSet<ServerInvokerCallbackHandler> perVMList = clientHandlersByVM.get(vmID);
+ if (perVMList == null)
+ {
+ perVMList = new ConcurrentHashSet<ServerInvokerCallbackHandler>();
+ clientHandlersByVM.put(vmID, perVMList);
+ perVMList = clientHandlersByVM.get(vmID);
+ }
+ return perVMList;
+ }
+
+ }
+
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java 2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java 2007-08-21 04:19:48 UTC (rev 3019)
@@ -32,6 +32,7 @@
import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
import org.jboss.jms.delegate.ConnectionFactoryEndpoint;
import org.jboss.jms.delegate.CreateConnectionResult;
+import org.jboss.jms.delegate.TopologyResult;
import org.jboss.jms.server.ServerPeer;
import org.jboss.jms.server.connectionfactory.JNDIBindings;
import org.jboss.jms.server.endpoint.advised.ConnectionAdvised;
@@ -39,6 +40,7 @@
import org.jboss.jms.wireformat.Dispatcher;
import org.jboss.logging.Logger;
import org.jboss.messaging.util.ExceptionUtil;
+import org.jboss.messaging.util.ConcurrentHashSet;
import org.jboss.remoting.callback.Callback;
import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
import org.jboss.security.SecurityAssociation;
@@ -66,6 +68,8 @@
private String clientID;
+ private String uniqueName;
+
private String id;
private JNDIBindings jndiBindings;
@@ -81,14 +85,24 @@
private int dupsOKBatchSize;
private boolean supportsFailover;
+
+ /** Cluster Topology on ClusteredConnectionFactories
+ Information to failover to other connections on clients **/
+ ClientConnectionFactoryDelegate[] delegates;
+
+ /** Cluster Topology on ClusteredConnectionFactories
+ Information to failover to other connections on clients **/
+ Map failoverMap;
+
+
// Constructors ---------------------------------------------------------------------------------
/**
* @param jndiBindings - names under which the corresponding JBossConnectionFactory is bound in
* JNDI.
*/
- public ServerConnectionFactoryEndpoint(String id, ServerPeer serverPeer,
+ public ServerConnectionFactoryEndpoint(String uniqueName, String id, ServerPeer serverPeer,
String defaultClientID,
JNDIBindings jndiBindings,
int preFetchSize,
@@ -98,6 +112,7 @@
int dupsOKBatchSize,
boolean supportsFailover)
{
+ this.uniqueName = uniqueName;
this.serverPeer = serverPeer;
this.clientID = defaultClientID;
this.id = id;
@@ -263,6 +278,25 @@
}
}
+ public void addCallback(String VMID, String remotingSessionID,
+ ServerInvokerCallbackHandler callbackHandler) throws JMSException
+ {
+ log.debug("Adding callbackHandler on ConnectionFactory");
+ serverPeer.getConnectionManager().addConnectionFactoryCallback(this.uniqueName, VMID, callbackHandler);
+ }
+
+ public void removeCallback(String VMID, String remotingSessionID,
+ ServerInvokerCallbackHandler callbackHandler) throws JMSException
+ {
+ log.debug("Removing callbackHandler on ConnectionFactory");
+ serverPeer.getConnectionManager().removeConnectionFactoryCallback(this.uniqueName, VMID, callbackHandler);
+ }
+
+ public TopologyResult getTopology() throws JMSException
+ {
+ return new TopologyResult(uniqueName, delegates, failoverMap);
+ }
+
// Public ---------------------------------------------------------------------------------------
public String getID()
@@ -289,35 +323,29 @@
public void updateClusteredClients(ClientConnectionFactoryDelegate[] delegates, Map failoverMap)
throws Exception
{
- // TODO Should we lock the CFEndpoint now allowing new connections to come while doing this?
+ updateTopology(delegates, failoverMap);
- List activeConnections = serverPeer.getConnectionManager().getActiveConnections();
+ ServerInvokerCallbackHandler[] clientFactoriesToUpdate = serverPeer.getConnectionManager().getConnectionFactoryCallback(this.uniqueName);
+ log.debug("updateClusteredClients being called!!! clientFactoriesToUpdate.size = " + clientFactoriesToUpdate.length);
ConnectionFactoryUpdate message =
- new ConnectionFactoryUpdate(delegates, failoverMap);
+ new ConnectionFactoryUpdate(uniqueName, delegates, failoverMap);
Callback callback = new Callback(message);
- for (Iterator i = activeConnections.iterator(); i.hasNext();)
+ for (ServerInvokerCallbackHandler o: clientFactoriesToUpdate)
{
- ServerConnectionEndpoint connEndpoint = (ServerConnectionEndpoint)i.next();
-
- if (connEndpoint.getConnectionFactoryEndpoint() == this)
- {
- log.trace(this + " sending cluster view update to " + connEndpoint);
-
- try
- {
- connEndpoint.getCallbackHandler().handleCallbackOneway(callback);
- }
- catch (Exception e)
- {
- log.error("Callback failed on connection " + connEndpoint, e);
- }
- }
+ log.debug("Updating CF on callback " + o);
+ o.handleCallbackOneway(callback);
}
}
+ public void updateTopology(ClientConnectionFactoryDelegate[] delegates, Map failoverMap)
+ {
+ this.delegates = delegates;
+ this.failoverMap = failoverMap;
+ }
+
public String toString()
{
return "ConnectionFactoryEndpoint[" + id + "]";
Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java 2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java 2007-08-21 04:19:48 UTC (rev 3019)
@@ -25,6 +25,7 @@
import org.jboss.jms.delegate.ConnectionFactoryEndpoint;
import org.jboss.jms.delegate.CreateConnectionResult;
+import org.jboss.jms.delegate.TopologyResult;
import org.jboss.jms.server.endpoint.ConnectionFactoryInternalEndpoint;
import org.jboss.jms.server.endpoint.ServerConnectionFactoryEndpoint;
import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
@@ -73,7 +74,26 @@
{
return endpoint.getClientAOPStack();
}
-
+
+ public void addCallback(String vmID, String remotingSessionID,
+ ServerInvokerCallbackHandler callbackHandler) throws JMSException
+ {
+ ((ServerConnectionFactoryEndpoint)endpoint).addCallback(vmID, remotingSessionID,
+ callbackHandler);
+ }
+
+ public void removeCallback(String vmID, String remotingSessionID,
+ ServerInvokerCallbackHandler callbackHandler) throws JMSException
+ {
+ ((ServerConnectionFactoryEndpoint)endpoint).removeCallback(vmID, remotingSessionID,
+ callbackHandler);
+ }
+
+ public TopologyResult getTopology() throws JMSException
+ {
+ return endpoint.getTopology();
+ }
+
// ConnectionFactoryInternalEndpoint implementation ---------------------------------------------
public CreateConnectionResult
createConnectionDelegate(String username,
Modified: trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java 2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java 2007-08-21 04:19:48 UTC (rev 3019)
@@ -31,6 +31,7 @@
import org.jboss.jms.exception.MessagingJMSException;
import org.jboss.jms.wireformat.ConnectionFactoryCreateConnectionDelegateRequest;
import org.jboss.jms.wireformat.RequestSupport;
+import org.jboss.jms.wireformat.CallbackRequestSupport;
import org.jboss.logging.Logger;
import org.jboss.messaging.util.Util;
import org.jboss.remoting.InvocationRequest;
@@ -135,31 +136,9 @@
RequestSupport request = (RequestSupport)invocation.getParameter();
- if (request instanceof ConnectionFactoryCreateConnectionDelegateRequest)
+ if (request instanceof CallbackRequestSupport)
{
- //Create connection request
-
- ConnectionFactoryCreateConnectionDelegateRequest cReq =
- (ConnectionFactoryCreateConnectionDelegateRequest)request;
-
- String remotingSessionId = cReq.getRemotingSessionID();
-
- ServerInvokerCallbackHandler callbackHandler = null;
- synchronized(callbackHandlers)
- {
- callbackHandler = (ServerInvokerCallbackHandler)callbackHandlers.get(remotingSessionId);
- }
- if (callbackHandler != null)
- {
- log.debug("found calllback handler for remoting session " + Util.guidToString(remotingSessionId));
-
- cReq.setCallbackHandler(callbackHandler);
- }
- else
- {
- throw new IllegalStateException("Cannot find callback handler " +
- "for session id " + remotingSessionId);
- }
+ performCallbackRequest(request);
}
return request.serverInvoke();
@@ -238,6 +217,32 @@
// Protected ------------------------------------------------------------------------------------
// Private --------------------------------------------------------------------------------------
-
+
+ private void performCallbackRequest(RequestSupport request)
+ {
+ CallbackRequestSupport cReq =
+ (CallbackRequestSupport)request;
+
+ String remotingSessionId = cReq.getRemotingSessionID();
+
+ ServerInvokerCallbackHandler callbackHandler = null;
+ synchronized(callbackHandlers)
+ {
+ callbackHandler = (ServerInvokerCallbackHandler)callbackHandlers.get(remotingSessionId);
+ }
+ if (callbackHandler != null)
+ {
+ log.debug("found calllback handler for remoting session " + Util.guidToString(remotingSessionId));
+
+ cReq.setCallbackHandler(callbackHandler);
+ }
+ else
+ {
+ throw new IllegalStateException("Cannot find callback handler " +
+ "for session id " + remotingSessionId);
+ }
+ }
+
+
// Inner classes --------------------------------------------------------------------------------
}
Added: trunk/src/main/org/jboss/jms/wireformat/CallbackRequestSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/CallbackRequestSupport.java (rev 0)
+++ trunk/src/main/org/jboss/jms/wireformat/CallbackRequestSupport.java 2007-08-21 04:19:48 UTC (rev 3019)
@@ -0,0 +1,124 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.jms.wireformat;
+
+import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
+import java.io.DataOutputStream;
+import java.io.DataInputStream;
+
+/**
+ * Support of establishing server2client callback mechanism.
+ *
+ * (JMSServerInvocationHandler looks up for the callbackHandler based on the remoteSessionId.
+ * That routine used to be dependent on ConnectionFactoryCreateConnectionDelegateRequest
+ * but we also needed the same thing to establish callback on ConnectionFactory updates,
+ * so we created another level for RequestSupport having the callback information)
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ * $Id$
+ */
+public abstract class CallbackRequestSupport extends RequestSupport
+{
+
+ // Constants ------------------------------------------------------------------------------------
+
+ // Attributes -----------------------------------------------------------------------------------
+
+ private String remotingSessionId;
+
+ private transient ServerInvokerCallbackHandler callbackHandler;
+
+ private String clientVMId;
+
+ // Static ---------------------------------------------------------------------------------------
+
+ // Constructors ---------------------------------------------------------------------------------
+
+ protected CallbackRequestSupport()
+ {
+ }
+
+ protected CallbackRequestSupport(String clientVMId, String remotingSessionId, String objectId, int methodId, byte version)
+ {
+ super(objectId, methodId, version);
+ this.remotingSessionId = remotingSessionId;
+ this.clientVMId = clientVMId;
+ }
+
+ // Public ---------------------------------------------------------------------------------------
+
+ public String getRemotingSessionID()
+ {
+ return remotingSessionId;
+ }
+
+
+ public String getClientVMID()
+ {
+ return clientVMId;
+ }
+
+ public void setRemotingSessionId(String remotingSessionId)
+ {
+ this.remotingSessionId = remotingSessionId;
+ }
+
+ public ServerInvokerCallbackHandler getCallbackHandler()
+ {
+ return callbackHandler;
+ }
+
+ public void setCallbackHandler(ServerInvokerCallbackHandler callbackHandler)
+ {
+ this.callbackHandler = callbackHandler;
+ }
+
+ public void write(DataOutputStream os) throws Exception
+ {
+ super.write(os);
+
+ os.writeUTF(remotingSessionId);
+
+ os.writeUTF(clientVMId);
+ }
+
+ public void read(DataInputStream is) throws Exception
+ {
+ super.read(is);
+
+ remotingSessionId = is.readUTF();
+
+ clientVMId = is.readUTF();
+
+
+ }
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ // Private --------------------------------------------------------------------------------------
+
+ // Inner classes --------------------------------------------------------------------------------
+
+}
Property changes on: trunk/src/main/org/jboss/jms/wireformat/CallbackRequestSupport.java
___________________________________________________________________
Name: svn:keywords
+ Id LastChangedDate Author Revision
Added: trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryAddCallbackRequest.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryAddCallbackRequest.java (rev 0)
+++ trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryAddCallbackRequest.java 2007-08-21 04:19:48 UTC (rev 3019)
@@ -0,0 +1,98 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.jms.wireformat;
+
+import org.jboss.jms.server.endpoint.advised.ConnectionFactoryAdvised;
+import org.jboss.jms.delegate.CreateConnectionResult;
+import org.jboss.logging.Logger;
+import java.io.DataOutputStream;
+import java.io.DataInputStream;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ * $Id$
+ */
+public class ConnectionFactoryAddCallbackRequest extends CallbackRequestSupport
+{
+
+ private static final Logger log = Logger.getLogger(ConnectionFactoryAddCallbackRequest.class);
+
+
+ // Constants ------------------------------------------------------------------------------------
+
+ // Attributes -----------------------------------------------------------------------------------
+
+ // Static ---------------------------------------------------------------------------------------
+
+ // Constructors ---------------------------------------------------------------------------------
+
+ public ConnectionFactoryAddCallbackRequest()
+ {
+ }
+
+ public ConnectionFactoryAddCallbackRequest(String jvmSessionId, String remotingSessionId, String objectId, byte version)
+ {
+ super(jvmSessionId, remotingSessionId, objectId, PacketSupport.REQ_CONNECTIONFACTORY_ADDCALLBACK, version);
+ }
+
+ // Public ---------------------------------------------------------------------------------------
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ // Private --------------------------------------------------------------------------------------
+
+ // Inner classes --------------------------------------------------------------------------------
+
+ public ResponseSupport serverInvoke() throws Exception
+ {
+ log.debug("serverInvoke callbackHandler=" + this.getCallbackHandler());
+ ConnectionFactoryAdvised advised =
+ (ConnectionFactoryAdvised)Dispatcher.instance.getTarget(objectId);
+
+ if (advised == null)
+ {
+ throw new IllegalStateException("Cannot find object in dispatcher with id " + objectId);
+ }
+
+ advised.addCallback(getClientVMID(), getRemotingSessionID(), this.getCallbackHandler());
+
+
+
+ return null;
+ }
+
+
+ public void write(DataOutputStream os) throws Exception
+ {
+ super.write(os);
+ os.flush();
+ }
+
+ public void read(DataInputStream is) throws Exception
+ {
+ super.read(is);
+ }
+}
Property changes on: trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryAddCallbackRequest.java
___________________________________________________________________
Name: svn:keywords
+ Id LastChangedDate Author Revision
Modified: trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryCreateConnectionDelegateRequest.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryCreateConnectionDelegateRequest.java 2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryCreateConnectionDelegateRequest.java 2007-08-21 04:19:48 UTC (rev 3019)
@@ -37,7 +37,7 @@
* $Id$
*
*/
-public class ConnectionFactoryCreateConnectionDelegateRequest extends RequestSupport
+public class ConnectionFactoryCreateConnectionDelegateRequest extends CallbackRequestSupport
{
private String username;
@@ -45,10 +45,6 @@
private int failedNodeId;
- private String remotingSessionId;
-
- private String clientVMId;
-
private transient ServerInvokerCallbackHandler callbackHandler;
public ConnectionFactoryCreateConnectionDelegateRequest()
@@ -62,12 +58,8 @@
String username, String password,
int failedNodeId)
{
- super(objectId, PacketSupport.REQ_CONNECTIONFACTORY_CREATECONNECTIONDELEGATE, version);
+ super(clientVMId, remotingSessionId, objectId, PacketSupport.REQ_CONNECTIONFACTORY_CREATECONNECTIONDELEGATE, version);
- this.remotingSessionId = remotingSessionId;
-
- this.clientVMId = clientVMId;
-
this.username = username;
this.password = password;
@@ -79,10 +71,6 @@
{
super.read(is);
- remotingSessionId = is.readUTF();
-
- clientVMId = is.readUTF();
-
username = readNullableString(is);
password = readNullableString(is);
@@ -102,7 +90,7 @@
CreateConnectionResult del =
advised.createConnectionDelegate(username, password, failedNodeId,
- remotingSessionId, clientVMId, version,
+ getRemotingSessionID(), getClientVMID(), version,
callbackHandler);
return new ConnectionFactoryCreateConnectionDelegateResponse(del);
@@ -112,10 +100,6 @@
{
super.write(os);
- os.writeUTF(remotingSessionId);
-
- os.writeUTF(clientVMId);
-
//Write the args
writeNullableString(username, os);
@@ -127,16 +111,6 @@
os.flush();
}
- public String getRemotingSessionID()
- {
- return remotingSessionId;
- }
-
- public String getClientVMID()
- {
- return clientVMId;
- }
-
public ServerInvokerCallbackHandler getCallbackHandler()
{
return this.callbackHandler;
Added: trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryGetTopologyRequest.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryGetTopologyRequest.java (rev 0)
+++ trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryGetTopologyRequest.java 2007-08-21 04:19:48 UTC (rev 3019)
@@ -0,0 +1,90 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.jms.wireformat;
+
+import java.io.DataOutputStream;
+import java.io.DataInputStream;
+import org.jboss.messaging.util.Version;
+import org.jboss.jms.delegate.ConnectionFactoryEndpoint;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ * $Id$
+ */
+public class ConnectionFactoryGetTopologyRequest extends RequestSupport
+{
+
+ // Constants ------------------------------------------------------------------------------------
+
+ // Attributes -----------------------------------------------------------------------------------
+
+ // Static ---------------------------------------------------------------------------------------
+
+ // Constructors ---------------------------------------------------------------------------------
+
+ public ConnectionFactoryGetTopologyRequest()
+ {
+ }
+
+ public ConnectionFactoryGetTopologyRequest(String objectId)
+ {
+ super(objectId, REQ_CONNECTIONFACTORY_GETTOPOLOGY, Version.instance().getProviderIncrementingVersion());
+ }
+
+ // Public ---------------------------------------------------------------------------------------
+
+
+ public void write(DataOutputStream os) throws Exception
+ {
+ super.write(os);
+ os.flush();
+ }
+
+ public void read(DataInputStream is) throws Exception
+ {
+ super.read(is);
+ }
+
+ public ResponseSupport serverInvoke() throws Exception
+ {
+ ConnectionFactoryEndpoint endpoint =
+ (ConnectionFactoryEndpoint)Dispatcher.instance.getTarget(objectId);
+
+ if (endpoint == null)
+ {
+ throw new IllegalStateException("Cannot find object with ID " + objectId + " in dispatcher");
+ }
+
+ return new ConnectionFactoryGetTopologyResponse(endpoint.getTopology());
+ }
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ // Private --------------------------------------------------------------------------------------
+
+ // Inner classes --------------------------------------------------------------------------------
+
+}
Property changes on: trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryGetTopologyRequest.java
___________________________________________________________________
Name: svn:keywords
+ Id LastChangedDate Author Revision
Added: trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryGetTopologyResponse.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryGetTopologyResponse.java (rev 0)
+++ trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryGetTopologyResponse.java 2007-08-21 04:19:48 UTC (rev 3019)
@@ -0,0 +1,85 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.jms.wireformat;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import org.jboss.jms.delegate.TopologyResult;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ * $Id$
+ */
+public class ConnectionFactoryGetTopologyResponse extends ResponseSupport
+{
+
+ // Constants ------------------------------------------------------------------------------------
+
+ // Attributes -----------------------------------------------------------------------------------
+
+ TopologyResult result;
+
+ // Static ---------------------------------------------------------------------------------------
+
+ // Constructors ---------------------------------------------------------------------------------
+
+ public ConnectionFactoryGetTopologyResponse(TopologyResult result)
+ {
+ super(RESP_CONNECTIONFACTORY_GETTOPOLOGY);
+ this.result = result;
+ }
+
+ public ConnectionFactoryGetTopologyResponse()
+ {
+ }
+
+ // Public ---------------------------------------------------------------------------------------
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ // Private --------------------------------------------------------------------------------------
+
+ // Inner classes --------------------------------------------------------------------------------
+
+ public void read(DataInputStream is) throws Exception
+ {
+ result = new TopologyResult();
+ result.read(is);
+ }
+
+
+ public void write(DataOutputStream os) throws Exception
+ {
+ super.write(os);
+ result.write(os);
+ os.flush();
+ }
+
+ public Object getResponse()
+ {
+ return result;
+ }
+}
Property changes on: trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryGetTopologyResponse.java
___________________________________________________________________
Name: svn:keywords
+ Id LastChangedDate Author Revision
Added: trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryRemoveCallbackRequest.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryRemoveCallbackRequest.java (rev 0)
+++ trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryRemoveCallbackRequest.java 2007-08-21 04:19:48 UTC (rev 3019)
@@ -0,0 +1,91 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.jms.wireformat;
+
+import java.io.DataOutputStream;
+import java.io.DataInputStream;
+import org.jboss.jms.server.endpoint.advised.ConnectionFactoryAdvised;
+import org.jboss.messaging.util.Version;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ * $Id$
+ */
+public class ConnectionFactoryRemoveCallbackRequest extends CallbackRequestSupport
+{
+
+ // Constants ------------------------------------------------------------------------------------
+
+ // Attributes -----------------------------------------------------------------------------------
+
+ // Static ---------------------------------------------------------------------------------------
+
+ // Constructors ---------------------------------------------------------------------------------
+
+ public ConnectionFactoryRemoveCallbackRequest()
+ {
+ }
+
+ public ConnectionFactoryRemoveCallbackRequest(String jvmSessionId, String remotingSessionId, String objectId, byte version)
+ {
+ super(jvmSessionId, remotingSessionId, objectId, PacketSupport.REQ_CONNECTIONFACTORY_REMOVECALLBACK, Version.instance().getProviderIncrementingVersion());
+ }
+
+ // Public ---------------------------------------------------------------------------------------
+ public void write(DataOutputStream os) throws Exception
+ {
+ super.write(os);
+ os.flush();
+ }
+
+ public void read(DataInputStream is) throws Exception
+ {
+ super.read(is);
+ }
+
+ public ResponseSupport serverInvoke() throws Exception
+ {
+ ConnectionFactoryAdvised advised =
+ (ConnectionFactoryAdvised)Dispatcher.instance.getTarget(objectId);
+
+ if (advised == null)
+ {
+ throw new IllegalStateException("Cannot find object in dispatcher with id " + objectId);
+ }
+
+ advised.removeCallback(getClientVMID(), getRemotingSessionID(), this.getCallbackHandler());
+
+
+ return null;
+ }
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ // Private --------------------------------------------------------------------------------------
+
+ // Inner classes --------------------------------------------------------------------------------
+
+}
Property changes on: trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryRemoveCallbackRequest.java
___________________________________________________________________
Name: svn:keywords
+ Id LastChangedDate Author Revision
Modified: trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryUpdate.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryUpdate.java 2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryUpdate.java 2007-08-21 04:19:48 UTC (rev 3019)
@@ -24,11 +24,10 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
-import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
+import org.jboss.jms.delegate.TopologyResult;
/**
* This class holds the update cluster view sent by the server to client-side clustered connection
@@ -47,22 +46,18 @@
// Attributes -----------------------------------------------------------------------------------
- private ClientConnectionFactoryDelegate[] delegates;
-
- private Map failoverMap;
+ TopologyResult topology;
// Static ---------------------------------------------------------------------------------------
// Constructors ---------------------------------------------------------------------------------
- public ConnectionFactoryUpdate(ClientConnectionFactoryDelegate[] delegates,
+ public ConnectionFactoryUpdate(String uniqueName, ClientConnectionFactoryDelegate[] delegates,
Map failoverMap)
{
super(PacketSupport.CONNECTIONFACTORY_UPDATE);
-
- this.delegates = delegates;
-
- this.failoverMap = failoverMap;
+
+ topology = new TopologyResult(uniqueName, delegates, failoverMap);
}
public ConnectionFactoryUpdate()
@@ -71,108 +66,36 @@
// Public ---------------------------------------------------------------------------------------
- public ClientConnectionFactoryDelegate[] getDelegates()
+ public String toString()
{
- return delegates;
+ return "ConnectionFactoryUpdateMessage{" + topology + "}";
}
- public void setDelegates(ClientConnectionFactoryDelegate[] delegates)
+ public TopologyResult getTopology()
{
- this.delegates = delegates;
+ return topology;
}
- public Map getFailoverMap()
+ public void setTopology(TopologyResult topology)
{
- return failoverMap;
+ this.topology = topology;
}
- public void setFailoverMap(Map failoverMap)
- {
- this.failoverMap = failoverMap;
- }
-
-
- public String toString()
- {
- StringBuffer sb = new StringBuffer("ConnectionFactoryUpdateMessage[");
-
- if (delegates != null)
- {
- for(int i = 0; i < delegates.length; i++)
- {
- sb.append(delegates[i]);
- if (i < delegates.length - 1)
- {
- sb.append(',');
- }
- }
- }
-
- sb.append("]");
-
- return sb.toString();
- }
-
// Streamable implementation
// ---------------------------------------------------------------
public void read(DataInputStream is) throws Exception
- {
- int len = is.readInt();
-
- delegates = new ClientConnectionFactoryDelegate[len];
-
- for (int i = 0; i < len; i++)
- {
- delegates[i] = new ClientConnectionFactoryDelegate();
-
- delegates[i].read(is);
- }
-
- len = is.readInt();
-
- failoverMap = new HashMap(len);
-
- for (int c = 0; c < len; c++)
- {
- Integer i = new Integer(is.readInt());
-
- Integer j = new Integer(is.readInt());
-
- failoverMap.put(i, j);
- }
+ {
+ topology = new TopologyResult();
+ topology.read(is);
}
public void write(DataOutputStream os) throws Exception
{
super.write(os);
-
- int len = delegates.length;
-
- os.writeInt(len);
-
- for (int i = 0; i < len; i++)
- {
- delegates[i].write(os);
- }
-
- os.writeInt(failoverMap.size());
-
- Iterator iter = failoverMap.entrySet().iterator();
-
- while (iter.hasNext())
- {
- Map.Entry entry = (Map.Entry)iter.next();
-
- Integer i = (Integer)entry.getKey();
-
- Integer j = (Integer)entry.getValue();
-
- os.writeInt(i.intValue());
-
- os.writeInt(j.intValue());
- }
-
+
+ topology.write(os);
+
os.flush();
}
Modified: trunk/src/main/org/jboss/jms/wireformat/PacketSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/PacketSupport.java 2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/wireformat/PacketSupport.java 2007-08-21 04:19:48 UTC (rev 3019)
@@ -77,6 +77,9 @@
public static final int REQ_CONNECTIONFACTORY_CREATECONNECTIONDELEGATE = 100;
public static final int REQ_CONNECTIONFACTORY_GETIDBLOCK = 101;
public static final int REQ_CONNECTIONFACTORY_GETCLIENTAOPSTACK = 102;
+ public static final int REQ_CONNECTIONFACTORY_ADDCALLBACK = 103;
+ public static final int REQ_CONNECTIONFACTORY_REMOVECALLBACK = 104;
+ public static final int REQ_CONNECTIONFACTORY_GETTOPOLOGY = 105;
// Connection
// ----------
@@ -137,8 +140,9 @@
public static final int RESP_CONNECTIONFACTORY_CREATECONNECTIONDELEGATE = 100100;
public static final int RESP_CONNECTIONFACTORY_GETIDBLOCK = 100101;
public static final int RESP_CONNECTIONFACTORY_GETCLIENTAOPSTACK = 100102;
+ public static final int RESP_CONNECTIONFACTORY_GETTOPOLOGY = 100105;
- // Connection
+ // Connection
// -------------------------------------
public static final int RESP_CONNECTION_CREATESESSIONDELEGATE = 100200;
@@ -207,8 +211,17 @@
case REQ_CONNECTIONFACTORY_GETCLIENTAOPSTACK:
packet = new ConnectionFactoryGetClientAOPStackRequest();
break;
-
- // Connection
+ case REQ_CONNECTIONFACTORY_ADDCALLBACK:
+ packet = new ConnectionFactoryAddCallbackRequest();
+ break;
+ case REQ_CONNECTIONFACTORY_REMOVECALLBACK:
+ packet = new ConnectionFactoryRemoveCallbackRequest();
+ break;
+ case REQ_CONNECTIONFACTORY_GETTOPOLOGY:
+ packet = new ConnectionFactoryGetTopologyRequest();
+ break;
+
+ // Connection
case REQ_CONNECTION_CREATESESSIONDELEGATE:
packet = new ConnectionCreateSessionDelegateRequest();
break;
@@ -302,9 +315,12 @@
break;
case RESP_CONNECTIONFACTORY_GETCLIENTAOPSTACK:
packet = new ConnectionFactoryGetClientAOPStackResponse();
- break;
+ break;
+ case RESP_CONNECTIONFACTORY_GETTOPOLOGY:
+ packet = new ConnectionFactoryGetTopologyResponse();
+ break;
- // Connection
+ // Connection
case RESP_CONNECTION_CREATESESSIONDELEGATE:
packet = new ConnectionCreateSessionDelegateResponse();
break;
Modified: trunk/src/main/org/jboss/messaging/util/ConcurrentHashSet.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/ConcurrentHashSet.java 2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/messaging/util/ConcurrentHashSet.java 2007-08-21 04:19:48 UTC (rev 3019)
@@ -39,9 +39,9 @@
*
* $Id: ConcurrentReaderHashSet.java 1935 2007-01-09 23:29:20Z clebert.suconic at jboss.com $
*/
-public class ConcurrentHashSet extends AbstractSet
+public class ConcurrentHashSet<E> extends AbstractSet<E>
{
- private Map theMap;
+ private Map<E, Object> theMap;
private static Object dummy = new Object();
@@ -77,7 +77,7 @@
return theMap.isEmpty();
}
- public boolean add(Object o)
+ public boolean add(E o)
{
return theMap.put(o, dummy) == dummy;
}
Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml 2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/tests/build.xml 2007-08-21 04:19:48 UTC (rev 3019)
@@ -189,8 +189,8 @@
<mkdir dir="${build.tests.classes}"/>
<javac destdir="${build.tests.classes}"
optimize="${javac.optimize}"
- target="1.4"
- source="1.4"
+ target="1.5"
+ source="1.5"
debug="${javac.debug}"
depend="${javac.depend}"
verbose="${javac.verbose}"
@@ -758,7 +758,6 @@
<fileset dir="${build.tests.classes}">
<include name="**/jms/clustering/${test-mask}.class"/>
<exclude name="**/jms/clustering/ClusterLeakTest.class"/>
- <exclude name="**/jms/clustering/ClusterViewUpdateTest.class"/>
</fileset>
</batchtest>
</junit>
Modified: trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java 2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java 2007-08-21 04:19:48 UTC (rev 3019)
@@ -95,6 +95,10 @@
import org.jboss.jms.wireformat.SessionRecoverDeliveriesRequest;
import org.jboss.jms.wireformat.SessionSendRequest;
import org.jboss.jms.wireformat.SessionUnsubscribeRequest;
+import org.jboss.jms.wireformat.ConnectionFactoryAddCallbackRequest;
+import org.jboss.jms.wireformat.ConnectionFactoryGetTopologyRequest;
+import org.jboss.jms.wireformat.ConnectionFactoryGetTopologyResponse;
+import org.jboss.jms.wireformat.ConnectionFactoryRemoveCallbackRequest;
import org.jboss.remoting.InvocationRequest;
/**
@@ -150,6 +154,11 @@
{
wf.testConnectionFactoryGetClientAOPStack();
}
+
+ public void testConnectionFactoryGetTopology() throws Exception
+ {
+ wf.testConnectionFactoryGetTopology();
+ }
// Connection
@@ -309,7 +318,17 @@
{
wf.testConnectionFactoryGetClientAOPStackResponse();
}
-
+
+ public void testConnectionFactoryAddCabllack() throws Exception
+ {
+ wf.testConnectionFactoryAddCabllack();
+ }
+
+ public void testConnectionFactoryRemoveCabllack() throws Exception
+ {
+ wf.testConnectionFactoryRemoveCabllack();
+ }
+
// Connection
public void testConnectionCreateSessionDelegateResponse() throws Exception
@@ -378,8 +397,8 @@
{
wf.testClosingResponse();
}
-
-
+
+
//We just check the first byte to make sure serialization is not be used.
private class TestWireFormat extends JMSWireFormat
@@ -452,10 +471,34 @@
{
RequestSupport req =
new ConnectionFactoryGetClientAOPStackRequest("23", (byte)77);;
-
- testPacket(req, PacketSupport.REQ_CONNECTIONFACTORY_GETCLIENTAOPSTACK);
+
+ testPacket(req, PacketSupport.REQ_CONNECTIONFACTORY_GETCLIENTAOPSTACK);
}
-
+
+ public void testConnectionFactoryAddCabllack() throws Exception
+ {
+ RequestSupport req =
+ new ConnectionFactoryAddCallbackRequest("12", "23", "24",(byte)0);
+
+ testPacket(req, PacketSupport.REQ_CONNECTIONFACTORY_ADDCALLBACK);
+ }
+
+ public void testConnectionFactoryRemoveCabllack() throws Exception
+ {
+ RequestSupport req =
+ new ConnectionFactoryRemoveCallbackRequest("12", "23", "24",(byte)0);
+
+ testPacket(req, PacketSupport.REQ_CONNECTIONFACTORY_REMOVECALLBACK);
+ }
+
+ public void testConnectionFactoryGetTopology() throws Exception
+ {
+ RequestSupport req =
+ new ConnectionFactoryGetTopologyRequest("123");
+
+ testPacket(req, PacketSupport.REQ_CONNECTIONFACTORY_GETTOPOLOGY);
+ }
+
// Connection
public void testConnectionCreateSessionDelegateRequest() throws Exception
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusterViewUpdateTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusterViewUpdateTest.java 2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusterViewUpdateTest.java 2007-08-21 04:19:48 UTC (rev 3019)
@@ -24,12 +24,19 @@
import javax.jms.Connection;
import javax.jms.Session;
+import javax.jms.MessageProducer;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.client.container.ClusteringAspect;
+import org.jboss.jms.client.remoting.ConnectionFactoryCallbackHandler;
import org.jboss.jms.client.delegate.ClientClusteredConnectionFactoryDelegate;
import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
import org.jboss.jms.client.state.ConnectionState;
+import org.jboss.jms.delegate.TopologyResult;
import org.jboss.test.messaging.tools.ServerManagement;
+import java.lang.ref.WeakReference;
/**
* @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
@@ -61,47 +68,223 @@
super.setUp();
}
-
+
+ /** This method is to make sure CFs are being released on GC, validating if the callbacks
+ * are not making any hard references */
+ public void testUpdateTopology() throws Throwable
+ {
+
+ JBossConnectionFactory cf = (JBossConnectionFactory)ic[0].lookup("/ClusteredConnectionFactory");
+ ClientClusteredConnectionFactoryDelegate clusterDelegate = (ClientClusteredConnectionFactoryDelegate)cf.getDelegate();
+ assertEquals(2, clusterDelegate.getDelegates().length);
+ clusterDelegate.getTopology();
+ assertEquals(2, clusterDelegate.getDelegates().length);
+
+ // Kill the same node as the CF is connected to
+ for (int i=5;i>0;i--)
+ {
+ log.info("kill in " + i);
+ Thread.sleep(1000);
+ }
+ ServerManagement.kill(1);
+ Thread.sleep(5000);
+ assertEquals(1, clusterDelegate.getDelegates().length);
+ TopologyResult topology = clusterDelegate.getTopology();
+ assertEquals(1, topology.getDelegates().length);
+
+ clusterDelegate.closeCallback();
+ }
+
+
+ /** This method is to make sure CFs are being released on GC, validating if the callbacks
+ * are not making any hard references.
+ *
+ * Note: Even though this test is looking for memory leaks, it's important this test
+ * runs as part of the validation of ConnectionFactory updates, as a leak on CF would
+ * cause problems with excessive number of connections.
+ * */
+ public void testGarbageCollectionOnClusteredCF() throws Throwable
+ {
+
+ JBossConnectionFactory cf = (JBossConnectionFactory)ic[0].lookup("/ClusteredConnectionFactory");
+ ClientClusteredConnectionFactoryDelegate clusterDelegate = (ClientClusteredConnectionFactoryDelegate)cf.getDelegate();
+ WeakReference<ClientClusteredConnectionFactoryDelegate> ref = new WeakReference<ClientClusteredConnectionFactoryDelegate>(clusterDelegate);
+
+ // Using a separate block, as everything on this block has to be released (no references from the method)
+ {
+ Connection conn = cf.createConnection();
+ conn.start();
+ Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer prod = session.createProducer(queue[0]);
+ MessageConsumer cons = session.createConsumer(queue[0]);
+ prod.send(session.createTextMessage("Hello"));
+ session.commit();
+ TextMessage message = (TextMessage)cons.receive(1000);
+ assertEquals("Hello", message.getText());
+ log.info("Received message " + message.getText());
+ session.commit();
+ conn.close();
+ }
+
+
+ // cf = null;
+ clusterDelegate = null;
+
+ int loops = 0;
+ // Stays on loop until GC is done
+ while (ref.get()!=null)
+ {
+ for (int i=0;i<10000;i++)
+ {
+ /// Just throwing extra garbage on the memory.. to make sure GC will happen
+
+ String str = "GARBAGE GARBAGE GARBAGE GARBAGE GARBAGE GARBAGE GARBAGE " + i;
+ }
+
+ log.info("Calling system.gc()");
+ System.gc();
+ Thread.sleep(1000);
+ if (loops++ > 10)
+ {
+ // This should be more than enough already... even the object wasn't cleared on more than
+ // 2 GC cycles we have a leak already.
+
+ // Note! Due to AOP references the CFDelegate will be released from AOP instances
+ // only after 2 or more GC cycles.
+ break;
+ }
+ }
+
+ // Case there are still references to the ConnectionFactory, uncomment this code,
+ // add -agentlib:jbossAgent to your JVM arguments (with jboss-profiler lib on path or LD_LIBRARY_PATH)
+ // and this will tell you where the code is leaking.
+// org.jboss.profiler.jvmti.JVMTIInterface jvmti = new org.jboss.profiler.jvmti.JVMTIInterface();
+//
+//
+// if (ref.get() != null)
+// {
+// if (jvmti.isActive())
+// {
+// log.info("There are still references to CF");
+// HashMap refMap = jvmti.createIndexMatrix();
+// log.info(jvmti.exploreObjectReferences(refMap, ref.get(), 5, true));
+// }
+// else
+// {
+// log.info("Profiler is not active");
+// }
+// }
+//
+ assertNull("There is a memory leak on ClientClusteredConnectionFactoryDelegate", ref.get());
+ }
+
public void testUpdateConnectionFactoryWithNoConnections() throws Exception
{
- ServerManagement.kill(1);
+
+ ServerManagement.kill(1);
Thread.sleep(5000);
Connection conn = createConnectionOnServer(cf, 0);
-
- ClientClusteredConnectionFactoryDelegate cfDelegate =
- (ClientClusteredConnectionFactoryDelegate)cf.getDelegate();
+ try
+ {
- assertEquals(1, cfDelegate.getDelegates().length);
-
- conn.close();
+ ClientClusteredConnectionFactoryDelegate cfDelegate =
+ (ClientClusteredConnectionFactoryDelegate)cf.getDelegate();
+
+ assertEquals(1, cfDelegate.getDelegates().length);
+ }
+ finally
+ {
+ conn.close();
+ }
}
public void testUpdateConnectionFactoryWithNoInitialConnections() throws Exception
{
- //We kill all the servers - this tests the connection factory's ability to create a first connection
- //when its entire cached set of delegates is stale
-
- ServerManagement.kill(1);
-
- ServerManagement.kill(0);
-
- ServerManagement.start(0, "all", false);
-
- Thread.sleep(5000);
-
- Connection conn = createConnectionOnServer(cf, 0);
-
- ClientClusteredConnectionFactoryDelegate cfDelegate =
- (ClientClusteredConnectionFactoryDelegate)cf.getDelegate();
+ try
+ {
+ ClientClusteredConnectionFactoryDelegate clusterDelegate = (ClientClusteredConnectionFactoryDelegate)this.cf.getDelegate();
- assertEquals(1, cfDelegate.getDelegates().length);
-
- conn.close();
-
- ServerManagement.kill(0);
+ //JBossConnectionFactory cf2 = (JBossConnectionFactory)ic[0].lookup("/ClusteredConnectionFactory");
+ //ClientClusteredConnectionFactoryDelegate clusterDelegate2 = (ClientClusteredConnectionFactoryDelegate)cf2.getDelegate();
+
+ //We kill all the servers - this tests the connection factory's ability to create a first connection
+ //when its entire cached set of delegates is stale
+
+ startDefaultServer(2, currentOverrides, false);
+
+ assertEquals(3, clusterDelegate.getDelegates().length);
+
+ // assertEquals(3, clusterDelegate2.getDelegates().length);
+ log.info("#################################### Killing server 1 and 0");
+ ServerManagement.log(ServerManagement.INFO, "Killing server1", 2);
+ ServerManagement.kill(1);
+ // Need some time for Lease
+ Thread.sleep(5000);
+
+ assertEquals("Delegates are different on topology", 2,clusterDelegate.getTopology().getDelegates().length);
+ assertEquals(2, clusterDelegate.getDelegates().length);
+
+ ServerManagement.log(ServerManagement.INFO, "Stopping server0", 2);
+ ServerManagement.stop(0);
+
+ Thread.sleep(1000);
+
+ assertEquals(1, clusterDelegate.getDelegates().length);
+
+ Connection conn = createConnectionOnServer(cf, 2);
+
+ ClientClusteredConnectionFactoryDelegate cfDelegate =
+ (ClientClusteredConnectionFactoryDelegate)cf.getDelegate();
+
+ assertEquals(1, cfDelegate.getDelegates().length);
+
+ conn.close();
+ }
+ finally
+ {
+ ServerManagement.kill(2);
+ }
}
+
+
+ /** Case the updateCF is not captured, the hopping should run nicely.
+ * This test will disable CF callback for a connection and validate if hoping is working*/
+ public void testNoUpdateCaptured() throws Exception
+ {
+ JBossConnectionFactory cfNoCallback = (JBossConnectionFactory)ic[0].lookup("/ClusteredConnectionFactory");
+ ClientClusteredConnectionFactoryDelegate noCallbackDelegate = (ClientClusteredConnectionFactoryDelegate )cfNoCallback.getDelegate();
+ noCallbackDelegate.closeCallback();
+
+ ServerManagement.kill(1);
+
+ Connection conn = null;
+
+ for (int i=0; i<4; i++)
+ {
+ try
+ {
+ conn = cfNoCallback.createConnection();
+ // 0 is the only server alive, so.. all connection should be performed on it
+ assertEquals(0, getServerId(conn));
+ }
+ finally
+ {
+ try
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
+ }
public void testUpdateConnectionFactoryOnKill() throws Exception
{
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusteredConnectionFactoryTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusteredConnectionFactoryTest.java 2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusteredConnectionFactoryTest.java 2007-08-21 04:19:48 UTC (rev 3019)
@@ -25,8 +25,12 @@
import javax.jms.Connection;
import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
+import org.jboss.jms.client.delegate.ClientClusteredConnectionFactoryDelegate;
import org.jboss.jms.exception.MessagingNetworkFailureException;
import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.test.messaging.tools.container.ServiceAttributeOverrides;
+import org.jboss.test.messaging.tools.container.ServiceContainer;
import org.jboss.test.messaging.tools.aop.PoisonInterceptor;
/**
@@ -171,6 +175,45 @@
}
}
+ public void testRestartServer() throws Exception
+ {
+ JBossConnectionFactory cf2 = (JBossConnectionFactory) ic[1].lookup("/ConnectionFactory");
+
+ ClientClusteredConnectionFactoryDelegate clusterCF = (ClientClusteredConnectionFactoryDelegate)cf.getDelegate();
+ ClientConnectionFactoryDelegate delegates[] = clusterCF.getDelegates();
+ clusterCF.closeCallback();
+
+ ServerManagement.kill(1);
+
+ //Restart the server on the same place
+ ServiceAttributeOverrides attr = new ServiceAttributeOverrides();
+ attr.put(ServiceContainer.REMOTING_OBJECT_NAME, "LocatorURI",delegates[1].getServerLocatorURI());
+ ServerManagement.start(1,config,attr,false);
+
+ // The server back on the same remoting port as before
+ startDefaultServer(1, attr, false);
+
+ Connection conn = null;
+ try
+ {
+ conn = cf2.createConnection();
+ }
+ finally
+ {
+ try
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
// Package protected ----------------------------------------------------------------------------
// Protected ------------------------------------------------------------------------------------
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringTestBase.java 2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringTestBase.java 2007-08-21 04:19:48 UTC (rev 3019)
@@ -186,20 +186,16 @@
if (!ServerManagement.isStarted(i))
{
log.info("Server " + i + " is not started - starting it");
-
- ServerManagement.start(i, config, currentOverrides, ic == null);
-
- if (ic == null)
- {
- ic = new InitialContext[nodeCount];
- queue = new Queue[nodeCount];
- topic = new Topic[nodeCount];
- }
-
- log.info("deploying queue on node " + i);
- ServerManagement.deployQueue("testDistributedQueue", i);
- ServerManagement.deployTopic("testDistributedTopic", i);
+
+ startDefaultServer(i, overrides, ic == null);
+ if (ic == null)
+ {
+ ic = new InitialContext[nodeCount];
+ queue = new Queue[nodeCount];
+ topic = new Topic[nodeCount];
+ }
+
ic[i] = new InitialContext(ServerManagement.getJNDIEnvironment(i));
queue[i] = (Queue)ic[i].lookup("queue/testDistributedQueue");
@@ -230,7 +226,17 @@
cf = (JBossConnectionFactory)ic[0].lookup("/ClusteredConnectionFactory");
}
}
-
+
+ protected void startDefaultServer(int serverNumber, ServiceAttributeOverrides attributes, boolean cleanDatabase)
+ throws Exception
+ {
+ ServerManagement.start(serverNumber, config, attributes, cleanDatabase);
+
+ log.info("deploying queue on node " + serverNumber);
+ ServerManagement.deployQueue("testDistributedQueue", serverNumber);
+ ServerManagement.deployTopic("testDistributedTopic", serverNumber);
+ }
+
private boolean overridesChanged(ServiceAttributeOverrides sao1, ServiceAttributeOverrides sao2)
{
Map map1 = sao1 == null ? null : sao1.getMap();
More information about the jboss-cvs-commits
mailing list