[jboss-cvs] JBoss Messaging SVN: r4799 - in branches/Branch_Message_Chunking_new: examples/jms/src/org/jboss/jms/example and 9 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Aug 13 08:49:05 EDT 2008
Author: ataylor
Date: 2008-08-13 08:49:04 -0400 (Wed, 13 Aug 2008)
New Revision: 4799
Modified:
branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/QueueExample.java
branches/Branch_Message_Chunking_new/messaging.ipr
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/ClientMessage.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/Message.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/Encoder.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ByteBufferWrapper.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ExpandingMessagingBuffer.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaProtocolCodecFilter.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerSendMessage.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReceiveMessage.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
Log:
tidied up
Modified: branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/QueueExample.java
===================================================================
--- branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/QueueExample.java 2008-08-12 23:51:48 UTC (rev 4798)
+++ branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/QueueExample.java 2008-08-13 12:49:04 UTC (rev 4799)
@@ -49,7 +49,7 @@
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageProducer producer = session.createProducer(queue);
final BytesMessage message = session.createBytesMessage();
- byte[] bytes = new byte[500];
+ byte[] bytes = new byte[900];
for(int i = 0; i < bytes.length; i++)
{
bytes[i] = (byte) i;
@@ -58,6 +58,7 @@
message.writeBytes(bytes);
log.info("sending message to queue");
producer.send(message);
+ producer.send(message);
/* new Thread(new Runnable()
{
public void run()
@@ -106,6 +107,21 @@
throw new RuntimeException(i+"");
}
}
+ message2 = (BytesMessage) messageConsumer.receive(50000000);
+ log.info("message received from queue");
+ message2.readBytes(newbytes);
+ //log.info("message = " + new String(newbytes));
+ for (int i = 0; i < newbytes.length; i++)
+ {
+ byte newbyte = newbytes[i];
+ System.out.print(newbyte);
+ System.out.print(" = ");
+ System.out.println(bytes[i]);
+ if(bytes[i] != newbyte)
+ {
+ throw new RuntimeException(i+"");
+ }
+ }
/*message2 = (BytesMessage) messageConsumer.receive(50000000);
log.info("message received from queue");
message2.readBytes(newbytes);
Modified: branches/Branch_Message_Chunking_new/messaging.ipr
===================================================================
--- branches/Branch_Message_Chunking_new/messaging.ipr 2008-08-12 23:51:48 UTC (rev 4798)
+++ branches/Branch_Message_Chunking_new/messaging.ipr 2008-08-13 12:49:04 UTC (rev 4799)
@@ -468,6 +468,7 @@
<JAVADOC />
<SOURCES>
<root url="jar://$PROJECT_DIR$/thirdparty/jboss/jnpserver/lib/jnpserver-sources.jar!/" />
+ <root url="jar://$PROJECT_DIR$/thirdparty/apache-mina/lib/mina-core-2.0.0-M3-20080730.120633-1-sources.jar!/" />
</SOURCES>
<jarDirectory url="file://$PROJECT_DIR$/examples/jms/config" recursive="false" />
<jarDirectory url="file://$PROJECT_DIR$/src/etc" recursive="false" />
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/ClientMessage.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/ClientMessage.java 2008-08-12 23:51:48 UTC (rev 4798)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/ClientMessage.java 2008-08-13 12:49:04 UTC (rev 4799)
@@ -40,4 +40,6 @@
long getDeliveryID();
void setDeliveryID(long deliveryID);
+
+ void prepareBuffers();
}
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java 2008-08-12 23:51:48 UTC (rev 4798)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java 2008-08-13 12:49:04 UTC (rev 4799)
@@ -60,7 +60,15 @@
{
super(type, durable, expiration, timestamp, priority, body);
}
-
+
+ protected void resetBodyForUse()
+ {
+ for (MessagingBuffer messagingBuffer : buffers)
+ {
+ messagingBuffer.position(9);
+ }
+ }
+
public ClientMessageImpl(final byte type, final boolean durable, MessagingBuffer body)
{
super(type, durable, 0, System.currentTimeMillis(), (byte)4, body);
@@ -96,4 +104,14 @@
return this.deliveryID;
}
+
+ public void prepareBuffers()
+ {
+ bodySize = 0;
+ for (MessagingBuffer buffer : buffers)
+ {
+ buffersSizes.add(buffer.limit() - 9);
+ bodySize += (buffer.limit() - 9);
+ }
+ }
}
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/Message.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/Message.java 2008-08-12 23:51:48 UTC (rev 4798)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/Message.java 2008-08-13 12:49:04 UTC (rev 4799)
@@ -68,6 +68,10 @@
void setPriority(byte priority);
int getEncodeSize();
+
+ void encodeForTransport(Encoder buffer);
+
+ void decodeForTransport(Encoder buffer);
// Properties
// ------------------------------------------------------------------
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-08-12 23:51:48 UTC (rev 4798)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-08-13 12:49:04 UTC (rev 4799)
@@ -22,11 +22,13 @@
package org.jboss.messaging.core.message.impl;
+import org.apache.mina.core.buffer.IoBuffer;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.message.Message;
import org.jboss.messaging.core.remoting.Encoder;
import org.jboss.messaging.core.remoting.MessagingBuffer;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.util.DataConstants;
import static org.jboss.messaging.util.DataConstants.*;
@@ -68,7 +70,7 @@
private SimpleString destination;
- private byte type;
+ protected byte type;
protected boolean durable;
@@ -81,11 +83,12 @@
private byte priority;
- private List<MessagingBuffer> buffers = new ArrayList<MessagingBuffer>();
- private List<Integer> buffersSizes = new ArrayList<Integer>();
+ protected List<MessagingBuffer> buffers = new ArrayList<MessagingBuffer>();
+ protected List<Integer> buffersSizes = new ArrayList<Integer>();
private int bufferPos = 0;
private int pos = 0;
- private int bodySize = 0;
+ protected int bodySize = 0;
+ private boolean isBodySplit = true;
// Constructors --------------------------------------------------
@@ -138,51 +141,128 @@
}
// Message implementation ----------------------------------------
- public void encode(Encoder buff)
+
+ public void encodeForTransport(Encoder buff)
{
- buff.putSimpleString(destination);
- buff.putByte(type);
+ buff.putSimpleString(getDestination());
+ buff.putByte(getType());
buff.putBoolean(durable);
- buff.putLong(expiration);
- buff.putLong(timestamp);
- buff.putByte(priority);
- properties.encode(buff);
- buff.putInt(getBodySize() - (buffers.size() * 9));
- for (MessagingBuffer buffer : buffers)
+ buff.putLong(getExpiration());
+ buff.putLong(getTimestamp());
+ buff.putByte(getPriority());
+ getProperties().encode(buff);
+ if(buffers.size() == 1 && buffers.get(0).limit() <= buff.remaining())
{
- buff.putBuffer(buffer);
+ int offSet = isBodySplit?9:0;
+ buff.putInt(getBodySize());
+ //are we splitting the buffers.
+ buff.putBoolean(false);
+ buff.putBytes(buffers.get(0).array(), offSet, buffers.get(0).limit());
}
+ else
+ {
+ buff.putInt(getBodySize());
+ buff.putBoolean(true);
+ for (int i = 0; i < buffers.size(); i++)
+ {
+ MessagingBuffer buffer = buffers.get(i);
+ IoBuffer copied = IoBuffer.allocate(buffer.limit());
+ copied.put(buffer.array(), 0, buffer.limit());
+ copied.setAutoExpand(true);
+ buff.putBuffer(new IoBufferWrapper(copied));
+ }
+ }
}
- public int getEncodeSize()
+ public void decodeForTransport(Encoder buffer)
{
- return /* Destination */ SimpleString.sizeofString(destination) +
- /* Type */ SIZE_BYTE +
- /* Durable */ SIZE_BOOLEAN +
- /* Expiration */ SIZE_LONG +
- /* Timestamp */ SIZE_LONG +
- /* Priority */ SIZE_BYTE +
- /* PropertySize and Properties */ properties.getEncodeSize() +
- /* BodySize and Body */ SIZE_INT + getBodySize();
+ setDestination(buffer.getSimpleString());
+ type = buffer.getByte();
+ durable = buffer.getBoolean();
+ setExpiration(buffer.getLong());
+ setTimestamp(buffer.getLong());
+ setPriority(buffer.getByte());
+
+ getProperties().decode(buffer);
+ int len = buffer.getInt();
+ isBodySplit = buffer.getBoolean();
+ if(isBodySplit)
+ {
+ buffer.transferRemainingBuffer(this, len);
+ //we now need to reposition the buffers for use
+ resetBodyForUse();
+ }
+ else
+ {
+ buffers.add(new ByteBufferWrapper(ByteBuffer.allocate(PacketImpl.INITIAL_BUFFER_SIZE)));
+ byte[] bytes = new byte[len];
+ buffer.getBytes(bytes);
+ getCurrentBuffer().putBytes(bytes);
+ getCurrentBuffer().flip();
+ bodySize = len;
+ pos = 0;
+ }
}
+ abstract protected void resetBodyForUse();
+
+ public void encode(Encoder buff)
+ {
+ buff.putSimpleString(getDestination());
+ buff.putByte(getType());
+ buff.putBoolean(durable);
+ buff.putLong(getExpiration());
+ buff.putLong(getTimestamp());
+ buff.putByte(getPriority());
+ getProperties().encode(buff);
+ if(isBodySplit)
+ {
+ buff.putInt(getBodySize());
+ for (int i = 0; i < buffers.size(); i++)
+ {
+ MessagingBuffer buffer = buffers.get(i);
+ buff.putBytes(buffer.array(), 9, buffersSizes.get(i));
+ }
+ }
+ else
+ {
+ buff.putInt(getBodySize());
+ buff.putBytes(buffers.get(0).array(), 0, buffers.get(0).limit());
+ }
+ }
+
public void decode(final Encoder buffer)
{
- destination = buffer.getSimpleString();
+ setDestination(buffer.getSimpleString());
type = buffer.getByte();
durable = buffer.getBoolean();
- expiration = buffer.getLong();
- timestamp = buffer.getLong();
- priority = buffer.getByte();
+ setExpiration(buffer.getLong());
+ setTimestamp(buffer.getLong());
+ setPriority(buffer.getByte());
- properties.decode(buffer);
+ getProperties().decode(buffer);
int len = buffer.getInt();
buffers.add(new ByteBufferWrapper(ByteBuffer.allocate(PacketImpl.INITIAL_BUFFER_SIZE)));
buffer.transferRemainingBuffer(this, len);
flip();
}
+
+ public int getEncodeSize()
+ {
+ return /* Destination */ SimpleString.sizeofString(destination) +
+ /* Type */ SIZE_BYTE +
+ /* Durable */ SIZE_BOOLEAN +
+ /* Expiration */ SIZE_LONG +
+ /* Timestamp */ SIZE_LONG +
+ /* Priority */ SIZE_BYTE +
+ /* PropertySize and Properties */ properties.getEncodeSize() +
+ /* BodySize and Body */ SIZE_INT + getBodySize();
+ }
+
+
+
public SimpleString getDestination()
{
return destination;
@@ -396,7 +476,7 @@
public void putBytes(byte[] bytes, int i, int i1)
{
- if(getCurrentBuffer().remaining() <= (i1 - i))
+ if(getCurrentBuffer().remaining() > (i1 - i))
{
getCurrentBuffer().putBytes(bytes, i, i1);
pos += (i1 - i);
@@ -654,12 +734,13 @@
public int getBodySize()
{
- int size = 0;
+ /*int size = 0;
for (MessagingBuffer buffer : buffers)
{
size += buffer.limit();
}
- return size;
+ return size;*/
+ return bodySize;
}
public void reset()
@@ -699,33 +780,17 @@
public void putBuffer(MessagingBuffer body)
{
- if(getCurrentBuffer().remaining() > body.limit())
- {
- //copy contents
- putBytes(body.array(), 0, body.limit());
- }
- else
- {
- buffers.add(body);
- }
+ buffers.add(body);
+ buffersSizes.add(body.limit() - 5);
+ bodySize += body.limit() - 5;
}
public void transferRemainingBuffer(Encoder buffer, int len)
{
- if(getCurrentBuffer().remaining() > 0)
+ for(int i = bufferPos + 1; i < buffers.size(); i++)
{
- //copy contents
- byte[] bytes = new byte[len];
- getBytes(bytes);
- buffer.putBytes(bytes);
+ buffer.putBuffer(buffers.get(i));
}
- else
- {
- for(int i = bufferPos + 1; i < buffers.size(); i++)
- {
- buffer.putBuffer(buffers.get(i));
- }
- }
}
// Package protected ---------------------------------------------
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/Encoder.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/Encoder.java 2008-08-12 23:51:48 UTC (rev 4798)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/Encoder.java 2008-08-13 12:49:04 UTC (rev 4799)
@@ -69,7 +69,11 @@
boolean getBoolean();
- void putBuffer(MessagingBuffer buffer);
+ void putBuffer(MessagingBuffer buffern);
void transferRemainingBuffer(Encoder buffer, int len);
+
+ void putBytes(byte[] bytes, int i, int i1);
+
+ int remaining();
}
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ByteBufferWrapper.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ByteBufferWrapper.java 2008-08-12 23:51:48 UTC (rev 4798)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ByteBufferWrapper.java 2008-08-13 12:49:04 UTC (rev 4799)
@@ -378,7 +378,7 @@
public void putBuffer(MessagingBuffer buffer)
{
- buffer.putBytes(buffer.array(), buffer.position(), buffer.limit());
+ buffer.putBytes(buffer.array());
}
public void transferRemainingBuffer(Encoder buffer, int len)
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ExpandingMessagingBuffer.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ExpandingMessagingBuffer.java 2008-08-12 23:51:48 UTC (rev 4798)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ExpandingMessagingBuffer.java 2008-08-13 12:49:04 UTC (rev 4799)
@@ -361,7 +361,7 @@
public void putBuffer(MessagingBuffer buffer)
{
- putBytes(buffer.array(), buffer.position(), buffer.limit());
+ putBytes(buffer.array());
}
public void transferRemainingBuffer(Encoder buffer, int len)
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java 2008-08-12 23:51:48 UTC (rev 4798)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java 2008-08-13 12:49:04 UTC (rev 4799)
@@ -370,7 +370,7 @@
public void putBuffer(MessagingBuffer buffer)
{
- buffer.putBytes(buffer.array(), buffer.position(), buffer.limit());
+ buffer.putBytes(buffer.array());
}
public void transferRemainingBuffer(Encoder buffer, int len)
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaProtocolCodecFilter.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaProtocolCodecFilter.java 2008-08-12 23:51:48 UTC (rev 4798)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaProtocolCodecFilter.java 2008-08-13 12:49:04 UTC (rev 4799)
@@ -94,6 +94,7 @@
IoBuffer copied = IoBuffer.allocate(in.remaining());
copied.put(in);
copied.setAutoExpand(true);
+ copied.limit(length + SIZE_INT);
copied.flip();
in.position(start + length + SIZE_INT);
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-08-12 23:51:48 UTC (rev 4798)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-08-13 12:49:04 UTC (rev 4799)
@@ -140,6 +140,7 @@
private List<MessagingBuffer> buffers = new ArrayList<MessagingBuffer>();
private List<Integer> buffersSizes = new ArrayList<Integer>();
private int currentPos = 0;
+ private int packetid;
// Static --------------------------------------------------------
@@ -204,8 +205,10 @@
public void encode(MessagingBuffer buffer, int packetId)
{
+ this.packetid = packetId;
//The standard header fields
buffers.add(buffer);
+ //buffersSizes.add(-1);
//length filled at end
getCurrentBuffer().putInt(-1);
//are we the last packet, filled at end.
@@ -218,6 +221,16 @@
putLong(executorID);
encodeBody();
+
+ for (int i = 0; i < buffers.size(); i++)
+ {
+ MessagingBuffer messagingBuffer = buffers.get(i);
+ if(messagingBuffer.position() != 0)
+ {
+ messagingBuffer.flip();
+ }
+ buffersSizes.add(i, messagingBuffer.limit() - SIZE_INT);
+ }
}
public void decode()
@@ -582,33 +595,17 @@
public void putBuffer(MessagingBuffer body)
{
- if(getCurrentBuffer().remaining() > body.limit())
- {
- //copy contents
- putBytes(body.array(), 9, body.limit());
- }
- else
- {
- buffers.add(body);
- }
+ body.putInt(SIZE_INT, packetid);
+ buffers.add(body);
+ //buffersSizes.add(len);
}
public void transferRemainingBuffer(Encoder buffer, int len)
{
- if(getCurrentBuffer().remaining() > 0)
+ for(int i = currentPos + 1; i < buffers.size(); i++)
{
- //copy contents
- byte[] bytes = new byte[len];
- getBytes(bytes);
- buffer.putBytes(bytes);
+ buffer.putBuffer(buffers.get(i));
}
- else
- {
- for(int i = currentPos + 1; i < buffers.size(); i++)
- {
- buffer.putBuffer(buffers.get(i));
- }
- }
}
public List<MessagingBuffer> getBuffers()
@@ -616,15 +613,17 @@
for (int i = 0; i < buffers.size(); i++)
{
MessagingBuffer buffer = buffers.get(i);
- buffer.putInt(0, buffer.position() - SIZE_INT);
+ if(buffer.position() != 0)
+ {
+ buffer.flip();
+ }
+ buffer.putInt(0, buffersSizes.get(i));
+ buffer.putInt(SIZE_INT, packetid);
if(i == buffers.size() - 1)
{
buffer.putBoolean(SIZE_INT * 2, true);
}
- if(buffer.position() != 0)
- {
- buffer.flip();
- }
+
}
return buffers;
}
@@ -663,6 +662,11 @@
}
}
+ public int remaining()
+ {
+ return getCurrentBuffer().remaining();
+ }
+
// Package protected ---------------------------------------------
protected String getParentString()
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerSendMessage.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerSendMessage.java 2008-08-12 23:51:48 UTC (rev 4798)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerSendMessage.java 2008-08-13 12:49:04 UTC (rev 4799)
@@ -82,7 +82,8 @@
public void encodeBody()
{
- clientMessage.encode(this);
+ clientMessage.prepareBuffers();
+ clientMessage.encodeForTransport(this);
}
public void decodeBody()
@@ -91,7 +92,7 @@
serverMessage = new ServerMessageImpl();
- serverMessage.decode(this);
+ serverMessage.decodeForTransport(this);
}
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReceiveMessage.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReceiveMessage.java 2008-08-12 23:51:48 UTC (rev 4798)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReceiveMessage.java 2008-08-13 12:49:04 UTC (rev 4799)
@@ -106,7 +106,7 @@
{
putInt(deliveryCount);
putLong(deliveryID);
- serverMessage.encode(this);
+ serverMessage.encodeForTransport(this);
}
public void decodeBody()
@@ -118,7 +118,7 @@
clientMessage = new ClientMessageImpl(deliveryCount, deliveryID);
- clientMessage.decode(this);
+ clientMessage.decodeForTransport(this);
}
// Package protected ---------------------------------------------
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java 2008-08-12 23:51:48 UTC (rev 4798)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java 2008-08-13 12:49:04 UTC (rev 4799)
@@ -83,7 +83,15 @@
{
super(type, durable, expiration, timestamp, priority, buffer);
}
-
+
+ protected void resetBodyForUse()
+ {
+ for (MessagingBuffer messagingBuffer : buffers)
+ {
+ messagingBuffer.position(0);
+ }
+ }
+
public long getMessageID()
{
return messageID;
@@ -134,6 +142,7 @@
public ServerMessage copy()
{
return new ServerMessageImpl(this);
- }
+ }
+
}
More information about the jboss-cvs-commits
mailing list