[jboss-cvs] JBoss Messaging SVN: r4715 - in branches: Branch_Message_Chunking and 19 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jul 23 07:27:03 EDT 2008
Author: ataylor
Date: 2008-07-23 07:27:02 -0400 (Wed, 23 Jul 2008)
New Revision: 4715
Added:
branches/Branch_Message_Chunking/
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/message/MessageBodyEncoder.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/DecoderHandler.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/EncoderHandler.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/MessageCache.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/PacketAssembler.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/PacketFragment.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/InMemoryMessageCache.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/MessagingBufferCache.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/PacketAssemblerImpl.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentBufferImpl.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentImpl.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaDecoderHandler.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaEncoderHandler.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketFragmentBuffer.java
branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/PacketFragmentBufferImplTest.java
Modified:
branches/Branch_Message_Chunking/build-messaging.xml
branches/Branch_Message_Chunking/examples/jms/src/org/jboss/jms/example/QueueExample.java
branches/Branch_Message_Chunking/messaging.ipr
branches/Branch_Message_Chunking/native/src/org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h
branches/Branch_Message_Chunking/src/config/jbm-configuration.xml
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/client/ConnectionParams.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/config/Configuration.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/message/Message.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/MessagingCodec.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/Packet.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/MessagingCodecImpl.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupportImpl.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaAcceptor.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaProtocolCodecFilter.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerSendMessage.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReceiveMessage.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossBytesMessage.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossMapMessage.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossMessage.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossMessageProducer.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossObjectMessage.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossStreamMessage.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossTextMessage.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/util/ByteBufferWrapper.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/util/ExpandingMessagingBuffer.java
branches/Branch_Message_Chunking/src/main/org/jboss/messaging/util/MessagingBuffer.java
branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/FilterChainSupportTest.java
branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaSessionTest.java
branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/SessionTestBase.java
branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/MinaConnectorTest.java
branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/MinaProtocolCodecFilterTest.java
branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossObjectMessageTest.java
branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossTextMessageTest.java
Log:
message chunking branch
Copied: branches/Branch_Message_Chunking (from rev 4713, trunk)
Modified: branches/Branch_Message_Chunking/build-messaging.xml
===================================================================
--- trunk/build-messaging.xml 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/build-messaging.xml 2008-07-23 11:27:02 UTC (rev 4715)
@@ -894,6 +894,7 @@
<target name="runServer" depends="jar">
<java classname="org.jboss.messaging.microcontainer.JBMBootstrapServer" fork="true">
+ <jvmarg line="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"/>
<jvmarg value="-XX:+UseParallelGC"/>
<jvmarg value="-Xms512M"/>
<jvmarg value="-Xmx2048M"/>
Modified: branches/Branch_Message_Chunking/examples/jms/src/org/jboss/jms/example/QueueExample.java
===================================================================
--- trunk/examples/jms/src/org/jboss/jms/example/QueueExample.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/examples/jms/src/org/jboss/jms/example/QueueExample.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -21,15 +21,7 @@
*/
package org.jboss.jms.example;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
+import javax.jms.*;
import javax.naming.InitialContext;
import org.jboss.messaging.core.logging.Logger;
@@ -55,17 +47,79 @@
connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(queue);
- Message message = session.createTextMessage("This is a text message!");
-
+ final MessageProducer producer = session.createProducer(queue);
+ final BytesMessage message = session.createBytesMessage();
+ byte[] bytes = new byte[1024];
+ for(int i = 0; i < 1024; i++)
+ {
+ bytes[i] = (byte) i;
+ }
+ byte[] newbytes = new byte[bytes.length];
+ message.writeBytes(bytes);
log.info("sending message to queue");
- producer.send(message);
-
+ new Thread(new Runnable()
+ {
+ public void run()
+ {
+
+ try
+ {
+ producer.send(message);
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ }).start();
+ new Thread(new Runnable()
+ {
+ public void run()
+ {
+
+ try
+ {
+ producer.send(message);
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ }).start();
+
MessageConsumer messageConsumer = session.createConsumer(queue);
connection.start();
- TextMessage message2 = (TextMessage) messageConsumer.receive(5000);
+ BytesMessage message2 = (BytesMessage) messageConsumer.receive(50000000);
log.info("message received from queue");
- log.info("message = " + message2.getText());
+ message2.readBytes(newbytes);
+ //log.info("message = " + new String(newbytes));
+ for (int i = 0; i < newbytes.length; i++)
+ {
+ byte newbyte = newbytes[i];
+ System.out.print(newbyte);
+ System.out.print(" = ");
+ System.out.println(bytes[i]);
+ if(bytes[i] != newbyte)
+ {
+ throw new RuntimeException(i+"");
+ }
+ }
+ message2 = (BytesMessage) messageConsumer.receive(50000000);
+ log.info("message received from queue");
+ message2.readBytes(newbytes);
+ //log.info("message = " + new String(newbytes));
+ for (int i = 0; i < newbytes.length; i++)
+ {
+ byte newbyte = newbytes[i];
+ System.out.print(newbyte);
+ System.out.print(" = ");
+ System.out.println(bytes[i]);
+ if(bytes[i] != newbyte)
+ {
+ throw new RuntimeException(i+"");
+ }
+ }
}
catch (Exception e)
{
Modified: branches/Branch_Message_Chunking/messaging.ipr
===================================================================
--- trunk/messaging.ipr 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/messaging.ipr 2008-07-23 11:27:02 UTC (rev 4715)
@@ -460,6 +460,7 @@
<root url="jar://$PROJECT_DIR$/tools/lib/jbossbuild.jar!/" />
<root url="jar:///home/andy/devtools/apache-ant-1.7.0/lib/ant-junit.jar!/" />
<root url="jar:///home/andy/devtools/apache-ant-1.7.0/lib/ant.jar!/" />
+ <root url="file://$PROJECT_DIR$/examples/jms/config" />
</CLASSES>
<JAVADOC />
<SOURCES>
Modified: branches/Branch_Message_Chunking/native/src/org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h
===================================================================
--- trunk/native/src/org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/native/src/org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h 2008-07-23 11:27:02 UTC (rev 4715)
@@ -7,6 +7,7 @@
#ifdef __cplusplus
extern "C" {
#endif
+/* Inaccessible static: _00024VRc */
/* Inaccessible static: log */
/* Inaccessible static: totalMaxIO */
/* Inaccessible static: loaded */
Modified: branches/Branch_Message_Chunking/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/config/jbm-configuration.xml 2008-07-23 11:27:02 UTC (rev 4715)
@@ -28,7 +28,7 @@
<remoting-port>5400</remoting-port>
<!-- call timeout in milliseconds -->
- <remoting-call-timeout>5000</remoting-call-timeout>
+ <remoting-call-timeout>50000</remoting-call-timeout>
<!-- true to enable invm communication when the client and the server are in the same JVM. -->
<remoting-enable-invm-optimisation>true</remoting-enable-invm-optimisation>
@@ -54,7 +54,14 @@
<!--How long to wait for a returning pong after sending a ping message to a client/server.-->
<!-- If no pong is received after this time resources are cleaned up-->
<remoting-ping-timeout>5000</remoting-ping-timeout>
-
+
+ <!--The initial size of the buffer created for the transport-->
+ <remoting-initial-packet-fragment-size>2048</remoting-initial-packet-fragment-size>
+
+ <!--The maximum size of packet to send on the transport, except for the first packet which is the remoting-initial-packet-fragment-size parameter. Setting to 0 means that packet chunking is disabled-->
+ <remoting-packet-fragment-size>1024</remoting-packet-fragment-size>
+
<!-- if ssl is enabled, all remoting-ssl-* properties must be set -->
<remoting-enable-ssl>false</remoting-enable-ssl>
Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/client/ConnectionParams.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ConnectionParams.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/client/ConnectionParams.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -80,4 +80,12 @@
String getTrustStorePassword();
void setTrustStorePassword(String trustStorePassword);
+
+ int getPacketFragmentSize();
+
+ int getInitialPacketFragmentSize();
+
+ void setInitialPacketFragmentSize(int initialPacketFragmentSize);
+
+ void setPacketFragmentSize(int packetFragmentSize);
}
Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -32,7 +32,7 @@
{
//Constants ---------------------------------------------------------------------------------------
- public static final int DEFAULT_PING_INTERVAL = 10000; // in ms
+ public static final int DEFAULT_PING_INTERVAL = 0; // in ms
public static final int DEFAULT_PING_TIMEOUT = 5000; // ms
@@ -47,6 +47,10 @@
public static final int DEFAULT_TCP_SEND_BUFFER_SIZE = 32 * 1024; // in bytes
public static final boolean DEFAULT_SSL_ENABLED = false;
+
+ public static final Integer DEFAULT_INITIAL_PACKET_FRAGMENT_SIZE = 2048;
+
+ public static final Integer DEFAULT_PACKET_FRAGMENT_SIZE = 1024;
public static final String SSL_KEYSTORE_PATH_PROPERTY_NAME = "jbm.remoting.ssl.keystore.path";
@@ -87,7 +91,12 @@
private String trustStorePath;
private String trustStorePassword;
-
+
+ private int initialPacketFragmentSize = DEFAULT_INITIAL_PACKET_FRAGMENT_SIZE;
+
+ private int packetFragmentSize = DEFAULT_PACKET_FRAGMENT_SIZE;
+
+
public long getCallTimeout()
{
return callTimeout;
@@ -217,7 +226,27 @@
{
this.trustStorePassword = trustStorePassword;
}
-
+
+ public void setInitialPacketFragmentSize(int initialPacketFragmentSize)
+ {
+ this.initialPacketFragmentSize = initialPacketFragmentSize;
+ }
+
+ public int getInitialPacketFragmentSize()
+ {
+ return initialPacketFragmentSize;
+ }
+
+ public void setPacketFragmentSize(int packetFragmentSize)
+ {
+ this.packetFragmentSize = packetFragmentSize;
+ }
+
+ public int getPacketFragmentSize()
+ {
+ return packetFragmentSize;
+ }
+
public boolean equals(Object other)
{
if (other instanceof ConnectionParams == false)
Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/config/Configuration.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -158,4 +158,8 @@
boolean isCreateJournalDir();
void setCreateJournalDir(boolean create);
+
+ int getInitialPacketFragmentSize();
+
+ int getPacketFragmentSize();
}
Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -94,9 +94,11 @@
public static final long DEFAULT_AIO_TIMEOUT = 60000; // in ms
public static final long DEFAULT_JOURNAL_TASK_PERIOD = 5000;
+
+ public static final int INITIAL_PACKET_FRAGMENT_SIZE = 2048;
+ public static final int PACKET_FRAGMENT_SIZE = 1024;
-
private static final long serialVersionUID = 4077088945050267843L;
@@ -153,6 +155,10 @@
protected String host = DEFAULT_HOST;
protected int port = DEFAULT_PORT;
+
+ protected int initialPacketFragmentSize = INITIAL_PACKET_FRAGMENT_SIZE;
+
+ protected int packetFragmentSize = PACKET_FRAGMENT_SIZE;
protected final ConnectionParams defaultConnectionParams = new ConnectionParamsImpl();
@@ -451,7 +457,17 @@
this.createJournalDir = create;
}
- public boolean isSecurityEnabled()
+ public int getInitialPacketFragmentSize()
+ {
+ return initialPacketFragmentSize;
+ }
+
+ public int getPacketFragmentSize()
+ {
+ return packetFragmentSize;
+ }
+
+ public boolean isSecurityEnabled()
{
return securityEnabled;
}
Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -114,6 +114,10 @@
trustStorePassword = getString(e, "remoting-ssl-truststore-password", null);
+ initialPacketFragmentSize = getInteger(e, "remoting-initial-packet-fragment-size", ConnectionParamsImpl.DEFAULT_INITIAL_PACKET_FRAGMENT_SIZE);
+
+ packetFragmentSize = getInteger(e, "remoting-packet-fragment-size", ConnectionParamsImpl.DEFAULT_PACKET_FRAGMENT_SIZE);
+
defaultConnectionParams.setCallTimeout(callTimeout);
defaultConnectionParams.setInVMOptimisationEnabled(inVMOptimisationEnabled);
@@ -129,7 +133,11 @@
defaultConnectionParams.setPingTimeout(pingTimeout);
defaultConnectionParams.setSSLEnabled(sslEnabled);
-
+
+ defaultConnectionParams.setInitialPacketFragmentSize(initialPacketFragmentSize);
+
+ defaultConnectionParams.setPacketFragmentSize(packetFragmentSize);
+
// Persistence config
bindingsDirectory = getString(e, "bindings-directory", bindingsDirectory);
Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/message/Message.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/Message.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/message/Message.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -25,8 +25,8 @@
import java.util.Set;
import org.jboss.messaging.core.journal.EncodingSupport;
+import org.jboss.messaging.util.SimpleString;
import org.jboss.messaging.util.MessagingBuffer;
-import org.jboss.messaging.util.SimpleString;
/**
* A message is a routable instance that has a payload.
@@ -68,6 +68,8 @@
int getEncodeSize();
+ void setMessageBodyEncoder(MessageBodyEncoder bodyEncoder);
+
void encode(MessagingBuffer buffer);
void decode(MessagingBuffer buffer);
Added: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/message/MessageBodyEncoder.java
===================================================================
--- branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/message/MessageBodyEncoder.java (rev 0)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/message/MessageBodyEncoder.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -0,0 +1,13 @@
+package org.jboss.messaging.core.message;
+
+import org.jboss.messaging.util.MessagingBuffer;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public interface MessageBodyEncoder
+{
+ void encode(MessagingBuffer buffer);
+
+ void decode(MessagingBuffer buffer, int len);
+}
Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -28,12 +28,12 @@
import static org.jboss.messaging.util.DataConstants.SIZE_LONG;
import java.util.Set;
+import java.nio.ByteBuffer;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.message.Message;
-import org.jboss.messaging.util.MessagingBuffer;
-import org.jboss.messaging.util.SimpleString;
-import org.jboss.messaging.util.TypedProperties;
+import org.jboss.messaging.core.message.MessageBodyEncoder;
+import org.jboss.messaging.util.*;
/**
* A concrete implementation of a message
@@ -77,6 +77,8 @@
private MessagingBuffer body;
+ private MessageBodyEncoder bodyEncoder = new SimpleBodyEncoder();
+
// Constructors --------------------------------------------------
protected MessageImpl()
@@ -131,8 +133,10 @@
buff.putLong(timestamp);
buff.putByte(priority);
properties.encode(buff);
- buff.putInt(body.limit());
- buff.putBytes(body.array(), 0, body.limit());
+ int pos = buff.position();
+ buff.putInt(-1);
+ bodyEncoder.encode(buff);
+ buff.putInt(pos, buff.position() - pos - DataConstants.SIZE_INT);
}
public int getEncodeSize()
@@ -158,14 +162,16 @@
properties.decode(buffer);
int len = buffer.getInt();
+ bodyEncoder.decode(buffer, len);
- //TODO - this can be optimised
- byte[] bytes = new byte[len];
- buffer.getBytes(bytes);
- body = buffer.createNewBuffer(len);
- body.putBytes(bytes);
+
}
-
+
+ public void setMessageBodyEncoder(MessageBodyEncoder bodyEncoder)
+ {
+ this.bodyEncoder = bodyEncoder;
+ }
+
public SimpleString getDestination()
{
return destination;
@@ -311,14 +317,31 @@
{
this.body = body;
}
+
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
-
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
- // Inner classes -------------------------------------------------
+ // Inner classes -------------------------------------------------
+
+ class SimpleBodyEncoder implements MessageBodyEncoder
+ {
+ public void encode(MessagingBuffer buffer)
+ {
+ buffer.putBytes(body.array(), 0, body.limit());
+ }
+
+ public void decode(MessagingBuffer buffer, int len)
+ {
+ body = new ByteBufferWrapper(ByteBuffer.allocate(len));
+ byte[] bytes = new byte[len];
+ buffer.getBytes(bytes);
+ body.putBytes(bytes);
+ body.flip();
+ }
+ }
}
Added: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/DecoderHandler.java
===================================================================
--- branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/DecoderHandler.java (rev 0)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/DecoderHandler.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -0,0 +1,11 @@
+package org.jboss.messaging.core.remoting;
+
+/**
+ * called by the PacketAssembler after a packet has been decoded
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public interface DecoderHandler
+{
+ void handle(Packet packet);
+}
Added: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/EncoderHandler.java
===================================================================
--- branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/EncoderHandler.java (rev 0)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/EncoderHandler.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -0,0 +1,13 @@
+package org.jboss.messaging.core.remoting;
+
+import org.jboss.messaging.util.MessagingBuffer;
+
+/**
+ * called by the PacketAssembler once a message has been encoded.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public interface EncoderHandler
+{
+ void handle(MessagingBuffer buffer);
+}
Added: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/MessageCache.java
===================================================================
--- branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/MessageCache.java (rev 0)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/MessageCache.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -0,0 +1,15 @@
+package org.jboss.messaging.core.remoting;
+
+import org.jboss.messaging.util.MessagingBuffer;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public interface MessageCache
+{
+ void cache(MessagingBuffer buffer, int length, long sessionId, int packetId);
+
+ MessagingBuffer retrieve(int length, long sessionId, int packetId, int correlationId);
+
+ void clear(long id);
+}
Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/MessagingCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/MessagingCodec.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/MessagingCodec.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -24,6 +24,9 @@
import org.jboss.messaging.util.MessagingBuffer;
+import java.util.Set;
+import java.util.List;
+
/**
* Used to encode/decode messages.
* @author <a href="ataylor at redhat.com">Andy Taylor</a>
Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/Packet.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/Packet.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/Packet.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -24,6 +24,9 @@
import org.jboss.messaging.util.MessagingBuffer;
+import java.util.Set;
+import java.util.List;
+
/**
*
* A Packet
Added: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/PacketAssembler.java
===================================================================
--- branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/PacketAssembler.java (rev 0)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/PacketAssembler.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -0,0 +1,14 @@
+package org.jboss.messaging.core.remoting;
+
+import org.jboss.messaging.util.MessagingBuffer;
+import org.apache.mina.common.IoBuffer;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public interface PacketAssembler
+{
+ boolean assemble(MessagingBuffer buffer, DecoderHandler decoderHandler, long id) throws Exception;
+
+ void disAssemble(MessagingBuffer buffer, EncoderHandler outputHandler, Object message) throws Exception;
+}
Added: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/PacketFragment.java
===================================================================
--- branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/PacketFragment.java (rev 0)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/PacketFragment.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -0,0 +1,37 @@
+package org.jboss.messaging.core.remoting;
+
+import org.jboss.messaging.util.MessagingBuffer;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public interface PacketFragment
+{
+ int getPacketId();
+
+ int getCorrelationId();
+
+ MessagingBuffer getMessagingBuffer();
+
+ int getLength();
+
+ boolean isLastPacket();
+
+ int getDataStartPosition();
+
+ void setDataStartPosition(int dataStartPosition);
+
+ int getEndPosition();
+
+ void reset();
+
+ void setPacketId(int packetId);
+
+ void setCorrelationId(int correlationId);
+
+ void setLastPacket(boolean lastPacket);
+
+ int getHeaderSize();
+
+ void setLength(int i);
+}
Added: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/InMemoryMessageCache.java
===================================================================
--- branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/InMemoryMessageCache.java (rev 0)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/InMemoryMessageCache.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -0,0 +1,84 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl;
+
+import org.jboss.messaging.core.remoting.MessageCache;
+import org.jboss.messaging.util.MessagingBuffer;
+import org.jboss.messaging.util.ByteBufferWrapper;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.List;
+import java.util.HashMap;
+import java.util.ArrayList;
+
+/**
+ * An in memory cache for message buffer contents.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class InMemoryMessageCache implements MessageCache
+{
+ Map<Long, Map<Integer, List<MessagingBuffer>>> cache = new HashMap<Long, Map<Integer, List<MessagingBuffer>>>();
+ public void cache(MessagingBuffer buffer, int length, long sessionId, int packetId)
+ {
+ byte[] bytes = new byte[length];
+ buffer.getBytes(bytes);
+ getByteList(sessionId, packetId).add(new ByteBufferWrapper(ByteBuffer.wrap(bytes)));
+ }
+
+ public MessagingBuffer retrieve(int length, long sessionId, int packetId, int correlationId)
+ {
+ List<MessagingBuffer> buffers = getByteList(sessionId, packetId);
+ MessagingBuffer buffer = buffers.remove(0);
+ if(buffers.isEmpty())
+ {
+ Map<Integer, List<MessagingBuffer>> sessionMap = cache.get(sessionId);
+ sessionMap.remove(packetId);
+ if(sessionMap.isEmpty())
+ {
+ cache.remove(sessionId);
+ }
+ }
+ return buffer;
+ }
+
+ public void clear(long id)
+ {
+ cache.remove(id);
+ }
+
+
+ protected List<MessagingBuffer> getByteList(long sessionId, int packetId)
+ {
+ if(cache.get(sessionId) == null)
+ {
+ cache.put(sessionId, new HashMap<Integer, List<MessagingBuffer>>());
+ }
+ if(cache.get(sessionId).get(packetId) == null)
+ {
+ cache.get(sessionId).put(packetId, new ArrayList<MessagingBuffer>());
+ }
+ return cache.get(sessionId).get(packetId);
+ }
+
+}
Added: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/MessagingBufferCache.java
===================================================================
--- branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/MessagingBufferCache.java (rev 0)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/MessagingBufferCache.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -0,0 +1,37 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl;
+
+import org.jboss.messaging.util.MessagingBuffer;
+
+/**
+ * A message cache that stores a slice of the incoming buffer instead of copying bytes out
+ * of it.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class MessagingBufferCache extends InMemoryMessageCache
+{
+   /**
+    * Stores a slice of <code>buffer</code>, taken at its current position, for the given
+    * session and packet, and moves the position of <code>buffer</code> forward by
+    * <code>length</code>.
+    */
+   public void cache(MessagingBuffer buffer, int length, long sessionId, int packetId)
+   {
+      // the slice has to be taken before the position of the source buffer is advanced
+      final MessagingBuffer view = buffer.slice();
+      final int positionAfterData = buffer.position() + length;
+      buffer.position(positionAfterData);
+      getByteList(sessionId, packetId).add(view);
+   }
+}
Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/MessagingCodecImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/MessagingCodecImpl.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/MessagingCodecImpl.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -25,11 +25,15 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.MessagingCodec;
import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.PacketFragment;
import org.jboss.messaging.core.remoting.impl.wireformat.*;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.*;
import static org.jboss.messaging.util.DataConstants.SIZE_INT;
import org.jboss.messaging.util.MessagingBuffer;
+import java.util.Set;
+import java.util.List;
+
/**
* A MessagingCodec
*
@@ -51,22 +55,7 @@
public Packet decode(final MessagingBuffer in) throws Exception
{
- int start = in.position();
- if (in.remaining() <= SIZE_INT)
- {
- return null;
- }
-
- int length = in.getInt();
-
- if (in.remaining() < length)
- {
- in.position(start);
-
- return null;
- }
-
byte packetType = in.getByte();
Packet packet;
Added: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/PacketAssemblerImpl.java
===================================================================
--- branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/PacketAssemblerImpl.java (rev 0)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/PacketAssemblerImpl.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -0,0 +1,169 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl;
+
+import org.jboss.messaging.core.remoting.*;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketFragmentBuffer;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.util.MessagingBuffer;
+import org.jboss.messaging.util.DataConstants;
+import static org.jboss.messaging.util.DataConstants.SIZE_INT;
+
+import java.util.Map;
+import java.util.List;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class PacketAssemblerImpl implements PacketAssembler
+{
+ private static final Logger log = Logger.getLogger(PacketAssemblerImpl.class);
+ Map<Long, Map<Integer, List<PacketFragment>>> allFragments = new HashMap<Long, Map<Integer, List<PacketFragment>>>();
+ private AtomicInteger packetId = new AtomicInteger(0);
+ private MessagingCodec messagingCodec = new MessagingCodecImpl();
+ private MessageCache messageCache = new InMemoryMessageCache();
+ private int packetFragmentSize;
+ private boolean useMessageChunking = false;
+
+
+ public PacketAssemblerImpl(int packetFragmentSize, MessageCache messageCache)
+ {
+ this.packetFragmentSize = packetFragmentSize;
+ useMessageChunking = this.packetFragmentSize > 0;
+ this.messageCache = messageCache;
+ }
+
+ public boolean assemble(MessagingBuffer buffer, DecoderHandler decoderHandler, long id) throws Exception
+ {
+ int start = buffer.position();
+
+ if (buffer.remaining() <= SIZE_INT)
+ {
+ return false;
+ }
+
+ int length = buffer.getInt();
+
+ if (buffer.remaining() < length)
+ {
+ buffer.position(start);
+
+ return false;
+ }
+ if (useMessageChunking)
+ {
+ assembleMulti(buffer, decoderHandler, id, length);
+ }
+ else
+ {
+ assembleSingle(buffer, decoderHandler, id, length);
+ }
+
+ return true;
+ }
+
+
+ public void disAssemble(MessagingBuffer buffer, EncoderHandler outputHandler, Object message) throws Exception
+ {
+ if (useMessageChunking)
+ {
+ disasembleMulti(buffer, outputHandler, message);
+ }
+ else
+ {
+ disasembleSingle(buffer, outputHandler, message);
+ }
+ }
+
+ private List<PacketFragment> getFragments(long id, int packetId)
+ {
+ if(allFragments.get(id) == null)
+ {
+ allFragments.put(id, new HashMap<Integer, List<PacketFragment>>());
+ }
+ if(allFragments.get(id).get(packetId) == null)
+ {
+ allFragments.get(id).put(packetId, new ArrayList<PacketFragment>());
+ }
+ return allFragments.get(id).get(packetId);
+ }
+
+
+ private void disasembleMulti(MessagingBuffer buffer, EncoderHandler outputHandler, Object message)
+ throws Exception
+ {
+ PacketFragmentBuffer buff = new PacketFragmentBufferImpl(packetId.getAndIncrement(), buffer, packetFragmentSize);
+ messagingCodec.encode(buff, message);
+ buff.prepare();
+ for (PacketFragment packetFragment : buff.getPacketFragments())
+ {
+ outputHandler.handle(packetFragment.getMessagingBuffer());
+ }
+ }
+
+ private void disasembleSingle(MessagingBuffer buffer, EncoderHandler outputHandler, Object message)
+ throws Exception
+ {
+ buffer.putInt(-1);
+ messagingCodec.encode(buffer, message);
+ //The length doesn't include the actual length byte
+ int len = buffer.position() - DataConstants.SIZE_INT;
+ buffer.putInt(0, len);
+ buffer.flip();
+ outputHandler.handle(buffer);
+ }
+
+ private void assembleMulti(MessagingBuffer buffer, DecoderHandler decoderHandler, long id, int length)
+ throws Exception
+ {
+ boolean lastPacket = buffer.getBoolean();
+ int packetId = buffer.getInt();
+ int correlationId = buffer.getInt();
+
+ if(lastPacket)
+ {
+ PacketFragmentImpl fragment = new PacketFragmentImpl(packetId, lastPacket, correlationId, length, buffer);
+ getFragments(id, fragment.getPacketId()).add(fragment);
+ List<PacketFragment> fragments = allFragments.get(id).remove(fragment.getPacketId());
+ PacketFragmentBuffer buff = new PacketFragmentBufferImpl(fragments, packetFragmentSize);
+ Packet packet = messagingCodec.decode(buff);
+ decoderHandler.handle(packet);
+ }
+ else
+ {
+ //if we are part of a multi part packet then we need to be cached for later use
+ PacketFragmentImpl fragment = new PacketFragmentImpl(packetId, lastPacket, correlationId, length, messageCache,id);
+ getFragments(id, fragment.getPacketId()).add(fragment);
+ messageCache.cache(buffer, length - 9, id, packetId);
+ }
+ }
+
+ private void assembleSingle(MessagingBuffer buffer, DecoderHandler decoderHandler, long id, int length)
+ throws Exception
+ {
+ Packet packet = messagingCodec.decode(buffer);
+ decoderHandler.handle(packet);
+ }
+}
Added: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentBufferImpl.java
===================================================================
--- branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentBufferImpl.java (rev 0)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentBufferImpl.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -0,0 +1,558 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl;
+
+import org.jboss.messaging.util.DataConstants;
+import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.MessagingBuffer;
+import static org.jboss.messaging.util.DataConstants.NULL;
+import static org.jboss.messaging.util.DataConstants.NOT_NULL;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketFragmentBuffer;
+import org.jboss.messaging.core.remoting.PacketFragment;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+
+/**
+ * A MessagingBuffer that can be used to break up the buffers into smaller pieces.
+ * <p>
+ * Data written to this buffer is spread over a list of {@link PacketFragment}s; a new
+ * fragment is started whenever the current one cannot take the next value. Each fragment
+ * buffer starts with an int length placeholder followed by the fragment header (a boolean
+ * "last fragment" flag, the packet id and the correlation id). {@link #prepare()} fills in
+ * the lengths, marks the last fragment and flips the buffers.
+ * <p>
+ * When created from a list of received fragments the same class is used for reading; reads
+ * move on to the next fragment when the current one does not hold enough bytes.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class PacketFragmentBufferImpl implements PacketFragmentBuffer
+{
+
+   private static final Charset utf8 = Charset.forName("UTF-8");
+
+   private List<PacketFragment> packetFragments;
+   /** index into packetFragments of the fragment currently being read or written */
+   private int currentPos = 0;
+   private int size = 0;
+   private int correlationId = 0;
+   /** size used for the buffers of additional fragments created while writing */
+   private int packetFragmentSize;
+
+   /**
+    * Creates a buffer for writing. <code>buffer</code> becomes the first fragment and the
+    * fragment header is written to it immediately.
+    *
+    * @param packetId           the id shared by all fragments of the packet
+    * @param buffer             the buffer backing the first fragment
+    * @param packetFragmentSize the size of the buffers created for further fragments
+    */
+   public PacketFragmentBufferImpl(int packetId, MessagingBuffer buffer, int packetFragmentSize)
+   {
+      // must be set before anything is written: writing the header may already have to
+      // allocate a further fragment of this size
+      this.packetFragmentSize = packetFragmentSize;
+      packetFragments = new ArrayList<PacketFragment>();
+      packetFragments.add(new PacketFragmentImpl(packetId, false, correlationId++, buffer));
+      writeFragmentHeader();
+   }
+
+   /**
+    * Creates a buffer for reading from already received fragments.
+    *
+    * @param packetFragments    the fragments of one packet, in reading order
+    * @param packetFragmentSize the fragment size
+    * @throws IllegalArgumentException if <code>packetFragments</code> is null or empty
+    */
+   public PacketFragmentBufferImpl(List<PacketFragment> packetFragments, int packetFragmentSize)
+   {
+      if(packetFragments == null || packetFragments.size() == 0)
+      {
+         throw new IllegalArgumentException("packet fragments must not be null or empty");
+      }
+      this.packetFragments = packetFragments;
+      currentPos = 0;
+      this.packetFragmentSize = packetFragmentSize;
+      getCurrentFragment().reset();
+   }
+
+   public void putFloat(float val)
+   {
+      checkBufferSpace(DataConstants.SIZE_FLOAT);
+      getCurrentBuffer().putFloat(val);
+   }
+
+   public void putInt(int i)
+   {
+      checkBufferSpace(DataConstants.SIZE_INT);
+      getCurrentBuffer().putInt(i);
+   }
+
+   public void putByte(byte b)
+   {
+      checkBufferSpace(DataConstants.SIZE_BYTE);
+      getCurrentBuffer().putByte(b);
+   }
+
+   public void putLong(long l)
+   {
+      checkBufferSpace(DataConstants.SIZE_LONG);
+      getCurrentBuffer().putLong(l);
+   }
+
+   public void putBoolean(boolean b)
+   {
+      checkBufferSpace(DataConstants.SIZE_BOOLEAN);
+      getCurrentBuffer().putBoolean(b);
+   }
+
+   /**
+    * Writes a NULL marker byte, or a NOT_NULL marker byte followed by the string.
+    */
+   public void putNullableString(String nullableString)
+   {
+      if (nullableString == null)
+      {
+         putByte(NULL);
+      }
+      else
+      {
+         putByte(NOT_NULL);
+
+         putString(nullableString);
+      }
+   }
+
+   /**
+    * Writes the length of the string's data followed by the data itself.
+    */
+   public void putSimpleString(SimpleString string)
+   {
+      byte[] data = string.getData();
+
+      putInt(data.length);
+      putBytes(data);
+   }
+
+   /**
+    * Writes a NULL marker byte, or a NOT_NULL marker byte followed by the simple string.
+    */
+   public void putNullableSimpleString(SimpleString string)
+   {
+      if (string == null)
+      {
+         putByte(NULL);
+      }
+      else
+      {
+         putByte(NOT_NULL);
+         putSimpleString(string);
+      }
+   }
+
+   /**
+    * Writes <code>i1</code> bytes of <code>bytes</code>, starting at offset <code>i</code>.
+    * Unlike the fixed size values, a byte array may be split over several fragments.
+    *
+    * @param bytes the source array
+    * @param i     the offset of the first byte to write
+    * @param i1    the number of bytes to write
+    */
+   public void putBytes(byte[] bytes, int i, int i1)
+   {
+      // i1 is a length, not an end index, so it is the amount of space that is needed
+      if(hasWriteSpace(i1))
+      {
+         getCurrentBuffer().putBytes(bytes, i, i1);
+      }
+      else
+      {
+         int written = 0;
+         while(written < i1)
+         {
+            int left = getCurrentBuffer().limit() - getCurrentBuffer().position();
+            int towrite = Math.min(left, i1 - written);
+            getCurrentBuffer().putBytes(bytes, i + written, towrite);
+            written += towrite;
+            if(written < i1)
+            {
+               setNextFragment();
+            }
+         }
+
+      }
+   }
+
+   public void putBytes(byte[] bytes)
+   {
+      putBytes(bytes, 0, bytes.length);
+   }
+
+   public void putShort(short val)
+   {
+      checkBufferSpace(DataConstants.SIZE_SHORT);
+      getCurrentBuffer().putShort(val);
+   }
+
+   public void putDouble(double val)
+   {
+      checkBufferSpace(DataConstants.SIZE_DOUBLE);
+      getCurrentBuffer().putDouble(val);
+   }
+
+   public void putChar(char val)
+   {
+      checkBufferSpace(DataConstants.SIZE_CHAR);
+      getCurrentBuffer().putChar(val);
+   }
+
+   /**
+    * Writes the number of UTF-8 encoded bytes of <code>str</code> followed by those bytes.
+    */
+   public void putUTF(String str)
+   {
+      //TODO This is quite inefficient - can be improved using a method similar to what MINA IOBuffer does
+      //(putPrefixedString)
+      ByteBuffer bb = utf8.encode(str);
+      int len = bb.remaining();
+      putInt(len);
+      // the backing array may be larger than the encoded data, so write exactly len bytes
+      putBytes(bb.array(), bb.arrayOffset() + bb.position(), len);
+   }
+
+   /**
+    * Writes a boolean at the given position without moving the current position.
+    */
+   public void putBoolean(int pos, boolean val)
+   {
+      int bufferPos = getBufferPosition(pos);
+      int actualPos = pos - getPreviousBuffersSize(bufferPos);
+      packetFragments.get(bufferPos).getMessagingBuffer().putBoolean(actualPos, val);
+   }
+
+   /**
+    * Writes an int at the given position without moving the current position. The position
+    * is shifted past the length int and the fragment header of the target fragment.
+    */
+   public void putInt(int pos, int val)
+   {
+      int bufferPos = getBufferPosition(pos);
+      int actualPos = pos - getPreviousBuffersSize(bufferPos) + getCurrentFragment().getHeaderSize() + DataConstants.SIZE_INT;
+      packetFragments.get(bufferPos).getMessagingBuffer().putInt(actualPos, val);
+   }
+
+
+   /**
+    * Writes the number of chars of the string followed by each char.
+    */
+   public void putString(String string)
+   {
+      putInt(string.length());
+
+      for (int i = 0; i < string.length(); i++)
+      {
+         putChar(string.charAt(i));
+      }
+   }
+
+   /** Not supported by this implementation. */
+   public byte[] array()
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   /** Always returns 0 in this implementation. */
+   public int capacity()
+   {
+      return 0;
+   }
+
+   /** Does nothing in this implementation. */
+   public void limit(int limit)
+   {
+
+   }
+
+   /**
+    * Makes the fragment holding <code>position</code> the current one and positions its
+    * buffer.
+    */
+   public void position(int position)
+   {
+      int bufferPos = getBufferPosition(position);
+      int actualPos = position - getPreviousBuffersSize(bufferPos);
+      currentPos = bufferPos;
+      getCurrentBuffer().position(actualPos);
+   }
+
+   /**
+    * Returns the position within the data, i.e. not counting the length int and the header
+    * of any fragment.
+    */
+   public int position()
+   {
+      return getPreviousBuffersSize() + (getCurrentBuffer().position() - getCurrentFragment().getHeaderSize() - DataConstants.SIZE_INT);
+   }
+
+   /** Always returns null in this implementation. */
+   public MessagingBuffer slice()
+   {
+      return null;
+   }
+
+   /**
+    * Creates a new fragment buffer with the packet id of the current fragment, backed by a
+    * new buffer of <code>len</code> bytes.
+    */
+   public MessagingBuffer createNewBuffer(int len)
+   {
+      return new PacketFragmentBufferImpl(getCurrentFragment().getPacketId(),getCurrentBuffer().createNewBuffer(len), packetFragmentSize);
+   }
+
+
+   /**
+    * Reads a string written by {@link #putString(String)}.
+    */
+   public String getString()
+   {
+      int len = getInt();
+
+      char[] chars = new char[len];
+
+      for (int i = 0; i < len; i++)
+      {
+         chars[i] = getChar();
+      }
+
+      return new String(chars);
+   }
+
+   /** Not supported by this implementation. */
+   public void getBytes(byte[] value, int i, int read)
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   /** Not supported by this implementation. */
+   public String getUTF() throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   /** Flips the buffer of every fragment. */
+   public void flip()
+   {
+      for (int i = 0; i < packetFragments.size(); i++)
+      {
+         packetFragments.get(i).getMessagingBuffer().flip();
+      }
+   }
+
+   /** Rewinds the buffer of every fragment and makes the first fragment the current one. */
+   public void rewind()
+   {
+      for (int i = 0; i < packetFragments.size(); i++)
+      {
+         packetFragments.get(i).getMessagingBuffer().rewind();
+      }
+      currentPos = 0;
+   }
+
+
+   public int getInt()
+   {
+      checkReadSize(DataConstants.SIZE_INT);
+      return getCurrentBuffer().getInt();
+   }
+
+   public long getLong()
+   {
+      checkReadSize(DataConstants.SIZE_LONG);
+      return getCurrentBuffer().getLong();
+   }
+
+   public short getShort()
+   {
+      checkReadSize(DataConstants.SIZE_SHORT);
+      return getCurrentBuffer().getShort();
+   }
+
+   public boolean getBoolean()
+   {
+      checkReadSize(DataConstants.SIZE_BOOLEAN);
+      return getCurrentBuffer().getBoolean();
+   }
+
+   /**
+    * Reads a string written by {@link #putNullableString(String)}.
+    */
+   public String getNullableString()
+   {
+      byte check = getByte();
+
+      if (check == NULL)
+      {
+         return null;
+      }
+      else
+      {
+         return getString();
+      }
+   }
+
+   /**
+    * Reads a simple string written by {@link #putSimpleString(SimpleString)}.
+    */
+   public SimpleString getSimpleString()
+   {
+      int len = getInt();
+
+      byte[] data = new byte[len];
+      getBytes(data);
+
+      return new SimpleString(data);
+   }
+
+   /**
+    * Reads a simple string written by {@link #putNullableSimpleString(SimpleString)}.
+    */
+   public SimpleString getNullableSimpleString()
+   {
+      int b = getByte();
+      if (b == NULL)
+      {
+         return null;
+      }
+      else
+      {
+         return getSimpleString();
+      }
+   }
+
+   public byte getByte()
+   {
+      checkReadSize(DataConstants.SIZE_BYTE);
+      return getCurrentBuffer().getByte();
+   }
+
+   /**
+    * Fills <code>data</code> completely, moving on to the following fragments when the
+    * current one does not hold enough bytes.
+    */
+   public void getBytes(byte[] data)
+   {
+      int left = getCurrentFragment().getEndPosition() - getCurrentBuffer().position();
+      if(data.length <= left)
+      {
+         getCurrentBuffer().getBytes(data);
+      }
+      else
+      {
+         int size = data.length;
+         int read = 0;
+         while (read < size)
+         {
+            left = getCurrentFragment().getEndPosition() - getCurrentBuffer().position();
+            // never take more than is still missing, the fragment may hold further values
+            int chunk = Math.min(left, size - read);
+
+            getCurrentBuffer().getBytes(data, read, chunk);
+            read += chunk;
+            if(read < size)
+            {
+               currentPos++;
+               getCurrentFragment().reset();
+            }
+
+         }
+
+      }
+   }
+
+
+   public float getFloat()
+   {
+      checkReadSize(DataConstants.SIZE_FLOAT);
+      return getCurrentBuffer().getFloat();
+   }
+
+   public double getDouble()
+   {
+      checkReadSize(DataConstants.SIZE_DOUBLE);
+      return getCurrentBuffer().getDouble();
+   }
+
+   public char getChar()
+   {
+      checkReadSize(DataConstants.SIZE_CHAR);
+      return getCurrentBuffer().getChar();
+   }
+
+   /**
+    * Returns the fragments backing this buffer (the internal list, not a copy).
+    */
+   public List<PacketFragment> getPacketFragments()
+   {
+      return packetFragments;
+   }
+
+   /**
+    * Finishes writing: stores the length (header plus data) at the start of every fragment
+    * buffer, sets the "last fragment" flag on the final fragment and flips every buffer.
+    */
+   public void prepare()
+   {
+      for (int i = 0; i < packetFragments.size(); i++)
+      {
+         PacketFragment packetFragment = packetFragments.get(i);
+         int len = packetFragment.getMessagingBuffer().position() - packetFragment.getDataStartPosition() + getCurrentFragment().getHeaderSize();
+         packetFragment.getMessagingBuffer().putInt(0, len);
+         if(i == packetFragments.size() - 1)
+         {
+            // the flag is the first header field, directly after the length int
+            packetFragment.getMessagingBuffer().putBoolean(DataConstants.SIZE_INT, true);
+         }
+         packetFragment.getMessagingBuffer().flip();
+      }
+   }
+
+   public short getUnsignedByte()
+   {
+      // an unsigned byte occupies a single byte in the buffer
+      checkReadSize(DataConstants.SIZE_BYTE);
+      return getCurrentBuffer().getUnsignedByte();
+   }
+
+   public int getUnsignedShort()
+   {
+      // an unsigned short occupies two bytes in the buffer
+      checkReadSize(DataConstants.SIZE_SHORT);
+      return getCurrentBuffer().getUnsignedShort();
+   }
+
+   /**
+    * NOTE(review): the following fragments are counted with their full length, including
+    * the header - confirm that this is what callers expect.
+    */
+   public int remaining()
+   {
+      return getPostBuffersSize() + getCurrentBuffer().remaining();
+   }
+
+   public int limit()
+   {
+      return getPreviousBuffersSize() + getCurrentBuffer().limit();
+   }
+
+   // Private -------------------------------------------------------
+
+   /**
+    * Moves on to the next fragment when the current one does not hold <code>size</code>
+    * more bytes.
+    */
+   private void checkReadSize(int size)
+   {
+      if(!hasReadSpace(size))
+      {
+         currentPos++;
+         getCurrentFragment().reset();
+      }
+   }
+
+   /**
+    * Starts a new fragment when the current one cannot take <code>size</code> more bytes.
+    */
+   private void checkBufferSpace(int size)
+   {
+      if(!hasWriteSpace(size))
+      {
+         setNextFragment();
+      }
+   }
+
+   /**
+    * Records the length of the current fragment, appends a new fragment with the next
+    * correlation id and writes its header.
+    */
+   private void setNextFragment()
+   {
+      getCurrentFragment().setLength(getCurrentBuffer().position());
+      packetFragments.add(new PacketFragmentImpl(getCurrentFragment().getPacketId(), false, getCurrentFragment().getCorrelationId() + 1,
+                                                 getCurrentBuffer().createNewBuffer(packetFragmentSize)));
+      currentPos++;
+      writeFragmentHeader();
+   }
+
+   /**
+    * Writes the length placeholder and the header of the current fragment and remembers
+    * where its data starts.
+    */
+   private void writeFragmentHeader()
+   {
+      // length placeholder, filled in by prepare()
+      putInt(-1);
+      // "last fragment" flag, set by prepare() on the final fragment
+      putBoolean(false);
+      putInt(getCurrentFragment().getPacketId());
+      putInt(getCurrentFragment().getCorrelationId());
+      getCurrentFragment().setDataStartPosition(getCurrentBuffer().position());
+   }
+
+   private boolean hasWriteSpace(int size)
+   {
+      int left = getCurrentBuffer().limit() - getCurrentBuffer().position();
+      return size<=left;
+   }
+
+   private boolean hasReadSpace(int size)
+   {
+      return getCurrentBuffer().position() + size <= getCurrentFragment().getEndPosition();
+   }
+
+   private PacketFragment getCurrentFragment()
+   {
+      return packetFragments.get(currentPos);
+   }
+
+   private MessagingBuffer getCurrentBuffer()
+   {
+      return getCurrentFragment().getMessagingBuffer();
+   }
+
+   private int getPreviousBuffersSize()
+   {
+      return getPreviousBuffersSize(currentPos);
+   }
+
+   private int getPostBuffersSize()
+   {
+      return getPostBuffersSize(currentPos);
+   }
+
+   /**
+    * Sums the data sizes (length minus header and length int) of the fragments before the
+    * given index.
+    */
+   private int getPreviousBuffersSize(int currentPos)
+   {
+      int ret = 0;
+      for(int i = 0; i < currentPos; i++)
+      {
+         ret+=packetFragments.get(i).getLength() - getCurrentFragment().getHeaderSize() - DataConstants.SIZE_INT;
+      }
+      return ret;
+   }
+
+   /**
+    * Sums the lengths of the fragments after the given index.
+    */
+   private int getPostBuffersSize(int currentPos)
+   {
+      int ret = 0;
+      for(int i = currentPos + 1; i < packetFragments.size(); i++)
+      {
+         ret+=packetFragments.get(i).getLength();
+      }
+      return ret;
+   }
+
+   /**
+    * Returns the index of the fragment that holds the given position.
+    * <p>
+    * NOTE(review): <code>pos</code> is never advanced and <code>end</code> is the length of a
+    * single fragment rather than a running total, so this returns the first fragment whose
+    * own length exceeds a positive <code>position</code>, and 0 otherwise. That looks
+    * unintended for packets with more than one fragment - confirm the intended position
+    * arithmetic before changing it.
+    */
+   private int getBufferPosition(int position)
+   {
+      int pos = 0;
+      for (int i = 0; i < packetFragments.size(); i++)
+      {
+         int end = packetFragments.get(i).getLength();
+         if(position > pos && position < end)
+         {
+            return i;
+         }
+      }
+      return pos;
+   }
+}
Added: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentImpl.java
===================================================================
--- branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentImpl.java (rev 0)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentImpl.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -0,0 +1,149 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl;
+
+import org.jboss.messaging.util.MessagingBuffer;
+import org.jboss.messaging.util.DataConstants;
+import org.jboss.messaging.core.remoting.PacketFragment;
+import org.jboss.messaging.core.remoting.MessageCache;
+import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
+
+/**
+ * A fragment of a full packet that holds a portion of a full message within a buffer.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class PacketFragmentImpl implements PacketFragment
+{
+ private int length;
+ private int packetId;
+ private int correlationId;
+ private boolean lastPacket;
+ private long sessionId;
+ private int dataStartPosition = -1;
+ private MessagingBuffer messagingBuffer;
+ private MessageCache messageCache;
+
+ public PacketFragmentImpl(int packetId, boolean lastPacket, int correlationId, int length, MessageCache messageCache, long id)
+ {
+ this.packetId = packetId;
+ this.lastPacket = lastPacket;
+ this.correlationId = correlationId;
+ this.messageCache = messageCache;
+ this.length = length;
+ this.sessionId = id;
+ dataStartPosition = 0;
+ }
+
+ public PacketFragmentImpl(int packetId, boolean lastPacket, int correlationId, int length, MessagingBuffer messagingBuffer)
+ {
+ this.packetId = packetId;
+ this.lastPacket = lastPacket;
+ this.correlationId = correlationId;
+ this.length = length;
+ this.messagingBuffer = messagingBuffer;
+ dataStartPosition = messagingBuffer.position();
+ }
+
+ public PacketFragmentImpl(int packetId, boolean lastPacket, int correlationId, MessagingBuffer messagingBuffer)
+ {
+ this.packetId = packetId;
+ this.lastPacket = lastPacket;
+ this.correlationId = correlationId;
+ this.messagingBuffer = messagingBuffer;
+ }
+
+ public int getHeaderSize()
+ {
+ return (DataConstants.SIZE_INT * 2) + DataConstants.SIZE_BOOLEAN;
+ }
+ public int getPacketId()
+ {
+ return packetId;
+ }
+
+ public void setPacketId(int packetId)
+ {
+ this.packetId = packetId;
+ }
+
+ public void setCorrelationId(int correlationId)
+ {
+ this.correlationId = correlationId;
+ }
+
+ public void setLastPacket(boolean lastPacket)
+ {
+ this.lastPacket = lastPacket;
+ }
+
+ public int getCorrelationId()
+ {
+ return correlationId;
+ }
+
+ public int getLength()
+ {
+ return length;
+ }
+
+ public void setLength(int i)
+ {
+ length = i;
+ }
+
+ public boolean isLastPacket()
+ {
+ return lastPacket;
+ }
+
+ public MessagingBuffer getMessagingBuffer()
+ {
+ if(messagingBuffer == null)
+ {
+ messagingBuffer = messageCache.retrieve(length - 9, sessionId, packetId, correlationId);
+ }
+ return messagingBuffer;
+ }
+
+ public int getDataStartPosition()
+ {
+ return dataStartPosition;
+ }
+
+ public void setDataStartPosition(int dataStartPosition)
+ {
+ this.dataStartPosition = dataStartPosition;
+ }
+
+ public int getEndPosition()
+ {
+ return getDataStartPosition() + getLength() - getHeaderSize();
+ }
+ public void reset()
+ {
+ if(messagingBuffer != null)
+ {
+ messagingBuffer.position(dataStartPosition);
+ }
+ }
+}
Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -23,6 +23,7 @@
package org.jboss.messaging.core.remoting.impl.mina;
import org.apache.mina.common.DefaultIoFilterChainBuilder;
+import org.jboss.messaging.core.remoting.MessageCache;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -34,7 +35,7 @@
*/
public interface FilterChainSupport
{
- void addCodecFilter(final DefaultIoFilterChainBuilder filterChain);
+ void addCodecFilter(final DefaultIoFilterChainBuilder filterChain, int initialPacketFragmentSize, int packetFragmentSize, MessageCache messageCache);
void addSSLFilter(
final DefaultIoFilterChainBuilder filterChain, final boolean client,
Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupportImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupportImpl.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupportImpl.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -30,6 +30,8 @@
import org.apache.mina.filter.ssl.SslFilter;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.impl.ssl.SSLSupport;
+import org.jboss.messaging.core.remoting.impl.InMemoryMessageCache;
+import org.jboss.messaging.core.remoting.MessageCache;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -50,11 +52,11 @@
// Public --------------------------------------------------------
+ /**
+ * Appends a ProtocolCodecFilter named "codec" to the filter chain. The filter wraps a
+ * MinaProtocolCodecFilter created from the two fragment sizes and the message cache.
+ * The chain must not be null (checked with an assert only).
+ */
- public void addCodecFilter(final DefaultIoFilterChainBuilder filterChain)
+ public void addCodecFilter(final DefaultIoFilterChainBuilder filterChain, int initialPacketFragmentSize, int packetFragmentSize, MessageCache messageCache)
{
assert filterChain != null;
- filterChain.addLast("codec", new ProtocolCodecFilter(new MinaProtocolCodecFilter()));
+ filterChain.addLast("codec", new ProtocolCodecFilter(new MinaProtocolCodecFilter(initialPacketFragmentSize, packetFragmentSize, messageCache)));
}
public void addSSLFilter(
Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -68,6 +68,11 @@
// Public --------------------------------------------------------
+ /**
+ * Returns the IoBuffer held in the <code>buffer</code> field of this wrapper.
+ */
+ public IoBuffer getBuffer()
+ {
+ return buffer;
+ }
+
// MessagingBuffer implementation ----------------------------------------------
public byte[] array()
@@ -137,7 +142,14 @@
public void putBytes(final byte[] bytes, int offset, int length)
{
- buffer.put(bytes, offset, length);
+ // NOTE(review): any exception thrown by put() is caught below, printed and then
+ // dropped, so a failed write is not reported to the caller - confirm this is intended.
+ try
+ {
+ buffer.put(bytes, offset, length);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(); // NOTE(review): IDE-generated catch body; the exception is only printed
+ }
}
public void putInt(final int intValue)
@@ -241,6 +253,17 @@
}
}
+ public void putBoolean(int pos, boolean val)
+ {
+ if (val)
+ {
+ buffer.put(pos, TRUE);
+ } else
+ {
+ buffer.put(pos, FALSE);
+ }
+ }
+
public boolean getBoolean()
{
byte b = buffer.get();
Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaAcceptor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaAcceptor.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaAcceptor.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -30,6 +30,8 @@
import org.jboss.messaging.core.remoting.Acceptor;
import org.jboss.messaging.core.remoting.CleanUpNotifier;
import org.jboss.messaging.core.remoting.RemotingService;
+import org.jboss.messaging.core.remoting.MessageCache;
+import org.jboss.messaging.core.remoting.impl.InMemoryMessageCache;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
@@ -50,6 +52,7 @@
private CleanUpNotifier cleanupNotifier;
private RemotingService remotingService;
private FilterChainSupport chainSupport = new FilterChainSupportImpl();
+ private MessageCache messageCache = new InMemoryMessageCache();
public void startAccepting(RemotingService remotingService, CleanUpNotifier cleanupNotifier) throws Exception
{
@@ -73,7 +76,8 @@
.getTrustStorePath(), remotingService.getConfiguration()
.getTrustStorePassword());
}
- chainSupport.addCodecFilter(filterChain);
+ chainSupport.addCodecFilter(filterChain, remotingService.getConfiguration().getInitialPacketFragmentSize(),
+ remotingService.getConfiguration().getPacketFragmentSize(), messageCache);
// Bind
acceptor.setDefaultLocalAddress(new InetSocketAddress(remotingService.getConfiguration().getHost(), remotingService.getConfiguration().getPort()));
@@ -172,6 +176,7 @@
{
remotingService.unregisterPinger(session.getId());
cleanupNotifier.fireCleanup(session.getId(), null);
+ messageCache.clear(session.getId());
}
}
Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -51,16 +51,9 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.ping.Pinger;
import org.jboss.messaging.core.ping.impl.PingerImpl;
-import org.jboss.messaging.core.remoting.CleanUpNotifier;
-import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.remoting.PacketHandler;
-import org.jboss.messaging.core.remoting.PacketReturner;
-import org.jboss.messaging.core.remoting.RemotingConnector;
-import org.jboss.messaging.core.remoting.RemotingSession;
-import org.jboss.messaging.core.remoting.ResponseHandler;
-import org.jboss.messaging.core.remoting.TransportType;
+import org.jboss.messaging.core.remoting.*;
import org.jboss.messaging.core.remoting.impl.ResponseHandlerImpl;
+import org.jboss.messaging.core.remoting.impl.InMemoryMessageCache;
import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
import org.jboss.messaging.util.MessagingBuffer;
@@ -106,6 +99,8 @@
private boolean alive = true;
+ private MessageCache messageCache;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -156,6 +151,7 @@
this.connector = connector;
this.connectorConfig = connector.getSessionConfig();
this.chainSupport = chainSupport;
+ messageCache = new InMemoryMessageCache();
DefaultIoFilterChainBuilder filterChain = connector.getFilterChain();
@@ -176,7 +172,8 @@
throw ise;
}
}
- this.chainSupport.addCodecFilter(filterChain);
+ this.chainSupport.addCodecFilter(filterChain, connectionParams.getInitialPacketFragmentSize(),
+ connectionParams.getPacketFragmentSize(), messageCache);
connectorConfig.setTcpNoDelay(connectionParams.isTcpNoDelay());
int receiveBufferSize = connectionParams.getTcpReceiveBufferSize();
if (receiveBufferSize != -1)
@@ -353,7 +350,7 @@
{
listener.sessionDestroyed(sessionID, me);
}
-
+ messageCache.clear(sessionID);
session = null;
connector = null;
}
Added: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaDecoderHandler.java
===================================================================
--- branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaDecoderHandler.java (rev 0)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaDecoderHandler.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl.mina;
+
+import org.jboss.messaging.core.remoting.DecoderHandler;
+import org.jboss.messaging.core.remoting.Packet;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+
+/** {@link DecoderHandler} that hands every {@link Packet} it receives to a MINA {@link ProtocolDecoderOutput}.
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class MinaDecoderHandler implements DecoderHandler
+{
+ private ProtocolDecoderOutput out; // destination for handled packets; assigned only in the constructor
+
+ public MinaDecoderHandler(final ProtocolDecoderOutput out) // stores the output as given; no null check is performed
+ {
+ this.out = out;
+ }
+
+ public void handle(Packet packet) // passes the packet unchanged to out.write(...)
+ {
+ out.write(packet);
+ }
+}
Added: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaEncoderHandler.java
===================================================================
--- branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaEncoderHandler.java (rev 0)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaEncoderHandler.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl.mina;
+
+import org.jboss.messaging.util.MessagingBuffer;
+import org.jboss.messaging.core.remoting.EncoderHandler;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+
+/** {@link EncoderHandler} that writes each handled buffer to a MINA {@link ProtocolEncoderOutput}.
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class MinaEncoderHandler implements EncoderHandler
+{
+ private ProtocolEncoderOutput out; // destination for encoded buffers; assigned only in the constructor
+
+ public MinaEncoderHandler(ProtocolEncoderOutput output) // stores the output as given; no null check is performed
+ {
+ this.out = output;
+ }
+ public void handle(MessagingBuffer buffer) // unwraps the buffer and writes the wrapped buffer object to out
+ {
+ IoBufferWrapper ioBufferWrapper = (IoBufferWrapper) buffer; // unchecked cast: an instance that is not an IoBufferWrapper fails here with ClassCastException
+ out.write(ioBufferWrapper.getBuffer());
+ }
+}
Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaProtocolCodecFilter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaProtocolCodecFilter.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaProtocolCodecFilter.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -26,11 +26,14 @@
import org.apache.mina.common.IoSession;
import org.apache.mina.filter.codec.*;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.MessagingCodec;
-import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.*;
import org.jboss.messaging.core.remoting.impl.MessagingCodecImpl;
+import org.jboss.messaging.core.remoting.impl.PacketAssemblerImpl;
import org.jboss.messaging.util.MessagingBuffer;
+import static org.jboss.messaging.util.DataConstants.SIZE_INT;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
* A Mina ProtocolEncoder used to encode/decode messages.
*
@@ -44,8 +47,19 @@
// ProtocolCodecFactory implementation
// -----------------------------------------------------------------------------------
- private MessagingCodec messagingCodec = new MessagingCodecImpl();
+ private PacketAssembler packetAssembler;
+
+ private AtomicInteger packetId = new AtomicInteger(0);
+ private int initialPacketFragmentSize;
+
+ public MinaProtocolCodecFilter(int initialPacketFragmentSize, int packetFragmentSize, MessageCache messageCache) // records the initial size and creates the packet assembler
+ {
+ super();
+ this.initialPacketFragmentSize = initialPacketFragmentSize; // later used as the IoBuffer.allocate(...) capacity in the encode path
+ packetAssembler = new PacketAssemblerImpl(packetFragmentSize, messageCache); // fragment size and cache are handed to the assembler, not kept on this filter
+ }
+
public ProtocolDecoder getDecoder(final IoSession session)
{
return this;
@@ -66,15 +80,11 @@
final ProtocolEncoderOutput out) throws Exception
{
- IoBuffer iobuf = IoBuffer.allocate(1024, false);
-
+ IoBuffer iobuf = IoBuffer.allocate(initialPacketFragmentSize, false);
iobuf.setAutoExpand(true);
-
MessagingBuffer buffer = new IoBufferWrapper(iobuf);
-
- messagingCodec.encode(buffer, message);
-
- out.write(iobuf);
+ EncoderHandler outputHandler = new MinaEncoderHandler(out);
+ packetAssembler.disAssemble(buffer, outputHandler, message);
}
// CumulativeProtocolDecoder overrides
@@ -83,13 +93,8 @@
public boolean doDecode(final IoSession session, final IoBuffer in, final ProtocolDecoderOutput out) throws Exception
{
MessagingBuffer buff = new IoBufferWrapper(in);
-
- Packet packet = messagingCodec.decode(buff);
- if(packet != null)
- {
- out.write(packet);
- }
- return packet != null;
+ DecoderHandler decoderHandler = new MinaDecoderHandler(out);
+ return packetAssembler.assemble(buff, decoderHandler, session.getId());
}
}
Added: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketFragmentBuffer.java
===================================================================
--- branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketFragmentBuffer.java (rev 0)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketFragmentBuffer.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -0,0 +1,17 @@
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+import org.jboss.messaging.util.MessagingBuffer;
+import org.jboss.messaging.core.remoting.PacketFragment;
+
+import javax.transaction.xa.Xid;
+import java.util.List;
+
+/** {@link MessagingBuffer} extension that additionally exposes a list of {@link PacketFragment}s and a prepare step.
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public interface PacketFragmentBuffer extends MessagingBuffer
+{
+ List<PacketFragment> getPacketFragments(); // returns the packet fragments; ordering and lifetime are defined by the implementation, not here
+
+ void prepare(); // NOTE(review): contract not stated in this interface - presumably finalises the buffer before fragments are read; confirm against PacketFragmentBufferImpl
+}
Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -25,7 +25,6 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.util.DataConstants;
import org.jboss.messaging.util.MessagingBuffer;
/**
@@ -173,10 +172,9 @@
}
public void encode(MessagingBuffer buffer)
- {
+ {
//The standard header fields
- buffer.putInt(0); //The length gets filled in at the end
- buffer.putByte(type);
+ buffer.putByte(type);
buffer.putLong(responseTargetID);
buffer.putLong(targetID);
buffer.putLong(executorID);
@@ -184,11 +182,11 @@
encodeBody(buffer);
//The length doesn't include the actual length byte
- int len = buffer.position() - DataConstants.SIZE_INT;
+ /*int len = bufferEncoder.position() - DataConstants.SIZE_INT;
- buffer.putInt(0, len);
+ bufferEncoder.putInt(0, len);
- buffer.flip();
+ buffer.flip();*/
}
public void decode(final MessagingBuffer buffer) throws Exception
@@ -241,4 +239,5 @@
// Inner classes -------------------------------------------------
+
}
Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerSendMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerSendMessage.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerSendMessage.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -89,13 +89,9 @@
public void decodeBody(final MessagingBuffer buffer)
{
- //TODO can be optimised
-
serverMessage = new ServerMessageImpl();
serverMessage.decode(buffer);
-
- serverMessage.getBody().flip();
}
Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReceiveMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReceiveMessage.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReceiveMessage.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -116,8 +116,7 @@
clientMessage = new ClientMessageImpl(deliveryCount, deliveryID);
clientMessage.decode(buffer);
-
- clientMessage.getBody().flip();
+
}
// Package protected ---------------------------------------------
Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossBytesMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossBytesMessage.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossBytesMessage.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -25,7 +25,7 @@
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
+import org.jboss.messaging.util.MessagingBuffer;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
@@ -415,7 +415,7 @@
{
super.clearBody();
- body = new IoBufferWrapper(1024);
+ body = body.createNewBuffer(1024);
}
public long getBodyLength() throws JMSException
@@ -425,11 +425,11 @@
return body.limit();
}
- public void doBeforeSend() throws Exception
+ public void encode(MessagingBuffer buffer)
{
- reset();
-
- message.setBody(body);
+ super.encode(buffer);
+ body.rewind();
+ buffer.putBytes(body.array(), 0, body.limit());
}
// Public --------------------------------------------------------
Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossMapMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossMapMessage.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossMapMessage.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -24,8 +24,10 @@
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketFragmentBuffer;
import org.jboss.messaging.util.SimpleString;
import org.jboss.messaging.util.TypedProperties;
+import org.jboss.messaging.util.MessagingBuffer;
import javax.jms.JMSException;
import javax.jms.MapMessage;
@@ -439,12 +441,10 @@
map.clear();
}
-
- public void doBeforeSend() throws Exception
+
+ public void encode(MessagingBuffer buff)
{
- map.encode(body);
-
- super.doBeforeSend();
+ map.encode(buff);
}
public void doBeforeReceive() throws Exception
Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossMessage.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -27,6 +27,7 @@
import org.jboss.messaging.core.client.impl.ClientMessageImpl;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.message.MessageBodyEncoder;
import org.jboss.messaging.jms.JBossDestination;
import org.jboss.messaging.util.ByteBufferWrapper;
import org.jboss.messaging.util.MessagingBuffer;
@@ -55,7 +56,7 @@
*
* $Id: JBossMessage.java 3466 2007-12-10 18:44:52Z timfox $
*/
-public class JBossMessage implements javax.jms.Message
+public class JBossMessage implements javax.jms.Message, MessageBodyEncoder
{
// Constants -----------------------------------------------------
@@ -187,7 +188,7 @@
protected JBossMessage(final byte type, final ClientSession session)
{
message = session.createClientMessage(type, true, 0, System.currentTimeMillis(), (byte)4);
-
+ message.setMessageBodyEncoder(this);
//TODO - can we lazily create this?
body = message.getBody();
}
@@ -207,6 +208,8 @@
this.readOnly = true;
this.session = session;
+
+ message.setMessageBodyEncoder(this);
this.body = message.getBody();
}
@@ -876,13 +879,16 @@
{
return message;
}
-
- public void doBeforeSend() throws Exception
+
+ public void encode(MessagingBuffer buffer)
{
- body.flip();
-
- message.setBody(body);
+ //do nothing
}
+
+ public void decode(MessagingBuffer buffer, int len) // empty implementation: both arguments are ignored
+ {
+ // No decoding logic in this revision; this line was previously an unfilled IDE template placeholder.
+ }
public void doBeforeReceive() throws Exception
{
Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossMessageProducer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossMessageProducer.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossMessageProducer.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -412,19 +412,6 @@
jbm.setJMSDestination(destination);
- try
- {
- jbm.doBeforeSend();
- }
- catch (Exception e)
- {
- JMSException je = new JMSException(e.getMessage());
-
- je.initCause(e);
-
- throw je;
- }
-
JBossDestination dest = (JBossDestination)destination;
try
Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossObjectMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossObjectMessage.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossObjectMessage.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -25,6 +25,7 @@
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.util.ObjectInputStreamWithClassLoader;
+import org.jboss.messaging.util.MessagingBuffer;
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
@@ -93,29 +94,37 @@
{
return JBossObjectMessage.TYPE;
}
-
- public void doBeforeSend() throws Exception
+
+ public void encode(MessagingBuffer buffer)
{
if (object != null)
{
ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
-
- ObjectOutputStream oos = new ObjectOutputStream(baos);
-
- oos.writeObject(object);
-
- oos.flush();
-
+
+ try
+ {
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+
+ oos.writeObject(object);
+
+ oos.flush();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("Unable to serialize object");
+ }
+
byte[] data = baos.toByteArray();
-
- body.putInt(data.length);
- body.putBytes(data);
+
+ buffer.putInt(data.length);
+ buffer.putBytes(data);
}
-
- super.doBeforeSend();
+ else
+ {
+ buffer.putInt(0);
+ }
}
-
-
+
// ObjectMessage implementation ----------------------------------
public void setObject(Serializable object) throws JMSException
Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossStreamMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossStreamMessage.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossStreamMessage.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -567,16 +567,9 @@
{
super.clearBody();
- body = new IoBufferWrapper(1024);
+ body = body.createNewBuffer(1024);
}
- public void doBeforeSend() throws Exception
- {
- reset();
-
- message.setBody(body);
- }
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossTextMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossTextMessage.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossTextMessage.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -26,6 +26,8 @@
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.util.MessagingBuffer;
+import org.jboss.messaging.util.DataConstants;
import javax.jms.JMSException;
import javax.jms.TextMessage;
@@ -121,13 +123,17 @@
}
// JBossMessage override -----------------------------------------
-
- public void doBeforeSend() throws Exception
+
+ public void encode(MessagingBuffer buffer)
{
- body.putNullableString(text);
-
- super.doBeforeSend();
+ buffer.putNullableString(text);
}
+
+ public void decode(MessagingBuffer buffer, int len) // delegates to the superclass, then loads the text field
+ {
+ super.decode(buffer, len);
+ text = body.getNullableString(); // read from the body field, not from the buffer argument
+ }
public void doBeforeReceive() throws Exception
{
Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/util/ByteBufferWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/ByteBufferWrapper.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/util/ByteBufferWrapper.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -301,7 +301,18 @@
}
}
- public void putByte(byte val)
+ public void putBoolean(int pos, boolean val) // indexed variant: writes the TRUE or FALSE constant at index pos
+ {
+ if (val)
+ {
+ buffer.put(pos, TRUE);
+ } else
+ {
+ buffer.put(pos, FALSE); // NOTE(review): presumably an absolute put that leaves the buffer position unchanged - confirm for the wrapped buffer type
+ }
+ }
+
+ public void putByte(byte val)
{
buffer.put(val);
}
Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/util/ExpandingMessagingBuffer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/ExpandingMessagingBuffer.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/util/ExpandingMessagingBuffer.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -211,6 +211,17 @@
putByte((byte) (val ? -1 : 0));
}
+ public void putBoolean(int pos, boolean val) // indexed variant: writes the TRUE or FALSE constant at index pos
+ {
+ if (val)
+ {
+ buf.put(pos, TRUE); // written straight to buf, without the ensureRemaining(...) call that putByte uses
+ } else
+ {
+ buf.put(pos, FALSE); // NOTE(review): the sequential putBoolean writes -1/0; confirm TRUE/FALSE encode the same values
+ }
+ }
+
public void putByte(byte val)
{
ensureRemaining(1).put(val);
Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/util/MessagingBuffer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/MessagingBuffer.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/util/MessagingBuffer.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -50,9 +50,11 @@
void putFloat(float val);
void putBoolean(boolean val);
-
+
+ void putBoolean(int pos, boolean val);
+
void putChar(char val);
-
+
void putNullableString(String val);
void putString(String val);
Modified: branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/FilterChainSupportTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/FilterChainSupportTest.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/FilterChainSupportTest.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -41,6 +41,7 @@
import org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport;
import org.jboss.messaging.core.remoting.impl.mina.FilterChainSupportImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
+import org.jboss.messaging.core.remoting.impl.InMemoryMessageCache;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -84,7 +85,7 @@
support.addSSLFilter(acceptor.getFilterChain(), false, keystorePath,
keystorePassword, trustStorePath, trustStorePassword);
- support.addCodecFilter(acceptor.getFilterChain());
+ support.addCodecFilter(acceptor.getFilterChain(), 2048, 1024, new InMemoryMessageCache());
acceptor.setDefaultLocalAddress(address);
final CountDownLatch latch = new CountDownLatch(1);
@@ -103,7 +104,7 @@
NioSocketConnector connector = new NioSocketConnector();
support.addSSLFilter(connector.getFilterChain(), true,
keystorePath, keystorePassword, null, null);
- support.addCodecFilter(connector.getFilterChain());
+ support.addCodecFilter(connector.getFilterChain(), 2048, 1024, new InMemoryMessageCache());
connector.setHandler(new IoHandlerAdapter());
ConnectFuture future = connector.connect(address).awaitUninterruptibly();
IoSession session = future.getSession();
Modified: branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaSessionTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaSessionTest.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaSessionTest.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -23,6 +23,8 @@
package org.jboss.messaging.tests.integration.core.remoting.mina;
import org.jboss.messaging.core.client.impl.LocationImpl;
+import org.jboss.messaging.core.client.impl.ConnectionParamsImpl;
+import org.jboss.messaging.core.client.ConnectionParams;
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.RemotingConnector;
@@ -56,7 +58,10 @@
@Override
protected RemotingConnector createNIOConnector(PacketDispatcher dispatcher)
{
- return new MinaConnector(new LocationImpl(TCP, "localhost", TestSupport.PORT), dispatcher);
+ ConnectionParams connectionParams = new ConnectionParamsImpl();
+ connectionParams.setPingInterval(0);
+ connectionParams.setCallTimeout(10000000);
+ return new MinaConnector(new LocationImpl(TCP, "localhost", TestSupport.PORT),connectionParams, dispatcher);
}
@Override
Added: branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/PacketFragmentBufferImplTest.java
===================================================================
--- branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/PacketFragmentBufferImplTest.java (rev 0)
+++ branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/PacketFragmentBufferImplTest.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -0,0 +1,387 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.unit.core.remoting.impl;
+
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.tests.util.RandomUtil;
+import org.jboss.messaging.core.remoting.impl.PacketFragmentBufferImpl;
+import org.jboss.messaging.core.remoting.PacketFragment;
+import org.jboss.messaging.util.MessagingBuffer;
+import org.jboss.messaging.util.ByteBufferWrapper;
+import org.jboss.messaging.util.SimpleString;
+import static org.jboss.messaging.util.DataConstants.NOT_NULL;
+
+import java.nio.ByteBuffer;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class PacketFragmentBufferImplTest extends UnitTestCase
+{
+ private MessagingBuffer messagingBuffer;
+ private PacketFragmentBufferImpl buffer;
+
+ protected void setUp() throws Exception
+ {
+ messagingBuffer = new ByteBufferWrapper(ByteBuffer.allocate(1024));
+ }
+
+ public void testConstructor1()
+ {
+ buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+ messagingBuffer.rewind();
+ assertEquals(messagingBuffer.getInt(), -1);
+ assertFalse(messagingBuffer.getBoolean());
+ assertEquals(messagingBuffer.getInt(), 10);
+ assertEquals(messagingBuffer.getInt(), 0);
+ assertEquals(buffer.getPacketFragments().size(), 1);
+ assertEquals(buffer.getPacketFragments().get(0).getDataStartPosition(), 13);
+ assertEquals(buffer.getPacketFragments().get(0).getDataStartPosition(), 13);
+ }
+
+ public void testPutFloat()
+ {
+ buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+ messagingBuffer.position(1020);
+ buffer.putFloat(55);
+ assertEquals(buffer.getPacketFragments().size(), 1);
+ assertEquals(messagingBuffer.position(), 1024);
+ }
+
+ public void testPutFloatRollsOver()
+ {
+ buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+ messagingBuffer.position(1021);
+ buffer.putFloat(55);
+ assertEquals(buffer.getPacketFragments().size(), 2);
+ assertEquals(messagingBuffer.position(), 1021);
+ MessagingBuffer secondBuffer = buffer.getPacketFragments().get(1).getMessagingBuffer();
+ assertEquals(secondBuffer.position(), 17);
+ secondBuffer.position(13);
+ assertEquals(secondBuffer.getFloat(), 55f);
+ }
+
+ public void testPutInt()
+ {
+ buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+ messagingBuffer.position(1020);
+ buffer.putInt(55);
+ assertEquals(buffer.getPacketFragments().size(), 1);
+ assertEquals(messagingBuffer.position(), 1024);
+ }
+
+ public void testPutIntRollsOver()
+ {
+ buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+ messagingBuffer.position(1021);
+ buffer.putInt(55);
+ assertEquals(buffer.getPacketFragments().size(), 2);
+ assertEquals(messagingBuffer.position(), 1021);
+ MessagingBuffer secondBuffer = buffer.getPacketFragments().get(1).getMessagingBuffer();
+ assertEquals(secondBuffer.position(), 17);
+ secondBuffer.position(13);
+ assertEquals(secondBuffer.getInt(), 55);
+ }
+
+ public void testPutByte()
+ {
+ buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+ messagingBuffer.position(1023);
+ buffer.putByte((byte) 55);
+ assertEquals(buffer.getPacketFragments().size(), 1);
+ assertEquals(messagingBuffer.position(), 1024);
+ }
+
+ public void testPutByteRollsOver()
+ {
+ buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+ messagingBuffer.position(1024);
+ buffer.putByte((byte) 55);
+ assertEquals(buffer.getPacketFragments().size(), 2);
+ assertEquals(messagingBuffer.position(), 1024);
+ MessagingBuffer secondBuffer = buffer.getPacketFragments().get(1).getMessagingBuffer();
+ assertEquals(secondBuffer.position(), 14);
+ secondBuffer.position(13);
+ assertEquals(secondBuffer.getByte(), (byte) 55);
+ }
+
+ public void testPutLong()
+ {
+ buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+ messagingBuffer.position(1016);
+ buffer.putLong(55);
+ assertEquals(buffer.getPacketFragments().size(), 1);
+ assertEquals(messagingBuffer.position(), 1024);
+ }
+
+ public void testPutLongRollsOver()
+ {
+ buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+ messagingBuffer.position(1017);
+ buffer.putLong(55);
+ assertEquals(buffer.getPacketFragments().size(), 2);
+ assertEquals(messagingBuffer.position(), 1017);
+ MessagingBuffer secondBuffer = buffer.getPacketFragments().get(1).getMessagingBuffer();
+ assertEquals(secondBuffer.position(), 21);
+ secondBuffer.position(13);
+ assertEquals(secondBuffer.getLong(), 55);
+ }
+
+ public void testPutBoolean()
+ {
+ buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+ messagingBuffer.position(1023);
+ buffer.putBoolean(true);
+ assertEquals(buffer.getPacketFragments().size(), 1);
+ assertEquals(messagingBuffer.position(), 1024);
+ }
+
+ public void testPutBooleanRollsOver()
+ {
+ buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+ messagingBuffer.position(1024);
+ buffer.putBoolean(true);
+ assertEquals(buffer.getPacketFragments().size(), 2);
+ assertEquals(messagingBuffer.position(), 1024);
+ MessagingBuffer secondBuffer = buffer.getPacketFragments().get(1).getMessagingBuffer();
+ assertEquals(secondBuffer.position(), 14);
+ secondBuffer.position(13);
+ assertEquals(secondBuffer.getBoolean(), true);
+ }
+
+ public void testPutChar()
+ {
+ buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+ messagingBuffer.position(1022);
+ buffer.putChar('d');
+ assertEquals(buffer.getPacketFragments().size(), 1);
+ assertEquals(messagingBuffer.position(), 1024);
+ }
+
+ public void testPutCharRollsOver()
+ {
+ buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+ messagingBuffer.position(1023);
+ buffer.putChar('d');
+ assertEquals(buffer.getPacketFragments().size(), 2);
+ assertEquals(messagingBuffer.position(), 1023);
+ MessagingBuffer secondBuffer = buffer.getPacketFragments().get(1).getMessagingBuffer();
+ assertEquals(secondBuffer.position(), 15);
+ secondBuffer.position(13);
+ assertEquals(secondBuffer.getChar(), 'd');
+ }
+
+ public void testPutNullableString()
+ {
+ buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+ String s = "This is a String that is a 45 characters long";
+ messagingBuffer.position(1024 - 95);//45 * 2 + 4 (int) + 1 (byte)
+ buffer.putNullableString(s);
+ assertEquals(buffer.getPacketFragments().size(), 1);
+ assertEquals(messagingBuffer.position(), 1024);
+ }
+
+ public void testPutNullNullableString()
+ {
+ buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+ String s = null;
+ messagingBuffer.position(1023);
+ buffer.putNullableString(s);
+ assertEquals(buffer.getPacketFragments().size(), 1);
+ assertEquals(messagingBuffer.position(), 1024);
+ }
+
+ public void testPutNullableStringSplitAcrossTwoBuffers()
+ {
+ buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+ String s = "This is a String that is a 45 characters long";
+ messagingBuffer.position(1024 - 51);//1 (byte) + 4 (int) + 23 * 2: only the first 23 chars fit in this buffer
+ buffer.putNullableString(s);
+ assertEquals(buffer.getPacketFragments().size(), 2);
+ assertEquals(messagingBuffer.position(), 1024);
+ messagingBuffer.position(1024-51);
+ assertEquals(messagingBuffer.getByte(), NOT_NULL);
+ assertEquals(messagingBuffer.getInt(), 45);
+ String newString = "";
+ for(int i = 0; i < 23; i++)
+ {
+ newString+=messagingBuffer.getChar();
+ }
+ assertEquals(newString, "This is a String that i");
+ MessagingBuffer secondBuffer = buffer.getPacketFragments().get(1).getMessagingBuffer();
+ secondBuffer.position(13);
+ newString = "";
+ for(int i = 0; i < 22; i++)
+ {
+ newString+=secondBuffer.getChar();
+ }
+ assertEquals(newString, "s a 45 characters long");
+ }
+
+ public void testPutSimpleString()
+ {
+ buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+ SimpleString s = new SimpleString("This is a String that is a 45 characters long");
+ messagingBuffer.position(1024 - s.getData().length - 4);//the length plus an int
+ buffer.putSimpleString(s);
+ assertEquals(buffer.getPacketFragments().size(), 1);
+ assertEquals(messagingBuffer.position(), 1024);
+ messagingBuffer.position(1024 - s.getData().length - 4);
+ assertEquals(messagingBuffer.getSimpleString(), s);
+ }
+
+ public void testPutSimpleStringSplitAcrossBuffers()
+ {
+ buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+ SimpleString s = new SimpleString("This is a String that is a 45 characters long");
+ messagingBuffer.position(1024 - 23 - 4);//room for the int length prefix plus only the first 23 data bytes
+ buffer.putSimpleString(s);
+ assertEquals(buffer.getPacketFragments().size(), 2);
+ assertEquals(messagingBuffer.position(), 1024);
+ messagingBuffer.position(1024 - 23 - 4);
+ assertEquals(messagingBuffer.getInt(), 45 * 2);
+ for(int i = 0; i < 23; i++)
+ {
+ assertEquals(messagingBuffer.getByte(), s.getData()[i]);
+ }
+ MessagingBuffer secondBuffer = buffer.getPacketFragments().get(1).getMessagingBuffer();
+ secondBuffer.position(13);
+ for(int i = 0; i < 22; i++)
+ {
+ assertEquals(secondBuffer.getByte(), s.getData()[i + 23]);
+ }
+ }
+
+ public void testPutNullableSimpleString()
+ {
+ buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+ SimpleString s = new SimpleString("This is a String that is a 45 characters long");
+ messagingBuffer.position(1024 - s.getData().length - 4 - 1);//the length plus an int and byte
+ buffer.putNullableSimpleString(s);
+ assertEquals(buffer.getPacketFragments().size(), 1);
+ assertEquals(messagingBuffer.position(), 1024);
+ messagingBuffer.position(1024 - s.getData().length - 4 - 1);
+ assertEquals(messagingBuffer.getNullableSimpleString(), s);
+ }
+
+ public void testPutNullNullableSimpleString()
+ {
+ buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+ SimpleString s = null;
+ messagingBuffer.position(1023);//leave room for a single byte; the null string is expected to advance the position by one
+ buffer.putNullableSimpleString(s);
+ assertEquals(buffer.getPacketFragments().size(), 1);
+ assertEquals(messagingBuffer.position(), 1024);
+ messagingBuffer.position(1023);
+ assertEquals(messagingBuffer.getNullableSimpleString(), s);
+ }
+
+ public void testPutBytes()
+ {
+ buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+ byte[] bytes = RandomUtil.randomBytes(5000);
+ buffer.putBytes(bytes);
+ assertEquals(buffer.getPacketFragments().size(), 5);
+ byte[] bytes2 = new byte[bytes.length];
+ for (PacketFragment b : buffer.getPacketFragments())
+ {
+ b.getMessagingBuffer().position(13);
+ }
+ for(int i = 13; i < 1024; i++)
+ {
+ assertEquals(messagingBuffer.getByte(), bytes[i-13]);
+ }
+ messagingBuffer = buffer.getPacketFragments().get(1).getMessagingBuffer();
+ for(int i = 1024 + 13; i < 2048; i++)
+ {
+ byte b = messagingBuffer.getByte();
+ assertEquals(b, bytes[i-26]);
+ }
+ messagingBuffer = buffer.getPacketFragments().get(2).getMessagingBuffer();
+ for(int i = 2048 + 13; i < 3072; i++)
+ {
+ byte b = messagingBuffer.getByte();
+ assertEquals(b, bytes[i-39]);
+ }
+ messagingBuffer = buffer.getPacketFragments().get(3).getMessagingBuffer();
+ for(int i = 3072 + 13; i < 4096; i++)
+ {
+ byte b = messagingBuffer.getByte();
+ assertEquals(b, bytes[i-52]);
+ }
+ messagingBuffer = buffer.getPacketFragments().get(4).getMessagingBuffer();
+ for(int i = 4096 + 13; i <= 5000; i++)
+ {
+ byte b = messagingBuffer.getByte();
+ assertEquals(b, bytes[i-65]);
+ }
+ }
+
+ public void testPutBytesOffset()
+ {
+ buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+ byte[] bytes = RandomUtil.randomBytes(7000);
+ byte[] middleBytes = new byte[5000];
+ buffer.putBytes(bytes, 1000, 5000);
+ System.arraycopy(bytes, 1000, middleBytes, 0, 5000);
+ assertEquals(buffer.getPacketFragments().size(), 5);
+ byte[] bytes2 = new byte[bytes.length];
+ for (PacketFragment b : buffer.getPacketFragments())
+ {
+ b.getMessagingBuffer().position(13);
+ }
+ for(int i = 13; i < 1024; i++)
+ {
+ assertEquals(messagingBuffer.getByte(), middleBytes[i-13]);
+ }
+ messagingBuffer = buffer.getPacketFragments().get(1).getMessagingBuffer();
+ for(int i = 1024 + 13; i < 2048; i++)
+ {
+ byte b = messagingBuffer.getByte();
+ assertEquals(b, middleBytes[i-26]);
+ }
+ messagingBuffer = buffer.getPacketFragments().get(2).getMessagingBuffer();
+ for(int i = 2048 + 13; i < 3072; i++)
+ {
+ byte b = messagingBuffer.getByte();
+ assertEquals(b, middleBytes[i-39]);
+ }
+ messagingBuffer = buffer.getPacketFragments().get(3).getMessagingBuffer();
+ for(int i = 3072 + 13; i < 4096; i++)
+ {
+ byte b = messagingBuffer.getByte();
+ assertEquals(b, middleBytes[i-52]);
+ }
+ messagingBuffer = buffer.getPacketFragments().get(4).getMessagingBuffer();
+ for(int i = 4096 + 13; i <= 5000; i++)
+ {
+ byte b = messagingBuffer.getByte();
+ assertEquals(b, middleBytes[i-65]);
+ }
+ }
+
+ protected void tearDown() throws Exception
+ {
+ messagingBuffer = null;
+ buffer = null;
+ }
+}
Modified: branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/SessionTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/SessionTestBase.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/SessionTestBase.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -87,7 +87,7 @@
session.write(packet);
- assertTrue(serverPacketHandler.await(2, SECONDS));
+ assertTrue(serverPacketHandler.await(20, SECONDS));
List<Packet> messages = serverPacketHandler.getPackets();
assertEquals(1, messages.size());
Modified: branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/MinaConnectorTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/MinaConnectorTest.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/MinaConnectorTest.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -67,6 +67,7 @@
import org.jboss.messaging.core.remoting.RemotingSession;
import org.jboss.messaging.core.remoting.TransportType;
import org.jboss.messaging.core.remoting.impl.ResponseHandlerImpl;
+import org.jboss.messaging.core.remoting.impl.InMemoryMessageCache;
import org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport;
import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
import org.jboss.messaging.core.remoting.impl.mina.MinaConnector;
@@ -645,7 +646,7 @@
delegateBuilder = new DefaultIoFilterChainBuilder();
- filterChainSupport.addCodecFilter(delegateBuilder);
+ filterChainSupport.addCodecFilter(delegateBuilder, 2048, 1024, new InMemoryMessageCache());
sessionConfig = EasyMock.createMock(SocketSessionConfig.class);
Modified: branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/MinaProtocolCodecFilterTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/MinaProtocolCodecFilterTest.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/MinaProtocolCodecFilterTest.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -271,7 +271,7 @@
buffer.rewind();
SimpleProtocolDecoderOutput out = new SimpleProtocolDecoderOutput();
- MinaProtocolCodecFilter codec = new MinaProtocolCodecFilter();
+ MinaProtocolCodecFilter codec = new MinaProtocolCodecFilter(2048, -1, null);
codec.doDecode(null, IoBuffer.wrap(buffer.array()), out);
Object message = out.getMessage();
assertTrue(message instanceof Packet);
@@ -304,7 +304,7 @@
out.write(isA(IoBuffer.class));
replay(session, out);
- MinaProtocolCodecFilter filter = new MinaProtocolCodecFilter();
+ MinaProtocolCodecFilter filter = new MinaProtocolCodecFilter(2048, -1, null);
filter.encode(session, message, out);
verify(session, out);
Modified: branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossObjectMessageTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossObjectMessageTest.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossObjectMessageTest.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -122,7 +122,7 @@
JBossObjectMessage msg = new JBossObjectMessage();
msg.setObject(object);
- msg.doBeforeSend();
+ //msg.doBeforeSend();
MessagingBuffer body = msg.getCoreMessage().getBody();
assertEquals(data.length, body.getInt());
Modified: branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossTextMessageTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossTextMessageTest.java 2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossTextMessageTest.java 2008-07-23 11:27:02 UTC (rev 4715)
@@ -112,7 +112,7 @@
JBossTextMessage msg = new JBossTextMessage();
msg.setText(text);
- msg.doBeforeSend();
+ //msg.doBeforeSend();
MessagingBuffer body = msg.getCoreMessage().getBody();
String s = body.getNullableString();
More information about the jboss-cvs-commits
mailing list