[jboss-cvs] JBoss Messaging SVN: r4749 - in trunk: src/main/org/jboss/messaging/core/remoting/impl/mina and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jul 30 08:27:15 EDT 2008
Author: trustin
Date: 2008-07-30 08:27:15 -0400 (Wed, 30 Jul 2008)
New Revision: 4749
Modified:
trunk/.classpath
trunk/build-thirdparty.xml
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaAcceptor.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnection.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaProtocolCodecFilter.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/IoBufferWrapperTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/MessagingIOSessionDataStructureFactoryTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/MinaConnectionTest.java
Log:
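Upgraded Apache MINA from snapshot 2.0.0-M2-20080520.004618-19 to the latest snapshot, 2.0.0-M3-20080730.120633-1; updated imports from org.apache.mina.common to the new org.apache.mina.core.* packages, and removed the netty 3.0.0.M4 component reference from build-thirdparty.xml.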
Modified: trunk/.classpath
===================================================================
--- trunk/.classpath 2008-07-30 09:34:22 UTC (rev 4748)
+++ trunk/.classpath 2008-07-30 12:27:15 UTC (rev 4749)
@@ -58,10 +58,10 @@
<classpathentry kind="lib" path="thirdparty/jboss/jbosssx-client/lib/jbosssx-client.jar"/>
<classpathentry kind="lib" path="thirdparty/easymock/lib/easymock.jar" sourcepath="thirdparty/easymock/lib/src.zip"/>
<classpathentry kind="lib" path="thirdparty/slf4j/log4j/lib/slf4j-log4j12.jar"/>
- <classpathentry exported="true" kind="lib" path="thirdparty/apache-mina/lib/mina-core-2.0.0-M2-20080520.004618-19.jar" sourcepath="thirdparty/apache-mina/lib/mina-core-2.0.0-M2-20080520.004618-19-sources.jar"/>
<classpathentry kind="lib" path="tests/jms-tests/config"/>
<classpathentry kind="lib" path="thirdparty/easymock-classextension/lib/easymockclassextension.jar"/>
<classpathentry kind="lib" path="thirdparty/cglib/lib/cglib.jar"/>
<classpathentry kind="lib" path="tests/tmpfiles"/>
+ <classpathentry kind="lib" path="thirdparty/apache-mina/lib/mina-core-2.0.0-M3-20080730.120633-1.jar" sourcepath="thirdparty/apache-mina/lib/mina-core-2.0.0-M3-20080730.120633-1-sources.jar"/>
<classpathentry kind="output" path="eclipse-output"/>
</classpath>
Modified: trunk/build-thirdparty.xml
===================================================================
--- trunk/build-thirdparty.xml 2008-07-30 09:34:22 UTC (rev 4748)
+++ trunk/build-thirdparty.xml 2008-07-30 12:27:15 UTC (rev 4749)
@@ -98,8 +98,7 @@
<componentref name="jboss/jbosssx-client" version="2.0.1.GA"/>
<componentref name="jboss/jboss-javaee" version="5.0.0.Beta3"/>
<componentref name="jboss/jboss-common-logging-spi" version="2.0.4.GA"/>
- <componentref name="apache-mina" version="2.0.0-M2-20080520.004618-19"/>
- <componentref name="netty" version="3.0.0.M4"/>
+ <componentref name="apache-mina" version="2.0.0-M3-20080730.120633-1"/>
<componentref name="slf4j/log4j" version="1.4.3"/>
<componentref name="jpa-api" version="1.0.0.GA"/>
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java 2008-07-30 09:34:22 UTC (rev 4748)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java 2008-07-30 12:27:15 UTC (rev 4749)
@@ -24,7 +24,7 @@
import javax.net.ssl.SSLContext;
-import org.apache.mina.common.DefaultIoFilterChainBuilder;
+import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.ssl.SslFilter;
import org.jboss.messaging.core.remoting.RemotingHandler;
@@ -44,11 +44,11 @@
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
-
+
private FilterChainSupport()
- {
+ {
}
-
+
// Public --------------------------------------------------------
public static void addCodecFilter(final DefaultIoFilterChainBuilder filterChain,
@@ -72,9 +72,9 @@
filter.setUseClientMode(true);
filter.setWantClientAuth(true);
}
- filterChain.addLast("ssl", filter);
+ filterChain.addLast("ssl", filter);
}
-
+
// Package protected ---------------------------------------------
// Inner classes -------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java 2008-07-30 09:34:22 UTC (rev 4748)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java 2008-07-30 12:27:15 UTC (rev 4749)
@@ -18,25 +18,22 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.remoting.impl.mina;
-import static org.jboss.messaging.util.DataConstants.FALSE;
-import static org.jboss.messaging.util.DataConstants.NOT_NULL;
-import static org.jboss.messaging.util.DataConstants.NULL;
-import static org.jboss.messaging.util.DataConstants.TRUE;
+import static org.jboss.messaging.util.DataConstants.*;
import java.nio.charset.Charset;
-import org.apache.mina.common.IoBuffer;
+import org.apache.mina.core.buffer.IoBuffer;
import org.jboss.messaging.core.remoting.MessagingBuffer;
import org.jboss.messaging.util.SimpleString;
/**
- *
+ *
* A BufferWrapper
- *
+ *
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
@@ -46,7 +43,7 @@
// Constants -----------------------------------------------------
private static final Charset utf8 = Charset.forName("UTF-8");
-
+
// Attributes ----------------------------------------------------
private final IoBuffer buffer;
@@ -58,15 +55,15 @@
public IoBufferWrapper(final int size)
{
buffer = IoBuffer.allocate(size);
-
+
buffer.setAutoExpand(true);
}
-
+
public IoBufferWrapper(final IoBuffer buffer)
{
this.buffer = buffer;
}
-
+
// Public --------------------------------------------------------
// MessagingBuffer implementation ----------------------------------------------
@@ -75,37 +72,37 @@
{
return buffer.array();
}
-
+
public int position()
{
return buffer.position();
}
-
+
public void position(final int position)
{
buffer.position(position);
}
-
+
public int limit()
{
return buffer.limit();
}
-
+
public void limit(final int limit)
{
buffer.limit(limit);
}
-
+
public int capacity()
{
return buffer.capacity();
}
-
+
public void flip()
{
buffer.flip();
}
-
+
public MessagingBuffer slice()
{
return new IoBufferWrapper(buffer.slice());
@@ -120,7 +117,7 @@
{
return buffer.remaining();
}
-
+
public void rewind()
{
buffer.rewind();
@@ -135,7 +132,7 @@
{
buffer.put(byteArray);
}
-
+
public void putBytes(final byte[] bytes, int offset, int length)
{
buffer.put(bytes, offset, length);
@@ -145,7 +142,7 @@
{
buffer.putInt(intValue);
}
-
+
public void putInt(final int pos, final int intValue)
{
buffer.putInt(pos, intValue);
@@ -160,27 +157,27 @@
{
buffer.putFloat(floatValue);
}
-
+
public void putDouble(final double d)
{
buffer.putDouble(d);
}
-
+
public void putShort(final short s)
{
buffer.putShort(s);
}
-
+
public void putChar(final char chr)
{
buffer.putChar(chr);
- }
-
+ }
+
public byte getByte()
- {
+ {
return buffer.get();
}
-
+
public short getUnsignedByte()
{
return buffer.getUnsigned();
@@ -190,7 +187,7 @@
{
buffer.get(b);
}
-
+
public void getBytes(final byte[] b, final int offset, final int length)
{
buffer.get(b, offset, length);
@@ -200,7 +197,7 @@
{
return buffer.getInt();
}
-
+
public long getLong()
{
return buffer.getLong();
@@ -210,22 +207,22 @@
{
return buffer.getFloat();
}
-
+
public short getShort()
{
return buffer.getShort();
}
-
+
public int getUnsignedShort()
{
return buffer.getUnsignedShort();
}
-
+
public double getDouble()
{
return buffer.getDouble();
}
-
+
public char getChar()
{
return buffer.getChar();
@@ -245,19 +242,19 @@
public boolean getBoolean()
{
byte b = buffer.get();
- return (b == TRUE);
+ return b == TRUE;
}
public void putString(final String nullableString)
{
buffer.putInt(nullableString.length());
-
+
for (int i = 0; i < nullableString.length(); i++)
{
buffer.putChar(nullableString.charAt(i));
- }
+ }
}
-
+
public void putNullableString(final String nullableString)
{
if (nullableString == null)
@@ -267,7 +264,7 @@
else
{
buffer.put(NOT_NULL);
-
+
putString(nullableString);
}
}
@@ -275,21 +272,21 @@
public String getString()
{
int len = buffer.getInt();
-
+
char[] chars = new char[len];
-
+
for (int i = 0; i < len; i++)
{
chars[i] = buffer.getChar();
}
-
- return new String(chars);
+
+ return new String(chars);
}
-
+
public String getNullableString()
{
byte check = buffer.get();
-
+
if (check == NULL)
{
return null;
@@ -299,12 +296,12 @@
return getString();
}
}
-
+
public void putUTF(final String str) throws Exception
{
buffer.putPrefixedString(str, utf8.newEncoder());
}
-
+
public void putNullableSimpleString(final SimpleString string)
{
if (string == null)
@@ -317,25 +314,25 @@
putSimpleString(string);
}
}
-
+
public void putSimpleString(final SimpleString string)
{
byte[] data = string.getData();
-
+
buffer.putInt(data.length);
buffer.put(data);
}
-
+
public SimpleString getSimpleString()
{
int len = buffer.getInt();
-
+
byte[] data = new byte[len];
buffer.get(data);
-
+
return new SimpleString(data);
}
-
+
public SimpleString getNullableSimpleString()
{
int b = buffer.get();
@@ -348,17 +345,17 @@
return getSimpleString();
}
}
-
+
public String getUTF() throws Exception
{
return buffer.getPrefixedString(utf8.newDecoder());
}
-
+
public Object getUnderlyingBuffer()
{
return buffer;
}
-
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java 2008-07-30 09:34:22 UTC (rev 4748)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java 2008-07-30 12:27:15 UTC (rev 4749)
@@ -26,19 +26,19 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.IoSessionAttributeMap;
-import org.apache.mina.common.IoSessionDataStructureFactory;
-import org.apache.mina.common.WriteRequest;
-import org.apache.mina.common.WriteRequestQueue;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.core.session.IoSessionAttributeMap;
+import org.apache.mina.core.session.IoSessionDataStructureFactory;
+import org.apache.mina.core.write.WriteRequest;
+import org.apache.mina.core.write.WriteRequestQueue;
/**
- *
+ *
* A MessagingIOSessionDataStructureFactory
- *
+ *
* Derived from:
* @author The Apache MINA Project (dev at mina.apache.org)
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
*/
@@ -56,8 +56,8 @@
{
return new ConcurrentWriteRequestQueue();
}
-
-
+
+
private static class ConcurrentIoSessionAttributeMap implements IoSessionAttributeMap {
private final ConcurrentMap<Object, Object> attributes = new ConcurrentHashMap<Object, Object>(4);
@@ -95,7 +95,7 @@
if (value == null) {
return null;
}
-
+
return attributes.putIfAbsent(key, value);
}
@@ -115,7 +115,7 @@
if (value == null) {
return false;
}
-
+
return attributes.remove(key, value);
}
@@ -128,20 +128,20 @@
}
public Set<Object> getAttributeKeys(IoSession session) {
- return new HashSet<Object>(attributes.keySet());
+ return new HashSet<Object>(attributes.keySet());
}
public void dispose(IoSession session) throws Exception {
}
}
-
+
private static class ConcurrentWriteRequestQueue implements WriteRequestQueue
{
private final Queue<WriteRequest> q = new ConcurrentLinkedQueue<WriteRequest>();
-
+
public void dispose(IoSession session) {
}
-
+
public void clear(IoSession session) {
q.clear();
}
@@ -157,11 +157,11 @@
public synchronized WriteRequest poll(IoSession session) {
return q.poll();
}
-
+
@Override
public String toString() {
return q.toString();
}
}
-
+
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaAcceptor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaAcceptor.java 2008-07-30 09:34:22 UTC (rev 4748)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaAcceptor.java 2008-07-30 12:27:15 UTC (rev 4749)
@@ -18,19 +18,19 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.remoting.impl.mina;
import java.net.InetSocketAddress;
-import org.apache.mina.common.DefaultIoFilterChainBuilder;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoBuffer;
-import org.apache.mina.common.IoHandlerAdapter;
-import org.apache.mina.common.IoService;
-import org.apache.mina.common.IoServiceListener;
-import org.apache.mina.common.IoSession;
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
+import org.apache.mina.core.service.IoHandlerAdapter;
+import org.apache.mina.core.service.IoService;
+import org.apache.mina.core.service.IoServiceListener;
+import org.apache.mina.core.session.IdleStatus;
+import org.apache.mina.core.session.IoSession;
import org.apache.mina.transport.socket.SocketAcceptor;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.jboss.messaging.core.config.Configuration;
@@ -52,22 +52,22 @@
private static final Logger log = Logger.getLogger(MinaAcceptor.class);
private SocketAcceptor acceptor;
-
+
private IoServiceListener acceptorListener;
-
+
private final Configuration configuration;
-
+
private final RemotingHandler handler;
-
- private ConnectionLifeCycleListener listener;
-
+
+ private final ConnectionLifeCycleListener listener;
+
public MinaAcceptor(final Configuration configuration, final RemotingHandler handler,
final ConnectionLifeCycleListener listener)
{
this.configuration = configuration;
-
+
this.handler = handler;
-
+
this.listener = listener;
}
@@ -78,15 +78,15 @@
//Already started
return;
}
-
+
acceptor = new NioSocketAcceptor();
acceptor.setSessionDataStructureFactory(new MessagingIOSessionDataStructureFactory());
DefaultIoFilterChainBuilder filterChain = acceptor.getFilterChain();
-
+
if (configuration.isSSLEnabled())
- {
+ {
FilterChainSupport.addSSLFilter(filterChain, false, configuration.getKeyStorePath(),
configuration.getKeyStorePassword(),
configuration.getTrustStorePath(),
@@ -95,7 +95,7 @@
FilterChainSupport.addCodecFilter(filterChain, handler);
// Bind
- acceptor.setDefaultLocalAddress(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
+ acceptor.setDefaultLocalAddress(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
acceptor.getSessionConfig().setTcpNoDelay(configuration.getConnectionParams().isTcpNoDelay());
int receiveBufferSize = configuration.getConnectionParams().getTcpReceiveBufferSize();
if (receiveBufferSize != -1)
@@ -124,41 +124,43 @@
{
return;
}
-
+
// remove the listener before disposing the acceptor
// so that we're not notified when the sessions are destroyed
acceptor.removeListener(acceptorListener);
acceptor.unbind();
acceptor.dispose();
- acceptor = null;
+ acceptor = null;
}
-
+
public DefaultIoFilterChainBuilder getFilterChain()
{
return acceptor.getFilterChain();
}
-
+
// Inner classes -----------------------------------------------------------------------------
private final class MinaHandler extends IoHandlerAdapter
{
+ @Override
public void exceptionCaught(final IoSession session, final Throwable cause)
throws Exception
{
log.error("caught exception " + cause + " for session " + session, cause);
-
+
MessagingException me = new MessagingException(MessagingException.INTERNAL_ERROR, "MINA exception");
-
+
me.initCause(cause);
-
+
listener.connectionException(session.getId(), me);
}
+ @Override
public void messageReceived(final IoSession session, final Object message)
throws Exception
{
IoBuffer buffer = (IoBuffer) message;
-
+
handler.bufferReceived(session.getId(), new IoBufferWrapper(buffer));
}
}
@@ -181,12 +183,12 @@
public void sessionCreated(final IoSession session)
{
Connection tc = new MinaConnection(session);
-
+
listener.connectionCreated(tc);
}
public void sessionDestroyed(final IoSession session)
- {
+ {
listener.connectionDestroyed(session.getId());
}
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnection.java 2008-07-30 09:34:22 UTC (rev 4748)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnection.java 2008-07-30 12:27:15 UTC (rev 4749)
@@ -18,12 +18,12 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.remoting.impl.mina;
-import org.apache.mina.common.IoBuffer;
-import org.apache.mina.common.IoSession;
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.ssl.SslFilter;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.MessagingBuffer;
@@ -41,35 +41,35 @@
// Constants -----------------------------------------------------
private static final Logger log = Logger.getLogger(MinaConnection.class);
-
+
// Attributes ----------------------------------------------------
private final IoSession session;
-
+
private boolean closed;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
-
+
public MinaConnection(final IoSession session)
{
this.session = session;
}
// Public --------------------------------------------------------
-
+
// Connection implementation ----------------------------
-
+
public synchronized void close()
{
if (closed)
{
return;
}
-
+
session.close().awaitUninterruptibly();
-
+
SslFilter sslFilter = (SslFilter) session.getFilterChain().get("ssl");
if (sslFilter != null)
{
@@ -81,10 +81,10 @@
{
// ignore
}
-
-
+
+
}
-
+
closed = true;
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2008-07-30 09:34:22 UTC (rev 4748)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2008-07-30 12:27:15 UTC (rev 4749)
@@ -23,14 +23,14 @@
import java.net.InetSocketAddress;
-import org.apache.mina.common.ConnectFuture;
-import org.apache.mina.common.DefaultIoFilterChainBuilder;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoBuffer;
-import org.apache.mina.common.IoHandlerAdapter;
-import org.apache.mina.common.IoService;
-import org.apache.mina.common.IoServiceListener;
-import org.apache.mina.common.IoSession;
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
+import org.apache.mina.core.future.ConnectFuture;
+import org.apache.mina.core.service.IoHandlerAdapter;
+import org.apache.mina.core.service.IoService;
+import org.apache.mina.core.service.IoServiceListener;
+import org.apache.mina.core.session.IdleStatus;
+import org.apache.mina.core.session.IoSession;
import org.apache.mina.transport.socket.SocketConnector;
import org.apache.mina.transport.socket.SocketSessionConfig;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
@@ -40,13 +40,13 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.ConnectionLifeCycleListener;
import org.jboss.messaging.core.remoting.RemotingHandler;
+import org.jboss.messaging.core.remoting.spi.Connection;
import org.jboss.messaging.core.remoting.spi.Connector;
-import org.jboss.messaging.core.remoting.spi.Connection;
/**
- *
+ *
* A MinaConnector
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
*/
@@ -59,60 +59,60 @@
// Attributes ----------------------------------------------------
private SocketConnector connector;
-
+
private final RemotingHandler handler;
-
+
private final Location location;
-
+
private final ConnectionLifeCycleListener listener;
-
+
private final ConnectionParams params;
-
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
-
+
public MinaConnector(final Location location, final ConnectionParams params,
final RemotingHandler handler,
final ConnectionLifeCycleListener listener)
- {
+ {
if (location == null)
{
throw new IllegalArgumentException("Invalid argument null location");
}
-
+
if (params == null)
{
throw new IllegalArgumentException("Invalid argument null connection params");
}
-
+
if (handler == null)
{
throw new IllegalArgumentException("Invalid argument null handler");
}
-
+
if (listener == null)
{
throw new IllegalArgumentException("Invalid argument null listener");
}
-
- this.handler = handler;
+
+ this.handler = handler;
this.location = location;
this.listener = listener;
- this.params = params;
+ this.params = params;
}
-
+
public synchronized void start()
{
if (connector != null)
{
return;
}
-
+
connector = new NioSocketConnector();
-
+
SocketSessionConfig connectorConfig = connector.getSessionConfig();
DefaultIoFilterChainBuilder filterChain = connector.getFilterChain();
@@ -123,7 +123,7 @@
if (params.getTcpReceiveBufferSize() != -1)
{
connectorConfig.setReceiveBufferSize(params.getTcpReceiveBufferSize());
- }
+ }
if (params.getTcpSendBufferSize() != -1)
{
connectorConfig.setSendBufferSize(params.getTcpSendBufferSize());
@@ -132,7 +132,7 @@
connectorConfig.setReuseAddress(true);
if (params.isSSLEnabled())
- {
+ {
try
{
FilterChainSupport.addSSLFilter(filterChain, true, params.getKeyStorePath(), params.getKeyStorePassword(), null, null);
@@ -145,20 +145,20 @@
}
}
FilterChainSupport.addCodecFilter(filterChain, handler);
-
- connector.setHandler(new MinaHandler());
-
+
+ connector.setHandler(new MinaHandler());
+
connector.addListener(new ServiceListener());
}
-
+
public synchronized void close()
{
if (connector != null)
- {
+ {
connector.dispose();
- }
+ }
}
-
+
public Connection createConnection()
{
InetSocketAddress address = new InetSocketAddress(location.getHost(), location.getPort());
@@ -166,11 +166,11 @@
connector.setDefaultRemoteAddress(address);
future.awaitUninterruptibly();
-
+
if (future.isConnected())
- {
- IoSession session = future.getSession();
-
+ {
+ IoSession session = future.getSession();
+
return new MinaConnection(session);
}
else
@@ -178,7 +178,7 @@
return null;
}
}
-
+
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
@@ -212,32 +212,34 @@
}
public void sessionDestroyed(IoSession session)
- {
+ {
listener.connectionDestroyed(session.getId());
}
}
-
+
private final class MinaHandler extends IoHandlerAdapter
{
+ @Override
public void exceptionCaught(final IoSession session, final Throwable cause)
throws Exception
{
log.error("caught exception " + cause + " for session " + session, cause);
MessagingException me = new MessagingException(MessagingException.INTERNAL_ERROR, "MINA exception");
-
+
me.initCause(cause);
-
- listener.connectionException(session.getId(), me);
+
+ listener.connectionException(session.getId(), me);
}
+ @Override
public void messageReceived(final IoSession session, final Object message)
throws Exception
{
IoBuffer buffer = (IoBuffer) message;
-
+
handler.bufferReceived(session.getId(), new IoBufferWrapper(buffer));
}
}
-
+
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaProtocolCodecFilter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaProtocolCodecFilter.java 2008-07-30 09:34:22 UTC (rev 4748)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaProtocolCodecFilter.java 2008-07-30 12:27:15 UTC (rev 4749)
@@ -18,14 +18,14 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.remoting.impl.mina;
-import static org.jboss.messaging.util.DataConstants.SIZE_INT;
+import static org.jboss.messaging.util.DataConstants.*;
-import org.apache.mina.common.IoBuffer;
-import org.apache.mina.common.IoSession;
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
@@ -37,7 +37,7 @@
/**
* A Mina ProtocolEncoder used to encode/decode messages.
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="ataylor at redhat.com">Andy Taylor</a>
*/
@@ -45,9 +45,9 @@
implements ProtocolEncoder, ProtocolCodecFactory
{
private static final Logger log = Logger.getLogger(MinaProtocolCodecFilter.class);
-
+
private final RemotingHandler handler;
-
+
public MinaProtocolCodecFilter(final RemotingHandler handler)
{
this.handler = handler;
@@ -68,6 +68,7 @@
// ProtocolEncoder implementation ------------------------------------------
+ @Override
public void dispose(final IoSession session) throws Exception
{
}
@@ -81,18 +82,19 @@
// CumulativeProtocolDecoder overrides
// -------------------------------------------------------------------------------------
+ @Override
public boolean doDecode(final IoSession session, final IoBuffer in, final ProtocolDecoderOutput out) throws Exception
{
//TODO - we can avoid this entirely if we maintain fragmented packets in the handler
-
+
int start = in.position();
-
+
int length = handler.isReadyToHandle(new IoBufferWrapper(in));
-
+
if (length == -1)
- {
+ {
in.position(start);
-
+
return false;
}
@@ -101,12 +103,12 @@
copied.put(in);
copied.setAutoExpand(true);
copied.flip();
-
+
in.position(start + length + SIZE_INT);
-
- out.write(copied);
-
- return true;
+
+ out.write(copied);
+
+ return true;
}
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/IoBufferWrapperTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/IoBufferWrapperTest.java 2008-07-30 09:34:22 UTC (rev 4748)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/IoBufferWrapperTest.java 2008-07-30 12:27:15 UTC (rev 4749)
@@ -18,11 +18,11 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.tests.unit.core.remoting.impl.mina;
-import org.apache.mina.common.IoBuffer;
+import org.apache.mina.core.buffer.IoBuffer;
import org.jboss.messaging.core.remoting.MessagingBuffer;
import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
import org.jboss.messaging.tests.unit.core.remoting.MessagingBufferTestBase;
@@ -47,7 +47,7 @@
// Public --------------------------------------------------------
// BufferWrapperBase overrides -----------------------------------
-
+
@Override
protected MessagingBuffer createBuffer()
{
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/MessagingIOSessionDataStructureFactoryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/MessagingIOSessionDataStructureFactoryTest.java 2008-07-30 09:34:22 UTC (rev 4748)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/MessagingIOSessionDataStructureFactoryTest.java 2008-07-30 12:27:15 UTC (rev 4749)
@@ -18,18 +18,16 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.tests.unit.core.remoting.impl.mina;
-import static org.easymock.EasyMock.createStrictMock;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
-import static org.jboss.messaging.tests.util.RandomUtil.randomString;
+import static org.easymock.EasyMock.*;
+import static org.jboss.messaging.tests.util.RandomUtil.*;
import junit.framework.TestCase;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.IoSessionAttributeMap;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.core.session.IoSessionAttributeMap;
import org.jboss.messaging.core.remoting.impl.mina.MessagingIOSessionDataStructureFactory;
/**
@@ -55,10 +53,10 @@
IoSession session = createStrictMock(IoSession.class);
replay(session);
-
+
MessagingIOSessionDataStructureFactory factory = new MessagingIOSessionDataStructureFactory();
IoSessionAttributeMap map = factory.getAttributeMap(null);
-
+
try
{
map.getAttribute(session, null, null);
@@ -66,12 +64,12 @@
} catch (NullPointerException e)
{
}
-
+
String key = randomString();
Object defaultValue = randomString();
Object attribute = map.getAttribute(session, key, defaultValue);
assertEquals(defaultValue, attribute);
-
+
Object value = randomString();
map.setAttribute(session, key, value);
attribute = map.getAttribute(session, key, defaultValue);
@@ -79,16 +77,16 @@
verify(session);
}
-
+
public void testSetAttribute() throws Exception
{
IoSession session = createStrictMock(IoSession.class);
replay(session);
-
+
MessagingIOSessionDataStructureFactory factory = new MessagingIOSessionDataStructureFactory();
IoSessionAttributeMap map = factory.getAttributeMap(null);
-
+
try
{
map.setAttribute(session, null, randomString());
@@ -96,7 +94,7 @@
} catch (NullPointerException e)
{
}
-
+
String key = randomString();
Object defaultValue = randomString();
Object value = randomString();
@@ -107,7 +105,7 @@
map.setAttribute(session, key, null);
attribute = map.getAttribute(session, key, defaultValue);
assertEquals(defaultValue, attribute);
-
+
verify(session);
}
@@ -116,10 +114,10 @@
IoSession session = createStrictMock(IoSession.class);
replay(session);
-
+
MessagingIOSessionDataStructureFactory factory = new MessagingIOSessionDataStructureFactory();
IoSessionAttributeMap map = factory.getAttributeMap(null);
-
+
try
{
map.setAttributeIfAbsent(session, null, randomString());
@@ -127,28 +125,28 @@
} catch (NullPointerException e)
{
}
-
+
String key = randomString();
Object defaultValue = randomString();
Object value = randomString();
-
+
assertNull(map.setAttributeIfAbsent(session, key, null));
Object attribute = map.setAttributeIfAbsent(session, key, value);
assertNull(attribute);
assertEquals(value, map.getAttribute(session, key, defaultValue));
-
+
verify(session);
}
-
+
public void testRemoveAttribute() throws Exception
{
IoSession session = createStrictMock(IoSession.class);
replay(session);
-
+
MessagingIOSessionDataStructureFactory factory = new MessagingIOSessionDataStructureFactory();
IoSessionAttributeMap map = factory.getAttributeMap(null);
-
+
try
{
map.removeAttribute(session, null);
@@ -156,28 +154,28 @@
} catch (NullPointerException e)
{
}
-
+
String key = randomString();
Object value = randomString();
-
+
assertNull(map.removeAttribute(session, key));
-
+
map.setAttribute(session, key, value);
assertEquals(value, map.removeAttribute(session, key));
assertNull(map.removeAttribute(session, key));
-
+
verify(session);
}
-
+
public void testRemoveAttributeWithValue() throws Exception
{
IoSession session = createStrictMock(IoSession.class);
replay(session);
-
+
MessagingIOSessionDataStructureFactory factory = new MessagingIOSessionDataStructureFactory();
IoSessionAttributeMap map = factory.getAttributeMap(null);
-
+
try
{
map.removeAttribute(session, null, randomString());
@@ -185,31 +183,31 @@
} catch (NullPointerException e)
{
}
-
+
assertFalse(map.removeAttribute(session, randomString(), null));
-
+
String key = randomString();
Object value = randomString();
Object otherValue = randomString();
-
+
assertFalse(map.removeAttribute(session, key, value));
-
+
map.setAttribute(session, key, value);
assertFalse(map.removeAttribute(session, key, otherValue));
assertTrue(map.removeAttribute(session, key, value));
-
+
verify(session);
}
-
+
public void testReplaceAttribute() throws Exception
{
IoSession session = createStrictMock(IoSession.class);
replay(session);
-
+
MessagingIOSessionDataStructureFactory factory = new MessagingIOSessionDataStructureFactory();
IoSessionAttributeMap map = factory.getAttributeMap(null);
-
+
try
{
map.replaceAttribute(session, null, randomString(), randomString());
@@ -217,19 +215,19 @@
} catch (NullPointerException e)
{
}
-
+
assertFalse(map.replaceAttribute(session, randomString(), randomString(), randomString()));
}
-
+
public void testContainsAttribute() throws Exception
{
IoSession session = createStrictMock(IoSession.class);
replay(session);
-
+
MessagingIOSessionDataStructureFactory factory = new MessagingIOSessionDataStructureFactory();
IoSessionAttributeMap map = factory.getAttributeMap(null);
-
+
try
{
map.containsAttribute(session, null);
@@ -237,15 +235,15 @@
} catch (NullPointerException e)
{
}
-
+
String key = randomString();
Object value = randomString();
-
+
assertFalse(map.containsAttribute(session, key));
map.setAttribute(session, key, value);
assertTrue(map.containsAttribute(session, key));
}
-
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/MinaConnectionTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/MinaConnectionTest.java 2008-07-30 09:34:22 UTC (rev 4748)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/MinaConnectionTest.java 2008-07-30 12:27:15 UTC (rev 4749)
@@ -21,16 +21,16 @@
*/
package org.jboss.messaging.tests.unit.core.remoting.impl.mina;
-import org.apache.mina.common.IoSession;
+import org.apache.mina.core.session.IoSession;
import org.easymock.EasyMock;
import org.jboss.messaging.core.remoting.MessagingBuffer;
import org.jboss.messaging.core.remoting.impl.mina.MinaConnection;
import org.jboss.messaging.tests.util.UnitTestCase;
/**
- *
+ *
* A MinaConnectionTest
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
*/
@@ -39,56 +39,56 @@
public void testGetID() throws Exception
{
IoSession session = EasyMock.createStrictMock(IoSession.class);
-
+
final long id = 192812;
-
+
EasyMock.expect(session.getId()).andReturn(id);
-
+
MinaConnection conn = new MinaConnection(session);
-
+
EasyMock.replay(session);
-
+
assertEquals(id, conn.getID());
-
+
EasyMock.verify(session);
}
-
+
public void testWrite() throws Exception
{
IoSession session = EasyMock.createStrictMock(IoSession.class);
-
+
final Object underlying = new Object();
-
+
MessagingBuffer buff = EasyMock.createStrictMock(MessagingBuffer.class);
-
+
EasyMock.expect(buff.getUnderlyingBuffer()).andReturn(underlying);
-
+
EasyMock.expect(session.write(underlying)).andReturn(null);
-
+
MinaConnection conn = new MinaConnection(session);
-
+
EasyMock.replay(session, buff);
-
+
conn.write(buff);
-
+
EasyMock.verify(session, buff);
}
-
+
public void testCreateBuffer() throws Exception
{
IoSession session = EasyMock.createStrictMock(IoSession.class);
-
+
MinaConnection conn = new MinaConnection(session);
-
+
EasyMock.replay(session);
-
+
final int size = 1234;
-
+
MessagingBuffer buff = conn.createBuffer(size);
-
+
assertEquals(size, buff.capacity());
-
+
EasyMock.verify(session);
}
-
+
}
More information about the jboss-cvs-commits
mailing list