[jboss-cvs] JBoss Messaging SVN: r2814 - in trunk: src/main/org/jboss/messaging/core/contract and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jun 28 10:06:26 EDT 2007
Author: timfox
Date: 2007-06-28 10:06:25 -0400 (Thu, 28 Jun 2007)
New Revision: 2814
Modified:
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/messaging/core/contract/Message.java
trunk/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
trunk/src/main/org/jboss/messaging/core/impl/ClusterRoundRobinDistributor.java
trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java
trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
Log:
Removed extraneous logging, and more fixes
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-06-28 08:51:05 UTC (rev 2813)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-06-28 14:06:25 UTC (rev 2814)
@@ -543,7 +543,7 @@
Replicator rep = (Replicator)postOffice;
- rep.put(queue.getName(), ServerSessionEndpoint.DUR_SUB_STATE_NO_CONSUMERS);
+ rep.remove(queue.getName());
}
}
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-06-28 08:51:05 UTC (rev 2813)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-06-28 14:06:25 UTC (rev 2814)
@@ -117,8 +117,6 @@
static final String DUR_SUB_STATE_CONSUMERS = "C";
- static final String DUR_SUB_STATE_NO_CONSUMERS = "N";
-
static final String TEMP_QUEUE_MESSAGECOUNTER_PREFIX = "TempQueue.";
// Static ---------------------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/contract/Message.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/contract/Message.java 2007-06-28 08:51:05 UTC (rev 2813)
+++ trunk/src/main/org/jboss/messaging/core/contract/Message.java 2007-06-28 14:06:25 UTC (rev 2814)
@@ -38,6 +38,11 @@
public interface Message extends Streamable
{
/**
+ * This header can be used to trace a message across the cluster
+ */
+ public static final String HEADER_JBM_TRACE_ROUTE = "HEADER_JBM_TRACE_ROUTE";
+
+ /**
* This header is set on a message when a message is sucked from one node of the cluster to another
* and order preservation is true.
* The header is checked when sucking messages and if order preservation is true then the message is not accepted.
Modified: trunk/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/ChannelSupport.java 2007-06-28 08:51:05 UTC (rev 2813)
+++ trunk/src/main/org/jboss/messaging/core/impl/ChannelSupport.java 2007-06-28 14:06:25 UTC (rev 2814)
@@ -35,6 +35,7 @@
import org.jboss.messaging.core.contract.DeliveryObserver;
import org.jboss.messaging.core.contract.Distributor;
import org.jboss.messaging.core.contract.Filter;
+import org.jboss.messaging.core.contract.Message;
import org.jboss.messaging.core.contract.MessageReference;
import org.jboss.messaging.core.contract.MessageStore;
import org.jboss.messaging.core.contract.PersistenceManager;
@@ -222,8 +223,6 @@
{
if (distributor != null && distributor.getNumberOfReceivers() > 0)
{
- log.info("Deliver was called");
-
setReceiversReady(true);
deliverInternal();
@@ -456,13 +455,6 @@
}
}
- //Only used for testing
-
- public String toString()
- {
- return "ChannelSupport[" + channelID + "]";
- }
-
// Package protected ----------------------------------------------------------------------------
// Protected ------------------------------------------------------------------------------------
@@ -520,8 +512,6 @@
{
if (trace) { log.trace(this + " receivers not ready so not delivering"); }
- log.info("There are " + this.distributor.getNumberOfReceivers() + " receivers");
-
return;
}
@@ -667,6 +657,22 @@
try
{
+ if (trace)
+ {
+ //We add a header that tracks the route of the message across the cluster
+
+ String route = (String)ref.getMessage().getHeader(Message.HEADER_JBM_TRACE_ROUTE);
+
+ if (route == null)
+ {
+ route = "nodes:";
+ }
+
+ route += this + "-";
+
+ ref.getMessage().putHeader(Message.HEADER_JBM_TRACE_ROUTE, route);
+ }
+
if (tx == null)
{
if (persist && ref.getMessage().isReliable() && recoverable)
Modified: trunk/src/main/org/jboss/messaging/core/impl/ClusterRoundRobinDistributor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/ClusterRoundRobinDistributor.java 2007-06-28 08:51:05 UTC (rev 2813)
+++ trunk/src/main/org/jboss/messaging/core/impl/ClusterRoundRobinDistributor.java 2007-06-28 14:06:25 UTC (rev 2814)
@@ -54,6 +54,8 @@
// Attributes -----------------------------------------------------------------------------------
+ private boolean trace = log.isTraceEnabled();
+
private Distributor localDistributor;
private Distributor remoteDistributor;
@@ -75,24 +77,24 @@
{
//First try the local distributor
- log.info("** first trying with local distributor");
+ if (trace) { log.trace(this + " first trying with local distributor"); }
Delivery del = localDistributor.handle(observer, ref, tx);
- log.info("*** local distributor returned " + del);
+ if (trace) { log.trace(this + " local distributor returned " + del); }
if (del == null)
{
//If no local distributor takes the ref then we try the remote distributor
- log.info("** preserve ordering is " + preserveOrdering);
-
if (preserveOrdering)
{
if (ref.getMessage().getHeader(Message.CLUSTER_SUCKED) != null)
{
//The message has already been sucked once - don't suck it again
+ if (trace) { log.trace(this + " preserveOrdering is true and has already been sucked so not allowing message to be sucked again"); }
+
return null;
}
else
@@ -103,19 +105,11 @@
}
}
- log.info("*** sending to remote distributor");
-
- String wib = (String)ref.getMessage().getHeader("wib");
- if (wib == null)
- {
- wib = "nodes:";
- }
- wib += ((MessagingQueue)observer).getNodeID() + "-";
- ref.getMessage().putHeader("wib", wib);
-
+ if (trace) { log.trace(this + " trying with remote distributor"); }
+
del = remoteDistributor.handle(observer, ref, tx);
-
- log.info("** remote distributor returned " + del);
+
+ if (trace) { log.trace(this + " remote distributor returned " + del); }
}
return del;
Modified: trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java 2007-06-28 08:51:05 UTC (rev 2813)
+++ trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java 2007-06-28 14:06:25 UTC (rev 2814)
@@ -221,10 +221,6 @@
sucker.setConsuming(true);
}
- else
- {
- log.info("No receivers ready set setting consuming to false");
- }
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java 2007-06-28 08:51:05 UTC (rev 2813)
+++ trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java 2007-06-28 14:06:25 UTC (rev 2814)
@@ -232,14 +232,10 @@
while (iter.hasNext())
{
Integer nid = (Integer)iter.next();
-
- log.info("*********** CLOSING CLUSTER CONNECTION FOR NODE " + nid);
-
+
ConnectionInfo info = (ConnectionInfo)connections.remove(nid);
info.close();
-
- log.info("******* CLOSED");
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java 2007-06-28 08:51:05 UTC (rev 2813)
+++ trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java 2007-06-28 14:06:25 UTC (rev 2814)
@@ -152,8 +152,6 @@
if (trace) { log.trace(this + " Registering sucker"); }
- log.info("**** starting sucker sucking from queue " + this.getQueueName());
-
localQueue.registerSucker(this);
if (trace) { log.trace(this + " Registered sucker"); }
Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java 2007-06-28 08:51:05 UTC (rev 2813)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java 2007-06-28 14:06:25 UTC (rev 2814)
@@ -200,6 +200,11 @@
return dataChannel.getLocalAddress();
}
+ public long getCastTimeout()
+ {
+ return castTimeout;
+ }
+
public void multicastControl(ClusterRequest request, boolean sync) throws Exception
{
lock.readLock().acquire();
@@ -382,8 +387,6 @@
{
boolean retrievedState = false;
- log.info("***** waiting for state to arrive");
-
if (controlChannel.getState(null, stateTimeout))
{
//We are not the first member of the group, so let's wait for state to be got and processed
@@ -453,8 +456,6 @@
{
public byte[] getState()
{
- log.info("*** getting state");
-
try
{
lock.readLock().acquire();
@@ -472,8 +473,6 @@
byte[] state = groupListener.getState();
- log.info("**** got state " + state);
-
return state;
}
finally
@@ -495,13 +494,11 @@
public void setState(byte[] bytes)
{
- log.info("************* setting state");
synchronized (setStateLock)
{
try
{
groupListener.setState(bytes);
- log.info("* set it");
}
catch (Exception e)
{
Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-06-28 08:51:05 UTC (rev 2813)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-06-28 14:06:25 UTC (rev 2814)
@@ -197,6 +197,8 @@
// Map <node id, PostOfficeAddressInfo>
private Map nodeIDAddressMap;
+
+ private Object waitForBindUnbindLock;
// Constructors ---------------------------------------------------------------------------------
@@ -249,6 +251,8 @@
channelIDMap = new HashMap();
nodeIDAddressMap = new ConcurrentHashMap();
+
+ waitForBindUnbindLock = new Object();
}
/*
@@ -381,116 +385,33 @@
{
return officeName;
}
-
+
public void addBinding(Binding binding, boolean allNodes) throws Exception
{
internalAddBinding(binding, allNodes, true);
- }
-
- public void internalAddBinding(Binding binding, boolean allNodes, boolean sync) throws Exception
- {
- if (trace) { log.trace(this.thisNodeID + " binding " + binding.queue + " with condition " + binding.condition + " all nodes " + allNodes); }
-
- if (binding == null)
- {
- throw new IllegalArgumentException("Binding is null");
- }
- Condition condition = binding.condition;
-
- Queue queue = binding.queue;
-
- if (queue == null)
- {
- throw new IllegalArgumentException("Queue is null");
- }
-
- if (queue.getNodeID() != thisNodeID)
+ if (allNodes)
{
- throw new IllegalArgumentException("Cannot bind a queue from another node");
+ //Now we must wait for all the bindings to appear in state
+ //This is necessary since the second bind in an all bind is sent asynchronously to avoid deadlock
+
+ waitForBindUnbind(binding.queue.getName(), true);
}
-
- if (condition == null)
- {
- throw new IllegalArgumentException("Condition is null");
- }
-
- addBindingInMemory(binding);
-
- if (queue.isRecoverable())
- {
- // Need to write the mapping to the database
- insertBindingInStorage(condition, queue, allNodes);
- }
-
- if (clustered && queue.isClustered())
- {
- String filterString = queue.getFilter() == null ? null : queue.getFilter().getFilterString();
-
- MappingInfo info = new MappingInfo(thisNodeID, queue.getName(), condition.toText(), filterString, queue.getChannelID(),
- queue.isRecoverable(), true,
- queue.getFullSize(), queue.getPageSize(), queue.getDownCacheSize(),
- queue.getMaxSize(),
- queue.isPreserveOrdering());
-
- ClusterRequest request = new BindRequest(info, allNodes);
-
-// if (sync)
-// {
-// syncSendRequest(request);
-// }
-// else
-// {
- //When sending as a result of an all binding being received from the cluster we send the bind on asynchronously
- //To avoid a deadlock which happens when you have one sync request hitting a node which then tries to send another back
- //to the original node
- groupMember.multicastControl(request, sync);
- // }
- }
}
-
+
public void removeBinding(String queueName, boolean allNodes) throws Throwable
{
- this.internalRemoveBinding(queueName, allNodes, true);
- }
-
- private void internalRemoveBinding(String queueName, boolean allNodes, boolean sync) throws Throwable
- {
- if (trace) { log.trace(this.thisNodeID + " unbind queue: " + queueName + " all nodes " + allNodes); }
-
- if (queueName == null)
- {
- throw new IllegalArgumentException("Queue name is null");
- }
-
- Binding removed = removeBindingInMemory(thisNodeID, queueName);
-
- Queue queue = removed.queue;
-
- Condition condition = removed.condition;
-
- if (queue.isRecoverable())
- {
- //Need to remove from db too
-
- deleteBindingFromStorage(queue);
- }
-
- queue.removeAllReferences();
-
- if (clustered && queue.isClustered())
- {
- String filterString = queue.getFilter() == null ? null : queue.getFilter().getFilterString();
-
- MappingInfo info = new MappingInfo(thisNodeID, queue.getName(), condition.toText(), filterString, queue.getChannelID(),
- queue.isRecoverable(), true);
-
- UnbindRequest request = new UnbindRequest(info, allNodes);
-
- groupMember.multicastControl(request, sync);
- }
- }
-
+ internalRemoveBinding(queueName, allNodes, true);
+
+ if (allNodes)
+ {
+ //Now we must wait for all the bindings to be removed from state
+ //This is necessary since the second unbind in an all unbind is sent asynchronously to avoid deadlock
+
+ waitForBindUnbind(queueName, false);
+ }
+ }
+
public boolean route(MessageReference ref, Condition condition, Transaction tx) throws Exception
{
if (ref == null)
@@ -828,7 +749,7 @@
Condition condition = conditionFactory.createCondition(mapping.getConditionText());
- addBindingInMemory(new Binding(condition, queue));
+ addBindingInMemory(new Binding(condition, queue));
if (allNodes)
{
@@ -837,43 +758,34 @@
//There is the possibility that two nodes send a bind all with the same name simultaneously OR
//a node starts and sends a bind "ALL" and the other nodes already have a queue with that name
//This is ok - but we must check for this and not create the local binding in this case
+
+ //Bind locally
+
+ long channelID = channelIDManager.getID();
- lock.readLock().acquire();
+ Queue queue2 = new MessagingQueue(thisNodeID, mapping.getQueueName(), channelID, ms, pm,
+ mapping.isRecoverable(), mapping.getMaxSize(), filter,
+ mapping.getFullSize(), mapping.getPageSize(), mapping.getDownCacheSize(), true,
+ mapping.isPreserveOrdering());
+
+ //We must cast back asynchronously to avoid deadlock
+ boolean added = internalAddBinding(new Binding(condition, queue2), false, false);
- Queue queue2 = null;
-
- try
- {
- if (localNameMap != null && localNameMap.get(mapping.getQueueName()) != null)
- {
- //Already exists - don't create it again!
- }
- else
- {
- //Bind locally
-
- long channelID = channelIDManager.getID();
+ if (added)
+ {
+ if (trace) { log.trace(this + " inserted in binding locally"); }
+
+ queue2.load();
- queue2 = new MessagingQueue(thisNodeID, mapping.getQueueName(), channelID, ms, pm,
- mapping.isRecoverable(), mapping.getMaxSize(), filter,
- mapping.getFullSize(), mapping.getPageSize(), mapping.getDownCacheSize(), true,
- mapping.isPreserveOrdering());
-
- internalAddBinding(new Binding(condition, queue2), false, false);
- }
+ queue2.activate();
}
- finally
- {
- lock.readLock().release();
- }
-
- if (queue2 != null)
- {
- queue2.load();
-
- queue2.activate();
- }
}
+
+ synchronized (waitForBindUnbindLock)
+ {
+ if (trace) { log.trace(this + " notifying bind unbind lock"); }
+ waitForBindUnbindLock.notifyAll();
+ }
}
/*
@@ -891,13 +803,20 @@
removeBindingInMemory(mapping.getNodeId(), mapping.getQueueName());
+ synchronized (waitForBindUnbindLock)
+ {
+ if (trace) { log.trace(this + " notifying bind unbind lock"); }
+ waitForBindUnbindLock.notifyAll();
+ }
+
if (allNodes)
{
//Also unbind locally
if (trace) { log.trace("allNodes is true, so also forcing a local unbind"); }
- removeBinding(mapping.getQueueName(), false);
+ // We must cast back asynchronously to avoid deadlock
+ internalRemoveBinding(mapping.getQueueName(), false, false);
}
}
@@ -1210,6 +1129,192 @@
// Private ------------------------------------------------------------------------------------
+ private void waitForBindUnbind(String queueName, boolean bind) throws Exception
+ {
+ if (trace) { log.trace(this + " waiting for " + (bind ? "bind" : "unbind") + " of "+ queueName + " on all nodes"); }
+
+ Set nodesToWaitFor = new HashSet(nodeIDAddressMap.keySet());
+
+ long timeToWait = groupMember.getCastTimeout();
+
+ long start = System.currentTimeMillis();
+
+ boolean boundAll = true;
+
+ boolean unboundAll = true;
+
+ synchronized (waitForBindUnbindLock)
+ {
+ do
+ {
+ boundAll = true;
+
+ unboundAll = true;
+
+ lock.readLock().acquire();
+
+ try
+ {
+ // Refresh the to wait for map - a node might have left
+
+ Iterator iter = nodesToWaitFor.iterator();
+
+ while (iter.hasNext())
+ {
+ Integer node = (Integer)iter.next();
+
+ if (!nodeIDAddressMap.containsKey(node))
+ {
+ iter.remove();
+ }
+ else
+ {
+ Map nameMap = (Map)this.nameMaps.get(node);
+
+ if (nameMap != null && nameMap.get(queueName) != null)
+ {
+ if (trace) { log.trace(this + " queue " + queueName + " exists on node " + node); }
+ unboundAll = false;
+ }
+ else
+ {
+ if (trace) { log.trace(this + " queue " + queueName + " does not exist on node " + node); }
+ boundAll = false;
+ }
+ }
+ }
+ }
+ finally
+ {
+ lock.readLock().release();
+ }
+
+ if ((bind && !boundAll) || (!bind && !unboundAll))
+ {
+ try
+ {
+ if (trace) { log.trace(this + " waiting for bind unbind lock"); }
+ waitForBindUnbindLock.wait(groupMember.getCastTimeout());
+ if (trace) { log.trace(this + " woke up"); }
+ }
+ catch (InterruptedException e)
+ {
+ //Ignore
+ }
+ timeToWait -= System.currentTimeMillis() - start;
+ }
+ }
+ while (((bind && !boundAll) || (!bind && !unboundAll)) && timeToWait > 0);
+
+ if (trace) { log.trace(this + " waited ok"); }
+ }
+ if ((bind && !boundAll) || (!bind && !unboundAll))
+ {
+ throw new IllegalStateException(this + " timed out waiting for " + (bind ? " bind " : " unbind ") + "ALL to occur");
+ }
+ }
+
+ private boolean internalAddBinding(Binding binding, boolean allNodes, boolean sync) throws Exception
+ {
+ if (trace) { log.trace(this.thisNodeID + " binding " + binding.queue + " with condition " + binding.condition + " all nodes " + allNodes); }
+
+ if (binding == null)
+ {
+ throw new IllegalArgumentException("Binding is null");
+ }
+
+ Condition condition = binding.condition;
+
+ Queue queue = binding.queue;
+
+ if (queue == null)
+ {
+ throw new IllegalArgumentException("Queue is null");
+ }
+
+ if (queue.getNodeID() != thisNodeID)
+ {
+ throw new IllegalArgumentException("Cannot bind a queue from another node");
+ }
+
+ if (condition == null)
+ {
+ throw new IllegalArgumentException("Condition is null");
+ }
+
+ //The binding might already exist - this could happen if the queue is bind all simultaneously from more than one node of the cluster
+ boolean added = addBindingInMemory(binding);
+
+ if (added)
+ {
+ if (queue.isRecoverable())
+ {
+ // Need to write the mapping to the database
+ insertBindingInStorage(condition, queue, allNodes);
+ }
+
+ if (clustered && queue.isClustered())
+ {
+ String filterString = queue.getFilter() == null ? null : queue.getFilter().getFilterString();
+
+ MappingInfo info = new MappingInfo(thisNodeID, queue.getName(), condition.toText(), filterString, queue.getChannelID(),
+ queue.isRecoverable(), true,
+ queue.getFullSize(), queue.getPageSize(), queue.getDownCacheSize(),
+ queue.getMaxSize(),
+ queue.isPreserveOrdering());
+
+ ClusterRequest request = new BindRequest(info, allNodes);
+
+ groupMember.multicastControl(request, sync);
+ }
+ }
+
+ return added;
+ }
+
+ private Binding internalRemoveBinding(String queueName, boolean allNodes, boolean sync) throws Throwable
+ {
+ if (trace) { log.trace(this.thisNodeID + " unbind queue: " + queueName + " all nodes " + allNodes); }
+
+ if (queueName == null)
+ {
+ throw new IllegalArgumentException("Queue name is null");
+ }
+
+ Binding removed = removeBindingInMemory(thisNodeID, queueName);
+
+ //The queue might not be removed (it's already removed) if two unbind all requests are sent simultaneously on the cluster
+ if (removed != null)
+ {
+ Queue queue = removed.queue;
+
+ Condition condition = removed.condition;
+
+ if (queue.isRecoverable())
+ {
+ //Need to remove from db too
+
+ deleteBindingFromStorage(queue);
+ }
+
+ queue.removeAllReferences();
+
+ if (clustered && queue.isClustered())
+ {
+ String filterString = queue.getFilter() == null ? null : queue.getFilter().getFilterString();
+
+ MappingInfo info = new MappingInfo(thisNodeID, queue.getName(), condition.toText(), filterString, queue.getChannelID(),
+ queue.isRecoverable(), true);
+
+ UnbindRequest request = new UnbindRequest(info, allNodes);
+
+ groupMember.multicastControl(request, sync);
+ }
+ }
+
+ return removed;
+ }
+
private void calculateFailoverMap()
{
//calculate the failover map
@@ -1275,8 +1380,6 @@
List targets = new ArrayList();
- log.info("routing to " + queues.size() + " queues");
-
while (iter.hasNext())
{
Queue queue = (Queue)iter.next();
@@ -1305,7 +1408,8 @@
if (filter == null || filter.accept(ref.getMessage()))
{
- log.info("Added queue " + queue + " to list of targets");
+ if (trace) { log.trace(this + " Added queue " + queue + " to list of targets"); }
+
targets.add(queue);
if (ref.getMessage().isReliable() && queue.isRecoverable())
@@ -1394,10 +1498,8 @@
{
Queue queue = (Queue)iter.next();
- if (trace) { log.trace("Routing ref to queue " + queue); }
+ if (trace) { log.trace(this + " Routing ref to queue " + queue); }
- log.info("Routing ref " + ref + " to queue " + queue);
-
Delivery del = queue.handle(null, ref, tx);
if (trace) { log.trace("Queue returned " + del); }
@@ -1438,14 +1540,17 @@
if (nameMap == null)
{
- throw new IllegalArgumentException("Cannot find name maps for node " + nodeID);
+ log.warn("Cannot find name maps for node " + nodeID);
+
+ return null;
}
Binding binding = (Binding)nameMap.remove(queueName);
if (binding == null)
{
- throw new IllegalArgumentException("Cannot find binding for queue name " + queueName);
+ log.warn("Cannot find binding for queue name " + queueName);
+ return null;
}
if (nameMap.isEmpty())
@@ -1497,14 +1602,14 @@
}
}
- private void addBindingInMemory(Binding binding) throws Exception
+ private boolean addBindingInMemory(Binding binding) throws Exception
{
Queue queue = binding.queue;
+ if (trace) { log.trace(this + " Adding binding in memory " + binding); }
+
lock.writeLock().acquire();
- if (trace) { log.trace(this + " Adding binding in memory " + binding); }
-
try
{
Integer nid = new Integer(queue.getNodeID());
@@ -1513,14 +1618,16 @@
if (nameMap != null && nameMap.containsKey(queue.getName()))
{
- throw new IllegalArgumentException("Name map for node " + nid + " already contains binding for queue " + queue.getName());
+ log.warn("Name map for node " + nid + " already contains binding for queue " + queue.getName());
+
+ return false;
}
Long cid = new Long(queue.getChannelID());
if (channelIDMap.containsKey(cid))
{
- throw new IllegalArgumentException("Channel id map for node " + nid + " already contains binding for queue " + cid);
+ throw new IllegalStateException("Channel id map for node " + nid + " already contains binding for queue " + cid);
}
if (nameMap == null)
@@ -1568,6 +1675,8 @@
ClusterNotification notification = new ClusterNotification(ClusterNotification.TYPE_BIND, queue.getNodeID(), queue.getName());
clusterNotifier.sendNotification(notification);
+
+ return true;
}
/*
@@ -1647,7 +1756,7 @@
Queue queue = new MessagingQueue(thisNodeID, queueName, channelID, ms, pm,
true, filter, bindingClustered && clustered);
- log.info("**** loaded binding from storage: " + queueName);
+ if (trace) { log.trace(this + " loaded binding from storage: " + queueName); }
Condition condition = conditionFactory.createCondition(conditionText);
@@ -1794,7 +1903,7 @@
lock.writeLock().acquire();
- log.info("** cleaning data for node " + nodeToRemove);
+ if (trace) { log.trace(this + " cleaning data for node " + nodeToRemove); }
try
{
@@ -1818,14 +1927,8 @@
if (queue.getNodeID() == nodeToRemove.intValue())
{
- log.info("** removing queue " + queue);
-
toRemove.add(new Binding(condition, queue));
}
- else
- {
- log.info("** not removing " + queue);
- }
}
}
@@ -1866,8 +1969,14 @@
}
//remove node id - address info
- nodeIDAddressMap.remove(nodeToRemove);
+ nodeIDAddressMap.remove(nodeToRemove);
+ synchronized (waitForBindUnbindLock)
+ {
+ if (trace) { log.trace(this + " notifying bind unbind lock"); }
+ waitForBindUnbindLock.notifyAll();
+ }
+
//Recalculate the failover map
calculateFailoverMap();
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java 2007-06-28 08:51:05 UTC (rev 2813)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java 2007-06-28 14:06:25 UTC (rev 2814)
@@ -26,7 +26,6 @@
import javax.jms.InvalidDestinationException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
@@ -186,6 +185,8 @@
TextMessage tm = (TextMessage)cons0.receive(1000);
assertNotNull(tm);
+
+ log.info("Got message " + tm.getText());
assertEquals("message" + i, tm.getText());
}
@@ -199,6 +200,8 @@
TextMessage tm = (TextMessage)cons1.receive(1000);
assertNotNull(tm);
+
+ log.info("Got message " + tm.getText());
assertEquals("message" + i, tm.getText());
}
@@ -212,6 +215,8 @@
TextMessage tm = (TextMessage)cons2.receive(1000);
assertNotNull(tm);
+
+ log.info("Got message " + tm.getText());
assertEquals("message" + i, tm.getText());
}
@@ -225,6 +230,8 @@
TextMessage tm = (TextMessage)cons3.receive(1000);
assertNotNull(tm);
+
+ log.info("Got message " + tm.getText());
assertEquals("message" + i, tm.getText());
}
@@ -238,6 +245,8 @@
TextMessage tm = (TextMessage)cons4.receive(1000);
assertNotNull(tm);
+
+ log.info("Got message " + tm.getText());
assertEquals("message" + i, tm.getText());
}
@@ -497,8 +506,6 @@
conn1.start();
conn2.start();
- Thread.sleep(5000);
-
log.info("started");
// Send at node 0 - and make sure the messages are consumable from all the durable subs
@@ -963,11 +970,6 @@
sess2.unsubscribe("sub");
}
catch (Exception ignore) {}
- try
- {
- sess3.unsubscribe("sub");
- }
- catch (Exception ignore) {}
MessageConsumer cons1 = sess2.createDurableSubscriber(topic[1], "sub");
MessageConsumer cons2 = sess3.createDurableSubscriber(topic[2], "sub");
@@ -987,34 +989,65 @@
for (int i = 0; i < NUM_MESSAGES; i++)
{
- TextMessage tm = sess1.createTextMessage("message" + i);
+ TextMessage tm = sess1.createTextMessage("message2-" + i);
prod.send(tm);
}
-
+
+
+ int offset = 0;
+
for (int i = 0; i < NUM_MESSAGES / 2; i++)
{
TextMessage tm = (TextMessage)cons1.receive(1000);
-
assertNotNull(tm);
-
- assertEquals("message" + i * 2, tm.getText());
+ log.info("**** got message" + tm.getText());
+
+ if (tm.getText().substring("message2-".length()).equals("1"))
+ {
+ offset = 1;
+ }
+
+ assertEquals("message2-" + (i * 2 + offset), tm.getText());
}
-
+
+ Message msg = cons1.receive(2000);
+ assertNull(msg);
+
+ if (offset == 1)
+ {
+ offset = 0;
+ }
+ else
+ {
+ offset = 1;
+ }
+
for (int i = 0; i < NUM_MESSAGES / 2; i++)
{
TextMessage tm = (TextMessage)cons2.receive(1000);
-
assertNotNull(tm);
-
- assertEquals("message" + (i * 2 + 1), tm.getText());
+ log.info("**** got message" + tm.getText());
+ assertEquals("message2-" + (i * 2 + offset), tm.getText());
}
-
+
+ msg = cons2.receive(2000);
+ assertNull(msg);
+
cons1.close();
cons2.close();
sess2.unsubscribe("sub");
- sess3.unsubscribe("sub");
+
+ try
+ {
+ sess3.unsubscribe("sub");
+ fail("Should already be unsubscribed");
+ }
+ catch (InvalidDestinationException e)
+ {
+ //Ok - the previous unsubscribe should do a cluster wide unsubscribe
+ }
}
finally
Modified: trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java 2007-06-28 08:51:05 UTC (rev 2813)
+++ trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java 2007-06-28 14:06:25 UTC (rev 2814)
@@ -153,7 +153,6 @@
{
// most likely the remote server is not started, so spawn it
servers[i] = new ServerHolder(ServerManagement.spawn(i), true);
- log.info("server " + i + " online");
}
}
}
Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java 2007-06-28 08:51:05 UTC (rev 2813)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java 2007-06-28 14:06:25 UTC (rev 2814)
@@ -352,9 +352,7 @@
postOfficeObjectName = sc.registerAndConfigureService(postOfficeConfig);
sc.setAttribute(postOfficeObjectName, "Clustered", clustered ? "true" : "false");
-
- log.info("************* SET CLUSTERED ATTRIBUTE TO " + clustered);
-
+
overrideAttributes(postOfficeObjectName, attrOverrides);
sc.invoke(postOfficeObjectName, "create", new Object[0], new String[0]);
sc.invoke(postOfficeObjectName, "start", new Object[0], new String[0]);
More information about the jboss-cvs-commits
mailing list