[hornetq-commits] JBoss hornetq SVN: r8413 - in branches/20-optimisation: src/main/org/hornetq/core/client/impl and 8 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Nov 26 08:30:42 EST 2009


Author: timfox
Date: 2009-11-26 08:30:41 -0500 (Thu, 26 Nov 2009)
New Revision: 8413

Modified:
   branches/20-optimisation/build-hornetq.xml
   branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java
   branches/20-optimisation/src/main/org/hornetq/core/server/impl/DivertImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
   branches/20-optimisation/tests/src/org/hornetq/tests/integration/management/ManagementHelperTest.java
   branches/20-optimisation/tests/src/org/hornetq/tests/opt/SendTest.java
   branches/20-optimisation/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
fixed some tests and re-instated new HornetQDecoder

Modified: branches/20-optimisation/build-hornetq.xml
===================================================================
--- branches/20-optimisation/build-hornetq.xml	2009-11-26 13:19:27 UTC (rev 8412)
+++ branches/20-optimisation/build-hornetq.xml	2009-11-26 13:30:41 UTC (rev 8413)
@@ -1223,9 +1223,9 @@
                     haltonerror="${junit.batchtest.haltonerror}"
                     failureproperty="tests.failed">
             <formatter type="plain" usefile="${junit.formatter.usefile}"/>
-            <fileset dir="${test.classes.dir}">
-               <!-- <exclude name="**/integration/http/*" /> -->
+            <fileset dir="${test.classes.dir}">            
                <include name="${tests.param}"/>
+               <exclude name="**/integration/cluster/reattach/Netty*" />
             </fileset>
          </batchtest>
       </junit>

Modified: branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2009-11-26 13:19:27 UTC (rev 8412)
+++ branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2009-11-26 13:30:41 UTC (rev 8413)
@@ -697,7 +697,6 @@
             log.trace("Setting up flowControlSize to " + message.getPacketSize() + " on message = " + clMessage);
          }
        
-        // log.info("setting flow control size as " + message.getPacketSize());
          clMessage.setFlowControlSize(message.getPacketSize());
 
          consumer.handleMessage(clMessage);

Modified: branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java	2009-11-26 13:19:27 UTC (rev 8412)
+++ branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java	2009-11-26 13:30:41 UTC (rev 8413)
@@ -259,9 +259,12 @@
 
    public void setDestination(final SimpleString destination)
    {
-      this.destination = destination;
+      if (this.destination != destination)
+      {
+         this.destination = destination;
 
-      bufferValid = false;
+         bufferValid = false;
+      }
    }
 
    public byte getType()
@@ -276,9 +279,12 @@
 
    public void setDurable(final boolean durable)
    {
-      this.durable = durable;
+      if (this.durable != durable)
+      {
+         this.durable = durable;
 
-      bufferValid = false;
+         bufferValid = false;
+      }
    }
 
    public long getExpiration()
@@ -288,9 +294,12 @@
 
    public void setExpiration(final long expiration)
    {
+      if (this.expiration != expiration)
+      {
       this.expiration = expiration;
 
       bufferValid = false;
+      }
    }
 
    public long getTimestamp()
@@ -300,9 +309,12 @@
 
    public void setTimestamp(final long timestamp)
    {
+      if (this.timestamp != timestamp)
+      {
       this.timestamp = timestamp;
 
       bufferValid = false;
+      }
    }
 
    public byte getPriority()
@@ -312,9 +324,12 @@
 
    public void setPriority(final byte priority)
    {
-      this.priority = priority;
+      if (this.priority != priority)
+      {
+         this.priority = priority;
 
-      bufferValid = false;
+         bufferValid = false;
+      }
    }
 
    public boolean isExpired()

Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java	2009-11-26 13:19:27 UTC (rev 8412)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java	2009-11-26 13:30:41 UTC (rev 8413)
@@ -387,7 +387,7 @@
       {
          lastReceivedCommandID++;
 
-         receivedBytes += packet.getPacketSize();
+         receivedBytes += packet.getPacketSize();         
 
          if (receivedBytes >= confWindowSize)
          {

Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java	2009-11-26 13:19:27 UTC (rev 8412)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java	2009-11-26 13:30:41 UTC (rev 8413)
@@ -111,6 +111,7 @@
       requiresResponse = buffer.readBoolean(); 
             
       buffer.readerIndex(ri);
+            
    }
    
    // Private -------------------------------------------------------

Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/DivertImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/DivertImpl.java	2009-11-26 13:19:27 UTC (rev 8412)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/DivertImpl.java	2009-11-26 13:30:41 UTC (rev 8413)
@@ -81,9 +81,7 @@
       // We must make a copy of the message, otherwise things like returning credits to the page won't work
       // properly on ack, since the original destination will be overwritten
 
-      // TODO we can optimise this so it doesn't copy if it's not routed anywhere else
-
-      log.info("making copy for divert");
+      // TODO we can optimise this so it doesn't copy if it's not routed anywhere else   
       
       ServerMessage copy = message.copy();
 

Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-11-26 13:19:27 UTC (rev 8412)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-11-26 13:30:41 UTC (rev 8413)
@@ -185,10 +185,8 @@
 
    public HandleStatus handle(final MessageReference ref) throws Exception
    {
-      //log.info("handling message");
       if (availableCredits != null && availableCredits.get() <= 0)
       {
-        // log.info("busy");
          return HandleStatus.BUSY;
       }
 
@@ -418,7 +416,6 @@
 
    public void receiveCredits(final int credits) throws Exception
    {
-  //    log.info("Receiving credits " + credits);
       if (credits == -1)
       {
          // No flow control
@@ -592,12 +589,9 @@
 
       if (availableCredits != null)
       {
-        //log.info("Subtracting credits " + packet.getPacketSize());
          availableCredits.addAndGet(-packet.getPacketSize());
       }
       
-    //  log.info("delivered message");
-
    }
 
    // Inner classes

Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2009-11-26 13:19:27 UTC (rev 8412)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2009-11-26 13:30:41 UTC (rev 8413)
@@ -83,11 +83,6 @@
       messageID = id;
    }
 
-   public void setType(final byte type)
-   {
-      this.type = type;
-   }
-
    public MessageReference createReference(final Queue queue)
    {
       MessageReference ref = new MessageReferenceImpl(this, queue);

Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-26 13:19:27 UTC (rev 8412)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-26 13:30:41 UTC (rev 8413)
@@ -1574,8 +1574,6 @@
       final CreditManagerHolder holder = this.getCreditManagerHolder(address);
 
       int credits = packet.getCredits();
-      
-      //log.info("requesting credits " + credits);
 
       int gotCredits = holder.manager.acquireCredits(credits, new CreditsAvailableRunnable()
       {
@@ -1597,8 +1595,6 @@
          }
       });
       
-      //log.info("got credits " + gotCredits);
-
       if (gotCredits > 0)
       {
          sendProducerCredits(holder, gotCredits, address);

Modified: branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java	2009-11-26 13:19:27 UTC (rev 8412)
+++ branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java	2009-11-26 13:30:41 UTC (rev 8413)
@@ -47,7 +47,7 @@
    public static void addCodecFilter(final ChannelPipeline pipeline, final BufferHandler handler)
    {
       assert pipeline != null;
-      pipeline.addLast("decoder", new HornetQFrameDecoder(handler));
+      pipeline.addLast("decoder", new HornetQFrameDecoder2());
    }
 
    public static void addSSLFilter(final ChannelPipeline pipeline, final SSLContext context, final boolean client) throws Exception

Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/management/ManagementHelperTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/management/ManagementHelperTest.java	2009-11-26 13:19:27 UTC (rev 8412)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/management/ManagementHelperTest.java	2009-11-26 13:30:41 UTC (rev 8413)
@@ -60,7 +60,7 @@
       String operationName = randomString();
       String param = randomString();
       String[] params = new String[] { randomString(), randomString(), randomString() };
-      Message msg = new ClientMessageImpl();
+      Message msg = new ClientMessageImpl((byte)0, false, 0, 0, (byte)4, 1000);
       ManagementHelper.putOperationInvocation(msg, resource, operationName, param, params);
 
       Object[] parameters = ManagementHelper.retrieveOperationParameters(msg);
@@ -148,7 +148,7 @@
 
       Object[] params = new Object[] { i, s, d, b, l, map, strArray, maps };
 
-      Message msg = new ClientMessageImpl();
+      Message msg = new ClientMessageImpl((byte)0, false, 0, 0, (byte)4, 1000);
       ManagementHelper.putOperationInvocation(msg, resource, operationName, params);
 
       Object[] parameters = ManagementHelper.retrieveOperationParameters(msg);
@@ -214,7 +214,7 @@
       
       Object[] params = new Object[] { "hello", map };
 
-      Message msg = new ClientMessageImpl();      
+      Message msg = new ClientMessageImpl((byte)0, false, 0, 0, (byte)4, 1000);
       ManagementHelper.putOperationInvocation(msg, resource, operationName, params);
 
       Object[] parameters = ManagementHelper.retrieveOperationParameters(msg);

Modified: branches/20-optimisation/tests/src/org/hornetq/tests/opt/SendTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/opt/SendTest.java	2009-11-26 13:19:27 UTC (rev 8412)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/opt/SendTest.java	2009-11-26 13:30:41 UTC (rev 8413)
@@ -15,11 +15,14 @@
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
@@ -38,6 +41,7 @@
 import org.hornetq.integration.transports.netty.TransportConstants;
 import org.hornetq.jms.HornetQQueue;
 import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.client.HornetQMessage;
 import org.hornetq.jms.client.HornetQSession;
 import org.hornetq.tests.util.RandomUtil;
 
@@ -56,7 +60,7 @@
    {
       try
       {
-         new SendTest().runTextMessage();
+         new SendTest().runConsume();
       }
       catch (Exception e)
       {
@@ -70,8 +74,9 @@
    {
       log.info("*** Starting server");
 
-      System.setProperty("org.hornetq.opt.dontadd", "true");
+      //System.setProperty("org.hornetq.opt.dontadd", "true");
      // System.setProperty("org.hornetq.opt.routeblast", "true");
+      System.setProperty("org.hornetq.opt.generatemessages", "true");
 
       Configuration configuration = new ConfigurationImpl();
       configuration.setSecurityEnabled(false);
@@ -210,6 +215,134 @@
       server.stop();
    }
    
+   public void runSendConsume() throws Exception
+   {
+      startServer();
+      
+      Map<String, Object> params = new HashMap<String, Object>();
+
+      // params.put(TransportConstants.HOST_PROP_NAME, "localhost");
+
+      // params.put(TransportConstants.PORT_PROP_NAME, 5445);
+      
+      params.put(TransportConstants.TCP_NODELAY_PROPNAME, Boolean.FALSE);
+      //params.put(TransportConstants.USE_NIO_PROP_NAME, Boolean.FALSE);
+
+       TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getCanonicalName(), params);
+
+      //TransportConfiguration tc = new TransportConfiguration(InVMConnectorFactory.class.getCanonicalName(), params);
+
+      HornetQConnectionFactory cf = new HornetQConnectionFactory(tc);
+      
+      cf.setProducerWindowSize(1024 * 1024);
+
+      Connection conn = cf.createConnection();
+
+      Session sess = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+
+      ClientSession coreSession = ((HornetQSession)sess).getCoreSession();
+
+      coreSession.createQueue("jms.queue.test_queue", "jms.queue.test_queue");
+            
+      Queue queue = new HornetQQueue("test_queue");
+      
+      MessageConsumer cons = sess.createConsumer(queue);
+      
+      conn.start();
+      
+      final CountDownLatch latch = new CountDownLatch(1);
+      
+      final int warmup = 500000;
+            
+      final int numMessages = 2000000;
+      
+      MessageListener listener = new MessageListener()
+      {
+         int count;
+         public void onMessage(Message message)
+         {
+            count++;
+            
+            if (count % 10000 == 0)
+            {
+               log.info("received " + count);
+            }
+            
+            if (count == numMessages + warmup)
+            {
+               latch.countDown();
+            }
+         }
+      };
+      
+      cons.setMessageListener(listener);
+
+      MessageProducer prod = sess.createProducer(queue);
+      
+      prod.setDisableMessageID(true);
+      
+      prod.setDisableMessageTimestamp(true);
+
+      prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+      byte[] bytes1 = new byte[] { (byte)'A', (byte)'B',(byte)'C',(byte)'D'};
+      
+      String s = new String(bytes1);
+      
+      System.out.println("Str is " + s);
+      
+      byte[] bytes = RandomUtil.randomBytes(512);
+
+      String str = new String(bytes);
+      
+      
+      log.info("Warming up");
+      
+      TextMessage tm = sess.createTextMessage();
+      
+      tm.setText(str);
+                             
+      for (int i = 0; i < warmup; i++)
+      {                  
+         prod.send(tm);
+
+         if (i % 10000 == 0)
+         {
+            log.info("sent " + i);
+         }
+      }
+      
+      log.info("** WARMUP DONE");
+       
+      tm = sess.createTextMessage();
+
+      tm.setText(str);
+      
+      long start = System.currentTimeMillis();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         prod.send(tm);
+
+         if (i % 10000 == 0)
+         {
+            log.info("sent " + i);
+         }
+      }
+      
+      latch.countDown();
+      
+      sess.close();
+
+      long end = System.currentTimeMillis();
+
+      double rate = 1000 * (double)numMessages / (end - start);
+
+      System.out.println("Rate of " + rate + " msgs / sec");
+
+      server.stop();
+   }
+   
    public void runObjectMessage() throws Exception
    {
       startServer();
@@ -259,18 +392,24 @@
       
       log.info("sending messages");
                              
+      
+      
       for (int i = 0; i < warmup; i++)
       {
-         ObjectMessage om = sess.createObjectMessage(str);
+//         ObjectMessage om = sess.createObjectMessage(str);
+//         
+//         prod.send(om);
          
-         prod.send(om);
+         TextMessage tm = sess.createTextMessage(str);
+         
+         prod.send(tm);
 
          if (i % 10000 == 0)
          {
             log.info("sent " + i);
          }
          
-         om.setObject(str);
+         //om.setObject(str);
       }
       
       log.info("** WARMUP DONE");
@@ -279,18 +418,24 @@
             
       long start = System.currentTimeMillis();
 
+      
+      
       for (int i = 0; i < numMessages; i++)
-      {
-         ObjectMessage om = sess.createObjectMessage(str);
+      {                 
+//         ObjectMessage om = sess.createObjectMessage(str);
+//         
+//         prod.send(om);
          
-         prod.send(om);
+         TextMessage tm = sess.createTextMessage(str);
+         
+         prod.send(tm);
 
          if (i % 10000 == 0)
          {
             log.info("sent " + i);
          }
          
-         om.setObject(str);
+         //om.setObject(str);
       }
 
       long end = System.currentTimeMillis();
@@ -302,4 +447,120 @@
       server.stop();
    }
    
+   public void runConsume() throws Exception
+   {
+      startServer();
+      
+      Map<String, Object> params = new HashMap<String, Object>();
+
+      // params.put(TransportConstants.HOST_PROP_NAME, "localhost");
+
+      // params.put(TransportConstants.PORT_PROP_NAME, 5445);
+      
+      params.put(TransportConstants.TCP_NODELAY_PROPNAME, Boolean.FALSE);
+      //params.put(TransportConstants.USE_NIO_PROP_NAME, Boolean.FALSE);
+
+       TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getCanonicalName(), params);
+
+      //TransportConfiguration tc = new TransportConfiguration(InVMConnectorFactory.class.getCanonicalName(), params);
+
+      HornetQConnectionFactory cf = new HornetQConnectionFactory(tc);
+      
+      Connection conn = cf.createConnection();
+
+      Session sess = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+
+      ClientSession coreSession = ((HornetQSession)sess).getCoreSession();
+
+      coreSession.createQueue("jms.queue.test_queue", "jms.queue.test_queue");
+            
+      Queue queue = new HornetQQueue("test_queue");
+      
+      MessageConsumer cons = sess.createConsumer(queue);
+                  
+      final CountDownLatch latch = new CountDownLatch(1);
+      
+      final int warmup = 50000;
+            
+      final int numMessages = 2000000;
+      
+      MessageListener listener = new MessageListener()
+      {
+         int count;
+         long start;
+         public void onMessage(Message message)
+         {
+            count++;
+            
+            log.info("got message " + ((HornetQMessage)message).getCoreMessage().getMessageID());
+            
+            if (count == warmup)
+            {
+               log.info("** WARMED UP");
+               
+               start = System.currentTimeMillis();
+            }
+            
+            if (count % 10000 == 0)
+            {
+               log.info("received " + count);
+            }
+            
+            if (count == numMessages + warmup)
+            {
+               long end = System.currentTimeMillis();
+
+               double rate = 1000 * (double)numMessages / (end - start);
+
+               System.out.println("Rate of " + rate + " msgs / sec");
+
+               latch.countDown();
+            }
+         }
+      };
+      
+      cons.setMessageListener(listener);
+
+      MessageProducer prod = sess.createProducer(queue);
+      
+      prod.setDisableMessageID(true);
+      
+      prod.setDisableMessageTimestamp(true);
+
+      prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+      byte[] bytes = RandomUtil.randomBytes(1);
+
+      String str = new String(bytes);
+            
+      
+      //Load up the queue with messages
+      
+      TextMessage tm = sess.createTextMessage();
+      
+      tm.setText(str);
+      
+      log.info("loading queue with messages");
+      
+      for (int i = 0; i < numMessages + warmup; i++)
+      {        
+         prod.send(tm);
+         
+         if (i % 10000 == 0)
+         {
+            log.info("sent " + i);
+         }
+      }
+      
+      log.info("** loaded queue");
+      
+      conn.start();
+      
+      latch.await();
+      
+      sess.close();
+      
+      server.stop();
+   }
+   
 }

Modified: branches/20-optimisation/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/util/ServiceTestBase.java	2009-11-26 13:19:27 UTC (rev 8412)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/util/ServiceTestBase.java	2009-11-26 13:30:41 UTC (rev 8413)
@@ -353,7 +353,7 @@
 
    public String getTextMessage(ClientMessage m)
    {
-      //m.getBodyBuffer().resetReaderIndex();
+      m.getBodyBuffer().resetReaderIndex();
       return m.getBodyBuffer().readString();
    }
 



More information about the hornetq-commits mailing list