[jboss-cvs] JBoss Messaging SVN: r2809 - in trunk: src/main/org/jboss/jms/server/endpoint and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jun 27 18:54:43 EDT 2007
Author: timfox
Date: 2007-06-27 18:54:42 -0400 (Wed, 27 Jun 2007)
New Revision: 2809
Modified:
trunk/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java
Log:
a few fixes
Modified: trunk/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java 2007-06-27 20:23:20 UTC (rev 2808)
+++ trunk/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java 2007-06-27 22:54:42 UTC (rev 2809)
@@ -65,16 +65,21 @@
ConnectionFactoryUpdate viewChange = (ConnectionFactoryUpdate)message;
- Object d = getState().getClusteredConnectionFactoryDelegate();
+ ConnectionState state = getState();
+
+ if (state != null)
+ {
+ Object d = state.getClusteredConnectionFactoryDelegate();
+
+ if (d instanceof ClientClusteredConnectionFactoryDelegate)
+ {
+ ClientClusteredConnectionFactoryDelegate clusteredDelegate =
+ (ClientClusteredConnectionFactoryDelegate)d;
- if (d instanceof ClientClusteredConnectionFactoryDelegate)
- {
- ClientClusteredConnectionFactoryDelegate clusteredDelegate =
- (ClientClusteredConnectionFactoryDelegate)d;
-
- clusteredDelegate.updateFailoverInfo(viewChange.getDelegates(),
- viewChange.getFailoverMap());
- }
+ clusteredDelegate.updateFailoverInfo(viewChange.getDelegates(),
+ viewChange.getFailoverMap());
+ }
+ }
}
public String toString()
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-06-27 20:23:20 UTC (rev 2808)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-06-27 22:54:42 UTC (rev 2809)
@@ -24,6 +24,7 @@
import javax.jms.IllegalStateException;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
+import javax.jms.TextMessage;
import org.jboss.jms.delegate.ConsumerEndpoint;
import org.jboss.jms.destination.JBossDestination;
@@ -49,7 +50,6 @@
import org.jboss.messaging.util.ExceptionUtil;
import org.jboss.remoting.Client;
import org.jboss.remoting.callback.Callback;
-import org.jboss.remoting.callback.HandleCallbackException;
import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
/**
@@ -240,6 +240,17 @@
if (trace) { log.trace(this + " has startStopLock lock, preparing the message for delivery"); }
Message message = ref.getMessage();
+
+ TextMessage tm = (TextMessage)message;
+
+ try
+ {
+ log.info("TRYING TO DELIVER " + tm.getText());
+ }
+ catch (Exception e)
+ {
+
+ }
boolean selectorRejected = !this.accept(message);
@@ -317,18 +328,24 @@
this.lastDeliveryID = deliveryId;
}
}
- catch (HandleCallbackException e)
+ catch (Throwable t)
{
// it's a one-way callback, so exception could only have happened on the server, while
// trying to send the callback. This is a good reason to smack the whole connection.
// I trust remoting to have already done its own cleanup via a CallbackErrorHandler,
// I need to do my own cleanup at ConnectionManager level.
- log.debug(this + " failed to handle callback", e);
+ log.debug(this + " failed to handle callback", t);
- //We stop the consumer - some time later the lease will expire and the connection will be closed
+ //We stop the consumer - some time later the lease will expire and the connection will be closed
+ //which will remove the consumer
+
+ started = false;
- return null;
+ //** IMPORTANT NOTE! We must return the delivery NOT null. **
+ //This is because if we return NULL then message will remain in the queue, but later
+ //the connection checker will cleanup and close this consumer which will cancel all the deliveries in it
+ //including this one, so the message will go back on the queue twice!
}
return delivery;
Modified: trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java 2007-06-27 20:23:20 UTC (rev 2808)
+++ trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java 2007-06-27 22:54:42 UTC (rev 2809)
@@ -155,7 +155,7 @@
*
*/
public synchronized void notify(ClusterNotification notification)
- {
+ {
if (replicator == null)
{
//Non clustered
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java 2007-06-27 20:23:20 UTC (rev 2808)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java 2007-06-27 22:54:42 UTC (rev 2809)
@@ -6,6 +6,9 @@
*/
package org.jboss.test.messaging.jms.clustering;
+import java.util.HashSet;
+import java.util.Set;
+
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.MessageConsumer;
@@ -201,7 +204,13 @@
log.info("Sent next 15 on node 1");
// creates another consumer... before killing the server
+
+ // This will actually end up sucking messages from node 0
MessageConsumer consumer1 = session1.createConsumer(queue[1]);
+
+ //Give it enough time to suck
+
+ Thread.sleep(5000);
log.info("Killing node1");
ServerManagement.killAndWait(1);
@@ -217,13 +226,28 @@
log.info("creating new consumer");
+ //We should now be able to consume the messages 5 to 19.
+ //Note that they will be in a different order since 10 to 10 were sucked to node 0 before crashing
+ //Also there is the possibility that after crashing the queue attempted to deliver to one or more of the remote consumers
+ //for the node that crashed, (YES it is possible to send more than one message on a failed connection before getting
+ //an exception), so this won't be cancelled until the connection checker kicks in and closes the consumer
+
+ Set msgs = new HashSet();
+
for (int i = 5; i < 20; i++)
{
- msg = (TextMessage)consumer0.receive(5000);
+ msg = (TextMessage)consumer0.receive(60000);
assertNotNull(msg);
- log.info("msg = " + msg.getText());
- assertEquals("message " + i, msg.getText());
+
+ log.info("Got message " + msg.getText());
+
+ msgs.add(msg.getText());
}
+
+ for (int i = 5; i < 20; i++)
+ {
+ assertTrue(msgs.contains("message " + i));
+ }
assertNull(consumer0.receive(5000));
@@ -289,75 +313,29 @@
session1.commit();
- //Make sure messages exist
+ //Don't consume them or they will be pulled from one node to another
- MessageConsumer cons0 = session0.createConsumer(queue[0]);
- conn0.start();
-
- TextMessage tm;
-
- for (int i = 0; i < 10; i++)
- {
- tm = (TextMessage)cons0.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message " + i, tm.getText());
- }
-
- tm = (TextMessage)cons0.receive(1000);
-
- assertNull(tm);
-
- session0.rollback();
-
- cons0.close();
-
- cons0 = null;
-
-
- MessageConsumer cons1 = session1.createConsumer(queue[0]);
-
- conn1.start();
-
- for (int i = 10; i < 20; i++)
- {
- tm = (TextMessage)cons1.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message " + i, tm.getText());
- }
-
- tm = (TextMessage)cons1.receive(1000);
-
- assertNull(tm);
-
- session1.rollback();
-
- cons1.close();
-
- cons1 = null;
-
-
//Now kill the server
waitForFailoverComplete(1, conn1);
//Messages should all be available on node 0
- cons0 = session0.createConsumer(queue[0]);
+ MessageConsumer cons0 = session0.createConsumer(queue[0]);
+ TextMessage tm;
+
+ conn0.start();
+
for (int i = 0; i < 20; i++)
{
- tm = (TextMessage)cons0.receive(1000);
+ tm = (TextMessage)cons0.receive(60000);
assertNotNull(tm);
log.info("received message " + tm.getText());
-
-
- //assertEquals("message " + i, tm.getText());
+
+ assertEquals("message " + i, tm.getText());
}
tm = (TextMessage)cons0.receive(1000);
@@ -516,6 +494,8 @@
for (int i = 0; i < messages0; i++)
{
producer0.send(session0.createTextMessage("message " + i));
+
+ log.info("Sent message: message " + i);
}
session0.commit();
@@ -530,62 +510,14 @@
for (int i = messages0; i < messages0 + messages1; i++)
{
producer1.send(session1.createTextMessage("message " + i));
+
+ log.info("Sent message: message " + i);
}
session1.commit();
+ MessageConsumer cons0 = null;
- //Make sure messages exist
-
- MessageConsumer cons0 = session0.createConsumer(queue0);
-
- conn0.start();
-
- TextMessage tm;
-
- for (int i = 0; i < messages0; i++)
- {
- tm = (TextMessage)cons0.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message " + i, tm.getText());
- }
-
- tm = (TextMessage)cons0.receive(2000);
-
- assertNull(tm);
-
- session0.rollback();
-
- cons0.close();
-
- cons0 = null;
-
-
- MessageConsumer cons1 = session1.createConsumer(queue1);
-
- conn1.start();
-
- for (int i = messages0; i < messages0 + messages1; i++)
- {
- tm = (TextMessage)cons1.receive(2000);
-
- assertNotNull(tm);
-
- assertEquals("message " + i, tm.getText());
- }
-
- tm = (TextMessage)cons1.receive(2000);
-
- assertNull(tm);
-
- session1.rollback();
-
- cons1.close();
-
- cons1 = null;
-
if (fillConsumer)
{
//Creating the consumer immediately after kill should ensure that all the messages are in the consumer and
@@ -604,22 +536,38 @@
//Messages should all be available on node 0
+ //Note they may be in a different order due to being pulled in to the consumer before killing the server
+ //And also because delivery may have been attempted to a remote consumer corresponding to the
+ //failed node, so that delivery or one after it may fail, and those messages may not get cancelled
+ //back until the connection checker kicks in and closes the consumer
+
conn0.start();
-
- log.info("now consuming");
+
+
+ Set msgs = new HashSet();
+
+ TextMessage tm;
+
for (int i = 0; i < messages0 + messages1; i++)
{
- tm = (TextMessage)cons0.receive(5000);
+ //Need a long timeout to allow for connection checker to kick in and close consumer
+ tm = (TextMessage)cons0.receive(60000);
assertNotNull(tm);
- log.info("received message " + tm.getText());
+ log.info("Got message " + tm.getText());
+
+ msgs.add(tm.getText());
}
+ for (int i = 0; i < messages0 + messages1; i++)
+ {
+ assertTrue(msgs.contains("message " + i));
+ }
+
tm = (TextMessage)cons0.receive(2000);
- assertNull(tm);
-
+ assertNull(tm);
}
finally
{
More information about the jboss-cvs-commits
mailing list