[jboss-cvs] JBoss Messaging SVN: r1705 - in branches/Branch_Client_Failover_Experiment: src/main/org/jboss/jms/server/connectionfactory src/main/org/jboss/messaging/core/plugin/postoffice src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/tools/jmx
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Dec 5 09:06:46 EST 2006
Author: timfox
Date: 2006-12-05 09:06:32 -0500 (Tue, 05 Dec 2006)
New Revision: 1705
Modified:
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultFailoverMapper.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LeaveClusterRequest.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
Log:
Various fixes to get things running
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2006-12-05 03:10:30 UTC (rev 1704)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2006-12-05 14:06:32 UTC (rev 1705)
@@ -70,8 +70,8 @@
// Map<uniqueName<String> - ServerConnectionFactoryEndpoint>
protected Map endpoints;
- // Map<uniqueName<String> - ClusteredClientConnectionFactoryDelegate>
- protected Map clusteredDelegates;
+ // Map<uniqueName<String> - ClientConnectionFactoryDelegate> (not just clustered delegates)
+ protected Map delegates;
protected Replicator replicator;
@@ -81,7 +81,7 @@
{
this.serverPeer = serverPeer;
endpoints = new HashMap();
- clusteredDelegates = new HashMap();
+ delegates = new HashMap();
}
// ConnectionFactoryManager implementation -----------------------
@@ -98,6 +98,8 @@
boolean clustered)
throws Exception
{
+ log.info("Registering connection factory with name " + uniqueName + " bindings " + jndiBindings);
+
int id = serverPeer.getNextObjectID();
Version version = serverPeer.getVersion();
@@ -107,42 +109,51 @@
defaultTempQueueFullSize,
defaultTempQueuePageSize,
defaultTempQueueDownCacheSize);
- endpoints.put(uniqueName, endpoint);
-
- JMSDispatcher.instance.
- registerTarget(new Integer(id), new ConnectionFactoryAdvised(endpoint));
-
- ClientConnectionFactoryDelegate localDelegate =
- new ClientConnectionFactoryDelegate(id, locatorURI, version, clientPing);
-
- if (!clustered)
+ endpoints.put(uniqueName, endpoint);
+
+ ClientConnectionFactoryDelegate delegate = null;
+
+ boolean replicateChanges = false;
+
+ if (clustered)
{
- rebindConnectionFactory(initialContext, jndiBindings, localDelegate);
- return;
+ setupReplicator();
+
+ if (replicator != null)
+ {
+ //Replicator might still be null since we might be deploying a clustered cf in a non clustered
+ //post office (which is ok)
+ delegate = new ClusteredClientConnectionFactoryDelegate(id, locatorURI, version, clientPing);
+ }
}
-
- // We are clustered, we need to propagate the local delegate across the cluster.
-
- setupReplicator();
-
- if (replicator == null)
+
+ if (delegate == null)
{
- return;
+ //Local
+ delegate = new ClientConnectionFactoryDelegate(id, locatorURI, version, clientPing);
}
+
+ delegates.put(uniqueName, delegate);
+
+ //Now bind it in JNDI
+ rebindConnectionFactory(initialContext, jndiBindings, delegate);
+
+ if (replicateChanges)
+ {
+ // We are clustered, we need to propagate the local delegate across the cluster.
- // Create a "hollow" clustered delegate ...
+ // ... and then update it (and the clustered delegate on every node, while we're at it) by
+ // replicating the local delegate across the cluster. This way, the clustred delegates on each
+ // node will contain an updated list of local delegates, and all ConnectionFactoryJNDIMapper
+ // will rebind updated ConnectionFactories in JNDI.
+ // This will update the local node too
- ClusteredClientConnectionFactoryDelegate clusteredDelegate =
- new ClusteredClientConnectionFactoryDelegate(id, locatorURI, version, clientPing);
-
- clusteredDelegates.put(uniqueName, clusteredDelegate);
-
- // ... and then update it (and the clustered delegate on every node, while we're at it) by
- // replicating the local delegate across the cluster. This way, the clustred delegates on each
- // node will contain an updated list of local delegates, and all ConnectionFactoryJNDIMapper
- // will rebind updated ConnectionFactories in JNDI.
-
- replicator.put(CF_PREFIX + uniqueName, localDelegate);
+ replicator.put(CF_PREFIX + uniqueName, delegate);
+ }
+
+ //Registering with the dispatcher should always be the last thing otherwise a client could use
+ //a partially initialised object
+ JMSDispatcher.instance.registerTarget(new Integer(id), new ConnectionFactoryAdvised(endpoint));
}
public synchronized void unregisterConnectionFactory(String uniqueName, boolean clustered) throws Exception
@@ -169,7 +180,7 @@
}
ClientConnectionFactoryDelegate delegate =
- (ClientConnectionFactoryDelegate)clusteredDelegates.remove(uniqueName);
+ (ClientConnectionFactoryDelegate)delegates.remove(uniqueName);
if (delegate == null)
{
@@ -245,7 +256,7 @@
String uniqueName = sKey.substring(CF_PREFIX.length());
ClusteredClientConnectionFactoryDelegate clusteredDelegate =
- (ClusteredClientConnectionFactoryDelegate)clusteredDelegates.get(uniqueName);
+ (ClusteredClientConnectionFactoryDelegate)delegates.get(uniqueName);
if (clusteredDelegate == null)
{
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java 2006-12-05 03:10:30 UTC (rev 1704)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java 2006-12-05 14:06:32 UTC (rev 1705)
@@ -65,6 +65,7 @@
private String officeName;
+ //This lock protects the condition and name maps
protected ReadWriteLock lock;
protected MessageStore ms;
@@ -73,7 +74,7 @@
protected TransactionRepository tr;
- protected int nodeId;
+ protected int currentNodeId;
//Map <node id, Map < queue name, binding > >
protected Map nameMaps;
@@ -104,7 +105,7 @@
conditionMap = new LinkedHashMap();
- this.nodeId = nodeId;
+ this.currentNodeId = nodeId;
this.officeName = officeName;
@@ -172,7 +173,7 @@
try
{
//We currently only allow one binding per name per node
- Map nameMap = (Map)nameMaps.get(new Integer(nodeId));
+ Map nameMap = (Map)nameMaps.get(new Integer(currentNodeId));
Binding binding = null;
@@ -186,7 +187,7 @@
throw new IllegalArgumentException("Binding already exists for name " + queue.getName());
}
- binding = new DefaultBinding(nodeId, condition, queue, false);
+ binding = new DefaultBinding(currentNodeId, condition, queue, false);
addBinding(binding);
@@ -217,13 +218,13 @@
try
{
- Binding binding = removeBinding(nodeId,queueName);
+ Binding binding = removeBinding(currentNodeId,queueName);
if (binding.getQueue().isRecoverable())
{
//Need to remove from db too
- deleteBinding(nodeId, binding.getQueue().getName());
+ deleteBinding(currentNodeId, binding.getQueue().getName());
}
binding.getQueue().removeAllReferences();
@@ -270,7 +271,7 @@
*/
protected Binding internalGetBindingForQueueName(String queueName)
{
- Map nameMap = (Map)nameMaps.get(new Integer(nodeId));
+ Map nameMap = (Map)nameMaps.get(new Integer(currentNodeId));
Binding binding = null;
@@ -339,7 +340,7 @@
Binding binding = (Binding)iter.next();
//Sanity check
- if (binding.getNodeId() != this.nodeId)
+ if (binding.getNodeId() != this.currentNodeId)
{
throw new IllegalStateException("Local post office has foreign bindings!");
}
@@ -407,7 +408,7 @@
{
Binding binding = (Binding)iter.next();
- if (!localOnly || (binding.getNodeId() == this.nodeId))
+ if (!localOnly || (binding.getNodeId() == this.currentNodeId))
{
list.add(binding);
}
@@ -499,7 +500,7 @@
protected Binding createBinding(int nodeId, String condition, String queueName, long channelId, Filter filter, boolean durable, boolean failed)
{
Queue queue;
- if (nodeId == this.nodeId)
+ if (nodeId == this.currentNodeId)
{
QueuedExecutor executor = (QueuedExecutor)pool.get();
@@ -532,7 +533,7 @@
String filterString = binding.getQueue().getFilter() == null ? null : binding.getQueue().getFilter().getFilterString();
ps.setString(1, this.officeName);
- ps.setInt(2, this.nodeId);
+ ps.setInt(2, this.currentNodeId);
ps.setString(3, binding.getQueue().getName());
ps.setString(4, binding.getCondition());
if (filterString != null)
@@ -564,7 +565,7 @@
protected boolean deleteBinding(int parameterNodeId, String queueName) throws Exception
{
- if (parameterNodeId<0) parameterNodeId=this.nodeId;
+ if (parameterNodeId<0) parameterNodeId=this.currentNodeId;
Connection conn = null;
PreparedStatement ps = null;
TransactionWrapper wrap = new TransactionWrapper();
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-12-05 03:10:30 UTC (rev 1704)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-12-05 14:06:32 UTC (rev 1705)
@@ -21,7 +21,6 @@
*/
package org.jboss.messaging.core.plugin.postoffice.cluster;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@@ -40,8 +39,10 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+
import javax.sql.DataSource;
import javax.transaction.TransactionManager;
+
import org.jboss.jms.server.QueuedExecutorPool;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.Delivery;
@@ -74,6 +75,8 @@
import org.jgroups.blocks.RequestHandler;
import org.w3c.dom.Element;
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+
/**
*
* A DefaultClusteredPostOffice
@@ -99,6 +102,7 @@
private boolean failAfterCommit;
private boolean failHandleResult;
+ //End of failure testing attributes
private boolean trace = log.isTraceEnabled();
@@ -152,27 +156,11 @@
private Map failedBindings;
private StatsSender statsSender;
+
+ private ReplicationListener nodeAddressMapListener;
private boolean started;
- public DefaultClusteredPostOffice()
- {
- init();
- }
-
- private void init()
- {
- holdingArea = new HashMap();
-
- replicatedData = new HashMap();
-
- replicationListeners = new HashSet();
-
- failoverMap = new LinkedHashMap();
-
- leftSet = new HashSet();
- }
-
/*
* Constructor using Element for configuration
*/
@@ -275,7 +263,15 @@
statsSender = new StatsSender(this, statsSendPeriod);
- init();
+ holdingArea = new HashMap();
+
+ replicatedData = new HashMap();
+
+ replicationListeners = new HashSet();
+
+ failoverMap = new LinkedHashMap();
+
+ leftSet = new HashSet();
}
// MessagingComponent overrides
@@ -309,10 +305,16 @@
MessageListener cml = new ControlMessageListener();
MembershipListener ml = new ControlMembershipListener();
RequestHandler rh = new PostOfficeRequestHandler();
+
+ //Register as a listener for nodeid-adress mapping events
+ nodeAddressMapListener = new NodeAddressMapListener();
+
+ registerListener(nodeAddressMapListener);
this.controlMessageDispatcher = new MessageDispatcher(syncChannel, cml, ml, rh, true);
Receiver r = new DataReceiver();
+
asyncChannel.setReceiver(r);
syncChannel.connect(groupName);
@@ -320,7 +322,7 @@
asyncChannel.connect(groupName);
super.start();
-
+
Address syncAddress = syncChannel.getLocalAddress();
Address asyncAddress = asyncChannel.getLocalAddress();
@@ -329,7 +331,7 @@
put(ADDRESS_INFO_KEY, info);
- verifyMembership(null,this.currentView);
+ //verifyMembership(null, this.currentView);
statsSender.start();
@@ -346,8 +348,11 @@
}
syncSendRequest(new LeaveClusterRequest(this.getNodeId()));
+
super.stop(sendNotification);
+ unregisterListener(nodeAddressMapListener);
+
statsSender.stop();
syncChannel.close();
@@ -365,10 +370,10 @@
{
if (trace)
{
- log.trace(this.nodeId + " binding clustered queue: " + queue + " with condition: " + condition);
+ log.trace(this.currentNodeId + " binding clustered queue: " + queue + " with condition: " + condition);
}
- if (queue.getNodeId() != this.nodeId)
+ if (queue.getNodeId() != this.currentNodeId)
{
log.warn("queue.getNodeId is not this node");
//throw new IllegalArgumentException("Queue node id does not match office node id");
@@ -386,7 +391,7 @@
throws Exception
{
BindRequest request =
- new BindRequest(this.nodeId, queue.getName(), condition, queue.getFilter() == null ? null : queue.getFilter().getFilterString(),
+ new BindRequest(this.currentNodeId, queue.getName(), condition, queue.getFilter() == null ? null : queue.getFilter().getFilterString(),
binding.getQueue().getChannelID(), queue.isRecoverable(), binding.isFailed());
syncSendRequest(request);
@@ -396,12 +401,12 @@
{
if (trace)
{
- log.trace(this.nodeId + " unbind clustered queue: " + queueName);
+ log.trace(this.currentNodeId + " unbind clustered queue: " + queueName);
}
Binding binding = (Binding)super.unbindQueue(queueName);
- UnbindRequest request = new UnbindRequest(this.nodeId, queueName);
+ UnbindRequest request = new UnbindRequest(this.currentNodeId, queueName);
syncSendRequest(request);
@@ -412,7 +417,7 @@
{
if (trace)
{
- log.trace(this.nodeId + " Routing " + ref + " with condition " + condition + " and transaction " + tx);
+ log.trace(this.currentNodeId + " Routing " + ref + " with condition " + condition + " and transaction " + tx);
}
if (ref == null)
@@ -451,7 +456,7 @@
startInternalTx = true;
if (trace)
{
- log.trace(this.nodeId + " Starting internal transaction since more than one durable sub or remote durable subs");
+ log.trace(this.currentNodeId + " Starting internal transaction since more than one durable sub or remote durable subs");
}
}
}
@@ -485,7 +490,7 @@
if (trace)
{
- log.trace(this.nodeId + " Routing message to queue or stub:" + queue.getName() + " on node " +
+ log.trace(this.currentNodeId + " Routing message to queue or stub:" + queue.getName() + " on node " +
queue.getNodeId() +" local:" + queue.isLocal());
}
@@ -527,14 +532,14 @@
{
if (numberRemote == 1)
{
- if (trace) { log.trace(this.nodeId + " unicasting message to " + lastNodeId); }
+ if (trace) { log.trace(this.currentNodeId + " unicasting message to " + lastNodeId); }
//Unicast - only one node is interested in the message
asyncSendRequest(new MessageRequest(condition, ref.getMessage(), null), lastNodeId);
}
else
{
- if (trace) { log.trace(this.nodeId + " multicasting message to group"); }
+ if (trace) { log.trace(this.currentNodeId + " multicasting message to group"); }
//Multicast - more than one node is interested
asyncSendRequest(new MessageRequest(condition, ref.getMessage(), queueNameNodeIdMap));
@@ -546,7 +551,7 @@
if (callback == null)
{
- callback = new CastMessagesCallback(nodeId, tx.getId(), DefaultClusteredPostOffice.this, failBeforeCommit, failAfterCommit);
+ callback = new CastMessagesCallback(currentNodeId, tx.getId(), DefaultClusteredPostOffice.this, failBeforeCommit, failAfterCommit);
//This callback MUST be executed first
@@ -593,48 +598,45 @@
public int getFailoverNodeID(int nodeId)
{
- Integer failoverNode = (Integer)failoverMap.get(new Integer(nodeId));
-
- if (failoverNode == null)
- {
- return nodeId;
+ synchronized (failoverMap)
+ {
+ Integer failoverNode = (Integer)failoverMap.get(new Integer(nodeId));
+
+ if (failoverNode == null)
+ {
+ return nodeId;
+ }
+
+ return failoverNode.intValue();
}
-
- return failoverNode.intValue();
}
// Replicator implementation --------------------------------------------------------------------------
public Map get(Serializable key) throws Exception
{
- lock.readLock().acquire();
-
- try
- {
+ synchronized (replicatedData)
+ {
Map m = (Map)replicatedData.get(key);
-
+
return m == null ? Collections.EMPTY_MAP : Collections.unmodifiableMap(m);
}
- finally
- {
- lock.readLock().release();
- }
}
public void put(Serializable key, Serializable replicant) throws Exception
{
- putReplicantLocally(nodeId, key, replicant);
+ putReplicantLocally(currentNodeId, key, replicant);
- PutReplicantRequest request = new PutReplicantRequest(nodeId, key, replicant);
+ PutReplicantRequest request = new PutReplicantRequest(currentNodeId, key, replicant);
syncSendRequest(request);
}
public boolean remove(Serializable key) throws Exception
{
- if (removeReplicantLocally(this.nodeId, key))
+ if (removeReplicantLocally(this.currentNodeId, key))
{
- RemoveReplicantRequest request = new RemoveReplicantRequest(this.nodeId, key);
+ RemoveReplicantRequest request = new RemoveReplicantRequest(this.currentNodeId, key);
syncSendRequest(request);
@@ -648,20 +650,26 @@
public void registerListener(ReplicationListener listener)
{
- if (replicationListeners.contains(listener))
+ synchronized (replicationListeners)
{
- throw new IllegalArgumentException("Listener " + listener + " is already registered");
+ if (replicationListeners.contains(listener))
+ {
+ throw new IllegalArgumentException("Listener " + listener + " is already registered");
+ }
+ replicationListeners.add(listener);
}
- replicationListeners.add(listener);
}
public void unregisterListener(ReplicationListener listener)
{
- boolean removed = replicationListeners.remove(listener);
-
- if (!removed)
+ synchronized (replicationListeners)
{
- throw new IllegalArgumentException("Cannot find listener " + listener + " to remove");
+ boolean removed = replicationListeners.remove(listener);
+
+ if (!removed)
+ {
+ throw new IllegalArgumentException("Cannot find listener " + listener + " to remove");
+ }
}
}
@@ -669,16 +677,10 @@
public void handleNodeLeft(int nodeId) throws Exception
{
- lock.writeLock().acquire();
-
- try
+ synchronized (leftSet)
{
leftSet.add(new Integer(nodeId));
}
- finally
- {
- lock.writeLock().release();
- }
}
/**
@@ -687,9 +689,7 @@
public void putReplicantLocally(int originatorNodeID, Serializable key, Serializable replicant)
throws Exception
{
- lock.writeLock().acquire();
-
- try
+ synchronized (replicatedData)
{
Map m = (Map)replicatedData.get(key);
@@ -704,10 +704,6 @@
notifyListeners(key, m);
}
- finally
- {
- lock.writeLock().release();
- }
}
/**
@@ -715,10 +711,10 @@
*/
public boolean removeReplicantLocally(int originatorNodeID, Serializable key) throws Exception
{
- lock.writeLock().acquire();
-
- try
+ synchronized (replicatedData)
{
+ log.info(this.currentNodeId + " removing key " + key + " from node " + originatorNodeID);
+
Map m = (Map)replicatedData.get(key);
if (m == null)
@@ -736,16 +732,11 @@
if (m.isEmpty())
{
replicatedData.remove(key);
- }
-
+ }
notifyListeners(key, m);
-
- return true;
+
+ return true;
}
- finally
- {
- lock.writeLock().release();
- }
}
/*
@@ -759,7 +750,7 @@
if (trace)
{
- log.info(this.nodeId + " adding binding from node: " + nodeId + " queue: " + queueName + " with condition: " + condition);
+ log.info(this.currentNodeId + " adding binding from node: " + nodeId + " queue: " + queueName + " with condition: " + condition);
}
try
@@ -783,7 +774,7 @@
if (binding != null && failed)
{
- throw new IllegalArgumentException(this.nodeId + " Binding already exists for node Id " + nodeId + " queue name " + queueName);
+ throw new IllegalArgumentException(this.currentNodeId + " Binding already exists for node Id " + nodeId + " queue name " + queueName);
}
binding = this.createBinding(nodeId, condition, queueName, channelID, filterString, durable, failed);
@@ -805,7 +796,7 @@
if (trace)
{
- log.trace(this.nodeId + " removing binding from node: " + nodeId + " queue: " + queueName);
+ log.trace(this.currentNodeId + " removing binding from node: " + nodeId + " queue: " + queueName);
}
try
@@ -829,7 +820,7 @@
{
if (trace)
{
- log.trace(this.nodeId + " routing from cluster, message: " + message + " routing key " +
+ log.trace(this.currentNodeId + " routing from cluster, message: " + message + " routing key " +
routingKey + " map " + queueNameNodeIdMap);
}
@@ -860,7 +851,7 @@
{
Binding binding = (Binding)iter.next();
- if (binding.getNodeId() == this.nodeId)
+ if (binding.getNodeId() == this.currentNodeId)
{
boolean handle = true;
@@ -873,7 +864,7 @@
if (in != null)
{
- handle = in.intValue() == nodeId;
+ handle = in.intValue() == currentNodeId;
}
}
@@ -887,7 +878,7 @@
if (trace)
{
- log.trace(this.nodeId + " queue " + queue.getName() + " handled reference from cluster " + del);
+ log.trace(this.currentNodeId + " queue " + queue.getName() + " handled reference from cluster " + del);
}
}
}
@@ -909,7 +900,7 @@
*/
public void asyncSendRequest(ClusterRequest request) throws Exception
{
- if (trace) { log.trace(this.nodeId + " sending asynch request to group, request: " + request); }
+ if (trace) { log.trace(this.currentNodeId + " sending asynch request to group, request: " + request); }
byte[] bytes = writeRequest(request);
@@ -921,11 +912,11 @@
*/
public void asyncSendRequest(ClusterRequest request, int nodeId) throws Exception
{
- if (trace) { log.trace(this.nodeId + " sending asynch request to single node, request: " + request + " node " + nodeId); }
+ if (trace) { log.trace(this.currentNodeId + " sending asynch request to single node, request: " + request + " node " + nodeId); }
Address address = this.getAddressForNodeId(nodeId, false);
- if (trace) { log.trace(this.nodeId + " sending to address " + address); }
+ if (trace) { log.trace(this.currentNodeId + " sending to address " + address); }
if (address == null)
{
@@ -948,13 +939,13 @@
{
holdingArea.put(id, tx);
- if (trace) { log.trace(this.nodeId + " added transaction " + tx + " to holding area with id " + id); }
+ if (trace) { log.trace(this.currentNodeId + " added transaction " + tx + " to holding area with id " + id); }
}
}
public void commitTransaction(TransactionId id) throws Throwable
{
- if (trace) { log.trace(this.nodeId + " committing transaction " + id ); }
+ if (trace) { log.trace(this.currentNodeId + " committing transaction " + id ); }
ClusterTransaction tx = null;
@@ -970,12 +961,12 @@
tx.commit(this);
- if (trace) { log.trace(this.nodeId + " committed transaction " + id ); }
+ if (trace) { log.trace(this.currentNodeId + " committed transaction " + id ); }
}
public void rollbackTransaction(TransactionId id) throws Throwable
{
- if (trace) { log.trace(this.nodeId + " rolling back transaction " + id ); }
+ if (trace) { log.trace(this.currentNodeId + " rolling back transaction " + id ); }
ClusterTransaction tx = null;
@@ -991,7 +982,7 @@
tx.rollback(this);
- if (trace) { log.trace(this.nodeId + " committed transaction " + id ); }
+ if (trace) { log.trace(this.currentNodeId + " committed transaction " + id ); }
}
/**
@@ -999,7 +990,7 @@
*/
public void check(Integer nodeId) throws Throwable
{
- if (trace) { log.trace(this.nodeId + " checking for any stranded transactions for node " + nodeId); }
+ if (trace) { log.trace(this.currentNodeId + " checking for any stranded transactions for node " + nodeId); }
synchronized (holdingArea)
{
@@ -1021,7 +1012,7 @@
boolean commit = tx.check(this);
- if (trace) { log.trace(this.nodeId + " transaction " + tx + " will be committed?: " + commit); }
+ if (trace) { log.trace(this.currentNodeId + " transaction " + tx + " will be committed?: " + commit); }
if (commit)
{
@@ -1034,7 +1025,7 @@
toRemove.add(id);
- if (trace) { log.trace(this.nodeId + " resolved " + tx); }
+ if (trace) { log.trace(this.currentNodeId + " resolved " + tx); }
}
}
@@ -1049,7 +1040,7 @@
holdingArea.remove(id);
}
}
- if (trace) { log.trace(this.nodeId + " check complete"); }
+ if (trace) { log.trace(this.currentNodeId + " check complete"); }
}
public void sendQueueStats() throws Exception
@@ -1065,7 +1056,7 @@
try
{
- Map nameMap = (Map)nameMaps.get(new Integer(nodeId));
+ Map nameMap = (Map)nameMaps.get(new Integer(currentNodeId));
if (nameMap != null)
{
@@ -1090,7 +1081,7 @@
statsList.add(stats);
- if (trace) { log.trace(this.nodeId + " adding stat for send " + stats); }
+ if (trace) { log.trace(this.currentNodeId + " adding stat for send " + stats); }
}
}
}
@@ -1103,11 +1094,11 @@
if (statsList != null)
{
- ClusterRequest req = new QueueStatsRequest(nodeId, statsList);
+ ClusterRequest req = new QueueStatsRequest(currentNodeId, statsList);
asyncSendRequest(req);
- if (trace) { log.trace(this.nodeId + " Sent stats"); }
+ if (trace) { log.trace(this.currentNodeId + " Sent stats"); }
}
}
@@ -1115,11 +1106,11 @@
{
lock.readLock().acquire();
- if (trace) { log.trace(this.nodeId + " updating queue stats from node " + nodeId + " stats size: " + statsList.size()); }
+ if (trace) { log.trace(this.currentNodeId + " updating queue stats from node " + nodeId + " stats size: " + statsList.size()); }
try
{
- if (nodeId == this.nodeId)
+ if (nodeId == this.currentNodeId)
{
//Sanity check
throw new IllegalStateException("Received stats from node with id that matches this nodes id. You may have started two or more nodes with the same node id!");
@@ -1130,7 +1121,7 @@
if (nameMap == null)
{
//This is ok, the node might have left
- if (trace) { log.trace(this.nodeId + " cannot find node in name map, i guess the node might have left?"); }
+ if (trace) { log.trace(this.currentNodeId + " cannot find node in name map, i guess the node might have left?"); }
}
else
{
@@ -1145,7 +1136,7 @@
if (bb == null)
{
//I guess this is possible if the queue was unbound
- if (trace) { log.trace(this.nodeId + " cannot find binding for queue " + st.getQueueName() + " it could have been unbound"); }
+ if (trace) { log.trace(this.currentNodeId + " cannot find binding for queue " + st.getQueueName() + " it could have been unbound"); }
}
else
{
@@ -1153,7 +1144,7 @@
stub.setStats(st);
- if (trace) { log.trace(this.nodeId + " setting stats: " + st + " on remote stub " + stub.getName()); }
+ if (trace) { log.trace(this.currentNodeId + " setting stats: " + st + " on remote stub " + stub.getName()); }
ClusterRouter router = (ClusterRouter)routerMap.get(st.getQueueName());
@@ -1166,7 +1157,7 @@
//TODO - the call to getQueues is too slow since it creates a new list and adds the local queue!!!
RemoteQueueStub toQueue = (RemoteQueueStub)messagePullPolicy.chooseQueue(router.getQueues());
- if (trace) { log.trace(this.nodeId + " recalculated pull queue for queue " + st.getQueueName() + " to be " + toQueue); }
+ if (trace) { log.trace(this.currentNodeId + " recalculated pull queue for queue " + st.getQueueName() + " to be " + toQueue); }
localQueue.setPullQueue(toQueue);
@@ -1177,7 +1168,7 @@
localQueue.deliver(false);
- if (trace) { log.trace(this.nodeId + " triggered delivery for " + localQueue.getName()); }
+ if (trace) { log.trace(this.currentNodeId + " triggered delivery for " + localQueue.getName()); }
}
}
}
@@ -1199,7 +1190,7 @@
public void handleMessagePullResult(int remoteNodeId, long holdingTxId,
String queueName, org.jboss.messaging.core.Message message) throws Throwable
{
- if (trace) { log.trace(this.nodeId + " handling pull result " + message + " for " + queueName); }
+ if (trace) { log.trace(this.currentNodeId + " handling pull result " + message + " for " + queueName); }
Binding binding = getBindingForQueueName(queueName);
@@ -1246,22 +1237,22 @@
if (message.isReliable())
{
//Only reliable messages will be in holding area
- this.asyncSendRequest(new RollbackPullRequest(this.nodeId, holdingTxId), remoteNodeId);
+ this.asyncSendRequest(new RollbackPullRequest(this.currentNodeId, holdingTxId), remoteNodeId);
- if (trace) { log.trace(this.nodeId + " send rollback pull request"); }
+ if (trace) { log.trace(this.currentNodeId + " send rollback pull request"); }
}
}
}
public int getNodeId()
{
- return nodeId;
+ return currentNodeId;
}
public String toString()
{
StringBuffer sb = new StringBuffer("ClusteredPostOffice[");
- sb.append(nodeId).append(":").append(getOfficeName()).append(":");
+ sb.append(currentNodeId).append(":").append(getOfficeName()).append(":");
if (syncChannel == null)
{
@@ -1324,7 +1315,7 @@
{
if (oldView != null)
{
- for(Iterator i = oldView.getMembers().iterator(); i.hasNext(); )
+ for (Iterator i = oldView.getMembers().iterator(); i.hasNext(); )
{
Address address = (Address)i.next();
if (!newView.containsMember(address))
@@ -1334,7 +1325,7 @@
}
}
- for(Iterator i = newView.getMembers().iterator(); i.hasNext(); )
+ for (Iterator i = newView.getMembers().iterator(); i.hasNext(); )
{
Address address = (Address)i.next();
if (oldView == null || !oldView.containsMember(address))
@@ -1457,7 +1448,7 @@
}
//Create a new binding
- Binding newBinding = this.createBinding(this.nodeId, binding.getCondition(),
+ Binding newBinding = this.createBinding(this.currentNodeId, binding.getCondition(),
stub.getName(), stub.getChannelID(),
stub.getFilter(), stub.isRecoverable(), failed);
@@ -1497,7 +1488,7 @@
{
//First look in the failed map
//Failed bindings are stored in the failed map by channel id
- Map channelMap = (Map)failedBindings.get(new Integer(nodeId));
+ Map channelMap = (Map)failedBindings.get(new Integer(currentNodeId));
Binding binding = null;
if (channelMap != null)
{
@@ -1507,7 +1498,7 @@
if (binding == null)
{
//Not found in the failed map - look in the name map
- Map nameMap = (Map)nameMaps.get(new Integer(nodeId));
+ Map nameMap = (Map)nameMaps.get(new Integer(currentNodeId));
if (nameMap != null)
{
@@ -1684,7 +1675,7 @@
if (bindings == null)
{
- bindings = new DefaultClusteredBindings(nodeId);
+ bindings = new DefaultClusteredBindings(currentNodeId);
conditionMap.put(condition, bindings);
}
@@ -1769,7 +1760,7 @@
{
//The state will be set in due course via the MessageListener - we must wait until this happens
- if (trace) { log.trace(this.nodeId + " Not first member of group- so waiting for state to arrive...."); }
+ if (trace) { log.trace(this.currentNodeId + " Not first member of group- so waiting for state to arrive...."); }
synchronized (setStateLock)
{
@@ -1780,14 +1771,14 @@
}
}
- if (trace) { log.trace(this.nodeId + " Received state"); }
+ if (trace) { log.trace(this.currentNodeId + " Received state"); }
}
}
protected Binding createBinding(int nodeId, String condition, String queueName, long channelId, Filter filter, boolean durable, boolean failed)
{
Queue queue;
- if (nodeId == this.nodeId)
+ if (nodeId == this.currentNodeId)
{
QueuedExecutor executor = (QueuedExecutor)pool.get();
@@ -1806,16 +1797,10 @@
private boolean leaveMessageReceived(Integer nodeId) throws Exception
{
- lock.writeLock().acquire();
-
- try
+ synchronized (leftSet)
{
return leftSet.remove(nodeId);
}
- finally
- {
- lock.writeLock().release();
- }
}
/*
@@ -1857,8 +1842,15 @@
removeBinding(nodeID.intValue(), binding.getQueue().getName());
}
- }
+ }
+ }
+ finally
+ {
+ lock.writeLock().release();
+ }
+ synchronized (replicatedData)
+ {
// We need to remove any replicant data for the node. This includes the node-address info.
for(Iterator i = replicatedData.entrySet().iterator(); i.hasNext(); )
{
@@ -1876,12 +1868,8 @@
// Need to trigger listeners
notifyListeners(key, replicants);
- }
+ }
}
- finally
- {
- lock.writeLock().release();
- }
}
/**
@@ -1890,10 +1878,14 @@
*/
private void notifyListeners(Serializable key, Map updatedReplicantMap)
{
- for (Iterator i = replicationListeners.iterator(); i.hasNext(); )
- {
- ReplicationListener listener = (ReplicationListener)i.next();
- listener.onReplicationChange(key, updatedReplicantMap);
+ synchronized (replicationListeners)
+ {
+ for (Iterator i = replicationListeners.iterator(); i.hasNext(); )
+ {
+ ReplicationListener listener = (ReplicationListener)i.next();
+
+ listener.onReplicationChange(key, updatedReplicantMap);
+ }
}
}
@@ -1913,13 +1905,33 @@
if (trace) { log.trace(this + " request sent OK"); }
}
+ //DEBUG ONLY - remove this
+ private void dumpNodeIdAddressMap(Map map) throws Exception
+ {
+ log.info("** DUMPING NODE ADDRESS MAPPING");
+
+ Iterator iter = map.entrySet().iterator();
+
+ while (iter.hasNext())
+ {
+ Map.Entry entry = (Map.Entry)iter.next();
+
+ Integer theNodeId = (Integer)entry.getKey();
+
+ PostOfficeAddressInfo info = (PostOfficeAddressInfo)entry.getValue();
+
+ log.info("node id: " + theNodeId +" --------->(async:sync) " + info.getAsyncChannelAddress() + ":" + info.getSyncChannelAddress());
+ }
+
+ log.info("** END DUMP");
+ }
+
//TODO - this is a bit tortuous - needs optimising
private Integer getNodeIdForSyncAddress(Address address) throws Exception
{
- lock.readLock().acquire();
- try
- {
+ synchronized (replicatedData)
+ {
Map map = get(ADDRESS_INFO_KEY);
if (map == null)
@@ -1927,43 +1939,54 @@
throw new IllegalStateException("Cannot find node id -> address mapping");
}
+ this.dumpNodeIdAddressMap(map);
+
Iterator iter = map.entrySet().iterator();
- Integer nodeId = null;
+ log.info("iterating, looking for " + address);
+
+ Integer theNodeId = null;
while (iter.hasNext())
{
Map.Entry entry = (Map.Entry)iter.next();
PostOfficeAddressInfo info = (PostOfficeAddressInfo)entry.getValue();
+ log.info("info synch channel address: " + info.getSyncChannelAddress());
+
if (info.getSyncChannelAddress().equals(address))
{
- nodeId = (Integer)entry.getKey();
+ log.info("equal");
+ theNodeId = (Integer)entry.getKey();
+ break;
}
+ else
+ {
+ log.info("Not equal");
+ }
}
- return nodeId;
+ return theNodeId;
}
- finally
- {
- lock.readLock().release();
- }
}
private boolean knowAboutNodeId(int nodeId)
{
//The nodeid->Address info mapping is stored in the replicated data
- Map nodeIdAddressMapping = (Map)replicatedData.get(ADDRESS_INFO_KEY);
-
- if (nodeIdAddressMapping == null)
- {
- return false;
- }
- else
- {
- Object obj = nodeIdAddressMapping.get(new Integer(nodeId));
+ synchronized (replicatedData)
+ {
+ Map nodeIdAddressMapping = (Map)replicatedData.get(ADDRESS_INFO_KEY);
- return obj != null;
+ if (nodeIdAddressMapping == null)
+ {
+ return false;
+ }
+ else
+ {
+ Object obj = nodeIdAddressMapping.get(new Integer(nodeId));
+
+ return obj != null;
+ }
}
}
@@ -1973,7 +1996,7 @@
*/
private boolean isFailoverNodeForNode(int nodeId)
{
- return this.nodeId == getFailoverNodeID(nodeId);
+ return this.currentNodeId == getFailoverNodeID(nodeId);
}
private byte[] getStateAsBytes() throws Exception
@@ -2004,8 +2027,17 @@
}
}
- SharedState state = new SharedState(bindings, replicatedData);
+ //Need to copy
+ Map copy;
+
+ synchronized (replicatedData)
+ {
+ copy = copyReplicatedData(replicatedData);
+ }
+
+ SharedState state = new SharedState(bindings, copy);
+
return StreamUtils.toBytes(state);
}
@@ -2034,7 +2066,7 @@
Binding binding = this.createBinding(info.getNodeId(), info.getCondition(), info.getQueueName(), info.getChannelId(),
info.getFilterString(), info.isDurable(),info.isFailed());
- if (binding.getNodeId() == this.nodeId)
+ if (binding.getNodeId() == this.currentNodeId)
{
//We deactivate if this is one of our own bindings - it can only
//be one of our own durable bindings - and since state is retrieved before we are fully started
@@ -2046,11 +2078,20 @@
addBinding(binding);
}
- //Copy the map
- this.replicatedData.clear();
+ //Update the replicated data
- iter = state.getReplicatedData().entrySet().iterator();
+ synchronized (replicatedData)
+ {
+ replicatedData = copyReplicatedData(state.getReplicatedData());
+ }
+ }
+
+ private Map copyReplicatedData(Map toCopy)
+ {
+ Map copy = new HashMap();
+ Iterator iter = toCopy.entrySet().iterator();
+
while (iter.hasNext())
{
Map.Entry entry = (Map.Entry)iter.next();
@@ -2063,8 +2104,10 @@
m.putAll(replicants);
- replicatedData.put(key, m);
+ copy.put(key, m);
}
+
+ return copy;
}
@@ -2096,9 +2139,7 @@
private Address getAddressForNodeId(int nodeId, boolean sync) throws Exception
{
- lock.readLock().acquire();
-
- try
+ synchronized (replicatedData)
{
Map map = this.get(ADDRESS_INFO_KEY);
@@ -2124,64 +2165,11 @@
{
return null;
}
- }
- finally
- {
- lock.readLock().release();
- }
+ }
}
- /*
- * Given a JGroups view, generate a map of node to failover node.
- * The mapping is determined by a pluggable policy.
- */
- private void generateFailoverMap(View view) throws Exception
- {
- List nodes = new ArrayList();
-
- for (Iterator i = view.getMembers().iterator(); i.hasNext(); )
- {
- Address address = (Address)i.next();
+
- // Ignore own address, a node can't be its own failover node
- if (syncChannel.getLocalAddress().equals(address))
- {
- continue;
- }
-
- // Convert to node id
- // TODO this should be optimised - currently the implementation of the lookup
- // is a bit tortuous
-
- Integer n = getNodeIdForSyncAddress(address);
-
- if (n == null)
- {
- throw new IllegalStateException("Cannot find node id for address: " + address);
- }
-
- nodes.add(n);
- }
-
- List failoverNodes = failoverMapper.generateMapping(nodes);
-
- // Now put this in the map of node -> failover node
-
- failoverMap.clear();
-
- Iterator iter = nodes.iterator();
- Iterator iter2 = failoverNodes.iterator();
-
- while (iter.hasNext())
- {
- Integer node = (Integer)iter.next();
-
- Integer failoverNode = (Integer)iter2.next();
-
- failoverMap.put(node, failoverNode);
- }
- }
-
/*
* A new node has joined the group
*/
@@ -2189,8 +2177,10 @@
{
if (trace) { log.trace(this + ": " + address + " joined"); }
+ log.info(this.currentNodeId + " Node with address: " + address + " joined");
+
// We need to regenerate the failover map
- generateFailoverMap(currentView);
+ //generateFailoverMap(currentView);
}
/*
@@ -2199,32 +2189,34 @@
private void nodeLeft(Address address) throws Throwable
{
if (trace) { log.trace(this + ": " + address + " left"); }
+
+ log.info(this.currentNodeId + " Node with address: " + address + " left");
- Integer nodeId = getNodeIdForSyncAddress(address);
+ Integer theNodeId = getNodeIdForSyncAddress(address);
- if (nodeId != null)
+ if (theNodeId == null)
{
- throw new IllegalStateException("Cannot find node id for address " + address);
+ throw new IllegalStateException(this.currentNodeId + " Cannot find node id for address " + address);
}
- boolean crashed = !this.leaveMessageReceived(nodeId);
+ boolean crashed = !this.leaveMessageReceived(theNodeId);
- if (trace) { log.trace("Node " + address + " id: " + nodeId +" has left the group, crashed = " + crashed); }
+ if (trace) { log.trace("Node " + address + " id: " + theNodeId +" has left the group, crashed = " + crashed); }
//Cleanup any hanging transactions - we do this irrespective of whether we crashed
- check(nodeId);
+ check(theNodeId);
//Need to evaluate this before we regenerate the failover map
- boolean isFailover = isFailoverNodeForNode(nodeId.intValue());
+ boolean isFailover = isFailoverNodeForNode(theNodeId.intValue());
//Now we recalculate the failover mapping - this needs to be done before removeDataForNode is called
//since that may cause connection factories to be rebound
- generateFailoverMap(currentView);
+ //generateFailoverMap(currentView);
//Remove any replicant data and non durable bindings for the node - again we need to do this
//irrespective of whether we crashed
//This will notify any listeners which will recalculate the connection factory delegates and failover delegates
- removeDataForNode(nodeId);
+ removeDataForNode(theNodeId);
if (crashed && isFailover)
{
@@ -2233,7 +2225,7 @@
//TODO server side valve
- failOver(nodeId.intValue());
+ failOver(theNodeId.intValue());
}
}
@@ -2337,8 +2329,8 @@
// + DefaultClusteredPostOffice.this.getOfficeName()); }
//TODO: (by Clebert) Most JBoss Services use info on viewAccepted,
//TODO: can't we do the same since this is pretty useful?
- log.info(DefaultClusteredPostOffice.this + " got new view: " + newView + " postOffice:"
- + DefaultClusteredPostOffice.this.getOfficeName());
+ log.info(currentNodeId + " got new view: " + newView + " postOffice:"
+ + DefaultClusteredPostOffice.this.getOfficeName());
// JGroups will make sure this method is never called by more than one thread concurrently
@@ -2347,11 +2339,16 @@
// Since now membership is sent over state transfer, we have to wait stateSet to be set.
// If this logic is not good enough, we will have to place Address in a separate data structure
- // as it used to be (NodeInfos)
- if (stateSet)
- {
+ // as it used to be (NodeInfos) - Clebert
+
+
+ // FIXME - I don't understand why this check with flag (stateSet)is necessary
+ //The state will always be set before any view changes occur - JGroups guarantees this -
+ //Maybe I am wrong - Tim
+// if (stateSet)
+// {
verifyMembership(oldView, newView);
- }
+ // }
}
public byte[] getState()
@@ -2391,7 +2388,7 @@
public void receive(Message message)
{
- if (trace) { log.trace(nodeId + " received message " + message + " on async channel"); }
+ if (trace) { log.trace(currentNodeId + " received message " + message + " on async channel"); }
try
{
@@ -2423,7 +2420,7 @@
{
public Object handle(Message message)
{
- if (trace) { log.info(nodeId + " received message " + message + " on sync channel"); }
+ if (trace) { log.info(currentNodeId + " received message " + message + " on sync channel"); }
try
{
byte[] bytes = message.getBuffer();
@@ -2441,4 +2438,66 @@
}
}
}
+
+ /*
+ * We use this class to respond to node address mappings being added or removed from the cluster
+ * and then recalculate the node->failover node mapping
+ *
+ */
+ private class NodeAddressMapListener implements ReplicationListener
+ {
+
+ public void onReplicationChange(Serializable key, Map updatedReplicantMap)
+ {
+ if (key instanceof String && ((String)key).equals(ADDRESS_INFO_KEY))
+ {
+ log.info(currentNodeId + " got node address change");
+
+ try
+ {
+ //DEBUG only
+ dumpNodeIdAddressMap(updatedReplicantMap);
+ }
+ catch (Exception ignore)
+ {
+ }
+
+ //A node-address mapping has been added/removed from global state-
+ //We need to update the failover map
+ generateFailoverMap(updatedReplicantMap);
+ }
+ }
+
+ private void generateFailoverMap(Map nodeAddressMap)
+ {
+ List nodes = new ArrayList(nodeAddressMap.keySet());
+
+ log.info("generating failover map");
+
+ log.info("I have " + nodes.size() + " nodes");
+
+ List failoverNodes = failoverMapper.generateMapping(nodes);
+
+ log.info("I generated " + failoverNodes.size() +" failover nodes");
+
+ // Now put this in the map of node -> failover node
+
+ synchronized (failoverMap)
+ {
+ failoverMap.clear();
+
+ Iterator iter = nodes.iterator();
+ Iterator iter2 = failoverNodes.iterator();
+
+ while (iter.hasNext())
+ {
+ Integer node = (Integer)iter.next();
+
+ Integer failoverNode = (Integer)iter2.next();
+
+ failoverMap.put(node, failoverNode);
+ }
+ }
+ }
+ }
}
\ No newline at end of file
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultFailoverMapper.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultFailoverMapper.java 2006-12-05 03:10:30 UTC (rev 1704)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultFailoverMapper.java 2006-12-05 14:06:32 UTC (rev 1705)
@@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.List;
+import org.jboss.logging.Logger;
import org.jboss.messaging.core.plugin.contract.FailoverMapper;
/**
@@ -40,22 +41,25 @@
*/
public class DefaultFailoverMapper implements FailoverMapper
{
-
+ private static final Logger log = Logger.getLogger(DefaultFailoverMapper.class);
+
public List generateMapping(List nodes)
{
- List failoverNodes = new ArrayList(nodes.size());
+ int s = nodes.size();
+ log.info("Genertaing failover mapping, node size="+ s);
+
+ List failoverNodes = new ArrayList(s);
+
if (!(nodes instanceof ArrayList))
{
//So we can ensure fast index based access
nodes = new ArrayList(nodes);
}
-
- int s = nodes.size();
-
+
for (int i = 0; i < s; i++)
{
- int j = i++;
+ int j = i + 1;
if (j == s)
{
@@ -67,6 +71,8 @@
failoverNodes.add(failoverNode);
}
+ log.info("Returning " + failoverNodes.size() + " nodes");
+
return failoverNodes;
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LeaveClusterRequest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LeaveClusterRequest.java 2006-12-05 03:10:30 UTC (rev 1704)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LeaveClusterRequest.java 2006-12-05 14:06:32 UTC (rev 1705)
@@ -36,6 +36,7 @@
Object execute(PostOfficeInternal office) throws Throwable
{
office.handleNodeLeft(nodeId);
+
return null;
}
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java 2006-12-05 03:10:30 UTC (rev 1704)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java 2006-12-05 14:06:32 UTC (rev 1705)
@@ -85,6 +85,60 @@
super.tearDown();
}
+ public final void testSimpleJoinLeave() throws Throwable
+ {
+ ClusteredPostOffice office1 = null;
+
+ ClusteredPostOffice office2 = null;
+
+ //ClusteredPostOffice office3 = null;
+
+ try
+ {
+ log.info("Starting office 1");
+ office1 = createClusteredPostOffice(1, "testgroup");
+
+ log.info("starting office 2");
+ office2 = createClusteredPostOffice(2, "testgroup");
+
+ //office3 = createClusteredPostOffice(3, "testgroup");
+
+ Thread.sleep(2000);
+
+ office1.stop();
+ office1 = null;
+
+ office2.stop();
+ office2 = null;
+
+// office3.stop();
+// office3 = null;
+ }
+ finally
+ {
+ if (office1 != null)
+ {
+ office1.stop();
+ }
+
+ if (office2 != null)
+ {
+ office2.stop();
+ }
+
+// if (office3 != null)
+// {
+// office3.stop();
+// }
+
+ if (checkNoBindingData())
+ {
+ fail("data still in database");
+ }
+ }
+
+ }
+
public final void testClusteredBindUnbind() throws Throwable
{
ClusteredPostOffice office1 = null;
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java 2006-12-05 03:10:30 UTC (rev 1704)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java 2006-12-05 14:06:32 UTC (rev 1705)
@@ -56,6 +56,7 @@
import org.jboss.test.messaging.tools.ServerManagement;
import org.jboss.test.messaging.tools.jboss.MBeanConfigurationElement;
import org.jboss.test.messaging.tools.jboss.ServiceDeploymentDescriptor;
+import org.jboss.test.messaging.tools.jndi.Constants;
import org.jboss.test.messaging.tools.jndi.InVMInitialContextFactory;
import org.jboss.test.messaging.tools.jndi.InVMInitialContextFactoryBuilder;
import org.jboss.tm.TransactionManagerLocator;
@@ -290,6 +291,8 @@
Hashtable t = InVMInitialContextFactory.getJNDIEnvironment(serverIndex);
System.setProperty("java.naming.factory.initial",
(String)t.get("java.naming.factory.initial"));
+ System.setProperty(Constants.SERVER_INDEX_PROPERTY_NAME,
+ Integer.toString(serverIndex));
initialContext = new InitialContext();
More information about the jboss-cvs-commits
mailing list