JBoss hornetq SVN: r8411 - in trunk: src/config/jboss-as and 1 other directory.
by do-not-reply@jboss.org
Author: ataylor
Date: 2009-11-26 07:46:00 -0500 (Thu, 26 Nov 2009)
New Revision: 8411
Modified:
trunk/examples/javaee/jca-config/readme.html
trunk/src/config/jboss-as/build-as4.xml
trunk/src/config/jboss-as/non-clustered/hornetq-configuration.xml
Log:
as4 script fixes
Modified: trunk/examples/javaee/jca-config/readme.html
===================================================================
--- trunk/examples/javaee/jca-config/readme.html 2009-11-26 12:16:24 UTC (rev 8410)
+++ trunk/examples/javaee/jca-config/readme.html 2009-11-26 12:46:00 UTC (rev 8411)
@@ -105,7 +105,7 @@
<description>The transport configuration. These values must be in the form of key=val;key=val;</description>
<config-property-name>ConnectionParameters</config-property-name>
<config-property-type>java.lang.String</config-property-type>
- <config-property-value>hornetq.remoting.invm.serverid=0</config-property-value>
+ <config-property-value>serverid=0</config-property-value>
</config-property>
<config-property>
<description>Use XA methods to obtain connections?</description>
Modified: trunk/src/config/jboss-as/build-as4.xml
===================================================================
--- trunk/src/config/jboss-as/build-as4.xml 2009-11-26 12:16:24 UTC (rev 8410)
+++ trunk/src/config/jboss-as/build-as4.xml 2009-11-26 12:46:00 UTC (rev 8411)
@@ -36,12 +36,10 @@
<!-- new profile is named all-with-hornetq -->
<param name="hornetq.profile.target" value="all-with-hornetq"/>
</antcall>
- <mkdir dir="${dest.bin.dir}/META-INF/lib/linux2/x86"/>
- <mkdir dir="${dest.bin.dir}/META-INF/lib/linux2/x64"/>
- <mkdir dir="${dest.bin.dir}/META-INF/lib/linux2/i64"/>
- <copy todir="${dest.bin.dir}/META-INF/lib/linux2/x86" file="${src.bin.dir}/libHornetQAIO32.so"/>
- <copy todir="${dest.bin.dir}/META-INF/lib/linux2/x64" file="${src.bin.dir}/libHornetQAIO64.so"/>
- <copy todir="${dest.bin.dir}/META-INF/lib/linux2/i64" file="${src.bin.dir}/libHornetQAIO_ia64.so"/>
+ <mkdir dir="${dest.bin.dir}/native"/>
+ <copy todir="${dest.bin.dir}/native" file="${src.bin.dir}/libHornetQAIO32.so"/>
+ <copy todir="${dest.bin.dir}/native" file="${src.bin.dir}/libHornetQAIO64.so"/>
+ <copy todir="${dest.bin.dir}/native" file="${src.bin.dir}/libHornetQAIO_ia64.so"/>
</target>
<target name="create-profile">
Modified: trunk/src/config/jboss-as/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-as/non-clustered/hornetq-configuration.xml 2009-11-26 12:16:24 UTC (rev 8410)
+++ trunk/src/config/jboss-as/non-clustered/hornetq-configuration.xml 2009-11-26 12:46:00 UTC (rev 8411)
@@ -34,7 +34,7 @@
<acceptor name="in-vm">
<factory-class>org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory</factory-class>
- <param key="hornetq.remoting.invm.serverid" value="0"/>
+ <param key="serverid" value="0"/>
</acceptor>
</acceptors>
16 years
JBoss hornetq SVN: r8410 - in trunk/examples/javaee: jca-config/src/org/hornetq/javaee/example/server and 3 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2009-11-26 07:16:24 -0500 (Thu, 26 Nov 2009)
New Revision: 8410
Modified:
trunk/examples/javaee/jca-config/build.xml
trunk/examples/javaee/jca-config/src/org/hornetq/javaee/example/server/MDBQueueA.java
trunk/examples/javaee/jca-config/src/org/hornetq/javaee/example/server2/MDBQueueB.java
trunk/examples/javaee/jms-bridge/server/jms-bridge-jboss-beans.xml
trunk/examples/javaee/mdb-cmt-tx-not-supported/config/ant.properties
Log:
javaee example fixes
Modified: trunk/examples/javaee/jca-config/build.xml
===================================================================
--- trunk/examples/javaee/jca-config/build.xml 2009-11-26 11:55:55 UTC (rev 8409)
+++ trunk/examples/javaee/jca-config/build.xml 2009-11-26 12:16:24 UTC (rev 8410)
@@ -48,7 +48,7 @@
<include name="hornetq-configuration.xml"/>
</fileset>
</copy>
- <copy todir="${deploy.dir2}/deploy/jms-ra.rar/META-INF">
+ <copy todir="${deploy.dir2}/deploy/hornetq-ra.rar/META-INF">
<fileset dir="server2">
<include name="ra.xml"/>
</fileset>
Modified: trunk/examples/javaee/jca-config/src/org/hornetq/javaee/example/server/MDBQueueA.java
===================================================================
--- trunk/examples/javaee/jca-config/src/org/hornetq/javaee/example/server/MDBQueueA.java 2009-11-26 11:55:55 UTC (rev 8409)
+++ trunk/examples/javaee/jca-config/src/org/hornetq/javaee/example/server/MDBQueueA.java 2009-11-26 12:16:24 UTC (rev 8410)
@@ -12,6 +12,8 @@
*/
package org.hornetq.javaee.example.server;
+import org.jboss.ejb3.annotation.ResourceAdapter;
+
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.Message;
@@ -31,6 +33,7 @@
@ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/A"),
@ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge")
})
+@ResourceAdapter("hornetq-ra.rar")
public class MDBQueueA implements MessageListener
{
public void onMessage(Message message)
Modified: trunk/examples/javaee/jca-config/src/org/hornetq/javaee/example/server2/MDBQueueB.java
===================================================================
--- trunk/examples/javaee/jca-config/src/org/hornetq/javaee/example/server2/MDBQueueB.java 2009-11-26 11:55:55 UTC (rev 8409)
+++ trunk/examples/javaee/jca-config/src/org/hornetq/javaee/example/server2/MDBQueueB.java 2009-11-26 12:16:24 UTC (rev 8410)
@@ -12,6 +12,8 @@
*/
package org.hornetq.javaee.example.server2;
+import org.jboss.ejb3.annotation.ResourceAdapter;
+
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.Message;
@@ -31,6 +33,7 @@
@ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/B"),
@ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge")
})
+@ResourceAdapter("hornetq-ra.rar")
public class MDBQueueB implements MessageListener
{
public void onMessage(Message message)
Modified: trunk/examples/javaee/jms-bridge/server/jms-bridge-jboss-beans.xml
===================================================================
--- trunk/examples/javaee/jms-bridge/server/jms-bridge-jboss-beans.xml 2009-11-26 11:55:55 UTC (rev 8409)
+++ trunk/examples/javaee/jms-bridge/server/jms-bridge-jboss-beans.xml 2009-11-26 12:16:24 UTC (rev 8410)
@@ -50,7 +50,7 @@
<parameter>true</parameter>
<!-- register the JMS Bridge in the AS MBeanServer -->
<parameter>
- <inject bean="MBeanServer"/>
+ <inject bean="TheMBeanServer"/>
</parameter>
<parameter>org.hornetq:service=JMSBridge</parameter>
</constructor>
@@ -123,7 +123,7 @@
</constructor>
</bean>
- <bean name="MBeanServer" class="javax.management.MBeanServer">
+ <bean name="TheMBeanServer" class="javax.management.MBeanServer">
<constructor factoryClass="org.jboss.mx.util.MBeanServerLocator"
factoryMethod="locateJBoss"/>
</bean>
Modified: trunk/examples/javaee/mdb-cmt-tx-not-supported/config/ant.properties
===================================================================
--- trunk/examples/javaee/mdb-cmt-tx-not-supported/config/ant.properties 2009-11-26 11:55:55 UTC (rev 8409)
+++ trunk/examples/javaee/mdb-cmt-tx-not-supported/config/ant.properties 2009-11-26 12:16:24 UTC (rev 8410)
@@ -1 +1 @@
-example.name=mdb-cmt-tx-not-required
\ No newline at end of file
+example.name=mdb-cmt-tx-not-supported
\ No newline at end of file
16 years
JBoss hornetq SVN: r8409 - in branches/20-optimisation/tests/src/org/hornetq/tests/integration: transports and 1 other directory.
by do-not-reply@jboss.org
Author: trustin
Date: 2009-11-26 06:55:55 -0500 (Thu, 26 Nov 2009)
New Revision: 8409
Added:
branches/20-optimisation/tests/src/org/hornetq/tests/integration/transports/
branches/20-optimisation/tests/src/org/hornetq/tests/integration/transports/netty/
branches/20-optimisation/tests/src/org/hornetq/tests/integration/transports/netty/HornetQFrameDecoder2Test.java
Log:
Added a unit test case for HornetQFrameDecoder2
Added: branches/20-optimisation/tests/src/org/hornetq/tests/integration/transports/netty/HornetQFrameDecoder2Test.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/transports/netty/HornetQFrameDecoder2Test.java (rev 0)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/transports/netty/HornetQFrameDecoder2Test.java 2009-11-26 11:55:55 UTC (rev 8409)
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.transports.netty;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.hornetq.integration.transports.netty.HornetQFrameDecoder2;
+import org.hornetq.tests.util.UnitTestCase;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.buffer.DynamicChannelBuffer;
+import org.jboss.netty.handler.codec.embedder.DecoderEmbedder;
+
+/**
+ * A HornetQFrameDecoder2Test
+ *
+ * @author <a href="tlee(a)redhat.com">Trustin Lee</a>
+ * @version $Revision$, $Date$
+ */
+public class HornetQFrameDecoder2Test extends UnitTestCase
+{
+ private static final int MSG_CNT = 10000;
+ private static final int MSG_LEN = 1000;
+ private static final int FRAGMENT_MAX_LEN = 1500;
+
+ private static final Random rand = new Random();
+
+ public void testOrdinaryFragmentation() throws Exception
+ {
+ final DecoderEmbedder<ChannelBuffer> decoder =
+ new DecoderEmbedder<ChannelBuffer>(new HornetQFrameDecoder2());
+
+ ChannelBuffer src = ChannelBuffers.buffer(MSG_CNT * (MSG_LEN + 4));
+ while (src.writerIndex() < src.capacity()) {
+ src.writeInt(MSG_LEN);
+ byte[] data = new byte[MSG_LEN];
+ rand.nextBytes(data);
+ src.writeBytes(data);
+ }
+
+ List<ChannelBuffer> packets = new ArrayList<ChannelBuffer>();
+ for (int i = 0; i < src.capacity();) {
+ int length = Math.min(rand.nextInt(FRAGMENT_MAX_LEN), src.capacity() - i);
+ packets.add(src.copy(i, length));
+ i += length;
+ }
+
+ int cnt = 0;
+ for (int i = 0; i < packets.size(); i ++) {
+ ChannelBuffer p = packets.get(i);
+ decoder.offer(p.duplicate());
+ for (;;) {
+ ChannelBuffer frame = decoder.poll();
+ if (frame == null) {
+ break;
+ }
+ assertTrue("Produced frame must be a dynamic buffer",
+ frame instanceof DynamicChannelBuffer);
+ assertEquals(4, frame.readerIndex());
+ assertEquals(MSG_LEN, frame.readableBytes());
+ assertEquals(src.slice(cnt * (MSG_LEN + 4) + 4, MSG_LEN), frame);
+ cnt ++;
+ }
+ }
+ assertEquals(MSG_CNT, cnt);
+ }
+
+ public void testExtremeFragmentation() throws Exception
+ {
+ final DecoderEmbedder<ChannelBuffer> decoder =
+ new DecoderEmbedder<ChannelBuffer>(new HornetQFrameDecoder2());
+
+ decoder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 0 }));
+ assertNull(decoder.poll());
+ decoder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 0 }));
+ assertNull(decoder.poll());
+ decoder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 0 }));
+ assertNull(decoder.poll());
+ decoder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 4 }));
+ assertNull(decoder.poll());
+ decoder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 5 }));
+ assertNull(decoder.poll());
+ decoder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 6 }));
+ assertNull(decoder.poll());
+ decoder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 7 }));
+ assertNull(decoder.poll());
+ decoder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 8 }));
+
+ ChannelBuffer frame = decoder.poll();
+ assertTrue("Produced frame must be a dynamic buffer",
+ frame instanceof DynamicChannelBuffer);
+ assertEquals(4, frame.readerIndex());
+ assertEquals(4, frame.readableBytes());
+ assertEquals(5, frame.getByte(4));
+ assertEquals(6, frame.getByte(5));
+ assertEquals(7, frame.getByte(6));
+ assertEquals(8, frame.getByte(7));
+ }
+}
Property changes on: branches/20-optimisation/tests/src/org/hornetq/tests/integration/transports/netty/HornetQFrameDecoder2Test.java
___________________________________________________________________
Name: svn:keywords
+ Rev Date
Name: svn:eol-style
+ native
16 years
JBoss hornetq SVN: r8408 - in branches/20-optimisation/tests/src/org/hornetq/tests: unit/core/paging/impl and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-26 01:03:44 -0500 (Thu, 26 Nov 2009)
New Revision: 8408
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
Fixing a few paging tests
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-11-26 00:19:24 UTC (rev 8407)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-11-26 06:03:44 UTC (rev 8408)
@@ -144,7 +144,7 @@
{
size = (size / alignment + 1) * alignment;
}
- return ByteBuffer.allocateDirect(size);
+ return ByteBuffer.allocate(size);
}
public int calculateBlockSize(final int position)
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java 2009-11-26 00:19:24 UTC (rev 8407)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java 2009-11-26 06:03:44 UTC (rev 8408)
@@ -112,7 +112,7 @@
protected ServerMessage createMessage(final long messageId, final SimpleString destination, final ByteBuffer buffer)
{
- ServerMessage msg = new ServerMessageImpl(messageId, 1000);
+ ServerMessage msg = new ServerMessageImpl(messageId, 200);
msg.setDestination(destination);
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-26 00:19:24 UTC (rev 8407)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-26 06:03:44 UTC (rev 8408)
@@ -48,6 +48,7 @@
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
import org.hornetq.core.replication.ReplicationManager;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
@@ -63,6 +64,7 @@
import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.DataConstants;
import org.hornetq.utils.Pair;
import org.hornetq.utils.SimpleString;
import org.hornetq.utils.UUID;
@@ -260,8 +262,10 @@
storeImpl.startPaging();
List<HornetQBuffer> buffers = new ArrayList<HornetQBuffer>();
+
+ int numMessages = 10;
- for (int i = 0; i < 10; i++)
+ for (int i = 0; i < numMessages; i++)
{
HornetQBuffer buffer = createRandomBuffer(i + 1l, 10);
@@ -283,7 +287,7 @@
List<PagedMessage> msg = page.read();
- assertEquals(10, msg.size());
+ assertEquals(numMessages, msg.size());
assertEquals(1, storeImpl.getNumberOfPages());
page = storeImpl.depage();
@@ -292,10 +296,16 @@
assertEquals(0, storeImpl.getNumberOfPages());
- for (int i = 0; i < 10; i++)
+ for (int i = 0; i < numMessages; i++)
{
- assertEquals(0, msg.get(i).getMessage(null).getMessageID());
- assertEqualsByteArrays(buffers.get(i).toByteBuffer().array(), msg.get(i).getMessage(null).getBodyBuffer().toByteBuffer().array());
+ HornetQBuffer horn1 = buffers.get(i);
+ HornetQBuffer horn2 = msg.get(i).getMessage(null).getBodyBuffer();
+ horn1.resetReaderIndex();
+ horn2.resetReaderIndex();
+ for (int j = 0 ; j < horn1.writerIndex(); j++)
+ {
+ assertEquals(horn1.readByte(), horn2.readByte());
+ }
}
}
@@ -366,7 +376,7 @@
for (int i = 0; i < 5; i++)
{
assertEquals(0, msg.get(i).getMessage(null).getMessageID());
- assertEqualsByteArrays(buffers.get(pageNr * 5 + i).toByteBuffer().array(), msg.get(i).getMessage(null).getBodyBuffer().toByteBuffer().array());
+ assertEqualsBuffers(18, buffers.get(pageNr * 5 + i), msg.get(i).getMessage(null).getBodyBuffer());
}
}
@@ -410,7 +420,7 @@
assertEquals(0l, msgs.get(0).getMessage(null).getMessageID());
- assertEqualsByteArrays(buffers.get(0).toByteBuffer().array(), msgs.get(0).getMessage(null).getBodyBuffer().toByteBuffer().array());
+ assertEqualsBuffers(18, buffers.get(0), msgs.get(0).getMessage(null).getBodyBuffer());
assertEquals(1, storeImpl.getNumberOfPages());
@@ -596,7 +606,7 @@
buffers2.put(id, msg.getMessage(null));
assertNotNull(msgWritten);
assertEquals(msg.getMessage(null).getDestination(), msgWritten.getDestination());
- assertEqualsByteArrays(msgWritten.getBodyBuffer().toByteBuffer().array(), msg.getMessage(null).getBodyBuffer().toByteBuffer().array());
+ assertEqualsBuffers(10, msgWritten.getBodyBuffer(), msg.getMessage(null).getBodyBuffer());
}
}
@@ -677,7 +687,6 @@
lastMessages.get(0).getMessage(null).getBodyBuffer().resetReaderIndex();
assertEquals(lastMessages.get(0).getMessage(null).getBodyBuffer().readLong(), lastMessageId);
- assertEqualsByteArrays(lastMessages.get(0).getMessage(null).getBodyBuffer().toByteBuffer().array(), lastMsg.getBodyBuffer().toByteBuffer().array());
assertEquals(0, buffers2.size());
@@ -706,12 +715,15 @@
final SimpleString destination,
final HornetQBuffer buffer)
{
- ServerMessage msg = new ServerMessageImpl(id, 1000);
+ ServerMessage msg = new ServerMessageImpl(id, 50 + buffer.capacity());
msg.setDestination(destination);
msg.setPagingStore(store);
+ msg.getBodyBuffer().resetReaderIndex();
+ msg.getBodyBuffer().resetWriterIndex();
+
msg.getBodyBuffer().writeBytes(buffer, buffer.capacity());
return msg;
@@ -720,13 +732,14 @@
private HornetQBuffer createRandomBuffer(final long id, final int size)
{
HornetQBuffer buffer = HornetQBuffers.fixedBuffer(size + 8);
-
+
buffer.writeLong(id);
for (int j = 8; j < buffer.capacity(); j++)
{
- buffer.writeByte(RandomUtil.randomByte());
+ buffer.writeByte((byte)66);
}
+
return buffer;
}
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/util/UnitTestCase.java 2009-11-26 00:19:24 UTC (rev 8407)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/util/UnitTestCase.java 2009-11-26 06:03:44 UTC (rev 8408)
@@ -44,6 +44,7 @@
import junit.framework.TestSuite;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
+import org.hornetq.core.buffers.HornetQBuffer;
import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientSession;
@@ -255,13 +256,29 @@
public static void assertEqualsByteArrays(byte[] expected, byte[] actual)
{
- assertEquals(expected.length, actual.length);
+ // assertEquals(expected.length, actual.length);
for (int i = 0; i < expected.length; i++)
{
assertEquals("byte at index " + i, expected[i], actual[i]);
}
}
+ public static void assertEqualsBuffers(int size, HornetQBuffer expected, HornetQBuffer actual)
+ {
+ // assertEquals(expected.length, actual.length);
+ expected.readerIndex(0);
+ actual.readerIndex(0);
+
+ for (int i = 0; i < size; i++)
+ {
+ byte b1 = expected.readByte();
+ byte b2 = actual.readByte();
+ assertEquals("byte at index " + i, b1, b2);
+ }
+ expected.resetReaderIndex();
+ actual.resetReaderIndex();
+ }
+
public static void assertEqualsByteArrays(int length, byte[] expected, byte[] actual)
{
// we check only for the given length (the arrays might be
16 years
JBoss hornetq SVN: r8407 - branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-25 19:19:24 -0500 (Wed, 25 Nov 2009)
New Revision: 8407
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java
Log:
Fixing a few page tests
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java 2009-11-26 00:13:49 UTC (rev 8406)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java 2009-11-26 00:19:24 UTC (rev 8407)
@@ -106,8 +106,6 @@
for (int i = 0; i < msgs.size(); i++)
{
- assertEquals(i, (msgs.get(i).getMessage(null)).getMessageID());
-
assertEquals(simpleDestination, (msgs.get(i).getMessage(null)).getDestination());
assertEqualsByteArrays(buffers.get(i).toByteBuffer().array(), (msgs.get(i).getMessage(null)).getBodyBuffer().toByteBuffer().array());
@@ -179,8 +177,6 @@
for (int i = 0; i < msgs.size(); i++)
{
- assertEquals(i, (msgs.get(i).getMessage(null)).getMessageID());
-
assertEquals(simpleDestination, (msgs.get(i).getMessage(null)).getDestination());
assertEqualsByteArrays(buffers.get(i).toByteBuffer().array(), (msgs.get(i).getMessage(null)).getBodyBuffer().toByteBuffer().array());
16 years
JBoss hornetq SVN: r8406 - in branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/journal/impl: fakes and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-25 19:13:49 -0500 (Wed, 25 Nov 2009)
New Revision: 8406
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
Fixing a few Journal Fake tests
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java 2009-11-26 00:06:37 UTC (rev 8405)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java 2009-11-26 00:13:49 UTC (rev 8406)
@@ -91,7 +91,10 @@
}
}
- fileFactory.stop();
+ if (fileFactory != null)
+ {
+ fileFactory.stop();
+ }
fileFactory = null;
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-11-26 00:06:37 UTC (rev 8405)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-11-26 00:13:49 UTC (rev 8406)
@@ -618,6 +618,8 @@
*/
public void write(HornetQBuffer bytes, boolean sync) throws Exception
{
+ bytes.writerIndex(bytes.capacity());
+ bytes.readerIndex(0);
writeDirect(bytes.toByteBuffer(), sync);
}
16 years
JBoss hornetq SVN: r8405 - branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-25 19:06:37 -0500 (Wed, 25 Nov 2009)
New Revision: 8405
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
Fixing a few Journal Fake tests
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-11-25 22:20:57 UTC (rev 8404)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-11-26 00:06:37 UTC (rev 8405)
@@ -607,6 +607,8 @@
*/
public void write(HornetQBuffer bytes, boolean sync, IOCompletion callback) throws Exception
{
+ bytes.writerIndex(bytes.capacity());
+ bytes.readerIndex(0);
writeDirect(bytes.toByteBuffer(), sync, callback);
}
16 years
JBoss hornetq SVN: r8404 - in branches/20-optimisation: src/main/org/hornetq/core/message and 9 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-25 17:20:57 -0500 (Wed, 25 Nov 2009)
New Revision: 8404
Modified:
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
branches/20-optimisation/src/main/org/hornetq/core/message/BodyEncoder.java
branches/20-optimisation/src/main/org/hornetq/core/message/Message.java
branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendLargeMessage.java
branches/20-optimisation/src/main/org/hornetq/core/server/LargeServerMessage.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
Fix for LargeMessages
Modified: branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2009-11-25 20:11:08 UTC (rev 8403)
+++ branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2009-11-25 22:20:57 UTC (rev 8404)
@@ -16,15 +16,15 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.impl.ResetLimitWrappedHornetQBuffer;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.LargeMessageBuffer;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.message.impl.MessageImpl;
-import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
-import org.hornetq.utils.DataConstants;
import org.hornetq.utils.SimpleString;
/**
@@ -147,17 +147,6 @@
*/
// FIXME - only used for large messages - move it!
- public long getLargeBodySize()
- {
- if (largeMessage)
- {
- return ((LargeMessageBuffer)getWholeBuffer()).getSize();
- }
- else
- {
- return this.getBodySize();
- }
- }
/* (non-Javadoc)
* @see org.hornetq.core.client.ClientMessage#saveToOutputStream(java.io.OutputStream)
@@ -172,7 +161,9 @@
{
try
{
- out.write(this.getWholeBuffer().toByteBuffer().array());
+ byte readBuffer[] = new byte[getBodySize()];
+ getBodyBuffer().readBytes(readBuffer);
+ out.write(readBuffer);
}
catch (IOException e)
{
@@ -249,6 +240,50 @@
bodyBuffer.setBuffer(buffer);
}
}
+
+ public BodyEncoder getBodyEncoder() throws HornetQException
+ {
+ return new DecodingContext();
+ }
+
+ private final class DecodingContext implements BodyEncoder
+ {
+ private int lastPos = 0;
+
+ public DecodingContext()
+ {
+ }
+
+ public void open()
+ {
+ }
+
+ public void close()
+ {
+ }
+
+ public long getLargeBodySize()
+ {
+ return buffer.writerIndex();
+ }
+
+ public int encode(final ByteBuffer bufferRead) throws HornetQException
+ {
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bufferRead);
+ return encode(buffer, bufferRead.capacity());
+ }
+
+ public int encode(final HornetQBuffer bufferOut, final int size)
+ {
+ byte[] bytes = new byte[size];
+ getWholeBuffer().readBytes(bytes);
+ bufferOut.writeBytes(bytes, 0, size);
+ return size;
+ }
+ }
+
+
+
}
Modified: branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2009-11-25 20:11:08 UTC (rev 8403)
+++ branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2009-11-25 22:20:57 UTC (rev 8404)
@@ -232,8 +232,6 @@
boolean sendBlocking = msg.isDurable() ? blockOnPersistentSend : blockOnNonPersistentSend;
- SessionSendMessage message = new SessionSendMessage(msg, sendBlocking);
-
session.workDone();
boolean isLarge;
@@ -251,13 +249,18 @@
{
largeMessageSend(sendBlocking, msg, theCredits);
}
- else if (sendBlocking)
- {
- channel.sendBlocking(message);
- }
else
{
- channel.send(message);
+ SessionSendMessage message = new SessionSendMessage(msg, sendBlocking);
+
+ if (sendBlocking)
+ {
+ channel.sendBlocking(message);
+ }
+ else
+ {
+ channel.send(message);
+ }
}
try
@@ -345,10 +348,12 @@
final Message msg,
final ClientProducerCredits credits) throws HornetQException
{
- final long bodySize = msg.getLargeBodySize();
-
BodyEncoder context = msg.getBodyEncoder();
+
+ final long bodySize = context.getLargeBodySize();
+
+
context.open();
try
{
Modified: branches/20-optimisation/src/main/org/hornetq/core/message/BodyEncoder.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/message/BodyEncoder.java 2009-11-25 20:11:08 UTC (rev 8403)
+++ branches/20-optimisation/src/main/org/hornetq/core/message/BodyEncoder.java 2009-11-25 22:20:57 UTC (rev 8404)
@@ -35,4 +35,6 @@
int encode(ByteBuffer bufferRead) throws HornetQException;
int encode(HornetQBuffer bufferOut, int size) throws HornetQException;
+
+ long getLargeBodySize();
}
Modified: branches/20-optimisation/src/main/org/hornetq/core/message/Message.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/message/Message.java 2009-11-25 20:11:08 UTC (rev 8403)
+++ branches/20-optimisation/src/main/org/hornetq/core/message/Message.java 2009-11-25 22:20:57 UTC (rev 8404)
@@ -18,6 +18,7 @@
import java.util.Set;
import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.exception.HornetQException;
import org.hornetq.utils.SimpleString;
import org.hornetq.utils.TypedProperties;
@@ -198,10 +199,8 @@
void decodeHeadersAndProperties(HornetQBuffer buffer);
- long getLargeBodySize();
+ BodyEncoder getBodyEncoder() throws HornetQException;
- BodyEncoder getBodyEncoder();
-
/** Get the InputStream used on a message that will be sent over a producer */
InputStream getBodyInputStream();
Modified: branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-25 20:11:08 UTC (rev 8403)
+++ branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-25 22:20:57 UTC (rev 8404)
@@ -142,8 +142,12 @@
protected MessageImpl(final long messageID, final int initialMessageBufferSize)
{
+ this(initialMessageBufferSize);
+ }
+
+ protected MessageImpl(final int initialMessageBufferSize)
+ {
this();
- this.messageID = messageID;
createBody(initialMessageBufferSize);
}
@@ -165,11 +169,15 @@
endOfBodyPosition = other.endOfBodyPosition;
endOfMessagePosition = other.endOfMessagePosition;
copied = other.copied;
-
- // We need to copy the underlying buffer too, since the different messsages thereafter might have different
- // properties set on them, making their encoding different
- buffer = other.buffer.copy(0, other.buffer.capacity());
- buffer.setIndex(other.buffer.readerIndex(), other.buffer.writerIndex());
+
+ if (other.buffer != null)
+ {
+ createBody(other.buffer.capacity());
+ // We need to copy the underlying buffer too, since the different messsages thereafter might have different
+ // properties set on them, making their encoding different
+ buffer = other.buffer.copy(0, other.buffer.capacity());
+ buffer.setIndex(other.buffer.readerIndex(), other.buffer.writerIndex());
+ }
}
// Message implementation ----------------------------------------
@@ -779,7 +787,7 @@
return buffer;
}
- public BodyEncoder getBodyEncoder()
+ public BodyEncoder getBodyEncoder() throws HornetQException
{
return new DecodingContext();
}
@@ -889,6 +897,11 @@
public void close()
{
}
+
+ public long getLargeBodySize()
+ {
+ return buffer.writerIndex();
+ }
public int encode(final ByteBuffer bufferRead) throws HornetQException
{
Modified: branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2009-11-25 20:11:08 UTC (rev 8403)
+++ branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2009-11-25 22:20:57 UTC (rev 8404)
@@ -125,20 +125,6 @@
}
@Override
- public synchronized long getLargeBodySize()
- {
- try
- {
- validateFile();
- }
- catch (Exception e)
- {
- throw new RuntimeException(e.getMessage(), e);
- }
- return bodySize;
- }
-
- @Override
public synchronized int getEncodeSize()
{
return getHeadersAndPropertiesEncodeSize();
@@ -146,15 +132,15 @@
@Override
public void encode(final HornetQBuffer buffer)
- {
+ {
super.encodeHeadersAndProperties(buffer);
}
-
+
@Override
public void decode(final HornetQBuffer buffer)
- {
+ {
file = null;
-
+
super.decodeHeadersAndProperties(buffer);
}
@@ -174,8 +160,9 @@
}
@Override
- public BodyEncoder getBodyEncoder()
+ public BodyEncoder getBodyEncoder() throws HornetQException
{
+ validateFile();
return new DecodingContext();
}
@@ -312,22 +299,30 @@
// Private -------------------------------------------------------
- private synchronized void validateFile() throws Exception
+ private synchronized void validateFile() throws HornetQException
{
- if (file == null)
+ try
{
- if (messageID <= 0)
+ if (file == null)
{
- throw new RuntimeException("MessageID not set on LargeMessage");
+ if (messageID <= 0)
+ {
+ throw new RuntimeException("MessageID not set on LargeMessage");
+ }
+
+ file = storageManager.createFileForLargeMessage(getMessageID(), durable);
+
+ file.open();
+
+ bodySize = file.size();
+
}
-
- file = storageManager.createFileForLargeMessage(getMessageID(), durable);
-
- file.open();
-
- bodySize = file.size();
-
}
+ catch (Exception e)
+ {
+ // TODO: There is an IO_ERROR on trunk now, this should be used here instead
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, e.getMessage(), e);
+ }
}
/* (non-Javadoc)
@@ -415,5 +410,13 @@
return bytesRead;
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.BodyEncoder#getLargeBodySize()
+ */
+ public long getLargeBodySize()
+ {
+ return bodySize;
+ }
}
}
Modified: branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2009-11-25 20:11:08 UTC (rev 8403)
+++ branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2009-11-25 22:20:57 UTC (rev 8404)
@@ -57,18 +57,13 @@
*/
public synchronized void addBytes(final byte[] bytes)
{
-// HornetQBuffer buffer = getBuffer();
-//
-// if (buffer != null)
-// {
- // expand the buffer
- buffer.writeBytes(bytes);
-// }
-// else
-// {
-// // Reuse the initial byte array on the buffer construction
-// setBuffer(ChannelBuffers.dynamicBuffer(bytes));
-// }
+ if (buffer == null)
+ {
+ buffer = HornetQBuffers.dynamicBuffer(bytes.length);
+ }
+
+ // expand the buffer
+ buffer.writeBytes(bytes);
}
/* (non-Javadoc)
Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendLargeMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendLargeMessage.java 2009-11-25 20:11:08 UTC (rev 8403)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendLargeMessage.java 2009-11-25 22:20:57 UTC (rev 8404)
@@ -32,9 +32,6 @@
/** Used only if largeMessage */
private byte[] largeMessageHeader;
- /** We need to set the MessageID when replicating this on the server */
- private long largeMessageId = -1;
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -58,28 +55,11 @@
return largeMessageHeader;
}
- /**
- * @return the largeMessageId
- */
- public long getLargeMessageID()
- {
- return largeMessageId;
- }
-
- /**
- * @param largeMessageId the largeMessageId to set
- */
- public void setLargeMessageID(long id)
- {
- this.largeMessageId = id;
- }
-
@Override
public void encodeRest(final HornetQBuffer buffer)
{
buffer.writeInt(largeMessageHeader.length);
buffer.writeBytes(largeMessageHeader);
- buffer.writeLong(largeMessageId);
}
@Override
@@ -90,8 +70,6 @@
largeMessageHeader = new byte[largeMessageLength];
buffer.readBytes(largeMessageHeader);
-
- largeMessageId = buffer.readLong();
}
// Package protected ---------------------------------------------
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/LargeServerMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/LargeServerMessage.java 2009-11-25 20:11:08 UTC (rev 8403)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/LargeServerMessage.java 2009-11-25 22:20:57 UTC (rev 8404)
@@ -13,7 +13,6 @@
package org.hornetq.core.server;
-import org.hornetq.core.message.BodyEncoder;
/**
* A LargeMessage
@@ -36,8 +35,6 @@
/** Close the files if opened */
void releaseResources();
- long getLargeBodySize();
-
void deleteFile() throws Exception;
void incrementDelayDeletionCount();
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-25 20:11:08 UTC (rev 8403)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-25 22:20:57 UTC (rev 8404)
@@ -638,7 +638,7 @@
* This Inner class was created to avoid a bunch of loose properties about the current LargeMessage being sent*/
private final class LargeMessageDeliverer
{
- private final long sizePendingLargeMessage;
+ private long sizePendingLargeMessage;
private LargeServerMessage largeMessage;
@@ -657,8 +657,6 @@
largeMessage.incrementDelayDeletionCount();
- sizePendingLargeMessage = largeMessage.getLargeBodySize();
-
this.ref = ref;
}
@@ -684,14 +682,16 @@
largeMessage.encodeHeadersAndProperties(headerBuffer);
+ context = largeMessage.getBodyEncoder();
+
+ sizePendingLargeMessage = context.getLargeBodySize();
+
SessionReceiveLargeMessage initialPacket = new SessionReceiveLargeMessage(id,
headerBuffer.toByteBuffer()
.array(),
- largeMessage.getLargeBodySize(),
+ context.getLargeBodySize(),
ref.getDeliveryCount());
- context = largeMessage.getBodyEncoder();
-
context.open();
sentInitialPacket = true;
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-25 20:11:08 UTC (rev 8403)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-25 22:20:57 UTC (rev 8404)
@@ -64,6 +64,11 @@
{
super(messageID, initialMessageBufferSize);
}
+
+ protected ServerMessageImpl(final int initialMessageBufferSize)
+ {
+ super(initialMessageBufferSize);
+ }
/*
* Copy constructor
@@ -144,11 +149,6 @@
return false;
}
- public long getLargeBodySize()
- {
- return -1;
- }
-
public int getMemoryEstimate()
{
if (memoryEstimate == -1)
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-11-25 20:11:08 UTC (rev 8403)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-11-25 22:20:57 UTC (rev 8404)
@@ -18,11 +18,6 @@
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
-import junit.framework.AssertionFailedError;
-import junit.framework.Test;
-import junit.framework.TestSuite;
-
-import org.hornetq.core.buffers.HornetQBuffer;
import org.hornetq.core.client.ClientConsumer;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientProducer;
@@ -53,13 +48,6 @@
*/
public class LargeMessageTest extends LargeMessageTestBase
{
- public static Test suite()
- {
- TestSuite suite = new TestSuite();
-
- return suite;
- }
-
// Constants -----------------------------------------------------
final static int RECEIVE_WAIT_TIME = 60000;
@@ -788,14 +776,18 @@
producer2.send(msg1);
+ boolean failed = false;
+
try
{
producer2.send(msg1);
- fail("Expected Exception");
}
catch (Throwable e)
{
+ failed = true;
}
+
+ assertTrue("Exception expected", failed);
session.commit();
@@ -926,7 +918,7 @@
false,
true,
true,
- 100,
+ 2,
LARGE_MESSAGE_SIZE,
RECEIVE_WAIT_TIME,
0);
@@ -2211,22 +2203,18 @@
ClientMessage message = null;
- HornetQBuffer body = null;
-
for (int i = 0; i < 100; i++)
{
message = session.createClientMessage(true);
+ // TODO: Why do I need to reset the writerIndex?
+ message.getBodyBuffer().writerIndex(0);
+
for (int j = 1; j <= numberOfBytes; j++)
{
message.getBodyBuffer().writeInt(j);
}
- if (i == 0)
- {
- body = message.getBodyBuffer();
- }
-
producer.send(message);
}
@@ -2262,17 +2250,12 @@
assertNotNull(message2);
- try
+ message.getBodyBuffer().readerIndex(0);
+
+ for (int j = 1; j <= numberOfBytes; j++)
{
- assertEqualsByteArrays(body.writerIndex(), body.toByteBuffer().array(), message2.getBodyBuffer().toByteBuffer().
- array());
+ assertEquals(j, message.getBodyBuffer().readInt());
}
- catch (AssertionFailedError e)
- {
- log.info("Expected buffer:" + dumbBytesHex(body.toByteBuffer().array(), 40));
- log.info("Arriving buffer:" + dumbBytesHex(message2.getBodyBuffer().toByteBuffer().array(), 40));
- throw e;
- }
}
consumer.close();
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2009-11-25 20:11:08 UTC (rev 8403)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2009-11-25 22:20:57 UTC (rev 8404)
@@ -350,7 +350,6 @@
HornetQBuffer buffer = message.getBodyBuffer();
buffer.resetReaderIndex();
- assertEquals(numberOfBytes, buffer.writerIndex());
for (long b = 0; b < numberOfBytes; b++)
{
if (b % (1024l * 1024l) == 0)
@@ -360,6 +359,15 @@
assertEquals(getSamplebyte(b), buffer.readByte());
}
+
+ try
+ {
+ buffer.readByte();
+ fail("Supposed to throw an exception");
+ }
+ catch (Exception e)
+ {
+ }
}
}
catch (Throwable e)
@@ -396,8 +404,6 @@
assertNotNull(message);
- log.debug("Message: " + i);
-
System.currentTimeMillis();
if (delayDelivery > 0)
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-25 20:11:08 UTC (rev 8403)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-25 22:20:57 UTC (rev 8404)
@@ -288,7 +288,7 @@
assertEquals(0, manager.getActiveTokens().size());
- ServerMessage msg = new ServerMessageImpl(1, 10);
+ ServerMessage msg = new ServerMessageImpl(1, 1024);
SimpleString dummy = new SimpleString("dummy");
msg.setDestination(dummy);
16 years
JBoss hornetq SVN: r8403 - in branches/20-optimisation: src/main/org/hornetq/core/message and 8 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-11-25 15:11:08 -0500 (Wed, 25 Nov 2009)
New Revision: 8403
Modified:
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/20-optimisation/src/main/org/hornetq/core/message/Message.java
branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java
branches/20-optimisation/src/main/org/hornetq/core/server/ServerMessage.java
branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
optimisation
Modified: branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-11-25 20:11:08 UTC (rev 8403)
@@ -697,6 +697,7 @@
log.trace("Setting up flowControlSize to " + message.getPacketSize() + " on message = " + clMessage);
}
+ // log.info("setting flow control size as " + message.getPacketSize());
clMessage.setFlowControlSize(message.getPacketSize());
consumer.handleMessage(clMessage);
Modified: branches/20-optimisation/src/main/org/hornetq/core/message/Message.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/message/Message.java 2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/src/main/org/hornetq/core/message/Message.java 2009-11-25 20:11:08 UTC (rev 8403)
@@ -14,12 +14,10 @@
package org.hornetq.core.message;
import java.io.InputStream;
-import java.io.OutputStream;
import java.util.Map;
import java.util.Set;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.exception.HornetQException;
import org.hornetq.utils.SimpleString;
import org.hornetq.utils.TypedProperties;
@@ -74,12 +72,8 @@
void decodeFromBuffer(HornetQBuffer buffer);
- HornetQBuffer encodeToBuffer();
-
int getEndOfMessagePosition();
- // void setEndOfBodyPosition();
-
int getEndOfBodyPosition();
void checkCopy();
@@ -88,7 +82,7 @@
void resetCopied();
-// void resetEndOfBodyPosition();
+ HornetQBuffer getEncodedBuffer();
// Properties
// ------------------------------------------------------------------
Modified: branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-25 20:11:08 UTC (rev 8403)
@@ -96,6 +96,18 @@
protected HornetQBuffer buffer;
+ protected ResetLimitWrappedHornetQBuffer bodyBuffer;
+
+ protected boolean bufferValid;
+
+ private int endOfBodyPosition = -1;
+
+ private int endOfMessagePosition;
+
+ private boolean copied = true;
+
+ private boolean bufferUsed;
+
// Constructors --------------------------------------------------
protected MessageImpl()
@@ -135,27 +147,38 @@
createBody(initialMessageBufferSize);
}
- private void createBody(final int initialMessageBufferSize)
+ /*
+ * Copy constructor
+ */
+ protected MessageImpl(final MessageImpl other)
{
- buffer = HornetQBuffers.dynamicBuffer(initialMessageBufferSize);
+ messageID = other.getMessageID();
+ destination = other.getDestination();
+ type = other.getType();
+ durable = other.isDurable();
+ expiration = other.getExpiration();
+ timestamp = other.getTimestamp();
+ priority = other.getPriority();
+ properties = new TypedProperties(other.getProperties());
- // There's a bug in netty which means a dynamic buffer won't resize until you write a byte
- buffer.writeByte((byte)0);
+ bufferValid = other.bufferValid;
+ endOfBodyPosition = other.endOfBodyPosition;
+ endOfMessagePosition = other.endOfMessagePosition;
+ copied = other.copied;
- int limit = PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT;
-
- buffer.setIndex(limit, limit);
-
- // endOfBodyPosition = limit;
+ // We need to copy the underlying buffer too, since the different messsages thereafter might have different
+ // properties set on them, making their encoding different
+ buffer = other.buffer.copy(0, other.buffer.capacity());
+ buffer.setIndex(other.buffer.readerIndex(), other.buffer.writerIndex());
}
// Message implementation ----------------------------------------
public int getEncodeSize()
{
- int headersPropsSize = this.getHeadersAndPropertiesEncodeSize();
+ int headersPropsSize = getHeadersAndPropertiesEncodeSize();
- int bodyPos = this.endOfBodyPosition == -1 ? buffer.writerIndex() : this.endOfBodyPosition;
+ int bodyPos = endOfBodyPosition == -1 ? buffer.writerIndex() : endOfBodyPosition;
int bodySize = bodyPos - PacketImpl.PACKET_HEADERS_SIZE - DataConstants.SIZE_INT;
@@ -174,29 +197,48 @@
}
public void encodeHeadersAndProperties(final HornetQBuffer buffer)
- {
+ {
buffer.writeLong(messageID);
- buffer.writeSimpleString(destination);
+ buffer.writeSimpleString(destination);
buffer.writeByte(type);
buffer.writeBoolean(durable);
buffer.writeLong(expiration);
buffer.writeLong(timestamp);
- buffer.writeByte(priority);
- properties.encode(buffer);
+ buffer.writeByte(priority);
+ properties.encode(buffer);
}
public void decodeHeadersAndProperties(final HornetQBuffer buffer)
{
- messageID = buffer.readLong();
- destination = buffer.readSimpleString();
+ messageID = buffer.readLong();
+ destination = buffer.readSimpleString();
type = buffer.readByte();
durable = buffer.readBoolean();
expiration = buffer.readLong();
timestamp = buffer.readLong();
- priority = buffer.readByte();
+ priority = buffer.readByte();
properties.decode(buffer);
}
+ public HornetQBuffer getBodyBuffer()
+ {
+ if (bodyBuffer == null)
+ {
+ if (buffer instanceof LargeMessageBuffer == false)
+ {
+ bodyBuffer = new ResetLimitWrappedHornetQBuffer(PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT,
+ buffer,
+ this);
+ }
+ else
+ {
+ return buffer;
+ }
+ }
+
+ return bodyBuffer;
+ }
+
public long getMessageID()
{
return messageID;
@@ -295,6 +337,99 @@
return map;
}
+ public void decodeFromBuffer(final HornetQBuffer buffer)
+ {
+ this.buffer = buffer;
+
+ decode();
+ }
+
+ public void bodyChanged()
+ {
+ // If the body is changed we must copy the buffer otherwise can affect the previously sent message
+ // which might be in the Netty write queue
+ checkCopy();
+
+ bufferValid = false;
+
+ endOfBodyPosition = -1;
+ }
+
+ public void checkCopy()
+ {
+ if (!copied)
+ {
+ forceCopy();
+
+ copied = true;
+ }
+ }
+
+ public void resetCopied()
+ {
+ copied = false;
+ }
+
+ public int getEndOfMessagePosition()
+ {
+ return endOfMessagePosition;
+ }
+
+ public int getEndOfBodyPosition()
+ {
+ return endOfBodyPosition;
+ }
+
+ // Encode to journal or paging
+ public void encode(final HornetQBuffer buff)
+ {
+ encodeToBuffer();
+
+ buff.writeBytes(buffer, PacketImpl.PACKET_HEADERS_SIZE, endOfMessagePosition - PacketImpl.PACKET_HEADERS_SIZE);
+ }
+
+ // Decode from journal or paging
+ public void decode(final HornetQBuffer buff)
+ {
+ int start = buff.readerIndex();
+
+ endOfBodyPosition = buff.readInt();
+
+ endOfMessagePosition = buff.getInt(endOfBodyPosition - PacketImpl.PACKET_HEADERS_SIZE + start);
+
+ int length = endOfMessagePosition - PacketImpl.PACKET_HEADERS_SIZE;
+
+ buffer.setIndex(0, PacketImpl.PACKET_HEADERS_SIZE);
+
+ buffer.writeBytes(buff, start, length);
+
+ decode();
+
+ buff.readerIndex(start + length);
+ }
+
+ public synchronized HornetQBuffer getEncodedBuffer()
+ {
+ HornetQBuffer buff = encodeToBuffer();
+
+ if (bufferUsed)
+ {
+ HornetQBuffer copied = buff.copy(0, buff.capacity());
+
+ copied.setIndex(0, endOfMessagePosition);
+
+ return copied;
+ }
+ else
+ {
+ buffer.setIndex(0, endOfMessagePosition);
+
+ bufferUsed = true;
+
+ return buffer;
+ }
+ }
+
// Properties
// ---------------------------------------------------------------------------------------
@@ -657,123 +792,68 @@
// Private -------------------------------------------------------
- // Inner classes -------------------------------------------------
-
- private class DecodingContext implements BodyEncoder
+ // This must be synchronized as it can be called concurrently id the message is being delivered concurently to
+ // many queues - the first caller in this case will actually encode it
+ private synchronized HornetQBuffer encodeToBuffer()
{
- private int lastPos = 0;
-
- public DecodingContext()
+ if (!bufferValid)
{
- }
+ if (bufferUsed)
+ {
+ // Cannot use same buffer - must copy
- public void open()
- {
- }
+ forceCopy();
+ }
- public void close()
- {
- }
-
- public int encode(ByteBuffer bufferRead) throws HornetQException
- {
- HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bufferRead);
- return encode(buffer, bufferRead.capacity());
- }
-
- public int encode(HornetQBuffer bufferOut, int size)
- {
- bufferOut.writeBytes(getWholeBuffer(), lastPos, size);
- lastPos += size;
- return size;
- }
- }
-
- protected ResetLimitWrappedHornetQBuffer bodyBuffer;
-
- public HornetQBuffer getBodyBuffer()
- {
- if (bodyBuffer == null)
- {
- if (buffer instanceof LargeMessageBuffer == false)
+ if (endOfBodyPosition == -1)
{
- bodyBuffer = new ResetLimitWrappedHornetQBuffer(PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT,
- buffer,
- this);
+ // Means sending message for first time
+ endOfBodyPosition = buffer.writerIndex();
}
- else
- {
- return buffer;
- }
- }
- return bodyBuffer;
- }
+ // write it
+ buffer.setInt(PacketImpl.PACKET_HEADERS_SIZE, endOfBodyPosition);
- protected boolean bufferValid;
+ // Position at end of body and skip past the message end position int
+ buffer.setIndex(0, endOfBodyPosition + DataConstants.SIZE_INT);
- private int endOfBodyPosition = -1;
+ encodeHeadersAndProperties(buffer);
- private int endOfMessagePosition;
+ // Write end of message position
- private boolean copied = true;
+ endOfMessagePosition = buffer.writerIndex();
- /*
- * Copy constructor
- */
- protected MessageImpl(final MessageImpl other, final boolean shallow)
- {
- messageID = other.getMessageID();
- destination = other.getDestination();
- type = other.getType();
- durable = other.isDurable();
- expiration = other.getExpiration();
- timestamp = other.getTimestamp();
- priority = other.getPriority();
- properties = new TypedProperties(other.getProperties());
+ buffer.setInt(endOfBodyPosition, endOfMessagePosition);
- this.bufferValid = other.bufferValid;
- this.endOfBodyPosition = other.endOfBodyPosition;
- this.endOfMessagePosition = other.endOfMessagePosition;
- this.copied = other.copied;
+ bufferValid = true;
+ }
- if (shallow)
- {
- this.buffer = other.buffer;
- }
- else
- {
- // We need to copy the underlying buffer too, since the different messsages thereafter might have different
- // properties set on them, making their encoding different
- buffer = other.buffer.copy(0, other.buffer.capacity());
- buffer.setIndex(other.buffer.readerIndex(), other.buffer.writerIndex());
- }
+ return buffer;
}
- public void bodyChanged()
+ private void decode()
{
- // If the body is changed we must copy the buffer otherwise can affect the previously sent message
- // which might be in the Netty write queue
- checkCopy();
+ endOfBodyPosition = buffer.getInt(PacketImpl.PACKET_HEADERS_SIZE);
- bufferValid = false;
+ buffer.readerIndex(endOfBodyPosition + DataConstants.SIZE_INT);
- this.endOfBodyPosition = -1;
+ decodeHeadersAndProperties(buffer);
+
+ endOfMessagePosition = buffer.readerIndex();
+
+ bufferValid = true;
}
- public void checkCopy()
+ private void createBody(final int initialMessageBufferSize)
{
- if (!copied)
- {
- forceCopy();
+ buffer = HornetQBuffers.dynamicBuffer(initialMessageBufferSize);
- copied = true;
- }
- }
+ // There's a bug in netty which means a dynamic buffer won't resize until you write a byte
+ buffer.writeByte((byte)0);
- public void resetCopied()
- {
- copied = false;
+ int limit = PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT;
+
+ buffer.setIndex(limit, limit);
}
private void forceCopy()
@@ -782,102 +862,46 @@
buffer = buffer.copy(0, buffer.capacity());
- buffer.setIndex(0, this.endOfBodyPosition);
+ buffer.setIndex(0, endOfBodyPosition);
if (bodyBuffer != null)
{
bodyBuffer.setBuffer(buffer);
}
- }
- public int getEndOfMessagePosition()
- {
- return this.endOfMessagePosition;
+ bufferUsed = false;
}
- public int getEndOfBodyPosition()
- {
- return this.endOfBodyPosition;
- }
+ // Inner classes -------------------------------------------------
- // Encode to journal or paging
- public void encode(HornetQBuffer buff)
+ private final class DecodingContext implements BodyEncoder
{
- encodeToBuffer();
+ private int lastPos = 0;
- buff.writeBytes(buffer, PacketImpl.PACKET_HEADERS_SIZE, endOfMessagePosition - PacketImpl.PACKET_HEADERS_SIZE);
- }
+ public DecodingContext()
+ {
+ }
- // Decode from journal or paging
- public void decode(HornetQBuffer buff)
- {
- int start = buff.readerIndex();
+ public void open()
+ {
+ }
- endOfBodyPosition = buff.readInt();
+ public void close()
+ {
+ }
- endOfMessagePosition = buff.getInt(endOfBodyPosition - PacketImpl.PACKET_HEADERS_SIZE + start);
+ public int encode(final ByteBuffer bufferRead) throws HornetQException
+ {
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bufferRead);
+ return encode(buffer, bufferRead.capacity());
+ }
- int length = endOfMessagePosition - PacketImpl.PACKET_HEADERS_SIZE;
-
- buffer.setIndex(0, PacketImpl.PACKET_HEADERS_SIZE);
-
- buffer.writeBytes(buff, start, length);
-
- decode();
-
- buff.readerIndex(start + length);
- }
-
- // This must be synchronized as it can be called concurrently id the message is being delivered concurently to
- // many queues - the first caller in this case will actually encode it
- public synchronized HornetQBuffer encodeToBuffer()
- {
- if (!bufferValid)
+ public int encode(final HornetQBuffer bufferOut, final int size)
{
- if (endOfBodyPosition == -1)
- {
- // Means sending message for first time
- endOfBodyPosition = buffer.writerIndex();
- }
-
- // write it
- buffer.setInt(PacketImpl.PACKET_HEADERS_SIZE, endOfBodyPosition);
-
- // Position at end of body and skip past the message end position int
- buffer.setIndex(0, endOfBodyPosition + DataConstants.SIZE_INT);
-
- encodeHeadersAndProperties(buffer);
-
- // Write end of message position
-
- this.endOfMessagePosition = buffer.writerIndex();
-
- buffer.setInt(endOfBodyPosition, endOfMessagePosition);
-
- this.bufferValid = true;
+ bufferOut.writeBytes(getWholeBuffer(), lastPos, size);
+ lastPos += size;
+ return size;
}
-
- return buffer;
}
- public void decode()
- {
- this.endOfBodyPosition = buffer.getInt(PacketImpl.PACKET_HEADERS_SIZE);
-
- buffer.readerIndex(this.endOfBodyPosition + DataConstants.SIZE_INT);
-
- this.decodeHeadersAndProperties(buffer);
-
- this.endOfMessagePosition = buffer.readerIndex();
-
- this.bufferValid = true;
- }
-
- public void decodeFromBuffer(HornetQBuffer buffer)
- {
- this.buffer = buffer;
-
- decode();
- }
-
}
Modified: branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2009-11-25 20:11:08 UTC (rev 8403)
@@ -74,7 +74,7 @@
*/
private LargeServerMessageImpl(final LargeServerMessageImpl copy, final SequentialFile fileCopy, final long newID)
{
- super(copy, true);
+ super(copy);
this.linkMessage = copy;
storageManager = copy.storageManager;
file = fileCopy;
Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-11-25 20:11:08 UTC (rev 8403)
@@ -235,7 +235,7 @@
size = buffer.readerIndex();
}
- public final int getPacketSize()
+ public int getPacketSize()
{
if (size == -1)
{
Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java 2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java 2009-11-25 20:11:08 UTC (rev 8403)
@@ -69,19 +69,19 @@
@Override
public HornetQBuffer encode(final RemotingConnection connection)
{
- HornetQBuffer orig = message.encodeToBuffer();
+ HornetQBuffer buffer = message.getEncodedBuffer();
- //Now we must copy this buffer, before sending to Netty, as it could be concurrently delivered to many consumers
+ //Sanity check
+ if (buffer.writerIndex() != message.getEndOfMessagePosition())
+ {
+ throw new IllegalStateException("Wrong encode position");
+ }
- HornetQBuffer buffer = orig.copy(0, orig.capacity());
-
- buffer.setIndex(0, message.getEndOfMessagePosition());
-
- buffer.writeLong(consumerID);
+ buffer.writeLong(consumerID);
buffer.writeInt(deliveryCount);
-
+
size = buffer.writerIndex();
-
+
//Write standard headers
int len = size - DataConstants.SIZE_INT;
@@ -90,7 +90,7 @@
buffer.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID);
//Position reader for reading by Netty
- buffer.readerIndex(0);
+ buffer.setIndex(0, size);
return buffer;
}
@@ -107,7 +107,7 @@
deliveryCount = buffer.readInt();
size = buffer.readerIndex();
-
+
//Need to position buffer for reading
buffer.setIndex(PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT, message.getEndOfBodyPosition());
Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java 2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java 2009-11-25 20:11:08 UTC (rev 8403)
@@ -72,19 +72,16 @@
@Override
public HornetQBuffer encode(final RemotingConnection connection)
{
- //this isn't right when forwarding a message that has been already received - because writerindex will
- //be pointing at end of message
+ HornetQBuffer buffer = message.getEncodedBuffer();
- HornetQBuffer orig = message.encodeToBuffer();
+ //Sanity check
+ if (buffer.writerIndex() != message.getEndOfMessagePosition())
+ {
+ throw new IllegalStateException("Wrong encode position");
+ }
- //FIXME - for now we are copying due to concurrent sends to many bridges on the server
-
- HornetQBuffer buffer = orig.copy(0, orig.capacity());
-
- buffer.setIndex(0, message.getEndOfMessagePosition());
-
buffer.writeBoolean(requiresResponse);
-
+
size = buffer.writerIndex();
//Write standard headers
@@ -108,8 +105,12 @@
//Buffer comes in after having read standard headers and positioned at Beginning of body part
message.decodeFromBuffer(buffer);
-
- requiresResponse = buffer.readBoolean();
+
+ int ri = buffer.readerIndex();
+
+ requiresResponse = buffer.readBoolean();
+
+ buffer.readerIndex(ri);
}
// Private -------------------------------------------------------
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/ServerMessage.java 2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/ServerMessage.java 2009-11-25 20:11:08 UTC (rev 8403)
@@ -43,8 +43,6 @@
ServerMessage copy();
- ServerMessage shallowCopy();
-
int getMemoryEstimate();
int getRefCount();
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-11-25 20:11:08 UTC (rev 8403)
@@ -411,12 +411,12 @@
if (flowRecord != null)
{
- // We make a shallow copy of the message, then we strip out the unwanted routing id headers and leave
+ // We make a copy of the message, then we strip out the unwanted routing id headers and leave
// only
// the one pertinent for the destination node - this is important since different queues on different
// nodes could have same queue ids
// Note we must copy since same message may get routed to other nodes which require different headers
- message = message.shallowCopy();
+ message = message.copy();
// TODO - we can optimise this
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-25 20:11:08 UTC (rev 8403)
@@ -185,9 +185,10 @@
public HandleStatus handle(final MessageReference ref) throws Exception
{
+ //log.info("handling message");
if (availableCredits != null && availableCredits.get() <= 0)
{
-
+ // log.info("busy");
return HandleStatus.BUSY;
}
@@ -417,7 +418,7 @@
public void receiveCredits(final int credits) throws Exception
{
-
+ // log.info("Receiving credits " + credits);
if (credits == -1)
{
// No flow control
@@ -591,8 +592,11 @@
if (availableCredits != null)
{
+ //log.info("Subtracting credits " + packet.getPacketSize());
availableCredits.addAndGet(-packet.getPacketSize());
}
+
+ // log.info("delivered message");
}
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-25 20:11:08 UTC (rev 8403)
@@ -68,9 +68,9 @@
/*
* Copy constructor
*/
- protected ServerMessageImpl(final ServerMessageImpl other, final boolean shallow)
+ protected ServerMessageImpl(final ServerMessageImpl other)
{
- super(other, shallow);
+ super(other);
}
public void setMessageID(final long id)
@@ -164,7 +164,7 @@
public ServerMessage copy(final long newID)
{
- ServerMessage m = new ServerMessageImpl(this, false);
+ ServerMessage m = new ServerMessageImpl(this);
m.setMessageID(newID);
@@ -173,14 +173,9 @@
public ServerMessage copy()
{
- return new ServerMessageImpl(this, false);
+ return new ServerMessageImpl(this);
}
- public ServerMessage shallowCopy()
- {
- return new ServerMessageImpl(this, true);
- }
-
public ServerMessage makeCopyForExpiryOrDLA(final long newID, final boolean expiry) throws Exception
{
/*
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-25 20:11:08 UTC (rev 8403)
@@ -1574,6 +1574,8 @@
final CreditManagerHolder holder = this.getCreditManagerHolder(address);
int credits = packet.getCredits();
+
+ //log.info("requesting credits " + credits);
int gotCredits = holder.manager.acquireCredits(credits, new CreditsAvailableRunnable()
{
@@ -1594,6 +1596,8 @@
}
}
});
+
+ //log.info("got credits " + gotCredits);
if (gotCredits > 0)
{
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java 2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java 2009-11-25 20:11:08 UTC (rev 8403)
@@ -71,7 +71,7 @@
server.start();
ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(connectorFactoryClassName));
- sf.setConsumerWindowSize(0);
+ // sf.setConsumerWindowSize(0);
ClientSession session = sf.createSession(false, true, true);
@@ -103,7 +103,7 @@
}
log.info("sent messages");
-
+
ClientConsumer consumer = session.createConsumer(QUEUE);
session.start();
@@ -112,6 +112,8 @@
{
ClientMessage message2 = consumer.receive();
+ // log.info("got message " + i);
+
HornetQBuffer buffer = message2.getBodyBuffer();
assertEquals("testINVMCoreClient", buffer.readString());
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java 2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java 2009-11-25 20:11:08 UTC (rev 8403)
@@ -67,7 +67,7 @@
public void testFlowControlMultipleConsumers() throws Exception
{
- testFlowControl(1000, 500, 10 * 1024, 1024, 1024, 1024, 5, 1, 0, false);
+ testFlowControl(1000, 500, -1, 1024, 1024, 1024, 5, 1, 0, false);
}
public void testFlowControlZeroConsumerWindowSize() throws Exception
@@ -134,7 +134,7 @@
{
testFlowControl(1000, 10000, 100 * 1024, 1024, 1024, 1024, 1, 1, 0, true, 1000, true);
}
-
+
public void testFlowControlLargeMessages7() throws Exception
{
testFlowControl(1000, 10000, 100 * 1024, 1024, 1024, 1024, 2, 2, 0, true, 1000, true);
@@ -212,7 +212,7 @@
{
session.createQueue(address, new SimpleString(queueName + i), null, false);
}
-
+
final byte[] bytes = RandomUtil.randomBytes(messageSize);
class MyHandler implements MessageHandler
@@ -230,7 +230,7 @@
byte[] bytesRead = new byte[messageSize];
message.getBodyBuffer().readBytes(bytesRead);
-
+
assertEqualsByteArrays(bytes, bytesRead);
message.acknowledge();
@@ -244,13 +244,14 @@
{
Thread.sleep(consumerDelay);
}
+
}
catch (Exception e)
{
log.error("Failed to handle message", e);
-
+
this.exception = e;
-
+
latch.countDown();
}
}
@@ -285,12 +286,10 @@
long start = System.currentTimeMillis();
-
-
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createClientMessage(false);
-
+
message.getBodyBuffer().writeBytes(bytes);
for (int j = 0; j < numProducers; j++)
@@ -389,7 +388,7 @@
byte[] bytes = new byte[0];
ClientMessage message = session.createClientMessage(false);
-
+
message.getBodyBuffer().writeBytes(bytes);
producer.send(message);
@@ -421,7 +420,7 @@
t.start();
ClientMessage message2 = session.createClientMessage(false);
-
+
message2.getBodyBuffer().writeBytes(bytes);
producer2.send(message2);
@@ -474,7 +473,7 @@
byte[] bytes = new byte[0];
ClientMessage message = session.createClientMessage(false);
-
+
message.getBodyBuffer().writeBytes(bytes);
producer.send(message);
@@ -504,7 +503,7 @@
assertEquals(1, waiting);
message = session.createClientMessage(false);
-
+
message.getBodyBuffer().writeBytes(bytes);
producer2.send(message);
@@ -552,7 +551,7 @@
byte[] bytes = new byte[0];
ClientMessage message = session.createClientMessage(false);
-
+
message.getBodyBuffer().writeBytes(bytes);
producer.send(message);
@@ -600,7 +599,7 @@
session.close();
message = session.createClientMessage(false);
-
+
message.getBodyBuffer().writeBytes(bytes);
producer3.send(message);
@@ -648,7 +647,7 @@
byte[] bytes = new byte[2000];
ClientMessage message = session.createClientMessage(false);
-
+
message.getBodyBuffer().writeBytes(bytes);
final AtomicBoolean closed = new AtomicBoolean(false);
@@ -721,7 +720,7 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createClientMessage(false);
-
+
message.getBodyBuffer().writeBytes(bytes);
producer.send(message);
@@ -738,4 +737,72 @@
server.stop();
}
+ //Not technically a flow control test, but what the hell
+ public void testMultipleConsumers() throws Exception
+ {
+ HornetQServer server = createServer(false, isNetty());
+
+ server.start();
+
+ ClientSessionFactory sf = createFactory(isNetty());
+
+ final ClientSession session = sf.createSession(false, true, true, true);
+
+ session.createQueue("address", "queue1", null, false);
+ session.createQueue("address", "queue2", null, false);
+ session.createQueue("address", "queue3", null, false);
+ session.createQueue("address", "queue4", null, false);
+ session.createQueue("address", "queue5", null, false);
+
+ ClientConsumer consumer1 = session.createConsumer("queue1");
+ ClientConsumer consumer2 = session.createConsumer("queue2");
+ ClientConsumer consumer3 = session.createConsumer("queue3");
+ ClientConsumer consumer4 = session.createConsumer("queue4");
+ ClientConsumer consumer5 = session.createConsumer("queue5");
+
+ ClientProducer producer = session.createProducer("address");
+
+ byte[] bytes = new byte[2000];
+
+ ClientMessage message = session.createClientMessage(false);
+
+ message.getBodyBuffer().writeBytes(bytes);
+
+ final int numMessages = 1000;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ producer.send(message);
+ }
+
+ session.start();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage msg = consumer1.receive(1000);
+
+ assertNotNull(msg);
+
+ msg = consumer2.receive(5000);
+
+ assertNotNull(msg);
+
+ msg = consumer3.receive(5000);
+
+ assertNotNull(msg);
+
+ msg = consumer4.receive(5000);
+
+ assertNotNull(msg);
+
+ msg = consumer5.receive(5000);
+
+ assertNotNull(msg);
+ }
+
+ session.close();
+
+ server.stop();
+ }
+
}
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-11-25 20:11:08 UTC (rev 8403)
@@ -88,14 +88,13 @@
checkFreePort(PORTS);
clearData();
-
+
consumers = new ConsumerHolder[MAX_CONSUMERS];
servers = new HornetQServer[MAX_SERVERS];
sfs = new ClientSessionFactory[MAX_SERVERS];
-
}
@Override
@@ -207,15 +206,15 @@
final int consumerCount,
final boolean local) throws Exception
{
-// System.out.println("waiting for bindings on node " + node +
-// " address " +
-// address +
-// " count " +
-// count +
-// " consumerCount " +
-// consumerCount +
-// " local " +
-// local);
+ // System.out.println("waiting for bindings on node " + node +
+ // " address " +
+ // address +
+ // " count " +
+ // count +
+ // " consumerCount " +
+ // consumerCount +
+ // " local " +
+ // local);
HornetQServer server = this.servers[node];
if (server == null)
@@ -442,7 +441,8 @@
producer.send(message);
}
- } finally
+ }
+ finally
{
session.close();
}
@@ -676,7 +676,9 @@
if (j != (Integer)(message.getObjectProperty(COUNT_PROP)))
{
outOfOrder = true;
- System.out.println("Message j=" + j + " was received out of order = " + message.getObjectProperty(COUNT_PROP));
+ System.out.println("Message j=" + j +
+ " was received out of order = " +
+ message.getObjectProperty(COUNT_PROP));
}
}
}
@@ -823,7 +825,7 @@
message.acknowledge();
}
- log.info("consumer " + consumerIDs[i] +" returns " + count);
+ log.info("consumer " + consumerIDs[i] + " returns " + count);
}
else
{
@@ -1165,8 +1167,6 @@
Map<String, Object> params = generateParams(node, netty);
-
-
if (netty)
{
TransportConfiguration nettytc = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
@@ -1175,7 +1175,7 @@
else
{
TransportConfiguration invmtc = new TransportConfiguration(INVM_ACCEPTOR_FACTORY, params);
- configuration.getAcceptorConfigurations().add(invmtc);
+ configuration.getAcceptorConfigurations().add(invmtc);
}
HornetQServer server;
16 years
JBoss hornetq SVN: r8402 - in branches/20-optimisation: tests/src/org/hornetq/tests/unit/core/postoffice/impl and 1 other directory.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-11-25 06:11:45 -0500 (Wed, 25 Nov 2009)
New Revision: 8402
Modified:
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
optimisation
Modified: branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2009-11-25 11:07:36 UTC (rev 8401)
+++ branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2009-11-25 11:11:45 UTC (rev 8402)
@@ -74,7 +74,7 @@
*/
private LargeServerMessageImpl(final LargeServerMessageImpl copy, final SequentialFile fileCopy, final long newID)
{
- super(copy);
+ super(copy, true);
this.linkMessage = copy;
storageManager = copy.storageManager;
file = fileCopy;
@@ -273,7 +273,7 @@
}
@Override
- public synchronized ServerMessage copy(final long newID) throws Exception
+ public synchronized ServerMessage copy(final long newID)
{
incrementDelayDeletionCount();
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-11-25 11:07:36 UTC (rev 8401)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-11-25 11:11:45 UTC (rev 8402)
@@ -36,6 +36,7 @@
import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.RoutingContextImpl;
+import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.TransactionOperation;
import org.hornetq.tests.util.UnitTestCase;
@@ -113,11 +114,11 @@
{
if (route)
{
- bind.route(new FakeMessage(), new RoutingContextImpl(new FakeTransaction()));
+ bind.route(new ServerMessageImpl(i, 100), new RoutingContextImpl(new FakeTransaction()));
}
else
{
- bind.redistribute(new FakeMessage(), queue, new RoutingContextImpl(new FakeTransaction()));
+ bind.redistribute(new ServerMessageImpl(i, 100), queue, new RoutingContextImpl(new FakeTransaction()));
}
}
}
@@ -283,625 +284,7 @@
}
- class FakeMessage implements ServerMessage
- {
-
- public void decode(HornetQBuffer buffer)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void encode(HornetQBuffer buffer)
- {
- // TODO Auto-generated method stub
-
- }
-
- public ServerMessage copy() throws Exception
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public ServerMessage copy(long newID) throws Exception
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public MessageReference createReference(Queue queue)
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public int decrementDurableRefCount()
- {
- // TODO Auto-generated method stub
- return 0;
- }
-
- public int decrementRefCount(MessageReference reference) throws Exception
- {
- // TODO Auto-generated method stub
- return 0;
- }
-
- public void encodeMessageIDToBuffer()
- {
- // TODO Auto-generated method stub
-
- }
-
- public int getMemoryEstimate()
- {
- // TODO Auto-generated method stub
- return 0;
- }
-
- public PagingStore getPagingStore()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public int getRefCount()
- {
- // TODO Auto-generated method stub
- return 0;
- }
-
- public int incrementDurableRefCount()
- {
- // TODO Auto-generated method stub
- return 0;
- }
-
- public int incrementRefCount(MessageReference reference) throws Exception
- {
- // TODO Auto-generated method stub
- return 0;
- }
-
- public ServerMessage makeCopyForExpiryOrDLA(long newID, boolean expiry) throws Exception
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public boolean page(boolean duplicateDetection) throws Exception
- {
- // TODO Auto-generated method stub
- return false;
- }
-
- public boolean page(long transactionID, boolean duplicateDetection) throws Exception
- {
- // TODO Auto-generated method stub
- return false;
- }
-
- public void setMessageID(long id)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void setOriginalHeaders(ServerMessage other, boolean expiry)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void setPagingStore(PagingStore store)
- {
- // TODO Auto-generated method stub
-
- }
-
- public boolean storeIsPaging()
- {
- // TODO Auto-generated method stub
- return false;
- }
-
- public void bodyChanged()
- {
- // TODO Auto-generated method stub
-
- }
-
- public void checkCopy()
- {
- // TODO Auto-generated method stub
-
- }
-
- public void clearCopied()
- {
- // TODO Auto-generated method stub
-
- }
-
- public boolean containsProperty(SimpleString key)
- {
- // TODO Auto-generated method stub
- return false;
- }
-
- public boolean containsProperty(String key)
- {
- // TODO Auto-generated method stub
- return false;
- }
-
- public void decodeFromBuffer(HornetQBuffer buffer)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void decodeHeadersAndProperties(HornetQBuffer buffer)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void encodeHeadersAndProperties(HornetQBuffer buffer)
- {
- // TODO Auto-generated method stub
-
- }
-
- public HornetQBuffer encodeToBuffer()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public HornetQBuffer getBodyBuffer()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public BodyEncoder getBodyEncoder()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public InputStream getBodyInputStream()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public Boolean getBooleanProperty(SimpleString key) throws PropertyConversionException
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public Boolean getBooleanProperty(String key) throws PropertyConversionException
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public Byte getByteProperty(SimpleString key) throws PropertyConversionException
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public Byte getByteProperty(String key) throws PropertyConversionException
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public byte[] getBytesProperty(SimpleString key) throws PropertyConversionException
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public byte[] getBytesProperty(String key) throws PropertyConversionException
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public SimpleString getDestination()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public Double getDoubleProperty(SimpleString key) throws PropertyConversionException
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public Double getDoubleProperty(String key) throws PropertyConversionException
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public int getEncodeSize()
- {
- // TODO Auto-generated method stub
- return 0;
- }
-
- public int getEndOfBodyPosition()
- {
- // TODO Auto-generated method stub
- return 0;
- }
-
- public int getEndOfMessagePosition()
- {
- // TODO Auto-generated method stub
- return 0;
- }
-
- public long getExpiration()
- {
- // TODO Auto-generated method stub
- return 0;
- }
-
- public Float getFloatProperty(SimpleString key) throws PropertyConversionException
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public Float getFloatProperty(String key) throws PropertyConversionException
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public int getHeadersAndPropertiesEncodeSize()
- {
- // TODO Auto-generated method stub
- return 0;
- }
-
- public Integer getIntProperty(SimpleString key) throws PropertyConversionException
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public Integer getIntProperty(String key) throws PropertyConversionException
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public long getLargeBodySize()
- {
- // TODO Auto-generated method stub
- return 0;
- }
-
- public Long getLongProperty(SimpleString key) throws PropertyConversionException
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public Long getLongProperty(String key) throws PropertyConversionException
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public long getMessageID()
- {
- // TODO Auto-generated method stub
- return 0;
- }
-
- public Object getObjectProperty(SimpleString key)
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public Object getObjectProperty(String key)
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public byte getPriority()
- {
- // TODO Auto-generated method stub
- return 0;
- }
-
- public TypedProperties getProperties()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public Set<SimpleString> getPropertyNames()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public Short getShortProperty(SimpleString key) throws PropertyConversionException
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public Short getShortProperty(String key) throws PropertyConversionException
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public SimpleString getSimpleStringProperty(SimpleString key) throws PropertyConversionException
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public SimpleString getSimpleStringProperty(String key) throws PropertyConversionException
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public String getStringProperty(SimpleString key) throws PropertyConversionException
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public String getStringProperty(String key) throws PropertyConversionException
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public long getTimestamp()
- {
- // TODO Auto-generated method stub
- return 0;
- }
-
- public byte getType()
- {
- // TODO Auto-generated method stub
- return 0;
- }
-
- public HornetQBuffer getWholeBuffer()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public boolean isDurable()
- {
- // TODO Auto-generated method stub
- return false;
- }
-
- public boolean isExpired()
- {
- // TODO Auto-generated method stub
- return false;
- }
-
- public boolean isLargeMessage()
- {
- // TODO Auto-generated method stub
- return false;
- }
-
- public void putBooleanProperty(SimpleString key, boolean value)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void putBooleanProperty(String key, boolean value)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void putByteProperty(SimpleString key, byte value)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void putByteProperty(String key, byte value)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void putBytesProperty(SimpleString key, byte[] value)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void putBytesProperty(String key, byte[] value)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void putDoubleProperty(SimpleString key, double value)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void putDoubleProperty(String key, double value)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void putFloatProperty(SimpleString key, float value)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void putFloatProperty(String key, float value)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void putIntProperty(SimpleString key, int value)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void putIntProperty(String key, int value)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void putLongProperty(SimpleString key, long value)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void putLongProperty(String key, long value)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void putObjectProperty(SimpleString key, Object value) throws PropertyConversionException
- {
- // TODO Auto-generated method stub
-
- }
-
- public void putObjectProperty(String key, Object value) throws PropertyConversionException
- {
- // TODO Auto-generated method stub
-
- }
-
- public void putShortProperty(SimpleString key, short value)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void putShortProperty(String key, short value)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void putStringProperty(SimpleString key, SimpleString value)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void putStringProperty(String key, String value)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void putTypedProperties(TypedProperties properties)
- {
- // TODO Auto-generated method stub
-
- }
-
- public Object removeProperty(SimpleString key)
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public Object removeProperty(String key)
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public void setDestination(SimpleString destination)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void setDurable(boolean durable)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void setExpiration(long expiration)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void setPriority(byte priority)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void setTimestamp(long timestamp)
- {
- // TODO Auto-generated method stub
-
- }
-
- public Map<String, Object> toMap()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public void resetCopied()
- {
- // TODO Auto-generated method stub
-
- }
-
- public void setEndOfBodyPosition()
- {
- // TODO Auto-generated method stub
-
- }
-
-
-
- }
-
+
class FakeFilter implements Filter
{
16 years