[jboss-cvs] JBoss Messaging SVN: r8426 - in branches/Branch_1_4: src/main/org/jboss/jms/server/endpoint and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Aug 25 19:51:25 EDT 2011
Author: gaohoward
Date: 2011-08-25 19:51:25 -0400 (Thu, 25 Aug 2011)
New Revision: 8426
Modified:
branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java
branches/Branch_1_4/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringTestBase.java
branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java
Log:
JBMESSAGING-1844
Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java 2011-08-23 03:08:54 UTC (rev 8425)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java 2011-08-25 23:51:25 UTC (rev 8426)
@@ -212,12 +212,19 @@
SessionDelegate connectionConsumerSession,
boolean shouldAck)
throws JMSException
- {
- if (checkExpiredOrReachedMaxdeliveries(m, connectionConsumerSession!=null?connectionConsumerSession:sess, maxDeliveries, shouldAck))
+ {
+ if (isSucker)
{
- //Message has been cancelled
- return;
+ log.trace("We don't check expiry and max delivery for a sucker");
}
+ else
+ {
+ if (checkExpiredOrReachedMaxdeliveries(m, connectionConsumerSession!=null?connectionConsumerSession:sess, maxDeliveries, shouldAck))
+ {
+ //Message has been cancelled
+ return;
+ }
+ }
DeliveryInfo deliveryInfo =
new DeliveryInfo(m, consumerID, queueName, connectionConsumerSession, shouldAck, m.getSource());
@@ -324,6 +331,8 @@
//JBMESSAGING-1876
private long minTimeoutProcessTime;
+ private boolean isSucker = false;
+
public int getBufferSize()
{
return buffer.size();
@@ -1313,6 +1322,12 @@
mainLock.notifyAll();
}
}
+
+ public void setSucker(boolean b)
+ {
+ isSucker = b;
+ }
+
}
Modified: branches/Branch_1_4/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2011-08-23 03:08:54 UTC (rev 8425)
+++ branches/Branch_1_4/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2011-08-25 23:51:25 UTC (rev 8426)
@@ -232,8 +232,8 @@
return null;
}
-
- if (ref.getMessage().isExpired())
+
+ if ((!remote) && ref.getMessage().isExpired())
{
SimpleDelivery delivery = new SimpleDelivery(observer, ref, true, false);
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java 2011-08-23 03:08:54 UTC (rev 8425)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java 2011-08-25 23:51:25 UTC (rev 8426)
@@ -131,6 +131,8 @@
consumer = (ClientConsumerDelegate)sourcedel.createConsumerDelegate(jbq, null, false, null, false, false);
clientConsumer = ((ConsumerState)consumer.getState()).getClientConsumer();
+
+ clientConsumer.setSucker(true);
consumer.setMessageListener(this);
Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringTestBase.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringTestBase.java 2011-08-23 03:08:54 UTC (rev 8425)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringTestBase.java 2011-08-25 23:51:25 UTC (rev 8426)
@@ -64,6 +64,7 @@
protected static InitialContext[] ic;
protected static Queue queue[];
+ protected static Queue defaultExpiryQueue[];
protected static Topic topic[];
// No need to have multiple connection factories since a clustered connection factory will create
@@ -76,6 +77,8 @@
protected Long overrideNodeStateRefreshInterval = 500l;
protected boolean withNewFailoverModel = false;
+
+ protected boolean deployDefaultExpiryQueue = false;
protected static ServiceAttributeOverrides currentOverrides;
@@ -132,6 +135,8 @@
queue = new Queue[nodeCount];
topic = new Topic[nodeCount];
+
+ defaultExpiryQueue = new Queue[nodeCount];
for (int i = 0; i < nodeCount; i++)
{
@@ -150,6 +155,11 @@
queue[i] = (Queue)ic[i].lookup("queue/testDistributedQueue");
topic[i] = (Topic)ic[i].lookup("topic/testDistributedTopic");
+
+ if (this.deployDefaultExpiryQueue)
+ {
+ defaultExpiryQueue[i] = (Queue)ic[i].lookup("queue/ExpiryQueue");
+ }
}
cf = (JBossConnectionFactory)ic[0].lookup("/ClusteredConnectionFactory");
@@ -239,6 +249,11 @@
ServerManagement.start(serverNumber, config, attributes, cleanDatabase);
log.info("deploying queue on node " + serverNumber);
+
+ if (deployDefaultExpiryQueue)
+ {
+ ServerManagement.deployQueue("ExpiryQueue", serverNumber);
+ }
ServerManagement.deployQueue("testDistributedQueue", serverNumber);
ServerManagement.deployTopic("testDistributedTopic", serverNumber);
}
Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java 2011-08-23 03:08:54 UTC (rev 8425)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java 2011-08-25 23:51:25 UTC (rev 8426)
@@ -387,7 +387,64 @@
}
}
}
+
+ //JBMESSAGING-1844
+ public void testSuckExpired() throws Exception
+ {
+ Connection conn0 = null;
+ Connection conn1 = null;
+ try
+ {
+ conn0 = this.createConnectionOnServer(cf, 0);
+ conn1 = this.createConnectionOnServer(cf, 1);
+
+ Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ //send 20 messages
+ final int NUM_MESSAGES = 20;
+
+ MessageProducer prod0 = sess0.createProducer(queue[0]);
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess0.createTextMessage("expired-message" + i);
+ prod0.setTimeToLive(500);
+ prod0.send(tm);
+ }
+
+ Thread.sleep(1000);
+
+ //now messages are expired. if we receive from node1
+ //messages should be put to its expiry queue.
+ MessageConsumer cons1 = sess1.createConsumer(queue[1]);
+ conn1.start();
+
+ Message m = cons1.receive(1000);
+ assertNull(m);
+
+ //now receive it from its expiry queue
+ MessageConsumer cons2 = sess1.createConsumer(defaultExpiryQueue[1]);
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ Message rm = cons2.receive(5000);
+ assertNotNull(rm);
+ }
+ }
+ finally
+ {
+ if (conn0 != null)
+ {
+ conn0.close();
+ }
+
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+ }
+ }
+
//https://jira.jboss.org/browse/JBMESSAGING-1822
//send 1000 messages to node 0, receive on node 1 as such
// 1. when receiving, kill node1
@@ -522,7 +579,7 @@
protected void setUp() throws Exception
{
nodeCount = 3;
-
+ deployDefaultExpiryQueue = true;
super.setUp();
}
More information about the jboss-cvs-commits mailing list