Author: timfox
Date: 2009-11-22 13:49:03 -0500 (Sun, 22 Nov 2009)
New Revision: 8368
Added:
branches/20-optimisation/src/main/org/hornetq/core/buffers/ResetLimitWrappedHornetQBuffer.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/InVMNonPersistentMessageBufferTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/InVMPersistentMessageBufferTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/NettyNonPersistentMessageBufferTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/NettyPersistentMessageBufferTest.java
Log:
optimisation
Added:
branches/20-optimisation/src/main/org/hornetq/core/buffers/ResetLimitWrappedHornetQBuffer.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/buffers/ResetLimitWrappedHornetQBuffer.java
(rev 0)
+++
branches/20-optimisation/src/main/org/hornetq/core/buffers/ResetLimitWrappedHornetQBuffer.java 2009-11-22
18:49:03 UTC (rev 8368)
@@ -0,0 +1,327 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.buffers;
+
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.SimpleString;
+
+/**
+ * A {@link HornetQBuffer} wrapper that keeps the reader and writer indexes it sets on the
+ * wrapped buffer at or above a fixed <code>limit</code>.
+ * <p>
+ * Index-setting calls made through this wrapper ({@link #readerIndex(int)},
+ * {@link #writerIndex(int)}, {@link #setIndex(int, int)}) silently raise any value below the
+ * limit up to the limit, and the reset/clear operations return the indexes to the limit.
+ * Every other method is forwarded unchanged to the wrapped buffer. In particular the
+ * absolute-position accessors ({@link #readInt(int)}, {@link #setInt(int, int)},
+ * {@link #slice(int, int)}) as well as {@link #array()} and {@link #copy()} are NOT
+ * restricted by the limit.
+ *
+ * @author Tim Fox
+ *
+ */
+public class ResetLimitWrappedHornetQBuffer implements HornetQBuffer
+{
+   private static final Logger log = Logger.getLogger(ResetLimitWrappedHornetQBuffer.class);
+
+   /** Lowest reader/writer index this wrapper will set on the wrapped buffer. */
+   private final int limit;
+
+   /** The buffer all operations are forwarded to; replaceable via {@link #setBuffer(HornetQBuffer)}. */
+   private HornetQBuffer buffer;
+
+   /**
+    * Wraps <code>buffer</code> and moves its reader index to <code>limit</code>.
+    *
+    * @param limit lowest reader/writer index this wrapper will set on the wrapped buffer
+    * @param buffer the buffer to delegate to
+    */
+   public ResetLimitWrappedHornetQBuffer(final int limit, final HornetQBuffer buffer)
+   {
+      this.limit = limit;
+
+      this.buffer = buffer;
+
+      // Position the reader directly on the delegate rather than through the overridable
+      // resetReaderIndex(): an override would otherwise run before the subclass is initialised.
+      buffer.readerIndex(limit);
+   }
+
+   /**
+    * Replaces the wrapped buffer. The indexes of the new buffer are left untouched.
+    *
+    * @param buffer the new buffer to delegate to
+    */
+   public void setBuffer(HornetQBuffer buffer)
+   {
+      this.buffer = buffer;
+   }
+
+   /** Returns the wrapped buffer's backing array; not restricted by the limit. */
+   public byte[] array()
+   {
+      return buffer.array();
+   }
+
+   public int capacity()
+   {
+      return buffer.capacity();
+   }
+
+   /**
+    * Clears the wrapped buffer and then places both the reader and the writer index at the limit.
+    */
+   public void clear()
+   {
+      buffer.clear();
+
+      buffer.setIndex(limit, limit);
+   }
+
+   /** Returns whatever the wrapped buffer's <code>copy()</code> returns; the result is not re-wrapped. */
+   public HornetQBuffer copy()
+   {
+      return buffer.copy();
+   }
+
+   public Object getUnderlyingBuffer()
+   {
+      return buffer.getUnderlyingBuffer();
+   }
+
+   public boolean readable()
+   {
+      return buffer.readable();
+   }
+
+   public int readableBytes()
+   {
+      return buffer.readableBytes();
+   }
+
+   public boolean readBoolean()
+   {
+      return buffer.readBoolean();
+   }
+
+   public byte readByte()
+   {
+      return buffer.readByte();
+   }
+
+   public void readBytes(byte[] bytes, int offset, int length)
+   {
+      buffer.readBytes(bytes, offset, length);
+   }
+
+   public void readBytes(byte[] bytes)
+   {
+      buffer.readBytes(bytes);
+   }
+
+   public char readChar()
+   {
+      return buffer.readChar();
+   }
+
+   public double readDouble()
+   {
+      return buffer.readDouble();
+   }
+
+   public int readerIndex()
+   {
+      return buffer.readerIndex();
+   }
+
+   /**
+    * Sets the reader index of the wrapped buffer, raising values below the limit to the limit.
+    *
+    * @param readerIndex requested reader index
+    */
+   public void readerIndex(int readerIndex)
+   {
+      buffer.readerIndex(Math.max(readerIndex, limit));
+   }
+
+   public float readFloat()
+   {
+      return buffer.readFloat();
+   }
+
+   public int readInt()
+   {
+      return buffer.readInt();
+   }
+
+   /** Absolute read forwarded as-is; <code>pos</code> is not checked against the limit. */
+   public int readInt(int pos)
+   {
+      return buffer.readInt(pos);
+   }
+
+   public long readLong()
+   {
+      return buffer.readLong();
+   }
+
+   public SimpleString readNullableSimpleString()
+   {
+      return buffer.readNullableSimpleString();
+   }
+
+   public String readNullableString()
+   {
+      return buffer.readNullableString();
+   }
+
+   public short readShort()
+   {
+      return buffer.readShort();
+   }
+
+   public SimpleString readSimpleString()
+   {
+      return buffer.readSimpleString();
+   }
+
+   public String readString()
+   {
+      return buffer.readString();
+   }
+
+   public short readUnsignedByte()
+   {
+      return buffer.readUnsignedByte();
+   }
+
+   public int readUnsignedShort()
+   {
+      return buffer.readUnsignedShort();
+   }
+
+   public String readUTF() throws Exception
+   {
+      return buffer.readUTF();
+   }
+
+   /** Moves the reader index of the wrapped buffer to the limit. */
+   public void resetReaderIndex()
+   {
+      buffer.readerIndex(limit);
+   }
+
+   /** Moves the writer index of the wrapped buffer to the limit. */
+   public void resetWriterIndex()
+   {
+      buffer.writerIndex(limit);
+   }
+
+   /**
+    * Sets both indexes of the wrapped buffer, raising either value to the limit if it is below it.
+    *
+    * @param readerIndex requested reader index
+    * @param writerIndex requested writer index
+    */
+   public void setIndex(int readerIndex, int writerIndex)
+   {
+      buffer.setIndex(Math.max(readerIndex, limit), Math.max(writerIndex, limit));
+   }
+
+   /** Absolute write forwarded as-is; <code>pos</code> is not checked against the limit. */
+   public void setInt(int pos, int val)
+   {
+      buffer.setInt(pos, val);
+   }
+
+   /** Forwarded as-is; <code>index</code> is not checked against the limit. */
+   public HornetQBuffer slice(int index, int length)
+   {
+      return buffer.slice(index, length);
+   }
+
+   public boolean writable()
+   {
+      return buffer.writable();
+   }
+
+   public int writableBytes()
+   {
+      return buffer.writableBytes();
+   }
+
+   public void writeBoolean(boolean val)
+   {
+      buffer.writeBoolean(val);
+   }
+
+   public void writeByte(byte val)
+   {
+      buffer.writeByte(val);
+   }
+
+   public void writeBytes(byte[] bytes, int offset, int length)
+   {
+      buffer.writeBytes(bytes, offset, length);
+   }
+
+   public void writeBytes(byte[] bytes)
+   {
+      buffer.writeBytes(bytes);
+   }
+
+   public void writeBytes(HornetQBuffer src, int srcIndex, int length)
+   {
+      buffer.writeBytes(src, srcIndex, length);
+   }
+
+   public void writeChar(char val)
+   {
+      buffer.writeChar(val);
+   }
+
+   public void writeDouble(double val)
+   {
+      buffer.writeDouble(val);
+   }
+
+   public void writeFloat(float val)
+   {
+      buffer.writeFloat(val);
+   }
+
+   public void writeInt(int val)
+   {
+      buffer.writeInt(val);
+   }
+
+   public void writeLong(long val)
+   {
+      buffer.writeLong(val);
+   }
+
+   public void writeNullableSimpleString(SimpleString val)
+   {
+      buffer.writeNullableSimpleString(val);
+   }
+
+   public void writeNullableString(String val)
+   {
+      buffer.writeNullableString(val);
+   }
+
+   public int writerIndex()
+   {
+      return buffer.writerIndex();
+   }
+
+   /**
+    * Sets the writer index of the wrapped buffer, raising values below the limit to the limit.
+    *
+    * @param writerIndex requested writer index
+    */
+   public void writerIndex(int writerIndex)
+   {
+      buffer.writerIndex(Math.max(writerIndex, limit));
+   }
+
+   public void writeShort(short val)
+   {
+      buffer.writeShort(val);
+   }
+
+   public void writeSimpleString(SimpleString val)
+   {
+      buffer.writeSimpleString(val);
+   }
+
+   public void writeString(String val)
+   {
+      buffer.writeString(val);
+   }
+
+   public void writeUTF(String utf) throws Exception
+   {
+      buffer.writeUTF(utf);
+   }
+
+}
Added:
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/InVMNonPersistentMessageBufferTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/InVMNonPersistentMessageBufferTest.java
(rev 0)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/InVMNonPersistentMessageBufferTest.java 2009-11-22
18:49:03 UTC (rev 8368)
@@ -0,0 +1,256 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.client;
+
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.RandomUtil;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * Integration tests for the body buffer of a {@link ClientMessage}: writing a body, sending the
+ * message through a server and reading the body back on the received message.
+ * <p>
+ * The server and session factory used are selected by {@link #isPersistent()} and
+ * {@link #isNetty()}, both of which return <code>false</code> here; subclasses override them to
+ * run the same tests against other configurations.
+ */
+public class InVMNonPersistentMessageBufferTest extends ServiceTestBase
+{
+   private static final Logger log = Logger.getLogger(InVMNonPersistentMessageBufferTest.class);
+
+   public static final String address = "testaddress";
+
+   public static final String queueName = "testqueue";
+
+   /** Reader/writer index the tests expect a body buffer to have when positioned at the start of the body. */
+   private static final int BODY_START_INDEX = PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT;
+
+   private HornetQServer server;
+
+   private ClientSession session;
+
+   private ClientProducer producer;
+
+   private ClientConsumer consumer;
+
+   /*
+    * Test message can be read after being sent
+    * Message can be sent multiple times
+    * After sending, local message can be read
+    * After sending, local message body can be added to and sent
+    * When reset message body it should only reset to after packet headers
+    * Should not be able to read past end of body into encoded message
+    */
+
+   /** A string written to the body before sending is read back unchanged from the received message. */
+   public void testSimpleSendReceive() throws Exception
+   {
+      ClientMessage message = session.createClientMessage(false);
+
+      final String body = RandomUtil.randomString();
+
+      message.getBodyBuffer().writeString(body);
+
+      ClientMessage received = sendAndReceive(message);
+
+      assertNotNull(received);
+
+      assertEquals(body, received.getBodyBuffer().readString());
+   }
+
+   /** The same message instance is sent ten times; every received copy has the same body and body size. */
+   public void testSendSameMessageMultipleTimes() throws Exception
+   {
+      ClientMessage message = session.createClientMessage(false);
+
+      final String body = RandomUtil.randomString();
+
+      message.getBodyBuffer().writeString(body);
+
+      int bodySize = message.getBodySize();
+
+      for (int i = 0; i < 10; i++)
+      {
+         ClientMessage received = sendAndReceive(message);
+
+         assertNotNull(received);
+
+         assertEquals(bodySize, received.getBodySize());
+
+         assertEquals(body, received.getBodyBuffer().readString());
+
+         assertFalse(received.getBodyBuffer().readable());
+      }
+   }
+
+   /**
+    * The same message instance is reused ten times with a fresh body each time; after each send
+    * the body buffer is cleared and both of its indexes must be back at the body start.
+    */
+   public void testSendMessageResetSendAgainDifferentBody() throws Exception
+   {
+      ClientMessage message = session.createClientMessage(false);
+
+      for (int i = 0; i < 10; i++)
+      {
+         final String body = RandomUtil.randomString();
+
+         message.getBodyBuffer().writeString(body);
+
+         int bodySize = message.getBodySize();
+
+         ClientMessage received = sendAndReceive(message);
+
+         assertNotNull(received);
+
+         assertEquals(bodySize, received.getBodySize());
+
+         assertEquals(body, received.getBodyBuffer().readString());
+
+         assertFalse(received.getBodyBuffer().readable());
+
+         message.getBodyBuffer().clear();
+
+         assertEquals(BODY_START_INDEX, message.getBodyBuffer().writerIndex());
+
+         assertEquals(BODY_START_INDEX, message.getBodyBuffer().readerIndex());
+      }
+   }
+
+   /** Reading one more byte after the whole body has been consumed must throw IndexOutOfBoundsException. */
+   public void testCannotReadPastEndOfMessageBody() throws Exception
+   {
+      ClientMessage message = session.createClientMessage(false);
+
+      final String body = RandomUtil.randomString();
+
+      message.getBodyBuffer().writeString(body);
+
+      ClientMessage received = sendAndReceive(message);
+
+      assertNotNull(received);
+
+      assertEquals(body, received.getBodyBuffer().readString());
+
+      try
+      {
+         received.getBodyBuffer().readByte();
+
+         fail("Should throw exception");
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         // OK
+      }
+   }
+
+   /**
+    * After resetReaderIndex() the reader index is back at the body start and the body can be read
+    * again, both on the local message before sending and on the received message.
+    */
+   public void testCanReReadBodyAfterReaderReset() throws Exception
+   {
+      ClientMessage message = session.createClientMessage(false);
+
+      final String body = RandomUtil.randomString();
+
+      message.getBodyBuffer().writeString(body);
+
+      assertEquals(BODY_START_INDEX, message.getBodyBuffer().readerIndex());
+
+      String body2 = message.getBodyBuffer().readString();
+
+      assertEquals(body, body2);
+
+      message.getBodyBuffer().resetReaderIndex();
+
+      assertEquals(BODY_START_INDEX, message.getBodyBuffer().readerIndex());
+
+      String body3 = message.getBodyBuffer().readString();
+
+      assertEquals(body, body3);
+
+      ClientMessage received = sendAndReceive(message);
+
+      assertNotNull(received);
+
+      assertEquals(body, received.getBodyBuffer().readString());
+
+      received.getBodyBuffer().resetReaderIndex();
+
+      assertEquals(BODY_START_INDEX, received.getBodyBuffer().readerIndex());
+
+      String body4 = received.getBodyBuffer().readString();
+
+      assertEquals(body, body4);
+
+   }
+
+   /** Creates the Netty session factory when {@link #isNetty()} is true, otherwise the in-VM one. */
+   protected ClientSessionFactory createFactory()
+   {
+      if (isNetty())
+      {
+         return this.createNettyFactory();
+      }
+      else
+      {
+         return this.createInVMFactory();
+      }
+   }
+
+   /** Whether server and factory are created for Netty; <code>false</code> in this class. */
+   protected boolean isNetty()
+   {
+      return false;
+   }
+
+   /** Persistence flag passed to createServer; <code>false</code> in this class. */
+   protected boolean isPersistent()
+   {
+      return false;
+   }
+
+   /** Starts a server, then creates the session, queue, producer and consumer shared by the tests. */
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      server = createServer(isPersistent(), isNetty());
+
+      server.start();
+
+      ClientSessionFactory cf = createFactory();
+
+      session = cf.createSession();
+
+      session.createQueue(address, queueName);
+
+      producer = session.createProducer(address);
+
+      consumer = session.createConsumer(queueName);
+
+      session.start();
+   }
+
+   /**
+    * Closes the consumer, deletes the queue, closes the session and stops the server.
+    * <p>
+    * The server stop and super.tearDown() run in finally blocks so that a failure while
+    * cleaning up the session cannot leave a started server behind for the following tests.
+    */
+   @Override
+   protected void tearDown() throws Exception
+   {
+      try
+      {
+         if (session != null)
+         {
+            if (consumer != null)
+            {
+               consumer.close();
+            }
+
+            session.deleteQueue(queueName);
+
+            session.close();
+         }
+      }
+      finally
+      {
+         try
+         {
+            if (server != null && server.isStarted())
+            {
+               server.stop();
+            }
+         }
+         finally
+         {
+            super.tearDown();
+         }
+      }
+   }
+
+   /**
+    * Sends <code>message</code> with the shared producer and returns the result of
+    * <code>consumer.receive(10000)</code>, which the tests then check for null.
+    */
+   private ClientMessage sendAndReceive(final ClientMessage message) throws Exception
+   {
+      producer.send(message);
+
+      return consumer.receive(10000);
+   }
+
+}
Added:
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/InVMPersistentMessageBufferTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/InVMPersistentMessageBufferTest.java
(rev 0)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/InVMPersistentMessageBufferTest.java 2009-11-22
18:49:03 UTC (rev 8368)
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+/**
+ * Runs the tests inherited from {@link InVMNonPersistentMessageBufferTest} with
+ * {@link #isPersistent()} returning <code>true</code> and {@link #isNetty()} returning
+ * <code>false</code>.
+ *
+ * @author tim
+ *
+ *
+ */
+public class InVMPersistentMessageBufferTest extends InVMNonPersistentMessageBufferTest
+{
+   // @Override makes the compiler reject this class if the base-class hook is renamed,
+   // instead of silently running the inherited configuration.
+   @Override
+   public boolean isPersistent()
+   {
+      return true;
+   }
+
+   @Override
+   public boolean isNetty()
+   {
+      return false;
+   }
+}
Added:
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/NettyNonPersistentMessageBufferTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/NettyNonPersistentMessageBufferTest.java
(rev 0)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/NettyNonPersistentMessageBufferTest.java 2009-11-22
18:49:03 UTC (rev 8368)
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+/**
+ * Runs the tests inherited from {@link InVMNonPersistentMessageBufferTest} with
+ * {@link #isPersistent()} returning <code>false</code> and {@link #isNetty()} returning
+ * <code>true</code>.
+ *
+ * @author tim
+ *
+ *
+ */
+public class NettyNonPersistentMessageBufferTest extends InVMNonPersistentMessageBufferTest
+{
+   // @Override makes the compiler reject this class if the base-class hook is renamed,
+   // instead of silently running the inherited configuration.
+   @Override
+   public boolean isPersistent()
+   {
+      return false;
+   }
+
+   @Override
+   public boolean isNetty()
+   {
+      return true;
+   }
+}
Added:
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/NettyPersistentMessageBufferTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/NettyPersistentMessageBufferTest.java
(rev 0)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/NettyPersistentMessageBufferTest.java 2009-11-22
18:49:03 UTC (rev 8368)
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+/**
+ * Runs the tests inherited from {@link InVMNonPersistentMessageBufferTest} with both
+ * {@link #isPersistent()} and {@link #isNetty()} returning <code>true</code>.
+ *
+ * @author tim
+ *
+ *
+ */
+public class NettyPersistentMessageBufferTest extends InVMNonPersistentMessageBufferTest
+{
+   // @Override makes the compiler reject this class if the base-class hook is renamed,
+   // instead of silently running the inherited configuration.
+   @Override
+   public boolean isPersistent()
+   {
+      return true;
+   }
+
+   @Override
+   public boolean isNetty()
+   {
+      return true;
+   }
+}