[jboss-cvs] JBoss Messaging SVN: r4153 - in trunk: src/main/org/jboss/messaging/core/remoting/impl and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue May 6 14:51:11 EDT 2008


Author: timfox
Date: 2008-05-06 14:51:11 -0400 (Tue, 06 May 2008)
New Revision: 4153

Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectorRegistryImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageProducerTest.java
Log:
Disabled flow control for now


Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-05-06 18:00:33 UTC (rev 4152)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-05-06 18:51:11 UTC (rev 4153)
@@ -126,26 +126,28 @@
    {
    	ProducerSendMessage message = new ProducerSendMessage(address, msg.copy());
    	
-   	//We only flow control with non-anonymous producers
-   	if (address == null)
-   	{
-   		while (windowSize == 0)
-   		{
-				synchronized (this)
-				{
-					try
-					{						
-					   wait();						
-					}
-					catch (InterruptedException e)
-					{   						
-					}
-				}		
-   		}
-   		
-   		windowSize--;
-   	}
+   	//TODO flow control disabled for now
    	
+//   	//We only flow control with non-anonymous producers
+//   	if (address == null)
+//   	{
+//   		while (windowSize == 0)
+//   		{
+//				synchronized (this)
+//				{
+//					try
+//					{						
+//					   wait();						
+//					}
+//					catch (InterruptedException e)
+//					{   						
+//					}
+//				}		
+//   		}
+//   		
+//   		windowSize--;
+//   	}
+   	
    	if (msg.isDurable())
    	{
    	   remotingConnection.sendBlocking(serverTargetID, session.getServerTargetID(), message);
@@ -155,12 +157,12 @@
    	   remotingConnection.sendOneWay(serverTargetID, session.getServerTargetID(), message);
    	}
    	 	   	
-   	if (rateLimiter != null)
-   	{
-   	   // Rate flow control
-      	   		
-   		rateLimiter.limit();
-   	}
+//   	if (rateLimiter != null)
+//   	{
+//   	   // Rate flow control
+//      	   		
+//   		rateLimiter.limit();
+//   	}
    }
             
    public void registerAcknowledgementHandler(final AcknowledgementHandler handler)

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectorRegistryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectorRegistryImpl.java	2008-05-06 18:00:33 UTC (rev 4152)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectorRegistryImpl.java	2008-05-06 18:51:11 UTC (rev 4153)
@@ -60,7 +60,7 @@
       PacketDispatcher previousDispatcher = localDispatchers.get(key);
 
       localDispatchers.put(key, serverDispatcher);
-      if(log.isDebugEnabled())
+      if (log.isDebugEnabled())
       {
          log.debug("registered " + key + " for " + serverDispatcher);
       }
@@ -89,8 +89,6 @@
       assert location != null;
       String key = location.getLocation();
       
-      log.info("*** Getting connector for " + location);
-      
       if (connectors.containsKey(key))
       {         
          NIOConnectorHolder holder = connectors.get(key);

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java	2008-05-06 18:00:33 UTC (rev 4152)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java	2008-05-06 18:51:11 UTC (rev 4153)
@@ -6,7 +6,8 @@
  */
 package org.jboss.messaging.core.remoting.impl.invm;
 
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.NIOSession;
@@ -31,6 +32,7 @@
    private final PacketDispatcher clientDispatcher;
    private final PacketDispatcher serverDispatcher;
    private boolean connected;
+   private ExecutorService executor = Executors.newSingleThreadExecutor();
    
    // Static --------------------------------------------------------
    private static final Logger log = Logger.getLogger(INVMSession.class);
@@ -53,6 +55,7 @@
 
    public boolean close()
    {
+      executor.shutdownNow();
       connected = false;
       return true;
    }
@@ -73,25 +76,42 @@
    {
      // assert packet instanceof Packet;
 
-      serverDispatcher.dispatch((Packet) packet,
-            new PacketSender()
+      //Must be executed on different thread
+      
+      executor.execute(new Runnable()
+      {
+         public void run()
+         {
+            try
             {
-               public void send(Packet response) throws Exception
-               {                  
-                  serverDispatcher.callFilters(response);
-                  clientDispatcher.dispatch(response, null);   
-               }
-               
-               public long getSessionID()
-               {
-                  return getID();
-               }
-               
-               public String getRemoteAddress()
-               {
-                  return "invm";
-               }
-            });
+               serverDispatcher.dispatch((Packet) packet,
+                     new PacketSender()
+                     {
+                        public void send(Packet response) throws Exception
+                        {                  
+                           serverDispatcher.callFilters(response);
+                           clientDispatcher.dispatch(response, null);   
+                        }
+                        
+                        public long getSessionID()
+                        {
+                           return getID();
+                        }
+                        
+                        public String getRemoteAddress()
+                        {
+                           return "invm";
+                        }
+                     });
+            }
+            catch (Exception e)
+            {
+               log.error("Failed to execute dispatch", e);
+            }
+         }
+      });
+            
+      
    }
 
 //   public Object writeAndBlock(final Packet request, long timeout, TimeUnit timeUnit) throws Exception

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-05-06 18:00:33 UTC (rev 4152)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-05-06 18:51:11 UTC (rev 4153)
@@ -1052,11 +1052,13 @@
 
    	final int maxRateToUse = maxRate;
 
-   	if (address != null)
-   	{
-   		flowController = windowSize == -1 ? null : postOffice.getFlowController(address);
-   	}
+   	// TODO Flow control disabled for now
    	
+//   	if (address != null)
+//   	{
+//   		flowController = windowSize == -1 ? null : postOffice.getFlowController(address);
+//   	}
+   	
    	long id = dispatcher.generateID();
 
    	ServerProducerImpl producer = new ServerProducerImpl(id, clientTargetID, this, address, sender, flowController);

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageProducerTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageProducerTest.java	2008-05-06 18:00:33 UTC (rev 4152)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageProducerTest.java	2008-05-06 18:51:11 UTC (rev 4153)
@@ -1,24 +1,24 @@
 /*
-  * JBoss, Home of Professional Open Source
-  * Copyright 2005, JBoss Inc., and individual contributors as indicated
-  * by the @authors tag. See the copyright.txt in the distribution for a
-  * full listing of individual contributors.
-  *
-  * This is free software; you can redistribute it and/or modify it
-  * under the terms of the GNU Lesser General Public License as
-  * published by the Free Software Foundation; either version 2.1 of
-  * the License, or (at your option) any later version.
-  *
-  * This software is distributed in the hope that it will be useful,
-  * but WITHOUT ANY WARRANTY; without even the implied warranty of
-  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-  * Lesser General Public License for more details.
-  *
-  * You should have received a copy of the GNU Lesser General Public
-  * License along with this software; if not, write to the Free
-  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-  */
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
 package org.jboss.test.messaging.jms;
 
 import java.io.Serializable;
@@ -47,7 +47,7 @@
    // Constants -----------------------------------------------------
 
    // Static --------------------------------------------------------
-   
+
    // Attributes ----------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -62,62 +62,62 @@
    public void testSendForeignWithForeignDestinationSet() throws Exception
    {   	   	
       Connection conn = null;      
- 
+
       try
       {
-      	conn = cf.createConnection();
+         conn = cf.createConnection();
 
          Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
+
          MessageProducer p = sess.createProducer(queue1);
-         
+
          MessageConsumer c = sess.createConsumer(queue1);
 
          conn.start();
-        
+
          Message foreign = new SimpleJMSMessage(new SimpleDestination());
-         
+
          foreign.setJMSDestination(new SimpleDestination());
-         
+
          //the producer destination should override the foreign destination and the send should succeed
-         
+
          p.send(foreign);
 
          Message m = c.receive(1000);
-         
+
          assertNotNull(m);
-         
+
       }
       finally
       {
          conn.close();
       }
    }
-   
+
    private static class SimpleDestination implements Destination, Serializable
    {
    }
-   
+
    public void testSendToQueuePersistent() throws Exception
    {
-   	sendToQueue(true);
+      sendToQueue(true);
    }
-   
+
    public void testSendToQueueNonPersistent() throws Exception
    {
-   	sendToQueue(false);
+      sendToQueue(false);
    }
-   
+
    private void sendToQueue(boolean persistent) throws Exception
    {
       Connection pconn = null;      
       Connection cconn = null;
-      
+
       try
       {
-      	pconn = cf.createConnection();
-      	cconn = cf.createConnection();
-      	
+         pconn = cf.createConnection();
+         cconn = cf.createConnection();
+
          Session ps = pconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
          Session cs = cconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
          MessageProducer p = ps.createProducer(queue1);
@@ -140,16 +140,54 @@
       }
    }
 
+   public void testSpeed() throws Exception
+   {
+      Connection pconn = null;      
+
+      try
+      {
+         pconn = cf.createConnection();
+
+         Session ps = pconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer p = ps.createProducer(queue1);
+         
+         p.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         final int numMessages = 10000;
+
+         long start = System.currentTimeMillis();
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            Message msg = ps.createMessage();
+
+            p.send(msg);
+         }
+
+         long end = System.currentTimeMillis();
+
+         double actualRate = 1000 * (double)numMessages / ( end - start);
+
+         log.info("rate " + actualRate + " msgs /sec");
+
+      }
+      finally
+      {
+
+      }
+   }
+
    public void testTransactedSendPersistent() throws Exception
    {
-   	transactedSend(true);
+      transactedSend(true);
    }
-   
+
    public void testTransactedSendNonPersistent() throws Exception
    {
-   	transactedSend(false);
+      transactedSend(false);
    }
-   
+
    private void transactedSend(boolean persistent) throws Exception
    {
       Connection pconn = null;
@@ -157,9 +195,9 @@
 
       try
       {
-      	pconn = cf.createConnection();
-      	cconn = cf.createConnection();
-      	
+         pconn = cf.createConnection();
+         cconn = cf.createConnection();
+
          cconn.start();
 
          Session ts = pconn.createSession(true, -1);
@@ -183,7 +221,7 @@
          cconn.close();
       }
    }
-   
+
    //I moved this into it's own class so we can catch any exception that occurs
    //Since this test intermittently fails.
    //(As an aside, technically this test is invalid anyway since the sessions is used for sending
@@ -191,18 +229,18 @@
    private class Sender implements Runnable
    {
       volatile Exception ex;
-      
+
       MessageProducer prod;
-      
+
       Message m;
-      
+
       Sender(MessageProducer prod, Message m)
       {
          this.prod = prod;
-         
+
          this.m = m;
       }
-      
+
       public synchronized void run()
       {
          try
@@ -212,20 +250,20 @@
          catch(Exception e)
          {
             log.error(e);
-            
+
             ex = e;
          }
       }
    }
-   
+
    public void testPersistentSendToTopic() throws Exception
    {
-   	sendToTopic(true);
+      sendToTopic(true);
    }
-   
+
    public void testNonPersistentSendToTopic() throws Exception
    {
-   	sendToTopic(false);      
+      sendToTopic(false);      
    }
 
    private void sendToTopic(boolean persistent) throws Exception
@@ -239,15 +277,15 @@
          Session ps = pconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
          Session cs = cconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
          final MessageProducer p = ps.createProducer(topic1);
-         
+
          p.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-         
+
          MessageConsumer c = cs.createConsumer(topic1);
 
          cconn.start();
 
          TextMessage m1 = ps.createTextMessage("test");
-          
+
          Sender sender = new Sender(p, m1);
 
          Thread t = new Thread(sender, "Producer Thread");
@@ -255,7 +293,7 @@
          t.start();
 
          TextMessage m2 = (TextMessage)c.receive(5000);
-         
+
          if (sender.ex != null)
          {
             //If an exception was caught in sending we rethrow here so as not to lose it
@@ -273,9 +311,9 @@
          cconn.close();
       }
    }
-   
 
 
+
    /**
     *  Test sending via anonymous producer
     * */
@@ -342,7 +380,7 @@
          p.send(m);         
 
          TextMessage rec = (TextMessage)c.receive(3000);
-         
+
          assertEquals("something", rec.getText());
 
       }
@@ -395,36 +433,36 @@
          pconn.close();
       }
    }
-   
+
    //Is this test valid?
    //How can we check if the destination is valid if it is created on the client side only??
 
    // TODO - verify what spec says about this and enable/delete the test accordingly
 
-//   public void testCreateProducerOnInexistentDestination() throws Exception
-//   {
-//      Connection pconn = cf.createConnection();
-//
-//      try
-//      {
-//         Session ps = pconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-//         try
-//         {
-//            ps.createProducer(new JBossTopic("NoSuchTopic"));
-//            fail("should throw exception");
-//         }
-//         catch(InvalidDestinationException e)
-//         {
-//            // OK
-//         }
-//      }
-//      finally
-//      {
-//         pconn.close();
-//      }
-//   }  
+// public void testCreateProducerOnInexistentDestination() throws Exception
+// {
+// Connection pconn = cf.createConnection();
 
+// try
+// {
+// Session ps = pconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+// try
+// {
+// ps.createProducer(new JBossTopic("NoSuchTopic"));
+// fail("should throw exception");
+// }
+// catch(InvalidDestinationException e)
+// {
+// // OK
+// }
+// }
+// finally
+// {
+// pconn.close();
+// }
+// }  
+
    //
    // disabled MessageID tests
    //
@@ -781,13 +819,13 @@
    }
 
    // Package protected ---------------------------------------------
-   
+
    // Protected -----------------------------------------------------
-   
+
    // Private -------------------------------------------------------
-   
+
    // Inner classes -------------------------------------------------
-   
-   
 
+
+
 }




More information about the jboss-cvs-commits mailing list