[jboss-cvs] JBoss Messaging SVN: r4754 - in branches/Branch_Message_Chunking_new: src/main/org/jboss/messaging/core/remoting and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jul 31 03:37:21 EDT 2008
Author: ataylor
Date: 2008-07-31 03:37:20 -0400 (Thu, 31 Jul 2008)
New Revision: 4754
Added:
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketFragmentCache.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/InMemoryPacketFragmentCache.java
Removed:
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/MessageCache.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/InMemoryMessageCache.java
Modified:
branches/Branch_Message_Chunking_new/build-thirdparty.xml
branches/Branch_Message_Chunking_new/messaging.ipr
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketAssembler.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketFragment.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketFragmentBuffer.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/PacketAssemblerImpl.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentBufferImpl.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentImpl.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaAcceptor.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnection.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaProtocolCodecFilter.java
branches/Branch_Message_Chunking_new/tests/src/org/jboss/messaging/tests/timing/core/remoting/impl/RemotingConnectionImplTest.java
branches/Branch_Message_Chunking_new/tests/src/org/jboss/messaging/tests/timing/core/remoting/impl/RemotingHandlerImplTest.java
branches/Branch_Message_Chunking_new/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/RemotingConnectionImplTest.java
branches/Branch_Message_Chunking_new/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/RemotingHandlerImplTest.java
branches/Branch_Message_Chunking_new/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/IoBufferWrapperTest.java
branches/Branch_Message_Chunking_new/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/MessagingIOSessionDataStructureFactoryTest.java
branches/Branch_Message_Chunking_new/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/MinaConnectionTest.java
Log:
migrated to latest Mina and some tidying up
Modified: branches/Branch_Message_Chunking_new/build-thirdparty.xml
===================================================================
--- branches/Branch_Message_Chunking_new/build-thirdparty.xml 2008-07-30 16:16:18 UTC (rev 4753)
+++ branches/Branch_Message_Chunking_new/build-thirdparty.xml 2008-07-31 07:37:20 UTC (rev 4754)
@@ -98,7 +98,7 @@
<componentref name="jboss/jbosssx-client" version="2.0.1.GA"/>
<componentref name="jboss/jboss-javaee" version="5.0.0.Beta3"/>
<componentref name="jboss/jboss-common-logging-spi" version="2.0.4.GA"/>
- <componentref name="apache-mina" version="2.0.0-M2-20080520.004618-19"/>
+ <componentref name="apache-mina" version="2.0.0-M3-20080730.120633-1"/>
<componentref name="netty" version="3.0.0.M4"/>
<componentref name="slf4j/log4j" version="1.4.3"/>
<componentref name="jpa-api" version="1.0.0.GA"/>
Modified: branches/Branch_Message_Chunking_new/messaging.ipr
===================================================================
--- branches/Branch_Message_Chunking_new/messaging.ipr 2008-07-30 16:16:18 UTC (rev 4753)
+++ branches/Branch_Message_Chunking_new/messaging.ipr 2008-07-31 07:37:20 UTC (rev 4754)
@@ -456,13 +456,13 @@
<root url="file://$PROJECT_DIR$/tests/config" />
<root url="file://$PROJECT_DIR$/tests/jms-tests/config" />
<root url="file://$PROJECT_DIR$/src/config" />
- <root url="jar://$PROJECT_DIR$/thirdparty/apache-mina/lib/mina-core-2.0.0-M2-20080520.004618-19.jar!/" />
<root url="jar://$PROJECT_DIR$/thirdparty/easymock/lib/easymock.jar!/" />
<root url="jar://$PROJECT_DIR$/thirdparty/easymock-classextension/lib/easymockclassextension.jar!/" />
<root url="jar://$PROJECT_DIR$/thirdparty/cglib/lib/cglib.jar!/" />
<root url="jar://$PROJECT_DIR$/tools/lib/jbossbuild.jar!/" />
<root url="jar:///home/andy/devtools/apache-ant-1.7.0/lib/ant-junit.jar!/" />
<root url="jar:///home/andy/devtools/apache-ant-1.7.0/lib/ant.jar!/" />
+ <root url="jar://$PROJECT_DIR$/thirdparty/apache-mina/lib/mina-core-2.0.0-M3-20080730.120633-1.jar!/" />
</CLASSES>
<JAVADOC />
<SOURCES>
Deleted: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/MessageCache.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/MessageCache.java 2008-07-30 16:16:18 UTC (rev 4753)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/MessageCache.java 2008-07-31 07:37:20 UTC (rev 4754)
@@ -1,13 +0,0 @@
-package org.jboss.messaging.core.remoting;
-
-/**
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- */
-public interface MessageCache
-{
- void cache(MessagingBuffer buffer, int length, long sessionId, int packetId, int correlationId);
-
- MessagingBuffer retrieve(int length, long sessionId, int packetId, int correlationId);
-
- void clear(long id);
-}
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketAssembler.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketAssembler.java 2008-07-30 16:16:18 UTC (rev 4753)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketAssembler.java 2008-07-31 07:37:20 UTC (rev 4754)
@@ -3,11 +3,13 @@
import org.jboss.messaging.core.remoting.spi.Connection;
/**
+ * Assembles and dissasembles packets to and from PAcket Fragments before sending and receiving.
+ *
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*/
public interface PacketAssembler
{
- Packet assemble(MessagingBuffer buffer, RemotingHandler handler, long id) throws Exception;
+ Packet assemble(MessagingBuffer buffer, RemotingHandler handler, long connectionID) throws Exception;
void disAssemble(MessagingBuffer buffer, Connection connection, Packet message);
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketFragment.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketFragment.java 2008-07-30 16:16:18 UTC (rev 4753)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketFragment.java 2008-07-31 07:37:20 UTC (rev 4754)
@@ -2,6 +2,8 @@
/**
+ * A Packet Fragment. Holds info and the contents of a fragment of an encoded Packet.
+ *
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*/
public interface PacketFragment
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketFragmentBuffer.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketFragmentBuffer.java 2008-07-30 16:16:18 UTC (rev 4753)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketFragmentBuffer.java 2008-07-31 07:37:20 UTC (rev 4754)
@@ -24,6 +24,8 @@
import java.util.List;
/**
+ * Extends a MessagingBuffer to allow acces to th eunderlying PAcket Fragments.
+ *
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*/
public interface PacketFragmentBuffer extends MessagingBuffer
Copied: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketFragmentCache.java (from rev 4744, branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/MessageCache.java)
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketFragmentCache.java (rev 0)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketFragmentCache.java 2008-07-31 07:37:20 UTC (rev 4754)
@@ -0,0 +1,15 @@
+package org.jboss.messaging.core.remoting;
+
+/**
+ * Used to cache the contents of Packet Fragments until needed.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public interface PacketFragmentCache
+{
+ void cache(MessagingBuffer buffer, int length, long connectionID, int packetId, int correlationId);
+
+ MessagingBuffer retrieve(int length, long connectionID, int packetId, int correlationId);
+
+ void clear(long connectionID);
+}
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java 2008-07-30 16:16:18 UTC (rev 4753)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java 2008-07-31 07:37:20 UTC (rev 4754)
@@ -22,20 +22,20 @@
package org.jboss.messaging.core.remoting.impl;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-
import org.jboss.messaging.core.client.ConnectionParams;
import org.jboss.messaging.core.client.Location;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.*;
+import org.jboss.messaging.core.remoting.spi.Connection;
import org.jboss.messaging.core.remoting.spi.Connector;
import org.jboss.messaging.core.remoting.spi.ConnectorFactory;
-import org.jboss.messaging.core.remoting.spi.Connection;
import org.jboss.messaging.util.JBMThreadFactory;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -84,9 +84,9 @@
}
else
{
- MessageCache messageCache = new InMemoryMessageCache();
+ PacketFragmentCache packetFragmentCache = new InMemoryPacketFragmentCache();
- PacketAssembler assembler = new PacketAssemblerImpl(connectionParams.getInitialPacketFragmentSize(), connectionParams.getPacketFragmentSize(), messageCache);
+ PacketAssembler assembler = new PacketAssemblerImpl(connectionParams.getInitialPacketFragmentSize(), connectionParams.getPacketFragmentSize(), packetFragmentCache);
PacketDispatcher dispatcher = new PacketDispatcherImpl(null);
Deleted: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/InMemoryMessageCache.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/InMemoryMessageCache.java 2008-07-30 16:16:18 UTC (rev 4753)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/InMemoryMessageCache.java 2008-07-31 07:37:20 UTC (rev 4754)
@@ -1,82 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.remoting.impl;
-
-import org.jboss.messaging.core.remoting.MessageCache;
-import org.jboss.messaging.core.remoting.MessagingBuffer;
-
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.List;
-import java.util.HashMap;
-import java.util.ArrayList;
-
-/**
- * A simple in memory cache for message buffers.
- *
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- */
-public class InMemoryMessageCache implements MessageCache
-{
- Map<Long, Map<Integer, List<MessagingBuffer>>> cache = new HashMap<Long, Map<Integer, List<MessagingBuffer>>>();
-
- public void cache(MessagingBuffer buffer, int length, long sessionId, int packetId, int correlationID)
- {
- getByteList(sessionId, packetId).add(buffer);
- }
-
- public MessagingBuffer retrieve(int length, long sessionId, int packetId, int correlationId)
- {
- List<MessagingBuffer> buffers = getByteList(sessionId, packetId);
- MessagingBuffer buffer = buffers.remove(0);
- if(buffers.isEmpty())
- {
- Map<Integer, List<MessagingBuffer>> sessionMap = cache.get(sessionId);
- sessionMap.remove(packetId);
- if(sessionMap.isEmpty())
- {
- cache.remove(sessionId);
- }
- }
- return buffer;
- }
-
- public void clear(long id)
- {
- cache.remove(id);
- }
-
-
- protected List<MessagingBuffer> getByteList(long sessionId, int packetId)
- {
- if(cache.get(sessionId) == null)
- {
- cache.put(sessionId, new HashMap<Integer, List<MessagingBuffer>>());
- }
- if(cache.get(sessionId).get(packetId) == null)
- {
- cache.get(sessionId).put(packetId, new ArrayList<MessagingBuffer>());
- }
- return cache.get(sessionId).get(packetId);
- }
-
-}
Copied: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/InMemoryPacketFragmentCache.java (from rev 4744, branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/InMemoryMessageCache.java)
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/InMemoryPacketFragmentCache.java (rev 0)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/InMemoryPacketFragmentCache.java 2008-07-31 07:37:20 UTC (rev 4754)
@@ -0,0 +1,103 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl;
+
+import org.jboss.messaging.core.remoting.MessagingBuffer;
+import org.jboss.messaging.core.remoting.PacketFragmentCache;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A simple in memory cache for message buffers.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class InMemoryPacketFragmentCache implements PacketFragmentCache
+{
+ Map<Long, Map<Integer, List<MessagingBuffer>>> cache = new HashMap<Long, Map<Integer, List<MessagingBuffer>>>();
+
+ /**
+ * adds a buffer to a list for later use. We make an assumption that theunderlying bytes don't get modified before
+ * they are needed.
+ *
+ * @param buffer the buffer to cache.
+ * @param length the length of the buffer.
+ * @param connectionID the connection id.
+ * @param packetId the Packet ID.
+ * @param correlationID The correlation ID.
+ */
+ public void cache(MessagingBuffer buffer, int length, long connectionID, int packetId, int correlationID)
+ {
+ getByteList(connectionID, packetId).add(buffer);
+ }
+
+ /**
+ * retrieves the first buffer from the list and removes it.
+ * @param length The length of the buffer.
+ * @param connectionID The Connection ID.
+ * @param packetId the Packet ID.
+ * @param correlationId The Correlation ID.
+ * @return the appropriate Messaging Buffer.
+ */
+ public MessagingBuffer retrieve(int length, long connectionID, int packetId, int correlationId)
+ {
+ List<MessagingBuffer> buffers = getByteList(connectionID, packetId);
+ MessagingBuffer buffer = buffers.remove(0);
+ if(buffers.isEmpty())
+ {
+ Map<Integer, List<MessagingBuffer>> sessionMap = cache.get(connectionID);
+ sessionMap.remove(packetId);
+ if(sessionMap.isEmpty())
+ {
+ cache.remove(connectionID);
+ }
+ }
+ return buffer;
+ }
+
+ /**
+ * clears the cache for a specific Connection.
+ * @param connectionID The connection ID.
+ */
+ public void clear(long connectionID)
+ {
+ cache.remove(connectionID);
+ }
+
+
+ protected List<MessagingBuffer> getByteList(long sessionId, int packetId)
+ {
+ if(cache.get(sessionId) == null)
+ {
+ cache.put(sessionId, new HashMap<Integer, List<MessagingBuffer>>());
+ }
+ if(cache.get(sessionId).get(packetId) == null)
+ {
+ cache.get(sessionId).put(packetId, new ArrayList<MessagingBuffer>());
+ }
+ return cache.get(sessionId).get(packetId);
+ }
+
+}
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/PacketAssemblerImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/PacketAssemblerImpl.java 2008-07-30 16:16:18 UTC (rev 4753)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/PacketAssemblerImpl.java 2008-07-31 07:37:20 UTC (rev 4754)
@@ -21,15 +21,15 @@
*/
package org.jboss.messaging.core.remoting.impl;
+import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.*;
import org.jboss.messaging.core.remoting.spi.Connection;
-import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.util.DataConstants;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
-import java.util.List;
-import java.util.HashMap;
-import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -42,28 +42,28 @@
private AtomicInteger packetId = new AtomicInteger(0);
private int packetFragmentSize;
private boolean useMessageChunking = false;
- private MessageCache messageCache;
+ private PacketFragmentCache packetFragmentCache;
private int initialPacketFragmentSize;
+ private List<Float> longs = new ArrayList<Float>();
-
- public PacketAssemblerImpl(final int initialPacketFragmentSize,final int packetFragmentSize,final MessageCache messageCache)
+ public PacketAssemblerImpl(final int initialPacketFragmentSize,final int packetFragmentSize,final PacketFragmentCache packetFragmentCache)
{
this.packetFragmentSize = packetFragmentSize;
this.initialPacketFragmentSize = initialPacketFragmentSize;
useMessageChunking = this.packetFragmentSize > 0;
- this.messageCache = messageCache;
+ this.packetFragmentCache = packetFragmentCache;
}
- public Packet assemble(final MessagingBuffer buffer,final RemotingHandler handler,final long id) throws Exception
+ public Packet assemble(final MessagingBuffer buffer,final RemotingHandler handler,final long connectionID) throws Exception
{
int length = buffer.getInt();
if (useMessageChunking)
{
- return assembleMulti(buffer, handler, id, length);
+ return assembleMulti(buffer, handler, connectionID, length);
}
else
{
- return assembleSingle(buffer, handler, id, length);
+ return assembleSingle(buffer, handler, connectionID, length);
}
}
@@ -127,7 +127,7 @@
connection.write(buffer);
}
- private Packet assembleMulti(final MessagingBuffer buffer, final RemotingHandler handler, final long id, final int length)
+ private Packet assembleMulti(final MessagingBuffer buffer, final RemotingHandler handler, final long connectionID, final int length)
throws Exception
{
boolean lastPacket = buffer.getBoolean();
@@ -137,24 +137,25 @@
if(lastPacket)
{
PacketFragmentImpl fragment = new PacketFragmentImpl(packetId, lastPacket, correlationId, length, buffer);
- getFragments(id, fragment.getPacketId()).add(fragment);
- List<PacketFragment> fragments = allFragments.get(id).remove(fragment.getPacketId());
+ getFragments(connectionID, fragment.getPacketId()).add(fragment);
+ List<PacketFragment> fragments = allFragments.get(connectionID).remove(fragment.getPacketId());
PacketFragmentBuffer buff = new PacketFragmentBufferImpl(fragments, packetFragmentSize);
- return handler.decode(id, buff);
+ return handler.decode(connectionID, buff);
}
else
{
//if we are part of a multi part packet then we need to be cached for later use
- PacketFragmentImpl fragment = new PacketFragmentImpl(packetId, lastPacket, correlationId, length, messageCache,id);
- getFragments(id, fragment.getPacketId()).add(fragment);
- messageCache.cache(buffer, length - 9, id, packetId, correlationId);
+ //PacketFragmentImpl fragment = new PacketFragmentImpl(packetId, lastPacket, correlationId, length, packetFragmentCache,connectionID);
+ PacketFragmentImpl fragment = new PacketFragmentImpl(packetId, lastPacket, correlationId, length, buffer);
+ getFragments(connectionID, fragment.getPacketId()).add(fragment);
+ //packetFragmentCache.cache(buffer, length - 9, connectionID, packetId, correlationId);
return null;
}
}
- private Packet assembleSingle(final MessagingBuffer buffer, final RemotingHandler handler, final long id, final int length)
+ private Packet assembleSingle(final MessagingBuffer buffer, final RemotingHandler handler, final long connectionID, final int length)
throws Exception
{
- return handler.decode(id, buffer);
+ return handler.decode(connectionID, buffer);
}
}
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentBufferImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentBufferImpl.java 2008-07-30 16:16:18 UTC (rev 4753)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentBufferImpl.java 2008-07-31 07:37:20 UTC (rev 4754)
@@ -21,8 +21,8 @@
*/
package org.jboss.messaging.core.remoting.impl;
-import org.jboss.messaging.core.remoting.PacketFragment;
import org.jboss.messaging.core.remoting.MessagingBuffer;
+import org.jboss.messaging.core.remoting.PacketFragment;
import org.jboss.messaging.core.remoting.PacketFragmentBuffer;
import static org.jboss.messaging.util.DataConstants.*;
import org.jboss.messaging.util.SimpleString;
@@ -44,7 +44,6 @@
private List<PacketFragment> packetFragments;
private int currentPos = 0;
- private int size = 0;
private int correlationId = 0;
private int packetFragmentSize;
private int position = 0;
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentImpl.java 2008-07-30 16:16:18 UTC (rev 4753)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentImpl.java 2008-07-31 07:37:20 UTC (rev 4754)
@@ -21,9 +21,9 @@
*/
package org.jboss.messaging.core.remoting.impl;
-import org.jboss.messaging.core.remoting.MessageCache;
+import org.jboss.messaging.core.remoting.MessagingBuffer;
import org.jboss.messaging.core.remoting.PacketFragment;
-import org.jboss.messaging.core.remoting.MessagingBuffer;
+import org.jboss.messaging.core.remoting.PacketFragmentCache;
import static org.jboss.messaging.util.DataConstants.*;
/**
@@ -40,15 +40,15 @@
private long sessionId;
private int dataStartPosition = -1;
private MessagingBuffer messagingBuffer;
- private MessageCache messageCache;
+ private PacketFragmentCache packetFragmentCache;
private static final int HEADER_SIZE = (SIZE_INT * 3) + SIZE_BOOLEAN;
- public PacketFragmentImpl(int packetId, boolean lastPacket, int correlationId, int length, MessageCache messageCache, long id)
+ public PacketFragmentImpl(int packetId, boolean lastPacket, int correlationId, int length, PacketFragmentCache packetFragmentCache, long id)
{
this.packetId = packetId;
this.lastPacket = lastPacket;
this.correlationId = correlationId;
- this.messageCache = messageCache;
+ this.packetFragmentCache = packetFragmentCache;
this.bodyLength = length - HEADER_SIZE;
this.sessionId = id;
dataStartPosition = 0;
@@ -101,7 +101,7 @@
{
if(messagingBuffer == null)
{
- messagingBuffer = messageCache.retrieve(getLength(), sessionId, packetId, correlationId);
+ messagingBuffer = packetFragmentCache.retrieve(getLength(), sessionId, packetId, correlationId);
}
return messagingBuffer;
}
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java 2008-07-30 16:16:18 UTC (rev 4753)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java 2008-07-31 07:37:20 UTC (rev 4754)
@@ -22,26 +22,21 @@
package org.jboss.messaging.core.remoting.impl;
-import static org.jboss.messaging.core.remoting.impl.RemotingConfigurationValidator.validate;
-
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.*;
+import static org.jboss.messaging.core.remoting.impl.RemotingConfigurationValidator.validate;
import org.jboss.messaging.core.remoting.spi.Acceptor;
import org.jboss.messaging.core.remoting.spi.AcceptorFactory;
import org.jboss.messaging.core.remoting.spi.Connection;
import org.jboss.messaging.util.JBMThreadFactory;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
* @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
@@ -80,7 +75,7 @@
private TimerTask failedConnectionsTask;
private PacketAssembler packetAssembler;
- private MessageCache messageCache;
+ private PacketFragmentCache packetFragmentCache;
// Static --------------------------------------------------------
@@ -96,9 +91,9 @@
remotingExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-session-ordering-threads"));
- messageCache = new InMemoryMessageCache();
+ packetFragmentCache = new InMemoryPacketFragmentCache();
- packetAssembler = new PacketAssemblerImpl(config.getInitialPacketFragmentSize(), config.getPacketFragmentSize(), messageCache);
+ packetAssembler = new PacketAssemblerImpl(config.getInitialPacketFragmentSize(), config.getPacketFragmentSize(), packetFragmentCache);
handler = new RemotingHandlerImpl(dispatcher, remotingExecutor, packetAssembler);
long pingPeriod = config.getConnectionParams().getPingInterval();
@@ -242,7 +237,7 @@
{
handler.removeLastPing(connectionID);
- messageCache.clear(connectionID);
+ packetFragmentCache.clear(connectionID);
if (connections.remove(connectionID) == null)
{
@@ -254,7 +249,7 @@
{
RemotingConnection rc = connections.remove(connectionID);
- messageCache.clear(connectionID);
+ packetFragmentCache.clear(connectionID);
if (rc == null)
{
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java 2008-07-30 16:16:18 UTC (rev 4753)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java 2008-07-31 07:37:20 UTC (rev 4754)
@@ -22,14 +22,14 @@
package org.jboss.messaging.core.remoting.impl.mina;
-import javax.net.ssl.SSLContext;
-
-import org.apache.mina.common.DefaultIoFilterChainBuilder;
+import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.ssl.SslFilter;
import org.jboss.messaging.core.remoting.RemotingHandler;
import org.jboss.messaging.core.remoting.impl.ssl.SSLSupport;
+import javax.net.ssl.SSLContext;
+
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java 2008-07-30 16:16:18 UTC (rev 4753)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java 2008-07-31 07:37:20 UTC (rev 4754)
@@ -22,17 +22,13 @@
package org.jboss.messaging.core.remoting.impl.mina;
-import static org.jboss.messaging.util.DataConstants.FALSE;
-import static org.jboss.messaging.util.DataConstants.NOT_NULL;
-import static org.jboss.messaging.util.DataConstants.NULL;
-import static org.jboss.messaging.util.DataConstants.TRUE;
-
-import java.nio.charset.Charset;
-
-import org.apache.mina.common.IoBuffer;
+import org.apache.mina.core.buffer.IoBuffer;
import org.jboss.messaging.core.remoting.MessagingBuffer;
+import static org.jboss.messaging.util.DataConstants.*;
import org.jboss.messaging.util.SimpleString;
+import java.nio.charset.Charset;
+
/**
*
* A BufferWrapper
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java 2008-07-30 16:16:18 UTC (rev 4753)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java 2008-07-31 07:37:20 UTC (rev 4754)
@@ -19,6 +19,12 @@
*/
package org.jboss.messaging.core.remoting.impl.mina;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.core.session.IoSessionAttributeMap;
+import org.apache.mina.core.session.IoSessionDataStructureFactory;
+import org.apache.mina.core.write.WriteRequest;
+import org.apache.mina.core.write.WriteRequestQueue;
+
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
@@ -26,12 +32,6 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.IoSessionAttributeMap;
-import org.apache.mina.common.IoSessionDataStructureFactory;
-import org.apache.mina.common.WriteRequest;
-import org.apache.mina.common.WriteRequestQueue;
-
/**
*
* A MessagingIOSessionDataStructureFactory
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaAcceptor.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaAcceptor.java 2008-07-30 16:16:18 UTC (rev 4753)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaAcceptor.java 2008-07-31 07:37:20 UTC (rev 4754)
@@ -22,15 +22,13 @@
package org.jboss.messaging.core.remoting.impl.mina;
-import java.net.InetSocketAddress;
-
-import org.apache.mina.common.DefaultIoFilterChainBuilder;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoBuffer;
-import org.apache.mina.common.IoHandlerAdapter;
-import org.apache.mina.common.IoService;
-import org.apache.mina.common.IoServiceListener;
-import org.apache.mina.common.IoSession;
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
+import org.apache.mina.core.service.IoHandlerAdapter;
+import org.apache.mina.core.service.IoService;
+import org.apache.mina.core.service.IoServiceListener;
+import org.apache.mina.core.session.IdleStatus;
+import org.apache.mina.core.session.IoSession;
import org.apache.mina.transport.socket.SocketAcceptor;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.jboss.messaging.core.config.Configuration;
@@ -41,6 +39,8 @@
import org.jboss.messaging.core.remoting.spi.Acceptor;
import org.jboss.messaging.core.remoting.spi.Connection;
+import java.net.InetSocketAddress;
+
/**
* A Mina TCP Acceptor that supports SSL
*
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnection.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnection.java 2008-07-30 16:16:18 UTC (rev 4753)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnection.java 2008-07-31 07:37:20 UTC (rev 4754)
@@ -22,8 +22,8 @@
package org.jboss.messaging.core.remoting.impl.mina;
-import org.apache.mina.common.IoBuffer;
-import org.apache.mina.common.IoSession;
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.ssl.SslFilter;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.MessagingBuffer;
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2008-07-30 16:16:18 UTC (rev 4753)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2008-07-31 07:37:20 UTC (rev 4754)
@@ -21,16 +21,14 @@
*/
package org.jboss.messaging.core.remoting.impl.mina;
-import java.net.InetSocketAddress;
-
-import org.apache.mina.common.ConnectFuture;
-import org.apache.mina.common.DefaultIoFilterChainBuilder;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoBuffer;
-import org.apache.mina.common.IoHandlerAdapter;
-import org.apache.mina.common.IoService;
-import org.apache.mina.common.IoServiceListener;
-import org.apache.mina.common.IoSession;
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
+import org.apache.mina.core.future.ConnectFuture;
+import org.apache.mina.core.service.IoHandlerAdapter;
+import org.apache.mina.core.service.IoService;
+import org.apache.mina.core.service.IoServiceListener;
+import org.apache.mina.core.session.IdleStatus;
+import org.apache.mina.core.session.IoSession;
import org.apache.mina.transport.socket.SocketConnector;
import org.apache.mina.transport.socket.SocketSessionConfig;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
@@ -40,9 +38,11 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.ConnectionLifeCycleListener;
import org.jboss.messaging.core.remoting.RemotingHandler;
+import org.jboss.messaging.core.remoting.spi.Connection;
import org.jboss.messaging.core.remoting.spi.Connector;
-import org.jboss.messaging.core.remoting.spi.Connection;
+import java.net.InetSocketAddress;
+
/**
*
* A MinaConnector
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaProtocolCodecFilter.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaProtocolCodecFilter.java 2008-07-30 16:16:18 UTC (rev 4753)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaProtocolCodecFilter.java 2008-07-31 07:37:20 UTC (rev 4754)
@@ -22,18 +22,12 @@
package org.jboss.messaging.core.remoting.impl.mina;
-import static org.jboss.messaging.util.DataConstants.SIZE_INT;
-
-import org.apache.mina.common.IoBuffer;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
-import org.apache.mina.filter.codec.ProtocolCodecFactory;
-import org.apache.mina.filter.codec.ProtocolDecoder;
-import org.apache.mina.filter.codec.ProtocolDecoderOutput;
-import org.apache.mina.filter.codec.ProtocolEncoder;
-import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.filter.codec.*;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.RemotingHandler;
+import static org.jboss.messaging.util.DataConstants.SIZE_INT;
/**
* A Mina ProtocolEncoder used to encode/decode messages.
@@ -45,7 +39,7 @@
implements ProtocolEncoder, ProtocolCodecFactory
{
private static final Logger log = Logger.getLogger(MinaProtocolCodecFilter.class);
-
+
private final RemotingHandler handler;
public MinaProtocolCodecFilter(final RemotingHandler handler)
@@ -101,11 +95,11 @@
copied.put(in);
copied.setAutoExpand(true);
copied.flip();
-
+
in.position(start + length + SIZE_INT);
-
+
out.write(copied);
-
+
return true;
}
}
Modified: branches/Branch_Message_Chunking_new/tests/src/org/jboss/messaging/tests/timing/core/remoting/impl/RemotingConnectionImplTest.java
===================================================================
--- branches/Branch_Message_Chunking_new/tests/src/org/jboss/messaging/tests/timing/core/remoting/impl/RemotingConnectionImplTest.java 2008-07-30 16:16:18 UTC (rev 4753)
+++ branches/Branch_Message_Chunking_new/tests/src/org/jboss/messaging/tests/timing/core/remoting/impl/RemotingConnectionImplTest.java 2008-07-31 07:37:20 UTC (rev 4754)
@@ -21,34 +21,28 @@
*/
package org.jboss.messaging.tests.timing.core.remoting.impl;
-import static org.easymock.EasyMock.getCurrentArguments;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.atomic.AtomicInteger;
-
import org.easymock.EasyMock;
+import static org.easymock.EasyMock.getCurrentArguments;
import org.easymock.IAnswer;
import org.jboss.messaging.core.client.Location;
import org.jboss.messaging.core.client.impl.LocationImpl;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.FailureListener;
-import org.jboss.messaging.core.remoting.MessagingBuffer;
-import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.remoting.PacketHandler;
-import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.TransportType;
+import org.jboss.messaging.core.remoting.*;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.impl.PacketAssemblerImpl;
import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.remoting.spi.Connection;
import org.jboss.messaging.tests.util.UnitTestCase;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
*
* A RemotingConnectionImplTest
@@ -122,7 +116,7 @@
EasyMock.replay(connection, dispatcher);
- RemotingConnection rc = new RemotingConnectionImpl(connection, dispatcher, location, 1000L, pingPeriod, ex);
+ RemotingConnection rc = new RemotingConnectionImpl(connection, dispatcher, location, 1000L, pingPeriod, ex, new PacketAssemblerImpl(1024, 1024, null));
class Listener implements FailureListener
{
@@ -173,7 +167,7 @@
EasyMock.replay(connection, dispatcher);
- RemotingConnection rc = new RemotingConnectionImpl(connection, dispatcher, location, 1000L, pingPeriod, ex);
+ RemotingConnection rc = new RemotingConnectionImpl(connection, dispatcher, location, 1000L, pingPeriod, ex, new PacketAssemblerImpl(1024, 1024, null));
class Listener implements FailureListener
{
@@ -264,7 +258,7 @@
EasyMock.replay(connection, dispatcher);
- RemotingConnection rc = new RemotingConnectionImpl(connection, dispatcher, location, 1000L, pingPeriod, ex);
+ RemotingConnection rc = new RemotingConnectionImpl(connection, dispatcher, location, 1000L, pingPeriod, ex, new PacketAssemblerImpl(1024, 1024, null));
class Listener implements FailureListener
{
@@ -311,7 +305,7 @@
final long callTimeout = 100;
- RemotingConnection rc = new RemotingConnectionImpl(connection, dispatcher, location, callTimeout);
+ RemotingConnection rc = new RemotingConnectionImpl(connection, dispatcher, location, callTimeout, new PacketAssemblerImpl(1024, 1024, null));
class Listener implements FailureListener
{
@@ -419,7 +413,7 @@
EasyMock.replay(connection, dispatcher);
- RemotingConnection rc = new RemotingConnectionImpl(connection, dispatcher, location, callTimeout);
+ RemotingConnection rc = new RemotingConnectionImpl(connection, dispatcher, location, callTimeout, new PacketAssemblerImpl(1024, 1024, null));
class Listener implements FailureListener
{
Modified: branches/Branch_Message_Chunking_new/tests/src/org/jboss/messaging/tests/timing/core/remoting/impl/RemotingHandlerImplTest.java
===================================================================
--- branches/Branch_Message_Chunking_new/tests/src/org/jboss/messaging/tests/timing/core/remoting/impl/RemotingHandlerImplTest.java 2008-07-30 16:16:18 UTC (rev 4753)
+++ branches/Branch_Message_Chunking_new/tests/src/org/jboss/messaging/tests/timing/core/remoting/impl/RemotingHandlerImplTest.java 2008-07-31 07:37:20 UTC (rev 4754)
@@ -21,19 +21,20 @@
*/
package org.jboss.messaging.tests.timing.core.remoting.impl;
-import java.nio.ByteBuffer;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-
import org.easymock.EasyMock;
import org.jboss.messaging.core.remoting.MessagingBuffer;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.impl.PacketAssemblerImpl;
import org.jboss.messaging.core.remoting.impl.RemotingHandlerImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.tests.util.UnitTestCase;
+import java.nio.ByteBuffer;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
/**
*
* A RemotingHandlerImplTest
@@ -53,7 +54,7 @@
super.setUp();
dispatcher = EasyMock.createStrictMock(PacketDispatcher.class);
executorService = EasyMock.createStrictMock(ExecutorService.class);
- handler = new RemotingHandlerImpl(dispatcher, executorService);
+ handler = new RemotingHandlerImpl(dispatcher, executorService, new PacketAssemblerImpl(1024, 1024, null));
buff = new ByteBufferWrapper(ByteBuffer.allocate(1024));
}
Modified: branches/Branch_Message_Chunking_new/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/RemotingConnectionImplTest.java
===================================================================
--- branches/Branch_Message_Chunking_new/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/RemotingConnectionImplTest.java 2008-07-30 16:16:18 UTC (rev 4753)
+++ branches/Branch_Message_Chunking_new/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/RemotingConnectionImplTest.java 2008-07-31 07:37:20 UTC (rev 4754)
@@ -22,33 +22,23 @@
package org.jboss.messaging.tests.unit.core.remoting.impl;
-import static org.easymock.EasyMock.getCurrentArguments;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
import org.easymock.EasyMock;
+import static org.easymock.EasyMock.getCurrentArguments;
import org.easymock.IAnswer;
import org.jboss.messaging.core.client.Location;
import org.jboss.messaging.core.client.impl.LocationImpl;
import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.remoting.FailureListener;
-import org.jboss.messaging.core.remoting.MessagingBuffer;
-import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.remoting.PacketHandler;
-import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.TransportType;
+import org.jboss.messaging.core.remoting.*;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.impl.PacketAssemblerImpl;
import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.remoting.spi.Connection;
import org.jboss.messaging.tests.util.UnitTestCase;
+import java.nio.ByteBuffer;
+import java.util.concurrent.*;
+
/**
*
* A RemotingConnectionImplTest
@@ -67,7 +57,7 @@
final long id = 12123;
EasyMock.expect(connection.getID()).andReturn(id);
- RemotingConnection rc = new RemotingConnectionImpl(connection, dispatcher, location, 1000);
+ RemotingConnection rc = new RemotingConnectionImpl(connection, dispatcher, location, 1000, new PacketAssemblerImpl(1024, 1024, null));
EasyMock.replay(connection, dispatcher);
@@ -98,7 +88,7 @@
EasyMock.replay(connection, dispatcher, packet, buff);
- RemotingConnection rc = new RemotingConnectionImpl(connection, dispatcher, location, 1000);
+ RemotingConnection rc = new RemotingConnectionImpl(connection, dispatcher, location, 1000, new PacketAssemblerImpl(1024, 1024, null));
rc.sendOneWay(targetID, executorID, packet);
@@ -122,7 +112,7 @@
EasyMock.replay(connection, dispatcher, packet, buff);
- RemotingConnection rc = new RemotingConnectionImpl(connection, dispatcher, location, 1000);
+ RemotingConnection rc = new RemotingConnectionImpl(connection, dispatcher, location, 1000, new PacketAssemblerImpl(1024, 1024, null));
rc.sendOneWay(packet);
@@ -139,7 +129,7 @@
EasyMock.replay(connection, dispatcher);
- RemotingConnection rc = new RemotingConnectionImpl(connection, dispatcher, location, 1000);
+ RemotingConnection rc = new RemotingConnectionImpl(connection, dispatcher, location, 1000, new PacketAssemblerImpl(1024, 1024, null));
class Listener implements FailureListener
{
@@ -204,7 +194,7 @@
EasyMock.replay(connection, dispatcher);
- RemotingConnection rc = new RemotingConnectionImpl(connection, dispatcher, location, 1000);
+ RemotingConnection rc = new RemotingConnectionImpl(connection, dispatcher, location, 1000, new PacketAssemblerImpl(1024, 1024, null));
class Listener implements FailureListener
{
@@ -247,7 +237,7 @@
EasyMock.replay(connection, dispatcher, buff);
- RemotingConnection rc = new RemotingConnectionImpl(connection, dispatcher, location, 1000);
+ RemotingConnection rc = new RemotingConnectionImpl(connection, dispatcher, location, 1000, new PacketAssemblerImpl(1024, 1024, null));
MessagingBuffer buff2 = rc.createBuffer(size);
@@ -260,7 +250,7 @@
PacketDispatcher dispatcher = EasyMock.createStrictMock(PacketDispatcher.class);
Location location = new LocationImpl(TransportType.TCP, "blah", 1234);
- RemotingConnection rc = new RemotingConnectionImpl(connection, dispatcher, location, 1000);
+ RemotingConnection rc = new RemotingConnectionImpl(connection, dispatcher, location, 1000, new PacketAssemblerImpl(1024, 1024, null));
assertTrue(location == rc.getLocation());
}
@@ -271,7 +261,7 @@
PacketDispatcher dispatcher = EasyMock.createStrictMock(PacketDispatcher.class);
Location location = new LocationImpl(TransportType.TCP, "blah", 1234);
- RemotingConnection rc = new RemotingConnectionImpl(connection, dispatcher, location, 1000);
+ RemotingConnection rc = new RemotingConnectionImpl(connection, dispatcher, location, 1000, new PacketAssemblerImpl(1024, 1024, null));
assertTrue(dispatcher == rc.getPacketDispatcher());
}
@@ -286,7 +276,7 @@
EasyMock.replay(connection, dispatcher);
- RemotingConnection rc = new RemotingConnectionImpl(connection, dispatcher, location, 1000);
+ RemotingConnection rc = new RemotingConnectionImpl(connection, dispatcher, location, 1000, new PacketAssemblerImpl(1024, 1024, null));
rc.destroy();
@@ -328,7 +318,7 @@
EasyMock.replay(connection, dispatcher, ex);
- RemotingConnection rc = new RemotingConnectionImpl(connection, dispatcher, location, pingPeriod, pingPeriod, ex);
+ RemotingConnection rc = new RemotingConnectionImpl(connection, dispatcher, location, pingPeriod, pingPeriod, ex, new PacketAssemblerImpl(1024, 1024, null));
rc.destroy();
@@ -401,7 +391,7 @@
EasyMock.replay(connection, dispatcher);
- RemotingConnection rc = new RemotingConnectionImpl(connection, dispatcher, location, 1000L);
+ RemotingConnection rc = new RemotingConnectionImpl(connection, dispatcher, location, 1000L, new PacketAssemblerImpl(1024, 1024, null));
class Listener implements FailureListener
{
@@ -495,7 +485,7 @@
EasyMock.replay(connection, dispatcher);
- RemotingConnection rc = new RemotingConnectionImpl(connection, dispatcher, location, 1000L);
+ RemotingConnection rc = new RemotingConnectionImpl(connection, dispatcher, location, 1000L, new PacketAssemblerImpl(1024, 1024, null));
class Listener implements FailureListener
{
Modified: branches/Branch_Message_Chunking_new/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/RemotingHandlerImplTest.java
===================================================================
--- branches/Branch_Message_Chunking_new/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/RemotingHandlerImplTest.java 2008-07-30 16:16:18 UTC (rev 4753)
+++ branches/Branch_Message_Chunking_new/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/RemotingHandlerImplTest.java 2008-07-31 07:37:20 UTC (rev 4754)
@@ -21,12 +21,6 @@
*/
package org.jboss.messaging.tests.unit.core.remoting.impl;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.concurrent.ExecutorService;
-
-import javax.transaction.xa.Xid;
-
import org.easymock.EasyMock;
import org.jboss.messaging.core.client.impl.ClientMessageImpl;
import org.jboss.messaging.core.exception.MessagingException;
@@ -34,47 +28,9 @@
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.impl.PacketAssemblerImpl;
import org.jboss.messaging.core.remoting.impl.RemotingHandlerImpl;
-import org.jboss.messaging.core.remoting.impl.wireformat.ConnectionCreateSessionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.ConnectionCreateSessionResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowCreditMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionRequest;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionResponse;
-import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
-import org.jboss.messaging.core.remoting.impl.wireformat.ProducerFlowCreditMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.ProducerSendMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.ReceiveMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserHasNextMessageResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCancelMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAForgetMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAJoinMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAPrepareMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResumeMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXARollbackMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.*;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.impl.ServerMessageImpl;
import org.jboss.messaging.core.transaction.impl.XidImpl;
@@ -83,6 +39,11 @@
import org.jboss.messaging.tests.util.UnitTestCase;
import org.jboss.messaging.util.SimpleString;
+import javax.transaction.xa.Xid;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutorService;
+
/**
*
* A RemotingHandlerImplTest
@@ -103,7 +64,7 @@
super.setUp();
dispatcher = EasyMock.createStrictMock(PacketDispatcher.class);
executorService = EasyMock.createStrictMock(ExecutorService.class);
- handler = new RemotingHandlerImpl(dispatcher, executorService);
+ handler = new RemotingHandlerImpl(dispatcher, executorService, new PacketAssemblerImpl(1024, 1024, null));
buff = new ByteBufferWrapper(ByteBuffer.allocate(1024));
}
@@ -153,7 +114,7 @@
public void testBufferReceivedNoExecutor() throws Exception
{
- handler = new RemotingHandlerImpl(dispatcher, null);
+ handler = new RemotingHandlerImpl(dispatcher, null, new PacketAssemblerImpl(1024, 1024, null));
Packet packet = new PacketImpl(PacketImpl.CLOSE);
Modified: branches/Branch_Message_Chunking_new/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/IoBufferWrapperTest.java
===================================================================
--- branches/Branch_Message_Chunking_new/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/IoBufferWrapperTest.java 2008-07-30 16:16:18 UTC (rev 4753)
+++ branches/Branch_Message_Chunking_new/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/IoBufferWrapperTest.java 2008-07-31 07:37:20 UTC (rev 4754)
@@ -22,7 +22,7 @@
package org.jboss.messaging.tests.unit.core.remoting.impl.mina;
-import org.apache.mina.common.IoBuffer;
+import org.apache.mina.core.buffer.IoBuffer;
import org.jboss.messaging.core.remoting.MessagingBuffer;
import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
import org.jboss.messaging.tests.unit.core.remoting.MessagingBufferTestBase;
Modified: branches/Branch_Message_Chunking_new/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/MessagingIOSessionDataStructureFactoryTest.java
===================================================================
--- branches/Branch_Message_Chunking_new/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/MessagingIOSessionDataStructureFactoryTest.java 2008-07-30 16:16:18 UTC (rev 4753)
+++ branches/Branch_Message_Chunking_new/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/MessagingIOSessionDataStructureFactoryTest.java 2008-07-31 07:37:20 UTC (rev 4754)
@@ -22,15 +22,12 @@
package org.jboss.messaging.tests.unit.core.remoting.impl.mina;
-import static org.easymock.EasyMock.createStrictMock;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
-import static org.jboss.messaging.tests.util.RandomUtil.randomString;
import junit.framework.TestCase;
-
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.IoSessionAttributeMap;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.core.session.IoSessionAttributeMap;
+import static org.easymock.EasyMock.*;
import org.jboss.messaging.core.remoting.impl.mina.MessagingIOSessionDataStructureFactory;
+import static org.jboss.messaging.tests.util.RandomUtil.randomString;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
Modified: branches/Branch_Message_Chunking_new/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/MinaConnectionTest.java
===================================================================
--- branches/Branch_Message_Chunking_new/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/MinaConnectionTest.java 2008-07-30 16:16:18 UTC (rev 4753)
+++ branches/Branch_Message_Chunking_new/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/MinaConnectionTest.java 2008-07-31 07:37:20 UTC (rev 4754)
@@ -21,7 +21,7 @@
*/
package org.jboss.messaging.tests.unit.core.remoting.impl.mina;
-import org.apache.mina.common.IoSession;
+import org.apache.mina.core.session.IoSession;
import org.easymock.EasyMock;
import org.jboss.messaging.core.remoting.MessagingBuffer;
import org.jboss.messaging.core.remoting.impl.mina.MinaConnection;
More information about the jboss-cvs-commits
mailing list