[jboss-cvs] JBoss Messaging SVN: r4799 - in branches/Branch_Message_Chunking_new: examples/jms/src/org/jboss/jms/example and 9 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Aug 13 08:49:05 EDT 2008


Author: ataylor
Date: 2008-08-13 08:49:04 -0400 (Wed, 13 Aug 2008)
New Revision: 4799

Modified:
   branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/QueueExample.java
   branches/Branch_Message_Chunking_new/messaging.ipr
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/ClientMessage.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/Message.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/Encoder.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ByteBufferWrapper.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ExpandingMessagingBuffer.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaProtocolCodecFilter.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerSendMessage.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReceiveMessage.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
Log:
tidied up

Modified: branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/QueueExample.java
===================================================================
--- branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/QueueExample.java	2008-08-12 23:51:48 UTC (rev 4798)
+++ branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/QueueExample.java	2008-08-13 12:49:04 UTC (rev 4799)
@@ -49,7 +49,7 @@
          Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
          final MessageProducer producer = session.createProducer(queue);
          final BytesMessage message = session.createBytesMessage();
-         byte[] bytes = new byte[500];
+         byte[] bytes = new byte[900];
          for(int i = 0; i < bytes.length; i++)
          {
             bytes[i] = (byte) i;
@@ -58,6 +58,7 @@
          message.writeBytes(bytes);
          log.info("sending message to queue");
          producer.send(message);
+         producer.send(message);
         /* new Thread(new Runnable()
          {
             public void run()
@@ -106,6 +107,21 @@
                throw new RuntimeException(i+"");
             }
          }
+         message2 = (BytesMessage) messageConsumer.receive(50000000);
+         log.info("message received from queue");
+         message2.readBytes(newbytes);
+         //log.info("message = " + new String(newbytes));
+         for (int i = 0; i < newbytes.length; i++)
+         {
+            byte newbyte = newbytes[i];
+            System.out.print(newbyte);
+            System.out.print(" = ");
+            System.out.println(bytes[i]);
+            if(bytes[i] != newbyte)
+            {
+               throw new RuntimeException(i+"");
+            }
+         }
          /*message2 = (BytesMessage) messageConsumer.receive(50000000);
          log.info("message received from queue");
          message2.readBytes(newbytes);

Modified: branches/Branch_Message_Chunking_new/messaging.ipr
===================================================================
--- branches/Branch_Message_Chunking_new/messaging.ipr	2008-08-12 23:51:48 UTC (rev 4798)
+++ branches/Branch_Message_Chunking_new/messaging.ipr	2008-08-13 12:49:04 UTC (rev 4799)
@@ -468,6 +468,7 @@
       <JAVADOC />
       <SOURCES>
         <root url="jar://$PROJECT_DIR$/thirdparty/jboss/jnpserver/lib/jnpserver-sources.jar!/" />
+        <root url="jar://$PROJECT_DIR$/thirdparty/apache-mina/lib/mina-core-2.0.0-M3-20080730.120633-1-sources.jar!/" />
       </SOURCES>
       <jarDirectory url="file://$PROJECT_DIR$/examples/jms/config" recursive="false" />
       <jarDirectory url="file://$PROJECT_DIR$/src/etc" recursive="false" />

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/ClientMessage.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/ClientMessage.java	2008-08-12 23:51:48 UTC (rev 4798)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/ClientMessage.java	2008-08-13 12:49:04 UTC (rev 4799)
@@ -40,4 +40,6 @@
    long getDeliveryID();
    
    void setDeliveryID(long deliveryID);
+
+   void prepareBuffers();
 }

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java	2008-08-12 23:51:48 UTC (rev 4798)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java	2008-08-13 12:49:04 UTC (rev 4799)
@@ -60,7 +60,15 @@
    {
       super(type, durable, expiration, timestamp, priority, body);
    }
-   
+
+   protected void resetBodyForUse()
+   {
+      for (MessagingBuffer messagingBuffer : buffers)
+      {
+         messagingBuffer.position(9);
+      }
+   }
+
    public ClientMessageImpl(final byte type, final boolean durable, MessagingBuffer body)
    {
       super(type, durable, 0, System.currentTimeMillis(), (byte)4, body);
@@ -96,4 +104,14 @@
       return this.deliveryID;
    }
 
+
+   public void prepareBuffers()
+   {
+      bodySize = 0;
+      for (MessagingBuffer buffer : buffers)
+      {
+         buffersSizes.add(buffer.limit() - 9);
+         bodySize += (buffer.limit() - 9);
+      }
+   }
 }

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/Message.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/Message.java	2008-08-12 23:51:48 UTC (rev 4798)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/Message.java	2008-08-13 12:49:04 UTC (rev 4799)
@@ -68,6 +68,10 @@
    void setPriority(byte priority);
    
    int getEncodeSize();
+
+   void encodeForTransport(Encoder buffer);
+
+   void decodeForTransport(Encoder buffer);
    
    // Properties
    // ------------------------------------------------------------------

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2008-08-12 23:51:48 UTC (rev 4798)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2008-08-13 12:49:04 UTC (rev 4799)
@@ -22,11 +22,13 @@
 
 package org.jboss.messaging.core.message.impl;
 
+import org.apache.mina.core.buffer.IoBuffer;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.message.Message;
 import org.jboss.messaging.core.remoting.Encoder;
 import org.jboss.messaging.core.remoting.MessagingBuffer;
 import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
 import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.util.DataConstants;
 import static org.jboss.messaging.util.DataConstants.*;
@@ -68,7 +70,7 @@
 
    private SimpleString destination;
 
-   private byte type;
+   protected byte type;
 
    protected boolean durable;
 
@@ -81,11 +83,12 @@
 
    private byte priority;
 
-   private List<MessagingBuffer> buffers = new ArrayList<MessagingBuffer>();
-   private List<Integer> buffersSizes = new ArrayList<Integer>();
+   protected List<MessagingBuffer> buffers = new ArrayList<MessagingBuffer>();
+   protected List<Integer> buffersSizes = new ArrayList<Integer>();
    private int bufferPos = 0;
    private int pos = 0;
-   private int bodySize = 0;
+   protected int bodySize = 0;
+   private boolean isBodySplit = true;
 
    // Constructors --------------------------------------------------
 
@@ -138,51 +141,128 @@
    }
 
    // Message implementation ----------------------------------------
-    public void encode(Encoder buff)
+
+   public void encodeForTransport(Encoder buff)
    {
-      buff.putSimpleString(destination);
-      buff.putByte(type);
+      buff.putSimpleString(getDestination());
+      buff.putByte(getType());
       buff.putBoolean(durable);
-      buff.putLong(expiration);
-      buff.putLong(timestamp);
-      buff.putByte(priority);
-      properties.encode(buff);
-      buff.putInt(getBodySize() - (buffers.size() * 9));
-      for (MessagingBuffer buffer : buffers)
+      buff.putLong(getExpiration());
+      buff.putLong(getTimestamp());
+      buff.putByte(getPriority());
+      getProperties().encode(buff);
+      if(buffers.size() == 1 && buffers.get(0).limit() <= buff.remaining())
       {
-         buff.putBuffer(buffer);
+         int offSet = isBodySplit?9:0;
+         buff.putInt(getBodySize());
+         //split flag: false here, the body is not split and follows inline as a single chunk
+         buff.putBoolean(false);
+         buff.putBytes(buffers.get(0).array(), offSet, buffers.get(0).limit());
       }
+      else
+      {
+         buff.putInt(getBodySize());
+         buff.putBoolean(true);
+         for (int i = 0; i < buffers.size(); i++)
+         {
+            MessagingBuffer buffer = buffers.get(i);
+            IoBuffer copied = IoBuffer.allocate(buffer.limit());
+            copied.put(buffer.array(), 0, buffer.limit());
+            copied.setAutoExpand(true);
+            buff.putBuffer(new IoBufferWrapper(copied));
+         }
+      }
    }
 
-   public int getEncodeSize()
+    public void decodeForTransport(Encoder buffer)
    {
-      return /* Destination */ SimpleString.sizeofString(destination) +
-      /* Type */ SIZE_BYTE +
-      /* Durable */ SIZE_BOOLEAN +
-      /* Expiration */ SIZE_LONG +
-      /* Timestamp */ SIZE_LONG +
-      /* Priority */ SIZE_BYTE +
-      /* PropertySize and Properties */ properties.getEncodeSize() +
-      /* BodySize and Body */ SIZE_INT + getBodySize();
+      setDestination(buffer.getSimpleString());
+      type = buffer.getByte();
+      durable = buffer.getBoolean();
+      setExpiration(buffer.getLong());
+      setTimestamp(buffer.getLong());
+      setPriority(buffer.getByte());
+
+      getProperties().decode(buffer);
+      int len = buffer.getInt();
+      isBodySplit = buffer.getBoolean();
+      if(isBodySplit)
+      {
+         buffer.transferRemainingBuffer(this, len);
+         //we now need to reposition the buffers for use
+         resetBodyForUse();
+      }
+      else
+      {
+         buffers.add(new ByteBufferWrapper(ByteBuffer.allocate(PacketImpl.INITIAL_BUFFER_SIZE)));
+         byte[] bytes = new byte[len];
+         buffer.getBytes(bytes);
+         getCurrentBuffer().putBytes(bytes);
+         getCurrentBuffer().flip();
+         bodySize = len;
+         pos = 0;
+      }
    }
 
+    abstract protected void resetBodyForUse();
+
+    public void encode(Encoder buff)
+   {
+      buff.putSimpleString(getDestination());
+      buff.putByte(getType());
+      buff.putBoolean(durable);
+      buff.putLong(getExpiration());
+      buff.putLong(getTimestamp());
+      buff.putByte(getPriority());
+      getProperties().encode(buff);
+      if(isBodySplit)
+      {
+         buff.putInt(getBodySize());
+         for (int i = 0; i < buffers.size(); i++)
+         {
+            MessagingBuffer buffer = buffers.get(i);
+            buff.putBytes(buffer.array(), 9, buffersSizes.get(i));
+         }
+      }
+      else
+      {
+         buff.putInt(getBodySize());
+         buff.putBytes(buffers.get(0).array(), 0, buffers.get(0).limit());
+      }
+   }
+
    public void decode(final Encoder buffer)
    {
-      destination = buffer.getSimpleString();
+      setDestination(buffer.getSimpleString());
       type = buffer.getByte();
       durable = buffer.getBoolean();
-      expiration = buffer.getLong();
-      timestamp = buffer.getLong();
-      priority = buffer.getByte();
+      setExpiration(buffer.getLong());
+      setTimestamp(buffer.getLong());
+      setPriority(buffer.getByte());
 
-      properties.decode(buffer);
+      getProperties().decode(buffer);
       int len = buffer.getInt();
       buffers.add(new ByteBufferWrapper(ByteBuffer.allocate(PacketImpl.INITIAL_BUFFER_SIZE)));
       buffer.transferRemainingBuffer(this, len);
       flip();
    }
 
+
+   public int getEncodeSize()
+   {
+      return /* Destination */ SimpleString.sizeofString(destination) +
+      /* Type */ SIZE_BYTE +
+      /* Durable */ SIZE_BOOLEAN +
+      /* Expiration */ SIZE_LONG +
+      /* Timestamp */ SIZE_LONG +
+      /* Priority */ SIZE_BYTE +
+      /* PropertySize and Properties */ properties.getEncodeSize() +
+      /* BodySize and Body */ SIZE_INT + getBodySize();
+   }
+
    
+
+   
    public SimpleString getDestination()
    {
       return destination;
@@ -396,7 +476,7 @@
 
    public void putBytes(byte[] bytes, int i, int i1)
    {
-      if(getCurrentBuffer().remaining() <= (i1 - i))
+      if(getCurrentBuffer().remaining() > (i1 - i))
       {
          getCurrentBuffer().putBytes(bytes, i, i1);
          pos += (i1 - i);
@@ -654,12 +734,13 @@
 
    public int getBodySize()
    {
-      int size = 0;
+      /*int size = 0;
       for (MessagingBuffer buffer : buffers)
       {
          size += buffer.limit();
       }
-      return size;
+      return size;*/
+      return bodySize;
    }
 
    public void reset()
@@ -699,33 +780,17 @@
 
    public void putBuffer(MessagingBuffer body)
    {
-      if(getCurrentBuffer().remaining() > body.limit())
-      {
-         //copy contents
-         putBytes(body.array(), 0, body.limit());
-      }
-      else
-      {
-         buffers.add(body);
-      }
+      buffers.add(body);
+      buffersSizes.add(body.limit() - 5);
+      bodySize += body.limit() - 5;
    }
 
    public void transferRemainingBuffer(Encoder buffer, int len)
    {
-      if(getCurrentBuffer().remaining() > 0)
+      for(int i = bufferPos + 1; i < buffers.size(); i++)
       {
-         //copy contents
-         byte[] bytes = new byte[len];
-         getBytes(bytes);
-         buffer.putBytes(bytes);
+         buffer.putBuffer(buffers.get(i));
       }
-      else
-      {
-         for(int i = bufferPos + 1; i < buffers.size(); i++)
-         {
-            buffer.putBuffer(buffers.get(i));
-         }
-      }
    }
    // Package protected ---------------------------------------------
 

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/Encoder.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/Encoder.java	2008-08-12 23:51:48 UTC (rev 4798)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/Encoder.java	2008-08-13 12:49:04 UTC (rev 4799)
@@ -69,7 +69,11 @@
 
    boolean getBoolean();
 
-   void putBuffer(MessagingBuffer buffer);
+   void putBuffer(MessagingBuffer buffern);
 
    void transferRemainingBuffer(Encoder buffer, int len);
+
+   void putBytes(byte[] bytes, int i, int i1);
+
+   int remaining();
 }

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ByteBufferWrapper.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ByteBufferWrapper.java	2008-08-12 23:51:48 UTC (rev 4798)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ByteBufferWrapper.java	2008-08-13 12:49:04 UTC (rev 4799)
@@ -378,7 +378,7 @@
 
    public void putBuffer(MessagingBuffer buffer)
    {
-      buffer.putBytes(buffer.array(), buffer.position(), buffer.limit());
+      buffer.putBytes(buffer.array());
    }
 
    public void transferRemainingBuffer(Encoder buffer, int len)

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ExpandingMessagingBuffer.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ExpandingMessagingBuffer.java	2008-08-12 23:51:48 UTC (rev 4798)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ExpandingMessagingBuffer.java	2008-08-13 12:49:04 UTC (rev 4799)
@@ -361,7 +361,7 @@
 
    public void putBuffer(MessagingBuffer buffer)
    {
-      putBytes(buffer.array(), buffer.position(), buffer.limit());
+      putBytes(buffer.array());
    }
 
    public void transferRemainingBuffer(Encoder buffer, int len)

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java	2008-08-12 23:51:48 UTC (rev 4798)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java	2008-08-13 12:49:04 UTC (rev 4799)
@@ -370,7 +370,7 @@
 
    public void putBuffer(MessagingBuffer buffer)
    {
-      buffer.putBytes(buffer.array(), buffer.position(), buffer.limit());
+      buffer.putBytes(buffer.array());
    }
 
    public void transferRemainingBuffer(Encoder buffer, int len)

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaProtocolCodecFilter.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaProtocolCodecFilter.java	2008-08-12 23:51:48 UTC (rev 4798)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaProtocolCodecFilter.java	2008-08-13 12:49:04 UTC (rev 4799)
@@ -94,6 +94,7 @@
       IoBuffer copied = IoBuffer.allocate(in.remaining());
       copied.put(in);
       copied.setAutoExpand(true);
+      copied.limit(length + SIZE_INT);
       copied.flip();
 
       in.position(start + length + SIZE_INT);

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2008-08-12 23:51:48 UTC (rev 4798)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2008-08-13 12:49:04 UTC (rev 4799)
@@ -140,6 +140,7 @@
    private List<MessagingBuffer> buffers = new ArrayList<MessagingBuffer>();
    private List<Integer> buffersSizes = new ArrayList<Integer>();
    private int currentPos = 0;
+   private int packetid;
 
    // Static --------------------------------------------------------
 
@@ -204,8 +205,10 @@
 
    public void encode(MessagingBuffer buffer, int packetId)
    {      
+      this.packetid = packetId;
       //The standard header fields
       buffers.add(buffer);
+      //buffersSizes.add(-1);
       //length filled at end
       getCurrentBuffer().putInt(-1);
       //are we the last packet, filled at end.
@@ -218,6 +221,16 @@
       putLong(executorID);
 
       encodeBody();
+
+      for (int i = 0; i < buffers.size(); i++)
+      {
+         MessagingBuffer messagingBuffer = buffers.get(i);
+         if(messagingBuffer.position() != 0)
+         {
+            messagingBuffer.flip();
+         }
+         buffersSizes.add(i, messagingBuffer.limit() - SIZE_INT);
+      }
    }
 
    public void decode()
@@ -582,33 +595,17 @@
 
    public void putBuffer(MessagingBuffer body)
    {
-      if(getCurrentBuffer().remaining() > body.limit())
-      {
-         //copy contents
-         putBytes(body.array(), 9, body.limit());
-      }
-      else
-      {
-         buffers.add(body);
-      }
+      body.putInt(SIZE_INT, packetid);
+      buffers.add(body);
+      //buffersSizes.add(len);
    }
 
    public void transferRemainingBuffer(Encoder buffer, int len)
    {
-      if(getCurrentBuffer().remaining() > 0)
+      for(int i = currentPos + 1; i < buffers.size(); i++)
       {
-         //copy contents
-         byte[] bytes = new byte[len];
-         getBytes(bytes);
-         buffer.putBytes(bytes);
+         buffer.putBuffer(buffers.get(i));
       }
-      else
-      {
-         for(int i = currentPos + 1; i < buffers.size(); i++)
-         {
-            buffer.putBuffer(buffers.get(i));
-         }
-      }
    }
 
    public List<MessagingBuffer> getBuffers()
@@ -616,15 +613,17 @@
       for (int i = 0; i < buffers.size(); i++)
       {
          MessagingBuffer buffer = buffers.get(i);
-         buffer.putInt(0, buffer.position() - SIZE_INT);
+         if(buffer.position() != 0)
+         {
+            buffer.flip();
+         }
+         buffer.putInt(0, buffersSizes.get(i));
+         buffer.putInt(SIZE_INT, packetid);
          if(i == buffers.size() - 1)
          {
             buffer.putBoolean(SIZE_INT * 2, true);
          }
-         if(buffer.position() != 0)
-         {
-            buffer.flip();
-         }
+
       }
       return buffers;
    }
@@ -663,6 +662,11 @@
       }
    }
 
+   public int remaining()
+   {
+      return getCurrentBuffer().remaining();
+   }
+
    // Package protected ---------------------------------------------
 
      protected String getParentString()

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerSendMessage.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerSendMessage.java	2008-08-12 23:51:48 UTC (rev 4798)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerSendMessage.java	2008-08-13 12:49:04 UTC (rev 4799)
@@ -82,7 +82,8 @@
    
    public void encodeBody()
    {
-      clientMessage.encode(this);
+      clientMessage.prepareBuffers();
+      clientMessage.encodeForTransport(this);
    }
    
    public void decodeBody()
@@ -91,7 +92,7 @@
       
       serverMessage = new ServerMessageImpl();
       
-      serverMessage.decode(this);
+      serverMessage.decodeForTransport(this);
       
    }
 

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReceiveMessage.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReceiveMessage.java	2008-08-12 23:51:48 UTC (rev 4798)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReceiveMessage.java	2008-08-13 12:49:04 UTC (rev 4799)
@@ -106,7 +106,7 @@
    {
       putInt(deliveryCount);
       putLong(deliveryID);
-      serverMessage.encode(this);
+      serverMessage.encodeForTransport(this);
    }
    
    public void decodeBody()
@@ -118,7 +118,7 @@
       
       clientMessage = new ClientMessageImpl(deliveryCount, deliveryID);
       
-      clientMessage.decode(this);
+      clientMessage.decodeForTransport(this);
    }
 
    // Package protected ---------------------------------------------

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java	2008-08-12 23:51:48 UTC (rev 4798)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java	2008-08-13 12:49:04 UTC (rev 4799)
@@ -83,7 +83,15 @@
    {
       super(type, durable, expiration, timestamp, priority, buffer);
    }
-   
+
+   protected void resetBodyForUse()
+   {
+      for (MessagingBuffer messagingBuffer : buffers)
+      {
+         messagingBuffer.position(0);
+      }
+   }
+
    public long getMessageID()
    {
       return messageID;
@@ -134,6 +142,7 @@
    public ServerMessage copy()
    {
       return new ServerMessageImpl(this);
-   }  
+   }
+
 }
 




More information about the jboss-cvs-commits mailing list