Author: timfox
Date: 2009-11-20 05:05:37 -0500 (Fri, 20 Nov 2009)
New Revision: 8335
Added:
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveLargeMessage.java
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java
branches/20-optimisation/tests/src/org/hornetq/tests/opt/
branches/20-optimisation/tests/src/org/hornetq/tests/opt/SendTest.java
Log:
optimisation
Added:
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveLargeMessage.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveLargeMessage.java
(rev 0)
+++
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveLargeMessage.java 2009-11-20
10:05:37 UTC (rev 8335)
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * A SessionReceiveLargeMessage
+ *
+ * @author Clebert Suconic
+ *
+ *
+ */
+public class SessionReceiveLargeMessage extends PacketImpl
+{
+ private byte[] largeMessageHeader;
+
+ /** Since we receive the message before the entire message was received, */
+ private long largeMessageSize;
+
+ private long consumerID;
+
+ private int deliveryCount;
+
+ public SessionReceiveLargeMessage()
+ {
+ super(SESS_RECEIVE_LARGE_MSG);
+ }
+
+ public SessionReceiveLargeMessage(final long consumerID,
+ final byte[] largeMessageHeader,
+ final long largeMessageSize,
+ final int deliveryCount)
+ {
+ super(SESS_RECEIVE_LARGE_MSG);
+
+ this.consumerID = consumerID;
+
+ this.largeMessageHeader = largeMessageHeader;
+
+ this.deliveryCount = deliveryCount;
+
+ this.largeMessageSize = largeMessageSize;
+ }
+
+ public byte[] getLargeMessageHeader()
+ {
+ return largeMessageHeader;
+ }
+
+ public long getConsumerID()
+ {
+ return consumerID;
+ }
+
+ public int getDeliveryCount()
+ {
+ return deliveryCount;
+ }
+
+ /**
+ * @return the largeMessageSize
+ */
+ public long getLargeMessageSize()
+ {
+ return largeMessageSize;
+ }
+
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ buffer.writeLong(consumerID);
+ buffer.writeInt(deliveryCount);
+ buffer.writeLong(largeMessageSize);
+ buffer.writeInt(largeMessageHeader.length);
+ buffer.writeBytes(largeMessageHeader);
+ }
+
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ consumerID = buffer.readLong();
+ deliveryCount = buffer.readInt();
+ largeMessageSize = buffer.readLong();
+ int size = buffer.readInt();
+ largeMessageHeader = new byte[size];
+ buffer.readBytes(largeMessageHeader);
+ }
+
+}
Added:
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java
(rev 0)
+++
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java 2009-11-20
10:05:37 UTC (rev 8335)
@@ -0,0 +1,254 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.transports.netty;
+
+import static org.hornetq.utils.DataConstants.SIZE_INT;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.hornetq.core.remoting.impl.AbstractBufferHandler;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.buffer.DynamicChannelBuffer;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.ChannelUpstreamHandler;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.handler.codec.embedder.DecoderEmbedder;
+
+/**
+ * A Netty decoder used to decode messages.
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="ataylor(a)redhat.com">Andy Taylor</a>
+ * @author <a href="tlee(a)redhat.com">Trustin Lee</a>
+ *
+ * @version $Revision: 7839 $, $Date: 2009-08-21 02:26:39 +0900 (2009-08-21, 금) $
+ */
+@ChannelPipelineCoverage("one")
+public class HornetQFrameDecoder2 extends SimpleChannelUpstreamHandler
+{
+ private ChannelBuffer previousData = ChannelBuffers.EMPTY_BUFFER;
+
+ // SimpleChannelUpstreamHandler overrides
+ //
-------------------------------------------------------------------------------------
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws
Exception
+ {
+ ChannelBuffer in = (ChannelBuffer) e.getMessage();
+ if (previousData.readable())
+ {
+ if (previousData.readableBytes() + in.readableBytes() < SIZE_INT) {
+ append(in, 512); // Length is unknown. Bet at 512.
+ return;
+ }
+
+ // Decode the first message. The first message requires a special
+ // treatment because it is the only message that spans over the two
+ // buffers.
+ final int length;
+ final ChannelBuffer frame;
+ switch (previousData.readableBytes()) {
+ case 1:
+ length = (previousData.getUnsignedByte(previousData.readerIndex())
<< 24) |
+ in.getMedium(in.readerIndex());
+ if (in.readableBytes() - 3 < length) {
+ append(in, length);
+ return;
+ } else {
+ frame = in.slice(in.readerIndex() + 3, length);
+ in.skipBytes(length + 3);
+ }
+ break;
+ case 2:
+ length = (previousData.getUnsignedShort(previousData.readerIndex())
<< 16) |
+ in.getUnsignedShort(in.readerIndex());
+ if (in.readableBytes() - 2 < length) {
+ append(in, length);
+ return;
+ } else {
+ frame = in.slice(in.readerIndex() + 2, length);
+ in.skipBytes(length + 2);
+ }
+ break;
+ case 3:
+ length = (previousData.getUnsignedMedium(previousData.readerIndex())
<< 8) |
+ in.getUnsignedByte(in.readerIndex());
+ if (in.readableBytes() - 1 < length) {
+ append(in, length);
+ return;
+ } else {
+ frame = in.slice(in.readerIndex() + 1, length);
+ in.skipBytes(length + 1);
+ }
+ break;
+ case 4:
+ length = previousData.getInt(previousData.readerIndex());
+ if (in.readableBytes() - 4 < length) {
+ append(in, length);
+ return;
+ } else {
+ frame = in.slice(in.readerIndex(), length);
+ in.skipBytes(length);
+ }
+ break;
+ default:
+ length = previousData.getInt(previousData.readerIndex());
+ if (in.readableBytes() + previousData.readableBytes() - 4 < length) {
+ append(in, length);
+ return;
+ } else {
+ if (previousData instanceof DynamicChannelBuffer) {
+ // It's safe to reuse the current dynamic buffer
+ // because previousData will be reassigned to
+ // EMPTY_BUFFER or 'in' later.
+ previousData.skipBytes(4);
+ previousData.writeBytes(in, length - previousData.readableBytes());
+ frame = previousData.slice();
+ } else {
+ frame = ChannelBuffers.buffer(length);
+ frame.writeBytes(previousData, previousData.readerIndex() + 4,
previousData.readableBytes() - 4);
+ frame.writeBytes(in, length - frame.writerIndex());
+ }
+ }
+ }
+
+ Channels.fireMessageReceived(ctx, frame);
+
+ if (!in.readable()) {
+ previousData = ChannelBuffers.EMPTY_BUFFER;
+ return;
+ }
+ }
+
+ // And then handle the rest - we don't need to deal with the
+ // composite buffer anymore because the second or later messages
+ // always belong to the second buffer.
+ decode(ctx, in);
+
+ // Handle the leftover.
+ if (in.readable())
+ {
+ previousData = in;
+ }
+ else
+ {
+ previousData = ChannelBuffers.EMPTY_BUFFER;
+ }
+ }
+
+ private void decode(ChannelHandlerContext ctx, ChannelBuffer in)
+ {
+ for (;;) {
+ final int readableBytes = in.readableBytes();
+ if (readableBytes < SIZE_INT) {
+ break;
+ }
+
+ final int length = in.getInt(in.readerIndex());
+ if (readableBytes < length + SIZE_INT) {
+ break;
+ }
+
+ ChannelBuffer frame = in.slice(in.readerIndex() + SIZE_INT, length);
+ in.skipBytes(SIZE_INT + length);
+ Channels.fireMessageReceived(ctx, frame);
+ }
+ }
+
+ private void append(ChannelBuffer in, int length)
+ {
+ // Need more data to decode the first message. This can happen when
+ // a client is very slow. (e.g.sending each byte one by one)
+ if (previousData instanceof DynamicChannelBuffer)
+ {
+ previousData.writeBytes(in);
+ }
+ else
+ {
+ ChannelBuffer newPreviousData =
+ ChannelBuffers.dynamicBuffer(
+ Math.max(previousData.readableBytes() + in.readableBytes(), length +
4));
+ newPreviousData.writeBytes(previousData);
+ newPreviousData.writeBytes(in);
+ previousData = newPreviousData;
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ final boolean useNextGeneration = true;
+ final ChannelUpstreamHandler handler;
+
+ if (useNextGeneration) {
+ handler = new HornetQFrameDecoder2();
+ } else {
+ handler = new HornetQFrameDecoder(new AbstractBufferHandler()
+ {
+ public void bufferReceived(Object connectionID, HornetQBuffer buffer)
+ { // noop
+ }
+ });
+ }
+
+ final DecoderEmbedder<ChannelBuffer> decoder =
+ new DecoderEmbedder<ChannelBuffer>(handler);
+
+ ChannelBuffer src = ChannelBuffers.buffer(30000 * 1004);
+ while (src.writerIndex() < src.capacity()) {
+ src.writeInt(1000);
+ src.writeZero(1000);
+ }
+
+ Random rand = new Random();
+ List<ChannelBuffer> packets = new ArrayList<ChannelBuffer>();
+ for (int i = 0; i < src.capacity();) {
+ int length = Math.min(rand.nextInt(3000), src.capacity() - i);
+ packets.add(src.copy(i, length));
+ i += length;
+ }
+
+ long startTime = System.nanoTime();
+
+ for (int i = 0; i < 100; i ++) {
+ int cnt = 0;
+ for (ChannelBuffer p: packets) {
+ decoder.offer(p.duplicate());
+ for (;;) {
+ ChannelBuffer frame = decoder.poll();
+ if (frame == null) {
+ break;
+ }
+ if (frame.readableBytes() != 1000) {
+ System.out.println("ARGH 1: " + frame.readableBytes());
+ }
+ cnt ++;
+ }
+ }
+ if (cnt != 30000) {
+ System.out.println("ARGH 2: " + cnt);
+ }
+ }
+
+ long endTime = System.nanoTime();
+ System.out.println(
+ handler.getClass().getSimpleName() + ": " +
+ (endTime - startTime) / 1000000);
+ }
+}
Added: branches/20-optimisation/tests/src/org/hornetq/tests/opt/SendTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/opt/SendTest.java
(rev 0)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/opt/SendTest.java 2009-11-20
10:05:37 UTC (rev 8335)
@@ -0,0 +1,305 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.opt;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.server.HornetQ;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.JournalType;
+import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
+import org.hornetq.integration.transports.netty.NettyConnectorFactory;
+import org.hornetq.integration.transports.netty.TransportConstants;
+import org.hornetq.jms.HornetQQueue;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.client.HornetQSession;
+import org.hornetq.tests.util.RandomUtil;
+
+/**
+ * A SendTest
+ *
+ * @author tim
+ *
+ *
+ */
+public class SendTest
+{
+ private static final Logger log = Logger.getLogger(SendTest.class);
+
+ public static void main(String[] args)
+ {
+ try
+ {
+ new SendTest().runTextMessage();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ private HornetQServer server;
+
+ private void startServer() throws Exception
+ {
+ log.info("*** Starting server");
+
+ System.setProperty("org.hornetq.opt.dontadd", "true");
+ // System.setProperty("org.hornetq.opt.routeblast", "true");
+
+ Configuration configuration = new ConfigurationImpl();
+ configuration.setSecurityEnabled(false);
+ configuration.setJMXManagementEnabled(false);
+ configuration.setJournalMinFiles(10);
+
+ configuration.setPersistenceEnabled(false);
+ configuration.setFileDeploymentEnabled(false);
+ //configuration.setJournalFlushOnSync(true);
+ // configuration.setRunSyncSpeedTest(true);
+ //configuration.setJournalPerfBlastPages(10000);
+
+ configuration.setJournalType(JournalType.NIO);
+
+ //configuration.setLogJournalWriteRate(true);
+ //configuration.setRunSyncSpeedTest(true);
+
+ Map<String, Object> params = new HashMap<String, Object>();
+ params.put(TransportConstants.USE_NIO_PROP_NAME, Boolean.FALSE);
+
+ TransportConfiguration transportConfig1 = new
TransportConfiguration(NettyAcceptorFactory.class.getCanonicalName(),
+ params);
+ TransportConfiguration transportConfig2 = new
TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName(),
+ null);
+ configuration.getAcceptorConfigurations().add(transportConfig1);
+ configuration.getAcceptorConfigurations().add(transportConfig2);
+
+ server = HornetQ.newHornetQServer(configuration);
+
+ server.start();
+
+ log.info("Started server");
+
+ }
+
+ public void runRouteBlast() throws Exception
+ {
+ this.startServer();
+ }
+
+ public void runTextMessage() throws Exception
+ {
+ startServer();
+
+ Map<String, Object> params = new HashMap<String, Object>();
+
+ // params.put(TransportConstants.HOST_PROP_NAME, "localhost");
+
+ // params.put(TransportConstants.PORT_PROP_NAME, 5445);
+
+ params.put(TransportConstants.TCP_NODELAY_PROPNAME, Boolean.FALSE);
+ //params.put(TransportConstants.USE_NIO_PROP_NAME, Boolean.FALSE);
+
+ TransportConfiguration tc = new
TransportConfiguration(NettyConnectorFactory.class.getCanonicalName(), params);
+
+ //TransportConfiguration tc = new
TransportConfiguration(InVMConnectorFactory.class.getCanonicalName(), params);
+
+ HornetQConnectionFactory cf = new HornetQConnectionFactory(tc);
+
+ cf.setProducerWindowSize(1024 * 1024);
+
+ Connection conn = cf.createConnection();
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ ClientSession coreSession = ((HornetQSession)sess).getCoreSession();
+
+ coreSession.createQueue("jms.queue.test_queue",
"jms.queue.test_queue");
+
+ Queue queue = new HornetQQueue("test_queue");
+
+ MessageProducer prod = sess.createProducer(queue);
+
+ prod.setDisableMessageID(true);
+
+ prod.setDisableMessageTimestamp(true);
+
+ prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ byte[] bytes1 = new byte[] { (byte)'A',
(byte)'B',(byte)'C',(byte)'D'};
+
+ String s = new String(bytes1);
+
+ System.out.println("Str is " + s);
+
+ byte[] bytes = RandomUtil.randomBytes(512);
+
+ String str = new String(bytes);
+
+ final int warmup = 50000;
+
+ log.info("Warming up");
+
+ TextMessage tm = sess.createTextMessage();
+
+ tm.setText(str);
+
+ for (int i = 0; i < warmup; i++)
+ {
+ prod.send(tm);
+
+ if (i % 10000 == 0)
+ {
+ log.info("sent " + i);
+ }
+ }
+
+ log.info("** WARMUP DONE");
+
+ final int numMessages = 1000000;
+
+ tm = sess.createTextMessage();
+
+ tm.setText(str);
+
+ long start = System.currentTimeMillis();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ prod.send(tm);
+
+ if (i % 10000 == 0)
+ {
+ log.info("sent " + i);
+ }
+ }
+
+ sess.close();
+
+ long end = System.currentTimeMillis();
+
+ double rate = 1000 * (double)numMessages / (end - start);
+
+ System.out.println("Rate of " + rate + " msgs / sec");
+
+ server.stop();
+ }
+
+ public void runObjectMessage() throws Exception
+ {
+ startServer();
+
+ Map<String, Object> params = new HashMap<String, Object>();
+
+ // params.put(TransportConstants.HOST_PROP_NAME, "localhost");
+
+ // params.put(TransportConstants.PORT_PROP_NAME, 5445);
+
+ params.put(TransportConstants.TCP_NODELAY_PROPNAME, Boolean.FALSE);
+ //params.put(TransportConstants.USE_NIO_PROP_NAME, Boolean.FALSE);
+
+ TransportConfiguration tc = new
TransportConfiguration(NettyConnectorFactory.class.getCanonicalName(), params);
+
+ //TransportConfiguration tc = new
TransportConfiguration(InVMConnectorFactory.class.getCanonicalName(), params);
+
+ HornetQConnectionFactory cf = new HornetQConnectionFactory(tc);
+
+ cf.setProducerWindowSize(1024 * 1024);
+
+ Connection conn = cf.createConnection();
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ ClientSession coreSession = ((HornetQSession)sess).getCoreSession();
+
+ coreSession.createQueue("jms.queue.test_queue",
"jms.queue.test_queue");
+
+ Queue queue = new HornetQQueue("test_queue");
+
+ MessageProducer prod = sess.createProducer(queue);
+
+ prod.setDisableMessageID(true);
+
+ prod.setDisableMessageTimestamp(true);
+
+ prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ byte[] bytes = RandomUtil.randomBytes(512);
+
+ String str = new String(bytes);
+
+ log.info("str length " + str.length());
+
+ final int warmup = 50000;
+
+ log.info("sending messages");
+
+ for (int i = 0; i < warmup; i++)
+ {
+ ObjectMessage om = sess.createObjectMessage(str);
+
+ prod.send(om);
+
+ if (i % 10000 == 0)
+ {
+ log.info("sent " + i);
+ }
+
+ om.setObject(str);
+ }
+
+ log.info("** WARMUP DONE");
+
+ final int numMessages = 500000;
+
+ long start = System.currentTimeMillis();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ObjectMessage om = sess.createObjectMessage(str);
+
+ prod.send(om);
+
+ if (i % 10000 == 0)
+ {
+ log.info("sent " + i);
+ }
+
+ om.setObject(str);
+ }
+
+ long end = System.currentTimeMillis();
+
+ double rate = 1000 * (double)numMessages / (end - start);
+
+ System.out.println("Rate of " + rate + " msgs / sec");
+
+ server.stop();
+ }
+
+}