[jboss-cvs] JBoss Messaging SVN: r4854 - in branches/Branch_Message_Chunking_new: examples/jms/src/org/jboss/jms/example and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Aug 20 08:45:52 EDT 2008
Author: ataylor
Date: 2008-08-20 08:45:52 -0400 (Wed, 20 Aug 2008)
New Revision: 4854
Modified:
branches/Branch_Message_Chunking_new/build-messaging.xml
branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/QueueExample.java
branches/Branch_Message_Chunking_new/messaging.iml
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/Packet.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/EncoderImpl.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingHandlerImpl.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaProtocolCodecFilter.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
Log:
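refactoring (EncoderImpl now tracks a currentBuffer field instead of the buffersSizes list; Packet.encode takes a Connection instead of a MessagingBuffer, and Packet.addBuffer no longer takes a packetLength argument)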
Modified: branches/Branch_Message_Chunking_new/build-messaging.xml
===================================================================
--- branches/Branch_Message_Chunking_new/build-messaging.xml 2008-08-20 09:51:33 UTC (rev 4853)
+++ branches/Branch_Message_Chunking_new/build-messaging.xml 2008-08-20 12:45:52 UTC (rev 4854)
@@ -905,7 +905,7 @@
<target name="runServer" depends="jar">
<java classname="org.jboss.messaging.microcontainer.JBMBootstrapServer" fork="true">
- <jvmarg line="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"/>
+ <!--<jvmarg line="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"/>-->
<jvmarg value="-XX:+UseParallelGC"/>
<jvmarg value="-Xms512M"/>
<jvmarg value="-Xmx2048M"/>
Modified: branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/QueueExample.java
===================================================================
--- branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/QueueExample.java 2008-08-20 09:51:33 UTC (rev 4853)
+++ branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/QueueExample.java 2008-08-20 12:45:52 UTC (rev 4854)
@@ -49,7 +49,7 @@
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageProducer producer = session.createProducer(queue);
final BytesMessage message = session.createBytesMessage();
- byte[] bytes = new byte[1024];
+ byte[] bytes = new byte[2048];
for(int i = 0; i < bytes.length; i++)
{
bytes[i] = (byte) i;
Modified: branches/Branch_Message_Chunking_new/messaging.iml
===================================================================
--- branches/Branch_Message_Chunking_new/messaging.iml 2008-08-20 09:51:33 UTC (rev 4853)
+++ branches/Branch_Message_Chunking_new/messaging.iml 2008-08-20 12:45:52 UTC (rev 4854)
@@ -27,7 +27,6 @@
<sourceFolder url="file://$MODULE_DIR$/src/main" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/test" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/tests/jms-tests/src" isTestSource="true" />
- <sourceFolder url="file://$MODULE_DIR$/tests/src" isTestSource="true" />
<excludeFolder url="file://$MODULE_DIR$/build/api" />
<excludeFolder url="file://$MODULE_DIR$/build/classes" />
<excludeFolder url="file://$MODULE_DIR$/build/jars" />
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-08-20 09:51:33 UTC (rev 4853)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-08-20 12:45:52 UTC (rev 4854)
@@ -100,6 +100,7 @@
final long timestamp, final byte priority, MessagingBuffer body)
{
this();
+ currentBuffer = body;
this.type = type;
this.durable = durable;
this.expiration = expiration;
@@ -127,7 +128,6 @@
this.priority = other.priority;
this.properties = new TypedProperties(other.properties);
this.buffers = other.buffers;
- this.buffersSizes = other.buffersSizes;
}
// Message implementation ----------------------------------------
@@ -191,8 +191,8 @@
buffers.add(new ByteBufferWrapper(ByteBuffer.allocate(PacketImpl.INITIAL_BUFFER_SIZE)));
byte[] bytes = new byte[len];
buffer.getBytes(bytes);
- getCurrentBuffer().putBytes(bytes);
- getCurrentBuffer().flip();
+ currentBuffer.putBytes(bytes);
+ currentBuffer.flip();
bodySize = len;
pos = 0;
}
@@ -215,7 +215,7 @@
for (int i = 0; i < buffers.size(); i++)
{
MessagingBuffer buffer = buffers.get(i);
- buff.putBytes(buffer.array(), PacketImpl.BUFFER_HEADER_SIZE, buffersSizes.get(i));
+ buff.putBytes(buffer.array(), PacketImpl.BUFFER_HEADER_SIZE, buffer.limit() - PacketImpl.BUFFER_HEADER_SIZE);
}
}
else
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/Packet.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/Packet.java 2008-08-20 09:51:33 UTC (rev 4853)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/Packet.java 2008-08-20 12:45:52 UTC (rev 4854)
@@ -22,6 +22,7 @@
package org.jboss.messaging.core.remoting;
+import org.jboss.messaging.core.remoting.spi.Connection;
import org.jboss.messaging.util.SimpleString;
import java.util.List;
@@ -59,7 +60,7 @@
void normalize(Packet other);
- List<MessagingBuffer> encode(MessagingBuffer buffer, int packetId);
+ List<MessagingBuffer> encode(Connection connection, int packetId);
void decode();
@@ -131,5 +132,5 @@
void transferRemainingBuffer(Encoder buffer, int len);
- void addBuffer(MessagingBuffer buffer, int packetLength);
+ void addBuffer(MessagingBuffer buffer);
}
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/EncoderImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/EncoderImpl.java 2008-08-20 09:51:33 UTC (rev 4853)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/EncoderImpl.java 2008-08-20 12:45:52 UTC (rev 4854)
@@ -41,7 +41,7 @@
{
private static final Charset utf8 = Charset.forName("UTF-8");
protected List<MessagingBuffer> buffers = new ArrayList<MessagingBuffer>();
- protected List<Integer> buffersSizes = new ArrayList<Integer>();
+ protected MessagingBuffer currentBuffer = null;
private int bufferPos = 0;
protected int pos = 0;
protected int bodySize = 0;
@@ -58,35 +58,35 @@
public void putFloat(float val)
{
checkWriteSpace(org.jboss.messaging.util.DataConstants.SIZE_FLOAT);
- getCurrentBuffer().putFloat(val);
+ currentBuffer.putFloat(val);
pos += org.jboss.messaging.util.DataConstants.SIZE_FLOAT;
}
public void putInt(int i)
{
checkWriteSpace(org.jboss.messaging.util.DataConstants.SIZE_INT);
- getCurrentBuffer().putInt(i);
+ currentBuffer.putInt(i);
pos += org.jboss.messaging.util.DataConstants.SIZE_INT;
}
public void putByte(byte b)
{
checkWriteSpace(org.jboss.messaging.util.DataConstants.SIZE_BYTE);
- getCurrentBuffer().putByte(b);
+ currentBuffer.putByte(b);
pos += org.jboss.messaging.util.DataConstants.SIZE_BYTE;
}
public void putLong(long l)
{
checkWriteSpace(org.jboss.messaging.util.DataConstants.SIZE_LONG);
- getCurrentBuffer().putLong(l);
+ currentBuffer.putLong(l);
pos += org.jboss.messaging.util.DataConstants.SIZE_LONG;
}
public void putBoolean(boolean b)
{
checkWriteSpace(org.jboss.messaging.util.DataConstants.SIZE_BOOLEAN);
- getCurrentBuffer().putBoolean(b);
+ currentBuffer.putBoolean(b);
pos += org.jboss.messaging.util.DataConstants.SIZE_BOOLEAN;
}
@@ -127,9 +127,9 @@
public void putBytes(byte[] bytes, int i, int i1)
{
- if(getCurrentBuffer().capacity() - getCurrentBuffer().position() >= (i1 - i))
+ if(currentBuffer.capacity() - currentBuffer.position() >= (i1 - i))
{
- getCurrentBuffer().putBytes(bytes, i, i1);
+ currentBuffer.putBytes(bytes, i, i1);
pos += (i1 - i);
}
else
@@ -137,9 +137,9 @@
int written = 0;
while(written < i1)
{
- int left = getCurrentBuffer().remaining();
+ int left = currentBuffer.remaining();
int towrite = left + written > i1?i1 - written:left;
- getCurrentBuffer().putBytes(bytes, i, towrite);
+ currentBuffer.putBytes(bytes, i, towrite);
written+=towrite;
i += left;
pos += written;
@@ -160,21 +160,21 @@
public void putShort(short val)
{
checkWriteSpace(org.jboss.messaging.util.DataConstants.SIZE_SHORT);
- getCurrentBuffer().putShort(val);
+ currentBuffer.putShort(val);
pos += org.jboss.messaging.util.DataConstants.SIZE_SHORT;
}
public void putDouble(double val)
{
checkWriteSpace(org.jboss.messaging.util.DataConstants.SIZE_DOUBLE);
- getCurrentBuffer().putDouble(val);
+ currentBuffer.putDouble(val);
pos += org.jboss.messaging.util.DataConstants.SIZE_DOUBLE;
}
public void putChar(char val)
{
checkWriteSpace(org.jboss.messaging.util.DataConstants.SIZE_CHAR);
- getCurrentBuffer().putChar(val);
+ currentBuffer.putChar(val);
pos += org.jboss.messaging.util.DataConstants.SIZE_CHAR;
}
@@ -245,14 +245,14 @@
int read = 0;
while (read < size)
{
- int left = getCurrentBuffer().remaining() <= (size - read)? getCurrentBuffer().remaining():size - read;
+ int left = currentBuffer.remaining() <= (size - read)? currentBuffer.remaining():size - read;
- getCurrentBuffer().getBytes(data, read, left);
+ currentBuffer.getBytes(data, read, left);
read += left;
pos += read;
if(read < size)
{
- bufferPos++;
+ setNextBuffer();
}
}
@@ -267,28 +267,28 @@
{
checkReadSpace(org.jboss.messaging.util.DataConstants.SIZE_INT);
pos += org.jboss.messaging.util.DataConstants.SIZE_INT;
- return getCurrentBuffer().getInt();
+ return currentBuffer.getInt();
}
public long getLong()
{
checkReadSpace(org.jboss.messaging.util.DataConstants.SIZE_LONG);
pos += org.jboss.messaging.util.DataConstants.SIZE_LONG;
- return getCurrentBuffer().getLong();
+ return currentBuffer.getLong();
}
public short getShort()
{
checkReadSpace(org.jboss.messaging.util.DataConstants.SIZE_SHORT);
pos += org.jboss.messaging.util.DataConstants.SIZE_SHORT;
- return getCurrentBuffer().getShort();
+ return currentBuffer.getShort();
}
public boolean getBoolean()
{
checkReadSpace(org.jboss.messaging.util.DataConstants.SIZE_BOOLEAN);
pos += org.jboss.messaging.util.DataConstants.SIZE_BOOLEAN;
- return getCurrentBuffer().getBoolean();
+ return currentBuffer.getBoolean();
}
public String getNullableString()
@@ -331,15 +331,15 @@
public byte getByte()
{
checkReadSpace(org.jboss.messaging.util.DataConstants.SIZE_BYTE);
- return getCurrentBuffer().getByte();
+ return currentBuffer.getByte();
}
public void getBytes(byte[] data)
{
- int left = getCurrentBuffer().remaining();
+ int left = currentBuffer.remaining();
if(data.length <= left)
{
- getCurrentBuffer().getBytes(data);
+ currentBuffer.getBytes(data);
pos += data.length;
}
else
@@ -348,14 +348,14 @@
int read = 0;
while (read < size)
{
- left = getCurrentBuffer().remaining() <= (size - read)? getCurrentBuffer().remaining():size - read;
+ left = currentBuffer.remaining() <= (size - read)? currentBuffer.remaining():size - read;
- getCurrentBuffer().getBytes(data, read, left);
+ currentBuffer.getBytes(data, read, left);
read += left;
pos += read;
if(read < size)
{
- bufferPos++;
+ setNextBuffer();
}
}
@@ -367,35 +367,35 @@
{
checkReadSpace(org.jboss.messaging.util.DataConstants.SIZE_FLOAT);
pos += org.jboss.messaging.util.DataConstants.SIZE_FLOAT;
- return getCurrentBuffer().getFloat();
+ return currentBuffer.getFloat();
}
public double getDouble()
{
checkReadSpace(org.jboss.messaging.util.DataConstants.SIZE_DOUBLE);
pos += org.jboss.messaging.util.DataConstants.SIZE_DOUBLE;
- return getCurrentBuffer().getDouble();
+ return currentBuffer.getDouble();
}
public char getChar()
{
checkReadSpace(org.jboss.messaging.util.DataConstants.SIZE_CHAR);
pos += org.jboss.messaging.util.DataConstants.SIZE_CHAR;
- return getCurrentBuffer().getChar();
+ return currentBuffer.getChar();
}
public short getUnsignedByte()
{
checkReadSpace(org.jboss.messaging.util.DataConstants.SIZE_SHORT);
pos += org.jboss.messaging.util.DataConstants.SIZE_SHORT;
- return getCurrentBuffer().getUnsignedByte();
+ return currentBuffer.getUnsignedByte();
}
public int getUnsignedShort()
{
checkReadSpace(org.jboss.messaging.util.DataConstants.SIZE_INT);
pos += org.jboss.messaging.util.DataConstants.SIZE_INT;
- return getCurrentBuffer().getUnsignedByte();
+ return currentBuffer.getUnsignedByte();
}
public void clearBody()
@@ -404,7 +404,6 @@
pos = 0;
MessagingBuffer buff = buffers.get(0).createNewBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
bodySize = PacketImpl.INITIAL_BUFFER_SIZE;
- buffersSizes.clear();
buffers.clear();
buffers.add(buff);
}
@@ -452,7 +451,10 @@
public void putBuffer(MessagingBuffer body)
{
buffers.add(body);
- buffersSizes.add(body.limit() - PacketImpl.BUFFER_HEADER_SIZE);
+ if(currentBuffer == null)
+ {
+ currentBuffer = body;
+ }
bodySize += body.limit() - PacketImpl.BUFFER_HEADER_SIZE;
}
@@ -466,15 +468,15 @@
private void checkReadSpace(int size)
{
- if(getCurrentBuffer().position() + size > getCurrentBuffersSize() + org.jboss.messaging.util.DataConstants.SIZE_INT)
+ if(currentBuffer.position() + size > currentBuffer.limit())
{
- bufferPos++;
+ setNextBuffer();
}
}
private void checkWriteSpace(int size)
{
- if(getCurrentBuffer().capacity() < size)
+ if(currentBuffer.capacity() < size)
{
setNextFragment();
}
@@ -487,23 +489,12 @@
buffer.putInt(-1);
buffer.putBoolean(false);
buffers.add(buffer);
- bufferPos++;
+ setNextBuffer();
}
- protected MessagingBuffer getCurrentBuffer()
+ private void setNextBuffer()
{
- try
- {
- return buffers.get(bufferPos);
- }
- catch (Exception e)
- {
- return null;
- }
+ bufferPos++;
+ currentBuffer = buffers.get(bufferPos);
}
-
- private int getCurrentBuffersSize()
- {
- return buffersSizes.get(bufferPos);
- }
}
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-08-20 09:51:33 UTC (rev 4853)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-08-20 12:45:52 UTC (rev 4854)
@@ -302,9 +302,7 @@
throw new IllegalStateException("Cannot write packet to connection, it is destroyed");
}
- MessagingBuffer buffer = transportConnection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE + PacketImpl.BUFFER_HEADER_SIZE);
-
- List<MessagingBuffer> buffers = packet.encode(buffer, packetIdCreator.getAndIncrement());
+ List<MessagingBuffer> buffers = packet.encode(transportConnection, packetIdCreator.getAndIncrement());
for (MessagingBuffer buff : buffers)
{
transportConnection.write(buff);
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingHandlerImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingHandlerImpl.java 2008-08-20 09:51:33 UTC (rev 4853)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingHandlerImpl.java 2008-08-20 12:45:52 UTC (rev 4854)
@@ -102,15 +102,14 @@
public void bufferReceived(final Object connectionID, final MessagingBuffer buffer) throws Exception
{
- int packetLength = buffer.getInt();
+ //int packetLength = buffer.getInt();
int packetId = buffer.getInt();
boolean lastPacket = buffer.getBoolean();
- final Packet packet = getPacket(connectionID, packetId, buffer);
- packet.addBuffer(buffer, packetLength);
+ final Packet packet = getPacket(connectionID, packetId, buffer, lastPacket);
+ packet.addBuffer(buffer);
if (lastPacket)
{
- packetsToDeliver.get(connectionID).remove(packetId);
packet.decode();
if (executorFactory != null)
{
@@ -152,17 +151,30 @@
}
}
- private Packet getPacket(Object connectionID, int packetId, MessagingBuffer buffer)
+ private Packet getPacket(Object connectionID, int packetId, MessagingBuffer buffer, boolean remove)
{
- if(packetsToDeliver.get(connectionID) == null)
+ Map<Integer, Packet> packetMap = packetsToDeliver.get(connectionID);
+ if(packetMap == null)
{
- packetsToDeliver.put(connectionID, new HashMap<Integer, Packet>());
+ packetMap = new HashMap<Integer, Packet>();
+ packetsToDeliver.put(connectionID, packetMap);
}
- Packet packet = packetsToDeliver.get(connectionID).get(packetId);
+ Packet packet = null;
+ if(!remove)
+ {
+ packet = packetMap.get(packetId);
+ }
+ else
+ {
+ packet = packetMap.remove(packetId);
+ }
if (packet == null)
{
packet = createPacket(connectionID, buffer);
- packetsToDeliver.get(connectionID).put(packetId, packet);
+ if(!remove)
+ {
+ packetMap.put(packetId, packet);
+ }
}
return packet;
}
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaProtocolCodecFilter.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaProtocolCodecFilter.java 2008-08-20 09:51:33 UTC (rev 4853)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaProtocolCodecFilter.java 2008-08-20 12:45:52 UTC (rev 4854)
@@ -95,6 +95,7 @@
IoBuffer sliced = in.slice();
in.position(start + length + SIZE_INT);
+ sliced.position(SIZE_INT);
sliced.limit(length + SIZE_INT);
out.write(sliced);
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-08-20 09:51:33 UTC (rev 4853)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-08-20 12:45:52 UTC (rev 4854)
@@ -27,6 +27,7 @@
import org.jboss.messaging.core.remoting.MessagingBuffer;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.impl.EncoderImpl;
+import org.jboss.messaging.core.remoting.spi.Connection;
import static org.jboss.messaging.util.DataConstants.SIZE_INT;
import java.nio.charset.Charset;
@@ -194,16 +195,18 @@
setTargetID(other.getResponseTargetID());
}
- public List<MessagingBuffer> encode(MessagingBuffer buffer,int packetId)
+ public List<MessagingBuffer> encode(Connection connection,int packetId)
{
- //The standard header fields
- buffers.add(buffer);
+ currentBuffer = connection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE + PacketImpl.BUFFER_HEADER_SIZE);//The standard header fields
+
+ buffers.add(currentBuffer);
+
//length filled at end
- getCurrentBuffer().putInt(-1);
+ currentBuffer.putInt(-1);
//the packetid filled at end
- getCurrentBuffer().putInt(-1);
+ currentBuffer.putInt(-1);
//are we the last packet, filled at end.
- getCurrentBuffer().putBoolean(false);
+ currentBuffer.putBoolean(false);
putByte(type);
putInt(commandID);
putLong(responseTargetID);
@@ -231,6 +234,7 @@
public void decode()
{
+ currentBuffer = buffers.get(0);
commandID = getInt();
responseTargetID = getLong();
targetID = getLong();
@@ -269,15 +273,14 @@
r.targetID == this.targetID;
}
- public void addBuffer(MessagingBuffer buffer, int packetLength)
+ public void addBuffer(MessagingBuffer buffer)
{
buffers.add(buffer);
- buffersSizes.add(packetLength);
}
public int remaining()
{
- return getCurrentBuffer().remaining();
+ return currentBuffer.remaining();
}
// Package protected ---------------------------------------------
More information about the jboss-cvs-commits mailing list