[jboss-cvs] JBoss Messaging SVN: r4800 - in branches/Branch_Message_Chunking_new: src/main/org/jboss/messaging/core/message/impl and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Aug 13 10:43:30 EDT 2008


Author: ataylor
Date: 2008-08-13 10:43:30 -0400 (Wed, 13 Aug 2008)
New Revision: 4800

Added:
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/EncoderImpl.java
Modified:
   branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/QueueExample.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
Log:
tidied up

Modified: branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/QueueExample.java
===================================================================
--- branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/QueueExample.java	2008-08-13 12:49:04 UTC (rev 4799)
+++ branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/QueueExample.java	2008-08-13 14:43:30 UTC (rev 4800)
@@ -49,7 +49,7 @@
          Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
          final MessageProducer producer = session.createProducer(queue);
          final BytesMessage message = session.createBytesMessage();
-         byte[] bytes = new byte[900];
+         byte[] bytes = new byte[40000];
          for(int i = 0; i < bytes.length; i++)
          {
             bytes[i] = (byte) i;
@@ -58,38 +58,8 @@
          message.writeBytes(bytes);
          log.info("sending message to queue");
          producer.send(message);
-         producer.send(message);
-        /* new Thread(new Runnable()
-         {
-            public void run()
-            {
+         //producer.send(message);
 
-               try
-               {
-                  producer.send(message);
-               }
-               catch (JMSException e)
-               {
-                  e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-               }
-            }
-         }).start();*/
-         /* new Thread(new Runnable()
-         {
-            public void run()
-            {
-
-               try
-               {
-                  producer.send(message);
-               }
-               catch (JMSException e)
-               {
-                  e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-               }
-            }
-         }).start();*/
-
          MessageConsumer messageConsumer = session.createConsumer(queue);
          connection.start();
          BytesMessage message2 = (BytesMessage) messageConsumer.receive(50000000);
@@ -107,7 +77,7 @@
                throw new RuntimeException(i+"");
             }
          }
-         message2 = (BytesMessage) messageConsumer.receive(50000000);
+         /*message2 = (BytesMessage) messageConsumer.receive(50000000);
          log.info("message received from queue");
          message2.readBytes(newbytes);
          //log.info("message = " + new String(newbytes));
@@ -121,7 +91,7 @@
             {
                throw new RuntimeException(i+"");
             }
-         }
+         }*/
          /*message2 = (BytesMessage) messageConsumer.receive(50000000);
          log.info("message received from queue");
          message2.readBytes(newbytes);

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2008-08-13 12:49:04 UTC (rev 4799)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2008-08-13 14:43:30 UTC (rev 4800)
@@ -28,17 +28,14 @@
 import org.jboss.messaging.core.remoting.Encoder;
 import org.jboss.messaging.core.remoting.MessagingBuffer;
 import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.impl.EncoderImpl;
 import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
 import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
-import org.jboss.messaging.util.DataConstants;
 import static org.jboss.messaging.util.DataConstants.*;
 import org.jboss.messaging.util.SimpleString;
 import org.jboss.messaging.util.TypedProperties;
 
 import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Set;
 
 /**
@@ -56,14 +53,12 @@
  *
  * $Id: MessageSupport.java 2740 2007-05-30 11:36:28Z timfox $
  */
-public abstract class MessageImpl implements Message
+public abstract class MessageImpl extends EncoderImpl implements Message
 {
    // Constants -----------------------------------------------------
 
    private static final Logger log = Logger.getLogger(MessageImpl.class);
 
-   private static final Charset utf8 = Charset.forName("UTF-8");
-
    public static final SimpleString HDR_ACTUAL_EXPIRY_TIME = new SimpleString("JBMActualExpiryTime");
 
    // Attributes ----------------------------------------------------
@@ -83,11 +78,6 @@
 
    private byte priority;
 
-   protected List<MessagingBuffer> buffers = new ArrayList<MessagingBuffer>();
-   protected List<Integer> buffersSizes = new ArrayList<Integer>();
-   private int bufferPos = 0;
-   private int pos = 0;
-   protected int bodySize = 0;
    private boolean isBodySplit = true;
 
    // Constructors --------------------------------------------------
@@ -166,6 +156,7 @@
          for (int i = 0; i < buffers.size(); i++)
          {
             MessagingBuffer buffer = buffers.get(i);
+            //todo fix horrible hack, there is a mina bug so slicing the buffer doesnt seem to work, so i need to make a copy in case of multiple sends
             IoBuffer copied = IoBuffer.allocate(buffer.limit());
             copied.put(buffer.array(), 0, buffer.limit());
             copied.setAutoExpand(true);
@@ -260,7 +251,6 @@
       /* BodySize and Body */ SIZE_INT + getBodySize();
    }
 
-   
 
    
    public SimpleString getDestination()
@@ -404,441 +394,9 @@
    // Body
    // -------------------------------------------------------------------------------------
 
-   public void putFloat(float val)
-   {
-      checkWriteSpace(SIZE_FLOAT);
-      getCurrentBuffer().putFloat(val);
-      pos += SIZE_FLOAT;
-   }
-
-   public void putInt(int i)
-   {
-      checkWriteSpace(SIZE_INT);
-      getCurrentBuffer().putInt(i);
-      pos += SIZE_INT;
-   }
-
-   public void putByte(byte b)
-   {
-      checkWriteSpace(SIZE_BYTE);
-      getCurrentBuffer().putByte(b);
-      pos += SIZE_BYTE;
-   }
-
-   public void putLong(long l)
-   {
-      checkWriteSpace(SIZE_LONG);
-      getCurrentBuffer().putLong(l);
-      pos += SIZE_LONG;
-   }
-
-   public void putBoolean(boolean b)
-   {
-      checkWriteSpace(SIZE_BOOLEAN);
-      getCurrentBuffer().putBoolean(b);
-      pos += SIZE_BOOLEAN;
-   }
-
-   public void putNullableString(String nullableString)
-   {
-      if (nullableString == null)
-		{
-			putByte(DataConstants.NULL);
-		}
-		else
-		{
-			putByte(DataConstants.NOT_NULL);
-
-			putString(nullableString);
-		}
-   }
-
-   public void putSimpleString(SimpleString string)
-   {
-      byte[] data = string.getData();
-
-   	putInt(data.length);
-   	putBytes(data);
-   }
-
-   public void putNullableSimpleString(SimpleString string)
-   {
-      if (string == null)
-   	{
-   		putByte(DataConstants.NULL);
-   	}
-   	else
-   	{
-   	   putByte(DataConstants.NOT_NULL);
-   		putSimpleString(string);
-   	}
-   }
-
-   public void putBytes(byte[] bytes, int i, int i1)
-   {
-      if(getCurrentBuffer().remaining() > (i1 - i))
-      {
-         getCurrentBuffer().putBytes(bytes, i, i1);
-         pos += (i1 - i);
-      }
-      else
-      {
-         int written = 0;
-         while(written < i1)
-         {
-            int left = getCurrentBuffer().remaining();
-            int towrite = left + written > i1?i1 - written:left;
-            getCurrentBuffer().putBytes(bytes, i, towrite);
-            written+=towrite;
-            i += left;
-            pos += written;
-            if(written < i1)
-            {
-               setNextFragment();
-            }
-
-         }
-      }
-   }
-
-   public void putBytes(byte[] bytes)
-   {
-      putBytes(bytes, 0, bytes.length);
-   }
-
-   public void putShort(short val)
-   {
-      checkWriteSpace(SIZE_SHORT);
-      getCurrentBuffer().putShort(val);
-      pos += SIZE_SHORT;
-   }
-
-   public void putDouble(double val)
-   {
-      checkWriteSpace(SIZE_DOUBLE);
-      getCurrentBuffer().putDouble(val);
-      pos += SIZE_DOUBLE;
-   }
-
-   public void putChar(char val)
-   {
-      checkWriteSpace(SIZE_CHAR);
-      getCurrentBuffer().putChar(val);
-      pos += SIZE_CHAR;
-   }
-
-   public void putUTF(String str)
-   {
-      //TODO This is quite inefficient - can be improved using a method similar to what MINA IOBuffer does
-		//(putPrefixedString)
-		ByteBuffer bb = utf8.encode(str);
-   	putInt(bb.limit() - bb.position());
-   	putBytes(bb.array());
-   }
-
-   public void putString(String string)
-   {
-      putInt(string.length());
-
-		for (int i = 0; i < string.length(); i++)
-		{
-			putChar(string.charAt(i));
-		}
-   }
-
-   public String getString()
-   {
-      int len = getInt();
-
-   	char[] chars = new char[len];
-
-      for (int i = 0; i < len; i++)
-      {
-      	chars[i] = getChar();
-      }
-
-      return new String(chars);
-   }
-
-   public void getBytes(byte[] data, int i, int offset)
-   {
-      int size = offset - i;
-      int read = 0;
-      while (read < size)
-      {
-         int left = getCurrentBuffer().remaining() <= (size - read)? getCurrentBuffer().remaining():size - read;
-
-         getCurrentBuffer().getBytes(data, read, left);
-         read += left;
-         pos += read;
-         if(read < size)
-         {
-            bufferPos++;
-         }
-
-      }
-   }
-
-   public String getUTF() throws Exception
-   {
-      throw new UnsupportedOperationException();
-   }
-
-   public int getInt()
-   {
-      checkReadSpace(SIZE_INT);
-      pos += SIZE_INT;
-      return getCurrentBuffer().getInt();
-   }
-
-   public long getLong()
-   {
-      checkReadSpace(SIZE_LONG);
-      pos += SIZE_LONG;
-      return getCurrentBuffer().getLong();
-   }
-
-   public short getShort()
-   {
-      checkReadSpace(SIZE_SHORT);
-      pos += SIZE_SHORT;
-      return getCurrentBuffer().getShort();
-   }
-
-   public boolean getBoolean()
-   {
-      checkReadSpace(SIZE_BOOLEAN);
-      pos += SIZE_BOOLEAN;
-      return getCurrentBuffer().getBoolean();
-   }
-
-   public String getNullableString()
-   {
-      byte check = getByte();
-
-		if (check == DataConstants.NULL)
-		{
-			return null;
-		}
-		else
-		{
-			return getString();
-		}
-   }
-
-   public SimpleString getSimpleString()
-   {
-      int len = getInt();
-
-   	byte[] data = new byte[len];
-   	getBytes(data);
-
-   	return new SimpleString(data);
-   }
-
-    public SimpleString getNullableSimpleString()
-   {
-      int b = getByte();
-   	if (b == DataConstants.NULL)
-   	{
-   	   return null;
-   	}
-   	else
-   	{
-         return getSimpleString();
-   	}
-   }
-
-   public byte getByte()
-   {
-      checkReadSpace(SIZE_BYTE);
-      return getCurrentBuffer().getByte();
-   }
-
-   public void getBytes(byte[] data)
-   {
-      int left = getCurrentBuffer().remaining();
-      if(data.length <= left)
-      {
-         getCurrentBuffer().getBytes(data);
-         pos += data.length;
-      }
-      else
-      {
-         int size = data.length;
-         int read = 0;
-         while (read < size)
-         {
-            left = getCurrentBuffer().remaining() <= (size - read)? getCurrentBuffer().remaining():size - read;
-
-            getCurrentBuffer().getBytes(data, read, left);
-            read += left;
-            pos += read;
-            if(read < size)
-            {
-               bufferPos++;
-            }
-
-         }
-
-      }
-   }
-
-
-   public float getFloat()
-   {
-      checkReadSpace(SIZE_FLOAT);
-      pos += SIZE_FLOAT;
-      return getCurrentBuffer().getFloat();
-   }
-
-   public double getDouble()
-   {
-      checkReadSpace(SIZE_DOUBLE);
-      pos += SIZE_DOUBLE;
-      return getCurrentBuffer().getDouble();
-   }
-
-   public char getChar()
-   {
-      checkReadSpace(SIZE_CHAR);
-      pos += SIZE_CHAR;
-      return getCurrentBuffer().getChar();
-   }
-
-   public short getUnsignedByte()
-   {
-      checkReadSpace(SIZE_SHORT);
-      pos += SIZE_SHORT;
-      return getCurrentBuffer().getUnsignedByte();
-   }
-
-   public int getUnsignedShort()
-   {
-     checkReadSpace(SIZE_INT);
-      pos += SIZE_INT;
-      return getCurrentBuffer().getUnsignedByte();
-   }
-
-   public void clearBody()
-   {
-      bufferPos = 0;
-      pos = 0;
-      MessagingBuffer buff = buffers.get(0).createNewBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
-      bodySize = PacketImpl.INITIAL_BUFFER_SIZE;
-      buffersSizes.clear();
-      buffers.clear();
-      buffers.add(buff);
-   }
-
-
-   public int getBodySize()
-   {
-      /*int size = 0;
-      for (MessagingBuffer buffer : buffers)
-      {
-         size += buffer.limit();
-      }
-      return size;*/
-      return bodySize;
-   }
-
-   public void reset()
-   {
-      bufferPos = 0;
-      pos = 0;
-      for (MessagingBuffer buffer : buffers)
-      {
-         buffer.rewind();
-      }
-   }
-
-   public void flip()
-   {
-      for (MessagingBuffer buffer : buffers)
-      {
-         buffer.flip();
-         bodySize += buffer.limit();
-      }
-      pos = 0;
-   }
-
-   public void rewind()
-   {
-      bufferPos = 0;
-      pos = 0;
-      for (MessagingBuffer buffer : buffers)
-      {
-         buffer.rewind();
-      }
-   }
-
-   public int remaining()
-   {
-      return bodySize - pos;
-   }
-
-   public void putBuffer(MessagingBuffer body)
-   {
-      buffers.add(body);
-      buffersSizes.add(body.limit() - 5);
-      bodySize += body.limit() - 5;
-   }
-
-   public void transferRemainingBuffer(Encoder buffer, int len)
-   {
-      for(int i = bufferPos + 1; i < buffers.size(); i++)
-      {
-         buffer.putBuffer(buffers.get(i));
-      }
-   }
    // Package protected ---------------------------------------------
 
     // Private -------------------------------------------------------
 
-   private void checkReadSpace(int size)
-   {
-      if(getCurrentBuffer().position() + size > getCurrentBuffersSize() + SIZE_INT )
-      {
-         bufferPos++;
-      }
-   }
-
-   private void checkWriteSpace(int size)
-   {
-      if(getCurrentBuffer().remaining() < size)
-      {
-         setNextFragment();
-      }
-   }
-
-   private void setNextFragment()
-   {
-      MessagingBuffer buffer = buffers.get(0).createNewBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
-      buffer.putInt(-1);
-      buffer.putInt(-1);
-      buffer.putBoolean(false);
-      buffers.add(buffer);
-      bufferPos++;
-   }
-
-
-   private MessagingBuffer getCurrentBuffer()
-   {
-      try
-      {
-         return buffers.get(bufferPos);
-      }
-      catch (Exception e)
-      {
-         return null;
-      }
-   }
-
-   private int getCurrentBuffersSize()
-   {
-      return buffersSizes.get(bufferPos);
-   }
-
-   // Inner classes -------------------------------------------------  
+   // Inner classes -------------------------------------------------
 }

Added: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/EncoderImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/EncoderImpl.java	                        (rev 0)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/EncoderImpl.java	2008-08-13 14:43:30 UTC (rev 4800)
@@ -0,0 +1,537 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl;
+
+import org.jboss.messaging.core.remoting.Encoder;
+import org.jboss.messaging.core.remoting.MessagingBuffer;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import org.jboss.messaging.core.transaction.impl.XidImpl;
+import org.jboss.messaging.util.DataConstants;
+import org.jboss.messaging.util.SimpleString;
+
+import javax.transaction.xa.Xid;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class EncoderImpl implements Encoder
+{
+   private static final Charset utf8 = Charset.forName("UTF-8");
+   protected List<MessagingBuffer> buffers = new ArrayList<MessagingBuffer>();
+   protected List<Integer> buffersSizes = new ArrayList<Integer>();
+   private int bufferPos = 0;
+   protected int pos = 0;
+   protected int bodySize = 0;
+
+   public void putXidType(Xid xid)
+   {
+      putInt(xid.getFormatId());
+      putInt(xid.getBranchQualifier().length);
+      putBytes(xid.getBranchQualifier());
+      putInt(xid.getGlobalTransactionId().length);
+      putBytes(xid.getGlobalTransactionId());
+   }
+
+   public void putFloat(float val)
+   {
+      checkWriteSpace(org.jboss.messaging.util.DataConstants.SIZE_FLOAT);
+      getCurrentBuffer().putFloat(val);
+      pos += org.jboss.messaging.util.DataConstants.SIZE_FLOAT;
+   }
+
+   public void putInt(int i)
+   {
+      checkWriteSpace(org.jboss.messaging.util.DataConstants.SIZE_INT);
+      getCurrentBuffer().putInt(i);
+      pos += org.jboss.messaging.util.DataConstants.SIZE_INT;
+   }
+
+   public void putByte(byte b)
+   {
+      checkWriteSpace(org.jboss.messaging.util.DataConstants.SIZE_BYTE);
+      getCurrentBuffer().putByte(b);
+      pos += org.jboss.messaging.util.DataConstants.SIZE_BYTE;
+   }
+
+   public void putLong(long l)
+   {
+      checkWriteSpace(org.jboss.messaging.util.DataConstants.SIZE_LONG);
+      getCurrentBuffer().putLong(l);
+      pos += org.jboss.messaging.util.DataConstants.SIZE_LONG;
+   }
+
+   public void putBoolean(boolean b)
+   {
+      checkWriteSpace(org.jboss.messaging.util.DataConstants.SIZE_BOOLEAN);
+      getCurrentBuffer().putBoolean(b);
+      pos += org.jboss.messaging.util.DataConstants.SIZE_BOOLEAN;
+   }
+
+   public void putNullableString(String nullableString)
+   {
+      if (nullableString == null)
+		{
+			putByte(DataConstants.NULL);
+		}
+		else
+		{
+			putByte(DataConstants.NOT_NULL);
+
+			putString(nullableString);
+		}
+   }
+
+   public void putSimpleString(SimpleString string)
+   {
+      byte[] data = string.getData();
+
+   	putInt(data.length);
+   	putBytes(data);
+   }
+
+   public void putNullableSimpleString(SimpleString string)
+   {
+      if (string == null)
+   	{
+   		putByte(DataConstants.NULL);
+   	}
+   	else
+   	{
+   	   putByte(DataConstants.NOT_NULL);
+   		putSimpleString(string);
+   	}
+   }
+
+   public void putBytes(byte[] bytes, int i, int i1)
+   {
+      if(getCurrentBuffer().remaining() > (i1 - i))
+      {
+         getCurrentBuffer().putBytes(bytes, i, i1);
+         pos += (i1 - i);
+      }
+      else
+      {
+         int written = 0;
+         while(written < i1)
+         {
+            int left = getCurrentBuffer().remaining();
+            int towrite = left + written > i1?i1 - written:left;
+            getCurrentBuffer().putBytes(bytes, i, towrite);
+            written+=towrite;
+            i += left;
+            pos += written;
+            if(written < i1)
+            {
+               setNextFragment();
+            }
+
+         }
+      }
+   }
+
+   public void putBytes(byte[] bytes)
+   {
+      putBytes(bytes, 0, bytes.length);
+   }
+
+   public void putShort(short val)
+   {
+      checkWriteSpace(org.jboss.messaging.util.DataConstants.SIZE_SHORT);
+      getCurrentBuffer().putShort(val);
+      pos += org.jboss.messaging.util.DataConstants.SIZE_SHORT;
+   }
+
+   public void putDouble(double val)
+   {
+      checkWriteSpace(org.jboss.messaging.util.DataConstants.SIZE_DOUBLE);
+      getCurrentBuffer().putDouble(val);
+      pos += org.jboss.messaging.util.DataConstants.SIZE_DOUBLE;
+   }
+
+   public void putChar(char val)
+   {
+      checkWriteSpace(org.jboss.messaging.util.DataConstants.SIZE_CHAR);
+      getCurrentBuffer().putChar(val);
+      pos += org.jboss.messaging.util.DataConstants.SIZE_CHAR;
+   }
+
+   public void putUTF(String str)
+   {
+      //TODO This is quite inefficient - can be improved using a method similar to what MINA IOBuffer does
+		//(putPrefixedString)
+		ByteBuffer bb = utf8.encode(str);
+   	putInt(bb.limit() - bb.position());
+   	putBytes(bb.array());
+   }
+
+   public void putString(String string)
+   {
+      putInt(string.length());
+
+		for (int i = 0; i < string.length(); i++)
+		{
+			putChar(string.charAt(i));
+		}
+   }
+      public Xid getXidType()
+   {
+      int formatID = getInt();
+      byte[] bq = new byte[getInt()];
+      getBytes(bq);
+      byte[] gtxid = new byte[getInt()];
+      getBytes(gtxid);
+      return new XidImpl(bq, formatID, gtxid);
+   }
+
+
+   public List<Xid> getXids(int len)
+   {
+      List<Xid> xids = new ArrayList<Xid>();
+
+      for (int i = 0; i < len; i++)
+      {
+         int formatID = getInt();
+         byte[] bq = new byte[getInt()];
+         getBytes(bq);
+         byte[] gtxid = new byte[getInt()];
+         getBytes(gtxid);
+         Xid xid = new XidImpl(bq, formatID, gtxid);
+
+         xids.add(xid);
+      }
+      return xids;
+   }
+   
+   public String getString()
+   {
+      int len = getInt();
+
+   	char[] chars = new char[len];
+
+      for (int i = 0; i < len; i++)
+      {
+      	chars[i] = getChar();
+      }
+
+      return new String(chars);
+   }
+
+   public void getBytes(byte[] data, int i, int offset)
+   {
+      int size = offset - i;
+      int read = 0;
+      while (read < size)
+      {
+         int left = getCurrentBuffer().remaining() <= (size - read)? getCurrentBuffer().remaining():size - read;
+
+         getCurrentBuffer().getBytes(data, read, left);
+         read += left;
+         pos += read;
+         if(read < size)
+         {
+            bufferPos++;
+         }
+
+      }
+   }
+
+   public String getUTF() throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public int getInt()
+   {
+      checkReadSpace(org.jboss.messaging.util.DataConstants.SIZE_INT);
+      pos += org.jboss.messaging.util.DataConstants.SIZE_INT;
+      return getCurrentBuffer().getInt();
+   }
+
+   public long getLong()
+   {
+      checkReadSpace(org.jboss.messaging.util.DataConstants.SIZE_LONG);
+      pos += org.jboss.messaging.util.DataConstants.SIZE_LONG;
+      return getCurrentBuffer().getLong();
+   }
+
+   public short getShort()
+   {
+      checkReadSpace(org.jboss.messaging.util.DataConstants.SIZE_SHORT);
+      pos += org.jboss.messaging.util.DataConstants.SIZE_SHORT;
+      return getCurrentBuffer().getShort();
+   }
+
+   public boolean getBoolean()
+   {
+      checkReadSpace(org.jboss.messaging.util.DataConstants.SIZE_BOOLEAN);
+      pos += org.jboss.messaging.util.DataConstants.SIZE_BOOLEAN;
+      return getCurrentBuffer().getBoolean();
+   }
+
+   public String getNullableString()
+   {
+      byte check = getByte();
+
+		if (check == DataConstants.NULL)
+		{
+			return null;
+		}
+		else
+		{
+			return getString();
+		}
+   }
+
+   public SimpleString getSimpleString()
+   {
+      int len = getInt();
+
+   	byte[] data = new byte[len];
+   	getBytes(data);
+
+   	return new SimpleString(data);
+   }
+
+   public SimpleString getNullableSimpleString()
+   {
+      int b = getByte();
+   	if (b == DataConstants.NULL)
+   	{
+   	   return null;
+   	}
+   	else
+   	{
+         return getSimpleString();
+   	}
+   }
+
+   public byte getByte()
+   {
+      checkReadSpace(org.jboss.messaging.util.DataConstants.SIZE_BYTE);
+      return getCurrentBuffer().getByte();
+   }
+
+   public void getBytes(byte[] data)
+   {
+      int left = getCurrentBuffer().remaining();
+      if(data.length <= left)
+      {
+         getCurrentBuffer().getBytes(data);
+         pos += data.length;
+      }
+      else
+      {
+         int size = data.length;
+         int read = 0;
+         while (read < size)
+         {
+            left = getCurrentBuffer().remaining() <= (size - read)? getCurrentBuffer().remaining():size - read;
+
+            getCurrentBuffer().getBytes(data, read, left);
+            read += left;
+            pos += read;
+            if(read < size)
+            {
+               bufferPos++;
+            }
+
+         }
+
+      }
+   }
+
+   public float getFloat()
+   {
+      checkReadSpace(org.jboss.messaging.util.DataConstants.SIZE_FLOAT);
+      pos += org.jboss.messaging.util.DataConstants.SIZE_FLOAT;
+      return getCurrentBuffer().getFloat();
+   }
+
+   public double getDouble()
+   {
+      checkReadSpace(org.jboss.messaging.util.DataConstants.SIZE_DOUBLE);
+      pos += org.jboss.messaging.util.DataConstants.SIZE_DOUBLE;
+      return getCurrentBuffer().getDouble();
+   }
+
+   public char getChar()
+   {
+      checkReadSpace(org.jboss.messaging.util.DataConstants.SIZE_CHAR);
+      pos += org.jboss.messaging.util.DataConstants.SIZE_CHAR;
+      return getCurrentBuffer().getChar();
+   }
+
+   public short getUnsignedByte()
+   {
+      checkReadSpace(org.jboss.messaging.util.DataConstants.SIZE_SHORT);
+      pos += org.jboss.messaging.util.DataConstants.SIZE_SHORT;
+      return getCurrentBuffer().getUnsignedByte();
+   }
+
+   public int getUnsignedShort()
+   {
+     checkReadSpace(org.jboss.messaging.util.DataConstants.SIZE_INT);
+      pos += org.jboss.messaging.util.DataConstants.SIZE_INT;
+      return getCurrentBuffer().getUnsignedByte();
+   }
+
+   public void clearBody()
+   {
+      bufferPos = 0;
+      pos = 0;
+      MessagingBuffer buff = buffers.get(0).createNewBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
+      bodySize = PacketImpl.INITIAL_BUFFER_SIZE;
+      buffersSizes.clear();
+      buffers.clear();
+      buffers.add(buff);
+   }
+
+   public int getBodySize()
+   {
+      return bodySize;
+   }
+
+   public void reset()
+   {
+      bufferPos = 0;
+      pos = 0;
+      for (MessagingBuffer buffer : buffers)
+      {
+         buffer.rewind();
+      }
+   }
+
+   public void flip()
+   {
+      for (MessagingBuffer buffer : buffers)
+      {
+         buffer.flip();
+         bodySize += buffer.limit();
+      }
+      pos = 0;
+   }
+
+   public void rewind()
+   {
+      bufferPos = 0;
+      pos = 0;
+      for (MessagingBuffer buffer : buffers)
+      {
+         buffer.rewind();
+      }
+   }
+
+   public int remaining()
+   {
+      return bodySize - pos;
+   }
+
+   public void putBuffer(MessagingBuffer body)
+   {
+      buffers.add(body);
+      buffersSizes.add(body.limit() - 5);
+      bodySize += body.limit() - 5;
+   }
+
+   public void transferRemainingBuffer(Encoder buffer, int len)
+   {
+      for(int i = bufferPos + 1; i < buffers.size(); i++)
+      {
+         buffer.putBuffer(buffers.get(i));
+      }
+   }
+
+   private void checkReadSpace(int size)
+   {
+      if(getCurrentBuffer().position() + size > getCurrentBuffersSize() + org.jboss.messaging.util.DataConstants.SIZE_INT)
+      {
+         bufferPos++;
+      }
+   }
+
+   private void checkWriteSpace(int size)
+   {
+      if(getCurrentBuffer().remaining() < size)
+      {
+         setNextFragment();
+      }
+   }
+
+   private void setNextFragment()
+   {
+      MessagingBuffer buffer = buffers.get(0).createNewBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
+      buffer.putInt(-1);
+      buffer.putInt(-1);
+      buffer.putBoolean(false);
+      buffers.add(buffer);
+      bufferPos++;
+   }
+
+   protected MessagingBuffer getCurrentBuffer()
+   {
+      try
+      {
+         return buffers.get(bufferPos);
+      }
+      catch (Exception e)
+      {
+         return null;
+      }
+   }
+
+   private int getCurrentBuffersSize()
+   {
+      return buffersSizes.get(bufferPos);
+   }
+
+   public List<MessagingBuffer> getBodyType()
+   {
+      int bodyLength = getInt();
+      //if we are part way thru a buffer it must we need to copy into new buffer
+      if(getCurrentBuffer().position() - org.jboss.messaging.util.DataConstants.SIZE_INT != getCurrentBuffersSize())
+      {
+         MessagingBuffer body = getCurrentBuffer().createNewBuffer(bodyLength);
+         byte[] bytes = new byte[bodyLength];
+         getCurrentBuffer().getBytes(bytes);
+         body.putBytes(bytes);
+         List<MessagingBuffer> theBody = new ArrayList<MessagingBuffer>();
+         theBody.add(body);
+         return theBody;
+      }
+      //else we just return the remaining buffers
+      else
+      {
+         List<MessagingBuffer> theBody = new ArrayList<MessagingBuffer>();
+         for(int i = bufferPos; i < buffers.size(); i++)
+         {
+            theBody.add(buffers.get(i));
+            bufferPos++;
+         }
+         bufferPos++;
+         return theBody;
+      }
+   }
+}

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2008-08-13 12:49:04 UTC (rev 4799)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2008-08-13 14:43:30 UTC (rev 4800)
@@ -24,18 +24,12 @@
 
 
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.Encoder;
 import org.jboss.messaging.core.remoting.MessagingBuffer;
 import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.transaction.impl.XidImpl;
-import org.jboss.messaging.util.DataConstants;
-import static org.jboss.messaging.util.DataConstants.*;
-import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.core.remoting.impl.EncoderImpl;
+import static org.jboss.messaging.util.DataConstants.SIZE_INT;
 
-import javax.transaction.xa.Xid;
-import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
-import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -44,7 +38,7 @@
  * 
  * @version <tt>$Revision$</tt>
  */
-public class PacketImpl implements Packet
+public class PacketImpl extends EncoderImpl implements Packet
 {
    // Constants -----------------------------------------------------
    
@@ -136,10 +130,6 @@
    public static final byte PROD_RECEIVETOKENS = 101;
    
    public static final byte RECEIVE_MSG = 110;
-
-   private List<MessagingBuffer> buffers = new ArrayList<MessagingBuffer>();
-   private List<Integer> buffersSizes = new ArrayList<Integer>();
-   private int currentPos = 0;
    private int packetid;
 
    // Static --------------------------------------------------------
@@ -272,342 +262,6 @@
              r.executorID == this.executorID &&
              r.targetID == this.targetID;
    }
-
-   public void putXidType(Xid xid)
-   {
-      putInt(xid.getFormatId());
-      putInt(xid.getBranchQualifier().length);
-      putBytes(xid.getBranchQualifier());
-      putInt(xid.getGlobalTransactionId().length);
-      putBytes(xid.getGlobalTransactionId());
-   }
-
-   public void putFloat(float val)
-   {
-      checkWriteSpace(SIZE_FLOAT);
-      getCurrentBuffer().putFloat(val);
-   }
-
-   public void putInt(int i)
-   {
-      checkWriteSpace(SIZE_INT);
-      getCurrentBuffer().putInt(i);
-   }
-
-   public void putByte(byte b)
-   {
-      checkWriteSpace(SIZE_BYTE);
-      getCurrentBuffer().putByte(b);
-   }
-
-   public void putLong(long l)
-   {
-      checkWriteSpace(SIZE_LONG);
-      getCurrentBuffer().putLong(l);
-   }
-
-   public void putBoolean(boolean b)
-   {
-      checkWriteSpace(SIZE_BOOLEAN);
-      getCurrentBuffer().putBoolean(b);
-   }
-
-   public void putNullableString(String nullableString)
-   {
-      if (nullableString == null)
-		{
-			putByte(DataConstants.NULL);
-		}
-		else
-		{
-			putByte(DataConstants.NOT_NULL);
-
-			putString(nullableString);
-		}
-   }
-
-   public void putSimpleString(SimpleString string)
-   {
-      byte[] data = string.getData();
-
-   	putInt(data.length);
-   	putBytes(data);
-   }
-
-   public void putNullableSimpleString(SimpleString string)
-   {
-      if (string == null)
-   	{
-   		putByte(DataConstants.NULL);
-   	}
-   	else
-   	{
-   	   putByte(DataConstants.NOT_NULL);
-   		putSimpleString(string);
-   	}
-   }
-
-   public void putBytes(byte[] bytes, int i, int i1)
-   {
-      if(getCurrentBuffer().remaining() >= (i1 - i))
-      {
-         getCurrentBuffer().putBytes(bytes, i, i1);
-      }
-      else
-      {
-         int written = 0;
-         while(written < i1)
-         {
-            int left = getCurrentBuffer().remaining();
-            int towrite = left + written > i1?i1 - written:left;
-            getCurrentBuffer().putBytes(bytes, i, towrite);
-            written+=towrite;
-            i += left;
-            if(written < i1)
-            {
-               setNextFragment();
-            }
-         }
-      }
-   }
-
-   public void putBytes(byte[] bytes)
-   {
-      putBytes(bytes, 0, bytes.length);
-   }
-
-   public void putShort(short val)
-   {
-      checkWriteSpace(SIZE_SHORT);
-      getCurrentBuffer().putShort(val);
-   }
-
-   public void putDouble(double val)
-   {
-      checkWriteSpace(SIZE_DOUBLE);
-      getCurrentBuffer().putDouble(val);
-   }
-
-   public void putChar(char val)
-   {
-      checkWriteSpace(SIZE_CHAR);
-      getCurrentBuffer().putChar(val);
-   }
-
-   public void putUTF(String str)
-   {
-      //TODO This is quite inefficient - can be improved using a method similar to what MINA IOBuffer does
-		//(putPrefixedString)
-		ByteBuffer bb = utf8.encode(str);
-   	putInt(bb.limit() - bb.position());
-   	putBytes(bb.array());
-   }
-
-   public void putString(String string)
-   {
-      putInt(string.length());
-
-		for (int i = 0; i < string.length(); i++)
-		{
-			putChar(string.charAt(i));
-		}
-   }
-
-   public String getString()
-   {
-      int len = getInt();
-
-   	char[] chars = new char[len];
-
-      for (int i = 0; i < len; i++)
-      {
-      	chars[i] = getChar();
-      }
-
-      return new String(chars);
-   }
-
-   public Xid getXidType()
-   {
-      int formatID = getInt();
-      byte[] bq = new byte[getInt()];
-      getBytes(bq);
-      byte[] gtxid = new byte[getInt()];
-      getBytes(gtxid);
-      Xid xid = new XidImpl(bq, formatID, gtxid);
-      return xid;
-   }
-
-
-   public List<Xid> getXids(int len)
-   {
-      List<Xid> xids = new ArrayList<Xid>();
-
-      //  new ArrayList<Xid>(len);
-      for (int i = 0; i < len; i++)
-      {
-         int formatID = getInt();
-         byte[] bq = new byte[getInt()];
-         getBytes(bq);
-         byte[] gtxid = new byte[getInt()];
-         getBytes(gtxid);
-         Xid xid = new XidImpl(bq, formatID, gtxid);
-
-         xids.add(xid);
-      }
-      return xids;
-   }
-
-   public void getBytes(byte[] value, int i, int read)
-   {
-      throw new UnsupportedOperationException();
-   }
-
-   public String getUTF() throws Exception
-   {
-      throw new UnsupportedOperationException();
-   }
-
-   public int getInt()
-   {
-      checkReadSpace(SIZE_INT);
-      return getCurrentBuffer().getInt();
-   }
-
-   public long getLong()
-   {
-      checkReadSpace(SIZE_LONG);
-      return getCurrentBuffer().getLong();
-   }
-
-   public short getShort()
-   {
-      checkReadSpace(SIZE_SHORT);
-      return getCurrentBuffer().getShort();
-   }
-
-   public boolean getBoolean()
-   {
-      checkReadSpace(SIZE_BOOLEAN);
-      return getCurrentBuffer().getBoolean();
-   }
-
-   public String getNullableString()
-   {
-      byte check = getByte();
-
-		if (check == DataConstants.NULL)
-		{
-			return null;
-		}
-		else
-		{
-			return getString();
-		}
-   }
-
-   public SimpleString getSimpleString()
-   {
-      int len = getInt();
-
-   	byte[] data = new byte[len];
-   	getBytes(data);
-
-   	return new SimpleString(data);
-   }
-
-    public SimpleString getNullableSimpleString()
-   {
-      int b = getByte();
-   	if (b == DataConstants.NULL)
-   	{
-   	   return null;
-   	}
-   	else
-   	{
-         return getSimpleString();
-   	}
-   }
-
-   public byte getByte()
-   {
-      checkReadSpace(SIZE_BYTE);
-      return getCurrentBuffer().getByte();
-   }
-
-   public void getBytes(byte[] data)
-   {
-      int left = getCurrentBuffer().remaining();
-      if(data.length <= left)
-      {
-         getCurrentBuffer().getBytes(data);
-      }
-      else
-      {
-         int size = data.length;
-         int read = 0;
-         while (read < size)
-         {
-            left = getCurrentBuffer().remaining() <= (size - read)? getCurrentBuffer().remaining():size - read;
-
-            getCurrentBuffer().getBytes(data, read, left);
-            read += left;
-            if(read < size)
-            {
-               currentPos++;
-            }
-
-         }
-
-      }
-   }
-
-
-   public float getFloat()
-   {
-      checkReadSpace(SIZE_FLOAT);
-      return getCurrentBuffer().getFloat();
-   }
-
-   public double getDouble()
-   {
-      checkReadSpace(SIZE_DOUBLE);
-      return getCurrentBuffer().getDouble();
-   }
-
-   public char getChar()
-   {
-      checkReadSpace(SIZE_CHAR);
-      return getCurrentBuffer().getChar();
-   }
-
-   public short getUnsignedByte()
-   {
-      checkReadSpace(SIZE_SHORT);
-      return getCurrentBuffer().getUnsignedByte();
-   }
-
-   public int getUnsignedShort()
-   {
-     checkReadSpace(SIZE_INT);
-      return getCurrentBuffer().getUnsignedByte();
-   }
-
-   public void putBuffer(MessagingBuffer body)
-   {
-      body.putInt(SIZE_INT, packetid);
-      buffers.add(body);
-      //buffersSizes.add(len);
-   }
-
-   public void transferRemainingBuffer(Encoder buffer, int len)
-   {
-      for(int i = currentPos + 1; i < buffers.size(); i++)
-      {
-         buffer.putBuffer(buffers.get(i));
-      }
-   }
-
    public List<MessagingBuffer> getBuffers()
    {
       for (int i = 0; i < buffers.size(); i++)
@@ -634,34 +288,6 @@
       buffersSizes.add(packetLength);
    }
 
-   public List<MessagingBuffer> getBodyType()
-   {
-      int bodyLength = getInt();
-      //if we are part way thru a buffer it must we need to copy into new buffer
-      if(getCurrentBuffer().position() - SIZE_INT != getCurrentBuffersSize())
-      {
-         MessagingBuffer body = getCurrentBuffer().createNewBuffer(bodyLength);
-         byte[] bytes = new byte[bodyLength];
-         getCurrentBuffer().getBytes(bytes);
-         body.putBytes(bytes);
-         List<MessagingBuffer> theBody = new ArrayList<MessagingBuffer>();
-         theBody.add(body);
-         return theBody;
-      }
-      //else we just return the remaining buffers
-      else
-      {
-         List<MessagingBuffer> theBody = new ArrayList<MessagingBuffer>();
-         for(int i = currentPos; i < buffers.size(); i++)
-         {
-            theBody.add(buffers.get(i));
-            currentPos++;
-         }
-         currentPos++;
-         return theBody;
-      }
-   }
-
    public int remaining()
    {
       return getCurrentBuffer().remaining();
@@ -680,45 +306,6 @@
 
     // Private -------------------------------------------------------
 
-   private void checkReadSpace(int size)
-   {
-      if(getCurrentBuffer().position() + size > getCurrentBuffersSize() + SIZE_INT )
-      {
-         currentPos++;
-      }
-   }
-
-   private void checkWriteSpace(int size)
-   {
-      if(getCurrentBuffer().remaining() < size)
-      {
-         setNextFragment();
-      }
-   }
-
-   private void setNextFragment()
-   {
-      buffers.add(buffers.get(0).createNewBuffer(INITIAL_BUFFER_SIZE));
-      currentPos++;
-   }
-
-
-   private MessagingBuffer getCurrentBuffer()
-   {
-      try
-      {
-         return buffers.get(currentPos);
-      }
-      catch (Exception e)
-      {
-         return null;
-      }
-   }
-
-   private int getCurrentBuffersSize()
-   {
-      return buffersSizes.get(currentPos);
-   }
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------




More information about the jboss-cvs-commits mailing list