[hornetq-commits] JBoss hornetq SVN: r8396 - in branches/20-optimisation: examples/jms/interceptor/src/org/hornetq/jms/example and 18 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Nov 24 19:45:53 EST 2009


Author: timfox
Date: 2009-11-24 19:45:52 -0500 (Tue, 24 Nov 2009)
New Revision: 8396

Modified:
   branches/20-optimisation/.classpath
   branches/20-optimisation/build-hornetq.xml
   branches/20-optimisation/examples/jms/interceptor/src/org/hornetq/jms/example/SimpleInterceptor.java
   branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQBuffer.java
   branches/20-optimisation/src/main/org/hornetq/core/buffers/impl/ChannelBufferWrapper.java
   branches/20-optimisation/src/main/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java
   branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java
   branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionPacketHandler.java
   branches/20-optimisation/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/message/Message.java
   branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PageImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/MessagePacket.java
   branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
   branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java
   branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java
   branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   branches/20-optimisation/src/main/org/hornetq/utils/UTF8Util.java
   branches/20-optimisation/tests/jms-tests/src/org/hornetq/jms/tests/BrowserTest.java
   branches/20-optimisation/tests/jms-tests/src/org/hornetq/jms/tests/TransactedSessionTest.java
   branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/AckBatchSizeTest.java
   branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java
   branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/DeadLetterAddressTest.java
   branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/InVMNonPersistentMessageBufferTest.java
   branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
   branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
   branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java
   branches/20-optimisation/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java
   branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java
   branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
optimisation

Modified: branches/20-optimisation/.classpath
===================================================================
--- branches/20-optimisation/.classpath	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/.classpath	2009-11-25 00:45:52 UTC (rev 8396)
@@ -100,7 +100,7 @@
 	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
 	<classpathentry kind="lib" path="tests/tmpfiles"/>
 	<classpathentry kind="lib" path="thirdparty/net/java/dev/javacc/lib/javacc.jar"/>
-	<classpathentry kind="lib" path="thirdparty/org/jboss/netty/lib/netty.jar"/>
+	<classpathentry kind="lib" path="thirdparty/org/jboss/netty/lib/netty.jar" sourcepath="/home/tim/workspace/netty-3.1.5.GA/src/main"/>
 	<classpathentry kind="lib" path="thirdparty/log4j/lib/log4j.jar"/>
 	<classpathentry kind="lib" path="thirdparty/org/jboss/naming/lib/jnpserver.jar"/>
 	<classpathentry kind="lib" path="thirdparty/org/jboss/security/lib/jbosssx.jar"/>

Modified: branches/20-optimisation/build-hornetq.xml
===================================================================
--- branches/20-optimisation/build-hornetq.xml	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/build-hornetq.xml	2009-11-25 00:45:52 UTC (rev 8396)
@@ -172,6 +172,10 @@
    <!-- Classpath definition                                                                     -->
    <!-- ======================================================================================== -->
 
+   <path id="core.compilation.classpath">
+      <path refid="org.jboss.netty.classpath"/>
+   </path>
+	
    <path id="jms.compilation.classpath">
       <path location="${build.core.classes.dir}"/>
       <path refid="org.jboss.javaee.classpath"/>
@@ -401,6 +405,7 @@
          </src>
          <include name="**/hornetq/core/**/*.java"/>
          <include name="**/hornetq/utils/**/*.java"/>
+      	 <classpath refid="core.compilation.classpath"/>
       </javac>
       <javah class="org.hornetq.core.asyncio.impl.AsynchronousFileImpl"
              classpath="${build.core.classes.dir}" destdir="./native/src"/>

Modified: branches/20-optimisation/examples/jms/interceptor/src/org/hornetq/jms/example/SimpleInterceptor.java
===================================================================
--- branches/20-optimisation/examples/jms/interceptor/src/org/hornetq/jms/example/SimpleInterceptor.java	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/examples/jms/interceptor/src/org/hornetq/jms/example/SimpleInterceptor.java	2009-11-25 00:45:52 UTC (rev 8396)
@@ -38,7 +38,7 @@
       if (packet instanceof SessionSendMessage)
       {
          SessionSendMessage realPacket = (SessionSendMessage)packet;
-         Message msg = realPacket.getServerMessage();
+         Message msg = realPacket.getMessage();
          msg.putStringProperty(new SimpleString("newproperty"), new SimpleString("Hello from interceptor!"));
       }
       //We return true which means "call next interceptor" (if there is one) or target.

Modified: branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQBuffer.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQBuffer.java	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQBuffer.java	2009-11-25 00:45:52 UTC (rev 8396)
@@ -207,7 +207,7 @@
 
    String readString();
 
-   String readUTF() throws Exception;
+   String readUTF();
 
    void writeBoolean(boolean val);
 
@@ -219,5 +219,5 @@
 
    void writeString(String val);
 
-   void writeUTF(String utf) throws Exception;
+   void writeUTF(String utf);
 }

Modified: branches/20-optimisation/src/main/org/hornetq/core/buffers/impl/ChannelBufferWrapper.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/buffers/impl/ChannelBufferWrapper.java	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/buffers/impl/ChannelBufferWrapper.java	2009-11-25 00:45:52 UTC (rev 8396)
@@ -37,7 +37,7 @@
    protected ChannelBuffer buffer;
 
    public ChannelBufferWrapper(final ChannelBuffer buffer)
-   {
+   { 
       this.buffer = buffer;
    }
    
@@ -48,69 +48,79 @@
 
    public SimpleString readNullableSimpleString()
    {
-      int b = readByte();
+      int b = buffer.readByte();
       if (b == DataConstants.NULL)
       {
          return null;
       }
       else
       {
-         return readSimpleString();
+         return readSimpleStringInternal();
       }
    }
 
    public String readNullableString()
    {
-      int b = readByte();
+      int b = buffer.readByte();
       if (b == DataConstants.NULL)
       {
          return null;
       }
       else
       {
-         return readString();
+         return readStringInternal();
       }
    }
 
    public SimpleString readSimpleString()
    {
-      int len = readInt();
+      return readSimpleStringInternal();
+   }
+   
+   private SimpleString readSimpleStringInternal()
+   {
+      int len = buffer.readInt();
       byte[] data = new byte[len];
-      readBytes(data);
+      buffer.readBytes(data);
       return new SimpleString(data);
    }
 
    public String readString()
    {
-      int len = readInt();
+      return readStringInternal();
+   }
+   
+   private String readStringInternal()
+   {
+      int len = buffer.readInt();
       char[] chars = new char[len];
       for (int i = 0; i < len; i++)
       {
-         chars[i] = (char)readShort();
+         chars[i] = (char)buffer.readShort();
       }
       return new String(chars);
    }
 
-   public String readUTF() throws Exception
+   public String readUTF()
    {
       return UTF8Util.readUTF(this);
    }
 
    public void writeBoolean(final boolean val)
    {
-      writeByte((byte)(val ? -1 : 0));
+      buffer.writeByte((byte)(val ? -1 : 0));
    }
 
    public void writeNullableSimpleString(final SimpleString val)
    {
       if (val == null)
       {
-         writeByte(DataConstants.NULL);
+         buffer.writeByte(DataConstants.NULL);
       }
       else
       {
-         writeByte(DataConstants.NOT_NULL);
-         writeSimpleString(val);
+         buffer.writeByte(DataConstants.NOT_NULL);
+         writeSimpleStringInternal(val);
       }
    }
 
@@ -118,32 +128,42 @@
    {
       if (val == null)
       {
-         writeByte(DataConstants.NULL);
+         buffer.writeByte(DataConstants.NULL);
       }
       else
       {
-         writeByte(DataConstants.NOT_NULL);
-         writeString(val);
+         buffer.writeByte(DataConstants.NOT_NULL);
+         writeStringInternal(val);
       }
    }
 
    public void writeSimpleString(final SimpleString val)
    {
+      writeSimpleStringInternal(val);
+   }
+   
+   private void writeSimpleStringInternal(final SimpleString val)
+   {
       byte[] data = val.getData();
-      writeInt(data.length);
-      writeBytes(data);
+      buffer.writeInt(data.length);
+      buffer.writeBytes(data);
    }
 
    public void writeString(final String val)
    {
-      writeInt(val.length());
+      writeStringInternal(val);
+   }
+   
+   private void writeStringInternal(final String val)
+   {
+      buffer.writeInt(val.length());
       for (int i = 0; i < val.length(); i++)
       {
-         writeShort((short)val.charAt(i));
+         buffer.writeShort((short)val.charAt(i));
       }
    }
 
-   public void writeUTF(final String utf) throws Exception
+   public void writeUTF(final String utf)
    {
       UTF8Util.saveUTF(this, utf);
    }
@@ -154,7 +174,6 @@
       return buffer.capacity();
    }
 
-
    public ChannelBuffer channelBuffer()
    {
       return buffer;

Modified: branches/20-optimisation/src/main/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java	2009-11-25 00:45:52 UTC (rev 8396)
@@ -13,8 +13,12 @@
 
 package org.hornetq.core.buffers.impl;
 
+import java.nio.ByteBuffer;
+
 import org.hornetq.core.buffers.HornetQBuffer;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.Message;
+import org.hornetq.utils.SimpleString;
 
 /**
  * A ResetLimitWrappedHornetQBuffer
@@ -27,8 +31,11 @@
    private static final Logger log = Logger.getLogger(ResetLimitWrappedHornetQBuffer.class);
 
    private final int limit;
-      
-   public ResetLimitWrappedHornetQBuffer(final int limit, final HornetQBuffer buffer)
+   
+   private Message message;
+   
+   public ResetLimitWrappedHornetQBuffer(final int limit, final HornetQBuffer buffer,
+                                         final Message message)
    {
       super(buffer.channelBuffer());
       
@@ -40,7 +47,14 @@
       }
       
       readerIndex(limit);
+      
+      this.message = message;
    }
+      
+   private void changed()
+   {
+      message.bodyChanged();
+   }
    
    public void setBuffer(HornetQBuffer buffer)
    {      
@@ -49,9 +63,12 @@
    
    public void clear()
    {
+      changed();
+      
       buffer.clear();
       
       buffer.setIndex(limit, limit);
+
    }
 
    public void readerIndex(int readerIndex)
@@ -71,11 +88,15 @@
 
    public void resetWriterIndex()
    {
+      changed();
+      
       buffer.writerIndex(limit);
    }
 
    public void setIndex(int readerIndex, int writerIndex)
    {
+      changed();
+      
       if (readerIndex < limit)
       {
          readerIndex = limit;
@@ -89,10 +110,261 @@
 
    public void writerIndex(int writerIndex)
    {
+      changed();
+      
       if (writerIndex < limit)
       {
          writerIndex = limit;
       }
+      
       buffer.writerIndex(writerIndex);
    }
+
+   @Override
+   public void setByte(int index, byte value)
+   {
+      changed();
+      
+      super.setByte(index, value);
+   }
+
+   @Override
+   public void setBytes(int index, byte[] src, int srcIndex, int length)
+   {      
+      changed();
+      
+      super.setBytes(index, src, srcIndex, length);
+   }
+
+   @Override
+   public void setBytes(int index, byte[] src)
+   {      
+      changed();
+      
+      super.setBytes(index, src);
+   }
+
+   @Override
+   public void setBytes(int index, ByteBuffer src)
+   {
+      changed();
+      
+      super.setBytes(index, src);
+   }
+
+   @Override
+   public void setBytes(int index, HornetQBuffer src, int srcIndex, int length)
+   {
+      changed();
+      
+      super.setBytes(index, src, srcIndex, length);
+   }
+
+   @Override
+   public void setBytes(int index, HornetQBuffer src, int length)
+   {
+      changed();
+      
+      super.setBytes(index, src, length);
+   }
+
+   @Override
+   public void setBytes(int index, HornetQBuffer src)
+   {
+      changed();
+      
+      super.setBytes(index, src);
+   }
+
+   @Override
+   public void setChar(int index, char value)
+   {
+      changed();
+      
+      super.setChar(index, value);
+   }
+
+   @Override
+   public void setDouble(int index, double value)
+   {
+      changed();
+      
+      super.setDouble(index, value);
+   }
+
+   @Override
+   public void setFloat(int index, float value)
+   {
+      changed();
+      
+      super.setFloat(index, value);
+   }
+
+   @Override
+   public void setInt(int index, int value)
+   {
+      changed();
+      
+      super.setInt(index, value);
+   }
+
+   @Override
+   public void setLong(int index, long value)
+   {
+      changed();
+      
+      super.setLong(index, value);
+   }
+
+   @Override
+   public void setShort(int index, short value)
+   {
+      changed();
+      
+      super.setShort(index, value);
+   }
+
+   @Override
+   public void writeBoolean(boolean val)
+   {
+      changed();
+      
+      super.writeBoolean(val);
+   }
+
+   @Override
+   public void writeByte(byte value)
+   {
+      changed();
+      
+      super.writeByte(value);
+   }
+
+   @Override
+   public void writeBytes(byte[] src, int srcIndex, int length)
+   {
+      changed();
+      
+      super.writeBytes(src, srcIndex, length);
+   }
+
+   @Override
+   public void writeBytes(byte[] src)
+   {
+      changed();
+      
+      super.writeBytes(src);
+   }
+
+   @Override
+   public void writeBytes(ByteBuffer src)
+   {
+      changed();
+      
+      super.writeBytes(src);
+   }
+
+   @Override
+   public void writeBytes(HornetQBuffer src, int srcIndex, int length)
+   {
+      changed();
+      
+      super.writeBytes(src, srcIndex, length);
+   }
+
+   @Override
+   public void writeBytes(HornetQBuffer src, int length)
+   {
+      changed();
+      
+      super.writeBytes(src, length);
+   }
+
+   @Override
+   public void writeChar(char chr)
+   {
+      changed();
+      
+      super.writeChar(chr);
+   }
+
+   @Override
+   public void writeDouble(double value)
+   {
+      changed();
+      
+      super.writeDouble(value);
+   }
+
+   @Override
+   public void writeFloat(float value)
+   {
+      changed();
+      
+      super.writeFloat(value);
+   }
+
+   @Override
+   public void writeInt(int value)
+   {
+      changed();
+      
+      super.writeInt(value);
+   }
+
+   @Override
+   public void writeLong(long value)
+   {
+      changed();
+      
+      super.writeLong(value);
+   }
+
+   @Override
+   public void writeNullableSimpleString(SimpleString val)
+   {
+      changed();
+      
+      super.writeNullableSimpleString(val);
+   }
+
+   @Override
+   public void writeNullableString(String val)
+   {
+      changed();
+      
+      super.writeNullableString(val);
+    }
+
+   @Override
+   public void writeShort(short value)
+   {
+      changed();
+      
+      super.writeShort(value);
+   }
+
+   @Override
+   public void writeSimpleString(SimpleString val)
+   {
+      changed();
+      
+      super.writeSimpleString(val);
+   }
+
+   @Override
+   public void writeString(String val)
+   {
+      changed();
+      
+      super.writeString(val);
+   }
+
+   @Override
+   public void writeUTF(String utf)
+   {
+      changed();
+      
+      super.writeUTF(utf);
+   }
 }

Modified: branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2009-11-25 00:45:52 UTC (rev 8396)
@@ -216,7 +216,7 @@
                      
                      // we only force delivery once per call to receive
                      if (!deliveryForced)
-                     {
+                     {                   
                         session.forceDelivery(id, forceDeliveryCount.incrementAndGet());
                         
                         deliveryForced = true;
@@ -249,7 +249,7 @@
                session.workDone();
                
                if (m.containsProperty(FORCED_DELIVERY_MESSAGE))
-               {
+               {               
                   long seq = m.getLongProperty(FORCED_DELIVERY_MESSAGE);
                   if (seq >= forceDeliveryCount.longValue())
                   {
@@ -451,7 +451,7 @@
          // This is ok - we just ignore the message
          return;
       }
-
+ 
       ClientMessageInternal messageToHandle = message;
 
       messageToHandle.onReceipt(this);

Modified: branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java	2009-11-25 00:45:52 UTC (rev 8396)
@@ -239,5 +239,16 @@
    {
       this.bodyInputStream = bodyInputStream;
    }
+   
+ public void setBuffer(HornetQBuffer buffer)
+ {
+    this.buffer = buffer; 
+    
+    if (bodyBuffer != null)
+    {
+       bodyBuffer.setBuffer(buffer);
+    }
+ }
 
+
 }

Modified: branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java	2009-11-25 00:45:52 UTC (rev 8396)
@@ -34,13 +34,13 @@
    void setFlowControlSize(int flowControlSize);
 
    void onReceipt(ClientConsumerInternal consumer);
-   
+
    void setLargeMessage(boolean largeMessage);
 
    /**
     * Discard unused packets (used on large-message)
     */
-   void discardLargeBody();    
-   
+   void discardLargeBody();
+
    void setBuffer(HornetQBuffer buffer);
 }

Modified: branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2009-11-25 00:45:52 UTC (rev 8396)
@@ -689,6 +689,8 @@
       if (consumer != null)
       {
          ClientMessageInternal clMessage = (ClientMessageInternal)message.getMessage();
+         
+         clMessage.setDeliveryCount(message.getDeliveryCount());
 
          if (trace)
          {

Modified: branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionPacketHandler.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionPacketHandler.java	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionPacketHandler.java	2009-11-25 00:45:52 UTC (rev 8396)
@@ -68,7 +68,7 @@
                break;
             }
             case SESS_RECEIVE_MSG:
-            {
+            {              
                SessionReceiveMessage message = (SessionReceiveMessage) packet;
                
                clientSession.handleReceiveMessage(message.getConsumerID(), message);               

Modified: branches/20-optimisation/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java	2009-11-25 00:45:52 UTC (rev 8396)
@@ -1098,7 +1098,7 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.remoting.spi.HornetQBuffer#readUTF()
     */
-   public String readUTF() throws Exception
+   public String readUTF()
    {
       return UTF8Util.readUTF(this);
    }
@@ -1172,7 +1172,7 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.remoting.spi.HornetQBuffer#writeUTF(java.lang.String)
     */
-   public void writeUTF(final String utf) throws Exception
+   public void writeUTF(final String utf)
    {
       throw new IllegalAccessError(READ_ONLY_ERROR_MESSAGE);
    }

Modified: branches/20-optimisation/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-11-25 00:45:52 UTC (rev 8396)
@@ -420,7 +420,7 @@
       bb.writeInt(fileId);
       bb.writeLong(id);
       bb.writeInt(record.getEncodeSize());
-      bb.writeByte(recordType);
+      bb.writeByte(recordType);      
       record.encode(bb);       
       bb.writeInt(size);     
    }

Modified: branches/20-optimisation/src/main/org/hornetq/core/message/Message.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/message/Message.java	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/message/Message.java	2009-11-25 00:45:52 UTC (rev 8396)
@@ -69,16 +69,27 @@
    
    HornetQBuffer getBodyBuffer();
    
+   
+   //Should the following methods really be on the public API?
+   
    void decodeFromBuffer(HornetQBuffer buffer);
    
    HornetQBuffer encodeToBuffer();
    
    int getEndOfMessagePosition();
    
+  // void setEndOfBodyPosition();
+   
    int getEndOfBodyPosition();
    
-   void forceCopy();
+   void checkCopy();
    
+   void bodyChanged();
+   
+   void resetCopied();
+   
+//   void resetEndOfBodyPosition();
+   
    // Properties
    // ------------------------------------------------------------------
 

Modified: branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java	2009-11-25 00:45:52 UTC (rev 8396)
@@ -15,6 +15,7 @@
 
 import static org.hornetq.utils.DataConstants.SIZE_BOOLEAN;
 import static org.hornetq.utils.DataConstants.SIZE_BYTE;
+import static org.hornetq.utils.DataConstants.SIZE_INT;
 import static org.hornetq.utils.DataConstants.SIZE_LONG;
 
 import java.nio.ByteBuffer;
@@ -73,7 +74,7 @@
    public static final SimpleString HDR_FROM_CLUSTER = new SimpleString("_HQ_FROM_CLUSTER");
 
    public static final SimpleString HDR_LAST_VALUE_NAME = new SimpleString("_HQ_LVQ_NAME");
-   
+
    // Attributes ----------------------------------------------------
 
    protected long messageID;
@@ -94,9 +95,9 @@
    protected byte priority;
 
    protected HornetQBuffer buffer;
-   
-   private int encodeSize;
-     
+
+  // private int encodeSize;
+
    // Constructors --------------------------------------------------
 
    protected MessageImpl()
@@ -126,61 +127,76 @@
       this.expiration = expiration;
       this.timestamp = timestamp;
       this.priority = priority;
-      createBody(initialMessageBufferSize);      
+      createBody(initialMessageBufferSize);
    }
-         
+
    protected MessageImpl(final long messageID, final int initialMessageBufferSize)
    {
       this();
       this.messageID = messageID;
       createBody(initialMessageBufferSize);
    }
-   
+
    private void createBody(final int initialMessageBufferSize)
    {
       buffer = HornetQChannelBuffers.dynamicBuffer(initialMessageBufferSize);
-      
-      //There's a bug in netty which means a dynamic buffer won't resize until you write a byte
+
+      // There's a bug in netty which means a dynamic buffer won't resize until you write a byte
       buffer.writeByte((byte)0);
-      
+
       int limit = PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT;
-      
+
       buffer.setIndex(limit, limit);
-      
-      endOfBodyPosition = -1;
+
+      //endOfBodyPosition = limit;
    }
 
    // Message implementation ----------------------------------------
 
    public int getEncodeSize()
+   {    
+      int headersPropsSize = this.getHeadersAndPropertiesEncodeSize();     
+      
+      int bodyPos = this.endOfBodyPosition == -1 ? buffer.writerIndex() : this.endOfBodyPosition;
+                       
+      int bodySize = bodyPos - PacketImpl.PACKET_HEADERS_SIZE - DataConstants.SIZE_INT;
+
+      return SIZE_INT + bodySize + SIZE_INT + headersPropsSize;
+   }
+
+   public int getHeadersAndPropertiesEncodeSize()
    {
-      return encodeSize;
+      return SIZE_LONG + /* Destination */SimpleString.sizeofString(destination) +
+      /* Type */SIZE_BYTE +
+      /* Durable */SIZE_BOOLEAN +
+      /* Expiration */SIZE_LONG +
+      /* Timestamp */SIZE_LONG +
+      /* Priority */SIZE_BYTE +
+      /* PropertySize and Properties */properties.getEncodeSize();
    }
-  
+
    public void encodeHeadersAndProperties(final HornetQBuffer buffer)
-   {           
-      buffer.writeLong(messageID);          
-      buffer.writeSimpleString(destination);    
+   {
+      buffer.writeLong(messageID);
+      buffer.writeSimpleString(destination);
       buffer.writeByte(type);
       buffer.writeBoolean(durable);
       buffer.writeLong(expiration);
       buffer.writeLong(timestamp);
       buffer.writeByte(priority);
-      properties.encode(buffer);
-      encodeSize = buffer.writerIndex() - PacketImpl.PACKET_HEADERS_SIZE;
+      properties.encode(buffer);      
    }
-   
+
    public void decodeHeadersAndProperties(final HornetQBuffer buffer)
-   {             
-      messageID = buffer.readLong();        
-      destination = buffer.readSimpleString();      
+   {
+      messageID = buffer.readLong();
+      destination = buffer.readSimpleString();
       type = buffer.readByte();
       durable = buffer.readBoolean();
       expiration = buffer.readLong();
       timestamp = buffer.readLong();
       priority = buffer.readByte();
-      properties.decode(buffer);
-      encodeSize = buffer.readerIndex() - PacketImpl.PACKET_HEADERS_SIZE;
+      properties.decode(buffer);      
    }
 
    public long getMessageID()
@@ -196,6 +212,8 @@
    public void setDestination(final SimpleString destination)
    {
       this.destination = destination;
+
+      bufferValid = false;
    }
 
    public byte getType()
@@ -211,6 +229,8 @@
    public void setDurable(final boolean durable)
    {
       this.durable = durable;
+
+      bufferValid = false;
    }
 
    public long getExpiration()
@@ -221,6 +241,8 @@
    public void setExpiration(final long expiration)
    {
       this.expiration = expiration;
+
+      bufferValid = false;
    }
 
    public long getTimestamp()
@@ -231,6 +253,8 @@
    public void setTimestamp(final long timestamp)
    {
       this.timestamp = timestamp;
+
+      bufferValid = false;
    }
 
    public byte getPriority()
@@ -241,6 +265,8 @@
    public void setPriority(final byte priority)
    {
       this.priority = priority;
+
+      bufferValid = false;
    }
 
    public boolean isExpired()
@@ -252,7 +278,7 @@
 
       return System.currentTimeMillis() - expiration >= 0;
    }
- 
+
    public Map<String, Object> toMap()
    {
       Map<String, Object> map = new HashMap<String, Object>();
@@ -277,46 +303,64 @@
    public void putBooleanProperty(final SimpleString key, final boolean value)
    {
       properties.putBooleanProperty(key, value);
+
+      bufferValid = false;
    }
 
    public void putByteProperty(final SimpleString key, final byte value)
    {
       properties.putByteProperty(key, value);
+
+      bufferValid = false;
    }
 
    public void putBytesProperty(final SimpleString key, final byte[] value)
    {
       properties.putBytesProperty(key, value);
+
+      bufferValid = false;
    }
 
    public void putShortProperty(final SimpleString key, final short value)
    {
       properties.putShortProperty(key, value);
+
+      bufferValid = false;
    }
 
    public void putIntProperty(final SimpleString key, final int value)
    {
       properties.putIntProperty(key, value);
+
+      bufferValid = false;
    }
 
    public void putLongProperty(final SimpleString key, final long value)
    {
       properties.putLongProperty(key, value);
+
+      bufferValid = false;
    }
 
    public void putFloatProperty(final SimpleString key, final float value)
    {
       properties.putFloatProperty(key, value);
+
+      bufferValid = false;
    }
 
    public void putDoubleProperty(final SimpleString key, final double value)
    {
       properties.putDoubleProperty(key, value);
+
+      bufferValid = false;
    }
 
    public void putStringProperty(final SimpleString key, final SimpleString value)
    {
       properties.putSimpleStringProperty(key, value);
+
+      bufferValid = false;
    }
 
    public void putObjectProperty(final SimpleString key, final Object value) throws PropertyConversionException
@@ -324,10 +368,10 @@
       if (value == null)
       {
          // This is ok - when we try to read the same key it will return null too
-         return;
+
+         properties.removeProperty(key);
       }
-
-      if (value instanceof Boolean)
+      else if (value instanceof Boolean)
       {
          properties.putBooleanProperty(key, (Boolean)value);
       }
@@ -363,61 +407,85 @@
       {
          throw new PropertyConversionException(value.getClass() + " is not a valid property type");
       }
+
+      bufferValid = false;
    }
 
    public void putObjectProperty(final String key, final Object value) throws PropertyConversionException
    {
       putObjectProperty(new SimpleString(key), value);
+
+      bufferValid = false;
    }
 
    public void putBooleanProperty(final String key, final boolean value)
    {
       properties.putBooleanProperty(new SimpleString(key), value);
+
+      bufferValid = false;
    }
 
    public void putByteProperty(final String key, final byte value)
    {
       properties.putByteProperty(new SimpleString(key), value);
+
+      bufferValid = false;
    }
 
    public void putBytesProperty(final String key, final byte[] value)
    {
       properties.putBytesProperty(new SimpleString(key), value);
+
+      bufferValid = false;
    }
 
    public void putShortProperty(final String key, final short value)
    {
       properties.putShortProperty(new SimpleString(key), value);
+
+      bufferValid = false;
    }
 
    public void putIntProperty(final String key, final int value)
    {
       properties.putIntProperty(new SimpleString(key), value);
+
+      bufferValid = false;
    }
 
    public void putLongProperty(final String key, final long value)
    {
       properties.putLongProperty(new SimpleString(key), value);
+
+      bufferValid = false;
    }
 
    public void putFloatProperty(final String key, final float value)
    {
       properties.putFloatProperty(new SimpleString(key), value);
+
+      bufferValid = false;
    }
 
    public void putDoubleProperty(final String key, final double value)
    {
       properties.putDoubleProperty(new SimpleString(key), value);
+
+      bufferValid = false;
    }
 
    public void putStringProperty(final String key, final String value)
    {
       properties.putSimpleStringProperty(new SimpleString(key), new SimpleString(value));
+
+      bufferValid = false;
    }
 
    public void putTypedProperties(final TypedProperties otherProps)
    {
       properties.putTypedProperties(otherProps);
+
+      bufferValid = false;
    }
 
    public Object getObjectProperty(final SimpleString key)
@@ -541,11 +609,15 @@
 
    public Object removeProperty(final SimpleString key)
    {
+      bufferValid = false;
+
       return properties.removeProperty(key);
    }
 
    public Object removeProperty(final String key)
    {
+      bufferValid = false;
+
       return properties.removeProperty(new SimpleString(key));
    }
 
@@ -573,32 +645,22 @@
    {
       return buffer;
    }
-   
-   public void setBuffer(HornetQBuffer buffer)
-   {
-      this.buffer = buffer; 
-      
-      if (bodyBuffer != null)
-      {
-         bodyBuffer.setBuffer(buffer);
-      }
-   }
 
    public BodyEncoder getBodyEncoder()
    {
       return new DecodingContext();
    }
-      
+
    // Public --------------------------------------------------------
 
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
-   
+
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------
-   
+
    private class DecodingContext implements BodyEncoder
    {
       private int lastPos = 0;
@@ -628,151 +690,185 @@
          return size;
       }
    }
-   
-   //FIXME - all this stuff only used by large messages, move it!
-   
-   public int getHeadersAndPropertiesEncodeSize()
-   {
-      return SIZE_LONG + /* Destination */SimpleString.sizeofString(destination) +
-      /* Type */SIZE_BYTE +
-      /* Durable */SIZE_BOOLEAN +
-      /* Expiration */SIZE_LONG +
-      /* Timestamp */SIZE_LONG +
-      /* Priority */SIZE_BYTE +
-      /* PropertySize and Properties */properties.getEncodeSize();
-   }
-   
-   
-   
-   
-   
-   private ResetLimitWrappedHornetQBuffer bodyBuffer;
 
+
+   protected ResetLimitWrappedHornetQBuffer bodyBuffer;
+
    public HornetQBuffer getBodyBuffer()
    {
       if (bodyBuffer == null)
       {
          if (buffer instanceof LargeMessageBuffer == false)
          {
-            bodyBuffer = new ResetLimitWrappedHornetQBuffer(PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT, buffer);
+            bodyBuffer = new ResetLimitWrappedHornetQBuffer(PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT,
+                                                            buffer,
+                                                            this);
          }
          else
          {
             return buffer;
          }
       }
-      
+
       return bodyBuffer;
    }
-   
-     
-   
+
    protected boolean bufferValid;
 
-   private int endOfBodyPosition;
-   
+   private int endOfBodyPosition = -1;
+
    private int endOfMessagePosition;
+
+   private boolean copied = true;
+
+   /*
+    * Copy constructor
+    */
+   protected MessageImpl(final MessageImpl other)
+   {
+      messageID = other.getMessageID();
+      destination = other.getDestination();
+      type = other.getType();
+      durable = other.isDurable();
+      expiration = other.getExpiration();
+      timestamp = other.getTimestamp();
+      priority = other.getPriority();
+      properties = new TypedProperties(other.getProperties());
+      // Note, this is a shallow copy - does not copy the buffer
+      buffer = other.buffer;
+      this.bufferValid = other.bufferValid;
+      this.endOfBodyPosition = other.endOfBodyPosition;
+      this.endOfMessagePosition = other.endOfMessagePosition;
+      this.copied = other.copied;
+   }
+
+   public void bodyChanged()
+   {
+      // If the body is changed we must copy the buffer, otherwise it can affect the previously sent message
+      // which might be in the Netty write queue
+      checkCopy();
+
+      bufferValid = false;
+      
+      this.endOfBodyPosition = -1;
+   }
+
+   public void checkCopy()
+   {
+      if (!copied)
+      {
+         forceCopy();
+
+         copied = true;
+      }
+   }
    
-   public void forceCopy()
+   public void resetCopied()
    {
-      // Must copy buffer before sending it      
-      int wi = buffer.writerIndex();
+      copied = false;
+   }
+
+   private void forceCopy()
+   {
+      // Must copy buffer before sending it
       
-      this.buffer = buffer.copy(0, buffer.capacity());
+      buffer = buffer.copy(0, buffer.capacity());
       
-      this.buffer.setIndex(0, wi);      
+      buffer.setIndex(0, this.endOfBodyPosition);
+            
+      if (bodyBuffer != null)
+      {
+         bodyBuffer.setBuffer(buffer);
+      }
    }
-      
+   
    public int getEndOfMessagePosition()
    {
       return this.endOfMessagePosition;
    }
-   
+
    public int getEndOfBodyPosition()
    {
       return this.endOfBodyPosition;
    }
    
-   //Encode to journal or paging
+   // Encode to journal or paging
    public void encode(HornetQBuffer buff)
    {
       encodeToBuffer();
       
       buff.writeBytes(buffer, PacketImpl.PACKET_HEADERS_SIZE, endOfMessagePosition - PacketImpl.PACKET_HEADERS_SIZE);
    }
-   
-   //Decode from journal or paging
+
+   // Decode from journal or paging
    public void decode(HornetQBuffer buff)
    {
       int start = buff.readerIndex();
 
       endOfBodyPosition = buff.readInt();
-      
+
       endOfMessagePosition = buff.getInt(endOfBodyPosition - PacketImpl.PACKET_HEADERS_SIZE + start);
-      
-      int endPos = endOfMessagePosition + start -
-                   PacketImpl.PACKET_HEADERS_SIZE;
 
-      this.buffer.setIndex(0, PacketImpl.PACKET_HEADERS_SIZE);
+      int length = endOfMessagePosition - PacketImpl.PACKET_HEADERS_SIZE;
 
-      buff.writeBytes(buffer, start, endPos - start);
-      
-      decode(); 
+      buffer.setIndex(0, PacketImpl.PACKET_HEADERS_SIZE);
+
+      buffer.writeBytes(buff, start, length);
+
+      decode();
+
+      buff.readerIndex(start + length);
    }
 
-   public HornetQBuffer encodeToBuffer()
-   {      
-      log.info("encoding msg to buffer, valid " + bufferValid);
-      
+   //This must be synchronized as it can be called concurrently if the message is being delivered concurrently to
+   //many queues - the first caller in this case will actually encode it
+   public synchronized HornetQBuffer encodeToBuffer()
+   {     
       if (!bufferValid)
-      {
+      {                 
          if (endOfBodyPosition == -1)
-         {            
-            //Means sending message for first time
-            endOfBodyPosition = buffer.writerIndex();
-
-            //write it
-            buffer.setInt(PacketImpl.PACKET_HEADERS_SIZE, endOfBodyPosition);
-            
-            log.info("setting end of body pos as " + endOfBodyPosition);
+         {
+            // Means sending message for first time
+            endOfBodyPosition = buffer.writerIndex();           
          }
          
+         // write it
+         buffer.setInt(PacketImpl.PACKET_HEADERS_SIZE, endOfBodyPosition);
+
          // Position at end of body and skip past the message end position int
-         buffer.writerIndex(endOfBodyPosition + DataConstants.SIZE_INT);
+         buffer.setIndex(0, endOfBodyPosition + DataConstants.SIZE_INT);
 
          encodeHeadersAndProperties(buffer);
 
          // Write end of message position
-         
+
          this.endOfMessagePosition = buffer.writerIndex();
 
          buffer.setInt(endOfBodyPosition, endOfMessagePosition);
-                 
+
          this.bufferValid = true;
       }
-
+            
       return buffer;
    }
 
    public void decode()
    {
       this.endOfBodyPosition = buffer.getInt(PacketImpl.PACKET_HEADERS_SIZE);
-            
+
       buffer.readerIndex(this.endOfBodyPosition + DataConstants.SIZE_INT);
 
       this.decodeHeadersAndProperties(buffer);
-      
+
       this.endOfMessagePosition = buffer.readerIndex();
-      
-      log.info("decoded end of body pos as " + this.endOfBodyPosition);
 
       this.bufferValid = true;
    }
-   
+
    public void decodeFromBuffer(HornetQBuffer buffer)
    {
       this.buffer = buffer;
-      
+
       decode();
    }
 

Modified: branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PageImpl.java	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PageImpl.java	2009-11-25 00:45:52 UTC (rev 8396)
@@ -115,7 +115,7 @@
                int oldPos = fileBuffer.readerIndex();
                if (fileBuffer.readerIndex() + messageSize < fileBuffer.capacity() && fileBuffer.getByte(oldPos + messageSize) == END_BYTE)
                {
-                  PagedMessage msg = new PagedMessageImpl();
+                  PagedMessage msg = new PagedMessageImpl();                  
                   msg.decode(fileBuffer);
                   byte b = fileBuffer.readByte();
                   if (b != END_BYTE)
@@ -124,7 +124,7 @@
                      // constraint was already checked
                      throw new IllegalStateException("Internal error, it wasn't possible to locate END_BYTE " + b);
                   }
-                  messages.add(msg);
+                  messages.add(msg);                  
                }
                else
                {
@@ -147,15 +147,11 @@
 
    public void write(final PagedMessage message) throws Exception
    {
-      log.info("encode size is " + message.getEncodeSize());
-      
       ByteBuffer buffer = fileFactory.newBuffer(message.getEncodeSize() + SIZE_RECORD);
       
       HornetQBuffer wrap = HornetQChannelBuffers.wrappedBuffer(buffer);
       wrap.clear();
       
-      log.info("wrapped " + wrap.channelBuffer());
-      
       wrap.writeByte(START_BYTE);
       wrap.writeInt(0);
       int startIndex = wrap.writerIndex();
@@ -172,8 +168,6 @@
       size.addAndGet(buffer.limit());
       
       storageManager.pageWrite(message, pageId);
-      
-      log.info("wrote page");
    }
 
    public void sync() throws Exception

Modified: branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java	2009-11-25 00:45:52 UTC (rev 8396)
@@ -68,8 +68,7 @@
    }
 
    public PagedMessageImpl()
-   {
-      this(new ServerMessageImpl());
+   {      
    }
 
    public ServerMessage getMessage(final StorageManager storage)
@@ -109,11 +108,10 @@
       {
          buffer.readInt(); // This value is only used on LargeMessages for now
          
-         message = new ServerMessageImpl();
+         message = new ServerMessageImpl(-1, 50);         
          
          message.decode(buffer);
       }
-
    }
 
    public void encode(final HornetQBuffer buffer)

Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/MessagePacket.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/MessagePacket.java	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/MessagePacket.java	2009-11-25 00:45:52 UTC (rev 8396)
@@ -13,20 +13,20 @@
 
 package org.hornetq.core.remoting.impl.wireformat;
 
-import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.Message;
-import org.hornetq.core.remoting.RemotingConnection;
-import org.hornetq.utils.DataConstants;
 
 /**
  * A MessagePacket
  *
- * @author tim
+ * @author Tim Fox
  *
  *
  */
 public abstract class MessagePacket extends PacketImpl
 {
+   private static final Logger log = Logger.getLogger(MessagePacket.class);
+
    protected Message message;
       
    public MessagePacket(final byte type, final Message message)
@@ -41,41 +41,4 @@
       return message;
    }
    
-   @Override
-   public HornetQBuffer encode(final RemotingConnection connection)
-   {
-      HornetQBuffer buffer = message.encodeToBuffer();
-      
-      buffer.setIndex(0, message.getEndOfMessagePosition());
-      
-      encodeExtraData(buffer);
-      
-      size = buffer.writerIndex();
-                       
-      //Write standard headers
-      
-      int len = size - DataConstants.SIZE_INT;
-      buffer.setInt(0, len);
-      buffer.setByte(DataConstants.SIZE_INT, type);
-      buffer.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID);
-      
-      //Position reader for reading by Netty
-      buffer.readerIndex(0);
-      
-      return buffer;
-   }
-   
-   @Override
-   public void decodeRest(HornetQBuffer buffer)
-   {
-      //Buffer comes in after having read standard headers and positioned at Beginning of body part
-      
-      message.decodeFromBuffer(buffer);
-      
-      decodeExtraData(buffer);      
-   }
-   
-   protected abstract void encodeExtraData(HornetQBuffer buffer);
-   
-   protected abstract void decodeExtraData(HornetQBuffer buffer);
 }

Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java	2009-11-25 00:45:52 UTC (rev 8396)
@@ -78,8 +78,7 @@
       buffer.writeByte(journalID);
       buffer.writeBoolean(isUpdate);
       buffer.writeLong(id);
-      buffer.writeByte(recordType);
-      log.info("encode size is " + encodingData.getEncodeSize());
+      buffer.writeByte(recordType);     
       buffer.writeInt(encodingData.getEncodeSize());
       encodingData.encode(buffer);
    }

Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java	2009-11-25 00:45:52 UTC (rev 8396)
@@ -17,6 +17,7 @@
 import org.hornetq.core.client.impl.ClientMessageImpl;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.Message;
+import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.utils.DataConstants;
 
 /**
@@ -43,7 +44,12 @@
 
       this.deliveryCount = deliveryCount;
       
-      message.forceCopy();
+      //If the message hasn't already been copied when the headers/properties/body was changed since last send
+      //(which will prompt an invalidate(), which will cause a copy if not copied already)
+      //Then the message needs to be copied before delivering - the previous send may be in the Netty write queue
+      //so we can't just use the same buffer. Also we can't just duplicate, since the extra data (consumerID, deliveryCount)
+      //may well be different on different calls
+      //message.forceCopy();
    }
 
    public SessionReceiveMessage()
@@ -67,31 +73,55 @@
 
    // Protected -----------------------------------------------------
    
-   protected void encodeExtraData(HornetQBuffer buffer)
+   @Override
+   public HornetQBuffer encode(final RemotingConnection connection)
    {
+      //message.setEndOfBodyPosition();
+      
+      HornetQBuffer orig = message.encodeToBuffer();
+      
+      //Now we must copy this buffer, before sending to Netty, as it could be concurrently delivered to many consumers
+      
+      HornetQBuffer buffer = orig.copy(0, orig.capacity());
+
+      buffer.setIndex(0, message.getEndOfMessagePosition());
+      
       buffer.writeLong(consumerID);
       buffer.writeInt(deliveryCount);
+      
+      size = buffer.writerIndex();
+                       
+      //Write standard headers
+      
+      int len = size - DataConstants.SIZE_INT;
+      buffer.setInt(0, len);
+      buffer.setByte(DataConstants.SIZE_INT, type);
+      buffer.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID);
+      
+      //Position reader for reading by Netty
+      buffer.readerIndex(0);
+      
+      return buffer;
    }
    
-   protected void decodeExtraData(HornetQBuffer buffer)
-   {
-      consumerID = buffer.readLong();
-      deliveryCount = buffer.readInt();
-   }
    @Override
-   public void decodeRest(HornetQBuffer buffer)
+   public void decode(HornetQBuffer buffer)
    {
-      //Buffer comes in after having read standard headers and positioned at Beginning of body part
-      
+      channelID = buffer.readLong();
+
       message.decodeFromBuffer(buffer);
       
-      decodeExtraData(buffer);      
+      consumerID = buffer.readLong();
       
+      deliveryCount = buffer.readInt();  
+      
+      size = buffer.readerIndex();
+      
       //Need to position buffer for reading
       
-      buffer.setIndex(PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT, message.getEndOfBodyPosition());
+      buffer.setIndex(PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT, message.getEndOfBodyPosition());    
    }
-
+   
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------

Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java	2009-11-25 00:45:52 UTC (rev 8396)
@@ -16,7 +16,9 @@
 import org.hornetq.core.buffers.HornetQBuffer;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.Message;
+import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.utils.DataConstants;
 
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -43,7 +45,12 @@
       
       this.requiresResponse = requiresResponse;
       
-      message.forceCopy();
+      //If the message hasn't already been copied when the headers/properties/body was changed since last send
+      //(which will prompt an invalidate(), which will cause a copy if not copied already)
+      //Then the message needs to be copied before sending - the previous send may be in the Netty write queue
+      //so we can't just use the same buffer. Also we can't just duplicate, since the extra data (requiresResponse)
+      //may be different on different calls
+      message.checkCopy();
    }
    
    public SessionSendMessage()
@@ -62,16 +69,49 @@
 
    // Protected -----------------------------------------------------
    
-   protected void encodeExtraData(HornetQBuffer buffer)
+   @Override
+   public HornetQBuffer encode(final RemotingConnection connection)
    {
+      //this isn't right when forwarding a message that has been already received - because writerindex will
+      //be pointing at end of message
+      
+      HornetQBuffer orig = message.encodeToBuffer();
+      
+      //FIXME - for now we are copying due to concurrent sends to many bridges on the server
+      
+      HornetQBuffer buffer = orig.copy(0, orig.capacity());
+      
+      buffer.setIndex(0, message.getEndOfMessagePosition());
+      
       buffer.writeBoolean(requiresResponse);
+      
+      size = buffer.writerIndex();
+                       
+      //Write standard headers
+      
+      int len = size - DataConstants.SIZE_INT;
+      buffer.setInt(0, len);
+      buffer.setByte(DataConstants.SIZE_INT, type);
+      buffer.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID);
+      
+      //Position reader for reading by Netty
+      buffer.readerIndex(0);
+      
+      message.resetCopied();
+      
+      return buffer;
    }
    
-   protected void decodeExtraData(HornetQBuffer buffer)
+   @Override
+   public void decodeRest(HornetQBuffer buffer)
    {
-      requiresResponse = buffer.readBoolean();
+      //Buffer comes in after having read standard headers and positioned at Beginning of body part
+      
+      message.decodeFromBuffer(buffer);
+      
+      requiresResponse = buffer.readBoolean();     
    }
-
+   
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------

Modified: branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java	2009-11-25 00:45:52 UTC (rev 8396)
@@ -178,7 +178,7 @@
    }
    
    public void route(final ServerMessage message, final RoutingContext context)
-   {
+   { 
       byte[] ids = message.getBytesProperty(idsHeaderName);
       
       if (ids == null)

Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-11-25 00:45:52 UTC (rev 8396)
@@ -184,12 +184,13 @@
    }
 
    public HandleStatus handle(final MessageReference ref) throws Exception
-   { 
+   {
       if (availableCredits != null && availableCredits.get() <= 0)
-      {         
+      {
+
          return HandleStatus.BUSY;
       }
-      
+
       lock.lock();
 
       try
@@ -210,7 +211,7 @@
          }
 
          final ServerMessage message = ref.getMessage();
-         
+
          if (filter != null && !filter.match(message))
          {
             return HandleStatus.NO_MATCH;
@@ -344,17 +345,23 @@
       {
          public void run()
          {
-            promptDelivery(false);
+            try
+            {            
+               promptDelivery(false);
 
-            ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateUniqueID(),
-                                                                        50);
+               ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateUniqueID(), 50);
 
-            forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
-            forcedDeliveryMessage.setDestination(messageQueue.getName());
+               forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
+               forcedDeliveryMessage.setDestination(messageQueue.getName());
 
-            final SessionReceiveMessage packet = new SessionReceiveMessage(id, forcedDeliveryMessage, 0);
+               final SessionReceiveMessage packet = new SessionReceiveMessage(id, forcedDeliveryMessage, 0);
 
-            channel.send(packet);
+               channel.send(packet);
+            }
+            catch (Exception e)
+            {
+               log.error("Failed to send forced delivery message", e);
+            }
          }
       });
    }
@@ -409,7 +416,8 @@
    }
 
    public void receiveCredits(final int credits) throws Exception
-   {      
+   {
+
       if (credits == -1)
       {
          // No flow control
@@ -449,7 +457,7 @@
 
       // Acknowledge acknowledges all refs delivered by the consumer up to and including the one explicitly
       // acknowledged
-      
+
       MessageReference ref;
       do
       {
@@ -576,13 +584,13 @@
     * @param message
     */
    private void deliverStandardMessage(final MessageReference ref, final ServerMessage message)
-   {
+   {     
       final SessionReceiveMessage packet = new SessionReceiveMessage(id, message, ref.getDeliveryCount());
-      
+
       channel.send(packet);
-      
+
       if (availableCredits != null)
-      {         
+      {
          availableCredits.addAndGet(-packet.getPacketSize());
       }
 
@@ -673,18 +681,19 @@
                largeMessage.encodeHeadersAndProperties(headerBuffer);
 
                SessionReceiveLargeMessage initialPacket = new SessionReceiveLargeMessage(id,
-                                                                                         headerBuffer.toByteBuffer().array(),
+                                                                                         headerBuffer.toByteBuffer()
+                                                                                                     .array(),
                                                                                          largeMessage.getLargeBodySize(),
                                                                                          ref.getDeliveryCount());
 
                context = largeMessage.getBodyEncoder();
 
                context.open();
-               
+
                sentInitialPacket = true;
 
                channel.send(initialPacket);
-               
+
                if (availableCredits != null)
                {
                   availableCredits.addAndGet(-initialPacket.getPacketSize());
@@ -712,16 +721,16 @@
                SessionReceiveContinuationMessage chunk = createChunkSend(context);
 
                int chunkLen = chunk.getBody().length;
-                              
+
                channel.send(chunk);
-               
+
                if (trace)
                {
                   trace("deliverLargeMessage: Sending " + chunk.getPacketSize() +
                         " availableCredits now is " +
                         availableCredits);
                }
-               
+
                if (availableCredits != null)
                {
                   availableCredits.addAndGet(-chunk.getPacketSize());
@@ -847,7 +856,7 @@
          {
             MessageReference ref = iterator.next();
             try
-            {
+            {              
                HandleStatus status = handle(ref);
                if (status == HandleStatus.BUSY)
                {

Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2009-11-25 00:45:52 UTC (rev 8396)
@@ -17,7 +17,6 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.hornetq.core.logging.Logger;
-import org.hornetq.core.message.PropertyConversionException;
 import org.hornetq.core.message.impl.MessageImpl;
 import org.hornetq.core.paging.PagingStore;
 import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
@@ -71,17 +70,7 @@
     */
    protected ServerMessageImpl(final ServerMessageImpl other)
    {
-      this();
-      messageID = other.getMessageID();
-      destination = other.getDestination();
-      type = other.getType();
-      durable = other.isDurable();
-      expiration = other.getExpiration();
-      timestamp = other.getTimestamp();
-      priority = other.getPriority();
-      properties = new TypedProperties(other.getProperties());
-      //Note, this is a shallow copy - does not copy the buffer
-      buffer = other.buffer;
+      super(other);           
    }
 
    public void setMessageID(final long id)
@@ -310,219 +299,7 @@
    {
       // We first set the message id - this needs to be set on the buffer since this buffer will be re-used
       
-      //log.info(System.identityHashCode(this) + " encoded message id " + messageID + " etb " + this.encodedToBuffer);
-      
       buffer.setLong(buffer.getInt(PacketImpl.PACKET_HEADERS_SIZE) + DataConstants.SIZE_INT, messageID);
    }
 
-   @Override
-   public void putBooleanProperty(SimpleString key, boolean value)
-   {      
-      super.putBooleanProperty(key, value);
-      
-      bufferValid = false;
-   }
-
-   @Override
-   public void putBooleanProperty(String key, boolean value)
-   {      
-      super.putBooleanProperty(key, value);
-      
-      bufferValid = false;;
-   }
-
-   @Override
-   public void putByteProperty(SimpleString key, byte value)
-   {      
-      super.putByteProperty(key, value);
-      
-      bufferValid = false;
-   }
-
-   @Override
-   public void putByteProperty(String key, byte value)
-   {      
-      super.putByteProperty(key, value);
-      
-      bufferValid = false;
-   }
-
-   @Override
-   public void putBytesProperty(SimpleString key, byte[] value)
-   {      
-      super.putBytesProperty(key, value);
-      
-      bufferValid = false;
-   }
-
-   @Override
-   public void putBytesProperty(String key, byte[] value)
-   {      
-      super.putBytesProperty(key, value);
-      
-      bufferValid = false;
-   }
-
-   @Override
-   public void putDoubleProperty(SimpleString key, double value)
-   {      
-      super.putDoubleProperty(key, value);
-      
-      bufferValid = false;
-   }
-
-   @Override
-   public void putDoubleProperty(String key, double value)
-   {      
-      super.putDoubleProperty(key, value);
-      
-      bufferValid = false;
-   }
-
-   @Override
-   public void putFloatProperty(SimpleString key, float value)
-   {      
-      super.putFloatProperty(key, value);
-      
-      bufferValid = false;
-   }
-
-   @Override
-   public void putFloatProperty(String key, float value)
-   {      
-      super.putFloatProperty(key, value);
-      
-      bufferValid = false;
-   }
-
-   @Override
-   public void putIntProperty(SimpleString key, int value)
-   {      
-      super.putIntProperty(key, value);
-      
-      bufferValid = false;
-   }
-
-   @Override
-   public void putIntProperty(String key, int value)
-   {      
-      super.putIntProperty(key, value);
-      
-      bufferValid = false;
-   }
-
-   @Override
-   public void putLongProperty(SimpleString key, long value)
-   {      
-      super.putLongProperty(key, value);
-      
-      bufferValid = false;
-   }
-
-   @Override
-   public void putLongProperty(String key, long value)
-   {      
-      super.putLongProperty(key, value);
-      
-      bufferValid = false;
-   }
-
-   @Override
-   public void putObjectProperty(SimpleString key, Object value) throws PropertyConversionException
-   {      
-      super.putObjectProperty(key, value);
-      
-      bufferValid = false;
-   }
-
-   @Override
-   public void putObjectProperty(String key, Object value) throws PropertyConversionException
-   {      
-      super.putObjectProperty(key, value);
-      
-      bufferValid = false;
-   }
-
-   @Override
-   public void putShortProperty(SimpleString key, short value)
-   {      
-      super.putShortProperty(key, value);
-      
-      bufferValid = false;
-   }
-
-   @Override
-   public void putShortProperty(String key, short value)
-   {      
-      super.putShortProperty(key, value);
-      
-      bufferValid = false;
-   }
-
-   @Override
-   public void putStringProperty(SimpleString key, SimpleString value)
-   {      
-      super.putStringProperty(key, value);
-      
-      bufferValid = false;
-   }
-
-   @Override
-   public void putStringProperty(String key, String value)
-   {      
-      super.putStringProperty(key, value);
-      
-      bufferValid = false;
-   }
-
-   @Override
-   public void putTypedProperties(TypedProperties otherProps)
-   {      
-      super.putTypedProperties(otherProps);
-      
-      bufferValid = false;
-   }
-
-   @Override
-   public void setDestination(SimpleString destination)
-   {      
-      super.setDestination(destination);
-      
-      bufferValid = false;
-   }
-
-   @Override
-   public void setDurable(boolean durable)
-   {      
-      super.setDurable(durable);
-      
-      bufferValid = false;
-   }
-
-   @Override
-   public void setExpiration(long expiration)
-   {      
-      super.setExpiration(expiration);
-      
-      bufferValid = false;
-   }
-
-   @Override
-   public void setPriority(byte priority)
-   {      
-      super.setPriority(priority);
-      
-      bufferValid = false;
-   }
-
-   @Override
-   public void setTimestamp(long timestamp)
-   {      
-      super.setTimestamp(timestamp);
-      
-      bufferValid = false;
-   }
-   
-   
-
 }

Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-25 00:45:52 UTC (rev 8396)
@@ -1434,7 +1434,7 @@
 
    public void handleSendLargeMessage(final SessionSendLargeMessage packet)
    {
- 
+
       // need to create the LargeMessage before continue
       long id = storageManager.generateUniqueID();
 
@@ -1454,7 +1454,7 @@
    }
 
    public void handleSend(final SessionSendMessage packet)
-   {  
+   {
       Packet response = null;
 
       ServerMessage message = (ServerMessage)packet.getMessage();
@@ -1574,7 +1574,7 @@
       final CreditManagerHolder holder = this.getCreditManagerHolder(address);
 
       int credits = packet.getCredits();
-            
+
       int gotCredits = holder.manager.acquireCredits(credits, new CreditsAvailableRunnable()
       {
          public boolean run(final int credits)
@@ -1594,7 +1594,7 @@
             }
          }
       });
-      
+
       if (gotCredits > 0)
       {
          sendProducerCredits(holder, gotCredits, address);
@@ -1942,7 +1942,7 @@
    private void sendProducerCredits(final CreditManagerHolder holder, final int credits, final SimpleString address)
    {
       holder.outstandingCredits += credits;
-      
+
       Packet packet = new SessionProducerCreditsMessage(credits, address, -1);
 
       channel.send(packet);
@@ -1967,7 +1967,7 @@
       }
 
       if (tx == null || autoCommitSends)
-      {         
+      {
          postOffice.route(msg);
       }
       else

Modified: branches/20-optimisation/src/main/org/hornetq/utils/UTF8Util.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/utils/UTF8Util.java	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/utils/UTF8Util.java	2009-11-25 00:45:52 UTC (rev 8396)
@@ -41,7 +41,7 @@
 
    private static ThreadLocal<SoftReference<StringUtilBuffer>> currenBuffer = new ThreadLocal<SoftReference<StringUtilBuffer>>();
 
-   public static void saveUTF(final HornetQBuffer out, final String str) throws IOException
+   public static void saveUTF(final HornetQBuffer out, final String str)
    {
       StringUtilBuffer buffer = getThreadLocalBuffer();
 
@@ -107,7 +107,7 @@
       }
    }
 
-   public static String readUTF(final HornetQBuffer input) throws IOException
+   public static String readUTF(final HornetQBuffer input)
    {
       StringUtilBuffer buffer = getThreadLocalBuffer();
 

Modified: branches/20-optimisation/tests/jms-tests/src/org/hornetq/jms/tests/BrowserTest.java
===================================================================
--- branches/20-optimisation/tests/jms-tests/src/org/hornetq/jms/tests/BrowserTest.java	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/tests/jms-tests/src/org/hornetq/jms/tests/BrowserTest.java	2009-11-25 00:45:52 UTC (rev 8396)
@@ -158,11 +158,13 @@
 			browser = session.createBrowser(queue1);
 			en = browser.getEnumeration();
 
+			log.info("browsing");
+			
 			count = 0;
 			while (en.hasMoreElements())
 			{
 				Message mess = (Message)en.nextElement();
-				log.trace("message:" + mess);
+				log.info("message:" + mess);
 				count++;
 			}
 

Modified: branches/20-optimisation/tests/jms-tests/src/org/hornetq/jms/tests/TransactedSessionTest.java
===================================================================
--- branches/20-optimisation/tests/jms-tests/src/org/hornetq/jms/tests/TransactedSessionTest.java	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/tests/jms-tests/src/org/hornetq/jms/tests/TransactedSessionTest.java	2009-11-25 00:45:52 UTC (rev 8396)
@@ -37,7 +37,6 @@
 
    // Public --------------------------------------------------------
 
-  
    public void testSimpleRollback() throws Exception
    {
       // send a message
@@ -45,44 +44,44 @@
 
       try
       {
-	      conn = cf.createConnection();
-	      Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-	      s.createProducer(queue1).send(s.createTextMessage("one"));
+         conn = cf.createConnection();
+         Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         s.createProducer(queue1).send(s.createTextMessage("one"));
 
-	      s.close();
+         s.close();
 
-	      s = conn.createSession(true, Session.SESSION_TRANSACTED);
-	      MessageConsumer c = s.createConsumer(queue1);
-	      conn.start();
-	      Message m = c.receive(1000);
-	      assertNotNull(m);
+         s = conn.createSession(true, Session.SESSION_TRANSACTED);
+         MessageConsumer c = s.createConsumer(queue1);
+         conn.start();
+         Message m = c.receive(1000);
+         assertNotNull(m);
 
-	      assertEquals("one", ((TextMessage)m).getText());
-	      assertFalse(m.getJMSRedelivered());
-	      assertEquals(1, m.getIntProperty("JMSXDeliveryCount"));
+         assertEquals("one", ((TextMessage)m).getText());
+         assertFalse(m.getJMSRedelivered());
+         assertEquals(1, m.getIntProperty("JMSXDeliveryCount"));
 
-	      s.rollback();
+         s.rollback();
 
-	      // get the message again
-	      m = c.receive(1000);
-	      assertNotNull(m);
-	      
-	      assertTrue(m.getJMSRedelivered());
-	      assertEquals(2, m.getIntProperty("JMSXDeliveryCount"));
+         // get the message again
+         m = c.receive(1000);
+         assertNotNull(m);
 
-	      conn.close();
+         assertTrue(m.getJMSRedelivered());
+         assertEquals(2, m.getIntProperty("JMSXDeliveryCount"));
 
-	      Integer i = getMessageCountForQueue("Queue1");
+         conn.close();
 
+         Integer i = getMessageCountForQueue("Queue1");
+
          assertEquals(1, i.intValue());
       }
       finally
       {
-      	if (conn != null)
-      	{
-      		conn.close();
-      	}
-      	removeAllMessages(queue1.getQueueName(), true);
+         if (conn != null)
+         {
+            conn.close();
+         }
+         removeAllMessages(queue1.getQueueName(), true);
       }
    }
 
@@ -107,11 +106,11 @@
 
          TextMessage mRec1 = (TextMessage)consumer1.receive(2000);
          assertNotNull(mRec1);
-         
+
          assertEquals("igloo", mRec1.getText());
          assertFalse(mRec1.getJMSRedelivered());
 
-         sess1.rollback(); //causes redelivery for session
+         sess1.rollback(); // causes redelivery for session
 
          mRec1 = (TextMessage)consumer1.receive(2000);
          assertEquals("igloo", mRec1.getText());
@@ -128,7 +127,6 @@
       }
    }
 
-
    /** Test redelivery works ok for Topic */
    public void testRedeliveredTopic() throws Exception
    {
@@ -187,8 +185,10 @@
          MessageConsumer consumer = sess.createConsumer(topic1);
          conn.start();
 
+         log.info("sending message first time");
          TextMessage mSent = sess.createTextMessage("igloo");
          producer.send(mSent);
+         log.info("sent message first time");
 
          sess.commit();
 
@@ -197,12 +197,15 @@
 
          sess.commit();
 
+         log.info("sending message again");
          mSent.setText("rollback");
          producer.send(mSent);
+         log.info("sent message again");
 
          sess.commit();
 
          mRec = (TextMessage)consumer.receive(2000);
+         assertEquals("rollback", mRec.getText());
          sess.rollback();
 
          TextMessage mRec2 = (TextMessage)consumer.receive(2000);
@@ -243,7 +246,7 @@
 
          final int NUM_MESSAGES = 10;
 
-         //Send some messages
+         // Send some messages
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
             Message m = producerSess.createMessage();
@@ -283,7 +286,7 @@
 
          final int NUM_MESSAGES = 10;
 
-         //Send some messages
+         // Send some messages
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
             Message m = producerSess.createMessage();
@@ -296,7 +299,8 @@
          while (true)
          {
             Message m = consumer.receive(500);
-            if (m == null) break;
+            if (m == null)
+               break;
             count++;
          }
 
@@ -335,7 +339,7 @@
 
          final int NUM_MESSAGES = 10;
 
-         //Send some messages
+         // Send some messages
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
             Message m = producerSess.createMessage();
@@ -345,7 +349,8 @@
          while (true)
          {
             Message m = consumer.receive(500);
-            if (m == null) break;
+            if (m == null)
+               break;
             count++;
          }
 
@@ -378,8 +383,6 @@
 
    }
 
-
-
    /*
     * Send some messages in a transacted session.
     * Rollback the session.
@@ -402,7 +405,7 @@
 
          final int NUM_MESSAGES = 10;
 
-         //Send some messages
+         // Send some messages
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
             Message m = producerSess.createMessage();
@@ -503,7 +506,7 @@
          assertEquals(1, tm.getIntProperty("JMSXDeliveryCount"));
 
          sess.rollback();
-         
+
          sess.close();
 
          Session sess2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -537,7 +540,6 @@
       MessageConsumer consumer = sess.createConsumer(queue1);
       conn.start();
 
-
       TextMessage mSent = sess.createTextMessage("igloo");
       producer.send(mSent);
       log.trace("sent1");
@@ -560,7 +562,7 @@
       log.trace("Receiving 2");
       mRec = (TextMessage)consumer.receive(1000);
       assertNotNull(mRec);
-      
+
       log.trace("Received 2");
       assertNotNull(mRec);
       assertEquals("rollback", mRec.getText());
@@ -599,7 +601,7 @@
 
          final int NUM_MESSAGES = 10;
 
-         //Send some messages
+         // Send some messages
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
             Message m = producerSess.createMessage();
@@ -617,9 +619,6 @@
       }
    }
 
-
-
-
    /**
     * Send some messages in transacted session. Commit.
     * Verify message are received by consumer.
@@ -641,7 +640,7 @@
 
          final int NUM_MESSAGES = 10;
 
-         //Send some messages
+         // Send some messages
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
             Message m = producerSess.createMessage();
@@ -654,7 +653,8 @@
          while (true)
          {
             Message m = consumer.receive(500);
-            if (m == null) break;
+            if (m == null)
+               break;
             count++;
          }
 
@@ -671,7 +671,6 @@
 
    }
 
-
    /**
     * Test IllegateStateException is thrown if commit is called on a non-transacted session
     */
@@ -706,7 +705,6 @@
       }
    }
 
-
    /**
     * Send some messages.
     * Receive them in a transacted session.
@@ -733,7 +731,7 @@
 
          final int NUM_MESSAGES = 10;
 
-         //Send some messages
+         // Send some messages
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
             Message m = producerSess.createMessage();
@@ -744,7 +742,8 @@
          while (true)
          {
             Message m = consumer.receive(500);
-            if (m == null) break;
+            if (m == null)
+               break;
             count++;
          }
 
@@ -752,9 +751,9 @@
 
          conn.stop();
          consumer.close();
-   
+
          conn.close();
-   
+
          conn = cf.createConnection();
 
          consumerSess = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE);
@@ -762,15 +761,15 @@
          conn.start();
 
          count = 0;
-         
 
          while (true)
          {
             Message m = consumer.receive(500);
-            if (m == null) break;
+            if (m == null)
+               break;
             count++;
          }
-         
+
          assertEquals(NUM_MESSAGES, count);
       }
       finally
@@ -783,9 +782,6 @@
       }
    }
 
-
-
-
    /**
     * Send some messages.
     * Receive them in a transacted session.
@@ -810,7 +806,7 @@
 
          final int NUM_MESSAGES = 10;
 
-         //Send some messages
+         // Send some messages
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
             Message m = producerSess.createMessage();
@@ -821,7 +817,8 @@
          while (true)
          {
             Message m = consumer.receive(500);
-            if (m == null) break;
+            if (m == null)
+               break;
             count++;
          }
 
@@ -854,8 +851,6 @@
 
    }
 
-
-
    /*
     * Send some messages in a transacted session.
     * Rollback the session.
@@ -878,7 +873,7 @@
 
          final int NUM_MESSAGES = 10;
 
-         //Send some messages
+         // Send some messages
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
             Message m = producerSess.createMessage();
@@ -900,7 +895,6 @@
       }
    }
 
-
    /*
     * Test IllegateStateException is thrown if rollback is
     * called on a non-transacted session
@@ -938,7 +932,6 @@
       }
    }
 
-
    /*
     * Send some messages.
     * Receive them in a transacted session.
@@ -965,7 +958,7 @@
 
          final int NUM_MESSAGES = 10;
 
-         //Send some messages
+         // Send some messages
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
             Message m = producerSess.createMessage();
@@ -976,7 +969,8 @@
          while (true)
          {
             Message m = consumer.receive(500);
-            if (m == null) break;
+            if (m == null)
+               break;
             count++;
          }
 
@@ -999,7 +993,8 @@
          while (true)
          {
             Message m = consumer.receive(500);
-            if (m == null) break;
+            if (m == null)
+               break;
             count++;
          }
 
@@ -1017,7 +1012,6 @@
 
    }
 
-
    /*
     * Send multiple messages in multiple contiguous sessions
     */
@@ -1039,7 +1033,7 @@
          final int NUM_MESSAGES = 10;
          final int NUM_TX = 10;
 
-         //Send some messages
+         // Send some messages
 
          for (int j = 0; j < NUM_TX; j++)
          {
@@ -1056,7 +1050,8 @@
          while (true)
          {
             Message m = consumer.receive(500);
-            if (m == null) break;
+            if (m == null)
+               break;
             count++;
             m.acknowledge();
          }
@@ -1080,5 +1075,3 @@
 
    // Inner classes -------------------------------------------------
 }
-
-

Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/AckBatchSizeTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/AckBatchSizeTest.java	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/AckBatchSizeTest.java	2009-11-25 00:45:52 UTC (rev 8396)
@@ -78,21 +78,18 @@
             cp.send(sendSession.createClientMessage(false));
          }
 
-         log.info("sent messages");
-
          ClientConsumer consumer = session.createConsumer(queueA);
          session.start();
          for (int i = 0; i < numMessages - 1; i++)
          {
             ClientMessage m = consumer.receive(5000);
             
-            log.info("got message " + i);
             m.acknowledge();
          }
 
          ClientMessage m = consumer.receive(5000);
          Queue q = (Queue)server.getPostOffice().getBinding(queueA).getBindable();
-         assertEquals(1, q.getDeliveringCount());
+         assertEquals(100, q.getDeliveringCount());
          m.acknowledge();
          assertEquals(0, q.getDeliveringCount());
          sendSession.close();

Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java	2009-11-25 00:45:52 UTC (rev 8396)
@@ -105,7 +105,7 @@
       log.info("sent messages");
 
       ClientConsumer consumer = session.createConsumer(QUEUE);
-
+      
       session.start();
 
       for (int i = 0; i < numMessages; i++)

Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/DeadLetterAddressTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/DeadLetterAddressTest.java	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/DeadLetterAddressTest.java	2009-11-25 00:45:52 UTC (rev 8396)
@@ -26,6 +26,7 @@
 import org.hornetq.core.config.TransportConfiguration;
 import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.impl.MessageImpl;
 import org.hornetq.core.server.HornetQ;
 import org.hornetq.core.server.HornetQServer;
@@ -39,6 +40,8 @@
  */
 public class DeadLetterAddressTest extends UnitTestCase
 {
+   private static final Logger log = Logger.getLogger(DeadLetterAddressTest.class);
+
    private HornetQServer server;
 
    private ClientSession clientSession;
@@ -236,7 +239,9 @@
       for (int i = 0; i < deliveryAttempt; i++)
       {
          ClientMessage m = clientConsumer.receive(500);
-         assertNotNull(m);
+         assertNotNull(m);  
+         log.info("i is " + i);
+         log.info("delivery cout is " +m.getDeliveryCount());
          assertEquals(i + 1, m.getDeliveryCount());
          m.acknowledge();
          clientSession.rollback();

Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/InVMNonPersistentMessageBufferTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/InVMNonPersistentMessageBufferTest.java	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/InVMNonPersistentMessageBufferTest.java	2009-11-25 00:45:52 UTC (rev 8396)
@@ -85,12 +85,8 @@
       
       int bodySize = message.getBodySize();
       
-      log.info("body size is " + bodySize);
-      
       for (int i = 0; i < 10; i++)
       {  
-         log.info("sending " + i);
-         
          ClientMessage received = sendAndReceive(message);
 
          assertNotNull(received);
@@ -107,10 +103,12 @@
    {
       ClientMessage message = session.createClientMessage(false);
 
+      String body = RandomUtil.randomString();
+      
       for (int i = 0; i < 10; i++)
       {
-         log.info("iteration " + i);
-         final String body = RandomUtil.randomString();
+         //Make the body a bit longer each time
+         body += "XX";
          
          message.getBodyBuffer().writeString(body);
          

Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2009-11-25 00:45:52 UTC (rev 8396)
@@ -23,7 +23,6 @@
 import junit.framework.TestSuite;
 
 import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
 import org.hornetq.core.client.ClientConsumer;
 import org.hornetq.core.client.ClientMessage;
 import org.hornetq.core.client.ClientProducer;
@@ -41,7 +40,6 @@
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.tests.integration.largemessage.LargeMessageTestBase;
 import org.hornetq.tests.util.RandomUtil;
-import org.hornetq.utils.DataConstants;
 import org.hornetq.utils.SimpleString;
 
 /**

Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java	2009-11-25 00:45:52 UTC (rev 8396)
@@ -64,8 +64,6 @@
 
          startServers();
 
-         log.info("********** started servers");
-
          setupSessionFactory(0, isNetty());
          setupSessionFactory(1, isNetty());
          setupSessionFactory(2, isNetty());
@@ -110,8 +108,6 @@
 
          this.closeAllSessionFactories();
 
-         log.info("** stopping servers");
-
          stopServers(0, 1, 2, 3, 4);
 
          startServers();
@@ -167,33 +163,24 @@
 
       startServers();
       
-      log.info("*** started servers");
-
       setupSessionFactory(0, isNetty());
       setupSessionFactory(1, isNetty());
       setupSessionFactory(2, isNetty());
       setupSessionFactory(3, isNetty());
       setupSessionFactory(4, isNetty());
       
-      log.info("** created session factories");
-
-      createQueue(0, "queues.testaddress", "queue0", null, false);
-           
+      createQueue(0, "queues.testaddress", "queue0", null, false);           
       createQueue(1, "queues.testaddress", "queue0", null, false);
       createQueue(2, "queues.testaddress", "queue0", null, false);
       createQueue(3, "queues.testaddress", "queue0", null, false);
       createQueue(4, "queues.testaddress", "queue0", null, false);
       
-      log.info("**** created queues");
-
       addConsumer(0, 0, "queue0", null);
       addConsumer(1, 1, "queue0", null);
       addConsumer(2, 2, "queue0", null);
       addConsumer(3, 3, "queue0", null);
       addConsumer(4, 4, "queue0", null);
       
-      log.info("*** created consumers");
-
       waitForBindings(0, "queues.testaddress", 1, 1, true);
       waitForBindings(1, "queues.testaddress", 1, 1, true);
       waitForBindings(2, "queues.testaddress", 1, 1, true);
@@ -206,14 +193,8 @@
       waitForBindings(3, "queues.testaddress", 4, 4, false);
       waitForBindings(4, "queues.testaddress", 4, 4, false);
 
-      log.info("** sending messages");
-      
       send(0, "queues.testaddress", 10, false, null);
       
-      log.info("** sent messages");
-      
-    //  this.checkReceive(0, 1, 2, 3, 4);
-
       verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2, 3, 4);
 
       this.verifyNotReceive(0, 1, 2, 3, 4);
@@ -1181,6 +1162,10 @@
       waitForBindings(2, "queues.testaddress", 4, 4, false);
       waitForBindings(3, "queues.testaddress", 4, 4, false);
       waitForBindings(4, "queues.testaddress", 4, 4, false);
+      
+     // this.checkReceive(0, 1, 2, 3, 4);
+      
+      //Thread.sleep(300000);
 
       verifyReceiveAll(10, 0, 1, 2, 3, 4);
    }
@@ -1436,12 +1421,9 @@
       closeSessionFactory(3);
 
       stopServers(0, 3);
-      log.info("stopped servers");
 
       startServers(3, 0);
-      
-      log.info("restarted servers");
-      
+       
       Thread.sleep(2000);
 
       setupSessionFactory(0, isNetty());

Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java	2009-11-25 00:45:52 UTC (rev 8396)
@@ -17,6 +17,7 @@
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.TransportConfiguration;
 import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
 import org.hornetq.core.server.HornetQ;
 
 
@@ -30,6 +31,8 @@
  */
 public class MultiThreadRandomReattachTest extends MultiThreadRandomReattachTestBase
 {
+   private static final Logger log = Logger.getLogger(MultiThreadRandomReattachTest.class);
+   
    @Override
    protected void start() throws Exception
    {      
@@ -47,6 +50,8 @@
    @Override
    protected void setBody(final ClientMessage message) throws Exception
    {
+      //Give each msg a body
+      message.getBodyBuffer().writeBytes(new byte[500]);
    }
 
    /* (non-Javadoc)
@@ -55,7 +60,7 @@
    @Override
    protected boolean checkSize(final ClientMessage message)
    {
-      return 0 == message.getBodyBuffer().writerIndex();
+      return message.getBodyBuffer().readableBytes() == 500;
    }
 
 }

Modified: branches/20-optimisation/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java	2009-11-25 00:45:52 UTC (rev 8396)
@@ -167,12 +167,12 @@
 
                   final SimpleString address = new SimpleString("Destination " + i);
 
-                  ServerMessageImpl implMsg = new ServerMessageImpl();
+                  ServerMessageImpl implMsg = new ServerMessageImpl(i, 1000);
 
                   implMsg.putStringProperty(new SimpleString("Key"), new SimpleString("This String is worthless!"));
 
                   implMsg.setMessageID(i);
-                  implMsg.setBuffer(HornetQChannelBuffers.wrappedBuffer(bytes));
+                  implMsg.getBodyBuffer().writeBytes(bytes);
 
                   implMsg.setDestination(address);
 

Modified: branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java	2009-11-25 00:45:52 UTC (rev 8396)
@@ -25,10 +25,12 @@
 import org.hornetq.core.paging.impl.PageImpl;
 import org.hornetq.core.paging.impl.PagedMessageImpl;
 import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
+import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.impl.ServerMessageImpl;
 import org.hornetq.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
 import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.DataConstants;
 import org.hornetq.utils.SimpleString;
 
 /**
@@ -207,9 +209,9 @@
 
       for (int i = 0; i < numberOfElements; i++)
       {
-         ServerMessage msg = new ServerMessageImpl(i, 1000);                 
+         ServerMessage msg = new ServerMessageImpl(i, 100);                 
 
-         for (int j = 0; j < msg.getBodyBuffer().capacity(); j++)
+         for (int j = 0; j < 10; j++)
          {           
             msg.getBodyBuffer().writeByte((byte)'b');
          }
@@ -225,8 +227,6 @@
       return buffers;
    }
 
-
-
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -238,5 +238,4 @@
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------
-
 }

Modified: branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java	2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java	2009-11-25 00:45:52 UTC (rev 8396)
@@ -286,6 +286,18 @@
    class FakeMessage implements ServerMessage
    {
 
+      public void decode(HornetQBuffer buffer)
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
+      public void encode(HornetQBuffer buffer)
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
       public ServerMessage copy() throws Exception
       {
          // TODO Auto-generated method stub
@@ -322,12 +334,6 @@
          
       }
 
-      public int getEndMessagePosition()
-      {
-         // TODO Auto-generated method stub
-         return 0;
-      }
-
       public int getMemoryEstimate()
       {
          // TODO Auto-generated method stub
@@ -376,43 +382,43 @@
          return false;
       }
 
-      public void setEndMessagePosition(int pos)
+      public void setMessageID(long id)
       {
          // TODO Auto-generated method stub
          
       }
 
-      public void setMessageID(long id)
+      public void setOriginalHeaders(ServerMessage other, boolean expiry)
       {
          // TODO Auto-generated method stub
          
       }
 
-      public void setNeedsEncoding()
+      public void setPagingStore(PagingStore store)
       {
          // TODO Auto-generated method stub
          
       }
 
-      public void setOriginalHeaders(ServerMessage other, boolean expiry)
+      public boolean storeIsPaging()
       {
          // TODO Auto-generated method stub
-         
+         return false;
       }
 
-      public void setPagingStore(PagingStore store)
+      public void bodyChanged()
       {
          // TODO Auto-generated method stub
          
       }
 
-      public boolean storeIsPaging()
+      public void checkCopy()
       {
          // TODO Auto-generated method stub
-         return false;
+         
       }
 
-      public void afterSend()
+      public void clearCopied()
       {
          // TODO Auto-generated method stub
          
@@ -430,7 +436,7 @@
          return false;
       }
 
-      public void decodeFromWire(HornetQBuffer buffer)
+      public void decodeFromBuffer(HornetQBuffer buffer)
       {
          // TODO Auto-generated method stub
          
@@ -448,6 +454,12 @@
          
       }
 
+      public HornetQBuffer encodeToBuffer()
+      {
+         // TODO Auto-generated method stub
+         return null;
+      }
+
       public HornetQBuffer getBodyBuffer()
       {
          // TODO Auto-generated method stub
@@ -526,6 +538,18 @@
          return 0;
       }
 
+      public int getEndOfBodyPosition()
+      {
+         // TODO Auto-generated method stub
+         return 0;
+      }
+
+      public int getEndOfMessagePosition()
+      {
+         // TODO Auto-generated method stub
+         return 0;
+      }
+
       public long getExpiration()
       {
          // TODO Auto-generated method stub
@@ -670,24 +694,12 @@
          return null;
       }
 
-      public boolean isBufferWritten()
-      {
-         // TODO Auto-generated method stub
-         return false;
-      }
-
       public boolean isDurable()
       {
          // TODO Auto-generated method stub
          return false;
       }
 
-      public boolean isEncodedToBuffer()
-      {
-         // TODO Auto-generated method stub
-         return false;
-      }
-
       public boolean isExpired()
       {
          // TODO Auto-generated method stub
@@ -838,12 +850,6 @@
          return null;
       }
 
-      public void setBuffer(HornetQBuffer buffer)
-      {
-         // TODO Auto-generated method stub
-         
-      }
-
       public void setDestination(SimpleString destination)
       {
          // TODO Auto-generated method stub
@@ -880,91 +886,20 @@
          return null;
       }
 
-      public void decode(HornetQBuffer buffer)
+      public void resetCopied()
       {
          // TODO Auto-generated method stub
          
       }
 
-      public void encode(HornetQBuffer buffer)
+      public void setEndOfBodyPosition()
       {
          // TODO Auto-generated method stub
          
       }
 
-      public void setEncodedToBuffer(boolean encoded)
-      {
-         // TODO Auto-generated method stub
-         
-      }
 
-      public void decodeFromBuffer(HornetQBuffer buffer)
-      {
-         // TODO Auto-generated method stub
-         
-      }
 
-      public HornetQBuffer encodeToBuffer()
-      {
-         // TODO Auto-generated method stub
-         return null;
-      }
-
-      public int getBodySize()
-      {
-         // TODO Auto-generated method stub
-         return 0;
-      }
-
-      public int getEndOfMessagePosition()
-      {
-         // TODO Auto-generated method stub
-         return 0;
-      }
-
-      public void saveToOutputStream(OutputStream out) throws HornetQException
-      {
-         // TODO Auto-generated method stub
-         
-      }
-
-      public void setBodyInputStream(InputStream bodyInputStream)
-      {
-         // TODO Auto-generated method stub
-         
-      }
-
-      public void setOutputStream(OutputStream out) throws HornetQException
-      {
-         // TODO Auto-generated method stub
-         
-      }
-
-      public boolean waitOutputStreamCompletion(long timeMilliseconds) throws HornetQException
-      {
-         // TODO Auto-generated method stub
-         return false;
-      }
-
-      public void beforeDeliver()
-      {
-         // TODO Auto-generated method stub
-         
-      }
-
-      public void beforeSend()
-      {
-         // TODO Auto-generated method stub
-         
-      }
-
-      public void forceCopy()
-      {
-         // TODO Auto-generated method stub
-         
-      }
-
-
    }
 
    class FakeFilter implements Filter



More information about the hornetq-commits mailing list