[hornetq-commits] JBoss hornetq SVN: r8335 - in branches/20-optimisation: src/main/org/hornetq/integration/transports/netty and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Nov 20 05:05:37 EST 2009


Author: timfox
Date: 2009-11-20 05:05:37 -0500 (Fri, 20 Nov 2009)
New Revision: 8335

Added:
   branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveLargeMessage.java
   branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java
   branches/20-optimisation/tests/src/org/hornetq/tests/opt/
   branches/20-optimisation/tests/src/org/hornetq/tests/opt/SendTest.java
Log:
optimisation

Added: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveLargeMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveLargeMessage.java	                        (rev 0)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveLargeMessage.java	2009-11-20 10:05:37 UTC (rev 8335)
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * A SessionReceiveLargeMessage
+ *
+ * @author Clebert Suconic
+ *
+ *
+ */
+public class SessionReceiveLargeMessage extends PacketImpl
+{
+   private byte[] largeMessageHeader;
+
+   /** Since we receive the message before the entire message was received, */
+   private long largeMessageSize;
+
+   private long consumerID;
+
+   private int deliveryCount;
+
+   public SessionReceiveLargeMessage()
+   {
+      super(SESS_RECEIVE_LARGE_MSG);
+   }
+
+   public SessionReceiveLargeMessage(final long consumerID,
+                                     final byte[] largeMessageHeader,
+                                     final long largeMessageSize,
+                                     final int deliveryCount)
+   {
+      super(SESS_RECEIVE_LARGE_MSG);
+
+      this.consumerID = consumerID;
+
+      this.largeMessageHeader = largeMessageHeader;
+
+      this.deliveryCount = deliveryCount;
+
+      this.largeMessageSize = largeMessageSize;
+   }
+
+   public byte[] getLargeMessageHeader()
+   {
+      return largeMessageHeader;
+   }
+
+   public long getConsumerID()
+   {
+      return consumerID;
+   }
+   
+   public int getDeliveryCount()
+   {
+      return deliveryCount;
+   }
+
+   /**
+    * @return the largeMessageSize
+    */
+   public long getLargeMessageSize()
+   {
+      return largeMessageSize;
+   }
+   
+   public void encodeRest(final HornetQBuffer buffer)
+   {
+      buffer.writeLong(consumerID);
+      buffer.writeInt(deliveryCount);
+      buffer.writeLong(largeMessageSize);
+      buffer.writeInt(largeMessageHeader.length);
+      buffer.writeBytes(largeMessageHeader);
+   }
+
+   public void decodeRest(final HornetQBuffer buffer)
+   {
+      consumerID = buffer.readLong();
+      deliveryCount = buffer.readInt();
+      largeMessageSize = buffer.readLong();
+      int size = buffer.readInt();
+      largeMessageHeader = new byte[size];
+      buffer.readBytes(largeMessageHeader);
+   }
+
+}

Added: branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java	                        (rev 0)
+++ branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java	2009-11-20 10:05:37 UTC (rev 8335)
@@ -0,0 +1,254 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.transports.netty;
+
+import static org.hornetq.utils.DataConstants.SIZE_INT;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.hornetq.core.remoting.impl.AbstractBufferHandler;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.buffer.DynamicChannelBuffer;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.ChannelUpstreamHandler;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.handler.codec.embedder.DecoderEmbedder;
+
+/**
+ * A Netty decoder used to decode messages.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ * @author <a href="tlee at redhat.com">Trustin Lee</a>
+ *
+ * @version $Revision: 7839 $, $Date: 2009-08-21 02:26:39 +0900 (2009-08-21, 금) $
+ */
+ at ChannelPipelineCoverage("one")
+public class HornetQFrameDecoder2 extends SimpleChannelUpstreamHandler
+{
+   private ChannelBuffer previousData = ChannelBuffers.EMPTY_BUFFER;
+
+   // SimpleChannelUpstreamHandler overrides
+   // -------------------------------------------------------------------------------------
+
+   @Override
+   public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
+   {
+      ChannelBuffer in = (ChannelBuffer) e.getMessage();
+      if (previousData.readable())
+      {
+         if (previousData.readableBytes() + in.readableBytes() < SIZE_INT) {
+            append(in, 512); // Length is unknown. Bet at 512.
+            return;
+         }
+         
+         // Decode the first message.  The first message requires a special
+         // treatment because it is the only message that spans over the two
+         // buffers.
+         final int length;
+         final ChannelBuffer frame;
+         switch (previousData.readableBytes()) {
+            case 1:
+               length = (previousData.getUnsignedByte(previousData.readerIndex()) << 24) |
+                        in.getMedium(in.readerIndex());
+               if (in.readableBytes() - 3 < length) {
+                  append(in, length);
+                  return;
+               } else {
+                  frame = in.slice(in.readerIndex() + 3, length);
+                  in.skipBytes(length + 3);
+               }
+               break;
+            case 2:
+               length = (previousData.getUnsignedShort(previousData.readerIndex()) << 16) |
+                        in.getUnsignedShort(in.readerIndex());
+               if (in.readableBytes() - 2 < length) {
+                  append(in, length);
+                  return;
+               } else {
+                  frame = in.slice(in.readerIndex() + 2, length);
+                  in.skipBytes(length + 2);
+               }
+               break;
+            case 3:
+               length = (previousData.getUnsignedMedium(previousData.readerIndex()) << 8) |
+                        in.getUnsignedByte(in.readerIndex());
+               if (in.readableBytes() - 1 < length) {
+                  append(in, length);
+                  return;
+               } else {
+                  frame = in.slice(in.readerIndex() + 1, length);
+                  in.skipBytes(length + 1);
+               }
+               break;
+            case 4:
+               length = previousData.getInt(previousData.readerIndex());
+               if (in.readableBytes() - 4 < length) {
+                  append(in, length);
+                  return;
+               } else {
+                  frame = in.slice(in.readerIndex(), length);
+                  in.skipBytes(length);
+               }
+               break;
+            default:
+               length = previousData.getInt(previousData.readerIndex());
+               if (in.readableBytes() + previousData.readableBytes() - 4 < length) {
+                  append(in, length);
+                  return;
+               } else {
+                  if (previousData instanceof DynamicChannelBuffer) {
+                     // It's safe to reuse the current dynamic buffer
+                     // because previousData will be reassigned to
+                     // EMPTY_BUFFER or 'in' later.
+                     previousData.skipBytes(4);
+                     previousData.writeBytes(in, length - previousData.readableBytes());
+                     frame = previousData.slice();
+                  } else {
+                     frame = ChannelBuffers.buffer(length);
+                     frame.writeBytes(previousData, previousData.readerIndex() + 4, previousData.readableBytes() - 4);
+                     frame.writeBytes(in, length - frame.writerIndex());
+                  }
+               }
+         }
+         
+         Channels.fireMessageReceived(ctx, frame);
+
+         if (!in.readable()) {
+            previousData = ChannelBuffers.EMPTY_BUFFER;
+            return;
+         }
+      }
+
+      // And then handle the rest - we don't need to deal with the
+      // composite buffer anymore because the second or later messages
+      // always belong to the second buffer.
+      decode(ctx, in);
+      
+      // Handle the leftover.
+      if (in.readable())
+      {
+         previousData = in;
+      }
+      else
+      {
+         previousData = ChannelBuffers.EMPTY_BUFFER;
+      }
+   }
+   
+   private void decode(ChannelHandlerContext ctx, ChannelBuffer in)
+   {
+      for (;;) {
+         final int readableBytes = in.readableBytes();
+         if (readableBytes < SIZE_INT) {
+            break;
+         }
+         
+         final int length = in.getInt(in.readerIndex());
+         if (readableBytes < length + SIZE_INT) {
+            break;
+         }
+         
+         ChannelBuffer frame = in.slice(in.readerIndex() + SIZE_INT, length);
+         in.skipBytes(SIZE_INT + length);
+         Channels.fireMessageReceived(ctx, frame);
+      }
+   }
+   
+   private void append(ChannelBuffer in, int length)
+   {
+      // Need more data to decode the first message. This can happen when
+      // a client is very slow. (e.g.sending each byte one by one)
+      if (previousData instanceof DynamicChannelBuffer)
+      {
+         previousData.writeBytes(in);
+      }
+      else
+      {
+         ChannelBuffer newPreviousData =
+             ChannelBuffers.dynamicBuffer(
+                 Math.max(previousData.readableBytes() + in.readableBytes(), length + 4));
+         newPreviousData.writeBytes(previousData);
+         newPreviousData.writeBytes(in);
+         previousData = newPreviousData;
+      }
+   }
+
+   public static void main(String[] args) throws Exception {
+      final boolean useNextGeneration = true;
+      final ChannelUpstreamHandler handler;
+
+      if (useNextGeneration) {
+         handler = new HornetQFrameDecoder2();
+      } else {
+         handler = new HornetQFrameDecoder(new AbstractBufferHandler()
+         {
+            public void bufferReceived(Object connectionID, HornetQBuffer buffer)
+            { // noop
+            }
+         });
+      }
+
+      final DecoderEmbedder<ChannelBuffer> decoder =
+         new DecoderEmbedder<ChannelBuffer>(handler);
+
+      ChannelBuffer src = ChannelBuffers.buffer(30000 * 1004);
+      while (src.writerIndex() < src.capacity()) {
+         src.writeInt(1000);
+         src.writeZero(1000);
+      }
+
+      Random rand = new Random();
+      List<ChannelBuffer> packets = new ArrayList<ChannelBuffer>();
+      for (int i = 0; i < src.capacity();) {
+         int length = Math.min(rand.nextInt(3000), src.capacity() - i);
+         packets.add(src.copy(i, length));
+         i += length;
+      }
+      
+      long startTime = System.nanoTime();
+         
+      for (int i = 0; i < 100; i ++) {
+         int cnt = 0;
+         for (ChannelBuffer p: packets) {
+            decoder.offer(p.duplicate());
+            for (;;) {
+               ChannelBuffer frame = decoder.poll();
+               if (frame == null) {
+                  break;
+               }
+               if (frame.readableBytes() != 1000) {
+                  System.out.println("ARGH 1: " + frame.readableBytes());
+               }
+               cnt ++;
+            }
+         }
+         if (cnt != 30000) {
+            System.out.println("ARGH 2: " + cnt);
+         }
+      }
+      
+      long endTime = System.nanoTime();
+      System.out.println(
+          handler.getClass().getSimpleName() + ": " + 
+          (endTime - startTime) / 1000000);
+   }
+}

Added: branches/20-optimisation/tests/src/org/hornetq/tests/opt/SendTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/opt/SendTest.java	                        (rev 0)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/opt/SendTest.java	2009-11-20 10:05:37 UTC (rev 8335)
@@ -0,0 +1,305 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.opt;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.server.HornetQ;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.JournalType;
+import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
+import org.hornetq.integration.transports.netty.NettyConnectorFactory;
+import org.hornetq.integration.transports.netty.TransportConstants;
+import org.hornetq.jms.HornetQQueue;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.client.HornetQSession;
+import org.hornetq.tests.util.RandomUtil;
+
+/**
+ * A SendTest
+ *
+ * @author tim
+ *
+ *
+ */
+public class SendTest
+{
+   private static final Logger log = Logger.getLogger(SendTest.class);
+
+   public static void main(String[] args)
+   {
+      try
+      {
+         new SendTest().runTextMessage();
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+   }
+
+   private HornetQServer server;
+
+   private void startServer() throws Exception
+   {
+      log.info("*** Starting server");
+
+      System.setProperty("org.hornetq.opt.dontadd", "true");
+     // System.setProperty("org.hornetq.opt.routeblast", "true");
+
+      Configuration configuration = new ConfigurationImpl();
+      configuration.setSecurityEnabled(false);
+      configuration.setJMXManagementEnabled(false);
+      configuration.setJournalMinFiles(10);
+      
+      configuration.setPersistenceEnabled(false);
+      configuration.setFileDeploymentEnabled(false);
+      //configuration.setJournalFlushOnSync(true);
+     // configuration.setRunSyncSpeedTest(true);
+      //configuration.setJournalPerfBlastPages(10000);
+
+      configuration.setJournalType(JournalType.NIO);
+      
+      //configuration.setLogJournalWriteRate(true);
+      //configuration.setRunSyncSpeedTest(true);
+      
+      Map<String, Object> params = new HashMap<String, Object>();
+      params.put(TransportConstants.USE_NIO_PROP_NAME, Boolean.FALSE);
+
+      TransportConfiguration transportConfig1 = new TransportConfiguration(NettyAcceptorFactory.class.getCanonicalName(),
+                                                                           params);
+      TransportConfiguration transportConfig2 = new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName(),
+                                                                           null);
+      configuration.getAcceptorConfigurations().add(transportConfig1);
+      configuration.getAcceptorConfigurations().add(transportConfig2);
+
+      server = HornetQ.newHornetQServer(configuration);
+
+      server.start();
+
+      log.info("Started server");
+
+   }
+   
+   public void runRouteBlast() throws Exception
+   {
+      this.startServer();
+   }
+
+   public void runTextMessage() throws Exception
+   {
+      startServer();
+      
+      Map<String, Object> params = new HashMap<String, Object>();
+
+      // params.put(TransportConstants.HOST_PROP_NAME, "localhost");
+
+      // params.put(TransportConstants.PORT_PROP_NAME, 5445);
+      
+      params.put(TransportConstants.TCP_NODELAY_PROPNAME, Boolean.FALSE);
+      //params.put(TransportConstants.USE_NIO_PROP_NAME, Boolean.FALSE);
+
+       TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getCanonicalName(), params);
+
+      //TransportConfiguration tc = new TransportConfiguration(InVMConnectorFactory.class.getCanonicalName(), params);
+
+      HornetQConnectionFactory cf = new HornetQConnectionFactory(tc);
+      
+      cf.setProducerWindowSize(1024 * 1024);
+
+      Connection conn = cf.createConnection();
+
+      Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      ClientSession coreSession = ((HornetQSession)sess).getCoreSession();
+
+      coreSession.createQueue("jms.queue.test_queue", "jms.queue.test_queue");
+
+      Queue queue = new HornetQQueue("test_queue");
+
+      MessageProducer prod = sess.createProducer(queue);
+      
+      prod.setDisableMessageID(true);
+      
+      prod.setDisableMessageTimestamp(true);
+
+      prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+      byte[] bytes1 = new byte[] { (byte)'A', (byte)'B',(byte)'C',(byte)'D'};
+      
+      String s = new String(bytes1);
+      
+      System.out.println("Str is " + s);
+      
+      byte[] bytes = RandomUtil.randomBytes(512);
+
+      String str = new String(bytes);
+      
+      final int warmup = 50000;
+      
+      log.info("Warming up");
+      
+      TextMessage tm = sess.createTextMessage();
+      
+      tm.setText(str);
+                             
+      for (int i = 0; i < warmup; i++)
+      {                  
+         prod.send(tm);
+
+         if (i % 10000 == 0)
+         {
+            log.info("sent " + i);
+         }
+      }
+      
+      log.info("** WARMUP DONE");
+      
+      final int numMessages = 1000000;
+      
+      tm = sess.createTextMessage();
+
+      tm.setText(str);
+      
+      long start = System.currentTimeMillis();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         prod.send(tm);
+
+         if (i % 10000 == 0)
+         {
+            log.info("sent " + i);
+         }
+      }
+      
+      sess.close();
+
+      long end = System.currentTimeMillis();
+
+      double rate = 1000 * (double)numMessages / (end - start);
+
+      System.out.println("Rate of " + rate + " msgs / sec");
+
+      server.stop();
+   }
+   
+   public void runObjectMessage() throws Exception
+   {
+      startServer();
+      
+      Map<String, Object> params = new HashMap<String, Object>();
+
+      // params.put(TransportConstants.HOST_PROP_NAME, "localhost");
+
+      // params.put(TransportConstants.PORT_PROP_NAME, 5445);
+      
+      params.put(TransportConstants.TCP_NODELAY_PROPNAME, Boolean.FALSE);
+      //params.put(TransportConstants.USE_NIO_PROP_NAME, Boolean.FALSE);
+
+       TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getCanonicalName(), params);
+
+      //TransportConfiguration tc = new TransportConfiguration(InVMConnectorFactory.class.getCanonicalName(), params);
+
+      HornetQConnectionFactory cf = new HornetQConnectionFactory(tc);
+      
+      cf.setProducerWindowSize(1024 * 1024);
+
+      Connection conn = cf.createConnection();
+
+      Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      ClientSession coreSession = ((HornetQSession)sess).getCoreSession();
+
+      coreSession.createQueue("jms.queue.test_queue", "jms.queue.test_queue");
+
+      Queue queue = new HornetQQueue("test_queue");
+
+      MessageProducer prod = sess.createProducer(queue);
+      
+      prod.setDisableMessageID(true);
+      
+      prod.setDisableMessageTimestamp(true);
+
+      prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+      
+      byte[] bytes = RandomUtil.randomBytes(512);
+
+      String str = new String(bytes);
+      
+      log.info("str length " + str.length());
+
+      final int warmup = 50000;
+      
+      log.info("sending messages");
+                             
+      for (int i = 0; i < warmup; i++)
+      {
+         ObjectMessage om = sess.createObjectMessage(str);
+         
+         prod.send(om);
+
+         if (i % 10000 == 0)
+         {
+            log.info("sent " + i);
+         }
+         
+         om.setObject(str);
+      }
+      
+      log.info("** WARMUP DONE");
+      
+      final int numMessages = 500000;
+            
+      long start = System.currentTimeMillis();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ObjectMessage om = sess.createObjectMessage(str);
+         
+         prod.send(om);
+
+         if (i % 10000 == 0)
+         {
+            log.info("sent " + i);
+         }
+         
+         om.setObject(str);
+      }
+
+      long end = System.currentTimeMillis();
+
+      double rate = 1000 * (double)numMessages / (end - start);
+
+      System.out.println("Rate of " + rate + " msgs / sec");
+
+      server.stop();
+   }
+   
+}



More information about the hornetq-commits mailing list