[jboss-cvs] JBoss Messaging SVN: r1853 - in trunk: src/main/org/jboss/jms/server/endpoint tests/src/org/jboss/test/messaging/jms

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sat Dec 23 03:43:47 EST 2006


Author: timfox
Date: 2006-12-23 03:43:42 -0500 (Sat, 23 Dec 2006)
New Revision: 1853

Modified:
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   trunk/tests/src/org/jboss/test/messaging/jms/XATest.java
Log:
serialize deliveries


Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-12-23 06:20:50 UTC (rev 1852)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-12-23 08:43:42 UTC (rev 1853)
@@ -47,6 +47,9 @@
 import org.jboss.remoting.callback.HandleCallbackException;
 import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
 
+import EDU.oswego.cs.dl.util.concurrent.Executor;
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+
 /**
  * Concrete implementation of ConsumerEndpoint.
  * 
@@ -100,6 +103,8 @@
    // Must be volatile
    private volatile boolean clientAccepting;
    
+   private Executor executor; // TEMPORARY - used to serialize callback deliveries
+   
    // Constructors --------------------------------------------------
 
    ServerConsumerEndpoint(int id, Channel messageQueue, String queueName,
@@ -125,6 +130,9 @@
       
       this.destination = dest;
       
+      //TEMP
+      this.executor = new QueuedExecutor();
+      
       //Always start as false - wait for consumer to initiate
       this.clientAccepting = false;
       
@@ -216,21 +224,47 @@
          
          Callback callback = new Callback(mm);
    
+         //FIXME - we need to use the asynch callback API, this is the Sync one
+
+         //This is temporary - callbacks are handed to a single queued executor so deliveries happen in sequence
+         
+         class Runner implements Runnable
+         {
+            Callback cb;
+            
+            Runner(Callback cb)
+            {
+               this.cb = cb;
+            }
+         
+            public void run()
+            {
+               try
+               {            
+                  callbackHandler.handleCallback(cb);               
+               }
+               catch (HandleCallbackException e)
+               {
+                  log.error("Failed to handle callback", e);
+               }
+               
+            }  
+         }
+         
          try
          {
-            //FIXME - we need to use the asynch callback API, this is the Sync one
-            callbackHandler.handleCallback(callback);
+            executor.execute(new Runner(callback));
          }
-         catch (HandleCallbackException e)
+         catch (InterruptedException e)
          {
-            log.error("Failed to handle callback", e);
-            
-            return null;
+            //Ignore - interrupted while queueing the callback; the delivery is still returned
          }
-                             
+           
          return delivery;      
       }
    }      
+   
+   
 
    // Filter implementation -----------------------------------------
 

Modified: trunk/tests/src/org/jboss/test/messaging/jms/XATest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/XATest.java	2006-12-23 06:20:50 UTC (rev 1852)
+++ trunk/tests/src/org/jboss/test/messaging/jms/XATest.java	2006-12-23 08:43:42 UTC (rev 1853)
@@ -1371,6 +1371,9 @@
          assertEquals("jellyfish2", r1.getText());
 
          cons1.close();
+         
+         //Cancel is asynchronous - give it time to complete before creating the next consumer
+         Thread.sleep(500);
 
          MessageConsumer cons2 = sess2.createConsumer(queue);
          TextMessage r2 = (TextMessage)cons2.receive(MAX_TIMEOUT);




More information about the jboss-cvs-commits mailing list