[jboss-cvs] JBoss Messaging SVN: r3636 - in trunk: src/main/org/jboss/jms/client/impl and 16 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jan 28 09:35:42 EST 2008
Author: jmesnil
Date: 2008-01-28 09:35:41 -0500 (Mon, 28 Jan 2008)
New Revision: 3636
Added:
trunk/src/main/org/jboss/messaging/core/remoting/ConnectionExceptionListener.java
trunk/src/main/org/jboss/messaging/core/remoting/KeepAliveFactory.java
trunk/src/main/org/jboss/messaging/core/remoting/codec/SetSessionIDMessageCodec.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ClientKeepAliveFactory.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/KeepAliveNotifier.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaKeepAliveFactory.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ServerKeepAliveFactory.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/Ping.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/Pong.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SetSessionIDMessage.java
trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/ClientKeepAliveTest.java
trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/FilterChainSupportTest.java
trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/ServerKeepAliveTest.java
trunk/tests/src/org/jboss/test/messaging/jms/crash/UnresponsiveServerTest.java
Modified:
trunk/.classpath
trunk/src/main/org/jboss/jms/client/impl/ClientConnectionFactoryImpl.java
trunk/src/main/org/jboss/jms/client/remoting/MessagingRemotingConnection.java
trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/messaging/core/impl/DeliveryImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java
trunk/src/main/org/jboss/messaging/core/remoting/PacketSender.java
trunk/src/main/org/jboss/messaging/core/remoting/codec/AbstractPacketCodec.java
trunk/src/main/org/jboss/messaging/core/remoting/codec/BytesPacketCodec.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/ClientImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/PacketCodecFactory.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/AbstractPacket.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCancelMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionRecoverMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionRollbackMessage.java
trunk/tests/build.xml
trunk/tests/src/org/jboss/messaging/core/remoting/impl/ClientTestBase.java
trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaClientTest.java
trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaHandlerTest.java
trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/TestSupport.java
trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/stress/PacketStressTest.java
trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java
Log:
* implemented JBMESSAGING-1196 - add heartbeat + tests
* removed unused version byte from AbstractPacket
* added oneWay boolean to AbstractPacket to know when a response is expected or not
Modified: trunk/.classpath
===================================================================
--- trunk/.classpath 2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/.classpath 2008-01-28 14:35:41 UTC (rev 3636)
@@ -17,6 +17,8 @@
<classpathentry kind="src" path="docs/examples/topic/src"/>
<classpathentry excluding="**/.svn/**/*" kind="src" path="src/main"/>
<classpathentry excluding="**/.svn/**/*" kind="src" path="tests/src"/>
+ <classpathentry kind="src" path="tests/etc/ide"/>
+ <classpathentry excluding="ide/" kind="src" path="tests/etc"/>
<classpathentry kind="lib" path="thirdparty/oswego-concurrent/lib/concurrent.jar"/>
<classpathentry kind="lib" path="thirdparty/jgroups/lib/jgroups.jar"/>
<classpathentry kind="lib" path="thirdparty/apache-log4j/lib/log4j.jar"/>
@@ -71,9 +73,8 @@
<classpathentry kind="lib" path="thirdparty/jboss/jbosssx-client/lib/jbosssx-client.jar"/>
<classpathentry kind="lib" path="lib/je-3.2.44.jar"/>
<classpathentry kind="lib" path="tests/lib/easymock.jar"/>
- <classpathentry kind="lib" path="thirdparty/apache-mina/lib/mina-core.jar" sourcepath="/home/tim/workspace/mina-trunk/core/src/main/java"/>
+ <classpathentry kind="lib" path="thirdparty/apache-mina/lib/mina-core.jar" sourcepath="thirdparty/apache-mina/lib/mina-core-sources.jar"/>
<classpathentry kind="lib" path="thirdparty/slf4j/log4j/lib/slf4j-log4j12.jar"/>
- <classpathentry kind="lib" path="tests/etc"/>
<classpathentry kind="lib" path="src/etc/server/default/config"/>
<classpathentry kind="lib" path="src/etc/server/default/deploy"/>
<classpathentry kind="lib" path="tests/lib/jdbc-drivers/mysql-connector-java-5.1.5-bin.jar"/>
Modified: trunk/src/main/org/jboss/jms/client/impl/ClientConnectionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientConnectionFactoryImpl.java 2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientConnectionFactoryImpl.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -122,7 +122,7 @@
MessagingRemotingConnection remotingConnection = null;
try
{
- remotingConnection = new MessagingRemotingConnection(version, serverLocatorURI);
+ remotingConnection = new MessagingRemotingConnection(serverLocatorURI);
remotingConnection.start();
Modified: trunk/src/main/org/jboss/jms/client/remoting/MessagingRemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessagingRemotingConnection.java 2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessagingRemotingConnection.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -70,14 +70,10 @@
// explicitly remove it from the remoting client
private ConsolidatedRemotingConnectionListener remotingConnectionListener;
- private Version version;
-
// Constructors ---------------------------------------------------------------------------------
- public MessagingRemotingConnection(Version version, String serverLocatorURI) throws Exception
+ public MessagingRemotingConnection(String serverLocatorURI) throws Exception
{
- this.version = version;
-
serverLocator = new ServerLocator(serverLocatorURI);
log.trace(this + " created");
@@ -130,9 +126,6 @@
public void sendOneWay(String id, AbstractPacket packet) throws JMSException
{
packet.setTargetID(id);
-
- packet.setVersion(version.getProviderIncrementingVersion());
-
client.sendOneWay(packet);
}
@@ -140,8 +133,6 @@
{
packet.setTargetID(id);
- packet.setVersion(version.getProviderIncrementingVersion());
-
try
{
AbstractPacket response = (AbstractPacket) client.sendBlocking(packet);
@@ -166,6 +157,9 @@
public synchronized void addConnectionListener(ConsolidatedRemotingConnectionListener listener)
{
this.remotingConnectionListener = listener;
+ if (client != null)
+ client.addConnectionListener(remotingConnectionListener);
+
}
public synchronized ConsolidatedRemotingConnectionListener getConnectionListener()
Modified: trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java 2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -35,6 +35,7 @@
import org.jboss.jms.server.ConnectionManager;
import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
+import org.jboss.messaging.core.remoting.ConnectionExceptionListener;
import org.jboss.messaging.core.remoting.PacketSender;
import org.jboss.messaging.util.ConcurrentHashSet;
import org.jboss.messaging.util.Logger;
@@ -47,7 +48,7 @@
*
* $Id$
*/
-public class SimpleConnectionManager implements ConnectionManager
+public class SimpleConnectionManager implements ConnectionManager, ConnectionExceptionListener
{
// Constants ------------------------------------------------------------------------------------
@@ -197,6 +198,13 @@
//NOOP
}
+ // ConnectionExceptionListener ------------------------------------------------------------------
+
+ public void handleConnectionException(Exception e, String clientSessionID)
+ {
+ handleClientFailure(clientSessionID , true);
+ }
+
// Public ---------------------------------------------------------------------------------------
/*
@@ -268,7 +276,7 @@
{
try
{
- log.debug("clPearing up state for connection " + sce);
+ log.debug("clearing up state for connection " + sce);
sce.closing();
sce.close();
log.debug("cleared up state for connection " + sce);
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java 2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -45,7 +45,6 @@
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
import org.jboss.messaging.core.remoting.wireformat.BrowserHasNextMessageResponse;
import org.jboss.messaging.core.remoting.wireformat.BrowserNextMessageResponse;
-import org.jboss.messaging.core.remoting.wireformat.ClosingMessage;
import org.jboss.messaging.core.remoting.wireformat.JMSExceptionMessage;
import org.jboss.messaging.core.remoting.wireformat.NullPacket;
import org.jboss.messaging.core.remoting.wireformat.PacketType;
@@ -306,19 +305,12 @@
} else if (type == MSG_BROWSER_RESET)
{
reset();
-
- response = new NullPacket();
} else if (type == PacketType.MSG_CLOSING)
{
- ClosingMessage request = (ClosingMessage) packet;
closing();
-
- response = new NullPacket();
} else if (type == MSG_CLOSE)
{
close();
-
- response = new NullPacket();
} else
{
response = new JMSExceptionMessage(new MessagingJMSException(
@@ -326,12 +318,16 @@
}
// reply if necessary
+ if (response == null && packet.isOneWay() == false)
+ {
+ response = new NullPacket();
+ }
+
if (response != null)
{
response.normalize(packet);
sender.send(response);
}
-
} catch (JMSException e)
{
JMSExceptionMessage message = new JMSExceptionMessage(e);
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -583,18 +583,12 @@
} else if (type == MSG_STOPCONNECTION)
{
stop();
-
- response = new NullPacket();
} else if (type == PacketType.MSG_CLOSING)
{
closing();
-
- response = new NullPacket();
} else if (type == MSG_CLOSE)
{
close();
-
- response = new NullPacket();
}
else if (type == REQ_GETCLIENTID)
{
@@ -603,8 +597,6 @@
{
SetClientIDMessage message = (SetClientIDMessage) packet;
setClientID(message.getClientID());
-
- response = new NullPacket();
} else
{
response = new JMSExceptionMessage(new MessagingJMSException(
@@ -612,12 +604,16 @@
}
// reply if necessary
+ if (response == null && packet.isOneWay() == false)
+ {
+ response = new NullPacket();
+ }
+
if (response != null)
{
response.normalize(packet);
sender.send(response);
}
-
} catch (JMSException e)
{
JMSExceptionMessage message = new JMSExceptionMessage(e);
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -44,7 +44,6 @@
import org.jboss.messaging.core.remoting.PacketSender;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
import org.jboss.messaging.core.remoting.wireformat.ConsumerChangeRateMessage;
-import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
import org.jboss.messaging.core.remoting.wireformat.JMSExceptionMessage;
import org.jboss.messaging.core.remoting.wireformat.NullPacket;
import org.jboss.messaging.core.remoting.wireformat.PacketType;
@@ -569,14 +568,10 @@
} else if (type == PacketType.MSG_CLOSING)
{
closing();
-
- response = new NullPacket();
} else if (type == MSG_CLOSE)
{
close();
setReplier(null);
-
- response = new NullPacket();
} else
{
response = new JMSExceptionMessage(new MessagingJMSException(
@@ -584,6 +579,11 @@
}
// reply if necessary
+ if (response == null && packet.isOneWay() == false)
+ {
+ response = new NullPacket();
+ }
+
if (response != null)
{
response.normalize(packet);
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -1540,37 +1540,26 @@
else if (type == PacketType.MSG_CLOSING)
{
closing();
-
- response = new NullPacket();
} else if (type == MSG_CLOSE)
{
close();
-
- response = new NullPacket();
} else if (type == MSG_UNSUBSCRIBE)
{
UnsubscribeMessage message = (UnsubscribeMessage) packet;
unsubscribe(message.getSubscriptionName());
-
- response = new NullPacket();
} else if (type == MSG_ADDTEMPORARYDESTINATION)
{
AddTemporaryDestinationMessage message = (AddTemporaryDestinationMessage) packet;
addTemporaryDestination(message.getDestination());
-
- response = new NullPacket();
} else if (type == MSG_DELETETEMPORARYDESTINATION)
{
DeleteTemporaryDestinationMessage message = (DeleteTemporaryDestinationMessage) packet;
deleteTemporaryDestination(message.getDestination());
-
- response = new NullPacket();
}
else if (type == PacketType.MSG_ACKNOWLEDGE)
{
SessionAcknowledgeMessage message = (SessionAcknowledgeMessage)packet;
acknowledge(message.getDeliveryID(), message.isAllUpTo());
- response = new NullPacket();
}
else if (type == PacketType.MSG_COMMIT)
{
@@ -1592,12 +1581,16 @@
}
// reply if necessary
+ if (response == null && packet.isOneWay() == false)
+ {
+ response = new NullPacket();
+ }
+
if (response != null)
{
response.normalize(packet);
sender.send(response);
}
-
} catch (JMSException e)
{
JMSExceptionMessage message = new JMSExceptionMessage(e);
Modified: trunk/src/main/org/jboss/messaging/core/impl/DeliveryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/DeliveryImpl.java 2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/messaging/core/impl/DeliveryImpl.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -72,8 +72,6 @@
reference.getDeliveryCount() + 1);
message.setTargetID(consumerID);
-
- message.setVersion((byte)0);
sender.send(message);
}
Added: trunk/src/main/org/jboss/messaging/core/remoting/ConnectionExceptionListener.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/ConnectionExceptionListener.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/ConnectionExceptionListener.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -0,0 +1,19 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public interface ConnectionExceptionListener
+{
+ void handleConnectionException(Exception e, String sessionID);
+
+}
Added: trunk/src/main/org/jboss/messaging/core/remoting/KeepAliveFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/KeepAliveFactory.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/KeepAliveFactory.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -0,0 +1,24 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting;
+
+import org.jboss.messaging.core.remoting.wireformat.Ping;
+import org.jboss.messaging.core.remoting.wireformat.Pong;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public interface KeepAliveFactory
+{
+
+ Ping ping();
+
+ Pong pong();
+}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java 2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -13,6 +13,7 @@
import java.util.concurrent.ConcurrentHashMap;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.SetSessionIDMessage;
import org.jboss.messaging.util.Logger;
/**
@@ -33,6 +34,7 @@
// Static --------------------------------------------------------
public static final PacketDispatcher client = new PacketDispatcher();
+ public static final Map<String, String> sessions = new ConcurrentHashMap<String, String>();
// Constructors --------------------------------------------------
@@ -77,6 +79,18 @@
public void dispatch(AbstractPacket packet, PacketSender sender)
{
+ //FIXME better separation between client and server PacketDispatchers
+ if (this != client)
+ {
+ if (packet instanceof SetSessionIDMessage)
+ {
+ String clientSessionID = ((SetSessionIDMessage)packet).getSessionID();
+ if (log.isDebugEnabled())
+ log.debug("associated server session " + sender.getSessionID() + " to client " + clientSessionID);
+ sessions.put(sender.getSessionID(), clientSessionID);
+ return;
+ }
+ }
String targetID = packet.getTargetID();
if (NO_ID_SET.equals(targetID))
{
Modified: trunk/src/main/org/jboss/messaging/core/remoting/PacketSender.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/PacketSender.java 2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/messaging/core/remoting/PacketSender.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -17,4 +17,6 @@
public interface PacketSender
{
void send(AbstractPacket packet);
+
+ String getSessionID();
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/codec/AbstractPacketCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/codec/AbstractPacketCodec.java 2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/messaging/core/remoting/codec/AbstractPacketCodec.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -10,7 +10,6 @@
import static org.jboss.messaging.core.remoting.codec.DecoderStatus.NOT_OK;
import static org.jboss.messaging.core.remoting.codec.DecoderStatus.OK;
import static org.jboss.messaging.core.remoting.wireformat.AbstractPacket.NO_ID_SET;
-import static org.jboss.messaging.core.remoting.wireformat.AbstractPacket.NO_VERSION_SET;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -38,6 +37,8 @@
public static final byte FALSE = (byte) 1;
+ public static final int BOOLEAN_LENGTH = 1;
+
public static final int INT_LENGTH = 4;
public static final int FLOAT_LENGTH = 4;
@@ -101,12 +102,6 @@
assert packet != null;
assert buf != null;
- byte version = packet.getVersion();
- if (version == NO_VERSION_SET)
- {
- throw new IllegalStateException("packet must be versioned: " + packet);
- }
-
long correlationID = packet.getCorrelationID();
// to optimize the size of the packets, if the targetID
// or the callbackID are not set, they are encoded as null
@@ -121,14 +116,14 @@
{
callbackID = null;
}
- int headerLength = LONG_LENGTH + sizeof(targetID) + sizeof(callbackID);
+ int headerLength = LONG_LENGTH + sizeof(targetID) + sizeof(callbackID) + BOOLEAN_LENGTH;
buf.put(packet.getType().byteValue());
- buf.put(version);
buf.putInt(headerLength);
buf.putLong(correlationID);
buf.putNullableString(targetID);
buf.putNullableString(callbackID);
+ buf.putBoolean(packet.isOneWay());
encodeBody(packet, buf);
}
@@ -149,9 +144,9 @@
public DecoderStatus decodable(RemotingBuffer buffer)
{
- if (buffer.remaining() < 2)
+ if (buffer.remaining() < 1)
{
- // can not read packet type & version
+ // can not read packet type
return NEED_DATA;
}
byte t = buffer.get();
@@ -159,7 +154,6 @@
{
return NOT_OK;
}
- buffer.get(); // version
if (buffer.remaining() < INT_LENGTH)
{
if (log.isDebugEnabled())
@@ -189,7 +183,7 @@
{
return NOT_OK;
}
-
+ buffer.getBoolean(); // oneWay boolean
if (buffer.remaining() < INT_LENGTH)
{
if (log.isDebugEnabled())
@@ -214,19 +208,18 @@
public P decode(RemotingBuffer wrapper) throws Exception
{
wrapper.get(); // skip message type
- byte version = wrapper.get();
wrapper.getInt(); // skip header length
long correlationID = wrapper.getLong();
String targetID = wrapper.getNullableString();
String callbackID = wrapper.getNullableString();
-
+ boolean oneWay = wrapper.getBoolean();
+
P packet = decodeBody(wrapper);
if (packet == null)
{
return null;
}
- packet.setVersion(version);
if (targetID == null)
targetID = NO_ID_SET;
packet.setTargetID(targetID);
@@ -234,6 +227,7 @@
if (callbackID == null)
callbackID = NO_ID_SET;
packet.setCallbackID(callbackID);
+ packet.setOneWay(oneWay);
return packet;
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/codec/BytesPacketCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/codec/BytesPacketCodec.java 2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/messaging/core/remoting/codec/BytesPacketCodec.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -38,6 +38,10 @@
throws Exception
{
byte[] bytes = packet.getBytes();
+
+ int bodyLength = INT_LENGTH + bytes.length;
+
+ out.putInt(bodyLength);
out.putInt(bytes.length);
out.put(bytes);
}
@@ -51,8 +55,8 @@
{
return null;
}
-
- byte[] bytes = new byte[bodyLength];
+ int byteLength = in.getInt();
+ byte[] bytes = new byte[byteLength];
in.get(bytes);
return new BytesPacket(bytes);
Added: trunk/src/main/org/jboss/messaging/core/remoting/codec/SetSessionIDMessageCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/codec/SetSessionIDMessageCodec.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/codec/SetSessionIDMessageCodec.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -0,0 +1,67 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SETSESSIONID;
+
+import org.jboss.messaging.core.remoting.wireformat.SetSessionIDMessage;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class SetSessionIDMessageCodec extends
+ AbstractPacketCodec<SetSessionIDMessage>
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public SetSessionIDMessageCodec()
+ {
+ super(MSG_SETSESSIONID);
+ }
+
+ // Public --------------------------------------------------------
+
+ // AbstractPacketCodec overrides ---------------------------------
+
+ @Override
+ protected void encodeBody(SetSessionIDMessage message, RemotingBuffer out)
+ throws Exception
+ {
+ String sessionID = message.getSessionID();
+
+ out.putInt(sizeof(sessionID));
+ out.putNullableString(sessionID);
+ }
+
+ @Override
+ protected SetSessionIDMessage decodeBody(RemotingBuffer in) throws Exception
+ {
+ int bodyLength = in.getInt();
+ if (bodyLength > in.remaining())
+ {
+ return null;
+ }
+ String sessionID = in.getNullableString();
+
+ return new SetSessionIDMessage(sessionID);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
\ No newline at end of file
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/ClientImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/ClientImpl.java 2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/ClientImpl.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -99,26 +99,22 @@
return session.getID();
}
- /* (non-Javadoc)
- * @see org.jboss.messaging.core.remoting.Client#sendOneWay(org.jboss.messaging.core.remoting.wireformat.AbstractPacket)
- */
public void sendOneWay(AbstractPacket packet) throws JMSException
{
assert packet != null;
checkConnected();
-
+ packet.setOneWay(true);
+
session.write(packet);
}
- /* (non-Javadoc)
- * @see org.jboss.messaging.core.remoting.Client#sendBlocking(org.jboss.messaging.core.remoting.wireformat.AbstractPacket)
- */
public AbstractPacket sendBlocking(AbstractPacket packet)
throws IOException, JMSException
{
assert packet != null;
checkConnected();
+ packet.setOneWay(false);
try
{
AbstractPacket response = (AbstractPacket) session.writeAndBlock(packet,
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java 2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -70,11 +70,15 @@
serverDispatcher.dispatch((AbstractPacket) object,
new PacketSender()
{
-
public void send(AbstractPacket response)
{
PacketDispatcher.client.dispatch(response, null);
}
+
+ public String getSessionID()
+ {
+ return getID();
+ }
});
}
@@ -91,9 +95,17 @@
{
responses[0] = response;
}
+
+ public String getSessionID()
+ {
+ return getID();
+ }
});
- assert responses[0] != null;
+ if (responses[0] == null)
+ {
+ throw new IllegalStateException("No response received for request " + request);
+ }
return responses[0];
}
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ClientKeepAliveFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ClientKeepAliveFactory.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ClientKeepAliveFactory.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -0,0 +1,50 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.impl.mina;
+
+import org.jboss.messaging.core.remoting.KeepAliveFactory;
+import org.jboss.messaging.core.remoting.wireformat.Ping;
+import org.jboss.messaging.core.remoting.wireformat.Pong;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public class ClientKeepAliveFactory implements KeepAliveFactory
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // KeepAliveFactory implementation -------------------------------
+
+ public Ping ping()
+ {
+ return new Ping();
+ }
+
+ public Pong pong()
+ {
+ return new Pong();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java 2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -6,6 +6,7 @@
*/
package org.jboss.messaging.core.remoting.impl.mina;
+import static org.apache.mina.filter.keepalive.KeepAlivePolicy.EXCEPTION;
import static org.apache.mina.filter.logging.LogLevel.TRACE;
import static org.apache.mina.filter.logging.LogLevel.WARN;
@@ -15,9 +16,11 @@
import org.apache.mina.common.DefaultIoFilterChainBuilder;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.executor.ExecutorFilter;
+import org.apache.mina.filter.keepalive.KeepAliveFilter;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.filter.logging.MdcInjectionFilter;
import org.apache.mina.filter.reqres.RequestResponseFilter;
+import org.jboss.messaging.core.remoting.KeepAliveFactory;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -37,6 +40,24 @@
// Public --------------------------------------------------------
+ public static void addKeepAliveFilter(DefaultIoFilterChainBuilder filterChain,
+ KeepAliveFactory factory, int keepAliveInterval, int keepAliveTimeout)
+ {
+ assert filterChain != null;
+ assert factory != null;
+
+ if (keepAliveTimeout > keepAliveInterval)
+ {
+ throw new IllegalArgumentException("timeout must be greater than the interval: "
+ + "keepAliveTimeout= " + keepAliveTimeout
+ + ", keepAliveInterval=" + keepAliveInterval);
+ }
+
+ filterChain.addLast("keep-alive", new KeepAliveFilter(
+ new MinaKeepAliveFactory(factory), EXCEPTION, keepAliveInterval,
+ keepAliveTimeout));
+ }
+
// Package protected ---------------------------------------------
static void addCodecFilter(DefaultIoFilterChainBuilder filterChain)
@@ -73,23 +94,24 @@
filterChain.addLast("logger", filter);
}
-
+
static void addExecutorFilter(DefaultIoFilterChainBuilder filterChain)
{
ExecutorFilter executorFilter = new ExecutorFilter();
filterChain.addLast("executor", executorFilter);
}
-
+
static ScheduledExecutorService addBlockingRequestResponseFilter(
DefaultIoFilterChainBuilder filterChain)
{
assert filterChain != null;
- ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
+ ScheduledExecutorService executorService = Executors
+ .newScheduledThreadPool(1);
RequestResponseFilter filter = new RequestResponseFilter(
- new MinaInspector(), executorService);
+ new MinaInspector(), executorService);
filterChain.addLast("reqres", filter);
-
+
return executorService;
}
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/KeepAliveNotifier.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/KeepAliveNotifier.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/KeepAliveNotifier.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -0,0 +1,21 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.impl.mina;
+
+import java.util.concurrent.TimeoutException;
+
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public interface KeepAliveNotifier
+{
+ public abstract void notifyKeepAliveTimeout(TimeoutException e, String remoteSessionID);
+}
\ No newline at end of file
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -10,14 +10,19 @@
import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addBlockingRequestResponseFilter;
import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addCodecFilter;
import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addExecutorFilter;
+import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addKeepAliveFilter;
import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addLoggingFilter;
import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addMDCFilter;
+import static org.jboss.messaging.core.remoting.impl.mina.MinaService.KEEP_ALIVE_INTERVAL_KEY;
+import static org.jboss.messaging.core.remoting.impl.mina.MinaService.KEEP_ALIVE_TIMEOUT_KEY;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeoutException;
import org.apache.mina.common.CloseFuture;
import org.apache.mina.common.ConnectFuture;
@@ -28,19 +33,24 @@
import org.apache.mina.common.IoSession;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import org.jboss.jms.client.remoting.ConsolidatedRemotingConnectionListener;
+import org.jboss.messaging.core.remoting.KeepAliveFactory;
+import org.jboss.messaging.core.remoting.ConnectionExceptionListener;
import org.jboss.messaging.core.remoting.NIOConnector;
import org.jboss.messaging.core.remoting.NIOSession;
import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.ServerLocator;
import org.jboss.messaging.core.remoting.TransportType;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.SetSessionIDMessage;
import org.jboss.messaging.util.Logger;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- *
+ *
* @version <tt>$Revision$</tt>
- *
+ *
*/
-public class MinaConnector implements NIOConnector
+public class MinaConnector implements NIOConnector, KeepAliveNotifier
{
// Constants -----------------------------------------------------
@@ -51,14 +61,16 @@
private String host;
private int port;
-
+
private NioSocketConnector connector;
private ScheduledExecutorService blockingScheduler;
private IoSession session;
+ // FIXME clean up this listener mess
private Map<ConsolidatedRemotingConnectionListener, IoServiceListener> listeners = new HashMap<ConsolidatedRemotingConnectionListener, IoServiceListener>();
+ private ConnectionExceptionListener listener;
// Static --------------------------------------------------------
@@ -66,33 +78,68 @@
// Public --------------------------------------------------------
+ public MinaConnector(ServerLocator locator)
+ {
+ this(locator.getTransport(), locator.getHost(), locator.getPort(),
+ locator.getParameters(), new ClientKeepAliveFactory());
+ }
+
+ public MinaConnector(ServerLocator locator, KeepAliveFactory keepAliveFactory)
+ {
+ this(locator.getTransport(), locator.getHost(), locator.getPort(),
+ locator.getParameters(), keepAliveFactory);
+ }
+
public MinaConnector(TransportType transport, String host, int port)
{
+ this(transport, host, port, new HashMap<String, String>(),
+ new ClientKeepAliveFactory());
+ }
+
+ private MinaConnector(TransportType transport, String host, int port,
+ Map<String, String> parameters, KeepAliveFactory keepAliveFactory)
+ {
assert transport == TCP;
assert host != null;
assert port > 0;
-
+ assert parameters != null;
+ assert keepAliveFactory != null;
+
this.host = host;
this.port = port;
-
+
this.connector = new NioSocketConnector();
DefaultIoFilterChainBuilder filterChain = connector.getFilterChain();
-
+
+ // FIXME no hard-coded values
+ int keepAliveInterval = parameters.containsKey(KEEP_ALIVE_INTERVAL_KEY) ? Integer
+ .parseInt(parameters.get(KEEP_ALIVE_INTERVAL_KEY))
+ : 10;
+ int keepAliveTimeout = parameters.containsKey(KEEP_ALIVE_TIMEOUT_KEY) ? Integer
+ .parseInt(parameters.get(KEEP_ALIVE_TIMEOUT_KEY))
+ : 5;
+
addMDCFilter(filterChain);
addCodecFilter(filterChain);
addLoggingFilter(filterChain);
blockingScheduler = addBlockingRequestResponseFilter(filterChain);
+ addKeepAliveFilter(filterChain, keepAliveFactory, keepAliveInterval,
+ keepAliveTimeout);
addExecutorFilter(filterChain);
- connector.setHandler(new MinaHandler(PacketDispatcher.client));
+ connector.setHandler(new MinaHandler(PacketDispatcher.client, this));
connector.getSessionConfig().setKeepAlive(true);
connector.getSessionConfig().setReuseAddress(true);
}
// NIOConnector implementation -----------------------------------
-
- public NIOSession connect() throws IOException
+
+ public NIOSession connect() throws IOException
{
+ if (session != null)
+ {
+ return new MinaSession(session);
+ }
InetSocketAddress address = new InetSocketAddress(host, port);
ConnectFuture future = connector.connect(address);
connector.setDefaultRemoteAddress(address);
@@ -103,9 +150,13 @@
throw new IOException("Cannot connect to " + address.toString());
}
this.session = future.getSession();
+ AbstractPacket packet = new SetSessionIDMessage(Long.toString(session
+ .getId()));
+ session.write(packet);
+
return new MinaSession(session);
}
-
+
public boolean disconnect()
{
if (session == null)
@@ -118,14 +169,14 @@
connector.dispose();
blockingScheduler.shutdown();
-
+
connector = null;
blockingScheduler = null;
session = null;
return closed;
}
-
+
public void addConnectionListener(
final ConsolidatedRemotingConnectionListener listener)
{
@@ -139,8 +190,9 @@
if (log.isTraceEnabled())
log.trace("added listener " + listener + " to " + this);
}
-
- public void removeConnectionListener(ConsolidatedRemotingConnectionListener listener)
+
+ public void removeConnectionListener(
+ ConsolidatedRemotingConnectionListener listener)
{
assert listener != null;
assert connector != null;
@@ -151,7 +203,7 @@
if (log.isTraceEnabled())
log.trace("removed listener " + listener + " from " + this);
}
-
+
public String getServerURI()
{
if (connector == null)
@@ -162,11 +214,34 @@
if (address != null)
{
return TCP + "://" + address.toString();
- } else {
+ } else
+ {
return TCP + "://" + host + ":" + port;
}
}
+ public void setConnectionExceptionListener(ConnectionExceptionListener listener)
+ {
+ assert listener != null;
+
+ this.listener = listener;
+ }
+
+ // KeepAliveManager implementation -------------------------------
+
+ public void notifyKeepAliveTimeout(TimeoutException cause, String remoteSessionID)
+ {
+ if (listener != null)
+ listener.handleConnectionException(cause, remoteSessionID);
+
+ Iterator<ConsolidatedRemotingConnectionListener> set = listeners.keySet().iterator();
+ while (set.hasNext())
+ {
+ ConsolidatedRemotingConnectionListener listener = set.next();
+ listener.handleConnectionException(cause);
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java 2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -6,12 +6,16 @@
*/
package org.jboss.messaging.core.remoting.impl.mina;
+import java.util.concurrent.TimeoutException;
+
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.keepalive.KeepAliveTimeoutException;
import org.apache.mina.filter.reqres.Response;
import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.PacketSender;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.Ping;
import org.jboss.messaging.util.Logger;
/**
@@ -30,13 +34,16 @@
private final PacketDispatcher dispatcher;
+ private KeepAliveNotifier keepAliveManager;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public MinaHandler(PacketDispatcher dispatcher)
+ public MinaHandler(PacketDispatcher dispatcher, KeepAliveNotifier keepAliveManager)
{
this.dispatcher = dispatcher;
+ this.keepAliveManager = keepAliveManager;
}
// Public --------------------------------------------------------
@@ -47,6 +54,13 @@
public void exceptionCaught(IoSession session, Throwable cause)
throws Exception
{
+ if (cause instanceof KeepAliveTimeoutException && keepAliveManager != null)
+ {
+ String serverSessionID = Long.toString(session.getId());
+ TimeoutException e = new TimeoutException();
+ e.initCause(cause);
+ keepAliveManager.notifyKeepAliveTimeout(e, serverSessionID);
+ }
// FIXME ugly way to know we're on the server side
// close session only on the server side
if (dispatcher != PacketDispatcher.client)
@@ -66,6 +80,14 @@
// do nothing
return;
}
+
+ if (message instanceof Ping)
+ {
+ log.trace("received ping " + message);
+ // response is handled by the keep-alive filter.
+ // do nothing
+ return;
+ }
if (!(message instanceof AbstractPacket))
{
@@ -79,6 +101,11 @@
{
session.write(p);
}
+
+ public String getSessionID()
+ {
+ return Long.toString(session.getId());
+ }
};
if (log.isTraceEnabled())
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaKeepAliveFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaKeepAliveFactory.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaKeepAliveFactory.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -0,0 +1,71 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.impl.mina;
+
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;
+import org.jboss.messaging.core.remoting.KeepAliveFactory;
+import org.jboss.messaging.core.remoting.wireformat.Ping;
+import org.jboss.messaging.core.remoting.wireformat.Pong;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public class MinaKeepAliveFactory implements KeepAliveMessageFactory
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private KeepAliveFactory innerFactory;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public MinaKeepAliveFactory(KeepAliveFactory factory)
+ {
+ assert factory != null;
+
+ this.innerFactory = factory;
+ }
+
+ // Public --------------------------------------------------------
+
+ // KeepAliveMessageFactory implementation ------------------------
+
+ public Object getRequest(IoSession session)
+ {
+ return innerFactory.ping();
+ }
+
+ public Object getResponse(IoSession session, Object request)
+ {
+ return innerFactory.pong();
+ }
+
+ public boolean isRequest(IoSession session, Object request)
+ {
+ return (request instanceof Ping);
+ }
+
+ public boolean isResponse(IoSession session, Object response)
+ {
+ return (response instanceof Pong);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java 2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -9,15 +9,19 @@
import static org.jboss.messaging.core.remoting.ConnectorRegistrySingleton.REGISTRY;
import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addCodecFilter;
import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addExecutorFilter;
+import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addKeepAliveFilter;
import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addLoggingFilter;
import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addMDCFilter;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeoutException;
import org.apache.mina.common.DefaultIoFilterChainBuilder;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+import org.jboss.messaging.core.remoting.ConnectionExceptionListener;
+import org.jboss.messaging.core.remoting.KeepAliveFactory;
import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.ServerLocator;
import org.jboss.messaging.core.remoting.TransportType;
@@ -25,16 +29,20 @@
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- *
+ *
* @version <tt>$Revision$</tt>
- *
+ *
*/
-public class MinaService
+public class MinaService implements KeepAliveNotifier
{
// Constants -----------------------------------------------------
private static final Logger log = Logger.getLogger(MinaService.class);
+ public static final String KEEP_ALIVE_INTERVAL_KEY = "keepAliveInterval";
+ public static final String KEEP_ALIVE_TIMEOUT_KEY = "keepAliveTimeout";
+ public static final String DISABLE_INVM_KEY = "disable-invm";
+
// Attributes ----------------------------------------------------
private TransportType transport;
@@ -42,68 +50,107 @@
private final String host;
private final int port;
-
+
private Map<String, String> parameters;
-
+
private NioSocketAcceptor acceptor;
- private int blockingRequestTimeout = 5;
-
private PacketDispatcher dispatcher;
+ private ConnectionExceptionListener listener;
+
+ private KeepAliveFactory factory;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
public MinaService(String transport, String host, int port)
{
- this(TransportType.valueOf(transport.toUpperCase()), host, port);
+ this(TransportType.valueOf(transport.toUpperCase()), host, port, new ServerKeepAliveFactory());
}
-
- public MinaService(TransportType transport, String host, int port)
+
+ public MinaService(TransportType transport, String host, int port, KeepAliveFactory factory)
{
assert transport != null;
- assert host != null;
+ assert host != null;
assert port > 0;
-
+ assert factory != null;
+
this.transport = transport;
this.host = host;
this.port = port;
this.parameters = new HashMap<String, String>();
+ this.factory = factory;
this.dispatcher = new PacketDispatcher();
}
+ public void setConnectionExceptionListener(ConnectionExceptionListener listener)
+ {
+ assert listener != null;
+
+ this.listener = listener;
+ }
+
+ // KeepAliveManager implementation -------------------------------
+
+ public void notifyKeepAliveTimeout(TimeoutException e, String remoteSessionID)
+ {
+ if (listener != null)
+ {
+ String clientSessionID = PacketDispatcher.sessions.get(remoteSessionID);
+ listener.handleConnectionException(e, clientSessionID);
+ }
+ }
+
// Public --------------------------------------------------------
-
+
public void setParameters(Map<String, String> parameters)
{
assert parameters != null;
-
+
this.parameters = parameters;
}
+ public void setKeepAliveFactory(KeepAliveFactory factory)
+ {
+ assert factory != null;
+
+ this.factory = factory;
+ }
+
public ServerLocator getLocator()
{
return new ServerLocator(transport, host, port, parameters);
}
-
+
public PacketDispatcher getDispatcher()
{
return dispatcher;
}
-
+
public void start() throws Exception
{
if (acceptor == null)
{
acceptor = new NioSocketAcceptor();
DefaultIoFilterChainBuilder filterChain = acceptor.getFilterChain();
-
+
addMDCFilter(filterChain);
addCodecFilter(filterChain);
addLoggingFilter(filterChain);
+ if (parameters.containsKey(KEEP_ALIVE_INTERVAL_KEY)
+ && parameters.containsKey(KEEP_ALIVE_TIMEOUT_KEY))
+ {
+ int keepAliveInterval = Integer.parseInt(parameters
+ .get(KEEP_ALIVE_INTERVAL_KEY));
+ int keepAliveTimeout = Integer.parseInt(parameters
+ .get(KEEP_ALIVE_TIMEOUT_KEY));
+ addKeepAliveFilter(filterChain, factory,
+ keepAliveInterval, keepAliveTimeout);
+ }
addExecutorFilter(filterChain);
-
+
// Bind
acceptor.setLocalAddress(new InetSocketAddress(host, port));
acceptor.setReuseAddress(true);
@@ -111,11 +158,17 @@
acceptor.getSessionConfig().setKeepAlive(true);
acceptor.setDisconnectOnUnbind(false);
- acceptor.setHandler(new MinaHandler(dispatcher));
+ acceptor.setHandler(new MinaHandler(dispatcher, this));
acceptor.bind();
-
- REGISTRY.register(getLocator(), dispatcher);
- }
+
+ boolean disableInvm = false;
+ if (parameters.containsKey(DISABLE_INVM_KEY))
+ disableInvm = Boolean.valueOf(parameters.get(DISABLE_INVM_KEY)).booleanValue();
+ if (log.isDebugEnabled())
+ log.debug("invm optimization for remoting is " + (disableInvm ? "disabled" : "enabled"));
+ if (!disableInvm)
+ REGISTRY.register(getLocator(), dispatcher);
+ }
}
public void stop()
@@ -124,12 +177,13 @@
{
acceptor.unbind();
acceptor.dispose();
+ System.err.println(acceptor.isDisposed());
acceptor = null;
-
+
REGISTRY.unregister(getLocator());
- }
+ }
}
-
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/PacketCodecFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/PacketCodecFactory.java 2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/PacketCodecFactory.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -7,6 +7,8 @@
package org.jboss.messaging.core.remoting.impl.mina;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.NULL;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.PING;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.PONG;
import org.apache.mina.filter.codec.demux.DemuxingProtocolCodecFactory;
import org.jboss.messaging.core.remoting.codec.AbstractPacketCodec;
@@ -36,6 +38,7 @@
import org.jboss.messaging.core.remoting.codec.SessionCancelMessageCodec;
import org.jboss.messaging.core.remoting.codec.SessionSendMessageCodec;
import org.jboss.messaging.core.remoting.codec.SetClientIDMessageCodec;
+import org.jboss.messaging.core.remoting.codec.SetSessionIDMessageCodec;
import org.jboss.messaging.core.remoting.codec.TextPacketCodec;
import org.jboss.messaging.core.remoting.codec.UnsubscribeMessageCodec;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
@@ -68,6 +71,8 @@
import org.jboss.messaging.core.remoting.wireformat.JMSExceptionMessage;
import org.jboss.messaging.core.remoting.wireformat.NullPacket;
import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.core.remoting.wireformat.Ping;
+import org.jboss.messaging.core.remoting.wireformat.Pong;
import org.jboss.messaging.core.remoting.wireformat.SessionAcknowledgeMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionCancelMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionCommitMessage;
@@ -75,6 +80,7 @@
import org.jboss.messaging.core.remoting.wireformat.SessionRollbackMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionSendMessage;
import org.jboss.messaging.core.remoting.wireformat.SetClientIDMessage;
+import org.jboss.messaging.core.remoting.wireformat.SetSessionIDMessage;
import org.jboss.messaging.core.remoting.wireformat.StartConnectionMessage;
import org.jboss.messaging.core.remoting.wireformat.StopConnectionMessage;
import org.jboss.messaging.core.remoting.wireformat.TextPacket;
@@ -107,6 +113,10 @@
addCodec(TextPacket.class, TextPacketCodec.class);
addCodec(BytesPacket.class, BytesPacketCodec.class);
+ addCodecForEmptyPacket(PING, Ping.class);
+ addCodecForEmptyPacket(PONG, Pong.class);
+ addCodec(SetSessionIDMessage.class, SetSessionIDMessageCodec.class);
+
addCodec(CreateConnectionRequest.class,
ConnectionFactoryCreateConnectionRequestCodec.class);
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ServerKeepAliveFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ServerKeepAliveFactory.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ServerKeepAliveFactory.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -0,0 +1,50 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.impl.mina;
+
+import org.jboss.messaging.core.remoting.KeepAliveFactory;
+import org.jboss.messaging.core.remoting.wireformat.Ping;
+import org.jboss.messaging.core.remoting.wireformat.Pong;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public class ServerKeepAliveFactory implements KeepAliveFactory
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // KeepAliveFactory implementation -------------------------------
+
+ public Ping ping()
+ {
+ return new Ping();
+ }
+
+ public Pong pong()
+ {
+ return new Pong();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/AbstractPacket.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/AbstractPacket.java 2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/AbstractPacket.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -8,6 +8,8 @@
import static org.jboss.messaging.core.remoting.Assert.assertValidID;
+import org.jboss.messaging.core.remoting.Client;
+
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
*
@@ -21,12 +23,8 @@
public static final long NO_CORRELATION_ID = -1L;
- public static final byte NO_VERSION_SET = (byte)-1;
-
// Attributes ----------------------------------------------------
- private byte version = NO_VERSION_SET;
-
private long correlationID = NO_CORRELATION_ID;
private String targetID = NO_ID_SET;
@@ -35,6 +33,14 @@
private final PacketType type;
+ /**
+ * <code>oneWay</code> is <code>true</code> when the packet is sent "one way"
+ * by the client which does not expect any response to it.
+ *
+ * @see Client#sendOneWay(AbstractPacket)
+ */
+ private boolean oneWay = false;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -53,18 +59,6 @@
return type;
}
- public void setVersion(byte version)
- {
- assert version != NO_VERSION_SET;
-
- this.version = version;
- }
-
- public byte getVersion()
- {
- return version;
- }
-
public void setCorrelationID(long correlationID)
{
this.correlationID = correlationID;
@@ -99,11 +93,20 @@
return callbackID;
}
+ public void setOneWay(boolean oneWay)
+ {
+ this.oneWay = oneWay;
+ }
+
+ public boolean isOneWay()
+ {
+ return oneWay;
+ }
+
public void normalize(AbstractPacket other)
{
assert other != null;
- setVersion(other.getVersion());
setCorrelationID(other.getCorrelationID());
setTargetID(other.getCallbackID());
}
@@ -126,9 +129,9 @@
protected String getParentString()
{
- return "PACKET[type=" + type + ", version=" + version
+ return "PACKET[type=" + type
+ ", correlationID=" + correlationID + ", targetID=" + targetID
- + ", callbackID=" + callbackID;
+ + ", callbackID=" + callbackID + ", oneWay=" + oneWay;
}
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java 2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -17,6 +17,9 @@
MSG_JMSEXCEPTION ((byte) 2),
TEXT ((byte) 3),
BYTES ((byte) 4),
+ PING ((byte) 5),
+ PONG ((byte) 6),
+ MSG_SETSESSIONID ((byte) 7),
// Connection factory
REQ_CREATECONNECTION ((byte)10),
Added: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/Ping.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/Ping.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/Ping.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -0,0 +1,41 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.PING;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ *
+ * @version <tt>$Revision$</tt>
+ */
+public class Ping extends AbstractPacket
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public Ping()
+ {
+ super(PING);
+ }
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Added: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/Pong.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/Pong.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/Pong.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -0,0 +1,41 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.PONG;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ *
+ * @version <tt>$Revision$</tt>
+ */
+public class Pong extends AbstractPacket
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public Pong()
+ {
+ super(PONG);
+ }
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCancelMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCancelMessage.java 2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCancelMessage.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -6,7 +6,9 @@
*/
package org.jboss.messaging.core.remoting.wireformat;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CANCEL;
+
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
@@ -28,7 +30,7 @@
public SessionCancelMessage(long deliveryID, boolean expired)
{
- super(PacketType.MSG_CANCEL);
+ super(MSG_CANCEL);
this.deliveryID = deliveryID;
@@ -47,6 +49,12 @@
return expired;
}
+ @Override
+ public String toString()
+ {
+ return getParentString() + ", deliveryID=" + deliveryID + ", expired=" + expired + "]";
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionRecoverMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionRecoverMessage.java 2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionRecoverMessage.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -6,7 +6,9 @@
*/
package org.jboss.messaging.core.remoting.wireformat;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_RECOVER;
+
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
@@ -24,7 +26,7 @@
public SessionRecoverMessage()
{
- super(PacketType.MSG_RECOVER);
+ super(MSG_RECOVER);
}
// Public --------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionRollbackMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionRollbackMessage.java 2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionRollbackMessage.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -6,7 +6,9 @@
*/
package org.jboss.messaging.core.remoting.wireformat;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_ROLLBACK;
+
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
@@ -24,7 +26,7 @@
public SessionRollbackMessage()
{
- super(PacketType.MSG_ROLLBACK);
+ super(MSG_ROLLBACK);
}
// Public --------------------------------------------------------
Added: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SetSessionIDMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SetSessionIDMessage.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SetSessionIDMessage.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -0,0 +1,57 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.Assert.assertValidID;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SETSESSIONID;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ *
+ * @version <tt>$Revision$</tt>
+ */
+public class SetSessionIDMessage extends AbstractPacket
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final String sessionID;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public SetSessionIDMessage(String sessionID)
+ {
+ super(MSG_SETSESSIONID);
+
+ assertValidID(sessionID);
+ this.sessionID = sessionID;
+ }
+
+ // Public --------------------------------------------------------
+
+ public String getSessionID()
+ {
+ return sessionID;
+ }
+
+ @Override
+ public String toString()
+ {
+ return getParentString() + ", sessionID=" + sessionID + "]";
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
\ No newline at end of file
Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml 2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/tests/build.xml 2008-01-28 14:35:41 UTC (rev 3636)
@@ -512,7 +512,7 @@
<exclude name="**/jms/MemLeakTest.class"/>
<exclude name="**/jms/RemotingConnectionConfigurationTest.class"/>
<exclude name="**/jms/stress/**"/>
- <exclude name="**/jms/crash/**"/>
+ <exclude name="**/jms/crash/ClientCrashTest.class"/>
<exclude name="**/jms/bridge/**"/>
<exclude name="**/jms/manual/**"/>
<exclude name="**/jms/clustering/**"/>
Modified: trunk/tests/src/org/jboss/messaging/core/remoting/impl/ClientTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/ClientTestBase.java 2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/ClientTestBase.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -41,6 +41,8 @@
protected PacketDispatcher serverDispatcher;
+ private NIOConnector connector;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -49,7 +51,8 @@
public void testConnected() throws Exception
{
- Client client = new ClientImpl(createNIOConnector(), createServerLocator());
+ NIOConnector connector = createNIOConnector();
+ Client client = new ClientImpl(connector, createServerLocator());
assertFalse(client.isConnected());
@@ -59,6 +62,8 @@
assertTrue(client.disconnect());
assertFalse(client.isConnected());
assertFalse(client.disconnect());
+
+ connector.disconnect();
}
public void testSendOneWay() throws Exception
@@ -66,7 +71,6 @@
serverPacketHandler.expectMessage(1);
TextPacket packet = new TextPacket("testSendOneWay");
- packet.setVersion((byte) 1);
packet.setTargetID(serverPacketHandler.getID());
client.sendOneWay(packet);
@@ -86,7 +90,6 @@
for (int i = 0; i < MANY_MESSAGES; i++)
{
packets[i] = new TextPacket("testSendManyOneWay " + i);
- packets[i].setVersion((byte) 1);
packets[i].setTargetID(serverPacketHandler.getID());
client.sendOneWay(packets[i]);
}
@@ -110,7 +113,6 @@
PacketDispatcher.client.register(callbackHandler);
TextPacket packet = new TextPacket("testSendOneWayWithCallbackHandler");
- packet.setVersion((byte) 1);
packet.setTargetID(serverPacketHandler.getID());
packet.setCallbackID(callbackHandler.getID());
@@ -126,7 +128,6 @@
public void testSendBlocking() throws Exception
{
TextPacket request = new TextPacket("testSendBlocking");
- request.setVersion((byte) 1);
request.setTargetID(serverPacketHandler.getID());
AbstractPacket receivedPacket = client.sendBlocking(request);
@@ -140,7 +141,6 @@
public void testCorrelationCounter() throws Exception
{
TextPacket request = new TextPacket("testSendBlocking");
- request.setVersion((byte) 1);
request.setTargetID(serverPacketHandler.getID());
AbstractPacket receivedPacket = client.sendBlocking(request);
@@ -164,7 +164,6 @@
TextPacket packet = new TextPacket(
"testClientHandlePacketSentByServer from client");
- packet.setVersion((byte) 1);
packet.setTargetID(serverPacketHandler.getID());
// send a packet to create a sender when the server
// handles the packet
@@ -176,7 +175,6 @@
PacketSender sender = serverPacketHandler.getLastSender();
TextPacket packetFromServer = new TextPacket(
"testClientHandlePacketSentByServer from server");
- packetFromServer.setVersion((byte) 1);
packetFromServer.setTargetID(clientHandler.getID());
sender.send(packetFromServer);
@@ -196,7 +194,7 @@
serverDispatcher = startServer();
ServerLocator serverLocator = createServerLocator();
- NIOConnector connector = createNIOConnector();
+ connector = createNIOConnector();
client = new ClientImpl(connector, serverLocator);
client.connect();
@@ -209,6 +207,7 @@
{
serverDispatcher.unregister(serverPacketHandler.getID());
+ connector.disconnect();
client.disconnect();
stopServer();
Added: trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/ClientKeepAliveTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/ClientKeepAliveTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/ClientKeepAliveTest.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -0,0 +1,263 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.impl.mina.integration.test;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.jboss.messaging.core.remoting.TransportType.TCP;
+import static org.jboss.messaging.core.remoting.impl.mina.MinaService.KEEP_ALIVE_INTERVAL_KEY;
+import static org.jboss.messaging.core.remoting.impl.mina.MinaService.KEEP_ALIVE_TIMEOUT_KEY;
+import static org.jboss.messaging.core.remoting.impl.mina.integration.test.TestSupport.KEEP_ALIVE_INTERVAL;
+import static org.jboss.messaging.core.remoting.impl.mina.integration.test.TestSupport.KEEP_ALIVE_TIMEOUT;
+import static org.jboss.messaging.core.remoting.impl.mina.integration.test.TestSupport.PORT;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.remoting.ConnectionExceptionListener;
+import org.jboss.messaging.core.remoting.KeepAliveFactory;
+import org.jboss.messaging.core.remoting.NIOSession;
+import org.jboss.messaging.core.remoting.impl.mina.MinaConnector;
+import org.jboss.messaging.core.remoting.impl.mina.MinaService;
+import org.jboss.messaging.core.remoting.wireformat.Ping;
+import org.jboss.messaging.core.remoting.wireformat.Pong;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public class ClientKeepAliveTest extends TestCase
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private MinaService service;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ service = new MinaService(TCP.toString(), "localhost", PORT);
+ Map<String, String> parameters = new HashMap<String, String>();
+ parameters.put(KEEP_ALIVE_INTERVAL_KEY, Integer.toString(KEEP_ALIVE_INTERVAL));
+ parameters.put(KEEP_ALIVE_TIMEOUT_KEY, Integer.toString(KEEP_ALIVE_TIMEOUT));
+ service.setParameters(parameters);
+ service.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ service.stop();
+ service = null;
+ }
+
+ public void testKeepAliveWithClientOK() throws Exception
+ {
+ KeepAliveFactory factory = createMock(KeepAliveFactory.class);
+
+ // client never send ping
+ expect(factory.ping()).andStubReturn(null);
+ // client is responding
+ expect(factory.pong()).andReturn(new Pong()).atLeastOnce();
+
+ replay(factory);
+
+ MinaConnector connector = new MinaConnector(service.getLocator(), factory);
+ connector.connect();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ service.setConnectionExceptionListener(new ConnectionExceptionListener()
+ {
+ public void handleConnectionException(Exception e, String sessionID)
+ {
+ latch.countDown();
+ }
+ });
+
+ boolean firedKeepAliveNotification = latch.await(KEEP_ALIVE_INTERVAL
+ + KEEP_ALIVE_TIMEOUT + 1, SECONDS);
+ assertFalse(firedKeepAliveNotification);
+
+ connector.disconnect();
+
+ verify(factory);
+ }
+
+ public void testKeepAliveWithClientNotResponding() throws Exception
+ {
+ KeepAliveFactory factory = createMock(KeepAliveFactory.class);
+
+ // client never send ping
+ expect(factory.ping()).andStubReturn(null);
+ // no pong -> client is not responding
+ expect(factory.pong()).andReturn(null).atLeastOnce();
+
+ replay(factory);
+
+ MinaConnector connector = new MinaConnector(service.getLocator(), factory);
+
+ NIOSession session = connector.connect();
+ String clientSessionID = session.getID();
+
+ final String[] clientSessionIDNotResponding = new String[1];
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ service.setConnectionExceptionListener(new ConnectionExceptionListener()
+ {
+ public void handleConnectionException(Exception e, String sessionID)
+ {
+ clientSessionIDNotResponding[0] = sessionID;
+ latch.countDown();
+ }
+ });
+
+ boolean firedKeepAliveNotification = latch.await(KEEP_ALIVE_INTERVAL
+ + KEEP_ALIVE_TIMEOUT + 2, SECONDS);
+ assertTrue("notification has not been received", firedKeepAliveNotification);
+ assertNotNull(clientSessionIDNotResponding[0]);
+ assertEquals(clientSessionID, clientSessionIDNotResponding[0]);
+
+ connector.disconnect();
+
+ verify(factory);
+ }
+
+ public void testKeepAliveWithClientTooLongToRespond() throws Exception
+ {
+ KeepAliveFactory factory = new KeepAliveFactory()
+ {
+ public Ping ping()
+ {
+ return null;
+ }
+
+ public synchronized Pong pong()
+ {
+ // like a TCP timeout, there is no response in the next 2 hours
+ try
+ {
+ wait(2 * 3600);
+ } catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ return new Pong();
+ }
+ };
+
+ try
+ {
+ MinaConnector connector = new MinaConnector(service.getLocator(),
+ factory);
+
+ NIOSession session = connector.connect();
+ String clientSessionID = session.getID();
+
+ final String[] clientSessionIDNotResponding = new String[1];
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ service.setConnectionExceptionListener(new ConnectionExceptionListener()
+ {
+ public void handleConnectionException(Exception e, String sessionID)
+ {
+ clientSessionIDNotResponding[0] = sessionID;
+ latch.countDown();
+ }
+ });
+
+ boolean firedKeepAliveNotification = latch.await(KEEP_ALIVE_INTERVAL
+ + KEEP_ALIVE_TIMEOUT + 2, SECONDS);
+ assertTrue("notification has not been received", firedKeepAliveNotification);
+ assertNotNull(clientSessionIDNotResponding[0]);
+ assertEquals(clientSessionID, clientSessionIDNotResponding[0]);
+
+ connector.disconnect();
+
+ } finally
+ {
+ // test is done: wake up the factory
+ synchronized (factory)
+ {
+ factory.notify();
+ }
+ }
+ }
+
+ public void testKeepAliveWithClientRespondingAndClientNotResponding()
+ throws Exception
+ {
+ KeepAliveFactory notRespondingfactory = createMock(KeepAliveFactory.class);
+ expect(notRespondingfactory.ping()).andStubReturn(null);
+ expect(notRespondingfactory.pong()).andReturn(null).atLeastOnce();
+
+ KeepAliveFactory respondingfactory = createMock(KeepAliveFactory.class);
+ expect(respondingfactory.ping()).andStubReturn(null);
+ expect(respondingfactory.pong()).andReturn(new Pong()).atLeastOnce();
+
+ replay(notRespondingfactory, respondingfactory);
+
+ MinaConnector connectorNotResponding = new MinaConnector(service
+ .getLocator(), notRespondingfactory);
+ MinaConnector connectorResponding = new MinaConnector(service
+ .getLocator(), respondingfactory);
+
+ NIOSession sessionNotResponding = connectorNotResponding.connect();
+ String clientSessionIDNotResponding = sessionNotResponding.getID();
+
+ NIOSession sessionResponding = connectorResponding.connect();
+ String clientSessionIDResponding = sessionResponding.getID();
+
+ final String[] sessionIDNotResponding = new String[1];
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ service.setConnectionExceptionListener(new ConnectionExceptionListener()
+ {
+ public void handleConnectionException(Exception e, String sessionID)
+ {
+ sessionIDNotResponding[0] = sessionID;
+ latch.countDown();
+ }
+ });
+
+ boolean firedKeepAliveNotification = latch.await(KEEP_ALIVE_INTERVAL
+ + KEEP_ALIVE_TIMEOUT + 2, SECONDS);
+ assertTrue("notification has not been received", firedKeepAliveNotification);
+
+ assertNotNull(sessionIDNotResponding[0]);
+ assertEquals(clientSessionIDNotResponding, sessionIDNotResponding[0]);
+ assertNotSame(clientSessionIDResponding, sessionIDNotResponding[0]);
+
+ connectorNotResponding.disconnect();
+ connectorResponding.disconnect();
+
+ verify(notRespondingfactory, respondingfactory);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
\ No newline at end of file
Added: trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/FilterChainSupportTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/FilterChainSupportTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/FilterChainSupportTest.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -0,0 +1,59 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.impl.mina.integration.test;
+
+import static org.easymock.EasyMock.createMock;
+import junit.framework.TestCase;
+
+import org.apache.mina.common.DefaultIoFilterChainBuilder;
+import org.jboss.messaging.core.remoting.KeepAliveFactory;
+import org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public class FilterChainSupportTest extends TestCase
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testAddKeepAliveFilterWithIncorrectParameters() throws Exception
+ {
+ int keepAliveInterval = 5; // seconds
+ int keepAliveTimeout = 10; // seconds
+
+ DefaultIoFilterChainBuilder filterChain = new DefaultIoFilterChainBuilder();
+ KeepAliveFactory factory = createMock(KeepAliveFactory.class);
+
+ try
+ {
+ FilterChainSupport.addKeepAliveFilter(filterChain, factory,
+ keepAliveInterval, keepAliveTimeout);
+ fail("the interval must be greater than the timeout");
+ } catch (IllegalArgumentException e)
+ {
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaClientTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaClientTest.java 2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaClientTest.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -50,8 +50,6 @@
AbstractPacket packet = new TextPacket("testSendBlockingWithTimeout");
packet.setTargetID(serverPacketHandler.getID());
- packet.setVersion((byte) 1);
-
try
{
client.sendBlocking(packet);
@@ -78,7 +76,7 @@
@Override
protected PacketDispatcher startServer() throws Exception
{
- service = new MinaService(TCP, "localhost", PORT);
+ service = new MinaService(TCP.toString(), "localhost", PORT);
service.start();
return service.getDispatcher();
}
Modified: trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaHandlerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaHandlerTest.java 2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaHandlerTest.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -73,7 +73,7 @@
@Override
protected void setUp() throws Exception
{
- handler = new MinaHandler(PacketDispatcher.client);
+ handler = new MinaHandler(PacketDispatcher.client, null);
packetHandler = new TestPacketHandler();
PacketDispatcher.client.register(packetHandler);
Added: trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/ServerKeepAliveTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/ServerKeepAliveTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/ServerKeepAliveTest.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -0,0 +1,115 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.impl.mina.integration.test;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.jboss.messaging.core.remoting.TransportType.TCP;
+import static org.jboss.messaging.core.remoting.impl.mina.MinaService.KEEP_ALIVE_INTERVAL_KEY;
+import static org.jboss.messaging.core.remoting.impl.mina.MinaService.KEEP_ALIVE_TIMEOUT_KEY;
+import static org.jboss.messaging.core.remoting.impl.mina.integration.test.TestSupport.KEEP_ALIVE_INTERVAL;
+import static org.jboss.messaging.core.remoting.impl.mina.integration.test.TestSupport.KEEP_ALIVE_TIMEOUT;
+import static org.jboss.messaging.core.remoting.impl.mina.integration.test.TestSupport.PORT;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.remoting.ConnectionExceptionListener;
+import org.jboss.messaging.core.remoting.KeepAliveFactory;
+import org.jboss.messaging.core.remoting.NIOSession;
+import org.jboss.messaging.core.remoting.impl.mina.MinaConnector;
+import org.jboss.messaging.core.remoting.impl.mina.MinaService;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public class ServerKeepAliveTest extends TestCase
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private MinaService service;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ service.stop();
+ service = null;
+ }
+
+ public void testKeepAliveWithServerNotResponding() throws Exception
+ {
+ KeepAliveFactory factory = createMock(KeepAliveFactory.class);
+
+ // server does not send ping
+ expect(factory.ping()).andStubReturn(null);
+ // no pong -> server is not responding
+ expect(factory.pong()).andReturn(null).atLeastOnce();
+
+ replay(factory);
+
+ service = new MinaService(TCP, "localhost", PORT, factory);
+ Map<String, String> parameters = new HashMap<String, String>();
+ parameters.put(KEEP_ALIVE_INTERVAL_KEY, Integer.toString(KEEP_ALIVE_INTERVAL));
+ parameters.put(KEEP_ALIVE_TIMEOUT_KEY, Integer.toString(KEEP_ALIVE_TIMEOUT));
+ service.setParameters(parameters);
+ service.start();
+
+ MinaConnector connector = new MinaConnector(service.getLocator());
+ final String[] sessionIDNotResponding = new String[1];
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ connector.setConnectionExceptionListener(new ConnectionExceptionListener()
+ {
+ public void handleConnectionException(Exception e, String sessionID)
+ {
+ sessionIDNotResponding[0] = sessionID;
+ latch.countDown();
+ }
+ });
+
+ NIOSession session = connector.connect();
+
+ boolean firedKeepAliveNotification = latch.await(KEEP_ALIVE_INTERVAL
+ + KEEP_ALIVE_TIMEOUT + 1, SECONDS);
+ assertTrue(firedKeepAliveNotification);
+ assertEquals(session.getID(), sessionIDNotResponding[0]);
+
+ connector.disconnect();
+
+ verify(factory);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
\ No newline at end of file
Modified: trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/TestSupport.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/TestSupport.java 2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/TestSupport.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -18,10 +18,14 @@
public static final int MANY_MESSAGES = 500;
- // Attributes ----------------------------------------------------
+ public static final int KEEP_ALIVE_INTERVAL = 2; // in seconds
+ public static final int KEEP_ALIVE_TIMEOUT = 1; // in seconds
+
public static final int PORT = 9090;
+ // Attributes ----------------------------------------------------
+
// Static --------------------------------------------------------
public static String reverse(String text)
Modified: trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/stress/PacketStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/stress/PacketStressTest.java 2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/stress/PacketStressTest.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -53,7 +53,7 @@
@Override
protected void setUp() throws Exception
{
- service = new MinaService(TCP, "localhost", PORT);
+ service = new MinaService(TCP.toString(), "localhost", PORT);
service.start();
connector = new MinaConnector(TCP, "localhost", PORT);
@@ -88,7 +88,6 @@
byte[] payloadBytes = generatePayload(PAYLOAD);
AbstractPacket packet = new BytesPacket(payloadBytes);
- packet.setVersion((byte) 19);
packet.setTargetID(handlerID);
long start = System.currentTimeMillis();
Modified: trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java 2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -6,6 +6,7 @@
*/
package org.jboss.messaging.core.remoting.wireformat.test.unit;
+import static org.jboss.messaging.core.remoting.codec.AbstractPacketCodec.BOOLEAN_LENGTH;
import static org.jboss.messaging.core.remoting.codec.AbstractPacketCodec.FALSE;
import static org.jboss.messaging.core.remoting.codec.AbstractPacketCodec.INT_LENGTH;
import static org.jboss.messaging.core.remoting.codec.AbstractPacketCodec.LONG_LENGTH;
@@ -17,19 +18,28 @@
import static org.jboss.messaging.core.remoting.impl.mina.MinaPacketCodec.NULL_STRING;
import static org.jboss.messaging.core.remoting.impl.mina.MinaPacketCodec.UTF_8_ENCODER;
import static org.jboss.messaging.core.remoting.wireformat.AbstractPacket.NO_ID_SET;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.BYTES;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_ACKNOWLEDGE;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_ADDTEMPORARYDESTINATION;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_BROWSER_RESET;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CANCEL;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CHANGERATE;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CLOSE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_COMMIT;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_DELETETEMPORARYDESTINATION;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_DELIVERMESSAGE;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_JMSEXCEPTION;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_RECOVER;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_ROLLBACK;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SENDMESSAGE;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SETCLIENTID;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SETSESSIONID;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_STARTCONNECTION;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_STOPCONNECTION;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_UNSUBSCRIBE;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.NULL;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.PING;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.PONG;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_BROWSER_HASNEXTMESSAGE;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_BROWSER_NEXTMESSAGE;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_BROWSER_NEXTMESSAGEBLOCK;
@@ -51,6 +61,7 @@
import static org.jboss.messaging.core.remoting.wireformat.PacketType.TEXT;
import static org.jboss.messaging.core.remoting.wireformat.test.unit.CodecAssert.assertEqualsByteArrays;
import static org.jboss.messaging.test.unit.RandomUtil.randomByte;
+import static org.jboss.messaging.test.unit.RandomUtil.randomBytes;
import static org.jboss.messaging.test.unit.RandomUtil.randomLong;
import static org.jboss.messaging.test.unit.RandomUtil.randomString;
@@ -77,6 +88,7 @@
import org.jboss.messaging.core.remoting.codec.BrowserNextMessageBlockRequestCodec;
import org.jboss.messaging.core.remoting.codec.BrowserNextMessageBlockResponseCodec;
import org.jboss.messaging.core.remoting.codec.BrowserNextMessageResponseCodec;
+import org.jboss.messaging.core.remoting.codec.BytesPacketCodec;
import org.jboss.messaging.core.remoting.codec.ConnectionFactoryCreateConnectionRequestCodec;
import org.jboss.messaging.core.remoting.codec.ConnectionFactoryCreateConnectionResponseCodec;
import org.jboss.messaging.core.remoting.codec.ConsumerChangeRateMessageCodec;
@@ -93,8 +105,11 @@
import org.jboss.messaging.core.remoting.codec.GetClientIDResponseCodec;
import org.jboss.messaging.core.remoting.codec.JMSExceptionMessageCodec;
import org.jboss.messaging.core.remoting.codec.RemotingBuffer;
+import org.jboss.messaging.core.remoting.codec.SessionAcknowledgeMessageCodec;
+import org.jboss.messaging.core.remoting.codec.SessionCancelMessageCodec;
import org.jboss.messaging.core.remoting.codec.SessionSendMessageCodec;
import org.jboss.messaging.core.remoting.codec.SetClientIDMessageCodec;
+import org.jboss.messaging.core.remoting.codec.SetSessionIDMessageCodec;
import org.jboss.messaging.core.remoting.codec.TextPacketCodec;
import org.jboss.messaging.core.remoting.codec.UnsubscribeMessageCodec;
import org.jboss.messaging.core.remoting.impl.mina.PacketCodecFactory;
@@ -108,6 +123,7 @@
import org.jboss.messaging.core.remoting.wireformat.BrowserNextMessageRequest;
import org.jboss.messaging.core.remoting.wireformat.BrowserNextMessageResponse;
import org.jboss.messaging.core.remoting.wireformat.BrowserResetMessage;
+import org.jboss.messaging.core.remoting.wireformat.BytesPacket;
import org.jboss.messaging.core.remoting.wireformat.CloseMessage;
import org.jboss.messaging.core.remoting.wireformat.ClosingMessage;
import org.jboss.messaging.core.remoting.wireformat.ConsumerChangeRateMessage;
@@ -128,12 +144,21 @@
import org.jboss.messaging.core.remoting.wireformat.JMSExceptionMessage;
import org.jboss.messaging.core.remoting.wireformat.NullPacket;
import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.core.remoting.wireformat.Ping;
+import org.jboss.messaging.core.remoting.wireformat.Pong;
+import org.jboss.messaging.core.remoting.wireformat.SessionAcknowledgeMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCancelMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCommitMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionRecoverMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionRollbackMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionSendMessage;
import org.jboss.messaging.core.remoting.wireformat.SetClientIDMessage;
+import org.jboss.messaging.core.remoting.wireformat.SetSessionIDMessage;
import org.jboss.messaging.core.remoting.wireformat.StartConnectionMessage;
import org.jboss.messaging.core.remoting.wireformat.StopConnectionMessage;
import org.jboss.messaging.core.remoting.wireformat.TextPacket;
import org.jboss.messaging.core.remoting.wireformat.UnsubscribeMessage;
+import org.jboss.messaging.util.Logger;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
@@ -145,53 +170,46 @@
// Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(PacketTypeTest.class);
+
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
- private static void addVersion(AbstractPacket packet)
- {
- byte version = (byte) 19;
- packet.setVersion(version);
- }
-
private static ByteBuffer encode(int length, Object... args)
{
ByteBuffer buffer = ByteBuffer.allocate(length);
for (Object arg : args)
{
if (arg instanceof Byte)
- buffer.put(((Byte)arg).byteValue());
+ buffer.put(((Byte) arg).byteValue());
else if (arg instanceof Boolean)
{
Boolean bool = (Boolean) arg;
buffer.put(bool ? TRUE : FALSE);
- }
- else if (arg instanceof Integer)
- buffer.putInt(((Integer)arg).intValue());
+ } else if (arg instanceof Integer)
+ buffer.putInt(((Integer) arg).intValue());
else if (arg instanceof Long)
- buffer.putLong(((Long)arg).longValue());
+ buffer.putLong(((Long) arg).longValue());
else if (arg instanceof Float)
- buffer.putFloat(((Float)arg).floatValue());
+ buffer.putFloat(((Float) arg).floatValue());
else if (arg instanceof String)
- putNullableString((String)arg, buffer);
+ putNullableString((String) arg, buffer);
else if (arg == null)
putNullableString(null, buffer);
else if (arg instanceof byte[])
{
- byte[] b = (byte[])arg;
+ byte[] b = (byte[]) arg;
buffer.putInt(b.length);
buffer.put(b);
- }
- else if (arg instanceof long[])
+ } else if (arg instanceof long[])
{
- long[] longs = (long[])arg;
+ long[] longs = (long[]) arg;
for (long l : longs)
{
buffer.putLong(l);
}
- }
- else
+ } else
{
fail("no encoding defined for " + arg);
}
@@ -205,7 +223,7 @@
if (string == null)
{
buffer.put(NULL_STRING);
- } else
+ } else
{
buffer.put(NOT_NULL_STRING);
UTF_8_ENCODER.reset();
@@ -220,7 +238,6 @@
checkHeaderBytes(packet, buffer.buffer().buf());
assertEquals(buffer.get(), packet.getType().byteValue());
- assertEquals(buffer.get(), packet.getVersion());
String targetID = packet.getTargetID();
if (NO_ID_SET.equals(packet.getTargetID()))
@@ -229,7 +246,8 @@
if (NO_ID_SET.equals(packet.getCallbackID()))
callbackID = null;
- int headerLength = LONG_LENGTH + sizeof(targetID) + sizeof(callbackID);
+ int headerLength = LONG_LENGTH + sizeof(targetID) + sizeof(callbackID)
+ + BOOLEAN_LENGTH;
assertEquals(buffer.getInt(), headerLength);
assertEquals(buffer.getLong(), packet.getCorrelationID());
@@ -239,28 +257,35 @@
String bufferCallbackID = buffer.getNullableString();
if (bufferCallbackID == null)
bufferCallbackID = NO_ID_SET;
+ boolean oneWay = buffer.getBoolean();
assertEquals(bufferTargetID, packet.getTargetID());
assertEquals(bufferCallbackID, packet.getCallbackID());
+ assertEquals(oneWay, packet.isOneWay());
}
private static void checkHeaderBytes(AbstractPacket packet, ByteBuffer actual)
{
- String targetID = (packet.getTargetID().equals(NO_ID_SET)? null : packet.getTargetID());
- String callbackID = (packet.getCallbackID().equals(NO_ID_SET)? null : packet.getCallbackID());
+ String targetID = (packet.getTargetID().equals(NO_ID_SET) ? null : packet
+ .getTargetID());
+ String callbackID = (packet.getCallbackID().equals(NO_ID_SET) ? null
+ : packet.getCallbackID());
- int headerLength = LONG_LENGTH + sizeof(targetID) + sizeof(callbackID);
- ByteBuffer expected = ByteBuffer.allocate(1 + 1 + INT_LENGTH + headerLength);
+ int headerLength = LONG_LENGTH + sizeof(targetID) + sizeof(callbackID)
+ + BOOLEAN_LENGTH;
+ ByteBuffer expected = ByteBuffer.allocate(1 + 1 + INT_LENGTH
+ + headerLength);
expected.put(packet.getType().byteValue());
- expected.put(packet.getVersion());
-
+
expected.putInt(headerLength);
expected.putLong(packet.getCorrelationID());
putNullableString(targetID, expected);
putNullableString(callbackID, expected);
+ expected.put(packet.isOneWay() ? TRUE : FALSE);
expected.flip();
- assertEqualsByteArrays(expected.remaining(), expected.array(), actual.array());
+ assertEqualsByteArrays(expected.remaining(), expected.array(), actual
+ .array());
}
private static void checkBodyIsEmpty(RemotingBuffer buffer)
@@ -287,7 +312,6 @@
public void testNullPacket() throws Exception
{
NullPacket packet = new NullPacket();
- packet.setVersion(randomByte());
packet.setCallbackID(randomString());
packet.setCorrelationID(randomLong());
packet.setTargetID(randomString());
@@ -305,7 +329,6 @@
NullPacket p = (NullPacket) decodedPacket;
assertEquals(NULL, p.getType());
- assertEquals(packet.getVersion(), p.getVersion());
assertEquals(packet.getCallbackID(), p.getCallbackID());
assertEquals(packet.getCorrelationID(), p.getCorrelationID());
assertEquals(packet.getTargetID(), p.getTargetID());
@@ -316,7 +339,6 @@
JMSException e = new InvalidDestinationException(
"testJMSExceptionMessage");
JMSExceptionMessage message = new JMSExceptionMessage(e);
- addVersion(message);
AbstractPacketCodec<JMSExceptionMessage> codec = new JMSExceptionMessageCodec();
SimpleRemotingBuffer buffer = encode(message, codec);
@@ -334,15 +356,48 @@
.getException().getMessage());
}
+ public void testPing() throws Exception
+ {
+ Ping packet = new Ping();
+ AbstractPacketCodec<AbstractPacket> codec = PacketCodecFactory
+ .createCodecForEmptyPacket(PING, Ping.class);
+
+ SimpleRemotingBuffer buffer = encode(packet, codec);
+ checkHeader(buffer, packet);
+ checkBodyIsEmpty(buffer);
+ buffer.rewind();
+
+ AbstractPacket decodedPacket = codec.decode(buffer);
+
+ assertTrue(decodedPacket instanceof Ping);
+ assertEquals(PING, decodedPacket.getType());
+ }
+
+ public void testPong() throws Exception
+ {
+ Pong packet = new Pong();
+ AbstractPacketCodec<AbstractPacket> codec = PacketCodecFactory
+ .createCodecForEmptyPacket(PONG, Pong.class);
+
+ SimpleRemotingBuffer buffer = encode(packet, codec);
+ checkHeader(buffer, packet);
+ checkBodyIsEmpty(buffer);
+ buffer.rewind();
+
+ AbstractPacket decodedPacket = codec.decode(buffer);
+
+ assertTrue(decodedPacket instanceof Pong);
+ assertEquals(PONG, decodedPacket.getType());
+ }
+
public void testTextPacket() throws Exception
{
TextPacket packet = new TextPacket("testTextPacket");
- addVersion(packet);
AbstractPacketCodec<TextPacket> codec = new TextPacketCodec();
SimpleRemotingBuffer buffer = encode(packet, codec);
checkHeader(buffer, packet);
- checkBody(buffer, packet.getText());
+ checkBody(buffer, packet.getText());
buffer.rewind();
AbstractPacket decodedPacket = codec.decode(buffer);
@@ -353,7 +408,44 @@
assertEquals(TEXT, p.getType());
assertEquals(packet.getText(), p.getText());
}
+
+ public void testBytesPacket() throws Exception
+ {
+ BytesPacket packet = new BytesPacket(randomBytes());
+ AbstractPacketCodec codec = new BytesPacketCodec();
+ SimpleRemotingBuffer buffer = encode(packet, codec);
+ checkHeader(buffer, packet);
+ checkBody(buffer, packet.getBytes());
+ buffer.rewind();
+
+ AbstractPacket decodedPacket = codec.decode(buffer);
+
+ assertTrue(decodedPacket instanceof BytesPacket);
+ BytesPacket p = (BytesPacket) decodedPacket;
+
+ assertEquals(BYTES, p.getType());
+ assertEqualsByteArrays(packet.getBytes(), p.getBytes());
+ }
+
+ public void testSetSessionIDMessage() throws Exception
+ {
+ SetSessionIDMessage message = new SetSessionIDMessage(randomString());
+
+ AbstractPacketCodec codec = new SetSessionIDMessageCodec();
+ SimpleRemotingBuffer buffer = encode(message, codec);
+ checkHeader(buffer, message);
+ checkBody(buffer, message.getSessionID());
+ buffer.rewind();
+
+ AbstractPacket decodedPacket = codec.decode(buffer);
+
+ assertTrue(decodedPacket instanceof SetSessionIDMessage);
+ SetSessionIDMessage decodedMessage = (SetSessionIDMessage) decodedPacket;
+ assertEquals(MSG_SETSESSIONID, decodedMessage.getType());
+ assertEquals(message.getSessionID(), decodedMessage.getSessionID());
+ }
+
public void testCreateConnectionRequest() throws Exception
{
byte version = randomByte();
@@ -366,13 +458,14 @@
String clientID = null;
CreateConnectionRequest request = new CreateConnectionRequest(version,
- remotingSessionID, clientVMID, username, password, prefetchSize, dupsOkBatchSize, null);
- addVersion(request);
-
+ remotingSessionID, clientVMID, username, password, prefetchSize,
+ dupsOkBatchSize, null);
+
AbstractPacketCodec<CreateConnectionRequest> codec = new ConnectionFactoryCreateConnectionRequestCodec();
SimpleRemotingBuffer buffer = encode(request, codec);
checkHeader(buffer, request);
- checkBody(buffer, version, remotingSessionID, clientVMID, username, password, prefetchSize, dupsOkBatchSize, clientID);
+ checkBody(buffer, version, remotingSessionID, clientVMID, username,
+ password, prefetchSize, dupsOkBatchSize, clientID);
buffer.rewind();
AbstractPacket decodedPacket = codec.decode(buffer);
@@ -388,13 +481,12 @@
assertEquals(request.getUsername(), decodedRequest.getUsername());
assertEquals(request.getPassword(), decodedRequest.getPassword());
}
-
+
public void testCreateConnectionResponse() throws Exception
{
CreateConnectionResponse response = new CreateConnectionResponse(
randomString());
- addVersion(response);
-
+
AbstractPacketCodec<CreateConnectionResponse> codec = new ConnectionFactoryCreateConnectionResponseCodec();
SimpleRemotingBuffer buffer = encode(response, codec);
checkHeader(buffer, response);
@@ -409,16 +501,16 @@
assertEquals(response.getConnectionID(), decodedResponse
.getConnectionID());
}
-
+
public void testCreateSessionRequest() throws Exception
{
CreateSessionRequest request = new CreateSessionRequest(true, 0, false);
- addVersion(request);
-
+
AbstractPacketCodec codec = new CreateSessionRequestCodec();
SimpleRemotingBuffer buffer = encode(request, codec);
checkHeader(buffer, request);
- checkBody(buffer, request.isTransacted(), request.getAcknowledgementMode(), request.isXA());
+ checkBody(buffer, request.isTransacted(), request
+ .getAcknowledgementMode(), request.isXA());
buffer.rewind();
AbstractPacket decodedPacket = codec.decode(buffer);
@@ -436,8 +528,7 @@
{
CreateSessionResponse response = new CreateSessionResponse(
randomString(), 23);
- addVersion(response);
-
+
AbstractPacketCodec codec = new CreateSessionResponseCodec();
SimpleRemotingBuffer buffer = encode(response, codec);
checkHeader(buffer, response);
@@ -457,8 +548,7 @@
public void testSendMessage() throws Exception
{
SessionSendMessage packet = new SessionSendMessage(new MessageImpl());
- addVersion(packet);
-
+
AbstractPacketCodec codec = new SessionSendMessageCodec();
SimpleRemotingBuffer buffer = encode(packet, codec);
checkHeader(buffer, packet);
@@ -476,16 +566,17 @@
public void testCreateConsumerRequest() throws Exception
{
- Destination destination = new DestinationImpl(DestinationType.QUEUE, "testCreateConsumerRequest", false);
+ Destination destination = new DestinationImpl(DestinationType.QUEUE,
+ "testCreateConsumerRequest", false);
CreateConsumerRequest request = new CreateConsumerRequest(destination,
"color = 'red'", false, "subscription", false);
- addVersion(request);
-
+
AbstractPacketCodec codec = new CreateConsumerRequestCodec();
SimpleRemotingBuffer buffer = encode(request, codec);
checkHeader(buffer, request);
- checkBody(buffer, AbstractPacketCodec.encode(destination), request.getSelector(),
- request.isNoLocal(), request.getSubscriptionName(), request.isConnectionConsumer());
+ checkBody(buffer, AbstractPacketCodec.encode(destination), request
+ .getSelector(), request.isNoLocal(), request.getSubscriptionName(),
+ request.isConnectionConsumer());
buffer.rewind();
AbstractPacket decodedPacket = codec.decode(buffer);
@@ -499,15 +590,15 @@
assertEquals(request.getSubscriptionName(), decodedRequest
.getSubscriptionName());
assertEquals(request.isConnectionConsumer(), decodedRequest
- .isConnectionConsumer());;
+ .isConnectionConsumer());
+ ;
}
public void testCreateDestinationRequest() throws Exception
{
CreateDestinationRequest request = new CreateDestinationRequest(
"testCreateDestinationRequest", false);
- addVersion(request);
-
+
AbstractPacketCodec codec = new CreateDestinationRequestCodec();
SimpleRemotingBuffer buffer = encode(request, codec);
checkHeader(buffer, request);
@@ -529,8 +620,7 @@
CreateDestinationResponse response = new CreateDestinationResponse(
destination);
- addVersion(response);
-
+
AbstractPacketCodec codec = new CreateDestinationResponseCodec();
SimpleRemotingBuffer buffer = encode(response, codec);
checkHeader(buffer, response);
@@ -548,12 +638,12 @@
public void testCreateDestinationResponseForTopic() throws Exception
{
- JBossTopic destination = new JBossTopic("testCreateDestinationResponseForTopic");
+ JBossTopic destination = new JBossTopic(
+ "testCreateDestinationResponseForTopic");
CreateDestinationResponse response = new CreateDestinationResponse(
destination);
- addVersion(response);
-
+
AbstractPacketCodec codec = new CreateDestinationResponseCodec();
SimpleRemotingBuffer buffer = encode(response, codec);
checkHeader(buffer, response);
@@ -574,8 +664,7 @@
CreateConsumerResponse response = new CreateConsumerResponse(
randomString(), 23, 42, randomLong());
- addVersion(response);
-
+
AbstractPacketCodec codec = new CreateConsumerResponseCodec();
SimpleRemotingBuffer buffer = encode(response, codec);
checkHeader(buffer, response);
@@ -598,7 +687,6 @@
public void testStartConnectionMessage() throws Exception
{
StartConnectionMessage packet = new StartConnectionMessage();
- addVersion(packet);
AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
MSG_STARTCONNECTION, StartConnectionMessage.class);
@@ -616,7 +704,6 @@
public void testStopConnectionMessage() throws Exception
{
StopConnectionMessage packet = new StopConnectionMessage();
- addVersion(packet);
AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
MSG_STOPCONNECTION, StopConnectionMessage.class);
@@ -634,7 +721,6 @@
public void testChangeRateMessage() throws Exception
{
ConsumerChangeRateMessage message = new ConsumerChangeRateMessage(0.63f);
- addVersion(message);
AbstractPacketCodec codec = new ConsumerChangeRateMessageCodec();
SimpleRemotingBuffer buffer = encode(message, codec);
checkHeader(buffer, message);
@@ -653,13 +739,12 @@
{
Message msg = new MessageImpl();
DeliverMessage message = new DeliverMessage(msg, randomLong(), 23);
- addVersion(message);
-
+
AbstractPacketCodec codec = new DeliverMessageCodec();
SimpleRemotingBuffer buffer = encode(message, codec);
checkHeader(buffer, message);
- checkBody(buffer, encodeMessage(msg),
- message.getDeliveryID(), message.getDeliveryCount());
+ checkBody(buffer, encodeMessage(msg), message.getDeliveryID(), message
+ .getDeliveryCount());
buffer.rewind();
AbstractPacket decodedPacket = codec.decode(buffer);
@@ -674,31 +759,122 @@
.getDeliveryCount());
}
+ public void testSessionAcknowledgeMessage() throws Exception
+ {
+ SessionAcknowledgeMessage message = new SessionAcknowledgeMessage(
+ randomLong(), true);
+ AbstractPacketCodec codec = new SessionAcknowledgeMessageCodec();
+ SimpleRemotingBuffer buffer = encode(message, codec);
+ checkHeader(buffer, message);
+ checkBody(buffer, message.getDeliveryID(), message.isAllUpTo());
+ buffer.rewind();
+
+ AbstractPacket decodedPacket = codec.decode(buffer);
+
+ assertTrue(decodedPacket instanceof SessionAcknowledgeMessage);
+ SessionAcknowledgeMessage decodedMessage = (SessionAcknowledgeMessage) decodedPacket;
+ assertEquals(MSG_ACKNOWLEDGE, decodedMessage.getType());
+ assertEquals(message.getDeliveryID(), decodedMessage.getDeliveryID());
+ assertEquals(message.isAllUpTo(), decodedMessage.isAllUpTo());
+ }
+
+ public void testSessionCancelMessage() throws Exception
+ {
+ SessionCancelMessage message = new SessionCancelMessage(randomLong(),
+ true);
+
+ AbstractPacketCodec codec = new SessionCancelMessageCodec();
+ SimpleRemotingBuffer buffer = encode(message, codec);
+ checkHeader(buffer, message);
+ checkBody(buffer, message.getDeliveryID(), message.isExpired());
+ buffer.rewind();
+
+ AbstractPacket decodedPacket = codec.decode(buffer);
+
+ assertTrue(decodedPacket instanceof SessionCancelMessage);
+ SessionCancelMessage decodedMessage = (SessionCancelMessage) decodedPacket;
+ assertEquals(MSG_CANCEL, decodedMessage.getType());
+ assertEquals(message.getDeliveryID(), decodedMessage.getDeliveryID());
+ assertEquals(message.isExpired(), decodedMessage.isExpired());
+ }
+
+ public void testSessionCommitMessage() throws Exception
+ {
+ SessionCommitMessage message = new SessionCommitMessage();
+
+ AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
+ MSG_COMMIT, SessionCommitMessage.class);
+ SimpleRemotingBuffer buffer = encode(message, codec);
+ checkHeader(buffer, message);
+ checkBodyIsEmpty(buffer);
+
+ buffer.rewind();
+
+ AbstractPacket decodedPacket = codec.decode(buffer);
+
+ assertTrue(decodedPacket instanceof SessionCommitMessage);
+ assertEquals(MSG_COMMIT, decodedPacket.getType());
+ }
+
+ public void testSessionRollbackMessage() throws Exception
+ {
+ SessionRollbackMessage message = new SessionRollbackMessage();
+
+ AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
+ MSG_ROLLBACK, SessionRollbackMessage.class);
+ SimpleRemotingBuffer buffer = encode(message, codec);
+ checkHeader(buffer, message);
+ checkBodyIsEmpty(buffer);
+
+ buffer.rewind();
+
+ AbstractPacket decodedPacket = codec.decode(buffer);
+
+ assertTrue(decodedPacket instanceof SessionRollbackMessage);
+ assertEquals(MSG_ROLLBACK, decodedPacket.getType());
+ }
+
+ public void testSessionRecoverMessage() throws Exception
+ {
+ SessionRecoverMessage message = new SessionRecoverMessage();
+
+ AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
+ MSG_RECOVER, SessionRecoverMessage.class);
+ SimpleRemotingBuffer buffer = encode(message, codec);
+ checkHeader(buffer, message);
+ checkBodyIsEmpty(buffer);
+
+ buffer.rewind();
+
+ AbstractPacket decodedPacket = codec.decode(buffer);
+
+ assertTrue(decodedPacket instanceof SessionRecoverMessage);
+ assertEquals(MSG_RECOVER, decodedPacket.getType());
+ }
+
public void testClosingMessage() throws Exception
{
ClosingMessage request = new ClosingMessage();
- addVersion(request);
-
- AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
+
+ AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
PacketType.MSG_CLOSING, ClosingMessage.class);
SimpleRemotingBuffer buffer = encode(request, codec);
checkHeader(buffer, request);
checkBodyIsEmpty(buffer);
+
buffer.rewind();
AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof ClosingMessage);
ClosingMessage decodedRequest = (ClosingMessage) decodedPacket;
- assertEquals(PacketType.MSG_CLOSING, decodedRequest.getType());
+ assertEquals(PacketType.MSG_CLOSING, decodedRequest.getType());
}
-
public void testCloseMessage() throws Exception
{
CloseMessage message = new CloseMessage();
- addVersion(message);
AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
MSG_CLOSE, CloseMessage.class);
@@ -714,11 +890,9 @@
assertEquals(MSG_CLOSE, decodedMessage.getType());
}
-
public void testGetClientIDRequest() throws Exception
{
GetClientIDRequest request = new GetClientIDRequest();
- addVersion(request);
AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
REQ_GETCLIENTID, GetClientIDRequest.class);
@@ -736,8 +910,7 @@
public void testGetClientIDResponse() throws Exception
{
GetClientIDResponse response = new GetClientIDResponse(randomString());
- addVersion(response);
-
+
AbstractPacketCodec codec = new GetClientIDResponseCodec();
SimpleRemotingBuffer buffer = encode(response, codec);
checkHeader(buffer, response);
@@ -755,8 +928,7 @@
public void testSetClientIDMessage() throws Exception
{
SetClientIDMessage message = new SetClientIDMessage(randomString());
- addVersion(message);
-
+
AbstractPacketCodec codec = new SetClientIDMessageCodec();
SimpleRemotingBuffer buffer = encode(message, codec);
checkHeader(buffer, message);
@@ -770,18 +942,19 @@
assertEquals(MSG_SETCLIENTID, decodedMessage.getType());
assertEquals(message.getClientID(), decodedMessage.getClientID());
}
-
+
public void testCreateBrowserRequest() throws Exception
{
- Destination destination = new DestinationImpl(DestinationType.QUEUE, "testCreateBrowserRequest", false);
+ Destination destination = new DestinationImpl(DestinationType.QUEUE,
+ "testCreateBrowserRequest", false);
CreateBrowserRequest request = new CreateBrowserRequest(destination,
"color = 'red'");
- addVersion(request);
-
+
AbstractPacketCodec codec = new CreateBrowserRequestCodec();
SimpleRemotingBuffer buffer = encode(request, codec);
checkHeader(buffer, request);
- checkBody(buffer, AbstractPacketCodec.encode(destination), request.getSelector());
+ checkBody(buffer, AbstractPacketCodec.encode(destination), request
+ .getSelector());
buffer.rewind();
AbstractPacket decodedPacket = codec.decode(buffer);
@@ -796,8 +969,7 @@
public void testCreateBrowserResponse() throws Exception
{
CreateBrowserResponse response = new CreateBrowserResponse(randomString());
- addVersion(response);
-
+
AbstractPacketCodec codec = new CreateBrowserResponseCodec();
SimpleRemotingBuffer buffer = encode(response, codec);
checkHeader(buffer, response);
@@ -815,7 +987,6 @@
public void testBrowserResetMessage() throws Exception
{
BrowserResetMessage message = new BrowserResetMessage();
- addVersion(message);
AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
MSG_BROWSER_RESET, BrowserResetMessage.class);
@@ -833,7 +1004,6 @@
public void testBrowserHasNextMessageRequest() throws Exception
{
BrowserHasNextMessageRequest request = new BrowserHasNextMessageRequest();
- addVersion(request);
AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
REQ_BROWSER_HASNEXTMESSAGE, BrowserHasNextMessageRequest.class);
@@ -852,7 +1022,6 @@
{
BrowserHasNextMessageResponse response = new BrowserHasNextMessageResponse(
false);
- addVersion(response);
AbstractPacketCodec codec = new BrowserHasNextMessageResponseCodec();
SimpleRemotingBuffer buffer = encode(response, codec);
checkHeader(buffer, response);
@@ -870,7 +1039,6 @@
public void testBrowserNextMessageRequest() throws Exception
{
BrowserNextMessageRequest request = new BrowserNextMessageRequest();
- addVersion(request);
AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
REQ_BROWSER_NEXTMESSAGE, BrowserNextMessageRequest.class);
@@ -889,8 +1057,7 @@
{
Message msg = new MessageImpl();
BrowserNextMessageResponse response = new BrowserNextMessageResponse(msg);
- addVersion(response);
-
+
AbstractPacketCodec codec = new BrowserNextMessageResponseCodec();
SimpleRemotingBuffer buffer = encode(response, codec);
checkHeader(buffer, response);
@@ -910,8 +1077,7 @@
{
BrowserNextMessageBlockRequest request = new BrowserNextMessageBlockRequest(
randomLong());
- addVersion(request);
-
+
AbstractPacketCodec codec = new BrowserNextMessageBlockRequestCodec();
SimpleRemotingBuffer buffer = encode(request, codec);
checkHeader(buffer, request);
@@ -931,12 +1097,12 @@
Message[] messages = new Message[] { new MessageImpl(), new MessageImpl() };
BrowserNextMessageBlockResponse response = new BrowserNextMessageBlockResponse(
messages);
- addVersion(response);
-
+
AbstractPacketCodec codec = new BrowserNextMessageBlockResponseCodec();
SimpleRemotingBuffer buffer = encode(response, codec);
checkHeader(buffer, response);
- checkBody(buffer, messages.length, BrowserNextMessageBlockResponseCodec.encode(messages));
+ checkBody(buffer, messages.length, BrowserNextMessageBlockResponseCodec
+ .encode(messages));
buffer.rewind();
AbstractPacket decodedPacket = codec.decode(buffer);
@@ -954,8 +1120,7 @@
{
UnsubscribeMessage message = new UnsubscribeMessage(
"testUnsubscribeMessage");
- addVersion(message);
-
+
AbstractPacketCodec codec = new UnsubscribeMessageCodec();
SimpleRemotingBuffer buffer = encode(message, codec);
checkHeader(buffer, message);
@@ -973,11 +1138,11 @@
public void testAddTemporaryDestinationMessage() throws Exception
{
- Destination destination = new DestinationImpl(DestinationType.QUEUE, "testAddTemporaryDestinationMessage", false);
+ Destination destination = new DestinationImpl(DestinationType.QUEUE,
+ "testAddTemporaryDestinationMessage", false);
AddTemporaryDestinationMessage message = new AddTemporaryDestinationMessage(
destination);
- addVersion(message);
-
+
AbstractPacketCodec codec = new AddTemporaryDestinationMessageCodec();
SimpleRemotingBuffer buffer = encode(message, codec);
checkHeader(buffer, message);
@@ -994,11 +1159,12 @@
public void testDeleteTemporaryDestinationMessage() throws Exception
{
- Destination destination = new DestinationImpl(DestinationType.QUEUE, "testDeleteTemporaryDestinationMessage", false);;
+ Destination destination = new DestinationImpl(DestinationType.QUEUE,
+ "testDeleteTemporaryDestinationMessage", false);
+ ;
DeleteTemporaryDestinationMessage message = new DeleteTemporaryDestinationMessage(
destination);
- addVersion(message);
-
+
AbstractPacketCodec codec = new DeleteTemporaryDestinationMessageCodec();
SimpleRemotingBuffer buffer = encode(message, codec);
checkHeader(buffer, message);
@@ -1021,6 +1187,8 @@
private SimpleRemotingBuffer encode(AbstractPacket packet,
AbstractPacketCodec codec) throws Exception
{
+ log.debug("encode " + packet);
+
IoBuffer b = IoBuffer.allocate(256);
b.setAutoExpand(true);
Added: trunk/tests/src/org/jboss/test/messaging/jms/crash/UnresponsiveServerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/crash/UnresponsiveServerTest.java (rev 0)
+++ trunk/tests/src/org/jboss/test/messaging/jms/crash/UnresponsiveServerTest.java 2008-01-28 14:35:41 UTC (rev 3636)
@@ -0,0 +1,138 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.jms.crash;
+
+import static java.lang.Boolean.TRUE;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.isA;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.jboss.messaging.core.remoting.impl.mina.MinaService.DISABLE_INVM_KEY;
+import static org.jboss.messaging.core.remoting.impl.mina.MinaService.KEEP_ALIVE_INTERVAL_KEY;
+import static org.jboss.messaging.core.remoting.impl.mina.MinaService.KEEP_ALIVE_TIMEOUT_KEY;
+import static org.jboss.messaging.core.remoting.impl.mina.integration.test.TestSupport.KEEP_ALIVE_INTERVAL;
+import static org.jboss.messaging.core.remoting.impl.mina.integration.test.TestSupport.KEEP_ALIVE_TIMEOUT;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.QueueConnection;
+
+import org.jboss.messaging.core.remoting.KeepAliveFactory;
+import org.jboss.messaging.core.remoting.impl.mina.MinaService;
+import org.jboss.messaging.core.remoting.impl.mina.ServerKeepAliveFactory;
+import org.jboss.test.messaging.jms.JMSTestCase;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public class UnresponsiveServerTest extends JMSTestCase
+{
+ // Constants -----------------------------------------------------
+
+ private MinaService minaService;
+ private Map<String, String> originalParameters;
+
+
+ // Static --------------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public UnresponsiveServerTest(String name)
+ {
+ super(name);
+ }
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ minaService = servers.get(0).getMessagingServer().getMinaService();
+ originalParameters = new HashMap<String, String>(minaService.getLocator().getParameters());
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ minaService.stop();
+ minaService.setParameters(originalParameters);
+ minaService.setKeepAliveFactory(new ServerKeepAliveFactory());
+ minaService.start();
+
+ super.tearDown();
+ }
+
+ // Public --------------------------------------------------------
+
+ public void testExceptionListenerWhenServerIsUnresponsive()
+ throws Exception
+ {
+ KeepAliveFactory factory = createMock(KeepAliveFactory.class);
+ // server does not send ping
+ expect(factory.ping()).andStubReturn(null);
+ // no pong -> server is not responding
+ expect(factory.pong()).andReturn(null).atLeastOnce();
+
+ ExceptionListener listener = createMock(ExceptionListener.class);
+ listener.onException(isA(JMSException.class));
+ expectLastCall().once();
+
+ replay(listener, factory);
+
+ minaService.stop();
+ Map<String, String> parameters = new HashMap<String, String>();
+ parameters.put(KEEP_ALIVE_INTERVAL_KEY, Integer.toString(KEEP_ALIVE_INTERVAL));
+ parameters.put(KEEP_ALIVE_TIMEOUT_KEY, Integer.toString(KEEP_ALIVE_TIMEOUT));
+ parameters.put(DISABLE_INVM_KEY, TRUE.toString());
+ minaService.setParameters(parameters);
+ minaService.setKeepAliveFactory(factory);
+ minaService.start();
+
+ QueueConnection conn = getConnectionFactory().createQueueConnection();
+ conn.setExceptionListener(listener);
+
+ // FIXME should deduce them from MinaConnector somehow...
+ Thread.sleep((5 + 10 + 1) * 1000);
+
+ verify(listener, factory);
+
+ conn.close();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
More information about the jboss-cvs-commits
mailing list