[jboss-cvs] JBoss Messaging SVN: r2431 - in trunk: src/main/org/jboss/jms/client/remoting and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sun Feb 25 17:38:43 EST 2007


Author: timfox
Date: 2007-02-25 17:38:43 -0500 (Sun, 25 Feb 2007)
New Revision: 2431

Modified:
   trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
   trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   trunk/src/main/org/jboss/jms/wireformat/ConsumerChangeRateRequest.java
   trunk/tests/src/org/jboss/test/messaging/jms/FailoverTest.java
Log:
Made change rate synchronous to avoid race


Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java	2007-02-25 22:22:39 UTC (rev 2430)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java	2007-02-25 22:38:43 UTC (rev 2431)
@@ -136,7 +136,7 @@
    {
       RequestSupport req = new ConsumerChangeRateRequest(id, version, newRate);
       
-      doInvokeOneway(client, req);
+      doInvoke(client, req);
    }
 
    /**

Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2007-02-25 22:22:39 UTC (rev 2430)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2007-02-25 22:38:43 UTC (rev 2431)
@@ -240,6 +240,10 @@
 
    public void handleMessage(final Object message) throws Exception
    {
+      //TODO - we temporarily need to execute on a different thread to 
+      //avoid a deadlock situation in failover
+      //where a message is sent then the valve is locked, and the message send cause a message
+      //delivery back to the same client which tries to ack but can't get through the valve
       this.sessionExecutor.execute(
                new Runnable() { public void run()
                {
@@ -541,6 +545,10 @@
       // TODO If we don't zap this buffer, we may be able to salvage some non-persistent messages
 
       buffer.clear();
+      
+      //need to reset toggle state
+      serverSending = true;
+      
    }
    
    public long getLastDeliveryId()

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-02-25 22:22:39 UTC (rev 2430)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-02-25 22:38:43 UTC (rev 2431)
@@ -475,20 +475,15 @@
 
          // No need to synchronize - clientAccepting is volatile.
 
-         // Important note - this invocations can arrive in a different order to
-         // which they were
-         // sent - this is inherent in one way invocations where a client side
-         // pool is used.
-         // Therefore we just toggle the clientAccepting flag - if we actually
-         // looked at the newRate
-         // value we might end up turning off the consumer when it should be on
-         // (since a off-on, arrives as on-off)
-         // Toggling is safe, but when we start to look at the actual rate value
-         // we will
-         // have to be a bit cleverer
+         if (newRate > 0)
+         {
+            clientAccepting = true;
+         }
+         else
+         {
+            clientAccepting = false;
+         }
 
-         clientAccepting = !clientAccepting;
-
          if (clientAccepting)
          {
             promptDelivery();

Modified: trunk/src/main/org/jboss/jms/wireformat/ConsumerChangeRateRequest.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/ConsumerChangeRateRequest.java	2007-02-25 22:22:39 UTC (rev 2430)
+++ trunk/src/main/org/jboss/jms/wireformat/ConsumerChangeRateRequest.java	2007-02-25 22:38:43 UTC (rev 2431)
@@ -88,17 +88,20 @@
       os.flush();
    }
    
-   public Object getPayload()
-   {
-      OnewayInvocation oi = new OnewayInvocation(this);
+   //Until we have NBIO transport this needs to be synchronous otherwise we can get out of sync
+   //due to earlier invocations overtaking later invocations
+   
+//   public Object getPayload()
+//   {
+//      OnewayInvocation oi = new OnewayInvocation(this);
+//
+//      InvocationRequest request =
+//         new InvocationRequest(null, ServerPeer.REMOTING_JMS_SUBSYSTEM,
+//                               oi, ONE_WAY_METADATA, null, null);
+//      
+//      return request;     
+//   }
 
-      InvocationRequest request =
-         new InvocationRequest(null, ServerPeer.REMOTING_JMS_SUBSYSTEM,
-                               oi, ONE_WAY_METADATA, null, null);
-      
-      return request;     
-   }
-
 }
 
 

Modified: trunk/tests/src/org/jboss/test/messaging/jms/FailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/FailoverTest.java	2007-02-25 22:22:39 UTC (rev 2430)
+++ trunk/tests/src/org/jboss/test/messaging/jms/FailoverTest.java	2007-02-25 22:38:43 UTC (rev 2431)
@@ -64,92 +64,92 @@
       
    }
    
-//   public void testSendReceive() throws Exception
-//   {            
-//      Hashtable properties = new Hashtable();
-//         
-//      properties.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
-//      
-//      properties.put("java.naming.provider.url", "jnp://192.168.1.11:1199");
-//      
-//      properties.put("java.naming.factory.url", "org.jnp.interfaces");
-//      
-//      log.info("Creaing ic");
-//      
-//      InitialContext ic = new InitialContext(properties);
-//      
-//      log.info("************ REMOTE");
-//      
-//      Connection conn = null;
-//      
-//      try
-//      {           
-//         log.info("Created ic");
-//         
-//         Queue queue = (Queue)ic.lookup("/queue/testDistributedQueue");
-//         
-//         log.info("Looked up queue");
-//         
-//         ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
-//         
-//         log.info("Looked up cf");
-//          
-//         conn = cf.createConnection();
-//         
-//         Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//         
-//         Session sessCons = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//         
-//         MessageConsumer cons = sessCons.createConsumer(queue);
-//         
-//         MessageListener list = new MyListener();
-//         
-//         cons.setMessageListener(list);
-//         
-//         conn.start();
-//         
-//         MessageProducer prod = sessSend.createProducer(queue);
-//        
-//         prod.setDeliveryMode(DeliveryMode.PERSISTENT);
-//         
-//         int count = 0;
-//         
-//         while (true)
+   public void testSendReceive() throws Exception
+   {            
+      Hashtable properties = new Hashtable();
+         
+      properties.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
+      
+      properties.put("java.naming.provider.url", "jnp://192.168.1.11:1199");
+      
+      properties.put("java.naming.factory.url", "org.jnp.interfaces");
+      
+      log.info("Creaing ic");
+      
+      InitialContext ic = new InitialContext(properties);
+      
+      log.info("************ REMOTE");
+      
+      Connection conn = null;
+      
+      try
+      {           
+         log.info("Created ic");
+         
+         Queue queue = (Queue)ic.lookup("/queue/testDistributedQueue");
+         
+         log.info("Looked up queue");
+         
+         ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
+         
+         log.info("Looked up cf");
+          
+         conn = cf.createConnection();
+         
+         Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         Session sessCons = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageConsumer cons = sessCons.createConsumer(queue);
+         
+         MessageListener list = new MyListener();
+         
+         cons.setMessageListener(list);
+         
+         conn.start();
+         
+         MessageProducer prod = sessSend.createProducer(queue);
+        
+         prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+         
+         int count = 0;
+         
+         while (true)
+         {
+            TextMessage tm = sessSend.createTextMessage("message " + count);
+            
+            prod.send(tm);
+            
+            log.info("sent " + count);
+            
+            count++;
+            
+            //Thread.sleep(250);
+         }
+         
+         
+      }
+      catch (Exception e)
+      {
+         log.error("Failed", e);
+         throw e;
+      }
+      finally
+      {      
+//         if (conn != null)
 //         {
-//            TextMessage tm = sessSend.createTextMessage("message " + count);
-//            
-//            prod.send(tm);
-//            
-//            log.info("sent " + count);
-//            
-//            count++;
-//            
-//            //Thread.sleep(250);
+//            log.info("closing connetion");
+//            try
+//            {
+//               conn.close();
+//            }
+//            catch (Exception ignore)
+//            {               
+//            }
+//            log.info("closed connection");
 //         }
-//         
-//         
-//      }
-//      catch (Exception e)
-//      {
-//         log.error("Failed", e);
-//         throw e;
-//      }
-//      finally
-//      {      
-////         if (conn != null)
-////         {
-////            log.info("closing connetion");
-////            try
-////            {
-////               conn.close();
-////            }
-////            catch (Exception ignore)
-////            {               
-////            }
-////            log.info("closed connection");
-////         }
-//      }     
-//   }
+      }     
+   }
    
    class MyListener implements MessageListener
    {




More information about the jboss-cvs-commits mailing list