[jboss-cvs] JBoss Messaging SVN: r1853 - in trunk: src/main/org/jboss/jms/server/endpoint tests/src/org/jboss/test/messaging/jms
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat Dec 23 03:43:47 EST 2006
Author: timfox
Date: 2006-12-23 03:43:42 -0500 (Sat, 23 Dec 2006)
New Revision: 1853
Modified:
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/tests/src/org/jboss/test/messaging/jms/XATest.java
Log:
serialize deliveries
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-12-23 06:20:50 UTC (rev 1852)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-12-23 08:43:42 UTC (rev 1853)
@@ -47,6 +47,9 @@
import org.jboss.remoting.callback.HandleCallbackException;
import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
+import EDU.oswego.cs.dl.util.concurrent.Executor;
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+
/**
* Concrete implementation of ConsumerEndpoint.
*
@@ -100,6 +103,8 @@
// Must be volatile
private volatile boolean clientAccepting;
+ private Executor executor; /// TEMPORARILY
+
// Constructors --------------------------------------------------
ServerConsumerEndpoint(int id, Channel messageQueue, String queueName,
@@ -125,6 +130,9 @@
this.destination = dest;
+ //TEMP
+ this.executor = new QueuedExecutor();
+
//Always start as false - wait for consumer to initiate
this.clientAccepting = false;
@@ -216,21 +224,47 @@
Callback callback = new Callback(mm);
+ //FIXME - we need to use the asynch callback API, this is the Sync one
+
+ //This is temporary - to ensure deliveries happen in sequence!!!!!!!!
+
+ class Runner implements Runnable
+ {
+ Callback cb;
+
+ Runner(Callback cb)
+ {
+ this.cb = cb;
+ }
+
+ public void run()
+ {
+ try
+ {
+ callbackHandler.handleCallback(cb);
+ }
+ catch (HandleCallbackException e)
+ {
+ log.error("Failed to handle callback", e);
+ }
+
+ }
+ }
+
try
{
- //FIXME - we need to use the asynch callback API, this is the Sync one
- callbackHandler.handleCallback(callback);
+ executor.execute(new Runner(callback));
}
- catch (HandleCallbackException e)
+ catch (InterruptedException e)
{
- log.error("Failed to handle callback", e);
-
- return null;
+ //Ignore
}
-
+
return delivery;
}
}
+
+
// Filter implementation -----------------------------------------
Modified: trunk/tests/src/org/jboss/test/messaging/jms/XATest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/XATest.java 2006-12-23 06:20:50 UTC (rev 1852)
+++ trunk/tests/src/org/jboss/test/messaging/jms/XATest.java 2006-12-23 08:43:42 UTC (rev 1853)
@@ -1371,6 +1371,9 @@
assertEquals("jellyfish2", r1.getText());
cons1.close();
+
+ //Cancel is asynch
+ Thread.sleep(500);
MessageConsumer cons2 = sess2.createConsumer(queue);
TextMessage r2 = (TextMessage)cons2.receive(MAX_TIMEOUT);
More information about the jboss-cvs-commits
mailing list