[jboss-cvs] JBoss Messaging SVN: r3055 - in trunk: tests/src/org/jboss/test/messaging and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sun Aug 26 01:34:05 EDT 2007
Author: clebert.suconic at jboss.com
Date: 2007-08-26 01:34:05 -0400 (Sun, 26 Aug 2007)
New Revision: 3055
Modified:
trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
trunk/tests/src/org/jboss/test/messaging/MessagingTestCase.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-1059 - The removal of messages has to be done after the Suckers are closed; otherwise we would get a failure
Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-08-26 05:31:52 UTC (rev 3054)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-08-26 05:34:05 UTC (rev 3055)
@@ -1735,7 +1735,7 @@
{
try
{
- if (trace) { log.trace(this + " waiting for bind unbind lock"); }
+ if (trace) { log.trace(this + " waiting for bind unbind lock, timeout=" + groupMember.getCastTimeout()); }
waitForBindUnbindLock.wait(groupMember.getCastTimeout());
@@ -1827,39 +1827,40 @@
}
Binding removed = removeBindingInMemory(thisNodeID, queueName);
-
+
//The queue might not be removed (it's already removed) if two unbind all requests are sent simultaneously on the cluster
if (removed != null)
- {
+ {
Queue queue = removed.queue;
-
+
Condition condition = removed.condition;
-
+
if (queue.isRecoverable())
{
//Need to remove from db too
-
+
deleteBindingFromStorage(queue);
}
-
- queue.removeAllReferences();
-
+
if (clustered && queue.isClustered())
{
- String filterString = queue.getFilter() == null ? null : queue.getFilter().getFilterString();
-
+ String filterString = queue.getFilter() == null ? null : queue.getFilter().getFilterString();
+
MappingInfo info = new MappingInfo(thisNodeID, queue.getName(), condition.toText(), filterString, queue.getChannelID(),
queue.isRecoverable(), true, allNodes);
-
+
UnbindRequest request = new UnbindRequest(info, allNodes);
-
+
groupMember.multicastControl(request, sync);
}
+
+ queue.removeAllReferences();
+
}
-
+
return removed;
}
-
+
private synchronized void calculateFailoverMap()
{
failoverMap.clear();
@@ -2192,7 +2193,32 @@
return routed;
}
-
+
+ /**
+ * Looks up the binding stored under the given queue name in the name map of the given node.
+ * The read lock is held only for the duration of the lookup.
+ *
+ * @param nodeID id of the node whose name map is consulted
+ * @param queueName name of the queue to look up
+ * @return the binding, or null if the node has no name map or the map has no entry for the name
+ * @throws Exception propagated from acquiring the read lock
+ */
+ private Binding lookupBinding(int nodeID, String queueName) throws Exception
+ {
+ lock.readLock().acquire();
+
+ try
+ {
+ Integer nid = new Integer(nodeID);
+
+ Map nameMap = (Map)nameMaps.get(nid);
+
+ if (nameMap == null)
+ {
+ return null;
+ }
+
+ return (Binding)nameMap.get(queueName);
+
+ }
+ finally
+ {
+ // Release the read lock taken above; acquiring it a second time here would leave
+ // read holds outstanding after every call and never give the lock back.
+ lock.readLock().release();
+ }
+
+ }
+
private Binding removeBindingInMemory(int nodeID, String queueName) throws Exception
{
lock.writeLock().acquire();
Modified: trunk/tests/src/org/jboss/test/messaging/MessagingTestCase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/MessagingTestCase.java 2007-08-26 05:31:52 UTC (rev 3054)
+++ trunk/tests/src/org/jboss/test/messaging/MessagingTestCase.java 2007-08-26 05:34:05 UTC (rev 3055)
@@ -158,25 +158,37 @@
assertEquals(0, messageCount.intValue());
}
-
+
protected void checkNoSubscriptions(Topic topic) throws Exception
{
- ObjectName destObjectName = new ObjectName("jboss.messaging.destination:service=Topic,name=" + topic.getTopicName());
-
- Integer messageCount = (Integer)ServerManagement.getAttribute(destObjectName, "AllSubscriptionsCount");
-
- assertEquals(0, messageCount.intValue());
+ Integer messageCount = getNoSubscriptions(topic);
+
+ assertEquals(0, messageCount.intValue());
}
-
+
+
protected void checkNoSubscriptions(Topic topic, int server) throws Exception
{
- ObjectName destObjectName = new ObjectName("jboss.messaging.destination:service=Topic,name=" + topic.getTopicName());
-
- Integer messageCount = (Integer)ServerManagement.getServer(server).getAttribute(destObjectName, "AllSubscriptionsCount");
+ Integer messageCount = getNoSubscriptions(topic, server);
assertEquals(0, messageCount.intValue());
}
-
+
+ /**
+ * Returns the number of subscriptions reported for the topic, querying server 0.
+ *
+ * @param topic the topic whose subscriptions are counted
+ * @return the subscription count returned by {@code getNoSubscriptions(topic, 0)}
+ * @throws Exception propagated from the two-argument overload
+ */
+ protected int getNoSubscriptions(Topic topic)
+ throws Exception
+ {
+ return getNoSubscriptions(topic,0);
+ }
+
+ /**
+ * Reads the "AllSubscriptionsCount" attribute of the topic's destination MBean on the given server.
+ *
+ * @param topic the topic whose name is used to build the MBean object name
+ * @param server index of the server passed to ServerManagement.getAttribute
+ * @return the attribute value as an int
+ * @throws Exception propagated from building the object name or reading the attribute
+ */
+ protected int getNoSubscriptions(Topic topic, int server)
+ throws Exception
+ {
+ // Object name of the topic's management bean, derived from the topic name
+ ObjectName destObjectName = new ObjectName("jboss.messaging.destination:service=Topic,name=" + topic.getTopicName());
+
+ Integer messageCount = (Integer) ServerManagement.getAttribute(server, destObjectName, "AllSubscriptionsCount");
+ return messageCount.intValue();
+ }
+
protected boolean assertRemainingMessages(int expected) throws Exception
{
ObjectName destObjectName =
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java 2007-08-26 05:31:52 UTC (rev 3054)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java 2007-08-26 05:34:05 UTC (rev 3055)
@@ -32,6 +32,7 @@
import javax.jms.Topic;
import org.jboss.test.messaging.tools.ServerManagement;
+import java.util.ArrayList;
/**
@@ -131,6 +132,110 @@
{
nonClusteredTopicDurable(true);
}
+
+ /**
+ * Creates many durable subscriptions through three connections, sends a batch of
+ * persistent messages, checks that every subscriber receives all of them in send order,
+ * then closes the consumers, unsubscribes everything and checks that no subscriptions remain.
+ */
+ public void testFloodSubscriptions() throws Exception
+ {
+ // One connection per server index 0..2
+ Connection conn0 = this.createConnectionOnServer(cf, 0);
+ Connection conn1 = this.createConnectionOnServer(cf, 1);
+ Connection conn2 = this.createConnectionOnServer(cf, 2);
+
+ try
+ {
+ checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
+
+ // All three connections share the same client id
+ conn0.setClientID("c1");
+ conn1.setClientID("c1");
+ conn2.setClientID("c1");
+
+ Session session [] = new Session[3];
+ session[0] = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session[1] = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session[2] = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn0.start();
+ conn1.start();
+ conn2.start();
+
+ // consumersArray[k] holds the durable subscribers created on session[k]
+ ArrayList<MessageConsumer> consumersArray[] = new ArrayList[3];
+
+
+ // Producer lives on session[2] and targets topic[0]
+ MessageProducer prod = session[2].createProducer(topic[0]);
+ prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ int NUMBER_OF_MESSAGES = 10;
+ int NUMBER_OF_SUBSCRIPTIONS = 50;
+
+ // Create NUMBER_OF_SUBSCRIPTIONS durable subscribers per session, named "sess_<session>_<i>"
+ int sessId=0;
+ for (Session sess : session)
+ {
+ consumersArray[sessId] = new ArrayList<MessageConsumer>();
+ for (int i=0;i<NUMBER_OF_SUBSCRIPTIONS;i++)
+ {
+ MessageConsumer consumer = sess.createDurableSubscriber(topic[sessId], "sess_" + sessId + "_" + i);
+ consumersArray[sessId].add(consumer);
+ }
+
+ sessId++;
+ }
+
+
+ // Messages are created via session[0] but sent through the producer of session[2]
+ for (int i=0;i<NUMBER_OF_MESSAGES;i++)
+ {
+ log.info("Sending message " + i);
+ prod.send(session[0].createTextMessage("test" + i));
+ }
+
+
+ // Subscription count as reported for topic[0] must cover all three sessions
+ assertEquals(NUMBER_OF_SUBSCRIPTIONS * 3, getNoSubscriptions(topic[0]));
+
+ int messageRead = 0;
+
+ // Every subscriber must receive every message, in the order it was sent
+ for (ArrayList<MessageConsumer> consumers: consumersArray)
+ {
+ for (MessageConsumer consumer: consumers)
+ {
+ TextMessage msg = null;
+ for (int i=0;i<NUMBER_OF_MESSAGES;i++)
+ {
+ msg = (TextMessage)consumer.receive(5000);
+ assertNotNull(msg);
+ log.info("Msg:" + msg + " text - " + msg.getText());
+ assertEquals("test" + i, msg.getText());
+ messageRead ++;
+ }
+ }
+ }
+
+ assertEquals(NUMBER_OF_SUBSCRIPTIONS * NUMBER_OF_MESSAGES * 3, messageRead);
+
+ // NOTE(review): prod1 is created but not used anywhere else in this method - confirm whether it is needed
+ MessageProducer prod1 = session[1].createProducer(topic[0]);
+
+ // Consumers are closed before their subscriptions are removed
+ for (ArrayList<MessageConsumer> consumers: consumersArray)
+ {
+ for (MessageConsumer consumer: consumers)
+ {
+ consumer.close();
+ }
+ }
+
+ // Unsubscribe each name on the same session that created it
+ for (sessId = 0; sessId < 3; sessId++)
+ {
+ for (int i=0;i<NUMBER_OF_SUBSCRIPTIONS;i++)
+ {
+ session[sessId].unsubscribe("sess_" + sessId + "_" + i);
+ }
+ }
+
+ checkNoSubscriptions(topic[0]);
+
+ }
+ finally
+ {
+ // Best-effort cleanup: close failures are deliberately ignored so every connection gets a close attempt
+ try { if (conn0 != null) conn0.close(); } catch (Exception ignored){}
+ try { if (conn1 != null) conn1.close(); } catch (Exception ignored){}
+ try { if (conn2 != null) conn2.close(); } catch (Exception ignored){}
+ }
+ }
// Package protected ---------------------------------------------
@@ -597,7 +702,7 @@
//close beta
beta.close();
-
+
// Create another beta - this one node 0
MessageConsumer beta0 = sess0.createDurableSubscriber(topic[0], "beta");
@@ -677,19 +782,19 @@
beta0.close();
beta1.close();
-
+
alpha.close();
beta.close();
gamma.close();
delta.close();
epsilon.close();
-
+
sess0.unsubscribe("alpha");
sess1.unsubscribe("beta");
sess2.unsubscribe("gamma");
sess0.unsubscribe("delta");
sess1.unsubscribe("epsilon");
-
+
}
finally
{
More information about the jboss-cvs-commits
mailing list