[jboss-cvs] JBoss Messaging SVN: r3538 - in trunk: src/main/org/jboss/jms/client/container and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sun Jan 6 07:27:32 EST 2008
Author: timfox
Date: 2008-01-06 07:27:30 -0500 (Sun, 06 Jan 2008)
New Revision: 3538
Removed:
trunk/src/main/org/jboss/jms/server/ConnectorManager.java
trunk/src/main/org/jboss/jms/server/connectormanager/
trunk/tests/src/org/jboss/test/messaging/jms/server/connectormanager/
Modified:
trunk/.classpath
trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
trunk/src/main/org/jboss/jms/server/ServerPeer.java
trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
Log:
Removed the connector manager; added the executor filter on the client side as well (previously it was only added on the server side); and removed the unnecessary client-side delivery executor.
Modified: trunk/.classpath
===================================================================
--- trunk/.classpath 2008-01-04 16:23:05 UTC (rev 3537)
+++ trunk/.classpath 2008-01-06 12:27:30 UTC (rev 3538)
@@ -70,7 +70,7 @@
<classpathentry kind="lib" path="thirdparty/jboss/jbosssx-client/lib/jbosssx-client.jar"/>
<classpathentry kind="lib" path="lib/je-3.2.44.jar"/>
<classpathentry kind="lib" path="tests/lib/easymock.jar"/>
- <classpathentry kind="lib" path="thirdparty/apache-mina/lib/mina-core.jar"/>
+ <classpathentry kind="lib" path="thirdparty/apache-mina/lib/mina-core.jar" sourcepath="/home/tim/workspace/mina-trunk/core/src/main/java"/>
<classpathentry kind="lib" path="thirdparty/slf4j/log4j/lib/slf4j-log4j12.jar"/>
<classpathentry kind="output" path="bin"/>
</classpath>
Modified: trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java 2008-01-04 16:23:05 UTC (rev 3537)
+++ trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java 2008-01-06 12:27:30 UTC (rev 3538)
@@ -213,8 +213,7 @@
private boolean shouldAck;
private boolean handleFlowControl;
private long redeliveryDelay;
- private volatile int currentToken;
-
+
// Constructors ---------------------------------------------------------------------------------
public ClientConsumer(boolean isCC, int ackMode,
@@ -261,14 +260,35 @@
*/
public void handleMessage(final JBossMessage message) throws Exception
{
- //TODO - we temporarily need to execute on a different thread to avoid a deadlock situation in
- // failover where a message is sent then the valve is locked, and the message send cause
- // a message delivery back to the same client which tries to ack but can't get through
- // the valve. This won't be necessary when we move to a non blocking transport
-
- sessionExecutor.execute(new HandleMessageRunnable(currentToken, message));
+ synchronized (mainLock)
+ {
+ if (closed)
+ {
+ // Sanity - this should never happen - we should always wait for all deliveries to arrive
+ // when closing
+ throw new IllegalStateException(this + " is closed, so ignoring message");
+ }
+
+ message.setSessionDelegate(sessionDelegate, isConnectionConsumer);
+
+ message.doBeforeReceive();
+
+ //Add it to the buffer
+ buffer.addLast(message, message.getJMSPriority());
+
+ lastDeliveryId = message.getDeliveryId();
+
+ if (trace) { log.trace(this + " added message(s) to the buffer are now " + buffer.size() + " messages"); }
+
+ messageAdded();
+
+ if (handleFlowControl)
+ {
+ checkStop();
+ }
+ }
}
-
+
public void setMessageListener(MessageListener listener) throws JMSException
{
synchronized (mainLock)
@@ -555,8 +575,6 @@
*/
public void synchronizeWith(ClientConsumer newHandler)
{
- currentToken++;
-
consumerID = newHandler.consumerID;
// Clear the buffer. This way the non persistent messages that managed to arrive are
@@ -854,67 +872,6 @@
}
}
- private class HandleMessageRunnable implements Runnable
- {
- private int token;
-
- private JBossMessage message;
-
- HandleMessageRunnable(int token, JBossMessage message)
- {
- this.token = token;
-
- this.message = message;
- }
-
- public void run()
- {
- try
- {
- if (trace) { log.trace(this + " receiving message " + message + " from the remoting layer"); }
-
- synchronized (mainLock)
- {
- if (closed)
- {
- // Sanity - this should never happen - we should always wait for all deliveries to arrive
- // when closing
- throw new IllegalStateException(this + " is closed, so ignoring message");
- }
-
- if (token != currentToken)
- {
- //This message was queued up from before failover - we don't want to add it
- log.trace("Ignoring message " + message);
- return;
- }
-
- message.setSessionDelegate(sessionDelegate, isConnectionConsumer);
-
- message.doBeforeReceive();
-
- //Add it to the buffer
- buffer.addLast(message, message.getJMSPriority());
-
- lastDeliveryId = message.getDeliveryId();
-
- if (trace) { log.trace(this + " added message(s) to the buffer are now " + buffer.size() + " messages"); }
-
- messageAdded();
-
- if (handleFlowControl)
- {
- checkStop();
- }
- }
- }
- catch (Exception e)
- {
- log.error("Failed to handle message", e);
- }
- }
- }
-
/*
* This class handles the execution of onMessage methods
*/
Deleted: trunk/src/main/org/jboss/jms/server/ConnectorManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ConnectorManager.java 2008-01-04 16:23:05 UTC (rev 3537)
+++ trunk/src/main/org/jboss/jms/server/ConnectorManager.java 2008-01-06 12:27:30 UTC (rev 3538)
@@ -1,40 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.jms.server;
-
-import org.jboss.messaging.core.contract.MessagingComponent;
-
-/**
- *
- * A ConnectorManager.
- *
- * @author <a href="tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt>
- *
- * $Id$
- */
-public interface ConnectorManager extends MessagingComponent
-{
- int registerConnector(String connectorName) throws Exception;
-
- void unregisterConnector(String connectorName) throws Exception;
-}
Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java 2008-01-04 16:23:05 UTC (rev 3537)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java 2008-01-06 12:27:30 UTC (rev 3538)
@@ -37,7 +37,6 @@
import org.jboss.jms.server.connectionfactory.ConnectionFactoryDeployer;
import org.jboss.jms.server.connectionfactory.ConnectionFactoryJNDIMapper;
import org.jboss.jms.server.connectionmanager.SimpleConnectionManager;
-import org.jboss.jms.server.connectormanager.SimpleConnectorManager;
import org.jboss.jms.server.destination.DestinationDeployer;
import org.jboss.jms.server.destination.ManagedQueue;
import org.jboss.jms.server.destination.ManagedTopic;
@@ -116,7 +115,6 @@
private ConnectionFactoryJNDIMapper connFactoryJNDIMapper;
private TransactionRepository txRepository;
private SimpleConnectionManager connectionManager;
- private ConnectorManager connectorManager;
private IDManager messageIDManager;
private IDManager channelIDManager;
private IDManager transactionIDManager;
@@ -198,7 +196,6 @@
destinationJNDIMapper = new DestinationJNDIMapper(this);
connFactoryJNDIMapper = new ConnectionFactoryJNDIMapper(this);
connectionManager = new SimpleConnectionManager();
- connectorManager = new SimpleConnectorManager();
memoryManager = new SimpleMemoryManager();
destinationDeployer = new DestinationDeployer(this);
connectionFactoryDeployer = new ConnectionFactoryDeployer(this, minaService);
@@ -241,7 +238,6 @@
destinationJNDIMapper.start();
connFactoryJNDIMapper.start();
connectionManager.start();
- connectorManager.start();
memoryManager.start();
securityStore.setSuckerPassword(configuration.getSuckerPassword());
securityStore.start();
@@ -321,8 +317,6 @@
connFactoryJNDIMapper = null;
connectionManager.stop();
connectionManager = null;
- connectorManager.start();
- connectorManager = null;
memoryManager.stop();
memoryManager = null;
securityStore.stop();
@@ -662,11 +656,6 @@
return connectionManager;
}
- public ConnectorManager getConnectorManager()
- {
- return connectorManager;
- }
-
public MessageStore getMessageStore()
{
return getPostOffice().getMessageStore();
Modified: trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java 2008-01-04 16:23:05 UTC (rev 3537)
+++ trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java 2008-01-06 12:27:30 UTC (rev 3538)
@@ -12,7 +12,6 @@
import org.jboss.jms.client.plugin.LoadBalancingFactory;
import org.jboss.jms.server.ConnectionFactoryManager;
import org.jboss.jms.server.ConnectionManager;
-import org.jboss.jms.server.ConnectorManager;
import org.jboss.jms.server.ServerPeer;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.remoting.ServerLocator;
@@ -67,8 +66,6 @@
private ConnectionFactoryManager connectionFactoryManager;
- private ConnectorManager connectorManager;
-
private ConnectionManager connectionManager;
private MinaService minaService;
@@ -125,7 +122,6 @@
}
connectionFactoryManager = serverPeer.getConnectionFactoryManager();
- connectorManager = serverPeer.getConnectorManager();
connectionManager = serverPeer.getConnectionManager();
// We use the MBean service name to uniquely identify the connection factory
@@ -136,7 +132,6 @@
defaultTempQueueFullSize, defaultTempQueuePageSize,
defaultTempQueueDownCacheSize, dupsOKBatchSize, supportsFailover, supportsLoadBalancing,
loadBalancingFactory, strictTck);
- connectorManager.registerConnector(getName());
log.info("Server locator is " + serverLocator);
log.info(this + " started");
@@ -155,7 +150,6 @@
connectionFactoryManager.
unregisterConnectionFactory(getName(), supportsFailover, supportsLoadBalancing);
- connectorManager.unregisterConnector(getName());
log.info(this + " undeployed");
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2008-01-04 16:23:05 UTC (rev 3537)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2008-01-06 12:27:30 UTC (rev 3538)
@@ -1430,48 +1430,10 @@
DeliverMessage m = new DeliverMessage(ref.getMessage(), consumer.getID(), deliveryID, ref.getDeliveryCount());
m.setVersion(getConnectionEndpoint().getUsingVersion());
consumer.deliver(m);
+
+ //TODO - what if we get an exception from MINA?
+ //Surely we need to do exception logic too???
-// ClientDelivery del = new ClientDelivery(ref.getMessage(), consumer.getID(), deliveryID, ref.getDeliveryCount());
-//
-// Client callbackClient = callbackHandler.getCallbackClient();
-//
-// Callback callback = new Callback(del);
-//
-// try
-// {
-// // FIXME - due a design (flaw??) in the socket based transports, they use a pool of TCP
-// // connections, so subsequent invocations can end up using different underlying
-// // connections meaning that later invocations can overtake earlier invocations, if there
-// // are more than one user concurrently invoking on the same transport. We need someway
-// // of pinning the client object to the underlying invocation. For now we just serialize
-// // all access so that only the first connection in the pool is ever used - but this is
-// // far from ideal!!!
-// // See http://jira.jboss.com/jira/browse/JBMESSAGING-789
-//
-// Object invoker = null;
-//
-// if (callbackClient != null)
-// {
-// invoker = callbackClient.getInvoker();
-// }
-// else
-// {
-// // TODO: dummy synchronization object, in case there's no clientInvoker. This will
-// // happen during the first invocation anyway. It's a kludge, I know, but this whole
-// // synchronization thing is a huge kludge. Needs to be reviewed.
-// invoker = new Object();
-// }
-//
-// synchronized (invoker)
-// {
-// // one way invocation, no acknowledgment sent back by the client
-// if (trace) { log.trace(this + " submitting message " + ref.getMessage() + " to the remoting layer to be sent asynchronously"); }
-//
-// callbackHandler.handleCallbackOneway(callback);
-//
-// //We store the delivery id so we know to wait for any deliveries in transit on close
-// consumer.setLastDeliveryID(deliveryID);
-// }
// }
// catch (Throwable t)
// {
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2008-01-04 16:23:05 UTC (rev 3537)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2008-01-06 12:27:30 UTC (rev 3538)
@@ -25,6 +25,7 @@
import org.apache.mina.common.IoService;
import org.apache.mina.common.IoServiceListener;
import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import org.jboss.jms.client.remoting.ConsolidatedRemotingConnectionListener;
import org.jboss.logging.Logger;
@@ -59,7 +60,9 @@
private Map<ConsolidatedRemotingConnectionListener, IoServiceListener> listeners = new HashMap<ConsolidatedRemotingConnectionListener, IoServiceListener>();
+ private ExecutorFilter executorFilter;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -85,6 +88,8 @@
blockingScheduler = addBlockingRequestResponseFilter(filterChain);
addLoggingFilter(filterChain);
+
+ executorFilter = FilterChainSupport.addExecutorFilter(filterChain);
connector.setHandler(new MinaHandler(PacketDispatcher.client));
connector.getSessionConfig().setKeepAlive(true);
@@ -124,6 +129,8 @@
connector = null;
blockingScheduler = null;
session = null;
+
+ this.executorFilter.destroy();
return closed;
}
Modified: trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java 2008-01-04 16:23:05 UTC (rev 3537)
+++ trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java 2008-01-06 12:27:30 UTC (rev 3538)
@@ -554,8 +554,6 @@
public void undeployConnectionFactory(String objectName) throws Exception
{
((ServerPeer) getJmsServer()).getConnectionFactoryManager().unregisterConnectionFactory(objectName, true, true);
- ((ServerPeer) getJmsServer()).getConnectorManager().unregisterConnector(objectName);
-
}
public void configureSecurityForDestination(String destName, boolean isQueue, HashSet<Role> roles) throws Exception
More information about the jboss-cvs-commits
mailing list