[Jboss-cvs] JBoss Messaging SVN: r1235 - in branches/Branch_1_0: src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/client/remoting src/main/org/jboss/jms/server/connectionmanager src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/server/remoting src/main/org/jboss/messaging/core/plugin tests/src/org/jboss/test/messaging/jms

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Aug 30 04:34:04 EDT 2006


Author: ovidiu.feodorov at jboss.com
Date: 2006-08-30 04:33:58 -0400 (Wed, 30 Aug 2006)
New Revision: 1235

Modified:
   branches/Branch_1_0/src/main/org/jboss/jms/client/container/ConnectionAspect.java
   branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
   branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/CallbackServerFactory.java
   branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
   branches/Branch_1_0/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
   branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   branches/Branch_1_0/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java
   branches/Branch_1_0/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
   branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/CallbackServerFactoryTest.java
   branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/JMSTest.java
Log:
Fixed the incomplete JMSRemotingConnection closing process. See http://jira.jboss.org/jira/browse/JBMESSAGING-535

Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/container/ConnectionAspect.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/container/ConnectionAspect.java	2006-08-26 04:28:18 UTC (rev 1234)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/container/ConnectionAspect.java	2006-08-30 08:33:58 UTC (rev 1235)
@@ -170,13 +170,13 @@
       
       ConnectionState state = getState(invocation);
       
-      //Finished with the connection - we need to shutdown callback server
-      state.getRemotingConnection().close();
+      // Finished with the connection - we need to shutdown callback server
+      state.getRemotingConnection().stop();
       
-      //Remove reference to resource manager
+      // Remove reference to resource manager
       ResourceManagerFactory.instance.returnResourceManager(state.getServerID());
       
-      //Remove reference to message id generator
+      // Remove reference to message id generator
       MessageIdGeneratorFactory.instance.returnGenerator(state.getServerID());
       
       return ret;

Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2006-08-26 04:28:18 UTC (rev 1234)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2006-08-30 08:33:58 UTC (rev 1235)
@@ -156,6 +156,7 @@
          // Create a new connection
          
          remotingConnection = new JMSRemotingConnection(serverLocatorURI, clientPing);
+         remotingConnection.start();
          
          client = remotingConnection.getInvokingClient();
          
@@ -208,7 +209,7 @@
          {
             try
             {
-               remotingConnection.close();
+               remotingConnection.stop();
             }
             catch (Throwable ignore)
             {               

Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/CallbackServerFactory.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/CallbackServerFactory.java	2006-08-26 04:28:18 UTC (rev 1234)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/CallbackServerFactory.java	2006-08-30 08:33:58 UTC (rev 1235)
@@ -96,7 +96,7 @@
       return h.server;
    }
    
-   public synchronized void returnCallbackServer(String protocol)
+   public synchronized void stopCallbackServer(String protocol)
    {
       Holder h = (Holder)holders.get(protocol);
       
@@ -200,6 +200,7 @@
    
    protected void stopCallbackServer(Connector server)
    {
+      log.debug("Stopping and destroying callback server " + server.getLocator().getLocatorURI());
       server.stop();
       server.destroy();
    }

Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java	2006-08-26 04:28:18 UTC (rev 1234)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java	2006-08-30 08:33:58 UTC (rev 1235)
@@ -29,6 +29,7 @@
 import org.jboss.logging.Logger;
 import org.jboss.remoting.Client;
 import org.jboss.remoting.InvokerLocator;
+import org.jboss.remoting.callback.InvokerCallbackHandler;
 import org.jboss.remoting.transport.Connector;
 
 
@@ -57,56 +58,74 @@
    // Attributes ----------------------------------------------------
 
    protected Client client;
+   protected boolean clientPing;
    protected Connector callbackServer;
    protected InvokerLocator serverLocator;
    protected CallbackManager callbackManager;
 
+   private InvokerCallbackHandler dummyCallbackHandler;
+
    // Constructors --------------------------------------------------
 
    public JMSRemotingConnection(String serverLocatorURI, boolean clientPing) throws Throwable
    { 
       serverLocator = new InvokerLocator(serverLocatorURI);
-            
-      // Enable client pinging
-      // Server leasing is enabled separately on the server side
+      this.clientPing = clientPing;
+      dummyCallbackHandler = new DummyCallbackHandler();
+
+      log.debug(this + " created");
+   }
+
+   // Public --------------------------------------------------------
+
+   public void start() throws Throwable
+   {
+      // Enable client pinging. Server leasing is enabled separately on the server side
+
       Map config = new HashMap();
-
       config.put(Client.ENABLE_LEASE, String.valueOf(clientPing));
 
       client = new Client(serverLocator, config);
-      
+
       client.setSubsystem(ServerPeer.REMOTING_JMS_SUBSYSTEM);
 
-      if (log.isTraceEnabled()) { log.trace("created client"); }
-            
+      if (log.isTraceEnabled()) { log.trace(this + " created client"); }
+
       // Get the callback server
-      
+
       callbackServer = CallbackServerFactory.instance.getCallbackServer(serverLocator);
-      
       callbackManager = (CallbackManager)callbackServer.getInvocationHandlers()[0];
-               
+
       client.connect();
-      
-      // We explictly set the Marshaller since otherwise remoting tries to resolve the marshaller
+
+      // We explicitly set the Marshaller since otherwise remoting tries to resolve the marshaller
       // every time which is very slow - see org.jboss.remoting.transport.socket.ProcessInvocation
       // This can make a massive difference on performance. We also do this in
       // ServerConnectionEndpoint.setCallbackClient.
 
       client.setMarshaller(new JMSWireFormat());
       client.setUnMarshaller(new JMSWireFormat());
-      
-      client.addListener(new DummyCallbackHandler(), callbackServer.getLocator());
 
-      log.debug(this + " created");
+      // We add a dummy callback handler only to trigger the addListener method on the
+      // JMSServerInvocationHandler to be called, which allows the server to get hold of a reference
+      // to the callback client so it can make callbacks
+
+      client.addListener(dummyCallbackHandler, callbackServer.getLocator());
+
+      log.debug(this + " started");
    }
 
-   // Public --------------------------------------------------------
-
-   public void close() throws Throwable
+   public void stop() throws Throwable
    {
       log.debug(this + " closing");
+
+      // explicitly remove the callback listener, to avoid race conditions on server
+      // (http://jira.jboss.org/jira/browse/JBMESSAGING-535)
+
+      client.removeListener(dummyCallbackHandler);
+      dummyCallbackHandler = null;
       
-      CallbackServerFactory.instance.returnCallbackServer(serverLocator.getProtocol());
+      CallbackServerFactory.instance.stopCallbackServer(serverLocator.getProtocol());
       
       client.disconnect();
       
@@ -123,6 +142,11 @@
       return callbackManager;
    }
 
+   public String toString()
+   {
+      return "JMSRemotingConnection[" + serverLocator.getLocatorURI() + "]";
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Branch_1_0/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java	2006-08-26 04:28:18 UTC (rev 1234)
+++ branches/Branch_1_0/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java	2006-08-30 08:33:58 UTC (rev 1235)
@@ -117,13 +117,14 @@
       
       if (jmsClientId != null)
       {
-         log.warn("A problem has been detected with the connection to remote client " + remotingSessionID
-                + " It is possible the client has exited without closing its connection(s) or there is a network "
-                + "problem. "
-                + "All connection resources corresponding to that client process will now be removed.");
+         log.warn("A problem has been detected with the connection to remote client " +
+                  remotingSessionID + ". It is possible the client has exited without closing " +
+                  "its connection(s) or there is a network problem. All connection resources " +
+                  "corresponding to that client process will now be removed.");
 
-         //Remoting only provides one pinger per invoker, not per connection therefore when the pinger dies
-         //we must close ALL the connections corresponding to that jms client id
+         // Remoting only provides one pinger per invoker, not per connection therefore when the
+         // pinger dies we must close ALL the connections corresponding to that jms client id
+         
          Map endpoints = (Map)jmsClients.get(jmsClientId);                  
          
          if (endpoints != null)

Modified: branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-08-26 04:28:18 UTC (rev 1234)
+++ branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-08-30 08:33:58 UTC (rev 1235)
@@ -735,7 +735,7 @@
 
             try
             {
-               if (trace) { log.trace("handing " + list.size() + " message(s) over to the remoting layer"); }
+               if (trace) { log.trace(ServerConsumerEndpoint.this + "handing " + list.size() + " message(s) over to the remoting layer"); }
             
                ClientDelivery del = new ClientDelivery(list, id);
                
@@ -745,7 +745,7 @@
                
                MessagingMarshallable resp = (MessagingMarshallable)connection.getCallbackClient().invoke(mm);
 
-               if (trace) { log.trace("handed messages over to the remoting layer"); }
+               if (trace) { log.trace(ServerConsumerEndpoint.this + "handed messages over to the remoting layer"); }
                 
                HandleMessageResponse result = (HandleMessageResponse)resp.getLoad();
 
@@ -760,8 +760,8 @@
             }
             catch(Throwable t)
             {
-               log.warn("Failed to deliver the message to the client. See the server log for more details");
-               log.debug("Failed to deliver the message to the client.", t);
+               log.warn("Failed to deliver the message to the client. See the server log for more details.");
+               log.debug(ServerConsumerEndpoint.this + " failed to deliver the message to the client.", t);
                
                ConnectionManager mgr = connection.getServerPeer().getConnectionManager();
                

Modified: branches/Branch_1_0/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java	2006-08-26 04:28:18 UTC (rev 1234)
+++ branches/Branch_1_0/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java	2006-08-30 08:33:58 UTC (rev 1235)
@@ -112,6 +112,7 @@
          if (callbackHandler != null)
          {
             log.debug("found calllback handler for remoting session " + Util.guidToString(s));
+            
             i.getMetaData().addMetaData(MetaDataConstants.JMS,
                                         MetaDataConstants.CALLBACK_HANDLER,
                                         callbackHandler, PayloadKey.TRANSIENT);

Modified: branches/Branch_1_0/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java	2006-08-26 04:28:18 UTC (rev 1234)
+++ branches/Branch_1_0/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java	2006-08-30 08:33:58 UTC (rev 1235)
@@ -1861,16 +1861,13 @@
    
    public void resetLoadedStatus(long channelID) throws Exception
    {
-      if (trace)
-      {
-         log.trace("resetting all channel data for channel " + channelID);
-      }
+      if (trace) { log.trace("resetting all channel data for channel " + channelID); }
       
       Connection conn = null;
       PreparedStatement ps = null;
       TransactionWrapper wrap = new TransactionWrapper();
       
-      log.trace("Resetting message data. This may take several minutes for large queues/subscriptions...");
+      log.debug("Resetting message data. This may take several minutes for large queues/subscriptions...");
       
       try
       {
@@ -1884,11 +1881,7 @@
          
          int rows = ps.executeUpdate();
          
-         if (trace)
-         {
-            log.trace(JDBCUtil.statementToString(updateReliableRefsNotLoaded)
-                  + " updated " + rows + " rows");
-         }
+         if (trace) { log.trace(JDBCUtil.statementToString(updateReliableRefsNotLoaded) + " updated " + rows + " rows"); }
                
          ps.close();
          

Modified: branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/CallbackServerFactoryTest.java
===================================================================
--- branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/CallbackServerFactoryTest.java	2006-08-26 04:28:18 UTC (rev 1234)
+++ branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/CallbackServerFactoryTest.java	2006-08-30 08:33:58 UTC (rev 1235)
@@ -85,19 +85,19 @@
       assertFalse(server1 == server3);
       
       
-      CallbackServerFactory.instance.returnCallbackServer(locator1.getProtocol());
+      CallbackServerFactory.instance.stopCallbackServer(locator1.getProtocol());
       
       assertTrue(CallbackServerFactory.instance.containsCallbackServer(locator1.getProtocol()));
       
-      CallbackServerFactory.instance.returnCallbackServer(locator2.getProtocol());
+      CallbackServerFactory.instance.stopCallbackServer(locator2.getProtocol());
       
       assertTrue(CallbackServerFactory.instance.containsCallbackServer(locator2.getProtocol()));
       
-      CallbackServerFactory.instance.returnCallbackServer(locator1.getProtocol());
+      CallbackServerFactory.instance.stopCallbackServer(locator1.getProtocol());
       
       assertFalse(CallbackServerFactory.instance.containsCallbackServer(locator1.getProtocol()));
       
-      CallbackServerFactory.instance.returnCallbackServer(locator2.getProtocol());
+      CallbackServerFactory.instance.stopCallbackServer(locator2.getProtocol());
       
       assertFalse(CallbackServerFactory.instance.containsCallbackServer(locator2.getProtocol()));
       

Modified: branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/JMSTest.java
===================================================================
--- branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/JMSTest.java	2006-08-26 04:28:18 UTC (rev 1234)
+++ branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/JMSTest.java	2006-08-30 08:33:58 UTC (rev 1235)
@@ -29,10 +29,12 @@
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.jms.Message;
 import javax.naming.InitialContext;
 
 import org.jboss.test.messaging.MessagingTestCase;
 import org.jboss.test.messaging.tools.ServerManagement;
+import EDU.oswego.cs.dl.util.concurrent.Slot;
 
 /**
  * The most comprehensive, yet simple, unit test.
@@ -47,7 +49,7 @@
    // Constants -----------------------------------------------------
 
    // Static --------------------------------------------------------
-   
+
    // Attributes ----------------------------------------------------
 
    InitialContext ic;
@@ -66,9 +68,9 @@
       super.setUp();
 
       ServerManagement.start("all");
-           
+
       ic = new InitialContext(ServerManagement.getJNDIEnvironment());
-      
+
       ServerManagement.deployQueue("JMSTestQueue");
 
       log.debug("setup done");
@@ -189,7 +191,60 @@
       conn.close();
    }
 
+   public void test_NonPersistent_NonTransactional_Asynchronous_to_Client() throws Exception
+   {
+      ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
 
+      Queue queue = (Queue)ic.lookup("/queue/JMSTestQueue");
+
+      Connection conn = cf.createConnection();
+
+      Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      final MessageConsumer cons = session.createConsumer(queue);
+
+      conn.start();
+
+      final Slot slot = new Slot();
+
+      new Thread(new Runnable()
+      {
+         public void run()
+         {
+            try
+            {
+               Message m = cons.receive(5000);
+               if (m != null)
+               {
+                  slot.put(m);
+               }
+            }
+            catch(Exception e)
+            {
+               log.error("receive failed", e);
+            }
+
+         }
+      }, "Receiving Thread").start();
+
+
+      Thread.sleep(500);
+
+      MessageProducer prod = session.createProducer(queue);
+      prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+      TextMessage m = session.createTextMessage("message one");
+
+      prod.send(m);
+
+      TextMessage rm = (TextMessage)slot.poll(5000);
+
+      assertEquals("message one", rm.getText());
+
+      conn.close();
+   }
+
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------




More information about the jboss-cvs-commits mailing list