[Jboss-cvs] JBoss Messaging SVN: r1235 - in branches/Branch_1_0: src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/client/remoting src/main/org/jboss/jms/server/connectionmanager src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/server/remoting src/main/org/jboss/messaging/core/plugin tests/src/org/jboss/test/messaging/jms
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Aug 30 04:34:04 EDT 2006
Author: ovidiu.feodorov at jboss.com
Date: 2006-08-30 04:33:58 -0400 (Wed, 30 Aug 2006)
New Revision: 1235
Modified:
branches/Branch_1_0/src/main/org/jboss/jms/client/container/ConnectionAspect.java
branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/CallbackServerFactory.java
branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
branches/Branch_1_0/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
branches/Branch_1_0/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java
branches/Branch_1_0/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/CallbackServerFactoryTest.java
branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/JMSTest.java
Log:
Fixed the incomplete JMSRemotingConnection closing process. See http://jira.jboss.org/jira/browse/JBMESSAGING-535
Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/container/ConnectionAspect.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/container/ConnectionAspect.java 2006-08-26 04:28:18 UTC (rev 1234)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/container/ConnectionAspect.java 2006-08-30 08:33:58 UTC (rev 1235)
@@ -170,13 +170,13 @@
ConnectionState state = getState(invocation);
- //Finished with the connection - we need to shutdown callback server
- state.getRemotingConnection().close();
+ // Finished with the connection - we need to shutdown callback server
+ state.getRemotingConnection().stop();
- //Remove reference to resource manager
+ // Remove reference to resource manager
ResourceManagerFactory.instance.returnResourceManager(state.getServerID());
- //Remove reference to message id generator
+ // Remove reference to message id generator
MessageIdGeneratorFactory.instance.returnGenerator(state.getServerID());
return ret;
Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2006-08-26 04:28:18 UTC (rev 1234)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2006-08-30 08:33:58 UTC (rev 1235)
@@ -156,6 +156,7 @@
// Create a new connection
remotingConnection = new JMSRemotingConnection(serverLocatorURI, clientPing);
+ remotingConnection.start();
client = remotingConnection.getInvokingClient();
@@ -208,7 +209,7 @@
{
try
{
- remotingConnection.close();
+ remotingConnection.stop();
}
catch (Throwable ignore)
{
Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/CallbackServerFactory.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/CallbackServerFactory.java 2006-08-26 04:28:18 UTC (rev 1234)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/CallbackServerFactory.java 2006-08-30 08:33:58 UTC (rev 1235)
@@ -96,7 +96,7 @@
return h.server;
}
- public synchronized void returnCallbackServer(String protocol)
+ public synchronized void stopCallbackServer(String protocol)
{
Holder h = (Holder)holders.get(protocol);
@@ -200,6 +200,7 @@
protected void stopCallbackServer(Connector server)
{
+ log.debug("Stopping and destroying callback server " + server.getLocator().getLocatorURI());
server.stop();
server.destroy();
}
Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2006-08-26 04:28:18 UTC (rev 1234)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2006-08-30 08:33:58 UTC (rev 1235)
@@ -29,6 +29,7 @@
import org.jboss.logging.Logger;
import org.jboss.remoting.Client;
import org.jboss.remoting.InvokerLocator;
+import org.jboss.remoting.callback.InvokerCallbackHandler;
import org.jboss.remoting.transport.Connector;
@@ -57,56 +58,74 @@
// Attributes ----------------------------------------------------
protected Client client;
+ protected boolean clientPing;
protected Connector callbackServer;
protected InvokerLocator serverLocator;
protected CallbackManager callbackManager;
+ private InvokerCallbackHandler dummyCallbackHandler;
+
// Constructors --------------------------------------------------
public JMSRemotingConnection(String serverLocatorURI, boolean clientPing) throws Throwable
{
serverLocator = new InvokerLocator(serverLocatorURI);
-
- // Enable client pinging
- // Server leasing is enabled separately on the server side
+ this.clientPing = clientPing;
+ dummyCallbackHandler = new DummyCallbackHandler();
+
+ log.debug(this + " created");
+ }
+
+ // Public --------------------------------------------------------
+
+ public void start() throws Throwable
+ {
+ // Enable client pinging. Server leasing is enabled separately on the server side
+
Map config = new HashMap();
-
config.put(Client.ENABLE_LEASE, String.valueOf(clientPing));
client = new Client(serverLocator, config);
-
+
client.setSubsystem(ServerPeer.REMOTING_JMS_SUBSYSTEM);
- if (log.isTraceEnabled()) { log.trace("created client"); }
-
+ if (log.isTraceEnabled()) { log.trace(this + " created client"); }
+
// Get the callback server
-
+
callbackServer = CallbackServerFactory.instance.getCallbackServer(serverLocator);
-
callbackManager = (CallbackManager)callbackServer.getInvocationHandlers()[0];
-
+
client.connect();
-
- // We explictly set the Marshaller since otherwise remoting tries to resolve the marshaller
+
+ // We explicitly set the Marshaller since otherwise remoting tries to resolve the marshaller
// every time which is very slow - see org.jboss.remoting.transport.socket.ProcessInvocation
// This can make a massive difference on performance. We also do this in
// ServerConnectionEndpoint.setCallbackClient.
client.setMarshaller(new JMSWireFormat());
client.setUnMarshaller(new JMSWireFormat());
-
- client.addListener(new DummyCallbackHandler(), callbackServer.getLocator());
- log.debug(this + " created");
+ // We add a dummy callback handler only to trigger the addListener method on the
+ // JMSServerInvocationHandler to be called, which allows the server to get hold of a reference
+ // to the callback client so it can make callbacks
+
+ client.addListener(dummyCallbackHandler, callbackServer.getLocator());
+
+ log.debug(this + " started");
}
- // Public --------------------------------------------------------
-
- public void close() throws Throwable
+ public void stop() throws Throwable
{
log.debug(this + " closing");
+
+ // explicitly remove the callback listener, to avoid race conditions on server
+ // (http://jira.jboss.org/jira/browse/JBMESSAGING-535)
+
+ client.removeListener(dummyCallbackHandler);
+ dummyCallbackHandler = null;
- CallbackServerFactory.instance.returnCallbackServer(serverLocator.getProtocol());
+ CallbackServerFactory.instance.stopCallbackServer(serverLocator.getProtocol());
client.disconnect();
@@ -123,6 +142,11 @@
return callbackManager;
}
+ public String toString()
+ {
+ return "JMSRemotingConnection[" + serverLocator.getLocatorURI() + "]";
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_1_0/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java 2006-08-26 04:28:18 UTC (rev 1234)
+++ branches/Branch_1_0/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java 2006-08-30 08:33:58 UTC (rev 1235)
@@ -117,13 +117,14 @@
if (jmsClientId != null)
{
- log.warn("A problem has been detected with the connection to remote client " + remotingSessionID
- + " It is possible the client has exited without closing its connection(s) or there is a network "
- + "problem. "
- + "All connection resources corresponding to that client process will now be removed.");
+ log.warn("A problem has been detected with the connection to remote client " +
+ remotingSessionID + ". It is possible the client has exited without closing " +
+ "its connection(s) or there is a network problem. All connection resources " +
+ "corresponding to that client process will now be removed.");
- //Remoting only provides one pinger per invoker, not per connection therefore when the pinger dies
- //we must close ALL the connections corresponding to that jms client id
+ // Remoting only provides one pinger per invoker, not per connection therefore when the
+ // pinger dies we must close ALL the connections corresponding to that jms client id
+
Map endpoints = (Map)jmsClients.get(jmsClientId);
if (endpoints != null)
Modified: branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-08-26 04:28:18 UTC (rev 1234)
+++ branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-08-30 08:33:58 UTC (rev 1235)
@@ -735,7 +735,7 @@
try
{
- if (trace) { log.trace("handing " + list.size() + " message(s) over to the remoting layer"); }
+ if (trace) { log.trace(ServerConsumerEndpoint.this + "handing " + list.size() + " message(s) over to the remoting layer"); }
ClientDelivery del = new ClientDelivery(list, id);
@@ -745,7 +745,7 @@
MessagingMarshallable resp = (MessagingMarshallable)connection.getCallbackClient().invoke(mm);
- if (trace) { log.trace("handed messages over to the remoting layer"); }
+ if (trace) { log.trace(ServerConsumerEndpoint.this + "handed messages over to the remoting layer"); }
HandleMessageResponse result = (HandleMessageResponse)resp.getLoad();
@@ -760,8 +760,8 @@
}
catch(Throwable t)
{
- log.warn("Failed to deliver the message to the client. See the server log for more details");
- log.debug("Failed to deliver the message to the client.", t);
+ log.warn("Failed to deliver the message to the client. See the server log for more details.");
+ log.debug(ServerConsumerEndpoint.this + " failed to deliver the message to the client.", t);
ConnectionManager mgr = connection.getServerPeer().getConnectionManager();
Modified: branches/Branch_1_0/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java 2006-08-26 04:28:18 UTC (rev 1234)
+++ branches/Branch_1_0/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java 2006-08-30 08:33:58 UTC (rev 1235)
@@ -112,6 +112,7 @@
if (callbackHandler != null)
{
log.debug("found calllback handler for remoting session " + Util.guidToString(s));
+
i.getMetaData().addMetaData(MetaDataConstants.JMS,
MetaDataConstants.CALLBACK_HANDLER,
callbackHandler, PayloadKey.TRANSIENT);
Modified: branches/Branch_1_0/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java 2006-08-26 04:28:18 UTC (rev 1234)
+++ branches/Branch_1_0/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java 2006-08-30 08:33:58 UTC (rev 1235)
@@ -1861,16 +1861,13 @@
public void resetLoadedStatus(long channelID) throws Exception
{
- if (trace)
- {
- log.trace("resetting all channel data for channel " + channelID);
- }
+ if (trace) { log.trace("resetting all channel data for channel " + channelID); }
Connection conn = null;
PreparedStatement ps = null;
TransactionWrapper wrap = new TransactionWrapper();
- log.trace("Resetting message data. This may take several minutes for large queues/subscriptions...");
+ log.debug("Resetting message data. This may take several minutes for large queues/subscriptions...");
try
{
@@ -1884,11 +1881,7 @@
int rows = ps.executeUpdate();
- if (trace)
- {
- log.trace(JDBCUtil.statementToString(updateReliableRefsNotLoaded)
- + " updated " + rows + " rows");
- }
+ if (trace) { log.trace(JDBCUtil.statementToString(updateReliableRefsNotLoaded) + " updated " + rows + " rows"); }
ps.close();
Modified: branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/CallbackServerFactoryTest.java
===================================================================
--- branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/CallbackServerFactoryTest.java 2006-08-26 04:28:18 UTC (rev 1234)
+++ branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/CallbackServerFactoryTest.java 2006-08-30 08:33:58 UTC (rev 1235)
@@ -85,19 +85,19 @@
assertFalse(server1 == server3);
- CallbackServerFactory.instance.returnCallbackServer(locator1.getProtocol());
+ CallbackServerFactory.instance.stopCallbackServer(locator1.getProtocol());
assertTrue(CallbackServerFactory.instance.containsCallbackServer(locator1.getProtocol()));
- CallbackServerFactory.instance.returnCallbackServer(locator2.getProtocol());
+ CallbackServerFactory.instance.stopCallbackServer(locator2.getProtocol());
assertTrue(CallbackServerFactory.instance.containsCallbackServer(locator2.getProtocol()));
- CallbackServerFactory.instance.returnCallbackServer(locator1.getProtocol());
+ CallbackServerFactory.instance.stopCallbackServer(locator1.getProtocol());
assertFalse(CallbackServerFactory.instance.containsCallbackServer(locator1.getProtocol()));
- CallbackServerFactory.instance.returnCallbackServer(locator2.getProtocol());
+ CallbackServerFactory.instance.stopCallbackServer(locator2.getProtocol());
assertFalse(CallbackServerFactory.instance.containsCallbackServer(locator2.getProtocol()));
Modified: branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/JMSTest.java
===================================================================
--- branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/JMSTest.java 2006-08-26 04:28:18 UTC (rev 1234)
+++ branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/JMSTest.java 2006-08-30 08:33:58 UTC (rev 1235)
@@ -29,10 +29,12 @@
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
+import javax.jms.Message;
import javax.naming.InitialContext;
import org.jboss.test.messaging.MessagingTestCase;
import org.jboss.test.messaging.tools.ServerManagement;
+import EDU.oswego.cs.dl.util.concurrent.Slot;
/**
* The most comprehensive, yet simple, unit test.
@@ -47,7 +49,7 @@
// Constants -----------------------------------------------------
// Static --------------------------------------------------------
-
+
// Attributes ----------------------------------------------------
InitialContext ic;
@@ -66,9 +68,9 @@
super.setUp();
ServerManagement.start("all");
-
+
ic = new InitialContext(ServerManagement.getJNDIEnvironment());
-
+
ServerManagement.deployQueue("JMSTestQueue");
log.debug("setup done");
@@ -189,7 +191,60 @@
conn.close();
}
+ public void test_NonPersistent_NonTransactional_Asynchronous_to_Client() throws Exception
+ {
+ ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
+ Queue queue = (Queue)ic.lookup("/queue/JMSTestQueue");
+
+ Connection conn = cf.createConnection();
+
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ final MessageConsumer cons = session.createConsumer(queue);
+
+ conn.start();
+
+ final Slot slot = new Slot();
+
+ new Thread(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ Message m = cons.receive(5000);
+ if (m != null)
+ {
+ slot.put(m);
+ }
+ }
+ catch(Exception e)
+ {
+ log.error("receive failed", e);
+ }
+
+ }
+ }, "Receiving Thread").start();
+
+
+ Thread.sleep(500);
+
+ MessageProducer prod = session.createProducer(queue);
+ prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ TextMessage m = session.createTextMessage("message one");
+
+ prod.send(m);
+
+ TextMessage rm = (TextMessage)slot.poll(5000);
+
+ assertEquals("message one", rm.getText());
+
+ conn.close();
+ }
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
More information about the jboss-cvs-commits mailing list