[jboss-cvs] JBoss Messaging SVN: r2818 - in trunk: src/main/org/jboss/jms/client/delegate and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jun 29 07:40:33 EDT 2007
Author: timfox
Date: 2007-06-29 07:40:33 -0400 (Fri, 29 Jun 2007)
New Revision: 2818
Modified:
trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java
trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
trunk/src/main/org/jboss/jms/client/container/StateCreationAspect.java
trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
trunk/src/main/org/jboss/jms/client/state/ConsumerState.java
trunk/src/main/org/jboss/jms/server/destination/ManagedDestination.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/tests/src/org/jboss/test/messaging/jms/ScheduledDeliveryTest.java
trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-1005
Modified: trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java 2007-06-29 10:42:18 UTC (rev 2817)
+++ trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java 2007-06-29 11:40:33 UTC (rev 2818)
@@ -210,6 +210,7 @@
private boolean waitingForLastDelivery;
private boolean shouldAck;
private boolean handleFlowControl;
+ private long redeliveryDelay;
// Constructors ---------------------------------------------------------------------------------
@@ -218,7 +219,8 @@
SessionDelegate sess, ConsumerDelegate cons, int consumerID,
String queueName,
int bufferSize, QueuedExecutor sessionExecutor,
- int maxDeliveries, boolean shouldAck, boolean handleFlowControl)
+ int maxDeliveries, boolean shouldAck, boolean handleFlowControl,
+ long redeliveryDelay)
{
if (bufferSize < 1)
{
@@ -239,6 +241,7 @@
this.maxDeliveries = maxDeliveries;
this.shouldAck = shouldAck;
this.handleFlowControl = handleFlowControl;
+ this.redeliveryDelay = redeliveryDelay;
}
// Public ---------------------------------------------------------------------------------------
@@ -546,6 +549,12 @@
serverSending = true;
}
+ public long getRedeliveryDelay()
+ {
+ return redeliveryDelay;
+ }
+
+
// Package protected ----------------------------------------------------------------------------
// Protected ------------------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2007-06-29 10:42:18 UTC (rev 2817)
+++ trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2007-06-29 11:40:33 UTC (rev 2818)
@@ -79,6 +79,7 @@
int prefetchSize = consumerState.getBufferSize();
QueuedExecutor sessionExecutor = sessionState.getExecutor();
int maxDeliveries = consumerState.getMaxDeliveries();
+ long redeliveryDelay = consumerState.getRedeliveryDelay();
//We need the queue name for recovering any deliveries after failover
String queueName = null;
@@ -104,7 +105,7 @@
new ClientConsumer(isCC, sessionState.getAcknowledgeMode(),
sessionDelegate, consumerDelegate, consumerID, queueName,
prefetchSize, sessionExecutor, maxDeliveries, consumerState.isShouldAck(),
- autoFlowControl);
+ autoFlowControl, redeliveryDelay);
sessionState.addCallbackHandler(messageHandler);
Modified: trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/SessionAspect.java 2007-06-29 10:42:18 UTC (rev 2817)
+++ trunk/src/main/org/jboss/jms/client/container/SessionAspect.java 2007-06-29 11:40:33 UTC (rev 2818)
@@ -510,15 +510,19 @@
MessageProxy proxy = info.getMessageProxy();
ClientConsumer handler = state.getCallbackHandler(info.getConsumerId());
-
+
if (handler == null)
{
// This is ok. The original consumer has closed, so we cancel the message
- //FIXME - this needs to be done atomically for all cancels
-
cancelDelivery(del, info);
}
+ else if (handler.getRedeliveryDelay() != 0)
+ {
+ //We have a redelivery delay in action - all delayed redeliveries are handled on the server
+
+ cancelDelivery(del, info);
+ }
else
{
if (trace) { log.trace("Adding proxy back to front of buffer"); }
Modified: trunk/src/main/org/jboss/jms/client/container/StateCreationAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/StateCreationAspect.java 2007-06-29 10:42:18 UTC (rev 2817)
+++ trunk/src/main/org/jboss/jms/client/container/StateCreationAspect.java 2007-06-29 11:40:33 UTC (rev 2818)
@@ -160,11 +160,12 @@
int consumerID = consumerDelegate.getID();
int bufferSize = consumerDelegate.getBufferSize();
int maxDeliveries = consumerDelegate.getMaxDeliveries();
+ long redeliveryDelay = consumerDelegate.getRedeliveryDelay();
ConsumerState consumerState =
new ConsumerState(sessionState, consumerDelegate, dest, selector, noLocal,
subscriptionName, consumerID, connectionConsumer, bufferSize,
- maxDeliveries);
+ maxDeliveries, redeliveryDelay);
delegate.setState(consumerState);
return consumerDelegate;
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2007-06-29 10:42:18 UTC (rev 2817)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2007-06-29 11:40:33 UTC (rev 2818)
@@ -60,16 +60,18 @@
private int bufferSize;
private int maxDeliveries;
+ private long redeliveryDelay;
// Static ---------------------------------------------------------------------------------------
// Constructors ---------------------------------------------------------------------------------
- public ClientConsumerDelegate(int objectID, int bufferSize, int maxDeliveries)
+ public ClientConsumerDelegate(int objectID, int bufferSize, int maxDeliveries, long redeliveryDelay)
{
super(objectID);
this.bufferSize = bufferSize;
this.maxDeliveries = maxDeliveries;
+ this.redeliveryDelay = redeliveryDelay;
}
public ClientConsumerDelegate()
@@ -197,6 +199,8 @@
bufferSize = in.readInt();
maxDeliveries = in.readInt();
+
+ redeliveryDelay = in.readLong();
}
public void write(DataOutputStream out) throws Exception
@@ -206,6 +210,8 @@
out.writeInt(bufferSize);
out.writeInt(maxDeliveries);
+
+ out.writeLong(redeliveryDelay);
}
// Public ---------------------------------------------------------------------------------------
@@ -224,6 +230,11 @@
{
return maxDeliveries;
}
+
+ public long getRedeliveryDelay()
+ {
+ return redeliveryDelay;
+ }
// Protected ------------------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/client/state/ConsumerState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/ConsumerState.java 2007-06-29 10:42:18 UTC (rev 2817)
+++ trunk/src/main/org/jboss/jms/client/state/ConsumerState.java 2007-06-29 11:40:33 UTC (rev 2818)
@@ -59,6 +59,7 @@
private ClientConsumer clientConsumer;
private int bufferSize;
private int maxDeliveries;
+ private long redeliveryDelay;
private boolean storingDeliveries;
@@ -69,7 +70,7 @@
public ConsumerState(SessionState parent, ConsumerDelegate delegate, JBossDestination dest,
String selector, boolean noLocal, String subscriptionName, int consumerID,
- boolean isCC, int bufferSize, int maxDeliveries)
+ boolean isCC, int bufferSize, int maxDeliveries, long redeliveryDelay)
{
super(parent, (DelegateSupport)delegate);
children = Collections.EMPTY_SET;
@@ -81,6 +82,7 @@
this.bufferSize = bufferSize;
this.subscriptionName=subscriptionName;
this.maxDeliveries = maxDeliveries;
+ this.redeliveryDelay = redeliveryDelay;
//We don't store deliveries if this a non durable subscriber
@@ -211,9 +213,13 @@
//If e are a non durable subscriber to a topic then there is no need
//to send acks to the server - we wouldn't have stored them on the server side anyway
- return !(destination.isTopic() && subscriptionName == null);
-
+ return !(destination.isTopic() && subscriptionName == null);
}
+
+ public long getRedeliveryDelay()
+ {
+ return redeliveryDelay;
+ }
// Package protected ----------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/server/destination/ManagedDestination.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/ManagedDestination.java 2007-06-29 10:42:18 UTC (rev 2817)
+++ trunk/src/main/org/jboss/jms/server/destination/ManagedDestination.java 2007-06-29 11:40:33 UTC (rev 2818)
@@ -83,7 +83,7 @@
protected ManagedQueue expiryQueue;
- protected long redeliveryDelay;
+ protected long redeliveryDelay = -1;
protected int maxSize = -1;
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-06-29 10:42:18 UTC (rev 2817)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-06-29 11:40:33 UTC (rev 2818)
@@ -145,6 +145,7 @@
private PostOffice postOffice;
private int nodeId;
private int defaultMaxDeliveryAttempts;
+ private long defaultRedeliveryDelay;
private Queue defaultDLQ;
private Queue defaultExpiryQueue;
@@ -182,6 +183,7 @@
defaultExpiryQueue = sp.getDefaultExpiryQueueInstance();
tr = sp.getTxRepository();
defaultMaxDeliveryAttempts = sp.getDefaultMaxDeliveryAttempts();
+ defaultRedeliveryDelay = sp.getDefaultRedeliveryDelay();
deliveries = new ConcurrentHashMap();
@@ -1172,10 +1174,12 @@
JBossDestination dest = new JBossQueue(queueName);
+ //We don't care about redelivery delays and number of attempts for a direct consumer
+
ServerConsumerEndpoint ep =
new ServerConsumerEndpoint(consumerID, binding.queue,
binding.queue.getName(), this, selectorString, false,
- dest, null, null, sp.getDefaultRedeliveryDelay(), defaultMaxDeliveryAttempts, true);
+ dest, null, null, 0, 0, true);
ConsumerAdvised advised;
@@ -1189,7 +1193,7 @@
Dispatcher.instance.registerTarget(consumerID, advised);
ClientConsumerDelegate stub =
- new ClientConsumerDelegate(consumerID, prefetchSize, defaultMaxDeliveryAttempts);
+ new ClientConsumerDelegate(consumerID, prefetchSize, 0, 0);
synchronized (consumers)
{
@@ -1465,17 +1469,12 @@
int maxDeliveryAttemptsToUse = mDest.getMaxDeliveryAttempts() == -1 ? defaultMaxDeliveryAttempts : mDest.getMaxDeliveryAttempts();
- long redeliveryDelay = mDest.getRedeliveryDelay();
+ long redeliveryDelayToUse = mDest.getRedeliveryDelay() == -1 ? defaultRedeliveryDelay : mDest.getRedeliveryDelay();
- if (redeliveryDelay == 0)
- {
- redeliveryDelay = sp.getDefaultRedeliveryDelay();
- }
-
ServerConsumerEndpoint ep =
new ServerConsumerEndpoint(consumerID, queue,
queue.getName(), this, selectorString, noLocal,
- jmsDestination, dlqToUse, expiryQueueToUse, redeliveryDelay, maxDeliveryAttemptsToUse, false);
+ jmsDestination, dlqToUse, expiryQueueToUse, redeliveryDelayToUse, maxDeliveryAttemptsToUse, false);
if (queue.isClustered() && postOffice.isClustered() && jmsDestination.isTopic() && subscriptionName != null)
{
@@ -1503,8 +1502,7 @@
Dispatcher.instance.registerTarget(consumerID, advised);
ClientConsumerDelegate stub =
- new ClientConsumerDelegate(consumerID,
- prefetchSize, maxDeliveryAttemptsToUse);
+ new ClientConsumerDelegate(consumerID, prefetchSize, maxDeliveryAttemptsToUse, redeliveryDelayToUse);
synchronized (consumers)
{
Modified: trunk/tests/src/org/jboss/test/messaging/jms/ScheduledDeliveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/ScheduledDeliveryTest.java 2007-06-29 10:42:18 UTC (rev 2817)
+++ trunk/tests/src/org/jboss/test/messaging/jms/ScheduledDeliveryTest.java 2007-06-29 11:40:33 UTC (rev 2818)
@@ -236,19 +236,108 @@
public void testDelayedRedeliveryDefault() throws Exception
{
ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+
+ try
+ {
+ ObjectName queueObjectName = new ObjectName("jboss.messaging.destination:service=Queue,name=Queue");
+
+ ServerManagement.setAttribute(queueObjectName, "RedeliveryDelay", "-1");
+
+ long delay = 3000;
+
+ ServerManagement.setAttribute(serverPeerObjectName, "DefaultRedeliveryDelay", String.valueOf(delay));
+
+ this.delayedRedeliveryDefaultOnClose(delay);
+
+ this.delayedRedeliveryDefaultOnRollback(delay);
+ }
+ finally
+ {
+ ServerManagement.setAttribute(serverPeerObjectName, "DefaultRedeliveryDelay", "0");
+ }
+ }
+
+ public void testDelayedRedeliveryOverride() throws Exception
+ {
+ ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+
+ ObjectName queueObjectName = new ObjectName("jboss.messaging.destination:service=Queue,name=Queue");
+
+ try
+ {
+ long delay = 6000;
+
+ ServerManagement.setAttribute(queueObjectName, "RedeliveryDelay", String.valueOf(delay));
+
+ ServerManagement.setAttribute(serverPeerObjectName, "DefaultRedeliveryDelay", "3000");
+
+ this.delayedRedeliveryDefaultOnClose(delay);
+
+ this.delayedRedeliveryDefaultOnRollback(delay);
+ }
+ finally
+ {
+ ServerManagement.setAttribute(serverPeerObjectName, "DefaultRedeliveryDelay", "0");
+
+ ServerManagement.setAttribute(queueObjectName, "RedeliveryDelay", "-1");
+ }
+ }
- String queueObjectName = "jboss.messaging.destination:service=Queue,name=Queue";
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ ServerManagement.start("all");
+
+ ic = new InitialContext(ServerManagement.getJNDIEnvironment());
+
+ cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
+
+ ServerManagement.undeployQueue("Queue");
+ ServerManagement.undeployTopic("Topic");
+
+ ServerManagement.deployQueue("Queue");
+
+ ServerManagement.deployTopic("Topic");
+
+ queue = (Queue)ic.lookup("/queue/Queue");
+
+ topic = (Topic)ic.lookup("/topic/Topic");
+
+ this.drainDestination(cf, queue);
+
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+
+ ServerManagement.undeployQueue("Queue");
+
+ ServerManagement.undeployTopic("Topic");
+
+ // Some tests here change this attribute, which would affect later tests.
+ // Instead of restarting the ServerPeer, just restore the default.
+ ServerManagement.setAttribute(ServerManagement.getServerPeerObjectName(),
+ "DefaultRedeliveryDelay", "0");
+
+ if (ic != null) ic.close();
+ }
+
+ // Private -------------------------------------------------------
+
+ private void delayedRedeliveryDefaultOnClose(long delay) throws Exception
+ {
Connection conn = null;
try
- {
- ServerManagement.setAttribute(new ObjectName(queueObjectName), "RedeliveryDelay", String.valueOf(0));
-
- final long delay = 3000;
-
- ServerManagement.setAttribute(serverPeerObjectName, "DefaultRedeliveryDelay", String.valueOf(delay));
-
+ {
conn = cf.createConnection();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -280,7 +369,7 @@
assertNotNull(tm);
- log.info("Got message:" + tm.getText());
+ log.info("Got message:" + tm.getText());
assertEquals("message" + i, tm.getText());
}
@@ -321,147 +410,86 @@
if (conn != null)
{
conn.close();
- }
-
- ServerManagement.setAttribute(serverPeerObjectName, "DefaultRedeliveryDelay", "0");
-
+ }
}
}
- public void testDelayedRedeliveryOverride() throws Exception
+ private void delayedRedeliveryDefaultOnRollback(long delay) throws Exception
{
- ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
-
- String queueObjectName = "jboss.messaging.destination:service=Queue,name=Queue";
-
+ Connection conn = null;
- Connection conn = null;
-
- try
- {
- final long delay = 3000;
-
- ServerManagement.setAttribute(new ObjectName(queueObjectName), "RedeliveryDelay", String.valueOf(delay));
-
- ServerManagement.setAttribute(serverPeerObjectName, "DefaultRedeliveryDelay", String.valueOf(delay * 3));
-
- conn = cf.createConnection();
-
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer prod = sess.createProducer(queue);
-
- final int NUM_MESSAGES = 5;
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sess.createTextMessage("message" + i);
-
- prod.send(tm);
- }
-
- Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- MessageConsumer cons = sess2.createConsumer(queue);
-
- conn.start();
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage)cons.receive(500);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
-
- //Now close the session
- //This should cancel back to the queue with a delayed redelivery
-
- long now = System.currentTimeMillis();
-
- sess2.close();
-
- Session sess3 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageConsumer cons2 = sess3.createConsumer(queue);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage)cons2.receive(delay + 1000);
-
- long time = System.currentTimeMillis();
-
- assertTrue(time - now >= delay);
- assertTrue(time - now < delay + 1000);
-
- assertNotNull(tm);
- }
-
- TextMessage tm = (TextMessage)cons2.receive(1000);
-
- assertNull(tm);
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
-
- ServerManagement.setAttribute(serverPeerObjectName, "DefaultRedeliveryDelay", "0");
-
- }
- }
+ try
+ {
+ conn = cf.createConnection();
-
- // Package protected ---------------------------------------------
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- // Protected -----------------------------------------------------
+ MessageProducer prod = sess.createProducer(queue);
- protected void setUp() throws Exception
- {
- super.setUp();
+ final int NUM_MESSAGES = 5;
- ServerManagement.start("all");
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess.createTextMessage("message" + i);
- ic = new InitialContext(ServerManagement.getJNDIEnvironment());
+ prod.send(tm);
+ }
- cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
+ log.info("Sent messages");
- ServerManagement.undeployQueue("Queue");
-
- ServerManagement.undeployTopic("Topic");
-
- ServerManagement.deployQueue("Queue");
-
- ServerManagement.deployTopic("Topic");
+ Session sess2 = conn.createSession(true, Session.SESSION_TRANSACTED);
- queue = (Queue)ic.lookup("/queue/Queue");
-
- topic = (Topic)ic.lookup("/topic/Topic");
-
- this.drainDestination(cf, queue);
+ MessageConsumer cons = sess2.createConsumer(queue);
- }
+ conn.start();
- protected void tearDown() throws Exception
- {
- super.tearDown();
+ log.info("Started connection");
- ServerManagement.undeployQueue("Queue");
-
- ServerManagement.undeployTopic("Topic");
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons.receive(500);
- // Some tests here are changing this attribute.. what would affect tests later
- // Instead of restart the ServerPeer I'm just restoring the default
- ServerManagement.setAttribute(ServerManagement.getServerPeerObjectName(),
- "DefaultRedeliveryDelay", "0");
+ assertNotNull(tm);
- if (ic != null) ic.close();
+ log.info("Got message:" + tm.getText());
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ //Now rollback
+
+ sess2.rollback();
+
+ //This should redeliver with a delayed redelivery
+
+ long now = System.currentTimeMillis();
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons.receive(delay + 1000);
+
+ assertNotNull(tm);
+
+ log.info("Got message 2nd time: " + tm.getText());
+
+ long time = System.currentTimeMillis();
+
+ assertTrue(time - now >= delay);
+ assertTrue(time - now < delay + 1000);
+ }
+
+ TextMessage tm = (TextMessage)cons.receive(1000);
+
+ assertNull(tm);
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
}
-
- // Private -------------------------------------------------------
private void scheduledDelivery(boolean tx) throws Exception
{
Modified: trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java 2007-06-29 10:42:18 UTC (rev 2817)
+++ trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java 2007-06-29 11:40:33 UTC (rev 2818)
@@ -781,7 +781,7 @@
public void testSessionCreateConsumerDelegateResponse() throws Exception
{
- ClientConsumerDelegate del = new ClientConsumerDelegate(786, 13123, 213);
+ ClientConsumerDelegate del = new ClientConsumerDelegate(786, 13123, 213, 0);
ResponseSupport resp =
new SessionCreateConsumerDelegateResponse(del);
More information about the jboss-cvs-commits mailing list