[jboss-cvs] JBoss Messaging SVN: r3419 - in branches/Branch_JBMESSAGING-544: src/main/org/jboss/jms/client and 13 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Dec 6 04:11:25 EST 2007
Author: jmesnil
Date: 2007-12-06 04:11:24 -0500 (Thu, 06 Dec 2007)
New Revision: 3419
Added:
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConsumerPacketHandler.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ClientTest.java
Removed:
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/RemoteDispatcherTest.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/jms/RemotingConnectionConfigurationTest.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/thirdparty/remoting/CallbackServerTimeoutTest.java
Modified:
branches/Branch_JBMESSAGING-544/src/etc/remoting/remoting-bisocket-service.xml
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/JBossConnection.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/container/ConnectionAspect.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/container/ConsumerAspect.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/container/StateCreationAspect.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/remoting/CallbackManager.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/state/ConsumerState.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/ConnectionManager.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Client.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/jms/clustering/ClusteredClientCrashTest.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringTestBase.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/jms/clustering/MultiThreadFailoverTest.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-544 - Replace client-server transport with NIO-based transport
* refactoring
* removed some tests specific to JBoss Remoting
Modified: branches/Branch_JBMESSAGING-544/src/etc/remoting/remoting-bisocket-service.xml
===================================================================
--- branches/Branch_JBMESSAGING-544/src/etc/remoting/remoting-bisocket-service.xml 2007-12-06 08:48:04 UTC (rev 3418)
+++ branches/Branch_JBMESSAGING-544/src/etc/remoting/remoting-bisocket-service.xml 2007-12-06 09:11:24 UTC (rev 3419)
@@ -36,8 +36,10 @@
<!-- End immutable parameters -->
<!-- Periodicity of client pings. Server window by default is twice this figure -->
+ <!--
<attribute name="clientLeasePeriod" isParam="true">10000</attribute>
-
+ -->
+
<!-- Number of seconds to wait for a connection in the client pool to become free -->
<attribute name="numberOfRetries" isParam="true">10</attribute>
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/JBossConnection.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/JBossConnection.java 2007-12-06 08:48:04 UTC (rev 3418)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/JBossConnection.java 2007-12-06 09:11:24 UTC (rev 3419)
@@ -223,7 +223,7 @@
{
ConnectionState state = (ConnectionState)((ClientConnectionDelegate)delegate).getState();
- return state.getRemotingConnection().getNewRemotingClient().getSessionID();
+ return state.getRemotingConnection().getRemotingClient().getSessionID();
}
public ConnectionDelegate getDelegate()
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/container/ConnectionAspect.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/container/ConnectionAspect.java 2007-12-06 08:48:04 UTC (rev 3418)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/container/ConnectionAspect.java 2007-12-06 09:11:24 UTC (rev 3419)
@@ -43,7 +43,6 @@
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
* @version <tt>$Revision$</tt>
*
* $Id$
@@ -182,11 +181,11 @@
// remove the consolidated remoting connection listener
-// ConsolidatedRemotingConnectionListener l = remotingConnection.removeConnectionListener();
-// if (l != null)
-// {
-// l.clear();
-// }
+ ConsolidatedRemotingConnectionListener l = remotingConnection.removeConnectionListener();
+ if (l != null)
+ {
+ l.clear();
+ }
// Finished with the connection - we need to shutdown callback server
remotingConnection.stop();
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2007-12-06 08:48:04 UTC (rev 3418)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2007-12-06 09:11:24 UTC (rev 3419)
@@ -25,6 +25,7 @@
import org.jboss.aop.joinpoint.Invocation;
import org.jboss.aop.joinpoint.MethodInvocation;
+import org.jboss.jms.client.delegate.ClientConsumerPacketHandler;
import org.jboss.jms.client.delegate.DelegateSupport;
import org.jboss.jms.client.remoting.CallbackManager;
import org.jboss.jms.client.state.ConnectionState;
@@ -34,15 +35,8 @@
import org.jboss.jms.delegate.ConsumerDelegate;
import org.jboss.jms.delegate.SessionDelegate;
import org.jboss.jms.exception.MessagingShutdownException;
-import org.jboss.jms.message.JBossMessage;
-import org.jboss.jms.message.MessageProxy;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.remoting.PacketHandler;
-import org.jboss.messaging.core.remoting.PacketSender;
-import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
-import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
-import org.jboss.messaging.core.remoting.wireformat.PacketType;
import org.jboss.messaging.util.MessageQueueNameHelper;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
@@ -66,7 +60,6 @@
private static final Logger log = Logger.getLogger(ConsumerAspect.class);
-
// Static ---------------------------------------------------------------------------------------
// Attributes -----------------------------------------------------------------------------------
@@ -123,42 +116,11 @@
sessionState.addCallbackHandler(messageHandler);
- PacketDispatcher.client.register(new PacketHandler() {
+ PacketDispatcher.client.register(new ClientConsumerPacketHandler(messageHandler, consumerID));
- public String getID()
- {
- return consumerID;
- }
-
- public void handle(AbstractPacket packet, PacketSender sender)
- {
- try
- {
- PacketType type = packet.getType();
- if (type == PacketType.MSG_DELIVERMESSAGE)
- {
- DeliverMessage message = (DeliverMessage) packet;
- // FIXME Classcast exception...
- MessageProxy proxy = JBossMessage.
- createThinDelegate(message.getDeliveryID(), (JBossMessage)message.getMessage(), message.getDeliveryCount());
- messageHandler.handleMessage(proxy);
- }
- } catch (Exception e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
+ CallbackManager cm = connectionState.getRemotingConnection().getCallbackManager();
+ cm.registerHandler(consumerID, messageHandler);
- @Override
- public String toString()
- {
- return "ConsumerAspectPacketHandler[id=" + consumerID + "]";
- }
- });
-// CallbackManager cm = connectionState.getRemotingConnection().getCallbackManager();
-// cm.registerHandler(consumerID, messageHandler);
-//
consumerState.setClientConsumer(messageHandler);
if (autoFlowControl)
@@ -196,6 +158,8 @@
CallbackManager cm = connectionState.getRemotingConnection().getCallbackManager();
cm.unregisterHandler(consumerState.getConsumerID());
+ PacketDispatcher.client.unregister(consumerState.getConsumerID());
+
//And then we cancel any messages still in the message callback handler buffer
consumerState.getClientConsumer().cancelBuffer();
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/container/StateCreationAspect.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/container/StateCreationAspect.java 2007-12-06 08:48:04 UTC (rev 3418)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/container/StateCreationAspect.java 2007-12-06 09:11:24 UTC (rev 3419)
@@ -59,7 +59,6 @@
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
* @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
*
* $Id$
*/
@@ -98,7 +97,6 @@
// install the consolidated remoting connection listener; it will be de-installed on
// connection closing by ConnectionAspect
- // FIXME get rif of JBR code
ConsolidatedRemotingConnectionListener listener =
new ConsolidatedRemotingConnectionListener();
@@ -117,7 +115,7 @@
new ConnectionState(serverID, connectionDelegate,
remotingConnection, versionToUse, idGenerator);
- listener.setConnectionState(connectionState);
+ listener.setConnectionState(connectionState);
connectionDelegate.setState(connectionState);
}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java 2007-12-06 08:48:04 UTC (rev 3418)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java 2007-12-06 09:11:24 UTC (rev 3419)
@@ -85,16 +85,16 @@
state.synchronizeWith(newDelegate.getState());
- newClient = ((ConnectionState)state.getParent().getParent()).getRemotingConnection().
- getNewRemotingClient();
+ client = ((ConnectionState)state.getParent().getParent()).getRemotingConnection().
+ getRemotingClient();
}
public void setState(HierarchicalState state)
{
super.setState(state);
- newClient = ((ConnectionState)state.getParent().getParent()).getRemotingConnection().
- getNewRemotingClient();
+ client = ((ConnectionState)state.getParent().getParent()).getRemotingConnection().
+ getRemotingClient();
}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java 2007-12-06 08:48:04 UTC (rev 3418)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java 2007-12-06 09:11:24 UTC (rev 3419)
@@ -146,24 +146,13 @@
updateFailoverInfo(topology.getDelegates(), topology.getFailoverMap());
} else
{
- System.err.println("ClientClusteredConnectionFactoryDelegate.handle() unhandled packet: " + packet);
+ log.error("Unhandled packet " + packet + " by " + this);
}
}
});
- UpdateCallbackMessage message = new UpdateCallbackMessage(remoting.getNewRemotingClient().getSessionID(), JMSClientVMIdentifier.instance, true);
- sendOneWay(remoting.getNewRemotingClient(), delegate.getID(), Version.instance().getProviderIncrementingVersion(), message);
-
- // remoting.getCallbackManager().setConnectionfactoryCallbackHandler(new ConnectionFactoryCallbackHandler(this, remoting));
-
-// ConnectionFactoryAddCallbackRequest request =
-// new ConnectionFactoryAddCallbackRequest (JMSClientVMIdentifier.instance,
-// remoting.getRemotingClient().getSessionId(),
-// delegate.getID(),
-// Version.instance().getProviderIncrementingVersion());
-//
-// remoting.getRemotingClient().invoke(request, null);
-
+ UpdateCallbackMessage message = new UpdateCallbackMessage(remoting.getRemotingClient().getSessionID(), JMSClientVMIdentifier.instance, true);
+ sendOneWay(remoting.getRemotingClient(), delegate.getID(), Version.instance().getProviderIncrementingVersion(), message);
}
private void addShutdownHook()
@@ -175,16 +164,8 @@
{
PacketDispatcher.client.unregister(currentDelegate.getID());
- UpdateCallbackMessage message = new UpdateCallbackMessage(remoting.getNewRemotingClient().getSessionID(), JMSClientVMIdentifier.instance, false);
- sendOneWay(remoting.getNewRemotingClient(), currentDelegate.getID(), Version.instance().getProviderIncrementingVersion(), message);
-
-// ConnectionFactoryRemoveCallbackRequest request =
-// new ConnectionFactoryRemoveCallbackRequest (JMSClientVMIdentifier.instance,
-// remoting.getRemotingClient().getSessionId(),
-// currentDelegate.getID(),
-// Version.instance().getProviderIncrementingVersion());
-//
-// remoting.getRemotingClient().invoke(request, null);
+ UpdateCallbackMessage message = new UpdateCallbackMessage(remoting.getRemotingClient().getSessionID(), JMSClientVMIdentifier.instance, false);
+ sendOneWay(remoting.getRemotingClient(), currentDelegate.getID(), Version.instance().getProviderIncrementingVersion(), message);
}
protected void finalize() throws Throwable
@@ -367,7 +348,7 @@
byte version = Version.instance().getProviderIncrementingVersion();
- GetTopologyResponse response = (GetTopologyResponse) sendBlocking(remoting.getNewRemotingClient(), currentDelegate.getID(), version, new GetTopologyRequest());
+ GetTopologyResponse response = (GetTopologyResponse) sendBlocking(remoting.getRemotingClient(), currentDelegate.getID(), version, new GetTopologyRequest());
TopologyResult topology = response.getTopology();
updateFailoverInfo(topology.getDelegates(), topology.getFailoverMap());
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java 2007-12-06 08:48:04 UTC (rev 3418)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java 2007-12-06 09:11:24 UTC (rev 3419)
@@ -137,7 +137,7 @@
// There is one RM per server, so we need to merge the rms if necessary
ResourceManagerFactory.instance.handleFailover(serverID, newDelegate.getServerID());
- newClient = thisState.getRemotingConnection().getNewRemotingClient();
+ client = thisState.getRemotingConnection().getRemotingClient();
serverID = newDelegate.getServerID();
}
@@ -146,9 +146,7 @@
{
super.setState(state);
-// client = ((ConnectionState)state).getRemotingConnection().getRemotingClient();
-
- newClient = ((ConnectionState)state).getRemotingConnection().getNewRemotingClient();
+ client = ((ConnectionState)state).getRemotingConnection().getRemotingClient();
}
// Closeable implementation ---------------------------------------------------------------------
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2007-12-06 08:48:04 UTC (rev 3418)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2007-12-06 09:11:24 UTC (rev 3419)
@@ -26,8 +26,6 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
import javax.jms.JMSException;
@@ -37,14 +35,11 @@
import org.jboss.jms.delegate.CreateConnectionResult;
import org.jboss.jms.delegate.TopologyResult;
import org.jboss.jms.exception.MessagingNetworkFailureException;
-import org.jboss.jms.server.ServerPeer;
-import org.jboss.jms.wireformat.JMSWireFormat;
import org.jboss.messaging.core.remoting.wireformat.CreateConnectionRequest;
import org.jboss.messaging.core.remoting.wireformat.CreateConnectionResponse;
import org.jboss.messaging.core.remoting.wireformat.GetClientAOPStackRequest;
import org.jboss.messaging.core.remoting.wireformat.GetClientAOPStackResponse;
import org.jboss.messaging.util.Version;
-import org.jboss.remoting.Client;
import org.jboss.remoting.InvokerLocator;
/**
@@ -156,8 +151,8 @@
remotingConnection = new JMSRemotingConnection(serverLocatorURI, clientPing, strictTck);
remotingConnection.start();
- newClient = remotingConnection.getNewRemotingClient();
- String sessionID = newClient.getSessionID();
+ client = remotingConnection.getRemotingClient();
+ String sessionID = client.getSessionID();
CreateConnectionRequest request = new CreateConnectionRequest(v, sessionID, JMSClientVMIdentifier.instance, failedNodeID, username, password);
CreateConnectionResponse response = (CreateConnectionResponse) sendBlocking(request);
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2007-12-06 08:48:04 UTC (rev 3418)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2007-12-06 09:11:24 UTC (rev 3419)
@@ -90,8 +90,8 @@
ClientConsumerDelegate newDelegate = (ClientConsumerDelegate)nd;
// The client needs to be set first
- newClient = ((ConnectionState)state.getParent().getParent()).getRemotingConnection().
- getNewRemotingClient();
+ client = ((ConnectionState)state.getParent().getParent()).getRemotingConnection().
+ getRemotingClient();
// synchronize server endpoint state
@@ -110,8 +110,8 @@
{
super.setState(state);
- newClient = ((ConnectionState)state.getParent().getParent()).getRemotingConnection().
- getNewRemotingClient();
+ client = ((ConnectionState)state.getParent().getParent()).getRemotingConnection().
+ getRemotingClient();
}
// Closeable implementation ---------------------------------------------------------------------
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConsumerPacketHandler.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConsumerPacketHandler.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConsumerPacketHandler.java 2007-12-06 09:11:24 UTC (rev 3419)
@@ -0,0 +1,70 @@
+package org.jboss.jms.client.delegate;
+
+import org.jboss.jms.client.container.ClientConsumer;
+import org.jboss.jms.message.JBossMessage;
+import org.jboss.jms.message.MessageProxy;
+import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.PacketSender;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
+import org.jboss.messaging.core.remoting.wireformat.PacketType;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public class ClientConsumerPacketHandler implements PacketHandler
+{
+ /**
+ *
+ */
+ private final ClientConsumer messageHandler;
+ /**
+ *
+ */
+ private final String consumerID;
+
+ /**
+ * @param messageHandler
+ * @param consumerID
+ */
+ public ClientConsumerPacketHandler(ClientConsumer messageHandler,
+ String consumerID)
+ {
+ this.messageHandler = messageHandler;
+ this.consumerID = consumerID;
+ }
+
+ public String getID()
+ {
+ return consumerID;
+ }
+
+ public void handle(AbstractPacket packet, PacketSender sender)
+ {
+ try
+ {
+ PacketType type = packet.getType();
+ if (type == PacketType.MSG_DELIVERMESSAGE)
+ {
+ DeliverMessage message = (DeliverMessage) packet;
+ // FIXME Classcast exception...
+ MessageProxy proxy = JBossMessage.
+ createThinDelegate(message.getDeliveryID(), (JBossMessage)message.getMessage(), message.getDeliveryCount());
+ messageHandler.handleMessage(proxy);
+ }
+ } catch (Exception e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return "ConsumerAspectPacketHandler[id=" + consumerID + "]";
+ }
+}
\ No newline at end of file
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2007-12-06 08:48:04 UTC (rev 3418)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2007-12-06 09:11:24 UTC (rev 3419)
@@ -139,7 +139,7 @@
JMSRemotingConnection conn = ((ConnectionState)state.getParent()).getRemotingConnection();
- newClient = conn.getNewRemotingClient();
+ client = conn.getRemotingClient();
strictTck = conn.isStrictTck();
}
@@ -150,7 +150,7 @@
JMSRemotingConnection conn = ((ConnectionState)state.getParent()).getRemotingConnection();
- newClient = conn.getNewRemotingClient();
+ client = conn.getRemotingClient();
strictTck = conn.isStrictTck();
}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/DelegateSupport.java 2007-12-06 08:48:04 UTC (rev 3418)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/DelegateSupport.java 2007-12-06 09:11:24 UTC (rev 3419)
@@ -85,7 +85,7 @@
protected transient byte version;
- protected transient org.jboss.messaging.core.remoting.Client newClient;
+ protected transient Client client;
// Static ---------------------------------------------------------------------------------------
@@ -156,12 +156,12 @@
// Protected ------------------------------------------------------------------------------------
- protected void sendOneWay(AbstractPacket packet)
+ protected void sendOneWay(AbstractPacket packet) throws JMSException
{
- sendOneWay(newClient, id, version, packet);
+ sendOneWay(client, id, version, packet);
}
- protected static void sendOneWay(Client client, String targetID, byte version, AbstractPacket packet)
+ protected static void sendOneWay(Client client, String targetID, byte version, AbstractPacket packet) throws JMSException
{
assert client != null;
assertValidID(targetID);
@@ -175,7 +175,7 @@
protected AbstractPacket sendBlocking(AbstractPacket request) throws JMSException
{
- return sendBlocking(newClient, id, version, request);
+ return sendBlocking(client, id, version, request);
}
protected static AbstractPacket sendBlocking(Client client, String targetID, byte version, AbstractPacket request) throws JMSException
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/remoting/CallbackManager.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/remoting/CallbackManager.java 2007-12-06 08:48:04 UTC (rev 3418)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/remoting/CallbackManager.java 2007-12-06 09:11:24 UTC (rev 3419)
@@ -59,7 +59,6 @@
// Attributes -----------------------------------------------------------------------------------
protected Map<String, ClientConsumer> callbackHandlers;
- protected ConnectionFactoryCallbackHandler connectionfactoryCallbackHandler;
// Constructors ---------------------------------------------------------------------------------
@@ -87,17 +86,6 @@
return (ClientConsumer)callbackHandlers.remove(consumerID);
}
-
- public ConnectionFactoryCallbackHandler getConnectionfactoryCallbackHandler()
- {
- return connectionfactoryCallbackHandler;
- }
-
- public void setConnectionfactoryCallbackHandler(ConnectionFactoryCallbackHandler connectionfactoryCallbackHandler)
- {
- this.connectionfactoryCallbackHandler = connectionfactoryCallbackHandler;
- }
-
public String toString()
{
return "CallbackManager[" + Integer.toHexString(hashCode()) + "]";
Deleted: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java 2007-12-06 08:48:04 UTC (rev 3418)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java 2007-12-06 09:11:24 UTC (rev 3419)
@@ -1,101 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.jms.client.remoting;
-
-import java.lang.ref.WeakReference;
-
-import org.jboss.jms.client.delegate.ClientClusteredConnectionFactoryDelegate;
-import org.jboss.logging.Logger;
-import org.jboss.remoting.Client;
-import org.jboss.remoting.ConnectionListener;
-
-/**
- * This class will manage ConnectionFactory messages updates
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- * @version <tt>$Revision$</tt>
- *
- * $Id$
- */
-public class ConnectionFactoryCallbackHandler
-{
- // Constants ------------------------------------------------------------------------------------
-
- private static final Logger log = Logger.getLogger(ConnectionFactoryCallbackHandler.class);
-
- // Attributes -----------------------------------------------------------------------------------
-
- // Without a WeakReference here, the CF would never be released!
- private WeakReference<ClientClusteredConnectionFactoryDelegate> delegateRef;
- private JMSRemotingConnection remotingConnection;
-
- // Static ---------------------------------------------------------------------------------------
-
- private static boolean trace = log.isTraceEnabled();
-
- // Constructors ---------------------------------------------------------------------------------
-
- public ConnectionFactoryCallbackHandler(ClientClusteredConnectionFactoryDelegate cfDelegate,
- JMSRemotingConnection remotingConnection)
- {
- this.delegateRef = new WeakReference<ClientClusteredConnectionFactoryDelegate>(cfDelegate);
- this.remotingConnection = remotingConnection;
- this.remotingConnection.addPlainConnectionListener(new CallbackConnectionListener());
- }
-
- // Public ---------------------------------------------------------------------------------------
-
- public void handleMessage(Object message)
- {
- throw new IllegalStateException("JBoss Remoting must no longer be used");
- }
-
-
- public String toString()
- {
- return "ConnectionFactoryCallbackHandler[" + delegateRef.get() + "]";
- }
-
- // Package protected ----------------------------------------------------------------------------
-
- // Protected ------------------------------------------------------------------------------------
-
- // Private --------------------------------------------------------------------------------------
-
- // Inner classes --------------------------------------------------------------------------------
-
- class CallbackConnectionListener implements ConnectionListener
- {
-
- public void handleConnectionException(Throwable throwable, Client client)
- {
- ClientClusteredConnectionFactoryDelegate delegate = delegateRef.get();
-
- if (delegate!=null)
- {
- delegate.establishCallback();
- }
-
- //remotingConnection.removePlainConnectionListener(this);
- }
- }
-}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2007-12-06 08:48:04 UTC (rev 3418)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2007-12-06 09:11:24 UTC (rev 3419)
@@ -21,22 +21,16 @@
*/
package org.jboss.jms.client.remoting;
-import java.security.AccessController;
-import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Map;
-import org.jboss.jms.server.ServerPeer;
-import org.jboss.jms.wireformat.JMSWireFormat;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.remoting.TransportType;
import org.jboss.messaging.util.GUIDGenerator;
import org.jboss.remoting.Client;
-import org.jboss.remoting.ConnectionListener;
import org.jboss.remoting.InvokerLocator;
import org.jboss.remoting.ServerInvoker;
import org.jboss.remoting.callback.CallbackPoller;
-import org.jboss.remoting.callback.InvokerCallbackHandler;
import org.jboss.remoting.transport.bisocket.Bisocket;
import org.jboss.remoting.transport.socket.MicroSocketClientInvoker;
import org.jboss.remoting.transport.socket.SocketServerInvoker;
@@ -210,51 +204,49 @@
return metadata;
}
- /**
- * Configures and add the invokerCallbackHandler the right way (push or pull).
- *
- * @param configurer - passed for logging purposes only.
- * @param initialMetadata - some initial metadata that we might want to pass along when
- * registering invoker callback handler.
- */
- public static void addInvokerCallbackHandler(Object configurer,
- Client client,
- Map initialMetadata,
- InvokerLocator serverLocator,
- InvokerCallbackHandler invokerCallbackHandler)
- throws Throwable
- {
+// /**
+// * Configures and add the invokerCallbackHandler the right way (push or pull).
+// *
+// * @param configurer - passed for logging purposes only.
+// * @param initialMetadata - some initial metadata that we might want to pass along when
+// * registering invoker callback handler.
+// */
+// public static void addInvokerCallbackHandler(Object configurer,
+// Client client,
+// Map initialMetadata,
+// InvokerLocator serverLocator,
+// InvokerCallbackHandler invokerCallbackHandler)
+// throws Throwable
+// {
+//
+// // For transports derived from the socket transport, allow true push callbacks,
+// // with callback Connector. For http transport, simulate push callbacks.
+// String protocol = serverLocator.getProtocol();
+// boolean isBisocket = "bisocket".equals(protocol) || "sslbisocket".equals(protocol);
+// boolean isSocket = "socket".equals(protocol) || "sslsocket".equals(protocol);
+// boolean doPushCallbacks = isBisocket || isSocket;
+// Map metadata = createCallbackMetadata(doPushCallbacks, initialMetadata, serverLocator);
+//
+// if (doPushCallbacks)
+// {
+// log.trace(configurer + " is doing push callbacks");
+// client.addListener(invokerCallbackHandler, metadata, null, true);
+// }
+// else
+// {
+// log.trace(configurer + " is simulating push callbacks");
+// client.addListener(invokerCallbackHandler, metadata);
+// }
+// }
- // For transports derived from the socket transport, allow true push callbacks,
- // with callback Connector. For http transport, simulate push callbacks.
- String protocol = serverLocator.getProtocol();
- boolean isBisocket = "bisocket".equals(protocol) || "sslbisocket".equals(protocol);
- boolean isSocket = "socket".equals(protocol) || "sslsocket".equals(protocol);
- boolean doPushCallbacks = isBisocket || isSocket;
- Map metadata = createCallbackMetadata(doPushCallbacks, initialMetadata, serverLocator);
-
- if (doPushCallbacks)
- {
- log.trace(configurer + " is doing push callbacks");
- client.addListener(invokerCallbackHandler, metadata, null, true);
- }
- else
- {
- log.trace(configurer + " is simulating push callbacks");
- client.addListener(invokerCallbackHandler, metadata);
- }
- }
-
// Attributes -----------------------------------------------------------------------------------
- private Client client;
+ private org.jboss.messaging.core.remoting.Client client;
private boolean clientPing;
private InvokerLocator serverLocator;
private CallbackManager callbackManager;
private boolean strictTck;
- private org.jboss.messaging.core.remoting.Client newClient;
-
// When a failover is performed, this flag is set to true
protected boolean failed = false;
@@ -277,47 +269,17 @@
public void start() throws Throwable
{
- // Enable client pinging. Server leasing is enabled separately on the server side.
-
- Map config = new HashMap();
-
- config.put(Client.ENABLE_LEASE, String.valueOf(clientPing));
-
- client = new Client(serverLocator, config);
-
- client.setSubsystem(ServerPeer.REMOTING_JMS_SUBSYSTEM);
-
if (log.isTraceEnabled()) { log.trace(this + " created client"); }
callbackManager = new CallbackManager();
- //Do a privileged Action to connect
- AccessController.doPrivileged( new PrivilegedExceptionAction()
- {
- public Object run() throws Exception
- {
- client.connect();
- return null;
- }
- });
+ client = new org.jboss.messaging.core.remoting.Client();
+ client.connect(serverLocator.getHost(), serverLocator.getPort() + 1000, TransportType.TCP);
- newClient = new org.jboss.messaging.core.remoting.Client();
- newClient.connect(serverLocator.getHost(), serverLocator.getPort() + 1000, TransportType.TCP);
-
- // We explicitly set the Marshaller since otherwise remoting tries to resolve the marshaller
- // every time which is very slow - see org.jboss.remoting.transport.socket.ProcessInvocation
- // This can make a massive difference on performance. We also do this in
- // ServerConnectionEndpoint.setCallbackClient.
-
- client.setMarshaller(new JMSWireFormat());
- client.setUnMarshaller(new JMSWireFormat());
-
Map metadata = new HashMap();
metadata.put(InvokerLocator.DATATYPE, "jms");
- addInvokerCallbackHandler(this, client, metadata, serverLocator, callbackManager);
-
log.trace(this + " started");
}
@@ -325,59 +287,24 @@
{
log.trace(this + " stop");
- // explicitly remove the callback listener, to avoid race conditions on server
- // (http://jira.jboss.org/jira/browse/JBMESSAGING-535)
-
try
{
- client.removeListener(callbackManager);
+ client.disconnect();
}
- catch(Throwable ignore)
- {
- // very unlikely to get an exception on a local remove (I suspect badly designed API),
- // but we're failed anyway, so we don't care too much
-
- // Actually an exception will always be thrown here if the failure was detected by the connection
- // validator since the validator will disconnect the client before calling the connection
- // listener.
-
- log.trace(this + " failed to cleanly remove callback manager from the client", ignore);
- }
-
- try
- {
- client.disconnect();
- }
catch (Throwable ignore)
- {
- log.trace(this + " failed to disconnect the client", ignore);
- }
-
- client = null;
-
- try
- {
- newClient.disconnect();
- }
- catch (Throwable ignore)
{
log.trace(this + " failed to disconnect the new client", ignore);
}
- newClient = null;
+ client = null;
log.trace(this + " closed");
}
-
- public Client getRemotingClient()
+
+ public org.jboss.messaging.core.remoting.Client getRemotingClient()
{
return client;
}
-
- public org.jboss.messaging.core.remoting.Client getNewRemotingClient()
- {
- return newClient;
- }
public CallbackManager getCallbackManager()
{
@@ -402,20 +329,6 @@
public synchronized void setFailed()
{
failed = true;
-
- // Remoting has the bad habit of letting the job of cleaning after a failed connection up to
- // the application. Here, we take care of that, by disconnecting the remoting client, and
- // thus silencing both the connection validator and the lease pinger, and also locally
- // cleaning up the callback listener
-
- try
- {
- client.setDisconnectTimeout(0);
- }
- catch (Throwable ignore)
- {
- log.trace(this + " failed to set disconnect timeout", ignore);
- }
stop();
}
@@ -437,16 +350,6 @@
return true;
}
- public synchronized void addPlainConnectionListener(ConnectionListener listener)
- {
- client.addConnectionListener(listener);
- }
-
- public synchronized void removePlainConnectionListener(ConnectionListener listener)
- {
- client.removeConnectionListener(listener);
- }
-
public synchronized ConsolidatedRemotingConnectionListener getConnectionListener()
{
return remotingConnectionListener;
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/state/ConsumerState.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/state/ConsumerState.java 2007-12-06 08:48:04 UTC (rev 3418)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/state/ConsumerState.java 2007-12-06 09:11:24 UTC (rev 3419)
@@ -25,10 +25,12 @@
import org.jboss.jms.client.container.ClientConsumer;
import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.delegate.ClientConsumerPacketHandler;
import org.jboss.jms.client.delegate.DelegateSupport;
import org.jboss.jms.client.remoting.CallbackManager;
import org.jboss.jms.delegate.ConsumerDelegate;
import org.jboss.jms.destination.JBossDestination;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.util.Version;
@@ -141,9 +143,12 @@
ClientConsumer handler = oldCallbackManager.unregisterHandler(oldConsumerID);
ClientConsumer newHandler = newCallbackManager.unregisterHandler(consumerID);
-
+
+ PacketDispatcher.client.unregister(oldConsumerID);
+
handler.synchronizeWith(newHandler);
newCallbackManager.registerHandler(consumerID, handler);
+ PacketDispatcher.client.register(new ClientConsumerPacketHandler(handler, consumerID));
}
// Public ---------------------------------------------------------------------------------------
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/ConnectionManager.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/ConnectionManager.java 2007-12-06 08:48:04 UTC (rev 3418)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/ConnectionManager.java 2007-12-06 09:11:24 UTC (rev 3419)
@@ -26,7 +26,6 @@
import org.jboss.jms.delegate.ConnectionEndpoint;
import org.jboss.messaging.core.contract.MessagingComponent;
import org.jboss.messaging.core.remoting.PacketSender;
-import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
/**
@@ -61,17 +60,11 @@
*/
List getActiveConnections();
- void addConnectionFactoryCallback(String uniqueName, String JVMID, String remotingSessionID, ServerInvokerCallbackHandler handler);
-
void addConnectionFactoryCallback(String uniqueName, String VMID,
String remotingSessionID, PacketSender sender);
- void removeConnectionFactoryCallback(String uniqueName, String JVMID, ServerInvokerCallbackHandler handler);
-
void removeConnectionFactoryCallback(String uniqueName, String vmid, PacketSender sender);
- ServerInvokerCallbackHandler[] getConnectionFactoryCallback(String uniqueName);
-
PacketSender[] getConnectionFactorySenders(String uniqueName);
/**
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java 2007-12-06 08:48:04 UTC (rev 3418)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java 2007-12-06 09:11:24 UTC (rev 3419)
@@ -45,7 +45,6 @@
import org.jboss.remoting.Client;
import org.jboss.remoting.ClientDisconnectedException;
import org.jboss.remoting.ConnectionListener;
-import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
/**
* @author <a href="tim.fox at jboss.com">Tim Fox</a>
@@ -202,12 +201,6 @@
}
/** Synchronized is not really needed.. just to be safe as this is not supposed to be highly contended */
- public synchronized void addConnectionFactoryCallback(String uniqueName, String JVMID, String remotingSessionID, ServerInvokerCallbackHandler handler)
- {
- remotingSessions.put(remotingSessionID, JVMID);
- getCFInfo(uniqueName).addClient(JVMID, handler);
- }
-
public void addConnectionFactoryCallback(String uniqueName, String vmID,
String remotingSessionID, PacketSender sender)
{
@@ -215,11 +208,6 @@
getCFInfo(uniqueName).addClient(vmID, sender);
}
/** Synchronized is not really needed.. just to be safe as this is not supposed to be highly contended */
- public synchronized void removeConnectionFactoryCallback(String uniqueName, String JVMID, ServerInvokerCallbackHandler handler)
- {
- getCFInfo(uniqueName).removeHandler(JVMID, handler);
- }
-
public synchronized void removeConnectionFactoryCallback(String uniqueName, String vmid,
PacketSender sender)
{
@@ -227,11 +215,6 @@
}
/** Synchronized is not really needed.. just to be safe as this is not supposed to be highly contended */
- public synchronized ServerInvokerCallbackHandler[] getConnectionFactoryCallback(String uniqueName)
- {
- return getCFInfo(uniqueName).getAllHandlers();
- }
-
public synchronized PacketSender[] getConnectionFactorySenders(String uniqueName)
{
return getCFInfo(uniqueName).getAllSenders();
@@ -392,31 +375,13 @@
}
}
}
-
+
for (ConnectionFactoryCallbackInformation cfInfo: cfCallbackInfo.values())
{
- ServerInvokerCallbackHandler[] handlers = cfInfo.getAllHandlers(jmsClientID);
- for (ServerInvokerCallbackHandler handler: handlers)
+ PacketSender[] senders = cfInfo.getAllSenders(jmsClientID);
+ for (PacketSender sender: senders)
{
- try
- {
- handler.getCallbackClient().disconnect();
- }
- catch (Throwable e)
- {
- log.warn (e, e);
- }
-
- try
- {
- handler.destroy();
- }
- catch (Throwable e)
- {
- log.warn (e, e);
- }
-
- cfInfo.removeHandler(jmsClientID, handler);
+ cfInfo.removeSender(jmsClientID, sender);
}
}
@@ -432,8 +397,6 @@
// We keep two lists, one containing all clients a CF will have to maintain and another
// organized by JVMId as we will need that organization when cleaning up dead clients
String uniqueName;
- Map</**VMID */ String , /** Active clients*/ConcurrentHashSet<ServerInvokerCallbackHandler>> clientHandlersByVM;
- ConcurrentHashSet<ServerInvokerCallbackHandler> clientHandlers;
Map</**VMID */ String , /** Active clients*/ConcurrentHashSet<PacketSender>> clientSendersByVM;
ConcurrentHashSet<PacketSender> clientSenders;
@@ -441,71 +404,32 @@
public ConnectionFactoryCallbackInformation(String uniqueName)
{
this.uniqueName = uniqueName;
- this.clientHandlersByVM = new ConcurrentHashMap<String, ConcurrentHashSet<ServerInvokerCallbackHandler>>();
- this.clientHandlers = new ConcurrentHashSet<ServerInvokerCallbackHandler>();
this.clientSendersByVM = new ConcurrentHashMap<String, ConcurrentHashSet<PacketSender>>();
this.clientSenders = new ConcurrentHashSet<PacketSender>();
}
-
- public void addClient(String vmID, ServerInvokerCallbackHandler handler)
- {
- clientHandlers.add(handler);
- getHandlersList(vmID).add(handler);
- }
-
public void addClient(String vmID, PacketSender sender)
{
clientSenders.add(sender);
getSendersList(vmID).add(sender);
}
- public ServerInvokerCallbackHandler[] getAllHandlers(String vmID)
- {
- Set<ServerInvokerCallbackHandler> list = getHandlersList(vmID);
- ServerInvokerCallbackHandler[] array = new ServerInvokerCallbackHandler[list.size()];
- return (ServerInvokerCallbackHandler[])list.toArray(array);
- }
-
+
public PacketSender[] getAllSenders(String vmID)
{
Set<PacketSender> list = getSendersList(vmID);
return (PacketSender[]) list.toArray(new PacketSender[list.size()]);
}
- public ServerInvokerCallbackHandler[] getAllHandlers()
- {
- ServerInvokerCallbackHandler[] array = new ServerInvokerCallbackHandler[clientHandlers.size()];
- return (ServerInvokerCallbackHandler[])clientHandlers.toArray(array);
- }
-
public PacketSender[] getAllSenders()
{
return (PacketSender[]) clientSenders.toArray(new PacketSender[clientSenders.size()]);
}
- public void removeHandler(String vmID, ServerInvokerCallbackHandler handler)
- {
- clientHandlers.remove(handler);
- getHandlersList(vmID).remove(handler);
- }
-
public void removeSender(String vmID, PacketSender sender)
{
clientSenders.remove(sender);
getSendersList(vmID).remove(sender);
}
-
- private ConcurrentHashSet<ServerInvokerCallbackHandler> getHandlersList(String vmID)
- {
- ConcurrentHashSet<ServerInvokerCallbackHandler> perVMList = clientHandlersByVM.get(vmID);
- if (perVMList == null)
- {
- perVMList = new ConcurrentHashSet<ServerInvokerCallbackHandler>();
- clientHandlersByVM.put(vmID, perVMList);
- perVMList = clientHandlersByVM.get(vmID);
- }
- return perVMList;
- }
private ConcurrentHashSet<PacketSender> getSendersList(String vmID)
{
@@ -518,7 +442,6 @@
}
return perVMList;
}
-
}
private void dump()
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java 2007-12-06 08:48:04 UTC (rev 3418)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java 2007-12-06 09:11:24 UTC (rev 3419)
@@ -283,13 +283,6 @@
throw ExceptionUtil.handleJMSInvocation(t, this + " getClientAOPStack");
}
}
-
- public void addCallback(String VMID, String remotingSessionID,
- ServerInvokerCallbackHandler callbackHandler) throws JMSException
- {
- log.debug("Adding callbackHandler on ConnectionFactory");
- serverPeer.getConnectionManager().addConnectionFactoryCallback(this.uniqueName, VMID, remotingSessionID, callbackHandler);
- }
public void addSender(String VMID, String remotingSessionID,
PacketSender sender) throws JMSException
@@ -297,13 +290,6 @@
log.debug("Adding PacketSender on ConnectionFactory");
serverPeer.getConnectionManager().addConnectionFactoryCallback(this.uniqueName, VMID, remotingSessionID, sender);
}
-
- public void removeCallback(String VMID, String remotingSessionID,
- ServerInvokerCallbackHandler callbackHandler) throws JMSException
- {
- log.debug("Removing callbackHandler on ConnectionFactory");
- serverPeer.getConnectionManager().removeConnectionFactoryCallback(this.uniqueName, VMID, callbackHandler);
- }
public void removeSender(String VMID, String remotingSessionID,
PacketSender sender) throws JMSException
@@ -345,10 +331,9 @@
{
updateTopology(delegates, failoverMap);
- ServerInvokerCallbackHandler[] clientFactoriesToUpdate = serverPeer.getConnectionManager().getConnectionFactoryCallback(this.uniqueName);
- log.debug("updateClusteredClients being called!!! clientFactoriesToUpdate.size = " + clientFactoriesToUpdate.length);
-
PacketSender[] senders = serverPeer.getConnectionManager().getConnectionFactorySenders(uniqueName);
+ log.debug("updateClusteredClients being called!!! clientFactoriesToUpdate.size = " + senders.length);
+
GetTopologyResponse packet = new GetTopologyResponse(getTopology());
packet.setVersion(Version.instance().getProviderIncrementingVersion());
packet.setTargetID(id);
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java 2007-12-06 08:48:04 UTC (rev 3418)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java 2007-12-06 09:11:24 UTC (rev 3419)
@@ -94,26 +94,12 @@
return endpoint.getClientAOPStack();
}
- public void addCallback(String vmID, String remotingSessionID,
- ServerInvokerCallbackHandler callbackHandler) throws JMSException
- {
- ((ServerConnectionFactoryEndpoint)endpoint).addCallback(vmID, remotingSessionID,
- callbackHandler);
- }
-
public void addSender(String vmID, String remotingSessionID,
PacketSender sender) throws JMSException
{
((ServerConnectionFactoryEndpoint)endpoint).addSender(vmID, remotingSessionID,
sender);
}
-
- public void removeCallback(String vmID, String remotingSessionID,
- ServerInvokerCallbackHandler callbackHandler) throws JMSException
- {
- ((ServerConnectionFactoryEndpoint)endpoint).removeCallback(vmID, remotingSessionID,
- callbackHandler);
- }
public void removeSender(String vmID, String remotingSessionID,
PacketSender sender) throws JMSException
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Client.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Client.java 2007-12-06 08:48:04 UTC (rev 3418)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Client.java 2007-12-06 09:11:24 UTC (rev 3419)
@@ -10,27 +10,36 @@
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+
import org.apache.mina.common.CloseFuture;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.DefaultIoFilterChainBuilder;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoService;
+import org.apache.mina.common.IoServiceListener;
import org.apache.mina.common.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.filter.logging.MdcInjectionFilter;
import org.apache.mina.filter.reqres.Request;
import org.apache.mina.filter.reqres.RequestResponseFilter;
-import org.apache.mina.filter.reqres.RequestTimeoutException;
import org.apache.mina.filter.reqres.Response;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
+import org.jboss.jms.exception.MessagingNetworkFailureException;
+import org.jboss.logging.Logger;
import org.jboss.messaging.core.remoting.codec.PacketCodecFactory;
import org.jboss.messaging.core.remoting.internal.MinaHandler;
import org.jboss.messaging.core.remoting.internal.MinaInspector;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.remoting.ConnectionListener;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
@@ -39,6 +48,10 @@
*/
public class Client
{
+ // Constants -----------------------------------------------------
+
+ private final Logger log = Logger.getLogger(Client.class);
+
// Attributes ----------------------------------------------------
private IoSession session;
@@ -49,6 +62,8 @@
private NioSocketConnector connector;
private ScheduledExecutorService blockingScheduler;
+ private Map<ConnectionListener, IoServiceListener> listeners = new HashMap<ConnectionListener, IoServiceListener>();
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -89,6 +104,8 @@
connector.getSessionConfig().setReuseAddress(true);
InetSocketAddress address = new InetSocketAddress(host, port);
ConnectFuture future = connector.connect(address);
+ connector.setDefaultRemoteAddress(address);
+
future.awaitUninterruptibly();
if (!future.isConnected())
{
@@ -115,6 +132,10 @@
connector.dispose();
blockingScheduler.shutdown();
+ connector = null;
+ blockingScheduler = null;
+ session = null;
+
return closed;
}
@@ -127,7 +148,7 @@
return Long.toString(session.getId());
}
- public void sendOneWay(AbstractPacket packet)
+ public void sendOneWay(AbstractPacket packet) throws JMSException
{
assert packet != null;
checkConnected();
@@ -136,7 +157,7 @@
}
public AbstractPacket sendBlocking(AbstractPacket packet)
- throws TimeoutException
+ throws IOException, JMSException
{
assert packet != null;
checkConnected();
@@ -151,16 +172,11 @@
{
response = req.awaitResponse();
return (AbstractPacket) response.getMessage();
- } catch (RequestTimeoutException e)
+ } catch (Throwable t)
{
- TimeoutException toe = new TimeoutException();
- toe.initCause(e);
- throw toe;
- } catch (InterruptedException e)
- {
- TimeoutException toe = new TimeoutException();
- toe.initCause(e);
- throw toe;
+ IOException ioe = new IOException();
+ ioe.initCause(t);
+ throw ioe;
}
}
@@ -170,6 +186,53 @@
this.blockingRequestTimeUnit = unit;
}
+ public void addConnectionListener(final ConnectionListener listener)
+ {
+ assert listener != null;
+ assert connector != null;
+
+ IoServiceListener ioListener = new IoServiceListenerAdapter(listener);
+ connector.addListener(ioListener);
+ listeners.put(listener, ioListener);
+
+ if (log.isTraceEnabled())
+ log.trace("added listener " + listener + " to " + this);
+ }
+
+ public void removeConnectionListener(ConnectionListener listener)
+ {
+ assert listener != null;
+ assert connector != null;
+
+ connector.removeListener(listeners.get(listener));
+ listeners.remove(listener);
+
+ if (log.isTraceEnabled())
+ log.trace("removed listener " + listener + " from " + this);
+ }
+
+ public boolean isConnected()
+ {
+ if (session == null)
+ return false;
+ else
+ return session.isConnected();
+ }
+
+ public String getURI()
+ {
+ if (connector == null)
+ return null;
+ else
+ return connector.getDefaultRemoteAddress().toString();
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Client[session=" + session + "]";
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -185,14 +248,61 @@
chain.addLast("reqres", filter);
}
- private void checkConnected()
+ private void checkConnected() throws JMSException
{
if (session == null)
{
- throw new IllegalStateException("RemoteDispatcher is not connected.");
+ throw new IllegalStateException("Client " + this + " is not connected.");
}
+ if (!session.isConnected()) {
+ throw new MessagingNetworkFailureException("Client " + this + " is not connected.");
+ }
}
// Inner classes -------------------------------------------------
+ private final class IoServiceListenerAdapter implements
+ IoServiceListener
+ {
+ private final Logger log = Logger.getLogger(IoServiceListenerAdapter.class);
+
+ private final ConnectionListener listener;
+
+ private IoServiceListenerAdapter(ConnectionListener listener)
+ {
+ this.listener = listener;
+ }
+
+ public void serviceActivated(IoService service)
+ {
+ if (log.isTraceEnabled())
+ log.trace("activated " + service);
+ }
+
+ public void serviceDeactivated(IoService service)
+ {
+ if (log.isTraceEnabled())
+ log.trace("deactivated " + service);
+ }
+
+ public void serviceIdle(IoService service, IdleStatus idleStatus)
+ {
+ if (log.isTraceEnabled())
+ log.trace("idle " + service + ", status=" + idleStatus);
+ }
+
+ public void sessionCreated(IoSession session)
+ {
+ if (log.isInfoEnabled())
+ log.info("created session " + session);
+ }
+
+ public void sessionDestroyed(IoSession session)
+ {
+ log.warn("destroyed session " + session);
+
+ Throwable t = new Throwable("MINA session has been destroyed");
+ listener.handleConnectionException(t, null);
+ }
+ }
}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java 2007-12-06 08:48:04 UTC (rev 3418)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java 2007-12-06 09:11:24 UTC (rev 3419)
@@ -47,13 +47,13 @@
{
assertValidID(handler.getID());
assert handler != null;
+
+ handlers.put(handler.getID(), handler);
if (log.isDebugEnabled())
{
- log.debug("register " + handler);
+ log.debug("registered " + handler + " with ID " + handler.getID());
}
-
- handlers.put(handler.getID(), handler);
}
public void unregister(String handlerID)
@@ -64,7 +64,7 @@
if (log.isDebugEnabled())
{
- log.debug("unregister handler for " + handlerID);
+ log.debug("unregistered handler for " + handlerID);
}
}
Copied: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ClientTest.java (from rev 3403, branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/RemoteDispatcherTest.java)
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ClientTest.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ClientTest.java 2007-12-06 09:11:24 UTC (rev 3419)
@@ -0,0 +1,203 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.test.messaging.core.remoting;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.jboss.messaging.core.remoting.TransportType.TCP;
+
+import java.io.IOException;
+import java.util.List;
+
+import javax.jms.IllegalStateException;
+
+import org.jboss.messaging.core.remoting.Client;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.NullPacket;
+import org.jboss.messaging.core.remoting.wireformat.TextPacket;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ *
+ * @version <tt>$Revision$</tt>
+ */
+public class ClientTest extends TestSupport
+{
+ private ReversePacketHandler serverPacketHandler;
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testConnected() throws Exception
+ {
+ Client client = new Client();
+
+ assertFalse(client.isConnected());
+
+ client.connect("localhost", PORT, TCP);
+ assertTrue(client.isConnected());
+
+ assertTrue(client.disconnect());
+ assertFalse(client.isConnected());
+ assertFalse(client.disconnect());
+ }
+
+ public void testSessionID() throws Exception
+ {
+ Client client = new Client();
+ assertNull(client.getSessionID());
+ client.connect("localhost", PORT, TCP);
+ assertNotNull(client.getSessionID());
+ client.disconnect();
+ assertNull(client.getSessionID());
+ }
+
+ public void testURI() throws Exception
+ {
+ Client client = new Client();
+ assertNull(client.getURI());
+ client.connect("localhost", PORT, TCP);
+ assertNotNull(client.getURI());
+ client.disconnect();
+ assertNull(client.getURI());
+ }
+
+ public void testCanNotSendPacketIfNotConnected() throws Exception
+ {
+ Client client = new Client();
+
+ try
+ {
+ client.sendOneWay(new NullPacket());
+ fail("can not send a packet if the dispatcher is not connected");
+ } catch (IllegalStateException e)
+ {
+
+ }
+ }
+
+ public void testSendOneWay() throws Exception
+ {
+ serverPacketHandler.expectMessage(1);
+
+ TextPacket packet = new TextPacket("testSendOneWay");
+ packet.setVersion((byte) 1);
+ packet.setTargetID(serverPacketHandler.getID());
+ client.sendOneWay(packet);
+
+ serverPacketHandler.await();
+
+ List<TextPacket> messages = serverPacketHandler.getPackets();
+ assertEquals(1, messages.size());
+ String response = ((TextPacket) messages.get(0)).getText();
+ assertEquals(packet.getText(), response);
+ }
+
+ public void testSendManyOneWay() throws Exception
+ {
+ serverPacketHandler.expectMessage(MANY_MESSAGES);
+
+ TextPacket[] packets = new TextPacket[MANY_MESSAGES];
+ for (int i = 0; i < MANY_MESSAGES; i++)
+ {
+ packets[i] = new TextPacket("testSendManyOneWay " + i);
+ packets[i].setVersion((byte) 1);
+ packets[i].setTargetID(serverPacketHandler.getID());
+ client.sendOneWay(packets[i]);
+ }
+
+ serverPacketHandler.await();
+
+ List<TextPacket> receivedPackets = serverPacketHandler.getPackets();
+ assertEquals(MANY_MESSAGES, receivedPackets.size());
+ for (int i = 0; i < MANY_MESSAGES; i++)
+ {
+ TextPacket receivedPacket = (TextPacket) receivedPackets.get(i);
+ assertEquals(packets[i].getText(), receivedPacket.getText());
+ }
+ }
+
+ public void testSendOneWayWithCallbackHandler() throws Exception
+ {
+ TestPacketHandler callbackHandler = new TestPacketHandler();
+ callbackHandler.expectMessage(1);
+
+ PacketDispatcher.client.register(callbackHandler);
+
+ TextPacket packet = new TextPacket("testSendOneWayWithCallbackHandler");
+ packet.setVersion((byte) 1);
+ packet.setTargetID(serverPacketHandler.getID());
+ packet.setCallbackID(callbackHandler.getID());
+
+ client.sendOneWay(packet);
+
+ callbackHandler.await();
+
+ assertEquals(1, callbackHandler.getPackets().size());
+ String response = callbackHandler.getPackets().get(0).getText();
+ assertEquals(reverse(packet.getText()), response);
+ }
+
+ public void testSendBlocking() throws Exception
+ {
+ TextPacket request = new TextPacket("testSendBlocking");
+ request.setVersion((byte) 1);
+ request.setTargetID(serverPacketHandler.getID());
+
+ AbstractPacket receivedPacket = client.sendBlocking(request);
+
+ assertNotNull(receivedPacket);
+ assertTrue(receivedPacket instanceof TextPacket);
+ TextPacket response = (TextPacket) receivedPacket;
+ assertEquals(reverse(request.getText()), response.getText());
+ }
+
+ public void testSendBlockingWithTimeout() throws Exception
+ {
+ client.setBlockingRequestTimeout(500, MILLISECONDS);
+ serverPacketHandler.setSleepTime(1000, MILLISECONDS);
+
+ AbstractPacket packet = new TextPacket("testSendBlockingWithTimeout");
+ packet.setVersion((byte) 1);
+
+ try
+ {
+ client.sendBlocking(packet);
+ fail("a IOException should be thrown");
+ } catch (IOException e)
+ {
+ }
+ }
+
+ // TestCase implementation ---------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ startServer(TestSupport.PORT, TRANSPORT);
+ startClient(TestSupport.PORT, TRANSPORT);
+
+ serverPacketHandler = new ReversePacketHandler();
+ PacketDispatcher.server.register(serverPacketHandler);
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ PacketDispatcher.server.unregister(serverPacketHandler.getID());
+
+ client.disconnect();
+ stopServer();
+ }
+}
Deleted: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/RemoteDispatcherTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/RemoteDispatcherTest.java 2007-12-06 08:48:04 UTC (rev 3418)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/RemoteDispatcherTest.java 2007-12-06 09:11:24 UTC (rev 3419)
@@ -1,166 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.test.messaging.core.remoting;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
-import java.util.List;
-import java.util.concurrent.TimeoutException;
-
-import org.jboss.messaging.core.remoting.Client;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
-import org.jboss.messaging.core.remoting.wireformat.NullPacket;
-import org.jboss.messaging.core.remoting.wireformat.TextPacket;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
- *
- * @version <tt>$Revision$</tt>
- */
-public class RemoteDispatcherTest extends TestSupport
-{
- private ReversePacketHandler serverPacketHandler;
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testCanNotSendPacketIfNotConnected() throws Exception
- {
- Client client = new Client();
-
- try
- {
- client.sendOneWay(new NullPacket());
- fail("can not send a packet if the dispatcher is not connected");
- } catch (IllegalStateException e)
- {
-
- }
- }
-
- public void testSendOneWay() throws Exception
- {
- serverPacketHandler.expectMessage(1);
-
- TextPacket packet = new TextPacket("testSendOneWay");
- packet.setVersion((byte)1);
- packet.setTargetID(serverPacketHandler.getID());
- client.sendOneWay(packet);
-
- serverPacketHandler.await();
-
- List<TextPacket> messages = serverPacketHandler.getPackets();
- assertEquals(1, messages.size());
- String response = ((TextPacket) messages.get(0)).getText();
- assertEquals(packet.getText(), response);
- }
-
- public void testSendManyOneWay() throws Exception
- {
- serverPacketHandler.expectMessage(MANY_MESSAGES);
-
- TextPacket[] packets = new TextPacket[MANY_MESSAGES];
- for (int i = 0; i < MANY_MESSAGES; i++)
- {
- packets[i] = new TextPacket("testSendManyOneWay " + i);
- packets[i].setVersion((byte)1);
- packets[i].setTargetID(serverPacketHandler.getID());
- client.sendOneWay(packets[i]);
- }
-
- serverPacketHandler.await();
-
- List<TextPacket> receivedPackets = serverPacketHandler.getPackets();
- assertEquals(MANY_MESSAGES, receivedPackets.size());
- for (int i = 0; i < MANY_MESSAGES; i++)
- {
- TextPacket receivedPacket = (TextPacket) receivedPackets.get(i);
- assertEquals(packets[i].getText(), receivedPacket.getText());
- }
- }
-
- public void testSendOneWayWithCallbackHandler() throws Exception
- {
- TestPacketHandler callbackHandler = new TestPacketHandler();
- callbackHandler.expectMessage(1);
-
- PacketDispatcher.client.register(callbackHandler);
-
- TextPacket packet = new TextPacket("testSendOneWayWithCallbackHandler");
- packet.setVersion((byte)1);
- packet.setTargetID(serverPacketHandler.getID());
- packet.setCallbackID(callbackHandler.getID());
-
- client.sendOneWay(packet);
-
- callbackHandler.await();
-
- assertEquals(1, callbackHandler.getPackets().size());
- String response = callbackHandler.getPackets().get(0).getText();
- assertEquals(reverse(packet.getText()), response);
- }
-
- public void testSendBlocking() throws Exception
- {
- TextPacket request = new TextPacket("testSendBlocking");
- request.setVersion((byte)1);
- request.setTargetID(serverPacketHandler.getID());
-
- AbstractPacket receivedPacket = client.sendBlocking(request);
-
- assertNotNull(receivedPacket);
- assertTrue(receivedPacket instanceof TextPacket);
- TextPacket response = (TextPacket) receivedPacket;
- assertEquals(reverse(request.getText()), response.getText());
- }
-
- public void testSendBlockingWithTimeout() throws Exception
- {
- client.setBlockingRequestTimeout(500, MILLISECONDS);
- serverPacketHandler.setSleepTime(1000, MILLISECONDS);
-
- AbstractPacket packet = new TextPacket("testSendBlockingWithTimeout");
- packet.setVersion((byte)1);
-
- try
- {
- client.sendBlocking(packet);
- fail("a RequestTimeoutException should be thrown");
- } catch (TimeoutException e)
- {
- }
- }
-
- // TestCase implementation ---------------------------------------
-
- @Override
- protected void setUp() throws Exception
- {
- startServer(TestSupport.PORT, TRANSPORT);
- startClient(TestSupport.PORT, TRANSPORT);
-
- serverPacketHandler = new ReversePacketHandler();
- PacketDispatcher.server.register(serverPacketHandler);
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- PacketDispatcher.server.unregister(serverPacketHandler.getID());
-
- client.disconnect();
- stopServer();
- }
-}
Deleted: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/jms/RemotingConnectionConfigurationTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/jms/RemotingConnectionConfigurationTest.java 2007-12-06 08:48:04 UTC (rev 3418)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/jms/RemotingConnectionConfigurationTest.java 2007-12-06 09:11:24 UTC (rev 3419)
@@ -1,212 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.test.messaging.jms;
-
-import java.lang.reflect.Field;
-import java.net.InetAddress;
-import java.util.HashSet;
-import java.util.Map;
-
-import org.jboss.jms.client.JBossConnection;
-import org.jboss.jms.client.delegate.ClientConnectionDelegate;
-import org.jboss.jms.client.remoting.JMSRemotingConnection;
-import org.jboss.remoting.Client;
-import org.jboss.remoting.InvokerLocator;
-import org.jboss.remoting.callback.CallbackPoller;
-import org.jboss.remoting.callback.InvokerCallbackHandler;
-import org.jboss.remoting.transport.Connector;
-import org.jboss.remoting.transport.PortUtil;
-import org.jboss.test.messaging.tools.ServerManagement;
-import org.jboss.test.messaging.tools.container.ServiceContainer;
-
-/**
- *
- * @author <a href="ron.sigal at jboss.com">Ron Sigal</a>
- * @author <a href="ovidiu at feodorov.com">Ovidiu Feodorov</a>
- *
- * $Id$
- */
-public class RemotingConnectionConfigurationTest extends JMSTestCase
-{
- // Constants -----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public RemotingConnectionConfigurationTest(String name)
- {
- super(name);
- }
- // Public --------------------------------------------------------
-
- /**
- * It only makes sense to run remote. Exclude it from "invm-tests" target configuration.
- */
- public void testDefaultHTTPCallbackPollPeriod() throws Exception
- {
- if (!ServerManagement.isRemote())
- {
- return;
- }
-
- JBossConnection connection = null;
-
- try
- {
- connection = (JBossConnection)cf.createConnection();
- connection.start();
-
- ClientConnectionDelegate delegate = (ClientConnectionDelegate)connection.getDelegate();
- JMSRemotingConnection remotingConnection = delegate.getRemotingConnection();
- Client client = remotingConnection.getRemotingClient();
-
- Field field = JMSRemotingConnection.class.getDeclaredField("serverLocator");
- field.setAccessible(true);
- InvokerLocator locator = (InvokerLocator)field.get(remotingConnection);
- String transport = locator.getProtocol();
-
- if (!"http".equals(transport))
- {
- // not interesting
- return;
- }
-
- field = Client.class.getDeclaredField("callbackPollers");
- field.setAccessible(true);
- Map callbackPollers = (Map)field.get(client);
- assertEquals(1, callbackPollers.size());
-
- CallbackPoller callbackPoller = (CallbackPoller)callbackPollers.values().iterator().next();
-
- field = CallbackPoller.class.getDeclaredField("pollPeriod");
- field.setAccessible(true);
- Long pollPeriod = (Long)field.get(callbackPoller);
- assertEquals(ServiceContainer.HTTP_CONNECTOR_CALLBACK_POLL_PERIOD, pollPeriod.longValue());
- }
- finally
- {
- if (connection != null)
- {
- connection.close();
- }
- }
- }
-
-
- /**
- * It only makes sense to run remote. Exclude it from "invm-tests" target configuration.
- */
- public void testConnectionConfiguration() throws Exception
- {
- if (!ServerManagement.isRemote())
- {
- return;
- }
-
- JBossConnection connection = null;
-
- try
- {
- String address = InetAddress.getLocalHost().getHostAddress();
- System.setProperty("jboss.messaging.callback.bind.address", address);
-
- int freePort = PortUtil.findFreePort(InetAddress.getLocalHost().getHostName());
- System.setProperty("jboss.messaging.callback.bind.port", Integer.toString(freePort));
-
- String pollPeriod = "654";
- System.setProperty("jboss.messaging.callback.pollPeriod", pollPeriod);
-
- System.setProperty("jboss.messaging.callback.reportPollingStatistics", "true");
-
- connection = (JBossConnection)cf.createConnection();
- connection.start();
-
- ClientConnectionDelegate delegate = (ClientConnectionDelegate)connection.getDelegate();
- JMSRemotingConnection remotingConnection = delegate.getRemotingConnection();
- Client client = remotingConnection.getRemotingClient();
-
- Field field = JMSRemotingConnection.class.getDeclaredField("serverLocator");
- field.setAccessible(true);
- InvokerLocator locator = (InvokerLocator)field.get(remotingConnection);
- String transport = locator.getProtocol();
-
- if ("socket".equals(transport)
- || "sslsocket".equals(transport)
- || "bisocket".equals(transport)
- || "sslbisocket".equals(transport))
- {
- field = Client.class.getDeclaredField("callbackConnectors");
- field.setAccessible(true);
- Map callbackConnectors = (Map)field.get(client);
-
- InvokerCallbackHandler callbackHandler = remotingConnection.getCallbackManager();
- HashSet map = (HashSet) callbackConnectors.get(callbackHandler);
- Connector connector = (Connector)map.iterator().next();
- locator = new InvokerLocator(connector.getInvokerLocator());
- assertEquals(address, locator.getHost());
- assertEquals(freePort, locator.getPort());
- }
- else if ("http".equals(transport))
- {
- field = Client.class.getDeclaredField("callbackPollers");
- field.setAccessible(true);
- Map callbackPollers = (Map)field.get(client);
- assertEquals(1, callbackPollers.size());
-
- CallbackPoller callbackPoller =
- (CallbackPoller)callbackPollers.values().iterator().next();
-
- field = CallbackPoller.class.getDeclaredField("pollPeriod");
- field.setAccessible(true);
-
- assertEquals(pollPeriod, ((Long)field.get(callbackPoller)).toString());
-
- field = CallbackPoller.class.getDeclaredField("reportStatistics");
- field.setAccessible(true);
- assertEquals(true, ((Boolean) field.get(callbackPoller)).booleanValue());
- }
- else
- {
- fail("Unrecognized transport: " + transport);
- }
- }
- finally
- {
- if (connection != null)
- {
- connection.close();
- }
-
- System.clearProperty("jboss.messaging.callback.bind.address");
-
- System.clearProperty("jboss.messaging.callback.bind.port");
-
- System.clearProperty("jboss.messaging.callback.pollPeriod");
-
- System.clearProperty("jboss.messaging.callback.reportPollingStatistics");
-
- }
- }
-}
Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/jms/clustering/ClusteredClientCrashTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/jms/clustering/ClusteredClientCrashTest.java 2007-12-06 08:48:04 UTC (rev 3418)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/jms/clustering/ClusteredClientCrashTest.java 2007-12-06 09:11:24 UTC (rev 3419)
@@ -32,7 +32,6 @@
import org.jboss.test.messaging.tools.ServerManagement;
import org.jboss.test.messaging.tools.container.Command;
import org.jboss.test.messaging.tools.container.Server;
-import org.jboss.test.messaging.tools.container.ServiceContainer;
/**
* @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
@@ -153,7 +152,7 @@
public Object execute(Server server) throws Exception
{
- int size = server.getServerPeer().getConnectionManager().getConnectionFactoryCallback(uniqueName).length;
+ int size = server.getServerPeer().getConnectionManager().getConnectionFactorySenders(uniqueName).length;
return new Integer(size);
}
Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringTestBase.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringTestBase.java 2007-12-06 08:48:04 UTC (rev 3418)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringTestBase.java 2007-12-06 09:11:24 UTC (rev 3419)
@@ -289,7 +289,7 @@
protected String getLocatorURL(Connection conn)
{
return getConnectionState(conn).getRemotingConnection().
- getRemotingClient().getInvoker().getLocator().getLocatorURI();
+ getRemotingClient().getURI();
}
protected String getObjectId(Connection conn)
Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2007-12-06 08:48:04 UTC (rev 3418)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2007-12-06 09:11:24 UTC (rev 3419)
@@ -38,8 +38,8 @@
import org.jboss.jms.client.FailoverEvent;
import org.jboss.jms.client.JBossConnection;
+import org.jboss.jms.client.JBossConnectionFactory;
import org.jboss.jms.client.JBossSession;
-import org.jboss.jms.client.JBossConnectionFactory;
import org.jboss.jms.client.delegate.ClientClusteredConnectionFactoryDelegate;
import org.jboss.jms.client.delegate.ClientConnectionDelegate;
import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
@@ -183,14 +183,14 @@
conn1.close();
assertNotNull(state2.getRemotingConnection());
- assertNotNull(state2.getRemotingConnection().getRemotingClient().getInvoker());
- assertTrue(state2.getRemotingConnection().getRemotingClient().getInvoker().isConnected());
+ assertNotNull(state2.getRemotingConnection().getRemotingClient());
+ assertTrue(state2.getRemotingConnection().getRemotingClient().isConnected());
conn2.close();
assertNotNull(state3.getRemotingConnection());
- assertNotNull(state3.getRemotingConnection().getRemotingClient().getInvoker());
- assertTrue(state3.getRemotingConnection().getRemotingClient().getInvoker().isConnected());
+ assertNotNull(state3.getRemotingConnection().getRemotingClient());
+ assertTrue(state3.getRemotingConnection().getRemotingClient().isConnected());
// When I created the testcase this was failing, throwing exceptions. This was basically why
// I created this testcase
Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/jms/clustering/MultiThreadFailoverTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/jms/clustering/MultiThreadFailoverTest.java 2007-12-06 08:48:04 UTC (rev 3418)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/jms/clustering/MultiThreadFailoverTest.java 2007-12-06 09:11:24 UTC (rev 3419)
@@ -314,7 +314,7 @@
getConnection(new Connection[]{conn1, conn2, conn3}, i);
String locator = ((ClientConnectionDelegate) connTest.getDelegate()).
- getRemotingConnection().getRemotingClient().getInvoker().getLocator().getLocatorURI();
+ getRemotingConnection().getRemotingClient().getURI();
log.info("Server " + i + " has locator=" + locator);
Deleted: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/thirdparty/remoting/CallbackServerTimeoutTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/thirdparty/remoting/CallbackServerTimeoutTest.java 2007-12-06 08:48:04 UTC (rev 3418)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/thirdparty/remoting/CallbackServerTimeoutTest.java 2007-12-06 09:11:24 UTC (rev 3419)
@@ -1,273 +0,0 @@
-/**
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.test.thirdparty.remoting;
-
-import javax.management.ObjectName;
-
-import org.jboss.jms.client.remoting.JMSRemotingConnection;
-import org.jboss.jms.wireformat.JMSWireFormat;
-import org.jboss.logging.Logger;
-import org.jboss.remoting.Client;
-import org.jboss.remoting.InvokerLocator;
-import org.jboss.remoting.ServerInvoker;
-import org.jboss.remoting.callback.Callback;
-import org.jboss.remoting.callback.HandleCallbackException;
-import org.jboss.remoting.callback.InvokerCallbackHandler;
-import org.jboss.remoting.marshal.MarshalFactory;
-import org.jboss.test.messaging.MessagingTestCase;
-import org.jboss.test.messaging.tools.ServerManagement;
-import org.jboss.test.messaging.tools.container.ServiceAttributeOverrides;
-import org.jboss.test.messaging.tools.container.ServiceContainer;
-import org.jboss.test.thirdparty.remoting.util.OnewayCallbackTrigger;
-import org.jboss.test.thirdparty.remoting.util.RemotingTestSubsystemService;
-
-import EDU.oswego.cs.dl.util.concurrent.Channel;
-import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
-
-/**
- * An extra test for the same root problem that causes
- * http://jira.jboss.org/jira/browse/JBMESSAGING-371. The callback server seems to timeout never
- * to be heard from it again.
- *
- * @author <a href="mailto:ovidiu at svjboss.org">Ovidiu Feodorov</a>
- *
- * @version <tt>$Revision$</tt>
- *
- * $Id$
- */
-public class CallbackServerTimeoutTest extends MessagingTestCase
-{
- // Constants ------------------------------------------------------------------------------------
-
- private static final Logger log = Logger.getLogger(CallbackServerTimeoutTest.class);
-
- // Static ---------------------------------------------------------------------------------------
-
- // Attributes -----------------------------------------------------------------------------------
-
- private InvokerLocator serverLocator;
- private boolean firstTime = true;
-
- // Constructors ---------------------------------------------------------------------------------
-
- public CallbackServerTimeoutTest(String name)
- {
- super(name);
- }
-
- // Public ---------------------------------------------------------------------------------------
-
- public void testTimeoutOnewayCallback() throws Throwable
- {
- if (!isRemote())
- {
- fail("This test should be run in a remote configuration!");
- }
-
- Client client = null;
- ObjectName subsystemService = null;
- CallbackServerTimeoutTest.SimpleCallbackHandler callbackHandler = null;
-
- try
- {
- subsystemService = RemotingTestSubsystemService.deployService();
-
- client = new Client(serverLocator, RemotingTestSubsystemService.SUBSYSTEM_LABEL);
-
- callbackHandler = new SimpleCallbackHandler();
-
- client.connect();
-
- JMSRemotingConnection.
- addInvokerCallbackHandler("test", client, null, serverLocator, callbackHandler);
-
- client.invoke(new OnewayCallbackTrigger("blip"));
-
- // make sure we get the callback
-
- Callback c = callbackHandler.getNextCallback(3000);
-
- assertNotNull(c);
- assertEquals("blip", c.getParameter());
-
- // sleep for twice the timeout, to be sure
- long sleepTime = ServerInvoker.DEFAULT_TIMEOUT_PERIOD + 60000;
- log.info("sleeping for " + (sleepTime / 60000) + " minutes ...");
-
- Thread.sleep(sleepTime);
-
- log.debug("woke up");
-
- client.invoke(new OnewayCallbackTrigger("blop"));
-
- // make sure we get the callback
-
- c = callbackHandler.getNextCallback(3000);
-
- assertNotNull(c);
- assertEquals("blop", c.getParameter());
-
- }
- finally
- {
- if (client != null)
- {
- // Note. Calling Client.disconnect() does remove the InvokerCallbackHandler registered
- // above. For the http transport, the CallbackPoller will continue running, which will
- // generate a lot of ERROR log messages after the server has shut down.
- client.removeListener(callbackHandler);
- client.disconnect();
- }
-
- RemotingTestSubsystemService.undeployService(subsystemService);
- }
- }
-
- public void testTimeoutOnewayCallback2() throws Throwable
- {
- if (!isRemote())
- {
- fail("This test should be run in a remote configuration!");
- }
-
- Client client = null;
- ObjectName subsystemService = null;
- CallbackServerTimeoutTest.SimpleCallbackHandler callbackHandler = null;
-
- try
- {
- subsystemService = RemotingTestSubsystemService.deployService();
-
- client = new Client(serverLocator, RemotingTestSubsystemService.SUBSYSTEM_LABEL);
-
- callbackHandler = new SimpleCallbackHandler();
-
- client.connect();
-
- JMSRemotingConnection.
- addInvokerCallbackHandler("test", client, null, serverLocator, callbackHandler);
-
- log.info("added listener");
-
- // sleep for twice the timeout, to be sure
- long sleepTime = ServerInvoker.DEFAULT_TIMEOUT_PERIOD + 60000;
-
- client.invoke(new OnewayCallbackTrigger("blip", new long[] { 0, sleepTime + 10000 }));
-
- log.info("sent invocation");
-
- // make sure we get the callback
-
- Callback c = callbackHandler.getNextCallback(3000);
-
- assertNotNull(c);
- assertEquals("blip", c.getParameter());
-
- log.info("sleeping for " + (sleepTime / 60000) + " minutes ...");
-
- Thread.sleep(sleepTime);
-
- log.debug("woke up");
-
- // make sure we get the second callback
-
- c = callbackHandler.getNextCallback(20000);
-
- assertNotNull(c);
- assertEquals("blip1", c.getParameter());
-
- }
- finally
- {
- if (client != null)
- {
- client.disconnect();
- }
-
- RemotingTestSubsystemService.undeployService(subsystemService);
- }
- }
-
- // Package protected ----------------------------------------------------------------------------
-
- // Protected ------------------------------------------------------------------------------------
-
- protected void setUp() throws Exception
- {
- super.setUp();
-
- // This test needs a special server, so make sure no other test left a server running.
- if (firstTime)
- {
- firstTime = false;
- ServerManagement.start(0, "remoting", null, true, false);
- ServerManagement.stop();
- }
-
- // start a "standard" (messaging-enabled) remoting, we need to strip off the
- // marshaller/unmarshaller, though, since it can only bring trouble to this test ...
- ServiceAttributeOverrides sao = new ServiceAttributeOverrides();
- sao.put(ServiceContainer.REMOTING_OBJECT_NAME,
- ServiceContainer.DO_NOT_USE_MESSAGING_MARSHALLERS, Boolean.TRUE);
-
-
- JMSWireFormat wf = new JMSWireFormat();
-
- MarshalFactory.addMarshaller("jms", wf, wf);
-
- ServerManagement.start(0, "remoting", sao, true, false);
-
- String s = (String)ServerManagement.
- getAttribute(ServiceContainer.REMOTING_OBJECT_NAME, "InvokerLocator");
-
- serverLocator = new InvokerLocator(s);
- log.info("InvokerLocator: " + serverLocator);
-
- log.debug("setup done");
- }
-
- protected void tearDown() throws Exception
- {
- serverLocator = null;
-
- ServerManagement.stop();
-
- super.tearDown();
- }
-
- // Private --------------------------------------------------------------------------------------
-
- // Inner classes --------------------------------------------------------------------------------
-
- private class SimpleCallbackHandler implements InvokerCallbackHandler
- {
- private Channel callbackHistory;
-
- public SimpleCallbackHandler()
- {
- callbackHistory = new LinkedQueue();
- }
-
- public void handleCallback(Callback callback) throws HandleCallbackException
- {
- try
- {
- callbackHistory.put(callback);
- }
- catch(InterruptedException e)
- {
- throw new HandleCallbackException("Got InterruptedException", e);
- }
- }
-
- public Callback getNextCallback(long timeout) throws InterruptedException
- {
- return (Callback)callbackHistory.poll(timeout);
- }
- }
-
-}
More information about the jboss-cvs-commits
mailing list