[jboss-cvs] JBoss Messaging SVN: r3446 - in branches/Branch_JBMESSAGING-544: src/main/org/jboss/jms/client/remoting and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Dec 7 09:21:50 EST 2007
Author: jmesnil
Date: 2007-12-07 09:21:50 -0500 (Fri, 07 Dec 2007)
New Revision: 3446
Added:
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/NIOConnector.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/NIOSession.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaConnector.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaSession.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/
Removed:
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/internal/
Modified:
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Client.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AbstractPacketCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/ConnectionFactoryCreateConnectionRequestCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/ConnectionFactoryCreateConnectionResponseCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CreateBrowserRequestCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CreateBrowserResponseCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CreateConsumerRequestCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CreateConsumerResponseCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CreateDestinationRequestCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CreateSessionResponseCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/DeliverMessageCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/GetClientIDResponseCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/RecoverDeliveriesMessageCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/SetClientIDMessageCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/TextPacketCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/UnsubscribeMessageCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/UpdateCallbackMessageCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaHandler.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaInspector.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaPacketCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaService.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ClientTest.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TargetHandlerTest.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TestPacketHandler.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TestSupport.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/MinaHandlerTest.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/MinaInspectorTest.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/MinaRemotingBufferTest.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/tools/container/MinaServiceWrapper.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-544 Replace client-server transport with NIO based transport
* refactoring
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2007-12-07 13:46:56 UTC (rev 3445)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2007-12-07 14:21:50 UTC (rev 3446)
@@ -36,6 +36,7 @@
import org.jboss.jms.delegate.TopologyResult;
import org.jboss.jms.exception.MessagingNetworkFailureException;
import org.jboss.messaging.core.remoting.Client;
+import org.jboss.messaging.core.remoting.integration.MinaConnector;
import org.jboss.messaging.core.remoting.wireformat.CreateConnectionRequest;
import org.jboss.messaging.core.remoting.wireformat.CreateConnectionResponse;
import org.jboss.messaging.core.remoting.wireformat.GetClientAOPStackRequest;
@@ -288,7 +289,7 @@
try
{
- client = new Client();
+ client = new Client(new MinaConnector());
client.connect(serverHost, serverPort, TCP);
}
catch (Exception e)
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2007-12-07 13:46:56 UTC (rev 3445)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2007-12-07 14:21:50 UTC (rev 3446)
@@ -24,6 +24,7 @@
import org.jboss.logging.Logger;
import org.jboss.messaging.core.remoting.Client;
import org.jboss.messaging.core.remoting.TransportType;
+import org.jboss.messaging.core.remoting.integration.MinaConnector;
/**
* Encapsulates the state and behaviour from MINA needed for a JMS connection.
@@ -82,7 +83,7 @@
callbackManager = new CallbackManager();
- client = new Client();
+ client = new Client(new MinaConnector());
client.connect(serverHost, serverPort, TransportType.TCP);
log.trace(this + " started");
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Client.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Client.java 2007-12-07 13:46:56 UTC (rev 3445)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Client.java 2007-12-07 14:21:50 UTC (rev 3446)
@@ -9,36 +9,16 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
-import org.apache.mina.common.CloseFuture;
-import org.apache.mina.common.ConnectFuture;
-import org.apache.mina.common.DefaultIoFilterChainBuilder;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoService;
-import org.apache.mina.common.IoServiceListener;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.filter.codec.ProtocolCodecFilter;
-import org.apache.mina.filter.logging.LoggingFilter;
-import org.apache.mina.filter.logging.MdcInjectionFilter;
import org.apache.mina.filter.reqres.Request;
-import org.apache.mina.filter.reqres.RequestResponseFilter;
import org.apache.mina.filter.reqres.Response;
-import org.apache.mina.transport.socket.nio.NioSocketConnector;
import org.jboss.jms.client.remoting.ConsolidatedRemotingConnectionListener;
import org.jboss.jms.exception.MessagingNetworkFailureException;
import org.jboss.logging.Logger;
-import org.jboss.messaging.core.remoting.codec.PacketCodecFactory;
-import org.jboss.messaging.core.remoting.internal.MinaHandler;
-import org.jboss.messaging.core.remoting.internal.MinaInspector;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
/**
@@ -54,22 +34,21 @@
// Attributes ----------------------------------------------------
- private IoSession session;
+ private final NIOConnector connector;
+ private NIOSession session;
+
// By default, a blocking request will timeout after 5 seconds
private int blockingRequestTimeout = 5;
private TimeUnit blockingRequestTimeUnit = SECONDS;
- private NioSocketConnector connector;
- private ScheduledExecutorService blockingScheduler;
- private Map<ConsolidatedRemotingConnectionListener, IoServiceListener> listeners = new HashMap<ConsolidatedRemotingConnectionListener, IoServiceListener>();
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public Client()
+ public Client(NIOConnector connector)
{
+ this.connector = connector;
}
// Public --------------------------------------------------------
@@ -87,65 +66,21 @@
assert port > 0;
assert transport != null;
- connector = new NioSocketConnector();
-
- MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
- connector.getFilterChain().addLast("mdc", mdcInjectionFilter);
-
- connector.getFilterChain().addLast("codec",
- new ProtocolCodecFilter(new PacketCodecFactory()));
-
- addBlockingRequestResponseFilter(connector.getFilterChain());
-
- connector.getFilterChain().addLast("logger", new LoggingFilter());
-
- connector.setHandler(new MinaHandler(PacketDispatcher.client));
- connector.getSessionConfig().setKeepAlive(true);
- connector.getSessionConfig().setReuseAddress(true);
- InetSocketAddress address = new InetSocketAddress(host, port);
- ConnectFuture future = connector.connect(address);
- connector.setDefaultRemoteAddress(address);
-
- future.awaitUninterruptibly();
- if (!future.isConnected())
- {
- throw new IOException("Cannot connect to " + address.toString());
- }
- this.session = future.getSession();
+ this.session = connector.connect(host, port, transport);
}
public boolean disconnect() throws Exception
{
- if (session == null)
- {
- // no session => not connected
- // do nothing
- return false;
- }
-
- assert connector != null;
- assert blockingScheduler != null;
-
- CloseFuture closeFuture = session.close().awaitUninterruptibly();
- boolean closed = closeFuture.isClosed();
-
- connector.dispose();
- blockingScheduler.shutdown();
-
- connector = null;
- blockingScheduler = null;
- session = null;
-
- return closed;
+ return connector.disconnect();
}
public String getSessionID()
{
- if (session == null)
+ if (session == null || !session.isConnected())
{
return null;
}
- return Long.toString(session.getId());
+ return Long.toString(session.getID());
}
public void sendOneWay(AbstractPacket packet) throws JMSException
@@ -189,27 +124,12 @@
public void addConnectionListener(
final ConsolidatedRemotingConnectionListener listener)
{
- assert listener != null;
- assert connector != null;
-
- IoServiceListener ioListener = new IoServiceListenerAdapter(listener);
- connector.addListener(ioListener);
- listeners.put(listener, ioListener);
-
- if (log.isTraceEnabled())
- log.trace("added listener " + listener + " to " + this);
+ connector.addConnectionListener(listener);
}
public void removeConnectionListener(ConsolidatedRemotingConnectionListener listener)
{
- assert listener != null;
- assert connector != null;
-
- connector.removeListener(listeners.get(listener));
- listeners.remove(listener);
-
- if (log.isTraceEnabled())
- log.trace("removed listener " + listener + " from " + this);
+ connector.removeConnectionListener(listener);
}
public boolean isConnected()
@@ -222,10 +142,7 @@
public String getURI()
{
- if (connector == null)
- return null;
- else
- return connector.getDefaultRemoteAddress().toString();
+ return connector.getServerURI();
}
@Override
@@ -240,14 +157,6 @@
// Private -------------------------------------------------------
- private void addBlockingRequestResponseFilter(
- DefaultIoFilterChainBuilder chain)
- {
- blockingScheduler = Executors.newScheduledThreadPool(1);
- RequestResponseFilter filter = new RequestResponseFilter(
- new MinaInspector(), blockingScheduler);
- chain.addLast("reqres", filter);
- }
private void checkConnected() throws JMSException
{
@@ -264,50 +173,4 @@
}
// Inner classes -------------------------------------------------
-
- private final class IoServiceListenerAdapter implements IoServiceListener
- {
- private final Logger log = Logger
- .getLogger(IoServiceListenerAdapter.class);
-
- private final ConsolidatedRemotingConnectionListener listener;
-
- private IoServiceListenerAdapter(
- ConsolidatedRemotingConnectionListener listener)
- {
- this.listener = listener;
- }
-
- public void serviceActivated(IoService service)
- {
- if (log.isTraceEnabled())
- log.trace("activated " + service);
- }
-
- public void serviceDeactivated(IoService service)
- {
- if (log.isTraceEnabled())
- log.trace("deactivated " + service);
- }
-
- public void serviceIdle(IoService service, IdleStatus idleStatus)
- {
- if (log.isTraceEnabled())
- log.trace("idle " + service + ", status=" + idleStatus);
- }
-
- public void sessionCreated(IoSession session)
- {
- if (log.isInfoEnabled())
- log.info("created session " + session);
- }
-
- public void sessionDestroyed(IoSession session)
- {
- log.warn("destroyed session " + session);
-
- Throwable t = new Throwable("MINA session has been destroyed");
- listener.handleConnectionException(t);
- }
- }
}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/NIOConnector.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/NIOConnector.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/NIOConnector.java 2007-12-07 14:21:50 UTC (rev 3446)
@@ -0,0 +1,35 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting;
+
+import java.io.IOException;
+
+import org.jboss.jms.client.remoting.ConsolidatedRemotingConnectionListener;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public interface NIOConnector
+{
+
+ public abstract NIOSession connect(String host, int port,
+ TransportType transport) throws IOException;
+
+ public abstract boolean disconnect();
+
+ public abstract void addConnectionListener(
+ final ConsolidatedRemotingConnectionListener listener);
+
+ public abstract void removeConnectionListener(
+ ConsolidatedRemotingConnectionListener listener);
+
+ public abstract String getServerURI();
+
+}
\ No newline at end of file
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/NIOSession.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/NIOSession.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/NIOSession.java 2007-12-07 14:21:50 UTC (rev 3446)
@@ -0,0 +1,24 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public interface NIOSession
+{
+
+ public abstract long getID();
+
+ public abstract void write(Object object);
+
+ public abstract boolean isConnected();
+
+}
\ No newline at end of file
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AbstractPacketCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AbstractPacketCodec.java 2007-12-07 13:46:56 UTC (rev 3445)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AbstractPacketCodec.java 2007-12-07 14:21:50 UTC (rev 3446)
@@ -20,7 +20,6 @@
import org.jboss.messaging.core.contract.Message;
import org.jboss.messaging.core.impl.message.MessageFactory;
import org.jboss.messaging.core.remoting.Constants;
-import org.jboss.messaging.core.remoting.internal.MinaPacketCodec;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
import org.jboss.messaging.core.remoting.wireformat.PacketType;
@@ -151,8 +150,8 @@
byte version = wrapper.get();
wrapper.getInt(); // skip header length
long correlationID = wrapper.getLong();
- String targetID = MinaPacketCodec.getString(wrapper);
- String callbackID = MinaPacketCodec.getString(wrapper);
+ String targetID = wrapper.getNullableString();
+ String callbackID = wrapper.getNullableString();
P packet = decodeBody(wrapper);
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/ConnectionFactoryCreateConnectionRequestCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/ConnectionFactoryCreateConnectionRequestCodec.java 2007-12-07 13:46:56 UTC (rev 3445)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/ConnectionFactoryCreateConnectionRequestCodec.java 2007-12-07 14:21:50 UTC (rev 3446)
@@ -8,7 +8,6 @@
import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATECONNECTION;
-import org.jboss.messaging.core.remoting.internal.MinaPacketCodec;
import org.jboss.messaging.core.remoting.wireformat.CreateConnectionRequest;
/**
@@ -72,11 +71,11 @@
return null;
}
byte version = in.get();
- String remotingSessionID = MinaPacketCodec.getString(in);
- String clientVMID = MinaPacketCodec.getString(in);
+ String remotingSessionID = in.getNullableString();
+ String clientVMID = in.getNullableString();
int failedNodeID = in.getInt();
- String username = MinaPacketCodec.getString(in);
- String password = MinaPacketCodec.getString(in);
+ String username = in.getNullableString();
+ String password = in.getNullableString();
return new CreateConnectionRequest(version, remotingSessionID,
clientVMID, failedNodeID, username, password);
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/ConnectionFactoryCreateConnectionResponseCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/ConnectionFactoryCreateConnectionResponseCodec.java 2007-12-07 13:46:56 UTC (rev 3445)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/ConnectionFactoryCreateConnectionResponseCodec.java 2007-12-07 14:21:50 UTC (rev 3446)
@@ -8,7 +8,6 @@
import static org.jboss.messaging.core.remoting.wireformat.PacketType.RESP_CREATECONNECTION;
-import org.jboss.messaging.core.remoting.internal.MinaPacketCodec;
import org.jboss.messaging.core.remoting.wireformat.CreateConnectionResponse;
/**
@@ -58,7 +57,7 @@
{
return null;
}
- String id = MinaPacketCodec.getString(in);
+ String id = in.getNullableString();
int serverID = in.getInt();
return new CreateConnectionResponse(id, serverID);
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CreateBrowserRequestCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CreateBrowserRequestCodec.java 2007-12-07 13:46:56 UTC (rev 3445)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CreateBrowserRequestCodec.java 2007-12-07 14:21:50 UTC (rev 3446)
@@ -9,7 +9,6 @@
import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATEBROWSER;
import org.jboss.jms.destination.JBossDestination;
-import org.jboss.messaging.core.remoting.internal.MinaPacketCodec;
import org.jboss.messaging.core.remoting.wireformat.CreateBrowserRequest;
/**
@@ -63,7 +62,7 @@
byte[] b = new byte[destinationLength];
in.get(b);
JBossDestination destination = decode(b);
- String selector = MinaPacketCodec.getString(in);
+ String selector = in.getNullableString();
return new CreateBrowserRequest(destination, selector);
}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CreateBrowserResponseCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CreateBrowserResponseCodec.java 2007-12-07 13:46:56 UTC (rev 3445)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CreateBrowserResponseCodec.java 2007-12-07 14:21:50 UTC (rev 3446)
@@ -8,7 +8,6 @@
import static org.jboss.messaging.core.remoting.wireformat.PacketType.RESP_CREATEBROWSER;
-import org.jboss.messaging.core.remoting.internal.MinaPacketCodec;
import org.jboss.messaging.core.remoting.wireformat.CreateBrowserResponse;
/**
@@ -56,7 +55,7 @@
return null;
}
- String browserID = MinaPacketCodec.getString(in);
+ String browserID = in.getNullableString();
return new CreateBrowserResponse(browserID);
}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CreateConsumerRequestCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CreateConsumerRequestCodec.java 2007-12-07 13:46:56 UTC (rev 3445)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CreateConsumerRequestCodec.java 2007-12-07 14:21:50 UTC (rev 3446)
@@ -9,7 +9,6 @@
import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATECONSUMER;
import org.jboss.jms.destination.JBossDestination;
-import org.jboss.messaging.core.remoting.internal.MinaPacketCodec;
import org.jboss.messaging.core.remoting.wireformat.CreateConsumerRequest;
/**
@@ -72,9 +71,9 @@
byte[] b = new byte[destinationLength];
in.get(b);
JBossDestination destination = decode(b);
- String selector = MinaPacketCodec.getString(in);
+ String selector = in.getNullableString();
boolean noLocal = in.getBoolean();
- String subName = MinaPacketCodec.getString(in);
+ String subName = in.getNullableString();
boolean connectionConsumer = in.getBoolean();
boolean autoflowControl = in.getBoolean();
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CreateConsumerResponseCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CreateConsumerResponseCodec.java 2007-12-07 13:46:56 UTC (rev 3445)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CreateConsumerResponseCodec.java 2007-12-07 14:21:50 UTC (rev 3446)
@@ -8,7 +8,6 @@
import static org.jboss.messaging.core.remoting.wireformat.PacketType.RESP_CREATECONSUMER;
-import org.jboss.messaging.core.remoting.internal.MinaPacketCodec;
import org.jboss.messaging.core.remoting.wireformat.CreateConsumerResponse;
/**
@@ -63,7 +62,7 @@
return null;
}
- String consumerID = MinaPacketCodec.getString(in);
+ String consumerID = in.getNullableString();
int bufferSize = in.getInt();
int maxDeliveries = in.getInt();
long redeliveryDelay = in.getLong();
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CreateDestinationRequestCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CreateDestinationRequestCodec.java 2007-12-07 13:46:56 UTC (rev 3445)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CreateDestinationRequestCodec.java 2007-12-07 14:21:50 UTC (rev 3446)
@@ -8,7 +8,6 @@
import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATEDESTINATION;
-import org.jboss.messaging.core.remoting.internal.MinaPacketCodec;
import org.jboss.messaging.core.remoting.wireformat.CreateDestinationRequest;
/**
@@ -58,7 +57,7 @@
return null;
}
- String name = MinaPacketCodec.getString(in);
+ String name = in.getNullableString();
boolean isQueue = in.getBoolean();
return new CreateDestinationRequest(name, isQueue);
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CreateSessionResponseCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CreateSessionResponseCodec.java 2007-12-07 13:46:56 UTC (rev 3445)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CreateSessionResponseCodec.java 2007-12-07 14:21:50 UTC (rev 3446)
@@ -6,7 +6,6 @@
*/
package org.jboss.messaging.core.remoting.codec;
-import org.jboss.messaging.core.remoting.internal.MinaPacketCodec;
import org.jboss.messaging.core.remoting.wireformat.CreateSessionResponse;
import org.jboss.messaging.core.remoting.wireformat.PacketType;
@@ -58,7 +57,7 @@
return null;
}
- String sessionID = MinaPacketCodec.getString(in);
+ String sessionID = in.getNullableString();
int dupsOKBatchSize = in.getInt();
boolean strictTCK = in.getBoolean();
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/DeliverMessageCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/DeliverMessageCodec.java 2007-12-07 13:46:56 UTC (rev 3445)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/DeliverMessageCodec.java 2007-12-07 14:21:50 UTC (rev 3446)
@@ -9,7 +9,6 @@
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_DELIVERMESSAGE;
import org.jboss.messaging.core.contract.Message;
-import org.jboss.messaging.core.remoting.internal.MinaPacketCodec;
import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
/**
@@ -70,7 +69,7 @@
byte[] encodedMsg = new byte[msgLength];
in.get(encodedMsg);
Message msg = decode(type, encodedMsg);
- String consumerID = MinaPacketCodec.getString(in);
+ String consumerID = in.getNullableString();
long deliveryID = in.getLong();
int deliveryCount = in.getInt();
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/GetClientIDResponseCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/GetClientIDResponseCodec.java 2007-12-07 13:46:56 UTC (rev 3445)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/GetClientIDResponseCodec.java 2007-12-07 14:21:50 UTC (rev 3446)
@@ -8,7 +8,6 @@
import static org.jboss.messaging.core.remoting.wireformat.PacketType.RESP_GETCLIENTID;
-import org.jboss.messaging.core.remoting.internal.MinaPacketCodec;
import org.jboss.messaging.core.remoting.wireformat.GetClientIDResponse;
/**
@@ -55,7 +54,7 @@
return null;
}
- String clientID = MinaPacketCodec.getString(in);
+ String clientID = in.getNullableString();
return new GetClientIDResponse(clientID);
}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java 2007-12-07 13:46:56 UTC (rev 3445)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java 2007-12-07 14:21:50 UTC (rev 3446)
@@ -9,7 +9,7 @@
import static org.jboss.messaging.core.remoting.wireformat.PacketType.NULL;
import org.apache.mina.filter.codec.demux.DemuxingProtocolCodecFactory;
-import org.jboss.messaging.core.remoting.internal.MinaPacketCodec;
+import org.jboss.messaging.core.remoting.integration.MinaPacketCodec;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveriesMessage;
import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryRequest;
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/RecoverDeliveriesMessageCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/RecoverDeliveriesMessageCodec.java 2007-12-07 13:46:56 UTC (rev 3445)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/RecoverDeliveriesMessageCodec.java 2007-12-07 14:21:50 UTC (rev 3446)
@@ -14,7 +14,6 @@
import java.util.List;
import org.jboss.jms.delegate.DeliveryRecovery;
-import org.jboss.messaging.core.remoting.internal.MinaPacketCodec;
import org.jboss.messaging.core.remoting.wireformat.PacketType;
import org.jboss.messaging.core.remoting.wireformat.RecoverDeliveriesMessage;
@@ -78,7 +77,7 @@
in.get(encodedDeliveries);
List<DeliveryRecovery> deliveries = decode(deliveriesSize,
encodedDeliveries);
- String sessionID = MinaPacketCodec.getString(in);
+ String sessionID = in.getNullableString();
return new RecoverDeliveriesMessage(deliveries, sessionID);
}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/SetClientIDMessageCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/SetClientIDMessageCodec.java 2007-12-07 13:46:56 UTC (rev 3445)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/SetClientIDMessageCodec.java 2007-12-07 14:21:50 UTC (rev 3446)
@@ -8,7 +8,6 @@
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SETCLIENTID;
-import org.jboss.messaging.core.remoting.internal.MinaPacketCodec;
import org.jboss.messaging.core.remoting.wireformat.SetClientIDMessage;
/**
@@ -55,7 +54,7 @@
return null;
}
- String clientID = MinaPacketCodec.getString(in);
+ String clientID = in.getNullableString();
return new SetClientIDMessage(clientID);
}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/TextPacketCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/TextPacketCodec.java 2007-12-07 13:46:56 UTC (rev 3445)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/TextPacketCodec.java 2007-12-07 14:21:50 UTC (rev 3446)
@@ -8,7 +8,6 @@
import static org.jboss.messaging.core.remoting.wireformat.PacketType.TEXT;
-import org.jboss.messaging.core.remoting.internal.MinaPacketCodec;
import org.jboss.messaging.core.remoting.wireformat.TextPacket;
/**
@@ -53,7 +52,7 @@
{
return null;
}
- String text = MinaPacketCodec.getString(in);
+ String text = in.getNullableString();
return new TextPacket(text);
}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/UnsubscribeMessageCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/UnsubscribeMessageCodec.java 2007-12-07 13:46:56 UTC (rev 3445)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/UnsubscribeMessageCodec.java 2007-12-07 14:21:50 UTC (rev 3446)
@@ -8,7 +8,6 @@
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_UNSUBSCRIBE;
-import org.jboss.messaging.core.remoting.internal.MinaPacketCodec;
import org.jboss.messaging.core.remoting.wireformat.UnsubscribeMessage;
/**
@@ -53,7 +52,7 @@
return null;
}
- String subscriptionName = MinaPacketCodec.getString(in);
+ String subscriptionName = in.getNullableString();
return new UnsubscribeMessage(subscriptionName);
}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/UpdateCallbackMessageCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/UpdateCallbackMessageCodec.java 2007-12-07 13:46:56 UTC (rev 3445)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/UpdateCallbackMessageCodec.java 2007-12-07 14:21:50 UTC (rev 3446)
@@ -8,7 +8,6 @@
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_UPDATECALLBACK;
-import org.jboss.messaging.core.remoting.internal.MinaPacketCodec;
import org.jboss.messaging.core.remoting.wireformat.UpdateCallbackMessage;
/**
@@ -58,8 +57,8 @@
{
return null;
}
- String remotingSessionID = MinaPacketCodec.getString(in);
- String clientVMID = MinaPacketCodec.getString(in);
+ String remotingSessionID = in.getNullableString();
+ String clientVMID = in.getNullableString();
boolean add = in.getBoolean();
return new UpdateCallbackMessage(remotingSessionID, clientVMID, add);
Copied: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration (from rev 3431, branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal)
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaConnector.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaConnector.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaConnector.java 2007-12-07 14:21:50 UTC (rev 3446)
@@ -0,0 +1,239 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.integration;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.mina.common.CloseFuture;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.DefaultIoFilterChainBuilder;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoService;
+import org.apache.mina.common.IoServiceListener;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.logging.LoggingFilter;
+import org.apache.mina.filter.logging.MdcInjectionFilter;
+import org.apache.mina.filter.reqres.RequestResponseFilter;
+import org.apache.mina.transport.socket.nio.NioSocketConnector;
+import org.jboss.jms.client.remoting.ConsolidatedRemotingConnectionListener;
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.remoting.NIOConnector;
+import org.jboss.messaging.core.remoting.NIOSession;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.TransportType;
+import org.jboss.messaging.core.remoting.codec.PacketCodecFactory;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public class MinaConnector implements NIOConnector
+{
+ // Constants -----------------------------------------------------
+
+ private final Logger log = Logger.getLogger(MinaConnector.class);
+
+ // Attributes ----------------------------------------------------
+
+ private NioSocketConnector connector;
+
+ private ScheduledExecutorService blockingScheduler;
+
+ private IoSession session;
+
+ private Map<ConsolidatedRemotingConnectionListener, IoServiceListener> listeners = new HashMap<ConsolidatedRemotingConnectionListener, IoServiceListener>();
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public MinaConnector() throws Exception
+ {
+ this.connector = new NioSocketConnector();
+
+ MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
+ connector.getFilterChain().addLast("mdc", mdcInjectionFilter);
+
+ connector.getFilterChain().addLast("codec",
+ new ProtocolCodecFilter(new PacketCodecFactory()));
+
+ addBlockingRequestResponseFilter(connector.getFilterChain());
+
+ connector.getFilterChain().addLast("logger", new LoggingFilter());
+
+ connector.setHandler(new MinaHandler(PacketDispatcher.client));
+ connector.getSessionConfig().setKeepAlive(true);
+ connector.getSessionConfig().setReuseAddress(true);
+}
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.integration.NIOConnector#connect(java.lang.String, int, org.jboss.messaging.core.remoting.TransportType)
+ */
+ public NIOSession connect(String host, int port, TransportType transport) throws IOException {
+ assert host != null;
+ assert port > 0;
+ assert transport != null;
+
+ InetSocketAddress address = new InetSocketAddress(host, port);
+ ConnectFuture future = connector.connect(address);
+ connector.setDefaultRemoteAddress(address);
+
+ future.awaitUninterruptibly();
+ if (!future.isConnected())
+ {
+ throw new IOException("Cannot connect to " + address.toString());
+ }
+ this.session = future.getSession();
+ return new MinaSession(session);
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.integration.NIOConnector#disconnect()
+ */
+ public boolean disconnect()
+ {
+ if (session == null)
+ {
+ return false;
+ }
+
+ CloseFuture closeFuture = session.close().awaitUninterruptibly();
+ boolean closed = closeFuture.isClosed();
+
+ connector.dispose();
+ blockingScheduler.shutdown();
+
+ connector = null;
+ blockingScheduler = null;
+ session = null;
+
+ return closed;
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.integration.NIOConnector#addConnectionListener(org.jboss.jms.client.remoting.ConsolidatedRemotingConnectionListener)
+ */
+ public void addConnectionListener(
+ final ConsolidatedRemotingConnectionListener listener)
+ {
+ assert listener != null;
+ assert connector != null;
+
+ IoServiceListener ioListener = new IoServiceListenerAdapter(listener);
+ connector.addListener(ioListener);
+ listeners.put(listener, ioListener);
+
+ if (log.isTraceEnabled())
+ log.trace("added listener " + listener + " to " + this);
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.integration.NIOConnector#removeConnectionListener(org.jboss.jms.client.remoting.ConsolidatedRemotingConnectionListener)
+ */
+ public void removeConnectionListener(ConsolidatedRemotingConnectionListener listener)
+ {
+ assert listener != null;
+ assert connector != null;
+
+ connector.removeListener(listeners.get(listener));
+ listeners.remove(listener);
+
+ if (log.isTraceEnabled())
+ log.trace("removed listener " + listener + " from " + this);
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.integration.NIOConnector#getServerURI()
+ */
+ public String getServerURI()
+ {
+ if (connector == null)
+ {
+ return null;
+ }
+ InetSocketAddress address = connector.getDefaultRemoteAddress();
+ if (address != null)
+ {
+ return address.toString();
+ } else {
+ return null;
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ private void addBlockingRequestResponseFilter(
+ DefaultIoFilterChainBuilder chain)
+ {
+ blockingScheduler = Executors.newScheduledThreadPool(1);
+ RequestResponseFilter filter = new RequestResponseFilter(
+ new MinaInspector(), blockingScheduler);
+ chain.addLast("reqres", filter);
+ }
+
+ // Inner classes -------------------------------------------------
+
+ private final class IoServiceListenerAdapter implements IoServiceListener
+ {
+ private final Logger log = Logger
+ .getLogger(IoServiceListenerAdapter.class);
+
+ private final ConsolidatedRemotingConnectionListener listener;
+
+ private IoServiceListenerAdapter(
+ ConsolidatedRemotingConnectionListener listener)
+ {
+ this.listener = listener;
+ }
+
+ public void serviceActivated(IoService service)
+ {
+ if (log.isTraceEnabled())
+ log.trace("activated " + service);
+ }
+
+ public void serviceDeactivated(IoService service)
+ {
+ if (log.isTraceEnabled())
+ log.trace("deactivated " + service);
+ }
+
+ public void serviceIdle(IoService service, IdleStatus idleStatus)
+ {
+ if (log.isTraceEnabled())
+ log.trace("idle " + service + ", status=" + idleStatus);
+ }
+
+ public void sessionCreated(IoSession session)
+ {
+ if (log.isInfoEnabled())
+ log.info("created session " + session);
+ }
+
+ public void sessionDestroyed(IoSession session)
+ {
+ log.warn("destroyed session " + session);
+
+ Throwable t = new Throwable("MINA session has been destroyed");
+ listener.handleConnectionException(t);
+ }
+ }
+}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaHandler.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/MinaHandler.java 2007-12-06 16:38:05 UTC (rev 3431)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaHandler.java 2007-12-07 14:21:50 UTC (rev 3446)
@@ -4,7 +4,7 @@
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
-package org.jboss.messaging.core.remoting.internal;
+package org.jboss.messaging.core.remoting.integration;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaInspector.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/MinaInspector.java 2007-12-06 16:38:05 UTC (rev 3431)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaInspector.java 2007-12-07 14:21:50 UTC (rev 3446)
@@ -4,7 +4,7 @@
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
-package org.jboss.messaging.core.remoting.internal;
+package org.jboss.messaging.core.remoting.integration;
import static org.apache.mina.filter.reqres.ResponseType.WHOLE;
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaPacketCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/MinaPacketCodec.java 2007-12-06 16:38:05 UTC (rev 3431)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaPacketCodec.java 2007-12-07 14:21:50 UTC (rev 3446)
@@ -4,7 +4,7 @@
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
-package org.jboss.messaging.core.remoting.internal;
+package org.jboss.messaging.core.remoting.integration;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
@@ -136,12 +136,6 @@
// Inner classes -------------------------------------------------
- public static String getString(RemotingBuffer in)
- throws CharacterCodingException
- {
- return in.getNullableString();
- }
-
public static final class BufferWrapper implements RemotingBuffer
{
private final IoBuffer buffer;
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaService.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/MinaService.java 2007-12-06 16:38:05 UTC (rev 3431)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaService.java 2007-12-07 14:21:50 UTC (rev 3446)
@@ -4,7 +4,7 @@
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
-package org.jboss.messaging.core.remoting.internal;
+package org.jboss.messaging.core.remoting.integration;
import java.net.InetSocketAddress;
import java.util.Formatter;
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaSession.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaSession.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaSession.java 2007-12-07 14:21:50 UTC (rev 3446)
@@ -0,0 +1,70 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.integration;
+
+import org.apache.mina.common.IoSession;
+import org.jboss.messaging.core.remoting.NIOSession;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public class MinaSession implements NIOSession
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final IoSession session;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public MinaSession(IoSession session)
+ {
+ assert session != null;
+
+ this.session = session;
+ }
+
+ // Public --------------------------------------------------------
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.integration.NIOSession#getID()
+ */
+ public long getID()
+ {
+ return session.getId();
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.integration.NIOSession#write(java.lang.Object)
+ */
+ public void write(Object object)
+ {
+ session.write(object);
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.integration.NIOSession#isConnected()
+ */
+ public boolean isConnected()
+ {
+ return session.isConnected();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ClientTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ClientTest.java 2007-12-07 13:46:56 UTC (rev 3445)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ClientTest.java 2007-12-07 14:21:50 UTC (rev 3446)
@@ -7,6 +7,7 @@
package org.jboss.test.messaging.core.remoting;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.jboss.messaging.core.remoting.TransportType.TCP;
import java.io.IOException;
@@ -15,7 +16,10 @@
import javax.jms.IllegalStateException;
import org.jboss.messaging.core.remoting.Client;
+import org.jboss.messaging.core.remoting.NIOSession;
import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.TransportType;
+import org.jboss.messaging.core.remoting.integration.MinaConnector;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
import org.jboss.messaging.core.remoting.wireformat.NullPacket;
import org.jboss.messaging.core.remoting.wireformat.TextPacket;
@@ -41,7 +45,7 @@
public void testConnected() throws Exception
{
- Client client = new Client();
+ Client client = new Client(new MinaConnector());
assertFalse(client.isConnected());
@@ -53,9 +57,28 @@
assertFalse(client.disconnect());
}
+ public void testConnectionFailure() throws Exception
+ {
+ Client client = new Client(new NIOConnectorAdapter() {
+ @Override
+ public NIOSession connect(String host, int port,
+ TransportType transport) throws IOException
+ {
+ throw new IOException("connection exception");
+ }
+ });
+
+ try {
+ client.connect("localhost", PORT, TCP);
+ fail("connection must fail");
+ } catch (IOException e)
+ {
+ }
+ }
+
public void testSessionID() throws Exception
{
- Client client = new Client();
+ Client client = new Client(new MinaConnector());
assertNull(client.getSessionID());
client.connect("localhost", PORT, TCP);
assertNotNull(client.getSessionID());
@@ -65,7 +88,7 @@
public void testURI() throws Exception
{
- Client client = new Client();
+ Client client = new Client(new MinaConnector());
assertNull(client.getURI());
client.connect("localhost", PORT, TCP);
assertNotNull(client.getURI());
@@ -75,7 +98,7 @@
public void testCanNotSendPacketIfNotConnected() throws Exception
{
- Client client = new Client();
+ Client client = new Client(new MinaConnector());
try
{
@@ -96,7 +119,7 @@
packet.setTargetID(serverPacketHandler.getID());
client.sendOneWay(packet);
- serverPacketHandler.await();
+ assertTrue(serverPacketHandler.await(2, SECONDS));
List<TextPacket> messages = serverPacketHandler.getPackets();
assertEquals(1, messages.size());
@@ -117,7 +140,7 @@
client.sendOneWay(packets[i]);
}
- serverPacketHandler.await();
+ assertTrue(serverPacketHandler.await(10, SECONDS));
List<TextPacket> receivedPackets = serverPacketHandler.getPackets();
assertEquals(MANY_MESSAGES, receivedPackets.size());
@@ -142,7 +165,7 @@
client.sendOneWay(packet);
- callbackHandler.await();
+ assertTrue(callbackHandler.await(5, SECONDS));
assertEquals(1, callbackHandler.getPackets().size());
String response = callbackHandler.getPackets().get(0).getText();
Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TargetHandlerTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TargetHandlerTest.java 2007-12-07 13:46:56 UTC (rev 3445)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TargetHandlerTest.java 2007-12-07 14:21:50 UTC (rev 3446)
@@ -6,6 +6,8 @@
*/
package org.jboss.test.messaging.core.remoting;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
import java.util.List;
import org.jboss.messaging.core.remoting.PacketDispatcher;
@@ -47,7 +49,7 @@
// handles the packet
client.sendOneWay(packet);
- serverPacketHandler.await();
+ assertTrue(serverPacketHandler.await(2, SECONDS));
assertNotNull(serverPacketHandler.getLastSender());
PacketSender sender = serverPacketHandler.getLastSender();
@@ -57,7 +59,7 @@
packetFromServer.setTargetID(clientHandler.getID());
sender.send(packetFromServer);
- clientHandler.await();
+ assertTrue(clientHandler.await(2, SECONDS));
List<TextPacket> packets = clientHandler.getPackets();
assertEquals(1, packets.size());
Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TestPacketHandler.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TestPacketHandler.java 2007-12-07 13:46:56 UTC (rev 3445)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TestPacketHandler.java 2007-12-07 14:21:50 UTC (rev 3446)
@@ -10,6 +10,7 @@
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.jboss.messaging.core.remoting.PacketHandler;
import org.jboss.messaging.core.remoting.PacketSender;
@@ -33,10 +34,11 @@
return id;
}
- public void await() throws InterruptedException
+ public boolean await(long time, TimeUnit timeUnit) throws InterruptedException
{
- if (latch != null)
- latch.await();
+ if (latch == null)
+ return false;
+ return latch.await(time, timeUnit);
}
public void expectMessage(int count)
Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TestSupport.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TestSupport.java 2007-12-07 13:46:56 UTC (rev 3445)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TestSupport.java 2007-12-07 14:21:50 UTC (rev 3446)
@@ -11,7 +11,8 @@
import org.jboss.messaging.core.remoting.Client;
import org.jboss.messaging.core.remoting.TransportType;
-import org.jboss.messaging.core.remoting.internal.MinaService;
+import org.jboss.messaging.core.remoting.integration.MinaConnector;
+import org.jboss.messaging.core.remoting.integration.MinaService;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
@@ -89,7 +90,7 @@
void startClient(int port, TransportType transport, boolean useSSL)
throws Exception
{
- client = new Client();
+ client = new Client(new MinaConnector());
client.connect("localhost", port, transport, useSSL);
}
Copied: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration (from rev 3431, branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/internal)
Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/MinaHandlerTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/internal/MinaHandlerTest.java 2007-12-06 16:38:05 UTC (rev 3431)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/MinaHandlerTest.java 2007-12-07 14:21:50 UTC (rev 3446)
@@ -4,12 +4,12 @@
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
-package org.jboss.test.messaging.core.remoting.internal;
+package org.jboss.test.messaging.core.remoting.integration;
import junit.framework.TestCase;
import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.remoting.internal.MinaHandler;
+import org.jboss.messaging.core.remoting.integration.MinaHandler;
import org.jboss.messaging.core.remoting.wireformat.TextPacket;
import org.jboss.test.messaging.core.remoting.TestPacketHandler;
Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/MinaInspectorTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/internal/MinaInspectorTest.java 2007-12-06 16:38:05 UTC (rev 3431)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/MinaInspectorTest.java 2007-12-07 14:21:50 UTC (rev 3446)
@@ -4,7 +4,7 @@
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
-package org.jboss.test.messaging.core.remoting.internal;
+package org.jboss.test.messaging.core.remoting.integration;
import static org.apache.mina.filter.reqres.ResponseType.WHOLE;
@@ -12,7 +12,7 @@
import junit.framework.TestCase;
-import org.jboss.messaging.core.remoting.internal.MinaInspector;
+import org.jboss.messaging.core.remoting.integration.MinaInspector;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
import org.jboss.messaging.core.remoting.wireformat.NullPacket;
Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/MinaRemotingBufferTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/internal/MinaRemotingBufferTest.java 2007-12-06 16:38:05 UTC (rev 3431)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/MinaRemotingBufferTest.java 2007-12-07 14:21:50 UTC (rev 3446)
@@ -4,14 +4,14 @@
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
-package org.jboss.test.messaging.core.remoting.internal;
+package org.jboss.test.messaging.core.remoting.integration;
import static java.util.UUID.randomUUID;
import junit.framework.TestCase;
import org.apache.mina.common.IoBuffer;
import org.jboss.messaging.core.remoting.codec.RemotingBuffer;
-import org.jboss.messaging.core.remoting.internal.MinaPacketCodec;
+import org.jboss.messaging.core.remoting.integration.MinaPacketCodec;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/tools/container/MinaServiceWrapper.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/tools/container/MinaServiceWrapper.java 2007-12-07 13:46:56 UTC (rev 3445)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/tools/container/MinaServiceWrapper.java 2007-12-07 14:21:50 UTC (rev 3446)
@@ -21,7 +21,7 @@
*/
package org.jboss.test.messaging.tools.container;
-import org.jboss.messaging.core.remoting.internal.MinaService;
+import org.jboss.messaging.core.remoting.integration.MinaService;
More information about the jboss-cvs-commits
mailing list