[jboss-cvs] JBoss Messaging SVN: r3538 - in trunk: src/main/org/jboss/jms/client/container and 6 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sun Jan 6 07:27:32 EST 2008


Author: timfox
Date: 2008-01-06 07:27:30 -0500 (Sun, 06 Jan 2008)
New Revision: 3538

Removed:
   trunk/src/main/org/jboss/jms/server/ConnectorManager.java
   trunk/src/main/org/jboss/jms/server/connectormanager/
   trunk/tests/src/org/jboss/test/messaging/jms/server/connectormanager/
Modified:
   trunk/.classpath
   trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
   trunk/src/main/org/jboss/jms/server/ServerPeer.java
   trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
   trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
Log:
Removed connector manager, also added executor filter on client side too (was only added on server side previously), and removed unnecessary client side delivery executor


Modified: trunk/.classpath
===================================================================
--- trunk/.classpath	2008-01-04 16:23:05 UTC (rev 3537)
+++ trunk/.classpath	2008-01-06 12:27:30 UTC (rev 3538)
@@ -70,7 +70,7 @@
 	<classpathentry kind="lib" path="thirdparty/jboss/jbosssx-client/lib/jbosssx-client.jar"/>
 	<classpathentry kind="lib" path="lib/je-3.2.44.jar"/>
 	<classpathentry kind="lib" path="tests/lib/easymock.jar"/>
-	<classpathentry kind="lib" path="thirdparty/apache-mina/lib/mina-core.jar"/>
+	<classpathentry kind="lib" path="thirdparty/apache-mina/lib/mina-core.jar" sourcepath="/home/tim/workspace/mina-trunk/core/src/main/java"/>
 	<classpathentry kind="lib" path="thirdparty/slf4j/log4j/lib/slf4j-log4j12.jar"/>
 	<classpathentry kind="output" path="bin"/>
 </classpath>

Modified: trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java	2008-01-04 16:23:05 UTC (rev 3537)
+++ trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java	2008-01-06 12:27:30 UTC (rev 3538)
@@ -213,8 +213,7 @@
    private boolean shouldAck;
    private boolean handleFlowControl;
    private long redeliveryDelay;
-   private volatile int currentToken;
-   
+
    // Constructors ---------------------------------------------------------------------------------
 
    public ClientConsumer(boolean isCC, int ackMode,                                
@@ -261,14 +260,35 @@
     */
    public void handleMessage(final JBossMessage message) throws Exception
    {
-      //TODO - we temporarily need to execute on a different thread to avoid a deadlock situation in
-      //       failover where a message is sent then the valve is locked, and the message send cause
-      //       a message delivery back to the same client which tries to ack but can't get through
-      //       the valve. This won't be necessary when we move to a non blocking transport
-   	
-      sessionExecutor.execute(new HandleMessageRunnable(currentToken, message));         
+      synchronized (mainLock)
+      {
+         if (closed)
+         {
+            // Sanity - this should never happen - we should always wait for all deliveries to arrive
+            // when closing
+            throw new IllegalStateException(this + " is closed, so ignoring message");
+         }
+
+         message.setSessionDelegate(sessionDelegate, isConnectionConsumer);
+
+         message.doBeforeReceive();
+
+         //Add it to the buffer
+         buffer.addLast(message, message.getJMSPriority());
+
+         lastDeliveryId = message.getDeliveryId();
+
+         if (trace) { log.trace(this + " added message(s) to the buffer are now " + buffer.size() + " messages"); }
+
+         messageAdded();
+
+         if (handleFlowControl)
+         {
+            checkStop();
+         }
+      }
    }
-   
+
    public void setMessageListener(MessageListener listener) throws JMSException
    {     
       synchronized (mainLock)
@@ -555,8 +575,6 @@
     */
    public void synchronizeWith(ClientConsumer newHandler)
    {
-      currentToken++;
-   	
       consumerID = newHandler.consumerID;
 
       // Clear the buffer. This way the non persistent messages that managed to arrive are
@@ -854,67 +872,6 @@
       }
    }
    
-   private class HandleMessageRunnable implements Runnable
-   {
-   	private int token;
-   	
-   	private JBossMessage message;
-   	
-   	HandleMessageRunnable(int token, JBossMessage message)
-   	{
-   		this.token = token;
-   		
-   		this.message = message;
-   	}
-   	
-   	public void run()
-      {
-         try
-         {
-             if (trace) { log.trace(this + " receiving message " + message + " from the remoting layer"); }
-
-             synchronized (mainLock)
-             {
-                if (closed)
-                {
-                   // Sanity - this should never happen - we should always wait for all deliveries to arrive
-                   // when closing
-                   throw new IllegalStateException(this + " is closed, so ignoring message");
-                }
-                
-                if (token != currentToken)
-                {
-               	 //This message was queued up from before failover - we don't want to add it
-               	 log.trace("Ignoring message " + message);
-               	 return;
-                }
-                
-                message.setSessionDelegate(sessionDelegate, isConnectionConsumer);
-
-                message.doBeforeReceive();
-
-                //Add it to the buffer
-                buffer.addLast(message, message.getJMSPriority());
-
-                lastDeliveryId = message.getDeliveryId();
-                
-                if (trace) { log.trace(this + " added message(s) to the buffer are now " + buffer.size() + " messages"); }
-
-                messageAdded();
-
-                if (handleFlowControl)
-                {
-                	checkStop();
-                }
-             }
-         }
-         catch (Exception e)
-         {
-            log.error("Failed to handle message", e);
-         }
-      }
-   }
-   
    /*
     * This class handles the execution of onMessage methods
     */

Deleted: trunk/src/main/org/jboss/jms/server/ConnectorManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ConnectorManager.java	2008-01-04 16:23:05 UTC (rev 3537)
+++ trunk/src/main/org/jboss/jms/server/ConnectorManager.java	2008-01-06 12:27:30 UTC (rev 3538)
@@ -1,40 +0,0 @@
-/*
-  * JBoss, Home of Professional Open Source
-  * Copyright 2005, JBoss Inc., and individual contributors as indicated
-  * by the @authors tag. See the copyright.txt in the distribution for a
-  * full listing of individual contributors.
-  *
-  * This is free software; you can redistribute it and/or modify it
-  * under the terms of the GNU Lesser General Public License as
-  * published by the Free Software Foundation; either version 2.1 of
-  * the License, or (at your option) any later version.
-  *
-  * This software is distributed in the hope that it will be useful,
-  * but WITHOUT ANY WARRANTY; without even the implied warranty of
-  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-  * Lesser General Public License for more details.
-  *
-  * You should have received a copy of the GNU Lesser General Public
-  * License along with this software; if not, write to the Free
-  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-  */
-package org.jboss.jms.server;
-
-import org.jboss.messaging.core.contract.MessagingComponent;
-
-/**
- * 
- * A ConnectorManager.
- * 
- * @author <a href="tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt>
- *
- * $Id$
- */
-public interface ConnectorManager extends MessagingComponent
-{
-   int registerConnector(String connectorName) throws Exception;
-   
-   void unregisterConnector(String connectorName) throws Exception;
-}

Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java	2008-01-04 16:23:05 UTC (rev 3537)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java	2008-01-06 12:27:30 UTC (rev 3538)
@@ -37,7 +37,6 @@
 import org.jboss.jms.server.connectionfactory.ConnectionFactoryDeployer;
 import org.jboss.jms.server.connectionfactory.ConnectionFactoryJNDIMapper;
 import org.jboss.jms.server.connectionmanager.SimpleConnectionManager;
-import org.jboss.jms.server.connectormanager.SimpleConnectorManager;
 import org.jboss.jms.server.destination.DestinationDeployer;
 import org.jboss.jms.server.destination.ManagedQueue;
 import org.jboss.jms.server.destination.ManagedTopic;
@@ -116,7 +115,6 @@
    private ConnectionFactoryJNDIMapper connFactoryJNDIMapper;
    private TransactionRepository txRepository;
    private SimpleConnectionManager connectionManager;
-   private ConnectorManager connectorManager;
    private IDManager messageIDManager;
    private IDManager channelIDManager;
    private IDManager transactionIDManager;
@@ -198,7 +196,6 @@
          destinationJNDIMapper = new DestinationJNDIMapper(this);
          connFactoryJNDIMapper = new ConnectionFactoryJNDIMapper(this);
          connectionManager = new SimpleConnectionManager();
-         connectorManager = new SimpleConnectorManager();
          memoryManager = new SimpleMemoryManager();
          destinationDeployer = new DestinationDeployer(this);
          connectionFactoryDeployer = new ConnectionFactoryDeployer(this, minaService);
@@ -241,7 +238,6 @@
          destinationJNDIMapper.start();
          connFactoryJNDIMapper.start();
          connectionManager.start();
-         connectorManager.start();
          memoryManager.start();
          securityStore.setSuckerPassword(configuration.getSuckerPassword());
          securityStore.start();
@@ -321,8 +317,6 @@
          connFactoryJNDIMapper = null;
          connectionManager.stop();
          connectionManager = null;
-         connectorManager.start();
-         connectorManager = null;
          memoryManager.stop();
          memoryManager = null;
          securityStore.stop();
@@ -662,11 +656,6 @@
       return connectionManager;
    }
 
-   public ConnectorManager getConnectorManager()
-   {
-      return connectorManager;
-   }
-
    public MessageStore getMessageStore()
    {
       return getPostOffice().getMessageStore();

Modified: trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java	2008-01-04 16:23:05 UTC (rev 3537)
+++ trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java	2008-01-06 12:27:30 UTC (rev 3538)
@@ -12,7 +12,6 @@
 import org.jboss.jms.client.plugin.LoadBalancingFactory;
 import org.jboss.jms.server.ConnectionFactoryManager;
 import org.jboss.jms.server.ConnectionManager;
-import org.jboss.jms.server.ConnectorManager;
 import org.jboss.jms.server.ServerPeer;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.remoting.ServerLocator;
@@ -67,8 +66,6 @@
    
    private ConnectionFactoryManager connectionFactoryManager;
    
-   private ConnectorManager connectorManager;
-   
    private ConnectionManager connectionManager;
       
    private MinaService minaService;
@@ -125,7 +122,6 @@
          }
          
          connectionFactoryManager = serverPeer.getConnectionFactoryManager();
-         connectorManager = serverPeer.getConnectorManager();
          connectionManager = serverPeer.getConnectionManager();
 
          // We use the MBean service name to uniquely identify the connection factory
@@ -136,7 +132,6 @@
                                       defaultTempQueueFullSize, defaultTempQueuePageSize,                                      
                                       defaultTempQueueDownCacheSize, dupsOKBatchSize, supportsFailover, supportsLoadBalancing,
                                       loadBalancingFactory, strictTck);               
-         connectorManager.registerConnector(getName());
       
          log.info("Server locator is " + serverLocator);
          log.info(this + " started");
@@ -155,7 +150,6 @@
          
          connectionFactoryManager.
             unregisterConnectionFactory(getName(), supportsFailover, supportsLoadBalancing);
-         connectorManager.unregisterConnector(getName());
          
          log.info(this + " undeployed");
       }

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2008-01-04 16:23:05 UTC (rev 3537)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2008-01-06 12:27:30 UTC (rev 3538)
@@ -1430,48 +1430,10 @@
    	DeliverMessage m = new DeliverMessage(ref.getMessage(), consumer.getID(), deliveryID, ref.getDeliveryCount());
    	m.setVersion(getConnectionEndpoint().getUsingVersion());
    	consumer.deliver(m);
+   	
+   	//TODO - what if we get an exception from MINA? 
+   	//Surely we need to do exception logic too???
 
-//      ClientDelivery del = new ClientDelivery(ref.getMessage(), consumer.getID(), deliveryID, ref.getDeliveryCount());
-//
-//      Client callbackClient = callbackHandler.getCallbackClient();
-//
-//      Callback callback = new Callback(del);
-//
-//      try
-//      {
-//         // FIXME - due a design (flaw??) in the socket based transports, they use a pool of TCP
-//         // connections, so subsequent invocations can end up using different underlying
-//         // connections meaning that later invocations can overtake earlier invocations, if there
-//         // are more than one user concurrently invoking on the same transport. We need someway
-//         // of pinning the client object to the underlying invocation. For now we just serialize
-//         // all access so that only the first connection in the pool is ever used - but this is
-//         // far from ideal!!!
-//         // See http://jira.jboss.com/jira/browse/JBMESSAGING-789
-//
-//         Object invoker = null;
-//
-//         if (callbackClient != null)
-//         {
-//            invoker = callbackClient.getInvoker();                              
-//         }
-//         else
-//         {
-//            // TODO: dummy synchronization object, in case there's no clientInvoker. This will
-//            // happen during the first invocation anyway. It's a kludge, I know, but this whole
-//            // synchronization thing is a huge kludge. Needs to be reviewed.
-//            invoker = new Object();
-//         }
-//         
-//         synchronized (invoker)
-//         {
-//            // one way invocation, no acknowledgment sent back by the client
-//            if (trace) { log.trace(this + " submitting message " + ref.getMessage() + " to the remoting layer to be sent asynchronously"); }
-//            
-//            callbackHandler.handleCallbackOneway(callback);
-//                                    
-//            //We store the delivery id so we know to wait for any deliveries in transit on close
-//            consumer.setLastDeliveryID(deliveryID);
-//         }
 //      }
 //      catch (Throwable t)
 //      {

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java	2008-01-04 16:23:05 UTC (rev 3537)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java	2008-01-06 12:27:30 UTC (rev 3538)
@@ -25,6 +25,7 @@
 import org.apache.mina.common.IoService;
 import org.apache.mina.common.IoServiceListener;
 import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.executor.ExecutorFilter;
 import org.apache.mina.transport.socket.nio.NioSocketConnector;
 import org.jboss.jms.client.remoting.ConsolidatedRemotingConnectionListener;
 import org.jboss.logging.Logger;
@@ -59,7 +60,9 @@
 
    private Map<ConsolidatedRemotingConnectionListener, IoServiceListener> listeners = new HashMap<ConsolidatedRemotingConnectionListener, IoServiceListener>();
 
+   private ExecutorFilter executorFilter;
 
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -85,6 +88,8 @@
       blockingScheduler = addBlockingRequestResponseFilter(filterChain);
 
       addLoggingFilter(filterChain);
+      
+      executorFilter = FilterChainSupport.addExecutorFilter(filterChain);
 
       connector.setHandler(new MinaHandler(PacketDispatcher.client));
       connector.getSessionConfig().setKeepAlive(true);
@@ -124,6 +129,8 @@
       connector = null;
       blockingScheduler = null;
       session = null;
+      
+      this.executorFilter.destroy();
 
       return closed;
    }

Modified: trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java	2008-01-04 16:23:05 UTC (rev 3537)
+++ trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java	2008-01-06 12:27:30 UTC (rev 3538)
@@ -554,8 +554,6 @@
    public void undeployConnectionFactory(String objectName) throws Exception
    {
       ((ServerPeer) getJmsServer()).getConnectionFactoryManager().unregisterConnectionFactory(objectName, true, true);
-      ((ServerPeer) getJmsServer()).getConnectorManager().unregisterConnector(objectName);
-
    }
 
    public void configureSecurityForDestination(String destName, boolean isQueue, HashSet<Role> roles) throws Exception




More information about the jboss-cvs-commits mailing list