[jboss-cvs] JBoss Messaging SVN: r4744 - in branches: Branch_Message_Chunking_new and 9 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jul 29 09:42:18 EDT 2008
Author: ataylor
Date: 2008-07-29 09:42:17 -0400 (Tue, 29 Jul 2008)
New Revision: 4744
Added:
branches/Branch_Message_Chunking_new/
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/MessageCache.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketAssembler.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketFragment.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketFragmentBuffer.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/InMemoryMessageCache.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/PacketAssemblerImpl.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentBufferImpl.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentImpl.java
Modified:
branches/Branch_Message_Chunking_new/build-messaging.xml
branches/Branch_Message_Chunking_new/messaging.ipr
branches/Branch_Message_Chunking_new/src/config/jbm-configuration.xml
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/ConnectionParams.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/config/Configuration.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/MessagingBuffer.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/RemotingHandler.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ByteBufferWrapper.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ExpandingMessagingBuffer.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingHandlerImpl.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaProtocolCodecFilter.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
Log:
new message chunking branch
Copied: branches/Branch_Message_Chunking_new (from rev 4739, trunk)
Modified: branches/Branch_Message_Chunking_new/build-messaging.xml
===================================================================
--- trunk/build-messaging.xml 2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/build-messaging.xml 2008-07-29 13:42:17 UTC (rev 4744)
@@ -901,6 +901,7 @@
<target name="runServer" depends="jar">
<java classname="org.jboss.messaging.microcontainer.JBMBootstrapServer" fork="true">
+ <jvmarg line="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"/>
<jvmarg value="-XX:+UseParallelGC"/>
<jvmarg value="-Xms512M"/>
<jvmarg value="-Xmx2048M"/>
Modified: branches/Branch_Message_Chunking_new/messaging.ipr
===================================================================
--- trunk/messaging.ipr 2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/messaging.ipr 2008-07-29 13:42:17 UTC (rev 4744)
@@ -47,6 +47,9 @@
<component name="CompilerConfiguration">
<option name="DEFAULT_COMPILER" value="Javac" />
<option name="DEPLOY_AFTER_MAKE" value="0" />
+ <excludeFromCompile>
+ <file url="file://$PROJECT_DIR$/src/main/org/jboss/messaging/core/remoting/impl/PacketAssemblerImpl.java" />
+ </excludeFromCompile>
<resourceExtensions>
<entry name=".+\.(properties|xml|html|dtd|tld)" />
<entry name=".+\.(gif|png|jpeg|jpg)" />
Modified: branches/Branch_Message_Chunking_new/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml 2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/src/config/jbm-configuration.xml 2008-07-29 13:42:17 UTC (rev 4744)
@@ -51,6 +51,13 @@
<!--How long to wait for a returning pong after sending a ping message to a client/server.-->
<!-- If no pong is received after this time resources are cleaned up-->
<remoting-ping-timeout>5000</remoting-ping-timeout>
+
+ <!--The initial size of the buffer created for the transport-->
+ <remoting-initial-packet-fragment-size>1024</remoting-initial-packet-fragment-size>
+
+ <!--The maximum size of packet to send on the transport, except for the first packet which is the r
+ emoting-initial-packet-fragment-size parameter. setting to 0 means that packet chunking is disabled-->
+ <remoting-packet-fragment-size>1024</remoting-packet-fragment-size>
<!-- if ssl is enabled, all remoting-ssl-* properties must be set -->
<remoting-enable-ssl>false</remoting-enable-ssl>
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/ConnectionParams.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ConnectionParams.java 2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/ConnectionParams.java 2008-07-29 13:42:17 UTC (rev 4744)
@@ -75,5 +75,13 @@
String getTrustStorePassword();
- void setTrustStorePassword(String trustStorePassword);
+ void setTrustStorePassword(String trustStorePassword);
+
+ int getPacketFragmentSize();
+
+ int getInitialPacketFragmentSize();
+
+ void setInitialPacketFragmentSize(int initialPacketFragmentSize);
+
+ void setPacketFragmentSize(int packetFragmentSize);
}
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java 2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java 2008-07-29 13:42:17 UTC (rev 4744)
@@ -45,6 +45,10 @@
public static final int DEFAULT_TCP_SEND_BUFFER_SIZE = 32 * 1024; // in bytes
public static final boolean DEFAULT_SSL_ENABLED = false;
+
+ public static final Integer DEFAULT_INITIAL_PACKET_FRAGMENT_SIZE = 2048;
+
+ public static final Integer DEFAULT_PACKET_FRAGMENT_SIZE = 1024;
public static final String SSL_KEYSTORE_PATH_PROPERTY_NAME = "jbm.remoting.ssl.keystore.path";
@@ -83,6 +87,10 @@
private String trustStorePath;
private String trustStorePassword;
+
+ private int initialPacketFragmentSize = DEFAULT_INITIAL_PACKET_FRAGMENT_SIZE;
+
+ private int packetFragmentSize = DEFAULT_PACKET_FRAGMENT_SIZE;
public long getCallTimeout()
{
@@ -202,6 +210,26 @@
{
this.trustStorePassword = trustStorePassword;
}
+
+ public void setInitialPacketFragmentSize(int initialPacketFragmentSize)
+ {
+ this.initialPacketFragmentSize = initialPacketFragmentSize;
+ }
+
+ public int getInitialPacketFragmentSize()
+ {
+ return initialPacketFragmentSize;
+ }
+
+ public void setPacketFragmentSize(int packetFragmentSize)
+ {
+ this.packetFragmentSize = packetFragmentSize;
+ }
+
+ public int getPacketFragmentSize()
+ {
+ return packetFragmentSize;
+ }
public boolean equals(Object other)
{
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/config/Configuration.java 2008-07-29 13:42:17 UTC (rev 4744)
@@ -18,7 +18,7 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.config;
@@ -32,117 +32,121 @@
import org.jboss.messaging.core.server.JournalType;
/**
- *
+ *
* A Configuration
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
*/
public interface Configuration extends Serializable
{
// General attributes -------------------------------------------------------------------
-
+
boolean isClustered();
-
+
void setClustered(boolean clustered);
int getScheduledThreadPoolMaxSize();
-
+
void setScheduledThreadPoolMaxSize(int maxSize);
-
+
long getSecurityInvalidationInterval();
-
+
void setSecurityInvalidationInterval(long interval);
-
+
boolean isSecurityEnabled();
-
+
void setSecurityEnabled(boolean enabled);
-
+
boolean isRequireDestinations();
-
+
void setRequireDestinations(boolean require);
// Remoting related attributes ----------------------------------------------------------
-
+
List<String> getInterceptorClassNames();
-
+
Set<String> getAcceptorFactoryClassNames();
-
+
ConnectionParams getConnectionParams();
-
+
TransportType getTransport();
-
+
void setTransport(TransportType transport);
String getHost();
-
+
void setHost(String host);
int getPort();
-
+
void setPort(int port);
Location getLocation();
-
+
String getKeyStorePath();
-
+
void setKeyStorePath(String path);
String getKeyStorePassword();
-
+
void setKeyStorePassword(String password);
String getTrustStorePath();
-
+
void setTrustStorePath(String path);
String getTrustStorePassword();
-
+
void setTrustStorePassword(String password);
-
+
boolean isSSLEnabled();
-
+
void setSSLEnabled(boolean enabled);
-
+
// Journal related attributes
-
+
String getBindingsDirectory();
-
+
void setBindingsDirectory(String dir);
String getJournalDirectory();
-
+
void setJournalDirectory(String dir);
JournalType getJournalType();
-
+
void setJournalType(JournalType type);
boolean isJournalSyncTransactional();
-
+
void setJournalSyncTransactional(boolean sync);
-
+
boolean isJournalSyncNonTransactional();
-
+
void setJournalSyncNonTransactional(boolean sync);
int getJournalFileSize();
-
+
void setJournalFileSize(int size);
int getJournalMinFiles();
-
+
void setJournalMinFiles(int files);
-
+
int getJournalMaxAIO();
-
+
void setJournalMaxAIO(int maxAIO);
-
+
boolean isCreateBindingsDir();
-
+
void setCreateBindingsDir(boolean create);
boolean isCreateJournalDir();
-
+
void setCreateJournalDir(boolean create);
+
+ int getInitialPacketFragmentSize();
+
+ int getPacketFragmentSize();
}
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-07-29 13:42:17 UTC (rev 4744)
@@ -91,8 +91,12 @@
public static final int DEFAULT_MAX_AIO = 5000;
- private static final long serialVersionUID = 4077088945050267843L;
+ private static final long serialVersionUID = 4077088945050267843L;
+ public static final int INITIAL_PACKET_FRAGMENT_SIZE = 2048;
+
+ public static final int PACKET_FRAGMENT_SIZE = 1024;
+
// Attributes -----------------------------------------------------------------------------
@@ -136,6 +140,10 @@
protected String host = DEFAULT_HOST;
protected int port = DEFAULT_PORT;
+
+ protected int initialPacketFragmentSize = INITIAL_PACKET_FRAGMENT_SIZE;
+
+ protected int packetFragmentSize = PACKET_FRAGMENT_SIZE;
protected final ConnectionParams defaultConnectionParams = new ConnectionParamsImpl();
@@ -406,6 +414,16 @@
this.securityEnabled = enabled;
}
+ public int getInitialPacketFragmentSize()
+ {
+ return initialPacketFragmentSize;
+ }
+
+ public int getPacketFragmentSize()
+ {
+ return packetFragmentSize;
+ }
+
public ConnectionParams getConnectionParams()
{
return this.defaultConnectionParams;
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-07-29 13:42:17 UTC (rev 4744)
@@ -116,6 +116,10 @@
trustStorePassword = getString(e, "remoting-ssl-truststore-password", null);
+ initialPacketFragmentSize = getInteger(e, "remoting-initial-packet-fragment-size", ConnectionParamsImpl.DEFAULT_INITIAL_PACKET_FRAGMENT_SIZE);
+
+ packetFragmentSize = getInteger(e, "remoting-packet-fragment-size", ConnectionParamsImpl.DEFAULT_PACKET_FRAGMENT_SIZE);
+
defaultConnectionParams.setCallTimeout(callTimeout);
defaultConnectionParams.setInVMOptimisationEnabled(inVMOptimisationEnabled);
@@ -129,6 +133,11 @@
defaultConnectionParams.setPingInterval(pingInterval);
defaultConnectionParams.setSSLEnabled(sslEnabled);
+
+ defaultConnectionParams.setInitialPacketFragmentSize(initialPacketFragmentSize);
+
+ defaultConnectionParams.setPacketFragmentSize(packetFragmentSize);
+
NodeList interceptorNodes = e.getElementsByTagName("remoting-interceptors");
Added: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/MessageCache.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/MessageCache.java (rev 0)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/MessageCache.java 2008-07-29 13:42:17 UTC (rev 4744)
@@ -0,0 +1,13 @@
+package org.jboss.messaging.core.remoting;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public interface MessageCache
+{
+ void cache(MessagingBuffer buffer, int length, long sessionId, int packetId, int correlationId);
+
+ MessagingBuffer retrieve(int length, long sessionId, int packetId, int correlationId);
+
+ void clear(long id);
+}
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/MessagingBuffer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/MessagingBuffer.java 2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/MessagingBuffer.java 2008-07-29 13:42:17 UTC (rev 4744)
@@ -52,9 +52,11 @@
void putFloat(float val);
void putBoolean(boolean val);
+
+ void putBoolean(int pos, boolean b);
+
+ void putChar(char val);
- void putChar(char val);
-
void putNullableString(String val);
void putString(String val);
Added: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketAssembler.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketAssembler.java (rev 0)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketAssembler.java 2008-07-29 13:42:17 UTC (rev 4744)
@@ -0,0 +1,17 @@
+package org.jboss.messaging.core.remoting;
+
+import org.jboss.messaging.core.remoting.spi.Connection;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public interface PacketAssembler
+{
+ Packet assemble(MessagingBuffer buffer, RemotingHandler handler, long id) throws Exception;
+
+ void disAssemble(MessagingBuffer buffer, Connection connection, Packet message);
+
+ int getInitialPacketFragmentSize();
+
+ int getPacketFragmentSize();
+}
Added: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketFragment.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketFragment.java (rev 0)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketFragment.java 2008-07-29 13:42:17 UTC (rev 4744)
@@ -0,0 +1,86 @@
+package org.jboss.messaging.core.remoting;
+
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public interface PacketFragment
+{
+ int getPacketId();
+
+ int getCorrelationId();
+
+ int getLength();
+
+ boolean isLastPacket();
+
+ int getEndPosition();
+
+ boolean checkSpace(int size);
+
+ boolean hasReadSpace(int size);
+
+ void putFloat(float val);
+
+ void putInt(int i);
+
+ void putByte(byte b);
+
+ void putLong(long l);
+
+ void putBoolean(boolean b);
+
+ void putBytes(byte[] bytes, int i, int i1);
+
+ void putDouble(double val);
+
+ void putShort(short val);
+
+ void putChar(char val);
+
+ int getInt();
+
+ long getLong();
+
+ short getShort();
+
+ boolean getBoolean();
+
+ byte getByte();
+
+ void getBytes(byte[] data);
+
+ void getBytes(byte[] data, int read, int left);
+
+ float getFloat();
+
+ double getDouble();
+
+ char getChar();
+
+ short getUnsignedByte();
+
+ int getBodyLength();
+
+ void setBodyLength(int bodyLength);
+
+ void writeHeaders();
+
+ int remaining();
+
+ int limit();
+
+ PacketFragment createNextPacketFragment(int packetFragmentSize);
+
+ void putBoolean(int actualPos, boolean val);
+
+ void putInt(int actualPos, int val);
+
+ void rewind();
+
+ void flip();
+
+ void position(int position);
+
+ MessagingBuffer getMessagingBuffer();
+}
Added: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketFragmentBuffer.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketFragmentBuffer.java (rev 0)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketFragmentBuffer.java 2008-07-29 13:42:17 UTC (rev 4744)
@@ -0,0 +1,34 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting;
+
+import java.util.List;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public interface PacketFragmentBuffer extends MessagingBuffer
+{
+ List<PacketFragment> getPacketFragments();
+
+ void prepare();
+}
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/RemotingHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/RemotingHandler.java 2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/RemotingHandler.java 2008-07-29 13:42:17 UTC (rev 4744)
@@ -40,5 +40,7 @@
Set<Long> scanForFailedConnections(long expirePeriod);
- void removeLastPing(long connectionID);
+ void removeLastPing(long connectionID);
+
+ Packet decode(final long connectionID, final MessagingBuffer in) throws Exception;
}
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ByteBufferWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/ByteBufferWrapper.java 2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ByteBufferWrapper.java 2008-07-29 13:42:17 UTC (rev 4744)
@@ -307,7 +307,19 @@
}
}
- public void putByte(byte val)
+ public void putBoolean(int pos, boolean b)
+ {
+ if (b)
+ {
+ buffer.put(pos, TRUE);
+ }
+ else
+ {
+ buffer.put(pos, FALSE);
+ }
+ }
+
+ public void putByte(byte val)
{
buffer.put(val);
}
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java 2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java 2008-07-29 13:42:17 UTC (rev 4744)
@@ -30,12 +30,7 @@
import org.jboss.messaging.core.client.Location;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.ConnectionLifeCycleListener;
-import org.jboss.messaging.core.remoting.ConnectionRegistry;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.RemotingHandler;
-import org.jboss.messaging.core.remoting.TransportType;
+import org.jboss.messaging.core.remoting.*;
import org.jboss.messaging.core.remoting.spi.Connector;
import org.jboss.messaging.core.remoting.spi.ConnectorFactory;
import org.jboss.messaging.core.remoting.spi.Connection;
@@ -88,17 +83,21 @@
return connection;
}
else
- {
+ {
+ MessageCache messageCache = new InMemoryMessageCache();
+
+ PacketAssembler assembler = new PacketAssemblerImpl(connectionParams.getInitialPacketFragmentSize(), connectionParams.getPacketFragmentSize(), messageCache);
+
PacketDispatcher dispatcher = new PacketDispatcherImpl(null);
- RemotingHandler handler = new RemotingHandlerImpl(dispatcher, null);
+ RemotingHandler handler = new RemotingHandlerImpl(dispatcher, null, assembler);
Connector connector = createConnector(location, connectionParams, handler, this);
connector.start();
Connection tc = connector.createConnection();
-
+
if (tc == null)
{
throw new IllegalStateException("Failed to connect to " + location);
@@ -109,12 +108,12 @@
if (pingInterval != -1)
{
connection = new RemotingConnectionImpl(tc, dispatcher, location,
- connectionParams.getCallTimeout(), connectionParams.getPingInterval(), pingExecutor);
+ connectionParams.getCallTimeout(), connectionParams.getPingInterval(), pingExecutor, assembler);
}
else
{
connection = new RemotingConnectionImpl(tc, dispatcher, location,
- connectionParams.getCallTimeout());
+ connectionParams.getCallTimeout(), assembler);
}
remotingConnections.put(tc.getID(), connection);
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ExpandingMessagingBuffer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/ExpandingMessagingBuffer.java 2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ExpandingMessagingBuffer.java 2008-07-29 13:42:17 UTC (rev 4744)
@@ -213,6 +213,18 @@
putByte((byte) (val ? -1 : 0));
}
+ public void putBoolean(int pos, boolean b)
+ {
+ if (b)
+ {
+ buf.put(pos, TRUE);
+ }
+ else
+ {
+ buf.put(pos, FALSE);
+ }
+ }
+
public void putByte(byte val)
{
ensureRemaining(1).put(val);
Added: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/InMemoryMessageCache.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/InMemoryMessageCache.java (rev 0)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/InMemoryMessageCache.java 2008-07-29 13:42:17 UTC (rev 4744)
@@ -0,0 +1,82 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl;
+
+import org.jboss.messaging.core.remoting.MessageCache;
+import org.jboss.messaging.core.remoting.MessagingBuffer;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.List;
+import java.util.HashMap;
+import java.util.ArrayList;
+
+/**
+ * A simple in memory cache for message buffers.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class InMemoryMessageCache implements MessageCache
+{
+ Map<Long, Map<Integer, List<MessagingBuffer>>> cache = new HashMap<Long, Map<Integer, List<MessagingBuffer>>>();
+
+ public void cache(MessagingBuffer buffer, int length, long sessionId, int packetId, int correlationID)
+ {
+ getByteList(sessionId, packetId).add(buffer);
+ }
+
+ public MessagingBuffer retrieve(int length, long sessionId, int packetId, int correlationId)
+ {
+ List<MessagingBuffer> buffers = getByteList(sessionId, packetId);
+ MessagingBuffer buffer = buffers.remove(0);
+ if(buffers.isEmpty())
+ {
+ Map<Integer, List<MessagingBuffer>> sessionMap = cache.get(sessionId);
+ sessionMap.remove(packetId);
+ if(sessionMap.isEmpty())
+ {
+ cache.remove(sessionId);
+ }
+ }
+ return buffer;
+ }
+
+ public void clear(long id)
+ {
+ cache.remove(id);
+ }
+
+
+ protected List<MessagingBuffer> getByteList(long sessionId, int packetId)
+ {
+ if(cache.get(sessionId) == null)
+ {
+ cache.put(sessionId, new HashMap<Integer, List<MessagingBuffer>>());
+ }
+ if(cache.get(sessionId).get(packetId) == null)
+ {
+ cache.get(sessionId).put(packetId, new ArrayList<MessagingBuffer>());
+ }
+ return cache.get(sessionId).get(packetId);
+ }
+
+}
Added: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/PacketAssemblerImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/PacketAssemblerImpl.java (rev 0)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/PacketAssemblerImpl.java 2008-07-29 13:42:17 UTC (rev 4744)
@@ -0,0 +1,160 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl;
+
+import org.jboss.messaging.core.remoting.*;
+import org.jboss.messaging.core.remoting.spi.Connection;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.util.DataConstants;
+
+import java.util.Map;
+import java.util.List;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class PacketAssemblerImpl implements PacketAssembler
+{
+ private static final Logger log = Logger.getLogger(PacketAssemblerImpl.class);
+ Map<Long, Map<Integer, List<PacketFragment>>> allFragments = new HashMap<Long, Map<Integer, List<PacketFragment>>>();
+ private AtomicInteger packetId = new AtomicInteger(0);
+ private int packetFragmentSize;
+ private boolean useMessageChunking = false;
+ private MessageCache messageCache;
+ private int initialPacketFragmentSize;
+
+
+ public PacketAssemblerImpl(final int initialPacketFragmentSize,final int packetFragmentSize,final MessageCache messageCache)
+ {
+ this.packetFragmentSize = packetFragmentSize;
+ this.initialPacketFragmentSize = initialPacketFragmentSize;
+ useMessageChunking = this.packetFragmentSize > 0;
+ this.messageCache = messageCache;
+ }
+
+ public Packet assemble(final MessagingBuffer buffer,final RemotingHandler handler,final long id) throws Exception
+ {
+ int length = buffer.getInt();
+ if (useMessageChunking)
+ {
+ return assembleMulti(buffer, handler, id, length);
+ }
+ else
+ {
+ return assembleSingle(buffer, handler, id, length);
+ }
+ }
+
+
+ public void disAssemble(final MessagingBuffer buffer, final Connection connection, final Packet message)
+ {
+ if (useMessageChunking)
+ {
+ disasembleMulti(buffer, connection, message);
+ }
+ else
+ {
+ disasembleSingle(buffer, connection, message);
+ }
+ }
+
+ public int getInitialPacketFragmentSize()
+ {
+ return initialPacketFragmentSize;
+ }
+
+ public int getPacketFragmentSize()
+ {
+ return packetFragmentSize;
+ }
+
+ private List<PacketFragment> getFragments(final long id, final int packetId)
+ {
+ if(allFragments.get(id) == null)
+ {
+ allFragments.put(id, new HashMap<Integer, List<PacketFragment>>());
+ }
+ if(allFragments.get(id).get(packetId) == null)
+ {
+ allFragments.get(id).put(packetId, new ArrayList<PacketFragment>());
+ }
+ return allFragments.get(id).get(packetId);
+ }
+
+
+ private void disasembleMulti(final MessagingBuffer buffer, final Connection connection, final Packet message)
+ {
+ PacketFragmentBuffer buff = new PacketFragmentBufferImpl(packetId.getAndIncrement(), buffer, packetFragmentSize);
+ message.encode(buff);
+ buff.prepare();
+ buff.flip();
+ for (PacketFragment packetFragment : buff.getPacketFragments())
+ {
+ connection.write(packetFragment.getMessagingBuffer());
+ }
+ }
+
+ private void disasembleSingle(MessagingBuffer buffer, Connection connection, Packet message)
+ {
+ buffer.putInt(-1);
+ message.encode(buffer);
+ //The length doesn't include the actual length byte
+ int len = buffer.position() - DataConstants.SIZE_INT;
+ buffer.putInt(0, len);
+ buffer.flip();
+ connection.write(buffer);
+ }
+
+ private Packet assembleMulti(final MessagingBuffer buffer, final RemotingHandler handler, final long id, final int length)
+ throws Exception
+ {
+ boolean lastPacket = buffer.getBoolean();
+ int packetId = buffer.getInt();
+ int correlationId = buffer.getInt();
+
+ if(lastPacket)
+ {
+ PacketFragmentImpl fragment = new PacketFragmentImpl(packetId, lastPacket, correlationId, length, buffer);
+ getFragments(id, fragment.getPacketId()).add(fragment);
+ List<PacketFragment> fragments = allFragments.get(id).remove(fragment.getPacketId());
+ PacketFragmentBuffer buff = new PacketFragmentBufferImpl(fragments, packetFragmentSize);
+ return handler.decode(id, buff);
+ }
+ else
+ {
+ //if we are part of a multi part packet then we need to be cached for later use
+ PacketFragmentImpl fragment = new PacketFragmentImpl(packetId, lastPacket, correlationId, length, messageCache,id);
+ getFragments(id, fragment.getPacketId()).add(fragment);
+ messageCache.cache(buffer, length - 9, id, packetId, correlationId);
+ return null;
+ }
+ }
+
+ private Packet assembleSingle(final MessagingBuffer buffer, final RemotingHandler handler, final long id, final int length)
+ throws Exception
+ {
+ return handler.decode(id, buffer);
+ }
+}
Added: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentBufferImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentBufferImpl.java (rev 0)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentBufferImpl.java 2008-07-29 13:42:17 UTC (rev 4744)
@@ -0,0 +1,533 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl;
+
+import org.jboss.messaging.core.remoting.PacketFragment;
+import org.jboss.messaging.core.remoting.MessagingBuffer;
+import org.jboss.messaging.core.remoting.PacketFragmentBuffer;
+import static org.jboss.messaging.util.DataConstants.*;
+import org.jboss.messaging.util.SimpleString;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A MessagingBuffer that can be used to break up the buffers into smaller pieces.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class PacketFragmentBufferImpl implements PacketFragmentBuffer
+{
+
+ private static final Charset utf8 = Charset.forName("UTF-8");
+
+ private List<PacketFragment> packetFragments;
+ private int currentPos = 0;
+ private int size = 0;
+ private int correlationId = 0;
+ private int packetFragmentSize;
+ private int position = 0;
+
+ public PacketFragmentBufferImpl(int packetId, MessagingBuffer buffer, int packetFragmentSize)
+ {
+ packetFragments = new ArrayList<PacketFragment>();
+ packetFragments.add(new PacketFragmentImpl(packetId, correlationId++, buffer));
+ this.packetFragmentSize = packetFragmentSize;
+ }
+
+ public PacketFragmentBufferImpl(List<PacketFragment> packetFragments, int packetFragmentSize)
+ {
+ if(packetFragments == null || packetFragments.size() == 0)
+ {
+ throw new IllegalArgumentException("packet fragments must not be null or empty");
+ }
+ this.packetFragments = packetFragments;
+ currentPos = 0;
+ this.packetFragmentSize = packetFragmentSize;
+ }
+
+ public void putFloat(float val)
+ {
+ checkWriteSpace(SIZE_FLOAT);
+ getCurrentFragment().putFloat(val);
+ position += SIZE_FLOAT;
+ }
+
+ public void putInt(int i)
+ {
+ checkWriteSpace(SIZE_INT);
+ getCurrentFragment().putInt(i);
+ position += SIZE_INT;
+ }
+
+ public void putByte(byte b)
+ {
+ checkWriteSpace(SIZE_BYTE);
+ getCurrentFragment().putByte(b);
+ position += SIZE_BYTE;
+ }
+
+ public void putLong(long l)
+ {
+ checkWriteSpace(SIZE_LONG);
+ getCurrentFragment().putLong(l);
+ position += SIZE_LONG;
+ }
+
+ public void putBoolean(boolean b)
+ {
+ checkWriteSpace(SIZE_BOOLEAN);
+ getCurrentFragment().putBoolean(b);
+ position += SIZE_BOOLEAN;
+ }
+
+ public void putNullableString(String nullableString)
+ {
+ if (nullableString == null)
+ {
+ putByte(NULL);
+ }
+ else
+ {
+ putByte(NOT_NULL);
+
+ putString(nullableString);
+ }
+ }
+
+ public void putSimpleString(SimpleString string)
+ {
+ byte[] data = string.getData();
+
+ putInt(data.length);
+ putBytes(data);
+ }
+
+ public void putNullableSimpleString(SimpleString string)
+ {
+ if (string == null)
+ {
+ putByte(NULL);
+ }
+ else
+ {
+ putByte(NOT_NULL);
+ putSimpleString(string);
+ }
+ }
+
+ public void putBytes(byte[] bytes, int i, int i1)
+ {
+ if(getCurrentFragment().checkSpace(i1 - i))
+ {
+ getCurrentFragment().putBytes(bytes, i, i1);
+ }
+ else
+ {
+ int size = bytes.length;
+ int written = 0;
+ while(written < i1)
+ {
+ int left = getCurrentFragment().remaining();
+ int towrite = left + written > i1?i1 - written:left;
+ getCurrentFragment().putBytes(bytes, i, towrite);
+ written+=towrite;
+ i += left;
+ if(written < i1)
+ {
+ setNextFragment();
+ }
+ }
+ }
+ position += i1;
+ }
+
+ public void putBytes(byte[] bytes)
+ {
+ putBytes(bytes, 0, bytes.length);
+ }
+
+ public void putShort(short val)
+ {
+ checkWriteSpace(SIZE_SHORT);
+ getCurrentFragment().putShort(val);
+ position += SIZE_SHORT;
+ }
+
+ public void putDouble(double val)
+ {
+ checkWriteSpace(SIZE_DOUBLE);
+ getCurrentFragment().putDouble(val);
+ position += SIZE_DOUBLE;
+ }
+
+ public void putChar(char val)
+ {
+ checkWriteSpace(SIZE_CHAR);
+ getCurrentFragment().putChar(val);
+ position += SIZE_CHAR;
+ }
+
+ public void putUTF(String str)
+ {
+ //TODO This is quite inefficient - can be improved using a method similar to what MINA IOBuffer does
+ //(putPrefixedString)
+ ByteBuffer bb = utf8.encode(str);
+ putInt(bb.limit() - bb.position());
+ putBytes(bb.array());
+ }
+
+ public void putBoolean(int pos, boolean val)
+ {
+ int currentpos = 0;
+ for (PacketFragment packetFragment : packetFragments)
+ {
+ if(pos < packetFragment.getBodyLength())
+ {
+ packetFragment.putBoolean(pos - currentpos, val);
+ return;
+ }
+ currentpos += packetFragment.getBodyLength();
+ }
+ throw new IndexOutOfBoundsException();
+ }
+
+ public void putInt(int pos, int val)
+ {
+ int currentpos = 0;
+ for (PacketFragment packetFragment : packetFragments)
+ {
+ if(pos < packetFragment.getBodyLength())
+ {
+ packetFragment.putInt(pos - currentpos, val);
+ return;
+ }
+ currentpos += packetFragment.getBodyLength();
+ }
+ throw new IndexOutOfBoundsException();
+ }
+
+
+ public void putString(String string)
+ {
+ putInt(string.length());
+
+ for (int i = 0; i < string.length(); i++)
+ {
+ putChar(string.charAt(i));
+ }
+ }
+
+ public byte[] array()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public int capacity()
+ {
+ return 0;
+ }
+
+ public void limit(int limit)
+ {
+
+ }
+
+ public void position(int position)
+ {
+ int currentpos = 0;
+ for (int i = 0; i < packetFragments.size(); i++)
+ {
+ PacketFragment packetFragment = packetFragments.get(i);
+ currentpos += packetFragment.getBodyLength();
+ if (position < packetFragment.getBodyLength())
+ {
+ currentpos = i;
+ this.position = position;
+ packetFragment.position(position - currentpos);
+ return;
+ }
+ }
+ }
+
+ public int position()
+ {
+ return position;
+ }
+
+ public MessagingBuffer slice()
+ {
+ return null;
+ }
+
+ public MessagingBuffer createNewBuffer(int len)
+ {
+ return getCurrentFragment().getMessagingBuffer().createNewBuffer(len);
+ }
+
+
+ public String getString()
+ {
+ int len = getInt();
+
+ char[] chars = new char[len];
+
+ for (int i = 0; i < len; i++)
+ {
+ chars[i] = getChar();
+ }
+
+ return new String(chars);
+ }
+
+ public void getBytes(byte[] value, int i, int read)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public String getUTF() throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void flip()
+ {
+ for (PacketFragment packetFragment : packetFragments)
+ {
+ packetFragment.flip();
+ }
+ currentPos = 0;
+ position = 0;
+ }
+
+ public void rewind()
+ {
+ for (PacketFragment packetFragment : packetFragments)
+ {
+ packetFragment.rewind();
+ }
+ currentPos = 0;
+ position = 0;
+ }
+
+
+ public int getInt()
+ {
+ checkReadSpace(SIZE_INT);
+ position += SIZE_INT;
+ return getCurrentFragment().getInt();
+ }
+
+ public long getLong()
+ {
+ checkReadSpace(SIZE_LONG);
+ position += SIZE_LONG;
+ return getCurrentFragment().getLong();
+ }
+
+ public short getShort()
+ {
+ checkReadSpace(SIZE_SHORT);
+ position += SIZE_SHORT;
+ return getCurrentFragment().getShort();
+ }
+
+ public boolean getBoolean()
+ {
+ checkReadSpace(SIZE_BOOLEAN);
+ position += SIZE_BOOLEAN;
+ return getCurrentFragment().getBoolean();
+ }
+
+ public String getNullableString()
+ {
+ byte check = getByte();
+
+ if (check == NULL)
+ {
+ return null;
+ }
+ else
+ {
+ return getString();
+ }
+ }
+
+ public SimpleString getSimpleString()
+ {
+ int len = getInt();
+
+ byte[] data = new byte[len];
+ getBytes(data);
+
+ return new SimpleString(data);
+ }
+
+ public SimpleString getNullableSimpleString()
+ {
+ int b = getByte();
+ if (b == NULL)
+ {
+ return null;
+ }
+ else
+ {
+ return getSimpleString();
+ }
+ }
+
+ public byte getByte()
+ {
+ checkReadSpace(SIZE_BYTE);
+ position += SIZE_BYTE;
+ return getCurrentFragment().getByte();
+ }
+
+ public void getBytes(byte[] data)
+ {
+ int left = getCurrentFragment().remaining();
+ if(data.length <= left)
+ {
+ getCurrentFragment().getBytes(data);
+ }
+ else
+ {
+ int size = data.length;
+ int read = 0;
+ while (read < size)
+ {
+ left = getCurrentFragment().remaining() <= (size - read)? getCurrentFragment().remaining():size - read;
+
+ getCurrentFragment().getBytes(data, read, left);
+ read += left;
+ if(read < size)
+ {
+ currentPos++;
+ }
+
+ }
+
+ }
+ position += data.length;
+ }
+
+
+ public float getFloat()
+ {
+ checkReadSpace(SIZE_FLOAT);
+ position += SIZE_FLOAT;
+ return getCurrentFragment().getFloat();
+ }
+
+ public double getDouble()
+ {
+ checkReadSpace(SIZE_DOUBLE);
+ position += SIZE_DOUBLE;
+ return getCurrentFragment().getDouble();
+ }
+
+ public char getChar()
+ {
+ checkReadSpace(SIZE_CHAR);
+ position += SIZE_CHAR;
+ return getCurrentFragment().getChar();
+ }
+
+ public List<PacketFragment> getPacketFragments()
+ {
+ return packetFragments;
+ }
+
+ public void prepare()
+ {
+ for (PacketFragment packetFragment : packetFragments)
+ {
+ packetFragment.writeHeaders();
+ }
+ }
+
+ public short getUnsignedByte()
+ {
+ checkReadSpace(SIZE_SHORT);
+ return getCurrentFragment().getUnsignedByte();
+ }
+
+ public int getUnsignedShort()
+ {
+ checkReadSpace(SIZE_INT);
+ return getCurrentFragment().getUnsignedByte();
+ }
+
+ public int remaining()
+ {
+ return getCurrentFragment().remaining();
+ }
+
+ public int limit()
+ {
+ int currentLimit = 0;
+ for (PacketFragment packetFragment : packetFragments)
+ {
+ currentLimit += packetFragment.limit();
+ }
+ return currentLimit;
+ }
+
+ public Object getUnderlyingBuffer()
+ {
+ return null;
+ }
+
+ // Private -------------------------------------------------------
+
+ private void checkReadSpace(int size)
+ {
+ if(!getCurrentFragment().checkSpace(size))
+ {
+ currentPos++;
+ }
+ }
+
+ private void checkWriteSpace(int size)
+ {
+ if(!getCurrentFragment().checkSpace(size))
+ {
+ setNextFragment();
+ }
+ }
+
+ private void setNextFragment()
+ {
+ packetFragments.add(getCurrentFragment().createNextPacketFragment(packetFragmentSize));
+ currentPos++;
+ }
+
+
+ private PacketFragment getCurrentFragment()
+ {
+ return packetFragments.get(currentPos);
+ }
+
+
+}
Added: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentImpl.java (rev 0)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentImpl.java 2008-07-29 13:42:17 UTC (rev 4744)
@@ -0,0 +1,295 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl;
+
+import org.jboss.messaging.core.remoting.MessageCache;
+import org.jboss.messaging.core.remoting.PacketFragment;
+import org.jboss.messaging.core.remoting.MessagingBuffer;
+import static org.jboss.messaging.util.DataConstants.*;
+
+/**
+ * A fragment of a full packet that holds a portion of a full message within a buffer.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class PacketFragmentImpl implements PacketFragment
+{
+ private int bodyLength = 0;
+ private int packetId;
+ private int correlationId;
+ private boolean lastPacket;
+ private long sessionId;
+ private int dataStartPosition = -1;
+ private MessagingBuffer messagingBuffer;
+ private MessageCache messageCache;
+ private static final int HEADER_SIZE = (SIZE_INT * 3) + SIZE_BOOLEAN;
+
+ public PacketFragmentImpl(int packetId, boolean lastPacket, int correlationId, int length, MessageCache messageCache, long id)
+ {
+ this.packetId = packetId;
+ this.lastPacket = lastPacket;
+ this.correlationId = correlationId;
+ this.messageCache = messageCache;
+ this.bodyLength = length - HEADER_SIZE;
+ this.sessionId = id;
+ dataStartPosition = 0;
+ }
+
+ public PacketFragmentImpl(int packetId, boolean lastPacket, int correlationId, int length, MessagingBuffer messagingBuffer)
+ {
+ this.packetId = packetId;
+ this.lastPacket = lastPacket;
+ this.correlationId = correlationId;
+ this.bodyLength = length - HEADER_SIZE;
+ this.messagingBuffer = messagingBuffer;
+ dataStartPosition = messagingBuffer.position();
+ }
+
+ public PacketFragmentImpl(int packetId, int correlationId, MessagingBuffer messagingBuffer)
+ {
+ this.packetId = packetId;
+ this.lastPacket = true;
+ this.correlationId = correlationId;
+ this.messagingBuffer = messagingBuffer;
+ messagingBuffer.putInt(-1);
+ messagingBuffer.putBoolean(true);
+ messagingBuffer.putInt(packetId);
+ messagingBuffer.putInt(correlationId);
+ dataStartPosition = messagingBuffer.position();
+ }
+
+ public int getPacketId()
+ {
+ return packetId;
+ }
+
+ public int getCorrelationId()
+ {
+ return correlationId;
+ }
+
+ public int getLength()
+ {
+ return bodyLength + HEADER_SIZE;
+ }
+
+ public boolean isLastPacket()
+ {
+ return lastPacket;
+ }
+
+ public MessagingBuffer getMessagingBuffer()
+ {
+ if(messagingBuffer == null)
+ {
+ messagingBuffer = messageCache.retrieve(getLength(), sessionId, packetId, correlationId);
+ }
+ return messagingBuffer;
+ }
+
+ public int getEndPosition()
+ {
+ return dataStartPosition + getLength() - HEADER_SIZE;
+ }
+
+ public boolean checkSpace(int size)
+ {
+ int left = getMessagingBuffer().limit() - getMessagingBuffer().position();
+ return size<=left;
+ }
+
+ public boolean hasReadSpace(int size)
+ {
+ return getMessagingBuffer().position() + size <= getMessagingBuffer().limit();
+ }
+
+ public void putFloat(float val)
+ {
+ getMessagingBuffer().putFloat(val);
+ bodyLength += SIZE_FLOAT;
+ }
+
+ public void putInt(int i)
+ {
+ getMessagingBuffer().putInt(i);
+ bodyLength += SIZE_INT;
+ }
+
+ public void putByte(byte b)
+ {
+ getMessagingBuffer().putByte(b);
+ bodyLength += SIZE_BYTE;
+ }
+
+ public void putLong(long l)
+ {
+ getMessagingBuffer().putLong(l);
+ bodyLength += SIZE_LONG;
+ }
+
+ public void putBytes(byte[] bytes, int i, int i1)
+ {
+ getMessagingBuffer().putBytes(bytes, i, i1);
+ bodyLength += i1;
+ }
+
+ public void putDouble(double val)
+ {
+ getMessagingBuffer().putDouble(val);
+ bodyLength += SIZE_DOUBLE;
+ }
+
+ public void putShort(short val)
+ {
+ getMessagingBuffer().putShort(val);
+ bodyLength += SIZE_SHORT;
+ }
+
+ public void putChar(char val)
+ {
+ getMessagingBuffer().putChar(val);
+ bodyLength += SIZE_CHAR;
+ }
+
+ public void putBoolean(boolean b)
+ {
+ getMessagingBuffer().putBoolean(b);
+ bodyLength += SIZE_BOOLEAN;
+ }
+
+ public int getInt()
+ {
+ return getMessagingBuffer().getInt();
+ }
+
+ public long getLong()
+ {
+ return getMessagingBuffer().getLong();
+ }
+
+ public short getShort()
+ {
+ return getMessagingBuffer().getShort();
+ }
+
+ public boolean getBoolean()
+ {
+ return getMessagingBuffer().getBoolean();
+ }
+
+ public byte getByte()
+ {
+ return getMessagingBuffer().getByte();
+ }
+
+ public void getBytes(byte[] data)
+ {
+ getMessagingBuffer().getBytes(data);
+ }
+
+ public void getBytes(byte[] data, int read, int left)
+ {
+ getMessagingBuffer().getBytes(data, read, left);
+ }
+
+ public float getFloat()
+ {
+ return getMessagingBuffer().getFloat();
+ }
+
+ public double getDouble()
+ {
+ return getMessagingBuffer().getDouble();
+ }
+
+ public char getChar()
+ {
+ return getMessagingBuffer().getChar();
+ }
+
+ public short getUnsignedByte()
+ {
+ return getMessagingBuffer().getUnsignedByte();
+ }
+
+ public int getBodyLength()
+ {
+ return bodyLength;
+ }
+
+ public void setBodyLength(int bodyLength)
+ {
+ this.bodyLength = bodyLength;
+ }
+
+ public int remaining()
+ {
+ return getMessagingBuffer().remaining();
+ }
+
+ public int limit()
+ {
+ return getMessagingBuffer().limit() - HEADER_SIZE;
+ }
+
+ public void writeHeaders()
+ {
+ getMessagingBuffer().putInt(0, getLength() - SIZE_INT);
+ //we initially wrote true anyway
+ if(!isLastPacket())
+ {
+ getMessagingBuffer().putBoolean(SIZE_INT, isLastPacket());
+ }
+ }
+
+ public PacketFragment createNextPacketFragment(int packetFragmentSize)
+ {
+ lastPacket = false;
+ return new PacketFragmentImpl(packetId, correlationId + 1,
+ getMessagingBuffer().createNewBuffer(packetFragmentSize));
+ }
+
+ public void putBoolean(int pos, boolean val)
+ {
+ getMessagingBuffer().putBoolean(pos + HEADER_SIZE, val);
+ }
+
+ public void putInt(int pos, int val)
+ {
+ getMessagingBuffer().putInt(pos + HEADER_SIZE, val);
+ }
+
+ public void rewind()
+ {
+ getMessagingBuffer().rewind();
+ }
+
+ public void flip()
+ {
+ getMessagingBuffer().flip();
+ }
+
+ public void position(int position)
+ {
+ getMessagingBuffer().position(position);
+ }
+}
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-07-29 13:42:17 UTC (rev 4744)
@@ -31,12 +31,7 @@
import org.jboss.messaging.core.client.Location;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.FailureListener;
-import org.jboss.messaging.core.remoting.MessagingBuffer;
-import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.remoting.PacketHandler;
-import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.*;
import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.remoting.spi.Connection;
@@ -82,13 +77,16 @@
private volatile boolean destroyed;
+ private PacketAssembler packetAssembler;
+
// Constructors
// ---------------------------------------------------------------------------------
public RemotingConnectionImpl(final Connection transportConnection,
final PacketDispatcher dispatcher, final Location location,
final long blockingCallTimeout, final long pingPeriod,
- final ScheduledExecutorService pingExecutor)
+ final ScheduledExecutorService pingExecutor,
+ final PacketAssembler packetAssembler)
{
this.transportConnection = transportConnection;
@@ -98,6 +96,8 @@
this.blockingCallTimeout = blockingCallTimeout;
+ this.packetAssembler = packetAssembler;
+
pinger = new Pinger();
pongHandler = new PongHandler(dispatcher.generateID());
@@ -106,11 +106,12 @@
future = pingExecutor.scheduleWithFixedDelay(pinger, pingPeriod, pingPeriod,
TimeUnit.MILLISECONDS);
+
}
public RemotingConnectionImpl(final Connection transportConnection,
final PacketDispatcher dispatcher, final Location location,
- final long blockingCallTimeout)
+ final long blockingCallTimeout, PacketAssembler packetAssembler)
{
this.transportConnection = transportConnection;
@@ -119,6 +120,8 @@
this.location = location;
this.blockingCallTimeout = blockingCallTimeout;
+
+ this.packetAssembler = packetAssembler;
}
// Public
@@ -300,11 +303,9 @@
throw new IllegalStateException("Cannot write packet to connection, it is destroyed");
}
- MessagingBuffer buffer = transportConnection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
+ MessagingBuffer buffer = transportConnection.createBuffer(packetAssembler.getInitialPacketFragmentSize());
- packet.encode(buffer);
-
- transportConnection.write(buffer);
+ packetAssembler.disAssemble(buffer, transportConnection, packet);
}
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingHandlerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingHandlerImpl.java 2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingHandlerImpl.java 2008-07-29 13:42:17 UTC (rev 4744)
@@ -39,10 +39,7 @@
import java.util.concurrent.ExecutorService;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.MessagingBuffer;
-import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.remoting.RemotingHandler;
+import org.jboss.messaging.core.remoting.*;
import org.jboss.messaging.core.remoting.impl.wireformat.ConnectionCreateSessionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.ConnectionCreateSessionResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowCreditMessage;
@@ -105,7 +102,9 @@
private final ConcurrentMap<Long, Long> lastPings = new ConcurrentHashMap<Long, Long>();
- public RemotingHandlerImpl(final PacketDispatcher dispatcher, final ExecutorService executorService)
+ private PacketAssembler packetAssembler;
+
+ public RemotingHandlerImpl(final PacketDispatcher dispatcher, final ExecutorService executorService, final PacketAssembler packetAssembler)
{
if (dispatcher == null)
{
@@ -122,6 +121,7 @@
{
executorFactory = null;
}
+ this.packetAssembler = packetAssembler;
}
public Set<Long> scanForFailedConnections(final long expirePeriod)
@@ -145,7 +145,12 @@
public void bufferReceived(final long connectionID, final MessagingBuffer buffer) throws Exception
{
- final Packet packet = decode(connectionID, buffer);
+ final Packet packet = packetAssembler.assemble(buffer, this, connectionID);
+
+ if(packet == null)
+ {
+ return;
+ }
if (executorFactory != null)
{
@@ -199,7 +204,7 @@
{
return -1;
}
-
+
return length;
}
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java 2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java 2008-07-29 13:42:17 UTC (rev 4744)
@@ -36,12 +36,7 @@
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.ConnectionLifeCycleListener;
-import org.jboss.messaging.core.remoting.Interceptor;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.RemotingHandler;
-import org.jboss.messaging.core.remoting.RemotingService;
+import org.jboss.messaging.core.remoting.*;
import org.jboss.messaging.core.remoting.spi.Acceptor;
import org.jboss.messaging.core.remoting.spi.AcceptorFactory;
import org.jboss.messaging.core.remoting.spi.Connection;
@@ -83,7 +78,10 @@
private final Timer failedConnectionTimer = new Timer(true);
private TimerTask failedConnectionsTask;
-
+
+ private PacketAssembler packetAssembler;
+ private MessageCache messageCache;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -97,9 +95,12 @@
dispatcher = new PacketDispatcherImpl(null);
remotingExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-session-ordering-threads"));
+
+ messageCache = new InMemoryMessageCache();
+
+ packetAssembler = new PacketAssemblerImpl(config.getInitialPacketFragmentSize(), config.getPacketFragmentSize(), messageCache);
+ handler = new RemotingHandlerImpl(dispatcher, remotingExecutor, packetAssembler);
- handler = new RemotingHandlerImpl(dispatcher, remotingExecutor);
-
long pingPeriod = config.getConnectionParams().getPingInterval();
if (pingPeriod != -1)
@@ -232,7 +233,7 @@
public void connectionCreated(final Connection connection)
{
RemotingConnection rc =
- new RemotingConnectionImpl(connection, dispatcher, null, config.getConnectionParams().getCallTimeout());
+ new RemotingConnectionImpl(connection, dispatcher, null, config.getConnectionParams().getCallTimeout(), packetAssembler);
this.connections.put(connection.getID(), rc);
}
@@ -240,7 +241,9 @@
public void connectionDestroyed(long connectionID)
{
handler.removeLastPing(connectionID);
-
+
+ messageCache.clear(connectionID);
+
if (connections.remove(connectionID) == null)
{
throw new IllegalStateException("Cannot find connection with id " + connectionID);
@@ -250,6 +253,8 @@
public void connectionException(long connectionID, MessagingException me)
{
RemotingConnection rc = connections.remove(connectionID);
+
+ messageCache.clear(connectionID);
if (rc == null)
{
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java 2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java 2008-07-29 13:42:17 UTC (rev 4744)
@@ -242,6 +242,18 @@
}
}
+ public void putBoolean(int pos, boolean b)
+ {
+ if (b)
+ {
+ buffer.put(pos, TRUE);
+ }
+ else
+ {
+ buffer.put(pos, FALSE);
+ }
+ }
+
public boolean getBoolean()
{
byte b = buffer.get();
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaProtocolCodecFilter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaProtocolCodecFilter.java 2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaProtocolCodecFilter.java 2008-07-29 13:42:17 UTC (rev 4744)
@@ -84,15 +84,15 @@
public boolean doDecode(final IoSession session, final IoBuffer in, final ProtocolDecoderOutput out) throws Exception
{
//TODO - we can avoid this entirely if we maintain fragmented packets in the handler
-
+
int start = in.position();
int length = handler.isReadyToHandle(new IoBufferWrapper(in));
-
+
+ in.position(start);
+
if (length == -1)
- {
- in.position(start);
-
+ {
return false;
}
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-07-29 13:42:17 UTC (rev 4744)
@@ -189,7 +189,6 @@
public void encode(MessagingBuffer buffer)
{
//The standard header fields
- buffer.putInt(0); //The length gets filled in at the end
buffer.putByte(type);
buffer.putInt(commandID);
buffer.putLong(responseTargetID);
@@ -197,13 +196,6 @@
buffer.putLong(executorID);
encodeBody(buffer);
-
- //The length doesn't include the actual length byte
- int len = buffer.position() - DataConstants.SIZE_INT;
-
- buffer.putInt(0, len);
-
- buffer.flip();
}
public void decode(final MessagingBuffer buffer)
More information about the jboss-cvs-commits
mailing list