[jboss-cvs] JBoss Messaging SVN: r2815 - in trunk: src/main/org/jboss/jms/server/destination and 7 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jun 28 16:11:38 EDT 2007
Author: timfox
Date: 2007-06-28 16:11:38 -0400 (Thu, 28 Jun 2007)
New Revision: 2815
Modified:
trunk/src/main/org/jboss/jms/server/ServerPeer.java
trunk/src/main/org/jboss/jms/server/destination/QueueService.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/messaging/core/contract/Binding.java
trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/MappingInfo.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/ClusteredPostOfficeTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/PostOfficeTest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java
Log:
Fixed bug in merging queues
Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java 2007-06-28 14:06:25 UTC (rev 2814)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java 2007-06-28 20:11:38 UTC (rev 2815)
@@ -1436,7 +1436,10 @@
{
Queue queue = (Queue)iter.next();
- postOffice.removeBinding(queue.getName(), false);
+ //Durable subs need to be removed on all nodes
+ boolean all = !isQueue && queue.isRecoverable();
+
+ postOffice.removeBinding(queue.getName(), all);
}
return true;
Modified: trunk/src/main/org/jboss/jms/server/destination/QueueService.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/QueueService.java 2007-06-28 14:06:25 UTC (rev 2814)
+++ trunk/src/main/org/jboss/jms/server/destination/QueueService.java 2007-06-28 20:11:38 UTC (rev 2815)
@@ -109,7 +109,7 @@
destination.getFullSize(), destination.getPageSize(),
destination.getDownCacheSize(), destination.isClustered(),
serverPeer.isDefaultPreserveOrdering());
- po.addBinding(new Binding(queueCond, queue), false);
+ po.addBinding(new Binding(queueCond, queue, false), false);
queue.activate();
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-06-28 14:06:25 UTC (rev 2814)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-06-28 20:11:38 UTC (rev 2815)
@@ -566,7 +566,7 @@
// make a binding for this temporary queue
// temporary queues need to be bound on ALL nodes of the cluster
- postOffice.addBinding(new Binding(cond, coreQueue), true);
+ postOffice.addBinding(new Binding(cond, coreQueue, true), true);
coreQueue.activate();
}
@@ -702,7 +702,7 @@
}
}
- postOffice.removeBinding(sub.getName(), true);
+ postOffice.removeBinding(sub.getName(), sub.isClustered());
String counterName = TopicService.SUBSCRIPTION_MESSAGECOUNTER_PREFIX + sub.getName();
@@ -1276,7 +1276,7 @@
JMSCondition topicCond = new JMSCondition(false, jmsDestination.getName());
- postOffice.addBinding(new Binding(topicCond, queue), false);
+ postOffice.addBinding(new Binding(topicCond, queue, false), false);
queue.activate();
@@ -1332,9 +1332,9 @@
mDest.isClustered(),
sp.isDefaultPreserveOrdering());
- // Durable subs must be bound on ALL nodes of the cluster
+ // Durable subs must be bound on ALL nodes of the cluster (if clustered)
- postOffice.addBinding(new Binding(new JMSCondition(false, jmsDestination.getName()), queue), true);
+ postOffice.addBinding(new Binding(new JMSCondition(false, jmsDestination.getName()), queue, true), true);
queue.activate();
@@ -1408,7 +1408,7 @@
// Durable subs must be unbound on ALL nodes of the cluster
- postOffice.removeBinding(queue.getName(), true);
+ postOffice.removeBinding(queue.getName(), mDest.isClustered());
// create a fresh new subscription
@@ -1422,7 +1422,7 @@
// Durable subs must be bound on ALL nodes of the cluster
- postOffice.addBinding(new Binding(new JMSCondition(false, jmsDestination.getName()), queue), true);
+ postOffice.addBinding(new Binding(new JMSCondition(false, jmsDestination.getName()), queue, true), true);
queue.activate();
Modified: trunk/src/main/org/jboss/messaging/core/contract/Binding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/contract/Binding.java 2007-06-28 14:06:25 UTC (rev 2814)
+++ trunk/src/main/org/jboss/messaging/core/contract/Binding.java 2007-06-28 20:11:38 UTC (rev 2815)
@@ -32,19 +32,23 @@
*/
public class Binding
{
- public Binding(Condition condition, Queue queue)
+ public Binding(Condition condition, Queue queue, boolean allNodes)
{
this.condition = condition;
this.queue = queue;
+
+ this.allNodes = allNodes;
}
public Condition condition;
public Queue queue;
+ public boolean allNodes;
+
public String toString()
{
- return "Binding:" + System.identityHashCode(this) + " condition: " + condition + " queue: " + queue;
+ return "Binding:" + System.identityHashCode(this) + " condition: " + condition + " queue: " + queue +" allNodes: " + allNodes;
}
}
\ No newline at end of file
Modified: trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2007-06-28 14:06:25 UTC (rev 2814)
+++ trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2007-06-28 20:11:38 UTC (rev 2815)
@@ -859,6 +859,13 @@
{
if (trace) { log.trace("Merging channel from " + fromChannelID + " to " + toChannelID + " numberToLoad:" + numberToLoad + " firstPagingOrder:" + firstPagingOrder + " nextPagingOrder:" + nextPagingOrder); }
+ //Sanity check
+
+ if (fromChannelID == toChannelID)
+ {
+ throw new IllegalArgumentException("Cannot merge queues - they have the same channel id!!");
+ }
+
Connection conn = null;
PreparedStatement ps = null;
ResultSet rs = null;
Modified: trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java 2007-06-28 14:06:25 UTC (rev 2814)
+++ trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java 2007-06-28 20:11:38 UTC (rev 2815)
@@ -183,7 +183,7 @@
* over to another node, but a queue with the same name already exists. In this case we merge the
* two queues.
*/
- public void mergeIn(long channelID) throws Exception
+ public void mergeIn(long theChannelID) throws Exception
{
if (trace) { log.trace("Merging queue " + channelID + " into " + this); }
@@ -192,7 +192,7 @@
flushDownCache();
PersistenceManager.InitialLoadInfo ili =
- pm.mergeAndLoad(channelID, channelID, fullSize - messageRefs.size(),
+ pm.mergeAndLoad(theChannelID, this.channelID, fullSize - messageRefs.size(),
firstPagingOrder, nextPagingOrder);
if (trace) { log.trace("Loaded " + ili.getRefInfos().size() + " refs"); }
Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MappingInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MappingInfo.java 2007-06-28 14:06:25 UTC (rev 2814)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MappingInfo.java 2007-06-28 20:11:38 UTC (rev 2815)
@@ -66,12 +66,14 @@
private boolean preserveOrdering;
+ private boolean allNodes;
+
MappingInfo()
{
}
MappingInfo(int nodeId, String queueName, String conditionText, String filterString,
- long channelId, boolean recoverable, boolean clustered)
+ long channelId, boolean recoverable, boolean clustered, boolean allNodes)
{
this.nodeId = nodeId;
@@ -86,13 +88,16 @@
this.recoverable = recoverable;
this.clustered = clustered;
+
+ this.allNodes = allNodes;
}
MappingInfo(int nodeId, String queueName, String conditionText, String filterString,
- long channelId, boolean recoverable, boolean clustered, int fullSize, int pageSize, int downCacheSize,
+ long channelId, boolean recoverable, boolean clustered, boolean allNodes,
+ int fullSize, int pageSize, int downCacheSize,
int maxSize, boolean preserveOrdering)
{
- this (nodeId, queueName, conditionText, filterString, channelId, recoverable, clustered);
+ this (nodeId, queueName, conditionText, filterString, channelId, recoverable, clustered, allNodes);
this.fullSize = fullSize;
@@ -123,6 +128,8 @@
clustered = in.readBoolean();
+ allNodes = in.readBoolean();
+
fullSize = in.readInt();
pageSize = in.readInt();
@@ -150,6 +157,8 @@
out.writeBoolean(clustered);
+ out.writeBoolean(allNodes);
+
out.writeInt(fullSize);
out.writeInt(pageSize);
@@ -195,6 +204,11 @@
{
return clustered;
}
+
+ boolean isAllNodes()
+ {
+ return allNodes;
+ }
int getFullSize()
{
Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-06-28 14:06:25 UTC (rev 2814)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-06-28 20:11:38 UTC (rev 2815)
@@ -199,6 +199,8 @@
private Map nodeIDAddressMap;
private Object waitForBindUnbindLock;
+
+ private Map loadedBindings;
// Constructors ---------------------------------------------------------------------------------
@@ -302,7 +304,9 @@
if (trace) { log.trace(this + " starting"); }
super.start();
-
+
+ loadedBindings = getBindingsFromStorage();
+
if (clustered)
{
groupMember.start();
@@ -327,7 +331,7 @@
//Now load the bindings for this node
- loadBindingsFromStorage();
+ loadBindings();
started = true;
@@ -390,7 +394,7 @@
{
internalAddBinding(binding, allNodes, true);
- if (allNodes)
+ if (allNodes && clustered && binding.queue.isClustered())
{
//Now we must wait for all the bindings to appear in state
//This is necessary since the second bind in an all bind is sent asynchronously to avoid deadlock
@@ -401,9 +405,9 @@
public void removeBinding(String queueName, boolean allNodes) throws Throwable
{
- internalRemoveBinding(queueName, allNodes, true);
+ Binding binding = internalRemoveBinding(queueName, allNodes, true);
- if (allNodes)
+ if (binding != null && allNodes && clustered && binding.queue.isClustered())
{
//Now we must wait for all the bindings to be removed from state
//This is necessary since the second unbind in an all unbind is sent asynchronously to avoid deadlock
@@ -551,7 +555,7 @@
}
// GroupListener implementation -------------------------------------------------------------
-
+
public void setState(byte[] bytes) throws Exception
{
if (trace) { log.trace(this + " received state from group"); }
@@ -582,11 +586,41 @@
}
Queue queue = new MessagingQueue(mapping.getNodeId(), mapping.getQueueName(), mapping.getChannelId(),
- mapping.isRecoverable(), filter, mapping.isClustered());
+ mapping.isRecoverable(), filter, true);
Condition condition = conditionFactory.createCondition(mapping.getConditionText());
- addBindingInMemory(new Binding(condition, queue));
+ addBindingInMemory(new Binding(condition, queue, mapping.isAllNodes()));
+
+ if (mapping.isAllNodes())
+ {
+ // insert into db if not already there
+ if (!loadedBindings.containsKey(queue.getName()))
+ {
+ //Create a local binding too
+
+ long channelID = channelIDManager.getID();
+
+ Queue queue2 = new MessagingQueue(thisNodeID, mapping.getQueueName(), channelID, ms, pm,
+ mapping.isRecoverable(), mapping.getMaxSize(), filter,
+ mapping.getFullSize(), mapping.getPageSize(), mapping.getDownCacheSize(), true,
+ mapping.isPreserveOrdering());
+
+ Binding localBinding = new Binding(condition, queue2, true);
+
+ if (mapping.isRecoverable())
+ {
+ //We need to insert it into the database
+ if (trace) { log.trace(this + " got all binding in state for queue " + queue.getName() + " inserting it in DB"); }
+
+ insertBindingInStorage(condition, queue2, true);
+ }
+
+ // Add it to the loaded map
+
+ loadedBindings.put(mapping.getQueueName(), localBinding);
+ }
+ }
}
//Update the replicated data
@@ -607,30 +641,28 @@
try
{
- Iterator iter = mappings.entrySet().iterator();
-
- while (iter.hasNext())
- {
- Map.Entry entry = (Map.Entry)iter.next();
-
- Condition condition = (Condition)entry.getKey();
-
- List queues = (List)entry.getValue();
-
- Iterator iter2 = queues.iterator();
-
- while (iter2.hasNext())
- {
- Queue queue = (Queue)iter2.next();
+ Iterator iter = nameMaps.values().iterator();
+
+ while (iter.hasNext())
+ {
+ Map map = (Map)iter.next();
+
+ Iterator iter2 = map.values().iterator();
+
+ while (iter2.hasNext())
+ {
+ Binding binding = (Binding)iter2.next();
+
+ Queue queue = binding.queue;
//We only get the clustered queues
if (queue.isClustered())
{
String filterString = queue.getFilter() == null ? null : queue.getFilter().getFilterString();
- MappingInfo mapping = new MappingInfo(queue.getNodeID(), queue.getName(), condition.toText(),
+ MappingInfo mapping = new MappingInfo(queue.getNodeID(), queue.getName(), binding.condition.toText(),
filterString, queue.getChannelID(), queue.isRecoverable(),
- true);
+ true, binding.allNodes);
list.add(mapping);
}
}
@@ -749,7 +781,7 @@
Condition condition = conditionFactory.createCondition(mapping.getConditionText());
- addBindingInMemory(new Binding(condition, queue));
+ addBindingInMemory(new Binding(condition, queue, false));
if (allNodes)
{
@@ -769,7 +801,7 @@
mapping.isPreserveOrdering());
//We must cast back asynchronously to avoid deadlock
- boolean added = internalAddBinding(new Binding(condition, queue2), false, false);
+ boolean added = internalAddBinding(new Binding(condition, queue2, true), false, false);
if (added)
{
@@ -1259,6 +1291,7 @@
MappingInfo info = new MappingInfo(thisNodeID, queue.getName(), condition.toText(), filterString, queue.getChannelID(),
queue.isRecoverable(), true,
+ allNodes,
queue.getFullSize(), queue.getPageSize(), queue.getDownCacheSize(),
queue.getMaxSize(),
queue.isPreserveOrdering());
@@ -1304,7 +1337,7 @@
String filterString = queue.getFilter() == null ? null : queue.getFilter().getFilterString();
MappingInfo info = new MappingInfo(thisNodeID, queue.getName(), condition.toText(), filterString, queue.getChannelID(),
- queue.isRecoverable(), true);
+ queue.isRecoverable(), true, allNodes);
UnbindRequest request = new UnbindRequest(info, allNodes);
@@ -1706,15 +1739,16 @@
groupMember.unicastData(request, address);
}
- private void loadBindingsFromStorage() throws Exception
+
+ private Map getBindingsFromStorage() throws Exception
{
- lock.writeLock().acquire();
-
Connection conn = null;
PreparedStatement ps = null;
ResultSet rs = null;
TransactionWrapper wrap = new TransactionWrapper();
+ Map bindings = new HashMap();
+
try
{
conn = ds.getConnection();
@@ -1760,24 +1794,12 @@
Condition condition = conditionFactory.createCondition(conditionText);
- addBindingInMemory(new Binding(condition, queue));
+ Binding binding = new Binding(condition, queue, allNodes);
- //Need to broadcast it too
- if (clustered && queue.isClustered())
- {
- String filterString = queue.getFilter() == null ? null : queue.getFilter().getFilterString();
-
- MappingInfo info = new MappingInfo(thisNodeID, queue.getName(), condition.toText(), filterString, queue.getChannelID(),
- queue.isRecoverable(), true,
- queue.getFullSize(), queue.getPageSize(), queue.getDownCacheSize(),
- queue.getMaxSize(),
- queue.isPreserveOrdering());
-
- ClusterRequest request = new BindRequest(info, allNodes);
-
- groupMember.multicastControl(request, false);
- }
+ bindings.put(queueName, binding);
}
+
+ return bindings;
}
catch (Exception e)
{
@@ -1786,8 +1808,6 @@
}
finally
{
- lock.writeLock().release();
-
closeResultSet(rs);
closeStatement(ps);
@@ -1795,8 +1815,40 @@
closeConnection(conn);
wrap.end();
+ }
+ }
+
+ private void loadBindings() throws Exception
+ {
+ Iterator iter = loadedBindings.values().iterator();
+
+ while (iter.hasNext())
+ {
+ Binding binding = (Binding)iter.next();
+
+ addBindingInMemory(binding);
+
+ Queue queue = binding.queue;
+
+ //Need to broadcast it too
+ if (clustered && queue.isClustered())
+ {
+ String filterString = queue.getFilter() == null ? null : queue.getFilter().getFilterString();
+
+ MappingInfo info = new MappingInfo(thisNodeID, queue.getName(), binding.condition.toText(), filterString, queue.getChannelID(),
+ queue.isRecoverable(), true,
+ binding.allNodes,
+ queue.getFullSize(), queue.getPageSize(), queue.getDownCacheSize(),
+ queue.getMaxSize(),
+ queue.isPreserveOrdering());
+
+ ClusterRequest request = new BindRequest(info, binding.allNodes);
+
+ groupMember.multicastControl(request, false);
+ }
}
}
+
private void insertBindingInStorage(Condition condition, Queue queue, boolean allNodes) throws Exception
{
@@ -1927,7 +1979,7 @@
if (queue.getNodeID() == nodeToRemove.intValue())
{
- toRemove.add(new Binding(condition, queue));
+ toRemove.add(new Binding(condition, queue, false));
}
}
}
@@ -2087,36 +2139,32 @@
// Need to lock
lock.writeLock().acquire();
-
+
try
{
- Iterator iter = mappings.entrySet().iterator();
-
+ Map nameMap = (Map)this.nameMaps.get(failedNodeID);
+
List toRemove = new ArrayList();
- while (iter.hasNext())
+ if (nameMap != null)
{
- Map.Entry entry = (Map.Entry)iter.next();
+ Iterator iter = nameMap.values().iterator();
- Condition condition = (Condition)entry.getKey();
-
- List queues = (List)entry.getValue();
-
- Iterator iter2 = queues.iterator();
-
- while (iter2.hasNext())
+ while (iter.hasNext())
{
- Queue queue = (Queue)iter2.next();
+ Binding binding = (Binding)iter.next();
+ Queue queue = binding.queue;
+
if (queue.isRecoverable() && queue.getNodeID() == failedNodeID.intValue())
{
- toRemove.add(new Binding(condition, queue));
- }
+ toRemove.add(binding);
+ }
}
}
-
- iter = toRemove.iterator();
-
+
+ Iterator iter = toRemove.iterator();
+
while (iter.hasNext())
{
Binding binding = (Binding)iter.next();
@@ -2129,17 +2177,19 @@
if (!queue.isRecoverable())
{
throw new IllegalStateException("Found non recoverable queue " +
- queue.getName() + "in map, these should have been removed!");
+ queue.getName() + " in map, these should have been removed!");
}
// Sanity check
if (!queue.isClustered())
{
- throw new IllegalStateException("Queue " + queue.getName() +
- " is not clustered!");
+ throw new IllegalStateException("Queue " + queue.getName() + " is not clustered!");
}
+
+ log.info("**** removing old queue with channel id " + queue.getChannelID());
- //Remove from the in-memory map
+ //Remove from the in-memory map - no need to broadcast anything - the bindings will be removed from the
+ //other nodes' in-memory maps when those nodes detect the failure
removeBindingInMemory(binding.queue.getNodeID(), binding.queue.getName());
//Delete from storage
@@ -2151,23 +2201,16 @@
// when the node crashes a view change will hit the other nodes and that will cause
// all binding data for that node to be removed anyway.
- Collection queues = getQueuesForCondition(condition, true);
+ //Find if there is a local queue with the same name
- Iterator iter2 = queues.iterator();
-
Queue localQueue = null;
- while (iter2.hasNext())
+ if (localNameMap != null)
{
- Queue q = (Queue)iter2.next();
+ Binding b = (Binding)localNameMap.get(queue.getName());
+ localQueue = b.queue;
- if (queue.getName().equals(q.getName()))
- {
- localQueue = q;
-
- break;
- }
-
+ log.info("Found a local queue with channel id " + localQueue.getChannelID());
}
if (localQueue != null)
@@ -2187,10 +2230,8 @@
Queue newQueue = new MessagingQueue(thisNodeID, queue.getName(), queue.getChannelID(), queue.isRecoverable(),
queue.getFilter(), true);
- addBinding(new Binding(condition, newQueue), false);
+ addBinding(new Binding(condition, newQueue, binding.allNodes), false);
- newQueue.deactivate();
-
newQueue.load();
//TODO - do we really want to activate ALL the queues - surely only the ones that correspond to deployed destinations??
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/ClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/ClusteredPostOfficeTest.java 2007-06-28 14:06:25 UTC (rev 2814)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/ClusteredPostOfficeTest.java 2007-06-28 20:11:38 UTC (rev 2815)
@@ -197,14 +197,14 @@
Condition condition1 = new SimpleCondition("topic1");
- office1.addBinding(new Binding(condition1, queue1), false);
+ office1.addBinding(new Binding(condition1, queue1, false), false);
log.info("Added binding1");
Queue queue2 = new MessagingQueue(1, "sub2", channelIDManager.getID(), ms, pm, false, -1, null, true, false);
queue2.activate();
- office1.addBinding(new Binding(condition1, queue2), false);
+ office1.addBinding(new Binding(condition1, queue2, false), false);
log.info("Added binding2");
@@ -227,7 +227,7 @@
Queue queue3 = new MessagingQueue(2, "sub3", channelIDManager.getID(), ms, pm, false, -1, null, true, false);
queue3.activate();
- office2.addBinding(new Binding(condition1, queue3), false);
+ office2.addBinding(new Binding(condition1, queue3, false), false);
// Make sure both nodes pick it up
@@ -251,7 +251,7 @@
Queue queue4 = new MessagingQueue(2, "sub4", channelIDManager.getID(), ms, pm, false, -1, null, true, false);
queue4.activate();
- office2.addBinding(new Binding(condition1, queue4), false);
+ office2.addBinding(new Binding(condition1, queue4, false), false);
// Make sure both nodes pick it up
@@ -308,7 +308,7 @@
Queue queue5 = new MessagingQueue(3, "sub5", channelIDManager.getID(), ms, pm, false, -1, null, true, false);
queue5.activate();
- office3.addBinding(new Binding(condition1, queue5), false);
+ office3.addBinding(new Binding(condition1, queue5, false), false);
// Make sure all nodes pick it up
@@ -338,12 +338,12 @@
Queue queue6 = new MessagingQueue(1, "sub6", channelIDManager.getID(), ms, pm, true, -1, null, true, false);
queue6.activate();
- office1.addBinding(new Binding(condition1, queue6), false);
+ office1.addBinding(new Binding(condition1, queue6, false), false);
Queue queue7 = new MessagingQueue(1, "sub7", channelIDManager.getID(), ms, pm, false, -1, null, true, false);
queue7.activate();
- office1.addBinding(new Binding(condition1, queue7), false);
+ office1.addBinding(new Binding(condition1, queue7, false), false);
// Make sure all nodes pick them up
@@ -489,13 +489,13 @@
//Bind on different conditions
- office1.addBinding(new Binding(condition1, queue8), false);
+ office1.addBinding(new Binding(condition1, queue8, false), false);
- office2.addBinding(new Binding(condition1, queue9), false);
+ office2.addBinding(new Binding(condition1, queue9, false), false);
Condition condition2 = new SimpleCondition("topic2");
- office2.addBinding(new Binding(condition2, queue10), false);
+ office2.addBinding(new Binding(condition2, queue10, false), false);
queues = office1.getQueuesForCondition(condition1, false);
assertNotNull(queues);
@@ -523,9 +523,9 @@
Queue queue12 = new MessagingQueue(2, "sub12", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
queue12.activate();
- office1.addBinding(new Binding(condition1, queue11), false);
+ office1.addBinding(new Binding(condition1, queue11, false), false);
- office2.addBinding(new Binding(condition1, queue12), false);
+ office2.addBinding(new Binding(condition1, queue12, false), false);
queues = office1.getQueuesForCondition(condition1, false);
assertNotNull(queues);
@@ -594,7 +594,7 @@
Condition condition1 = new SimpleCondition("condition1");
- office1.addBinding(new Binding(condition1, queue1), true);
+ office1.addBinding(new Binding(condition1, queue1, false), true);
Collection queues = office1.getQueuesForCondition(condition1, false);
@@ -614,7 +614,7 @@
Queue queue2 = new MessagingQueue(2, "sub2", channelIDManager.getID(), ms, pm, false, -1, null, true, false);
queue2.activate();
- office2.addBinding(new Binding(condition1, queue2), true);
+ office2.addBinding(new Binding(condition1, queue2, false), true);
queues = office1.getQueuesForCondition(condition1, false);
@@ -883,19 +883,19 @@
Condition condition1 = new SimpleCondition("queue1");
- office1.addBinding(new Binding(condition1, queue1), false);
+ office1.addBinding(new Binding(condition1, queue1, false), false);
Queue queue2 = new MessagingQueue(2, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, true, false);
queue2.activate();
- office2.addBinding(new Binding(condition1, queue2), false);
+ office2.addBinding(new Binding(condition1, queue2, false), false);
Queue queue3 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, true, false);
queue3.activate();
try
{
- office1.addBinding(new Binding(condition1, queue3), false);
+ office1.addBinding(new Binding(condition1, queue3, false), false);
fail();
}
catch (Exception e)
@@ -908,7 +908,7 @@
try
{
- office2.addBinding(new Binding(condition1, queue4), false);
+ office2.addBinding(new Binding(condition1, queue4, false), false);
fail();
}
catch (Exception e)
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/PostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/PostOfficeTest.java 2007-06-28 14:06:25 UTC (rev 2814)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/PostOfficeTest.java 2007-06-28 20:11:38 UTC (rev 2815)
@@ -85,12 +85,12 @@
Condition condition1 = new SimpleCondition("condition1");
- office1.addBinding(new Binding(condition1, queue1), false);
+ office1.addBinding(new Binding(condition1, queue1, false), false);
//Binding twice with the same name should fail
try
{
- office1.addBinding(new Binding(condition1, queue1), false);
+ office1.addBinding(new Binding(condition1, queue1, false), false);
fail();
}
catch (IllegalArgumentException e)
@@ -106,7 +106,7 @@
MessagingQueue queuexx =
new MessagingQueue(777, "durableQueue", channelIDManager.getID(), ms, pm, true, 1, null, false, false);
queuexx.activate();
- office1.addBinding(new Binding(condition1, queuexx), false);
+ office1.addBinding(new Binding(condition1, queuexx, false), false);
fail();
}
catch (IllegalArgumentException e)
@@ -122,7 +122,7 @@
Condition condition2 = new SimpleCondition("condition2");
- office1.addBinding(new Binding(condition2, queue2), false);
+ office1.addBinding(new Binding(condition2, queue2, false), false);
//Check they're there
@@ -222,44 +222,44 @@
MessagingQueue queue1 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
queue1.activate();
- office.addBinding(new Binding(condition1, queue1), false);
+ office.addBinding(new Binding(condition1, queue1, false), false);
MessagingQueue queue2 = new MessagingQueue(1, "queue2", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
queue2.activate();
- office.addBinding(new Binding(condition1, queue2), false);
+ office.addBinding(new Binding(condition1, queue2, false), false);
MessagingQueue queue3 = new MessagingQueue(1, "queue3", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
queue3.activate();
- office.addBinding(new Binding(condition1, queue3), false);
+ office.addBinding(new Binding(condition1, queue3, false), false);
MessagingQueue queue4 = new MessagingQueue(1, "queue4", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
queue4.activate();
- office.addBinding(new Binding(condition1, queue4), false);
+ office.addBinding(new Binding(condition1, queue4, false), false);
MessagingQueue queue5 = new MessagingQueue(1, "queue5", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
queue5.activate();
Condition condition2 = new SimpleCondition("condition2");
- office.addBinding(new Binding(condition2, queue5), false);
+ office.addBinding(new Binding(condition2, queue5, false), false);
MessagingQueue queue6 = new MessagingQueue(1, "queue6", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
queue6.activate();
- office.addBinding(new Binding(condition2, queue6), false);
+ office.addBinding(new Binding(condition2, queue6, false), false);
MessagingQueue queue7 = new MessagingQueue(1, "queue7", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
queue7.activate();
- office.addBinding(new Binding(condition2, queue7), false);
+ office.addBinding(new Binding(condition2, queue7, false), false);
MessagingQueue queue8 = new MessagingQueue(1, "queue8", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
queue8.activate();
- office.addBinding(new Binding(condition2, queue8), false);
+ office.addBinding(new Binding(condition2, queue8, false), false);
Collection queues = office.getQueuesForCondition(new SimpleCondition("dummy"), true);
assertNotNull(queues);
@@ -342,15 +342,15 @@
Condition condition1 = new SimpleCondition("condition1");
MessagingQueue queue1 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
- office.addBinding(new Binding(condition1, queue1), false);
+ office.addBinding(new Binding(condition1, queue1, false), false);
MessagingQueue queue2 = new MessagingQueue(1, "queue2", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
- office.addBinding(new Binding(condition1, queue2), false);
+ office.addBinding(new Binding(condition1, queue2, false), false);
Condition condition2 = new SimpleCondition("condition2");
MessagingQueue queue3 = new MessagingQueue(1, "queue3", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
- office.addBinding(new Binding(condition2, queue3), false);
+ office.addBinding(new Binding(condition2, queue3, false), false);
Binding b1 = office.getBindingForQueueName("queue1");
assertNotNull(b1);
@@ -420,15 +420,15 @@
Condition condition1 = new SimpleCondition("condition1");
MessagingQueue queue1 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
- office.addBinding(new Binding(condition1, queue1), false);
+ office.addBinding(new Binding(condition1, queue1, false), false);
MessagingQueue queue2 = new MessagingQueue(1, "queue2", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
- office.addBinding(new Binding(condition1, queue2), false);
+ office.addBinding(new Binding(condition1, queue2, false), false);
Condition condition2 = new SimpleCondition("condition2");
MessagingQueue queue3 = new MessagingQueue(1, "queue3", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
- office.addBinding(new Binding(condition2, queue3), false);
+ office.addBinding(new Binding(condition2, queue3, false), false);
Binding b1 = office.getBindingForChannelID(queue1.getChannelID());
assertNotNull(b1);
@@ -530,17 +530,17 @@
MessagingQueue queue1 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
queue1.activate();
- postOffice.addBinding(new Binding(condition1, queue1), false);
+ postOffice.addBinding(new Binding(condition1, queue1, false), false);
MessagingQueue queue2 = new MessagingQueue(1, "queue2", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
queue2.activate();
- postOffice.addBinding(new Binding(condition1, queue2), false);
+ postOffice.addBinding(new Binding(condition1, queue2, false), false);
MessagingQueue queue3 = new MessagingQueue(1, "queue3", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
queue3.activate();
- postOffice.addBinding(new Binding(condition1, queue3), false);
+ postOffice.addBinding(new Binding(condition1, queue3, false), false);
MessagingQueue queue4 = new MessagingQueue(1, "queue4", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
queue4.activate();
@@ -548,17 +548,17 @@
Condition condition2 = new SimpleCondition("topic2");
- postOffice.addBinding(new Binding(condition2, queue4), false);
+ postOffice.addBinding(new Binding(condition2, queue4, false), false);
MessagingQueue queue5 = new MessagingQueue(1, "queue5", channelIDManager.getID(), ms, pm, false,-1, null, false, false);
queue5.activate();
- postOffice.addBinding(new Binding(condition2, queue5), false);
+ postOffice.addBinding(new Binding(condition2, queue5, false), false);
MessagingQueue queue6 = new MessagingQueue(1, "queue6", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
queue6.activate();
- postOffice.addBinding(new Binding(condition2, queue6), false);
+ postOffice.addBinding(new Binding(condition2, queue6, false), false);
SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue1.getLocalDistributor().add(receiver1);
@@ -686,7 +686,7 @@
MessagingQueue queue1 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
queue1.activate();
- postOffice.addBinding(new Binding(new SimpleCondition("condition1"), queue1), false);
+ postOffice.addBinding(new Binding(new SimpleCondition("condition1"), queue1, false), false);
SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);;
@@ -743,17 +743,17 @@
MessagingQueue queue1 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, filter, false, false);
queue1.activate();
- postOffice.addBinding(new Binding(condition1, queue1), false);
+ postOffice.addBinding(new Binding(condition1, queue1, false), false);
MessagingQueue queue2 = new MessagingQueue(1, "queue2", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
queue2.activate();
- postOffice.addBinding(new Binding(condition1, queue2), false);
+ postOffice.addBinding(new Binding(condition1, queue2, false), false);
MessagingQueue queue3 = new MessagingQueue(1, "queue3", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
queue3.activate();
- postOffice.addBinding(new Binding(condition1, queue3), false);
+ postOffice.addBinding(new Binding(condition1, queue3, false), false);
SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue1.getLocalDistributor().add(receiver1);
@@ -848,34 +848,34 @@
MessagingQueue queue1 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
queue1.activate();
- postOffice.addBinding(new Binding(condition1, queue1), false);
+ postOffice.addBinding(new Binding(condition1, queue1, false), false);
MessagingQueue queue2 = new MessagingQueue(1, "queue2", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
queue2.activate();
- postOffice.addBinding(new Binding(condition1, queue2), false);
+ postOffice.addBinding(new Binding(condition1, queue2, false), false);
MessagingQueue queue3 = new MessagingQueue(1, "queue3", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
queue3.activate();
- postOffice.addBinding(new Binding(condition1, queue3), false);
+ postOffice.addBinding(new Binding(condition1, queue3, false), false);
MessagingQueue queue4 = new MessagingQueue(1, "queue4", channelIDManager.getID(), ms, pm, true, -1, null, false, false);
queue4.activate();
Condition condition2 = new SimpleCondition("topic2");
- postOffice.addBinding(new Binding(condition2, queue4), false);
+ postOffice.addBinding(new Binding(condition2, queue4, false), false);
MessagingQueue queue5 = new MessagingQueue(1, "queue5", channelIDManager.getID(), ms, pm, true, -1, null, false, false);
queue5.activate();
- postOffice.addBinding(new Binding(condition2, queue5), false);
+ postOffice.addBinding(new Binding(condition2, queue5, false), false);
MessagingQueue queue6 = new MessagingQueue(1, "queue6", channelIDManager.getID(), ms, pm, true, -1, null, false, false);
queue6.activate();
- postOffice.addBinding(new Binding(condition2, queue6), false);
+ postOffice.addBinding(new Binding(condition2, queue6, false), false);
SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);;
queue1.getLocalDistributor().add(receiver1);
@@ -1026,12 +1026,12 @@
MessagingQueue queue1 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
queue1.activate();
- postOffice.addBinding(new Binding(condition1, queue1), false);
+ postOffice.addBinding(new Binding(condition1, queue1, false), false);
MessagingQueue queue2 = new MessagingQueue(1, "queue2", channelIDManager.getID(), ms, pm, true,-1, null, false, false);
queue2.activate();
- postOffice.addBinding(new Binding(condition1, queue2), false);
+ postOffice.addBinding(new Binding(condition1, queue2, false), false);
SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);;
queue1.getLocalDistributor().add(receiver1);
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java 2007-06-28 14:06:25 UTC (rev 2814)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java 2007-06-28 20:11:38 UTC (rev 2815)
@@ -491,6 +491,8 @@
MessageProducer producer0 = session0.createProducer(queue0);
+ log.info("sending messages on node 0");
+
for (int i = 0; i < messages0; i++)
{
producer0.send(session0.createTextMessage("message " + i));
@@ -503,6 +505,8 @@
//Send some more on node 1
+ log.info("Sending some messages on node 1");
+
Session session1 = conn1.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer1 = session1.createProducer(queue1);
@@ -586,8 +590,7 @@
catch (Exception ignore)
{
}
-
-
+
if (conn0!=null)
{
conn0.close();
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java 2007-06-28 14:06:25 UTC (rev 2814)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java 2007-06-28 20:11:38 UTC (rev 2815)
@@ -204,17 +204,17 @@
// wait for the client-side failover to complete
- while(true)
- {
- FailoverEvent event = failoverListener.getEvent(120000);
- if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
- {
- break;
- }
- if (event == null)
- {
- fail("Did not get expected FAILOVER_COMPLETED event");
- }
+ while (true)
+ {
+ FailoverEvent event = failoverListener.getEvent(120000);
+ if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+ {
+ break;
+ }
+ if (event == null)
+ {
+ fail("Did not get expected FAILOVER_COMPLETED event");
+ }
}
// failover complete
More information about the jboss-cvs-commits mailing list