[jboss-cvs] JBoss Messaging SVN: r1695 - in branches/Branch_Client_Failover_Experiment/src/main/org/jboss: jms/server/connectionfactory messaging/core/plugin/contract messaging/core/plugin/postoffice/cluster
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sun Dec 3 21:26:24 EST 2006
Author: ovidiu.feodorov at jboss.com
Date: 2006-12-03 21:26:21 -0500 (Sun, 03 Dec 2006)
New Revision: 1695
Modified:
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/ReplicationListener.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
Log:
Fixed broken clustered ConnectionFactory initialization. http://jira.jboss.org/jira/browse/JBMESSAGING-667
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2006-12-03 22:41:30 UTC (rev 1694)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2006-12-04 02:26:21 UTC (rev 1695)
@@ -34,6 +34,7 @@
import org.jboss.jms.client.delegate.ClusteredClientConnectionFactoryDelegate;
import org.jboss.jms.server.ConnectionFactoryManager;
import org.jboss.jms.server.ServerPeer;
+import org.jboss.jms.server.Version;
import org.jboss.jms.server.endpoint.ServerConnectionFactoryEndpoint;
import org.jboss.jms.server.endpoint.advised.ConnectionFactoryAdvised;
import org.jboss.jms.server.remoting.JMSDispatcher;
@@ -65,8 +66,12 @@
protected Context initialContext;
protected ServerPeer serverPeer;
+
+ // Map<uniqueName<String> - ServerConnectionFactoryEndpoint>
protected Map endpoints;
- protected Map factories;
+
+ // Map<uniqueName<String> - ClusteredClientConnectionFactoryDelegate>
+ protected Map clusteredDelegates;
protected Replicator replicator;
@@ -76,7 +81,7 @@
{
this.serverPeer = serverPeer;
endpoints = new HashMap();
- factories = new HashMap();
+ clusteredDelegates = new HashMap();
}
// ConnectionFactoryManager implementation -----------------------
@@ -94,6 +99,7 @@
throws Exception
{
int id = serverPeer.getNextObjectID();
+ Version version = serverPeer.getVersion();
ServerConnectionFactoryEndpoint endpoint =
new ServerConnectionFactoryEndpoint(id, serverPeer, clientID,
@@ -101,61 +107,43 @@
defaultTempQueueFullSize,
defaultTempQueuePageSize,
defaultTempQueueDownCacheSize);
+ endpoints.put(uniqueName, endpoint);
- ClientConnectionFactoryDelegate delegate =
- new ClientConnectionFactoryDelegate(id, locatorURI, serverPeer.getVersion(), clientPing);
-
- if (clustered)
+ JMSDispatcher.instance.
+ registerTarget(new Integer(id), new ConnectionFactoryAdvised(endpoint));
+
+ ClientConnectionFactoryDelegate localDelegate =
+ new ClientConnectionFactoryDelegate(id, locatorURI, version, clientPing);
+
+ if (!clustered)
{
- setupReplicator();
-
- //We may have no clustered post office deployed, in which case a clustered connection factory
- //behaves like a standard connection factory
- if (replicator != null)
- {
- // First we must replicate the delegate across the cluster so that all the
- // ConnectionFactoryJNDIMapper instances on different nodes have access to the list of
- // local connection factories so they can be added to the clustered connection factory
-
- replicator.putReplicant(CF_PREFIX + uniqueName, delegate);
-
- // Get the list of delegates
-
- Map replicants = replicator.getReplicants(CF_PREFIX + uniqueName);
-
- //Create a clustered delegate which contains the array of the local delegates
- //and failover indexes
-
- ClusteredClientConnectionFactoryDelegate clusteredDel =
- new ClusteredClientConnectionFactoryDelegate(id,
- locatorURI, serverPeer.getVersion(), clientPing);
-
- setFailoverDelegates(clusteredDel, replicants);
-
- delegate = clusteredDel;
- }
+ rebindConnectionFactory(initialContext, jndiBindings, localDelegate);
+ return;
}
-
- ConnectionFactoryAdvised connFactoryAdvised = new ConnectionFactoryAdvised(endpoint);
-
- JBossConnectionFactory cf = new JBossConnectionFactory(delegate);
-
- if (jndiBindings != null)
+
+ // We are clustered, we need to propagate the local delegate across the cluster.
+
+ setupReplicator();
+
+ if (replicator == null)
{
- List jndiNames = jndiBindings.getNames();
- for(Iterator i = jndiNames.iterator(); i.hasNext(); )
- {
- String jndiName = (String)i.next();
- JNDIUtil.rebind(initialContext, jndiName, cf);
- }
+ return;
}
-
- factories.put(uniqueName, cf);
-
- endpoints.put(uniqueName, endpoint);
-
- JMSDispatcher.instance.registerTarget(new Integer(id), connFactoryAdvised);
- }
+
+ // Create a "hollow" clustered delegate ...
+
+ ClusteredClientConnectionFactoryDelegate clusteredDelegate =
+ new ClusteredClientConnectionFactoryDelegate(id, locatorURI, version, clientPing);
+
+ clusteredDelegates.put(uniqueName, clusteredDelegate);
+
+ // ... and then update it (and the clustered delegate on every node, while we're at it) by
+ // replicating the local delegate across the cluster. This way, the clustred delegates on each
+ // node will contain an updated list of local delegates, and all ConnectionFactoryJNDIMapper
+ // will rebind updated ConnectionFactories in JNDI.
+
+ replicator.putReplicant(CF_PREFIX + uniqueName, localDelegate);
+ }
public synchronized void unregisterConnectionFactory(String uniqueName, boolean clustered) throws Exception
{
@@ -181,7 +169,7 @@
}
ClientConnectionFactoryDelegate delegate =
- (ClientConnectionFactoryDelegate)factories.remove(uniqueName);
+ (ClientConnectionFactoryDelegate)clusteredDelegates.remove(uniqueName);
if (delegate == null)
{
@@ -241,55 +229,44 @@
// ReplicationListener interface ----------------------------------
- public synchronized void onReplicationChange(Serializable key, Map replicants)
+ public synchronized void onReplicationChange(Serializable key, Map updatedReplicantMap)
{
try
{
- //The list of connection factories across the cluster has changed -
- //we need to update the clustered connection factories in JNDI with
- //new ones with the up to date list
+ // The list of connection factories across the cluster has changed, so we need to update
+ // the clustered connection factories in JNDI with new ones with the up to date list.
+ // Replication can be used for other stuff (not just connection factories) so we check
+ // the prefix.
- //Replication can be used for other stuff (not just connection factories)
- //so we check the prefix
-
String sKey = (String)key;
if (key instanceof String && sKey.startsWith(CF_PREFIX))
{
String uniqueName = sKey.substring(CF_PREFIX.length());
- ClusteredClientConnectionFactoryDelegate cf =
- (ClusteredClientConnectionFactoryDelegate)factories.get(uniqueName);
+ ClusteredClientConnectionFactoryDelegate clusteredDelegate =
+ (ClusteredClientConnectionFactoryDelegate)clusteredDelegates.get(uniqueName);
- if (cf == null)
+ if (clusteredDelegate == null)
{
- throw new IllegalStateException("Cannot find connection factory " + uniqueName + " to update");
+ throw new IllegalStateException("Cannot find connection delegate for " +
+ uniqueName + " to update");
}
- setFailoverDelegates(cf, replicants);
+ setFailoverDelegates(clusteredDelegate, updatedReplicantMap);
- //Now rebind
+ // Now rebind ...
ServerConnectionFactoryEndpoint endpoint =
- (ServerConnectionFactoryEndpoint)endpoints.remove(uniqueName);
+ (ServerConnectionFactoryEndpoint)endpoints.get(uniqueName);
if (endpoint == null)
{
throw new IllegalStateException("Cannot find endpoint " + uniqueName );
}
-
- JNDIBindings jndiBindings = endpoint.getJNDIBindings();
-
- if (jndiBindings != null)
- {
- List jndiNames = jndiBindings.getNames();
- for(Iterator i = jndiNames.iterator(); i.hasNext(); )
- {
- String jndiName = (String)i.next();
- initialContext.rebind(jndiName, cf);
- }
- }
- }
+
+ rebindConnectionFactory(initialContext, endpoint.getJNDIBindings(), clusteredDelegate);
+ }
}
catch (NamingException e)
{
@@ -316,62 +293,50 @@
{
this.serverPeer.getQueuePostOfficeInstance();
}
-
-
- private void setFailoverDelegates(ClusteredClientConnectionFactoryDelegate cf,
- Map replicants)
+
+
+ /**
+ * @param localDelegates - Map<Integer(nodeId) - ClientConnectionFactoryDelegate>
+ */
+ private void setFailoverDelegates(ClusteredClientConnectionFactoryDelegate clusteredDelegate,
+ Map localDelegates)
{
- // We need to get the failover indexes too
- // If delegates[i] is the current delegate, then the failover delegate
- // is given by delegates[failoverIndexes[i]]
-
- int s = replicants.size();
-
+ int s = localDelegates.size();
+
+ int[] nodesIDs = new int[s];
+ int[] failoverNodeIDs = new int[s];
ClientConnectionFactoryDelegate[] delegates = new ClientConnectionFactoryDelegate[s];
-
- Iterator iter = replicants.entrySet().iterator();
-
- int i = 0;
-
- int[] nodes = new int[s];
-
- int[] failoverNodes = new int[s];
-
- while (iter.hasNext())
+
+ int idx = 0;
+
+ for(Iterator i = localDelegates.entrySet().iterator(); i.hasNext(); )
{
- Map.Entry entry = (Map.Entry)iter.next();
-
- int nodeId = ((Integer)entry.getKey()).intValue();
-
- delegates[i] = (ClientConnectionFactoryDelegate)entry.getValue();
+ Map.Entry entry = (Map.Entry)i.next();
- int failoverNode = replicator.getFailoverNodeForNode(nodeId);
-
- nodes[i] = nodeId;
-
- failoverNodes[i] = failoverNode;
-
- i++;
+ int nodeID = ((Integer)entry.getKey()).intValue();
+ int failoverNodeID = replicator.getFailoverNodeID(nodeID);
+
+ nodesIDs[idx] = nodeID;
+ failoverNodeIDs[idx] = failoverNodeID;
+ delegates[idx] = (ClientConnectionFactoryDelegate)entry.getValue();
+
+ idx++;
}
// Generate the failover indexes. This could probably be optimised if need be.
int[] failoverIndexes = new int[s];
- for (int j = 0; j < s; i++)
+ for (int i = 0; i < s; i++)
{
- int failoverNode = failoverNodes[j];
-
- //What index is this node?
-
+ int failoverNode = failoverNodeIDs[i];
int failoverIndex = -1;
- for (int k = 0; k < s; k++)
+ for (int j = 0; j < s; j++)
{
- if (nodes[k] == failoverNode)
+ if (nodesIDs[j] == failoverNode)
{
- failoverIndex = k;
-
+ failoverIndex = j;
break;
}
}
@@ -381,11 +346,29 @@
throw new IllegalStateException("Failover node is not in list of nodes!");
}
- failoverIndexes[j] = failoverIndex;
+ failoverIndexes[i] = failoverIndex;
}
- cf.setFailoverDelegates(delegates, failoverIndexes);
+ clusteredDelegate.setFailoverDelegates(delegates, failoverIndexes);
}
+ private void rebindConnectionFactory(Context ic,
+ JNDIBindings jndiBindings,
+ ClientConnectionFactoryDelegate delegate)
+ throws NamingException
+ {
+ JBossConnectionFactory cf = new JBossConnectionFactory(delegate);
+
+ if (jndiBindings != null)
+ {
+ List jndiNames = jndiBindings.getNames();
+ for(Iterator i = jndiNames.iterator(); i.hasNext(); )
+ {
+ String jndiName = (String)i.next();
+ JNDIUtil.rebind(ic, jndiName, cf);
+ }
+ }
+ }
+
// Inner classes -------------------------------------------------
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/ReplicationListener.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/ReplicationListener.java 2006-12-03 22:41:30 UTC (rev 1694)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/ReplicationListener.java 2006-12-04 02:26:21 UTC (rev 1695)
@@ -35,5 +35,9 @@
*/
public interface ReplicationListener
{
- void onReplicationChange(Serializable key, Map replicants);
+ /**
+ * @param updatedReplicantMap - the updated replicant map. It contains ALL current replicants for
+ * the given key.
+ */
+ void onReplicationChange(Serializable key, Map updatedReplicantMap);
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java 2006-12-03 22:41:30 UTC (rev 1694)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java 2006-12-04 02:26:21 UTC (rev 1695)
@@ -56,10 +56,12 @@
void unregisterListener(ReplicationListener listener);
/**
- * Gets the current failover node id for the specified node id.
- * @param nodeId The node to get the failover node id for.
- * @return the failover node id. If there is no failover node (one-node cluster), the method
+ * Gets the current failover node ID for the specified node ID.
+ *
+ * @param nodeID - The node to get the failover node id for.
+ *
+ * @return the failover node ID. If there is no failover node (one-node cluster), the method
* returns the original nodeID.
*/
- int getFailoverNodeForNode(int nodeId);
+ int getFailoverNodeID(int nodeID);
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-12-03 22:41:30 UTC (rev 1694)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-12-04 02:26:21 UTC (rev 1695)
@@ -314,11 +314,11 @@
asyncChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
- this.controlMessageListener = new ControlMessageListener();
+ controlMessageListener = new ControlMessageListener();
- this.requestHandler = new PostOfficeRequestHandler();
+ requestHandler = new PostOfficeRequestHandler();
- this.controlMembershipListener = new ControlMembershipListener();
+ controlMembershipListener = new ControlMembershipListener();
this.controlMessageDispatcher =
new MessageDispatcher(syncChannel, controlMessageListener,
@@ -603,7 +603,7 @@
return listBindingsForConditionInternal(condition, false);
}
- public int getFailoverNodeForNode(int nodeId)
+ public int getFailoverNodeID(int nodeId)
{
Integer failoverNode = (Integer)failoverMap.get(new Integer(nodeId));
@@ -1812,13 +1812,17 @@
lock.writeLock().release();
}
}
-
- private void notifyListeners(Serializable key, Map replicants)
+
+ /**
+ * @param updatedReplicantMap - the updated replicant map. It contains ALL current replicants for
+ * the given key.
+ */
+ private void notifyListeners(Serializable key, Map updatedReplicantMap)
{
for (Iterator i = replicationListeners.iterator(); i.hasNext(); )
{
ReplicationListener listener = (ReplicationListener)i.next();
- listener.onReplicationChange(key, replicants);
+ listener.onReplicationChange(key, updatedReplicantMap);
}
}
@@ -1898,7 +1902,7 @@
*/
private boolean isFailoverNodeForNode(int nodeId)
{
- return this.nodeId == getFailoverNodeForNode(nodeId);
+ return this.nodeId == getFailoverNodeID(nodeId);
}
private byte[] getStateAsBytes() throws Exception
More information about the jboss-cvs-commits
mailing list