[jboss-cvs] JBoss Messaging SVN: r1718 - in branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms: client/delegate server/connectionfactory
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Dec 7 15:31:52 EST 2006
Author: clebert.suconic at jboss.com
Date: 2006-12-07 15:31:50 -0500 (Thu, 07 Dec 2006)
New Revision: 1718
Modified:
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
Log:
Fixing connection factories
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java 2006-12-07 19:15:49 UTC (rev 1717)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java 2006-12-07 20:31:50 UTC (rev 1718)
@@ -60,11 +60,26 @@
// Constructors --------------------------------------------------
public ClusteredClientConnectionFactoryDelegate(int objectID, int serverId, String serverLocatorURI,
- Version serverVersion, boolean clientPing)
+ Version serverVersion, boolean clientPing,
+ ClientConnectionFactoryDelegate[] delegates,
+ int[] failoverIndexes)
{
super(objectID, serverId, serverLocatorURI, serverVersion, clientPing);
+ this.delegates = delegates;
+ this.failoverIndexes = failoverIndexes;
}
+ // Some of the properties of ClientConnectionFactoryDelegate are not exposed.
+ // I didn't want to expose them just because I needed another delegate's properties to perform a copy.
+ // So I created this constructor, so I could have access to the protected members from inside an extension class.
+ public ClusteredClientConnectionFactoryDelegate(ClientConnectionFactoryDelegate mainDelegate,
+ ClientConnectionFactoryDelegate[] delegates,
+ int[] failoverIndexes)
+ {
+ this(mainDelegate.getID(), mainDelegate.serverId, mainDelegate.serverLocatorURI,
+ mainDelegate.serverVersion, mainDelegate.clientPing, delegates, failoverIndexes);
+ }
+
// DelegateSupport overrides -------------------------------------
public void init()
@@ -92,13 +107,6 @@
// Public --------------------------------------------------------
- public void setFailoverDelegates(ClientConnectionFactoryDelegate[] delegates,
- int[] failoverIndexes)
- {
- this.delegates = delegates;
- this.failoverIndexes = failoverIndexes;
- }
-
// Only be used in testing
public ClientConnectionFactoryDelegate[] getDelegates()
{
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2006-12-07 19:15:49 UTC (rev 1717)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2006-12-07 20:31:50 UTC (rev 1718)
@@ -118,10 +118,19 @@
endpoints.put(uniqueName, endpoint);
ClientConnectionFactoryDelegate delegate = null;
-
+
if (clustered)
{
setupReplicator();
+ }
+ else
+ {
+ log.info("ConnectionFactoryJNDIMapper is non clustered");
+ }
+
+ /* if (clustered)
+ {
+ setupReplicator();
if (replicator != null)
{
@@ -136,14 +145,19 @@
//Local
delegate = new ClientConnectionFactoryDelegate(id, serverPeer.getServerPeerID(),
locatorURI, version, clientPing);
- }
+ } */
+ delegate = new ClientConnectionFactoryDelegate(id, serverPeer.getServerPeerID(),
+ locatorURI, version, clientPing);
+
+
+ //delegate = new ClusteredClientConnectionFactoryDelegate(id, serverPeer.getServerPeerID(), locatorURI, version, clientPing);
+
+
log.trace("Adding delegates factory " + uniqueName + " pointing to " + delegate);
delegates.put(uniqueName, delegate);
- //Now bind it in JNDI
- rebindConnectionFactory(initialContext, jndiBindings, delegate);
-
+
// We are clustered, we need to propagate the local delegate across the cluster.
// ... and then update it (and the clustered delegate on every node, while we're at it) by
@@ -152,7 +166,16 @@
// will rebind updated ConnectionFactories in JNDI.
// This will update the local node too
- replicator.put(CF_PREFIX + uniqueName, delegate);
+ log.info("CF_PREFIX=" + CF_PREFIX + " uniqueName=" + uniqueName + " delegate = " + delegate + " replicator = " + replicator);
+ if (replicator != null)
+ {
+ replicator.put(CF_PREFIX + uniqueName, delegate);
+ }
+ else
+ {
+ //Now bind it in JNDI
+ rebindConnectionFactory(initialContext, jndiBindings, delegate);
+ }
//Registering with the dispatcher should always be the last thing otherwise a client could use
//a partially initialised object
@@ -263,22 +286,19 @@
{
String uniqueName = sKey.substring(CF_PREFIX.length());
- ClusteredClientConnectionFactoryDelegate clusteredDelegate =
- (ClusteredClientConnectionFactoryDelegate)delegates.get(uniqueName);
-
- if (clusteredDelegate == null)
+ if (log.isTraceEnabled())
{
- log.info("Delegates list: delegates.length=" + delegates.size());
+ log.trace("Delegates list: delegates.length=" + delegates.size());
for (Iterator iterDelegates = delegates.entrySet().iterator(); iterDelegates.hasNext();)
{
Map.Entry entry = (Map.Entry)iterDelegates.next();
- log.info("key=" + entry.getKey() + ", value=" + entry.getValue());
+ log.trace("key=" + entry.getKey() + ", value=" + entry.getValue());
}
throw new IllegalStateException("Cannot find connection delegate for " +
uniqueName + " to update");
}
- setFailoverDelegates(clusteredDelegate, updatedReplicantMap);
+ ClusteredClientConnectionFactoryDelegate clusteredDelegate = createClusteredDelegate(updatedReplicantMap);
// Now rebind ...
@@ -323,16 +343,22 @@
/**
* @param localDelegates - Map<Integer(nodeId) - ClientConnectionFactoryDelegate>
*/
- private void setFailoverDelegates(ClusteredClientConnectionFactoryDelegate clusteredDelegate,
- Map localDelegates)
+ private ClusteredClientConnectionFactoryDelegate createClusteredDelegate(Map localDelegates)
{
+
+
//TODO: make it trace after the code is stable
log.info("Updating FailoverDelegates " + localDelegates.size() + " on serverPeer:" + serverPeer.getServerPeerID());
+
+
int s = localDelegates.size();
int[] nodesIDs = new int[s];
int[] failoverNodeIDs = new int[s];
ClientConnectionFactoryDelegate[] delegates = new ClientConnectionFactoryDelegate[s];
+ // The created ClusteredClientConnectionFactoryDelegate will copy its properties from this delegate,
+ // which is the delegate on the current serverPeer
+ ClientConnectionFactoryDelegate mainDelegate = null;
int idx = 0;
@@ -349,8 +375,25 @@
failoverNodeIDs[idx] = failoverNodeID;
delegates[idx] = (ClientConnectionFactoryDelegate)entry.getValue();
+ if (delegates[idx].getServerId() == this.serverPeer.getServerPeerID())
+ {
+
+ // sanity check
+ if (mainDelegate != null)
+ {
+ throw new IllegalStateException("There are two server with serverID, verify your clustering configuration");
+ }
+ mainDelegate = delegates[idx];
+ }
+
idx++;
}
+ // sanity check
+ if (mainDelegate == null)
+ {
+ throw new IllegalStateException("Couldn't find a delegate for this serverID=" + this.serverPeer.getServerPeerID());
+ }
+
// Generate the failover indexes. This could probably be optimised if need be.
@@ -377,8 +420,11 @@
failoverIndexes[i] = failoverIndex;
}
-
- clusteredDelegate.setFailoverDelegates(delegates, failoverIndexes);
+
+ ClusteredClientConnectionFactoryDelegate clusteredDelegate =
+ new ClusteredClientConnectionFactoryDelegate(mainDelegate,delegates,failoverIndexes);
+
+ return clusteredDelegate;
}
private void rebindConnectionFactory(Context ic,
More information about the jboss-cvs-commits
mailing list