[jboss-cvs] JBoss Messaging SVN: r4800 - in branches/Branch_Message_Chunking_new: src/main/org/jboss/messaging/core/message/impl and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Aug 13 10:43:30 EDT 2008
Author: ataylor
Date: 2008-08-13 10:43:30 -0400 (Wed, 13 Aug 2008)
New Revision: 4800
Added:
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/EncoderImpl.java
Modified:
branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/QueueExample.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
Log:
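Tidied up: moved the buffer put/get (encoding) methods that were duplicated in MessageImpl and PacketImpl into a new shared base class, EncoderImpl.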
Modified: branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/QueueExample.java
===================================================================
--- branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/QueueExample.java 2008-08-13 12:49:04 UTC (rev 4799)
+++ branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/QueueExample.java 2008-08-13 14:43:30 UTC (rev 4800)
@@ -49,7 +49,7 @@
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageProducer producer = session.createProducer(queue);
final BytesMessage message = session.createBytesMessage();
- byte[] bytes = new byte[900];
+ byte[] bytes = new byte[40000];
for(int i = 0; i < bytes.length; i++)
{
bytes[i] = (byte) i;
@@ -58,38 +58,8 @@
message.writeBytes(bytes);
log.info("sending message to queue");
producer.send(message);
- producer.send(message);
- /* new Thread(new Runnable()
- {
- public void run()
- {
+ //producer.send(message);
- try
- {
- producer.send(message);
- }
- catch (JMSException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }).start();*/
- /* new Thread(new Runnable()
- {
- public void run()
- {
-
- try
- {
- producer.send(message);
- }
- catch (JMSException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }).start();*/
-
MessageConsumer messageConsumer = session.createConsumer(queue);
connection.start();
BytesMessage message2 = (BytesMessage) messageConsumer.receive(50000000);
@@ -107,7 +77,7 @@
throw new RuntimeException(i+"");
}
}
- message2 = (BytesMessage) messageConsumer.receive(50000000);
+ /*message2 = (BytesMessage) messageConsumer.receive(50000000);
log.info("message received from queue");
message2.readBytes(newbytes);
//log.info("message = " + new String(newbytes));
@@ -121,7 +91,7 @@
{
throw new RuntimeException(i+"");
}
- }
+ }*/
/*message2 = (BytesMessage) messageConsumer.receive(50000000);
log.info("message received from queue");
message2.readBytes(newbytes);
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-08-13 12:49:04 UTC (rev 4799)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-08-13 14:43:30 UTC (rev 4800)
@@ -28,17 +28,14 @@
import org.jboss.messaging.core.remoting.Encoder;
import org.jboss.messaging.core.remoting.MessagingBuffer;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.impl.EncoderImpl;
import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
-import org.jboss.messaging.util.DataConstants;
import static org.jboss.messaging.util.DataConstants.*;
import org.jboss.messaging.util.SimpleString;
import org.jboss.messaging.util.TypedProperties;
import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Set;
/**
@@ -56,14 +53,12 @@
*
* $Id: MessageSupport.java 2740 2007-05-30 11:36:28Z timfox $
*/
-public abstract class MessageImpl implements Message
+public abstract class MessageImpl extends EncoderImpl implements Message
{
// Constants -----------------------------------------------------
private static final Logger log = Logger.getLogger(MessageImpl.class);
- private static final Charset utf8 = Charset.forName("UTF-8");
-
public static final SimpleString HDR_ACTUAL_EXPIRY_TIME = new SimpleString("JBMActualExpiryTime");
// Attributes ----------------------------------------------------
@@ -83,11 +78,6 @@
private byte priority;
- protected List<MessagingBuffer> buffers = new ArrayList<MessagingBuffer>();
- protected List<Integer> buffersSizes = new ArrayList<Integer>();
- private int bufferPos = 0;
- private int pos = 0;
- protected int bodySize = 0;
private boolean isBodySplit = true;
// Constructors --------------------------------------------------
@@ -166,6 +156,7 @@
for (int i = 0; i < buffers.size(); i++)
{
MessagingBuffer buffer = buffers.get(i);
+ //TODO: fix this horrible hack - there is a MINA bug, so slicing the buffer doesn't seem to work and I need to make a copy in case of multiple sends
IoBuffer copied = IoBuffer.allocate(buffer.limit());
copied.put(buffer.array(), 0, buffer.limit());
copied.setAutoExpand(true);
@@ -260,7 +251,6 @@
/* BodySize and Body */ SIZE_INT + getBodySize();
}
-
public SimpleString getDestination()
@@ -404,441 +394,9 @@
// Body
// -------------------------------------------------------------------------------------
- public void putFloat(float val)
- {
- checkWriteSpace(SIZE_FLOAT);
- getCurrentBuffer().putFloat(val);
- pos += SIZE_FLOAT;
- }
-
- public void putInt(int i)
- {
- checkWriteSpace(SIZE_INT);
- getCurrentBuffer().putInt(i);
- pos += SIZE_INT;
- }
-
- public void putByte(byte b)
- {
- checkWriteSpace(SIZE_BYTE);
- getCurrentBuffer().putByte(b);
- pos += SIZE_BYTE;
- }
-
- public void putLong(long l)
- {
- checkWriteSpace(SIZE_LONG);
- getCurrentBuffer().putLong(l);
- pos += SIZE_LONG;
- }
-
- public void putBoolean(boolean b)
- {
- checkWriteSpace(SIZE_BOOLEAN);
- getCurrentBuffer().putBoolean(b);
- pos += SIZE_BOOLEAN;
- }
-
- public void putNullableString(String nullableString)
- {
- if (nullableString == null)
- {
- putByte(DataConstants.NULL);
- }
- else
- {
- putByte(DataConstants.NOT_NULL);
-
- putString(nullableString);
- }
- }
-
- public void putSimpleString(SimpleString string)
- {
- byte[] data = string.getData();
-
- putInt(data.length);
- putBytes(data);
- }
-
- public void putNullableSimpleString(SimpleString string)
- {
- if (string == null)
- {
- putByte(DataConstants.NULL);
- }
- else
- {
- putByte(DataConstants.NOT_NULL);
- putSimpleString(string);
- }
- }
-
- public void putBytes(byte[] bytes, int i, int i1)
- {
- if(getCurrentBuffer().remaining() > (i1 - i))
- {
- getCurrentBuffer().putBytes(bytes, i, i1);
- pos += (i1 - i);
- }
- else
- {
- int written = 0;
- while(written < i1)
- {
- int left = getCurrentBuffer().remaining();
- int towrite = left + written > i1?i1 - written:left;
- getCurrentBuffer().putBytes(bytes, i, towrite);
- written+=towrite;
- i += left;
- pos += written;
- if(written < i1)
- {
- setNextFragment();
- }
-
- }
- }
- }
-
- public void putBytes(byte[] bytes)
- {
- putBytes(bytes, 0, bytes.length);
- }
-
- public void putShort(short val)
- {
- checkWriteSpace(SIZE_SHORT);
- getCurrentBuffer().putShort(val);
- pos += SIZE_SHORT;
- }
-
- public void putDouble(double val)
- {
- checkWriteSpace(SIZE_DOUBLE);
- getCurrentBuffer().putDouble(val);
- pos += SIZE_DOUBLE;
- }
-
- public void putChar(char val)
- {
- checkWriteSpace(SIZE_CHAR);
- getCurrentBuffer().putChar(val);
- pos += SIZE_CHAR;
- }
-
- public void putUTF(String str)
- {
- //TODO This is quite inefficient - can be improved using a method similar to what MINA IOBuffer does
- //(putPrefixedString)
- ByteBuffer bb = utf8.encode(str);
- putInt(bb.limit() - bb.position());
- putBytes(bb.array());
- }
-
- public void putString(String string)
- {
- putInt(string.length());
-
- for (int i = 0; i < string.length(); i++)
- {
- putChar(string.charAt(i));
- }
- }
-
- public String getString()
- {
- int len = getInt();
-
- char[] chars = new char[len];
-
- for (int i = 0; i < len; i++)
- {
- chars[i] = getChar();
- }
-
- return new String(chars);
- }
-
- public void getBytes(byte[] data, int i, int offset)
- {
- int size = offset - i;
- int read = 0;
- while (read < size)
- {
- int left = getCurrentBuffer().remaining() <= (size - read)? getCurrentBuffer().remaining():size - read;
-
- getCurrentBuffer().getBytes(data, read, left);
- read += left;
- pos += read;
- if(read < size)
- {
- bufferPos++;
- }
-
- }
- }
-
- public String getUTF() throws Exception
- {
- throw new UnsupportedOperationException();
- }
-
- public int getInt()
- {
- checkReadSpace(SIZE_INT);
- pos += SIZE_INT;
- return getCurrentBuffer().getInt();
- }
-
- public long getLong()
- {
- checkReadSpace(SIZE_LONG);
- pos += SIZE_LONG;
- return getCurrentBuffer().getLong();
- }
-
- public short getShort()
- {
- checkReadSpace(SIZE_SHORT);
- pos += SIZE_SHORT;
- return getCurrentBuffer().getShort();
- }
-
- public boolean getBoolean()
- {
- checkReadSpace(SIZE_BOOLEAN);
- pos += SIZE_BOOLEAN;
- return getCurrentBuffer().getBoolean();
- }
-
- public String getNullableString()
- {
- byte check = getByte();
-
- if (check == DataConstants.NULL)
- {
- return null;
- }
- else
- {
- return getString();
- }
- }
-
- public SimpleString getSimpleString()
- {
- int len = getInt();
-
- byte[] data = new byte[len];
- getBytes(data);
-
- return new SimpleString(data);
- }
-
- public SimpleString getNullableSimpleString()
- {
- int b = getByte();
- if (b == DataConstants.NULL)
- {
- return null;
- }
- else
- {
- return getSimpleString();
- }
- }
-
- public byte getByte()
- {
- checkReadSpace(SIZE_BYTE);
- return getCurrentBuffer().getByte();
- }
-
- public void getBytes(byte[] data)
- {
- int left = getCurrentBuffer().remaining();
- if(data.length <= left)
- {
- getCurrentBuffer().getBytes(data);
- pos += data.length;
- }
- else
- {
- int size = data.length;
- int read = 0;
- while (read < size)
- {
- left = getCurrentBuffer().remaining() <= (size - read)? getCurrentBuffer().remaining():size - read;
-
- getCurrentBuffer().getBytes(data, read, left);
- read += left;
- pos += read;
- if(read < size)
- {
- bufferPos++;
- }
-
- }
-
- }
- }
-
-
- public float getFloat()
- {
- checkReadSpace(SIZE_FLOAT);
- pos += SIZE_FLOAT;
- return getCurrentBuffer().getFloat();
- }
-
- public double getDouble()
- {
- checkReadSpace(SIZE_DOUBLE);
- pos += SIZE_DOUBLE;
- return getCurrentBuffer().getDouble();
- }
-
- public char getChar()
- {
- checkReadSpace(SIZE_CHAR);
- pos += SIZE_CHAR;
- return getCurrentBuffer().getChar();
- }
-
- public short getUnsignedByte()
- {
- checkReadSpace(SIZE_SHORT);
- pos += SIZE_SHORT;
- return getCurrentBuffer().getUnsignedByte();
- }
-
- public int getUnsignedShort()
- {
- checkReadSpace(SIZE_INT);
- pos += SIZE_INT;
- return getCurrentBuffer().getUnsignedByte();
- }
-
- public void clearBody()
- {
- bufferPos = 0;
- pos = 0;
- MessagingBuffer buff = buffers.get(0).createNewBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
- bodySize = PacketImpl.INITIAL_BUFFER_SIZE;
- buffersSizes.clear();
- buffers.clear();
- buffers.add(buff);
- }
-
-
- public int getBodySize()
- {
- /*int size = 0;
- for (MessagingBuffer buffer : buffers)
- {
- size += buffer.limit();
- }
- return size;*/
- return bodySize;
- }
-
- public void reset()
- {
- bufferPos = 0;
- pos = 0;
- for (MessagingBuffer buffer : buffers)
- {
- buffer.rewind();
- }
- }
-
- public void flip()
- {
- for (MessagingBuffer buffer : buffers)
- {
- buffer.flip();
- bodySize += buffer.limit();
- }
- pos = 0;
- }
-
- public void rewind()
- {
- bufferPos = 0;
- pos = 0;
- for (MessagingBuffer buffer : buffers)
- {
- buffer.rewind();
- }
- }
-
- public int remaining()
- {
- return bodySize - pos;
- }
-
- public void putBuffer(MessagingBuffer body)
- {
- buffers.add(body);
- buffersSizes.add(body.limit() - 5);
- bodySize += body.limit() - 5;
- }
-
- public void transferRemainingBuffer(Encoder buffer, int len)
- {
- for(int i = bufferPos + 1; i < buffers.size(); i++)
- {
- buffer.putBuffer(buffers.get(i));
- }
- }
// Package protected ---------------------------------------------
// Private -------------------------------------------------------
- private void checkReadSpace(int size)
- {
- if(getCurrentBuffer().position() + size > getCurrentBuffersSize() + SIZE_INT )
- {
- bufferPos++;
- }
- }
-
- private void checkWriteSpace(int size)
- {
- if(getCurrentBuffer().remaining() < size)
- {
- setNextFragment();
- }
- }
-
- private void setNextFragment()
- {
- MessagingBuffer buffer = buffers.get(0).createNewBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
- buffer.putInt(-1);
- buffer.putInt(-1);
- buffer.putBoolean(false);
- buffers.add(buffer);
- bufferPos++;
- }
-
-
- private MessagingBuffer getCurrentBuffer()
- {
- try
- {
- return buffers.get(bufferPos);
- }
- catch (Exception e)
- {
- return null;
- }
- }
-
- private int getCurrentBuffersSize()
- {
- return buffersSizes.get(bufferPos);
- }
-
- // Inner classes -------------------------------------------------
+ // Inner classes -------------------------------------------------
}
Added: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/EncoderImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/EncoderImpl.java (rev 0)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/EncoderImpl.java 2008-08-13 14:43:30 UTC (rev 4800)
@@ -0,0 +1,537 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl;
+
+import org.jboss.messaging.core.remoting.Encoder;
+import org.jboss.messaging.core.remoting.MessagingBuffer;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import org.jboss.messaging.core.transaction.impl.XidImpl;
+import org.jboss.messaging.util.DataConstants;
+import org.jboss.messaging.util.SimpleString;
+
+import javax.transaction.xa.Xid;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class EncoderImpl implements Encoder
+{
+ private static final Charset utf8 = Charset.forName("UTF-8");
+ protected List<MessagingBuffer> buffers = new ArrayList<MessagingBuffer>();
+ protected List<Integer> buffersSizes = new ArrayList<Integer>();
+ private int bufferPos = 0;
+ protected int pos = 0;
+ protected int bodySize = 0;
+
+ public void putXidType(Xid xid)
+ {
+ putInt(xid.getFormatId());
+ putInt(xid.getBranchQualifier().length);
+ putBytes(xid.getBranchQualifier());
+ putInt(xid.getGlobalTransactionId().length);
+ putBytes(xid.getGlobalTransactionId());
+ }
+
+ public void putFloat(float val)
+ {
+ checkWriteSpace(org.jboss.messaging.util.DataConstants.SIZE_FLOAT);
+ getCurrentBuffer().putFloat(val);
+ pos += org.jboss.messaging.util.DataConstants.SIZE_FLOAT;
+ }
+
+ public void putInt(int i)
+ {
+ checkWriteSpace(org.jboss.messaging.util.DataConstants.SIZE_INT);
+ getCurrentBuffer().putInt(i);
+ pos += org.jboss.messaging.util.DataConstants.SIZE_INT;
+ }
+
+ public void putByte(byte b)
+ {
+ checkWriteSpace(org.jboss.messaging.util.DataConstants.SIZE_BYTE);
+ getCurrentBuffer().putByte(b);
+ pos += org.jboss.messaging.util.DataConstants.SIZE_BYTE;
+ }
+
+ public void putLong(long l)
+ {
+ checkWriteSpace(org.jboss.messaging.util.DataConstants.SIZE_LONG);
+ getCurrentBuffer().putLong(l);
+ pos += org.jboss.messaging.util.DataConstants.SIZE_LONG;
+ }
+
+ public void putBoolean(boolean b)
+ {
+ checkWriteSpace(org.jboss.messaging.util.DataConstants.SIZE_BOOLEAN);
+ getCurrentBuffer().putBoolean(b);
+ pos += org.jboss.messaging.util.DataConstants.SIZE_BOOLEAN;
+ }
+
+ public void putNullableString(String nullableString)
+ {
+ if (nullableString == null)
+ {
+ putByte(DataConstants.NULL);
+ }
+ else
+ {
+ putByte(DataConstants.NOT_NULL);
+
+ putString(nullableString);
+ }
+ }
+
+ public void putSimpleString(SimpleString string)
+ {
+ byte[] data = string.getData();
+
+ putInt(data.length);
+ putBytes(data);
+ }
+
+ public void putNullableSimpleString(SimpleString string)
+ {
+ if (string == null)
+ {
+ putByte(DataConstants.NULL);
+ }
+ else
+ {
+ putByte(DataConstants.NOT_NULL);
+ putSimpleString(string);
+ }
+ }
+
+ public void putBytes(byte[] bytes, int i, int i1)
+ {
+ if(getCurrentBuffer().remaining() > (i1 - i))
+ {
+ getCurrentBuffer().putBytes(bytes, i, i1);
+ pos += (i1 - i);
+ }
+ else
+ {
+ int written = 0;
+ while(written < i1)
+ {
+ int left = getCurrentBuffer().remaining();
+ int towrite = left + written > i1?i1 - written:left;
+ getCurrentBuffer().putBytes(bytes, i, towrite);
+ written+=towrite;
+ i += left;
+ pos += written;
+ if(written < i1)
+ {
+ setNextFragment();
+ }
+
+ }
+ }
+ }
+
+ public void putBytes(byte[] bytes)
+ {
+ putBytes(bytes, 0, bytes.length);
+ }
+
+ public void putShort(short val)
+ {
+ checkWriteSpace(org.jboss.messaging.util.DataConstants.SIZE_SHORT);
+ getCurrentBuffer().putShort(val);
+ pos += org.jboss.messaging.util.DataConstants.SIZE_SHORT;
+ }
+
+ public void putDouble(double val)
+ {
+ checkWriteSpace(org.jboss.messaging.util.DataConstants.SIZE_DOUBLE);
+ getCurrentBuffer().putDouble(val);
+ pos += org.jboss.messaging.util.DataConstants.SIZE_DOUBLE;
+ }
+
+ public void putChar(char val)
+ {
+ checkWriteSpace(org.jboss.messaging.util.DataConstants.SIZE_CHAR);
+ getCurrentBuffer().putChar(val);
+ pos += org.jboss.messaging.util.DataConstants.SIZE_CHAR;
+ }
+
+ public void putUTF(String str)
+ {
+ //TODO This is quite inefficient - can be improved using a method similar to what MINA IOBuffer does
+ //(putPrefixedString)
+ ByteBuffer bb = utf8.encode(str);
+ putInt(bb.limit() - bb.position());
+ putBytes(bb.array());
+ }
+
+ public void putString(String string)
+ {
+ putInt(string.length());
+
+ for (int i = 0; i < string.length(); i++)
+ {
+ putChar(string.charAt(i));
+ }
+ }
+ public Xid getXidType()
+ {
+ int formatID = getInt();
+ byte[] bq = new byte[getInt()];
+ getBytes(bq);
+ byte[] gtxid = new byte[getInt()];
+ getBytes(gtxid);
+ return new XidImpl(bq, formatID, gtxid);
+ }
+
+
+ public List<Xid> getXids(int len)
+ {
+ List<Xid> xids = new ArrayList<Xid>();
+
+ for (int i = 0; i < len; i++)
+ {
+ int formatID = getInt();
+ byte[] bq = new byte[getInt()];
+ getBytes(bq);
+ byte[] gtxid = new byte[getInt()];
+ getBytes(gtxid);
+ Xid xid = new XidImpl(bq, formatID, gtxid);
+
+ xids.add(xid);
+ }
+ return xids;
+ }
+
+ public String getString()
+ {
+ int len = getInt();
+
+ char[] chars = new char[len];
+
+ for (int i = 0; i < len; i++)
+ {
+ chars[i] = getChar();
+ }
+
+ return new String(chars);
+ }
+
+ public void getBytes(byte[] data, int i, int offset)
+ {
+ int size = offset - i;
+ int read = 0;
+ while (read < size)
+ {
+ int left = getCurrentBuffer().remaining() <= (size - read)? getCurrentBuffer().remaining():size - read;
+
+ getCurrentBuffer().getBytes(data, read, left);
+ read += left;
+ pos += read;
+ if(read < size)
+ {
+ bufferPos++;
+ }
+
+ }
+ }
+
+ public String getUTF() throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public int getInt()
+ {
+ checkReadSpace(org.jboss.messaging.util.DataConstants.SIZE_INT);
+ pos += org.jboss.messaging.util.DataConstants.SIZE_INT;
+ return getCurrentBuffer().getInt();
+ }
+
+ public long getLong()
+ {
+ checkReadSpace(org.jboss.messaging.util.DataConstants.SIZE_LONG);
+ pos += org.jboss.messaging.util.DataConstants.SIZE_LONG;
+ return getCurrentBuffer().getLong();
+ }
+
+ public short getShort()
+ {
+ checkReadSpace(org.jboss.messaging.util.DataConstants.SIZE_SHORT);
+ pos += org.jboss.messaging.util.DataConstants.SIZE_SHORT;
+ return getCurrentBuffer().getShort();
+ }
+
+ public boolean getBoolean()
+ {
+ checkReadSpace(org.jboss.messaging.util.DataConstants.SIZE_BOOLEAN);
+ pos += org.jboss.messaging.util.DataConstants.SIZE_BOOLEAN;
+ return getCurrentBuffer().getBoolean();
+ }
+
+ public String getNullableString()
+ {
+ byte check = getByte();
+
+ if (check == DataConstants.NULL)
+ {
+ return null;
+ }
+ else
+ {
+ return getString();
+ }
+ }
+
+ public SimpleString getSimpleString()
+ {
+ int len = getInt();
+
+ byte[] data = new byte[len];
+ getBytes(data);
+
+ return new SimpleString(data);
+ }
+
+ public SimpleString getNullableSimpleString()
+ {
+ int b = getByte();
+ if (b == DataConstants.NULL)
+ {
+ return null;
+ }
+ else
+ {
+ return getSimpleString();
+ }
+ }
+
+ public byte getByte()
+ {
+ checkReadSpace(org.jboss.messaging.util.DataConstants.SIZE_BYTE);
+ return getCurrentBuffer().getByte();
+ }
+
+ public void getBytes(byte[] data)
+ {
+ int left = getCurrentBuffer().remaining();
+ if(data.length <= left)
+ {
+ getCurrentBuffer().getBytes(data);
+ pos += data.length;
+ }
+ else
+ {
+ int size = data.length;
+ int read = 0;
+ while (read < size)
+ {
+ left = getCurrentBuffer().remaining() <= (size - read)? getCurrentBuffer().remaining():size - read;
+
+ getCurrentBuffer().getBytes(data, read, left);
+ read += left;
+ pos += read;
+ if(read < size)
+ {
+ bufferPos++;
+ }
+
+ }
+
+ }
+ }
+
+ public float getFloat()
+ {
+ checkReadSpace(org.jboss.messaging.util.DataConstants.SIZE_FLOAT);
+ pos += org.jboss.messaging.util.DataConstants.SIZE_FLOAT;
+ return getCurrentBuffer().getFloat();
+ }
+
+ public double getDouble()
+ {
+ checkReadSpace(org.jboss.messaging.util.DataConstants.SIZE_DOUBLE);
+ pos += org.jboss.messaging.util.DataConstants.SIZE_DOUBLE;
+ return getCurrentBuffer().getDouble();
+ }
+
+ public char getChar()
+ {
+ checkReadSpace(org.jboss.messaging.util.DataConstants.SIZE_CHAR);
+ pos += org.jboss.messaging.util.DataConstants.SIZE_CHAR;
+ return getCurrentBuffer().getChar();
+ }
+
+ public short getUnsignedByte()
+ {
+ checkReadSpace(org.jboss.messaging.util.DataConstants.SIZE_SHORT);
+ pos += org.jboss.messaging.util.DataConstants.SIZE_SHORT;
+ return getCurrentBuffer().getUnsignedByte();
+ }
+
+ public int getUnsignedShort()
+ {
+ checkReadSpace(org.jboss.messaging.util.DataConstants.SIZE_INT);
+ pos += org.jboss.messaging.util.DataConstants.SIZE_INT;
+ return getCurrentBuffer().getUnsignedByte();
+ }
+
+ public void clearBody()
+ {
+ bufferPos = 0;
+ pos = 0;
+ MessagingBuffer buff = buffers.get(0).createNewBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
+ bodySize = PacketImpl.INITIAL_BUFFER_SIZE;
+ buffersSizes.clear();
+ buffers.clear();
+ buffers.add(buff);
+ }
+
+ public int getBodySize()
+ {
+ return bodySize;
+ }
+
+ public void reset()
+ {
+ bufferPos = 0;
+ pos = 0;
+ for (MessagingBuffer buffer : buffers)
+ {
+ buffer.rewind();
+ }
+ }
+
+ public void flip()
+ {
+ for (MessagingBuffer buffer : buffers)
+ {
+ buffer.flip();
+ bodySize += buffer.limit();
+ }
+ pos = 0;
+ }
+
+ public void rewind()
+ {
+ bufferPos = 0;
+ pos = 0;
+ for (MessagingBuffer buffer : buffers)
+ {
+ buffer.rewind();
+ }
+ }
+
+ public int remaining()
+ {
+ return bodySize - pos;
+ }
+
+ public void putBuffer(MessagingBuffer body)
+ {
+ buffers.add(body);
+ buffersSizes.add(body.limit() - 5);
+ bodySize += body.limit() - 5;
+ }
+
+ public void transferRemainingBuffer(Encoder buffer, int len)
+ {
+ for(int i = bufferPos + 1; i < buffers.size(); i++)
+ {
+ buffer.putBuffer(buffers.get(i));
+ }
+ }
+
+ private void checkReadSpace(int size)
+ {
+ if(getCurrentBuffer().position() + size > getCurrentBuffersSize() + org.jboss.messaging.util.DataConstants.SIZE_INT)
+ {
+ bufferPos++;
+ }
+ }
+
+ private void checkWriteSpace(int size)
+ {
+ if(getCurrentBuffer().remaining() < size)
+ {
+ setNextFragment();
+ }
+ }
+
+ private void setNextFragment()
+ {
+ MessagingBuffer buffer = buffers.get(0).createNewBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
+ buffer.putInt(-1);
+ buffer.putInt(-1);
+ buffer.putBoolean(false);
+ buffers.add(buffer);
+ bufferPos++;
+ }
+
+ protected MessagingBuffer getCurrentBuffer()
+ {
+ try
+ {
+ return buffers.get(bufferPos);
+ }
+ catch (Exception e)
+ {
+ return null;
+ }
+ }
+
+ private int getCurrentBuffersSize()
+ {
+ return buffersSizes.get(bufferPos);
+ }
+
+ public List<MessagingBuffer> getBodyType()
+ {
+ int bodyLength = getInt();
+ //if we are part way through a buffer we need to copy the body into a new buffer
+ if(getCurrentBuffer().position() - org.jboss.messaging.util.DataConstants.SIZE_INT != getCurrentBuffersSize())
+ {
+ MessagingBuffer body = getCurrentBuffer().createNewBuffer(bodyLength);
+ byte[] bytes = new byte[bodyLength];
+ getCurrentBuffer().getBytes(bytes);
+ body.putBytes(bytes);
+ List<MessagingBuffer> theBody = new ArrayList<MessagingBuffer>();
+ theBody.add(body);
+ return theBody;
+ }
+ //else we just return the remaining buffers
+ else
+ {
+ List<MessagingBuffer> theBody = new ArrayList<MessagingBuffer>();
+ for(int i = bufferPos; i < buffers.size(); i++)
+ {
+ theBody.add(buffers.get(i));
+ bufferPos++;
+ }
+ bufferPos++;
+ return theBody;
+ }
+ }
+}
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-08-13 12:49:04 UTC (rev 4799)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-08-13 14:43:30 UTC (rev 4800)
@@ -24,18 +24,12 @@
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.Encoder;
import org.jboss.messaging.core.remoting.MessagingBuffer;
import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.transaction.impl.XidImpl;
-import org.jboss.messaging.util.DataConstants;
-import static org.jboss.messaging.util.DataConstants.*;
-import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.core.remoting.impl.EncoderImpl;
+import static org.jboss.messaging.util.DataConstants.SIZE_INT;
-import javax.transaction.xa.Xid;
-import java.nio.ByteBuffer;
import java.nio.charset.Charset;
-import java.util.ArrayList;
import java.util.List;
/**
@@ -44,7 +38,7 @@
*
* @version <tt>$Revision$</tt>
*/
-public class PacketImpl implements Packet
+public class PacketImpl extends EncoderImpl implements Packet
{
// Constants -----------------------------------------------------
@@ -136,10 +130,6 @@
public static final byte PROD_RECEIVETOKENS = 101;
public static final byte RECEIVE_MSG = 110;
-
- private List<MessagingBuffer> buffers = new ArrayList<MessagingBuffer>();
- private List<Integer> buffersSizes = new ArrayList<Integer>();
- private int currentPos = 0;
private int packetid;
// Static --------------------------------------------------------
@@ -272,342 +262,6 @@
r.executorID == this.executorID &&
r.targetID == this.targetID;
}
-
- public void putXidType(Xid xid)
- {
- putInt(xid.getFormatId());
- putInt(xid.getBranchQualifier().length);
- putBytes(xid.getBranchQualifier());
- putInt(xid.getGlobalTransactionId().length);
- putBytes(xid.getGlobalTransactionId());
- }
-
- public void putFloat(float val)
- {
- checkWriteSpace(SIZE_FLOAT);
- getCurrentBuffer().putFloat(val);
- }
-
- public void putInt(int i)
- {
- checkWriteSpace(SIZE_INT);
- getCurrentBuffer().putInt(i);
- }
-
- public void putByte(byte b)
- {
- checkWriteSpace(SIZE_BYTE);
- getCurrentBuffer().putByte(b);
- }
-
- public void putLong(long l)
- {
- checkWriteSpace(SIZE_LONG);
- getCurrentBuffer().putLong(l);
- }
-
- public void putBoolean(boolean b)
- {
- checkWriteSpace(SIZE_BOOLEAN);
- getCurrentBuffer().putBoolean(b);
- }
-
- public void putNullableString(String nullableString)
- {
- if (nullableString == null)
- {
- putByte(DataConstants.NULL);
- }
- else
- {
- putByte(DataConstants.NOT_NULL);
-
- putString(nullableString);
- }
- }
-
- public void putSimpleString(SimpleString string)
- {
- byte[] data = string.getData();
-
- putInt(data.length);
- putBytes(data);
- }
-
- public void putNullableSimpleString(SimpleString string)
- {
- if (string == null)
- {
- putByte(DataConstants.NULL);
- }
- else
- {
- putByte(DataConstants.NOT_NULL);
- putSimpleString(string);
- }
- }
-
- public void putBytes(byte[] bytes, int i, int i1)
- {
- if(getCurrentBuffer().remaining() >= (i1 - i))
- {
- getCurrentBuffer().putBytes(bytes, i, i1);
- }
- else
- {
- int written = 0;
- while(written < i1)
- {
- int left = getCurrentBuffer().remaining();
- int towrite = left + written > i1?i1 - written:left;
- getCurrentBuffer().putBytes(bytes, i, towrite);
- written+=towrite;
- i += left;
- if(written < i1)
- {
- setNextFragment();
- }
- }
- }
- }
-
- public void putBytes(byte[] bytes)
- {
- putBytes(bytes, 0, bytes.length);
- }
-
- public void putShort(short val)
- {
- checkWriteSpace(SIZE_SHORT);
- getCurrentBuffer().putShort(val);
- }
-
- public void putDouble(double val)
- {
- checkWriteSpace(SIZE_DOUBLE);
- getCurrentBuffer().putDouble(val);
- }
-
- public void putChar(char val)
- {
- checkWriteSpace(SIZE_CHAR);
- getCurrentBuffer().putChar(val);
- }
-
- public void putUTF(String str)
- {
- //TODO This is quite inefficient - can be improved using a method similar to what MINA IOBuffer does
- //(putPrefixedString)
- ByteBuffer bb = utf8.encode(str);
- putInt(bb.limit() - bb.position());
- putBytes(bb.array());
- }
-
- public void putString(String string)
- {
- putInt(string.length());
-
- for (int i = 0; i < string.length(); i++)
- {
- putChar(string.charAt(i));
- }
- }
-
- public String getString()
- {
- int len = getInt();
-
- char[] chars = new char[len];
-
- for (int i = 0; i < len; i++)
- {
- chars[i] = getChar();
- }
-
- return new String(chars);
- }
-
- public Xid getXidType()
- {
- int formatID = getInt();
- byte[] bq = new byte[getInt()];
- getBytes(bq);
- byte[] gtxid = new byte[getInt()];
- getBytes(gtxid);
- Xid xid = new XidImpl(bq, formatID, gtxid);
- return xid;
- }
-
-
- public List<Xid> getXids(int len)
- {
- List<Xid> xids = new ArrayList<Xid>();
-
- // new ArrayList<Xid>(len);
- for (int i = 0; i < len; i++)
- {
- int formatID = getInt();
- byte[] bq = new byte[getInt()];
- getBytes(bq);
- byte[] gtxid = new byte[getInt()];
- getBytes(gtxid);
- Xid xid = new XidImpl(bq, formatID, gtxid);
-
- xids.add(xid);
- }
- return xids;
- }
-
- public void getBytes(byte[] value, int i, int read)
- {
- throw new UnsupportedOperationException();
- }
-
- public String getUTF() throws Exception
- {
- throw new UnsupportedOperationException();
- }
-
- public int getInt()
- {
- checkReadSpace(SIZE_INT);
- return getCurrentBuffer().getInt();
- }
-
- public long getLong()
- {
- checkReadSpace(SIZE_LONG);
- return getCurrentBuffer().getLong();
- }
-
- public short getShort()
- {
- checkReadSpace(SIZE_SHORT);
- return getCurrentBuffer().getShort();
- }
-
- public boolean getBoolean()
- {
- checkReadSpace(SIZE_BOOLEAN);
- return getCurrentBuffer().getBoolean();
- }
-
- public String getNullableString()
- {
- byte check = getByte();
-
- if (check == DataConstants.NULL)
- {
- return null;
- }
- else
- {
- return getString();
- }
- }
-
- public SimpleString getSimpleString()
- {
- int len = getInt();
-
- byte[] data = new byte[len];
- getBytes(data);
-
- return new SimpleString(data);
- }
-
- public SimpleString getNullableSimpleString()
- {
- int b = getByte();
- if (b == DataConstants.NULL)
- {
- return null;
- }
- else
- {
- return getSimpleString();
- }
- }
-
- public byte getByte()
- {
- checkReadSpace(SIZE_BYTE);
- return getCurrentBuffer().getByte();
- }
-
- public void getBytes(byte[] data)
- {
- int left = getCurrentBuffer().remaining();
- if(data.length <= left)
- {
- getCurrentBuffer().getBytes(data);
- }
- else
- {
- int size = data.length;
- int read = 0;
- while (read < size)
- {
- left = getCurrentBuffer().remaining() <= (size - read)? getCurrentBuffer().remaining():size - read;
-
- getCurrentBuffer().getBytes(data, read, left);
- read += left;
- if(read < size)
- {
- currentPos++;
- }
-
- }
-
- }
- }
-
-
- public float getFloat()
- {
- checkReadSpace(SIZE_FLOAT);
- return getCurrentBuffer().getFloat();
- }
-
- public double getDouble()
- {
- checkReadSpace(SIZE_DOUBLE);
- return getCurrentBuffer().getDouble();
- }
-
- public char getChar()
- {
- checkReadSpace(SIZE_CHAR);
- return getCurrentBuffer().getChar();
- }
-
- public short getUnsignedByte()
- {
- checkReadSpace(SIZE_SHORT);
- return getCurrentBuffer().getUnsignedByte();
- }
-
- public int getUnsignedShort()
- {
- checkReadSpace(SIZE_INT);
- return getCurrentBuffer().getUnsignedByte();
- }
-
- public void putBuffer(MessagingBuffer body)
- {
- body.putInt(SIZE_INT, packetid);
- buffers.add(body);
- //buffersSizes.add(len);
- }
-
- public void transferRemainingBuffer(Encoder buffer, int len)
- {
- for(int i = currentPos + 1; i < buffers.size(); i++)
- {
- buffer.putBuffer(buffers.get(i));
- }
- }
-
public List<MessagingBuffer> getBuffers()
{
for (int i = 0; i < buffers.size(); i++)
@@ -634,34 +288,6 @@
buffersSizes.add(packetLength);
}
- public List<MessagingBuffer> getBodyType()
- {
- int bodyLength = getInt();
- //if we are part way thru a buffer it must we need to copy into new buffer
- if(getCurrentBuffer().position() - SIZE_INT != getCurrentBuffersSize())
- {
- MessagingBuffer body = getCurrentBuffer().createNewBuffer(bodyLength);
- byte[] bytes = new byte[bodyLength];
- getCurrentBuffer().getBytes(bytes);
- body.putBytes(bytes);
- List<MessagingBuffer> theBody = new ArrayList<MessagingBuffer>();
- theBody.add(body);
- return theBody;
- }
- //else we just return the remaining buffers
- else
- {
- List<MessagingBuffer> theBody = new ArrayList<MessagingBuffer>();
- for(int i = currentPos; i < buffers.size(); i++)
- {
- theBody.add(buffers.get(i));
- currentPos++;
- }
- currentPos++;
- return theBody;
- }
- }
-
public int remaining()
{
return getCurrentBuffer().remaining();
@@ -680,45 +306,6 @@
// Private -------------------------------------------------------
- private void checkReadSpace(int size)
- {
- if(getCurrentBuffer().position() + size > getCurrentBuffersSize() + SIZE_INT )
- {
- currentPos++;
- }
- }
-
- private void checkWriteSpace(int size)
- {
- if(getCurrentBuffer().remaining() < size)
- {
- setNextFragment();
- }
- }
-
- private void setNextFragment()
- {
- buffers.add(buffers.get(0).createNewBuffer(INITIAL_BUFFER_SIZE));
- currentPos++;
- }
-
-
- private MessagingBuffer getCurrentBuffer()
- {
- try
- {
- return buffers.get(currentPos);
- }
- catch (Exception e)
- {
- return null;
- }
- }
-
- private int getCurrentBuffersSize()
- {
- return buffersSizes.get(currentPos);
- }
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
More information about the jboss-cvs-commits
mailing list