[jboss-cvs] JBoss Messaging SVN: r3219 - in trunk: tests/src/org/jboss/test/messaging/jms/clustering and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat Oct 20 06:05:22 EDT 2007
Author: timfox
Date: 2007-10-20 06:05:22 -0400 (Sat, 20 Oct 2007)
New Revision: 3219
Modified:
trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java
Log:
Reverted fix for http://jira.jboss.com/jira/browse/JBMESSAGING-1099
Modified: trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2007-10-20 09:17:22 UTC (rev 3218)
+++ trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2007-10-20 10:05:22 UTC (rev 3219)
@@ -125,46 +125,34 @@
}
public Object handleClosing(Invocation invocation) throws Throwable
- {
+ {
ConsumerState consumerState = getState(invocation);
- try
- {
+
+ // We make sure closing is called on the ServerConsumerEndpoint.
+ // This returns us the last delivery id sent
+
+ Long l = (Long)invocation.invokeNext();
+
+ long lastDeliveryId = l.longValue();
+
+ // First we call close on the ClientConsumer which waits for onMessage invocations
+ // to complete and the last delivery to arrive
+ consumerState.getClientConsumer().close(lastDeliveryId);
+
+
+ SessionState sessionState = (SessionState)consumerState.getParent();
+ ConnectionState connectionState = (ConnectionState)sessionState.getParent();
+
+ sessionState.removeCallbackHandler(consumerState.getClientConsumer());
- // We make sure closing is called on the ServerConsumerEndpoint.
- // This returns us the last delivery id sent
-
- Long l = (Long)invocation.invokeNext();
-
- long lastDeliveryId = l.longValue();
-
- // First we call close on the ClientConsumer which waits for onMessage invocations
- // to complete and the last delivery to arrive
- consumerState.getClientConsumer().close(lastDeliveryId);
-
- SessionState sessionState = (SessionState)consumerState.getParent();
- ConnectionState connectionState = (ConnectionState)sessionState.getParent();
-
- sessionState.removeCallbackHandler(consumerState.getClientConsumer());
-
- CallbackManager cm = connectionState.getRemotingConnection().getCallbackManager();
- cm.unregisterHandler(consumerState.getConsumerID());
-
- //And then we cancel any messages still in the message callback handler buffer
- consumerState.getClientConsumer().cancelBuffer();
-
- return l;
- }
- finally
- {
- // If this method fails before the call to clientConsumer.close,
- // we need to ensure the method will close the consumer, otherwise the server
- // would hang during a shutdown
- if (!consumerState.getClientConsumer().isClosed())
- {
- consumerState.getClientConsumer().close(-1);
- }
- }
- }
+ CallbackManager cm = connectionState.getRemotingConnection().getCallbackManager();
+ cm.unregisterHandler(consumerState.getConsumerID());
+
+ //And then we cancel any messages still in the message callback handler buffer
+ consumerState.getClientConsumer().cancelBuffer();
+
+ return l;
+ }
public Object handleReceive(Invocation invocation) throws Throwable
{
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java 2007-10-20 09:17:22 UTC (rev 3218)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java 2007-10-20 10:05:22 UTC (rev 3219)
@@ -150,8 +150,7 @@
try
{
-
- // Objects Server0
+ //node 0
conn0 = createConnectionOnServer(cf, 0);
assertEquals(0, getServerId(conn0));
@@ -166,7 +165,8 @@
MessageConsumer consumer0 = session0.createConsumer(queue[0]);
- for (int i=0; i < 10; i++)
+ //Send messages 0 - 9 on node 0
+ for (int i= 0; i < 10; i++)
{
producer0.send(session0.createTextMessage("message " + i));
}
@@ -175,6 +175,8 @@
TextMessage msg;
+
+ //Consume messages 0 - 4 on node node 0 transactionally
for (int i = 0; i < 5; i++)
{
msg = (TextMessage)consumer0.receive(5000);
@@ -183,12 +185,13 @@
assertEquals("message " + i, msg.getText());
}
+ //commit the session
session0.commit();
consumer0.close();
- log.info("** sent first five on node0");
-
- // Objects Server1
+ log.info("Consumed messages 0 - 4 on node 0");
+
+ // node 1
conn1 = createConnectionOnServer(cf, 1);
assertEquals(1, getServerId(conn1));
@@ -201,6 +204,7 @@
producer1.setDeliveryMode(DeliveryMode.PERSISTENT);
+ //Send messages 10 - 19 on node 1
for (int i = 10; i < 20; i++)
{
producer1.send(session0.createTextMessage("message " + i));
@@ -208,15 +212,21 @@
session1.commit();
- log.info("Sent next 15 on node 1");
+ log.info("Sent messages 10 - 19 on node 1");
+
+ //At this point we should have messages 5 - 9 sitting in queue on node 0
+ //and messages 10 - 19 sitting in queue on node 1
- // creates another consumer... before killing the server
+ //Create a consumer on node 1
+ log.info("Creating consumer on node 1");
+
// This will actually end up sucking messages from node 0
MessageConsumer consumer1 = session1.createConsumer(queue[1]);
//Give it enough time to suck
+ log.info("Waiting for suck");
Thread.sleep(5000);
log.info("Killing node1");
More information about the jboss-cvs-commits
mailing list