[jboss-cvs] JBoss Messaging SVN: r4417 - in trunk: src/config and 12 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jun 10 07:49:22 EDT 2008
Author: timfox
Date: 2008-06-10 07:49:22 -0400 (Tue, 10 Jun 2008)
New Revision: 4417
Added:
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/RemotingServiceImpl.java
Removed:
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
Modified:
trunk/docs/userguide/en/modules/configuration.xml
trunk/src/config/jbm-beans.xml
trunk/src/config/jbm-configuration.xml
trunk/src/config/jbm-standalone-beans.xml
trunk/src/main/org/jboss/messaging/core/client/ConnectionParams.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java
trunk/src/main/org/jboss/messaging/core/config/Configuration.java
trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
trunk/src/main/org/jboss/messaging/core/remoting/ConnectorRegistry.java
trunk/src/main/org/jboss/messaging/core/remoting/Interceptor.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/QueueTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerOrderingTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaServiceTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaSessionTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/ServerKeepAliveTest.java
trunk/tests/src/org/jboss/messaging/tests/performance/remoting/impl/MeasureBase.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/network/ClientNetworkFailureTest.java
Log:
Some changes to pooling and queue locking
Modified: trunk/docs/userguide/en/modules/configuration.xml
===================================================================
--- trunk/docs/userguide/en/modules/configuration.xml 2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/docs/userguide/en/modules/configuration.xml 2008-06-10 11:49:22 UTC (rev 4417)
@@ -867,7 +867,7 @@
<programlisting>
<![CDATA[
<bean name="RemotingService"
- class="org.jboss.messaging.core.remoting.impl.mina.MinaService">
+ class="org.jboss.messaging.core.remoting.impl.mina.RemotingServiceImpl">
<constructor>
<parameter>
<inject bean="Configuration"/>
Modified: trunk/src/config/jbm-beans.xml
===================================================================
--- trunk/src/config/jbm-beans.xml 2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/config/jbm-beans.xml 2008-06-10 11:49:22 UTC (rev 4417)
@@ -51,7 +51,7 @@
</constructor>
</bean>
- <bean name="RemotingService" class="org.jboss.messaging.core.remoting.impl.mina.MinaService">
+ <bean name="RemotingService" class="org.jboss.messaging.core.remoting.impl.mina.RemotingServiceImpl">
<constructor>
<parameter>
<inject bean="Configuration"/>
Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml 2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/config/jbm-configuration.xml 2008-06-10 11:49:22 UTC (rev 4417)
@@ -3,8 +3,12 @@
<clustered>false</clustered>
- <scheduled-executor-max-pool-size>30</scheduled-executor-max-pool-size>
+ <!-- Maximum number of threads to use for scheduled deliveries -->
+ <scheduled-max-pool-size>30</scheduled-max-pool-size>
+ <!-- Maximum number of threads to use for sessions on the server side -->
+ <max-pool-size>30</max-pool-size>
+
<require-destinations>true</require-destinations>
<!-- Remoting configuration -->
Modified: trunk/src/config/jbm-standalone-beans.xml
===================================================================
--- trunk/src/config/jbm-standalone-beans.xml 2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/config/jbm-standalone-beans.xml 2008-06-10 11:49:22 UTC (rev 4417)
@@ -64,7 +64,7 @@
</constructor>
</bean>
- <bean name="RemotingService" class="org.jboss.messaging.core.remoting.impl.mina.MinaService">
+ <bean name="RemotingService" class="org.jboss.messaging.core.remoting.impl.mina.RemotingServiceImpl">
<constructor>
<parameter>
<inject bean="Configuration"/>
Modified: trunk/src/main/org/jboss/messaging/core/client/ConnectionParams.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ConnectionParams.java 2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/main/org/jboss/messaging/core/client/ConnectionParams.java 2008-06-10 11:49:22 UTC (rev 4417)
@@ -42,19 +42,7 @@
boolean isTcpNoDelay();
- void setTcpNoDelay(boolean tcpNoDelay);
-
- long getWriteQueueMaxBytes();
-
- void setWriteQueueMaxBytes(long maxBytes);
-
- long getWriteQueueMinBytes();
-
- void setWriteQueueMinBytes(long minBytes);
-
- long getWriteQueueBlockTimeout();
-
- void setWriteQueueBlockTimeout(long timeout);
+ void setTcpNoDelay(boolean tcpNoDelay);
int getTcpReceiveBufferSize();
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java 2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java 2008-06-10 11:49:22 UTC (rev 4417)
@@ -80,9 +80,7 @@
private boolean defaultBlockOnPersistentSend;
private boolean defaultBlockOnNonPersistentSend;
-
-
-
+
// Static ---------------------------------------------------------------------------------------
public static final int DEFAULT_CONSUMER_WINDOW_SIZE = 1024 * 1024;
@@ -98,7 +96,7 @@
public static final boolean DEFAULT_BLOCK_ON_PERSISTENT_SEND = false;
public static final boolean DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND = false;
-
+
// Constructors ---------------------------------------------------------------------------------
/**
@@ -153,7 +151,7 @@
defaultProducerMaxRate = DEFAULT_PRODUCER_MAX_RATE;
defaultBlockOnAcknowledge = DEFAULT_BLOCK_ON_ACKNOWLEDGE;
defaultBlockOnPersistentSend = DEFAULT_BLOCK_ON_PERSISTENT_SEND;
- defaultBlockOnNonPersistentSend = DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
+ defaultBlockOnNonPersistentSend = DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
this.location = location;
this.connectionParams = connectionParams;
this.remotingConnectionFactory = new RemotingConnectionFactoryImpl();
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java 2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java 2008-06-10 11:49:22 UTC (rev 4417)
@@ -44,9 +44,6 @@
protected String keyStorePassword;
protected String trustStorePath;
protected String trustStorePassword;
- protected long writeQueueBlockTimeout = 10000;
- protected long writeQueueMinBytes = 32 * 1024L;
- protected long writeQueueMaxBytes = 64 * 1024L;
public long getTimeout()
{
@@ -128,36 +125,6 @@
this.tcpSendBufferSize = tcpSendBufferSize;
}
- public long getWriteQueueBlockTimeout()
- {
- return writeQueueBlockTimeout;
- }
-
- public long getWriteQueueMaxBytes()
- {
- return writeQueueMaxBytes;
- }
-
- public long getWriteQueueMinBytes()
- {
- return writeQueueMinBytes;
- }
-
- public void setWriteQueueBlockTimeout(final long timeout)
- {
- this.writeQueueBlockTimeout = timeout;
- }
-
- public void setWriteQueueMaxBytes(final long bytes)
- {
- this.writeQueueMaxBytes = bytes;
- }
-
- public void setWriteQueueMinBytes(final long bytes)
- {
- this.writeQueueMinBytes = bytes;
- }
-
public boolean isSSLEnabled()
{
String sslEnabledProperty = System.getProperty(REMOTING_ENABLE_SSL);
@@ -258,9 +225,6 @@
cp.getTcpReceiveBufferSize() == this.getTcpReceiveBufferSize() &&
cp.getTcpSendBufferSize() == this.getTcpSendBufferSize() &&
cp.isSSLEnabled() == this.isSSLEnabled() &&
- cp.isSSLEnabledModified() == this.isSSLEnabledModified() &&
- cp.getWriteQueueBlockTimeout() == this.getWriteQueueBlockTimeout() &&
- cp.getWriteQueueMinBytes() == this.getWriteQueueMinBytes() &&
- cp.getWriteQueueMaxBytes() == this.getWriteQueueMaxBytes();
+ cp.isSSLEnabledModified() == this.isSSLEnabledModified();
}
}
Modified: trunk/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2008-06-10 11:49:22 UTC (rev 4417)
@@ -42,6 +42,8 @@
Boolean isClustered();
Integer getScheduledThreadPoolMaxSize();
+
+ Integer getThreadPoolMaxSize();
long getSecurityInvalidationInterval();
@@ -71,12 +73,6 @@
long getTimeout();
- long getWriteQueueMaxBytes();
-
- long getWriteQueueMinBytes();
-
- long getWriteQueueBlockTimeout();
-
boolean isSecurityEnabled();
String getKeyStorePath();
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-06-10 11:49:22 UTC (rev 4417)
@@ -57,6 +57,8 @@
protected int scheduledThreadPoolMaxSize = 30;
+ protected int threadPoolMaxSize = 30;
+
protected long securityInvalidationInterval = 10000;
protected boolean requireDestinations;
@@ -102,10 +104,7 @@
protected boolean invmDisabled = DEFAULT_INVM_DISABLED;
protected boolean invmDisabledModified = false;
protected boolean tcpNoDelay;
- protected long writeQueueBlockTimeout = 5000;
- protected long writeQueueMinBytes = 65536;
- protected long writeQueueMaxBytes = 1048576;
-
+
protected int tcpReceiveBufferSize = -1;
protected int tcpSendBufferSize = -1;
protected boolean sslEnabled = DEFAULT_SSL_ENABLED;
@@ -129,6 +128,11 @@
{
return scheduledThreadPoolMaxSize;
}
+
+ public Integer getThreadPoolMaxSize()
+ {
+ return threadPoolMaxSize;
+ }
public long getSecurityInvalidationInterval()
{
@@ -320,37 +324,7 @@
{
this.tcpSendBufferSize = size;
}
-
- public long getWriteQueueBlockTimeout()
- {
- return writeQueueBlockTimeout;
- }
-
- public long getWriteQueueMaxBytes()
- {
- return writeQueueMaxBytes;
- }
-
- public long getWriteQueueMinBytes()
- {
- return writeQueueMinBytes;
- }
-
- public void setWriteQueueBlockTimeout(final long timeout)
- {
- this.writeQueueBlockTimeout = timeout;
- }
-
- public void setWriteQueueMaxBytes(final long bytes)
- {
- this.writeQueueMaxBytes = bytes;
- }
-
- public void setWriteQueueMinBytes(final long bytes)
- {
- this.writeQueueMinBytes = bytes;
- }
-
+
public String getURI()
{
StringBuffer buff = new StringBuffer();
@@ -461,9 +435,6 @@
connectionParams.setTcpReceiveBufferSize(tcpReceiveBufferSize);
connectionParams.setTcpSendBufferSize(tcpSendBufferSize);
connectionParams.setTimeout(timeout);
- connectionParams.setWriteQueueBlockTimeout(writeQueueBlockTimeout);
- connectionParams.setWriteQueueMinBytes(writeQueueMinBytes);
- connectionParams.setWriteQueueMaxBytes(writeQueueMaxBytes);
return connectionParams;
}
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-06-10 11:49:22 UTC (rev 4417)
@@ -58,7 +58,9 @@
clustered = getBoolean(e, "clustered", clustered);
- scheduledThreadPoolMaxSize = getInteger(e, "scheduled-executor-max-pool-size", scheduledThreadPoolMaxSize);
+ scheduledThreadPoolMaxSize = getInteger(e, "scheduled-max-pool-size", scheduledThreadPoolMaxSize);
+
+ threadPoolMaxSize = getInteger(e, "max-pool-size", threadPoolMaxSize);
transport = TransportType.valueOf(getString(e, "remoting-transport", TCP.name()));
@@ -83,12 +85,6 @@
keepAliveTimeout = getInteger(e, "remoting-keep-alive-timeout", ConnectionParams.DEFAULT_KEEP_ALIVE_TIMEOUT);
- writeQueueBlockTimeout = getLong(e, "remoting-writequeue-block-timeout", 10000L);
-
- writeQueueMinBytes = getLong(e, "remoting-writequeue-minbytes", 32 * 1024L);
-
- writeQueueMaxBytes = getLong(e, "remoting-writequeue-maxbytes", 64 * 1024L);
-
sslEnabled = getBoolean(e, "remoting-enable-ssl", false);
keyStorePath = getString(e, "remoting-ssl-keystore-path", null);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/ConnectorRegistry.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/ConnectorRegistry.java 2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/main/org/jboss/messaging/core/remoting/ConnectorRegistry.java 2008-06-10 11:49:22 UTC (rev 4417)
@@ -8,7 +8,7 @@
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.remoting.impl.invm.INVMConnector;
-import org.jboss.messaging.core.remoting.impl.mina.MinaService;
+import org.jboss.messaging.core.remoting.impl.mina.RemotingServiceImpl;
import org.jboss.messaging.core.client.Location;
import org.jboss.messaging.core.client.ConnectionParams;
@@ -16,7 +16,7 @@
/**
* The ConnectorRegistry keeps track of Configurations and NIOConnectors.
*
- * When a {@link MinaService} is started, it register its {@link Configuration}.
+ * When a {@link RemotingServiceImpl} is started, it registers its {@link Configuration}.
*
* When a client is created, it gets its {@link NIOConnector} from the
* ConnectorRegistry using the {@link Configuration} corresponding to the server
Modified: trunk/src/main/org/jboss/messaging/core/remoting/Interceptor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/Interceptor.java 2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/main/org/jboss/messaging/core/remoting/Interceptor.java 2008-06-10 11:49:22 UTC (rev 4417)
@@ -13,9 +13,9 @@
*
* This is class is a simple way to intercepting server calls on JBoss Messaging.
*
- * To Add this interceptor, you have to modify jbm-configuration.xml, or call MinaService.addInterceptor manually.
+ * To add this interceptor, you have to modify jbm-configuration.xml or call RemotingServiceImpl.addInterceptor manually.
*
- * If you deploy any Interceptor as a POJO on the Microcontainer, MinaService.addInterceptor is called automagically.
+ * If you deploy any Interceptor as a POJO on the Microcontainer, RemotingServiceImpl.addInterceptor is called automagically.
*
* @author clebert.suconic at jboss.com
*/
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2008-06-10 11:49:22 UTC (rev 4417)
@@ -149,10 +149,7 @@
threadPool = Executors.newCachedThreadPool();
//We don't order executions in the handler for messages received - this is done in the ClientConsumeImpl
//since they are put on the queue in order
- handler = new MinaHandler(dispatcher, threadPool, this, false, false,
- connectionParams.getWriteQueueBlockTimeout(),
- connectionParams.getWriteQueueMinBytes(),
- connectionParams.getWriteQueueMaxBytes());
+ handler = new MinaHandler(dispatcher, threadPool, this, false, false);
connector.setHandler(handler);
InetSocketAddress address = new InetSocketAddress(location.getHost(), location.getPort());
ConnectFuture future = connector.connect(address);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java 2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java 2008-06-10 11:49:22 UTC (rev 4417)
@@ -36,8 +36,6 @@
private static final Logger log = Logger.getLogger(MinaHandler.class);
- private static final AttributeKey BLOCKED = new AttributeKey(MinaHandler.class, "blocked");
-
private static boolean trace = log.isTraceEnabled();
// Attributes ----------------------------------------------------
@@ -53,16 +51,6 @@
// Note! must use ConcurrentMap here to avoid race condition
private final ConcurrentMap<Long, Executor> executors = new ConcurrentHashMap<Long, Executor>();
- private final long blockTimeout;
-
- //TODO - this is screwed - I want this to be zero, but unfortunately in messageSent, the current
- //messages bytes haven't been subtracted so this won't work!!
- private final long bytesLow;
-
- private final long bytesHigh;
-
- private boolean blocked;
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -70,18 +58,11 @@
final ExecutorService executorService,
final CleanUpNotifier failureNotifier,
final boolean closeSessionOnExceptionCaught,
- final boolean useExecutor,
- final long blockTimeout,
- final long bytesLow,
- final long bytesHigh)
+ final boolean useExecutor)
{
assert dispatcher != null;
assert executorService != null;
- this.blockTimeout = blockTimeout;
- this.bytesLow = bytesLow;
- this.bytesHigh = bytesHigh;
-
this.dispatcher = dispatcher;
this.failureNotifier = failureNotifier;
this.closeSessionOnExceptionCaught = closeSessionOnExceptionCaught;
@@ -134,7 +115,7 @@
@Override
public void messageReceived(final IoSession session, final Object message)
- throws Exception
+ throws Exception
{
final Packet packet = (Packet) message;
@@ -177,58 +158,6 @@
}
}
-// @Override
-// public synchronized void messageSent(final IoSession session, final Object message) throws Exception
-// {
-// if (blocked)
-// {
-// long bytes = session.getScheduledWriteBytes();
-//
-// if (bytes <= bytesLow)
-// {
-// blocked = false;
-//
-// //Note that we need to notify all since there may be more than one thread waiting on this
-// //E.g. the response from a blocking acknowledge and a delivery
-// notifyAll();
-// }
-// }
-// }
-//
-// public synchronized void checkWrite(final IoSession session) throws Exception
-// {
-// while (session.getScheduledWriteBytes() >= bytesHigh)
-// {
-// blocked = true;
-//
-// long start = System.currentTimeMillis();
-//
-// long toWait = blockTimeout;
-//
-// do
-// {
-// wait(toWait);
-//
-// if (session.getScheduledWriteBytes() < bytesHigh)
-// {
-// break;
-// }
-//
-// long now = System.currentTimeMillis();
-//
-// toWait -= now - start;
-//
-// start = now;
-// }
-// while (toWait > 0);
-//
-// if (toWait <= 0)
-// {
-// throw new IllegalStateException("Timed out waiting for MINA queue to free");
-// }
-// }
-// }
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -246,15 +175,6 @@
{
public void send(Packet p) throws Exception
{
-// try
-// {
-// checkWrite(session);
-// }
-// catch (Exception e)
-// {
-// log.error("Failed to acquire sem", e);
-// }
-
dispatcher.callFilters(p);
session.write(p);
@@ -276,7 +196,8 @@
returner = null;
}
- if (trace) {
+ if (trace)
+ {
log.trace("received packet " + packet);
}
Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java 2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java 2008-06-10 11:49:22 UTC (rev 4417)
@@ -1,312 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.messaging.core.remoting.impl.mina;
-
-import org.apache.mina.common.*;
-import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
-import org.jboss.beans.metadata.api.annotations.Install;
-import org.jboss.beans.metadata.api.annotations.Uninstall;
-import org.jboss.messaging.core.client.RemotingSessionListener;
-import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.ping.Pinger;
-import org.jboss.messaging.core.ping.impl.PingerImpl;
-import static org.jboss.messaging.core.remoting.ConnectorRegistrySingleton.REGISTRY;
-import org.jboss.messaging.core.remoting.Interceptor;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.remoting.RemotingService;
-import static org.jboss.messaging.core.remoting.TransportType.INVM;
-import org.jboss.messaging.core.remoting.impl.PacketDispatcherImpl;
-import static org.jboss.messaging.core.remoting.impl.RemotingConfigurationValidator.validate;
-import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addCodecFilter;
-import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addSSLFilter;
-
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.*;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- * @version <tt>$Revision$</tt>
- */
-public class MinaService implements RemotingService, CleanUpNotifier
-{
- // Constants -----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(MinaService.class);
-
- // Attributes ----------------------------------------------------
-
- private boolean started = false;
-
- private Configuration config;
-
- private NioSocketAcceptor acceptor;
-
- private IoServiceListener acceptorListener;
-
- private final PacketDispatcher dispatcher;
-
- private ExecutorService threadPool;
-
- private List<RemotingSessionListener> listeners = new ArrayList<RemotingSessionListener>();
-
- private List<Interceptor> filters = new CopyOnWriteArrayList<Interceptor>();
-
- private ServerKeepAliveFactory factory;
-
- private ScheduledExecutorService scheduledExecutor;
- private Map<IoSession, ScheduledFuture> currentScheduledPingers;
- private Map<IoSession, Pinger> currentPingers;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public MinaService(Configuration config)
- {
- this(config, new ServerKeepAliveFactory());
- }
-
- public MinaService(Configuration config, ServerKeepAliveFactory factory)
- {
- assert config != null;
- assert factory != null;
-
- validate(config);
-
- this.config = config;
- this.factory = factory;
- dispatcher = new PacketDispatcherImpl(filters);
-
- scheduledExecutor = new ScheduledThreadPoolExecutor(config.getScheduledThreadPoolMaxSize());
- currentScheduledPingers = new ConcurrentHashMap<IoSession, ScheduledFuture>();
- currentPingers = new ConcurrentHashMap<IoSession, Pinger>();
- }
-
- @Install
- public void addInterceptor(Interceptor filter)
- {
- filters.add(filter);
- }
-
- @Uninstall
- public void removeInterceptor(Interceptor filter)
- {
- filters.remove(filter);
- }
-
- public void addRemotingSessionListener(RemotingSessionListener listener)
- {
- assert listener != null;
-
- listeners.add(listener);
- }
-
- public void removeRemotingSessionListener(RemotingSessionListener listener)
- {
- assert listener != null;
-
- listeners.remove(listener);
- }
-
- // TransportService implementation -------------------------------
-
- public void start() throws Exception
- {
- if (log.isDebugEnabled())
- {
- log.debug("Start MinaService with configuration:" + config);
- }
-
- // if INVM transport is set, we bypass MINA setup
- if (config.getTransport() != INVM
- && acceptor == null)
- {
- acceptor = new NioSocketAcceptor();
-
- acceptor.setSessionDataStructureFactory(new MessagingIOSessionDataStructureFactory());
-
- DefaultIoFilterChainBuilder filterChain = acceptor.getFilterChain();
-
- // addMDCFilter(filterChain);
- if (config.isSSLEnabled())
- {
- addSSLFilter(filterChain, false, config.getKeyStorePath(),
- config.getKeyStorePassword(), config
- .getTrustStorePath(), config
- .getTrustStorePassword());
- }
- addCodecFilter(filterChain);
-
- // Bind
- acceptor.setDefaultLocalAddress(new InetSocketAddress(config.getHost(), config.getPort()));
- acceptor.getSessionConfig().setTcpNoDelay(config.isTcpNoDelay());
- int receiveBufferSize = config.getTcpReceiveBufferSize();
- if (receiveBufferSize != -1)
- {
- acceptor.getSessionConfig().setReceiveBufferSize(receiveBufferSize);
- }
- int sendBufferSize = config.getTcpSendBufferSize();
- if (sendBufferSize != -1)
- {
- acceptor.getSessionConfig().setSendBufferSize(sendBufferSize);
- }
- acceptor.setReuseAddress(true);
- acceptor.getSessionConfig().setReuseAddress(true);
- acceptor.getSessionConfig().setKeepAlive(true);
- acceptor.setCloseOnDeactivation(false);
-
- threadPool = Executors.newCachedThreadPool();
- acceptor.setHandler(new MinaHandler(dispatcher, threadPool,
- this, true, true,
- config.getWriteQueueBlockTimeout(),
- config.getWriteQueueMinBytes(),
- config.getWriteQueueMaxBytes()));
- acceptor.bind();
- acceptorListener = new MinaSessionListener();
- acceptor.addListener(acceptorListener);
- }
-
- // TODO reenable invm transport
-// boolean disableInvm = config.isInvmDisabled();
-// if (log.isDebugEnabled())
-// log.debug("invm optimization for remoting is " + (disableInvm ? "disabled" : "enabled"));
- // if (!disableInvm)
-
- log.info("Registering:" + config.getLocation());
- REGISTRY.register(config.getLocation(), dispatcher);
-
- started = true;
- }
-
- public void stop()
- {
- if (acceptor != null)
- {
- // remove the listener before disposing the acceptor
- // so that we're not notified when the sessions are destroyed
- acceptor.removeListener(acceptorListener);
- acceptor.unbind();
- acceptor.dispose();
- acceptor = null;
- threadPool.shutdown();
- }
-
- REGISTRY.unregister(config.getLocation());
-
- started = false;
- }
-
- public PacketDispatcher getDispatcher()
- {
- return dispatcher;
- }
-
- public Configuration getConfiguration()
- {
- return config;
- }
-
- public ServerKeepAliveFactory getKeepAliveFactory()
- {
- return factory;
- }
-
- /**
- * This method must only be called by tests which requires
- * to insert Filters (e.g. to simulate network failures)
- */
- public DefaultIoFilterChainBuilder getFilterChain()
- {
- assert started == true;
- assert acceptor != null;
-
- return acceptor.getFilterChain();
- }
-
- // FailureNotifier implementation -------------------------------
-
- public void fireCleanup(long sessionID, MessagingException me)
- {
- if (factory.getSessions().contains(sessionID))
- {
- for (RemotingSessionListener listener : listeners)
- {
- listener.sessionDestroyed(sessionID, me);
- }
- factory.getSessions().remove(sessionID);
- }
- }
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
- private final class MinaSessionListener implements IoServiceListener
- {
-
- public void serviceActivated(IoService service)
- {
- }
-
- public void serviceDeactivated(IoService service)
- {
- }
-
- public void serviceIdle(IoService service, IdleStatus idleStatus)
- {
- }
-
- /**
- * register a pinger for the new client
- *
- * @param session
- */
- public void sessionCreated(IoSession session)
- {
- //register pinger
- if (config.getKeepAliveInterval() > 0)
- {
- Pinger pinger = new PingerImpl(getDispatcher(), new MinaSession(session, null), config.getKeepAliveTimeout(), MinaService.this);
- ScheduledFuture future = scheduledExecutor.scheduleAtFixedRate(pinger, config.getKeepAliveInterval(), config.getKeepAliveInterval(), TimeUnit.MILLISECONDS);
- currentScheduledPingers.put(session, future);
- currentPingers.put(session, pinger);
- factory.getSessions().add(session.getId());
- }
- }
-
- /**
- * destry th epinger and stop
- *
- * @param session
- */
- public void sessionDestroyed(IoSession session)
- {
- ScheduledFuture future = currentScheduledPingers.remove(session);
- if (future != null)
- {
- future.cancel(true);
- }
- Pinger pinger = currentPingers.remove(session);
- if (pinger != null)
- {
- pinger.close();
- }
- fireCleanup(session.getId(), null);
- }
- }
-}
\ No newline at end of file
Copied: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/RemotingServiceImpl.java (from rev 4415, trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/RemotingServiceImpl.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/RemotingServiceImpl.java 2008-06-10 11:49:22 UTC (rev 4417)
@@ -0,0 +1,320 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.impl.mina;
+
+import static org.jboss.messaging.core.remoting.ConnectorRegistrySingleton.REGISTRY;
+import static org.jboss.messaging.core.remoting.TransportType.INVM;
+import static org.jboss.messaging.core.remoting.impl.RemotingConfigurationValidator.validate;
+import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addCodecFilter;
+import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addSSLFilter;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.mina.common.DefaultIoFilterChainBuilder;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoService;
+import org.apache.mina.common.IoServiceListener;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+import org.jboss.beans.metadata.api.annotations.Install;
+import org.jboss.beans.metadata.api.annotations.Uninstall;
+import org.jboss.messaging.core.client.RemotingSessionListener;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.ping.Pinger;
+import org.jboss.messaging.core.ping.impl.PingerImpl;
+import org.jboss.messaging.core.remoting.Interceptor;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.RemotingService;
+import org.jboss.messaging.core.remoting.impl.PacketDispatcherImpl;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @version <tt>$Revision$</tt>
+ */
+public class RemotingServiceImpl implements RemotingService, CleanUpNotifier
+{
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(RemotingServiceImpl.class);
+
+ // Attributes ----------------------------------------------------
+
+ private boolean started = false;
+
+ private Configuration config;
+
+ private NioSocketAcceptor acceptor;
+
+ private IoServiceListener acceptorListener;
+
+ private final PacketDispatcher dispatcher;
+
+ private ExecutorService threadPool;
+
+ private List<RemotingSessionListener> listeners = new ArrayList<RemotingSessionListener>();
+
+ private List<Interceptor> filters = new CopyOnWriteArrayList<Interceptor>();
+
+ private ServerKeepAliveFactory factory;
+
+ private ScheduledExecutorService scheduledExecutor;
+ private Map<IoSession, ScheduledFuture<?>> currentScheduledPingers;
+ private Map<IoSession, Pinger> currentPingers;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public RemotingServiceImpl(Configuration config)
+ {
+ this(config, new ServerKeepAliveFactory());
+ }
+
+ public RemotingServiceImpl(Configuration config, ServerKeepAliveFactory factory)
+ {
+ assert config != null;
+ assert factory != null;
+
+ validate(config);
+
+ this.config = config;
+ this.factory = factory;
+ dispatcher = new PacketDispatcherImpl(filters);
+
+ scheduledExecutor = new ScheduledThreadPoolExecutor(config.getScheduledThreadPoolMaxSize());
+ currentScheduledPingers = new ConcurrentHashMap<IoSession, ScheduledFuture<?>>();
+ currentPingers = new ConcurrentHashMap<IoSession, Pinger>();
+ }
+
+ @Install
+ public void addInterceptor(Interceptor filter)
+ {
+ filters.add(filter);
+ }
+
+ @Uninstall
+ public void removeInterceptor(Interceptor filter)
+ {
+ filters.remove(filter);
+ }
+
+ public void addRemotingSessionListener(RemotingSessionListener listener)
+ {
+ assert listener != null;
+
+ listeners.add(listener);
+ }
+
+ public void removeRemotingSessionListener(RemotingSessionListener listener)
+ {
+ assert listener != null;
+
+ listeners.remove(listener);
+ }
+
+ // RemotingService implementation --------------------------------
+
+ public void start() throws Exception
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug("Start RemotingServiceImpl with configuration:" + config);
+ }
+
+ // if INVM transport is set, we bypass MINA setup
+ if (config.getTransport() != INVM
+ && acceptor == null)
+ {
+ acceptor = new NioSocketAcceptor();
+
+ acceptor.setSessionDataStructureFactory(new MessagingIOSessionDataStructureFactory());
+
+ DefaultIoFilterChainBuilder filterChain = acceptor.getFilterChain();
+
+ // addMDCFilter(filterChain);
+ if (config.isSSLEnabled())
+ {
+ addSSLFilter(filterChain, false, config.getKeyStorePath(),
+ config.getKeyStorePassword(), config
+ .getTrustStorePath(), config
+ .getTrustStorePassword());
+ }
+ addCodecFilter(filterChain);
+
+ // Bind
+ acceptor.setDefaultLocalAddress(new InetSocketAddress(config.getHost(), config.getPort()));
+ acceptor.getSessionConfig().setTcpNoDelay(config.isTcpNoDelay());
+ int receiveBufferSize = config.getTcpReceiveBufferSize();
+ if (receiveBufferSize != -1)
+ {
+ acceptor.getSessionConfig().setReceiveBufferSize(receiveBufferSize);
+ }
+ int sendBufferSize = config.getTcpSendBufferSize();
+ if (sendBufferSize != -1)
+ {
+ acceptor.getSessionConfig().setSendBufferSize(sendBufferSize);
+ }
+ acceptor.setReuseAddress(true);
+ acceptor.getSessionConfig().setReuseAddress(true);
+ acceptor.getSessionConfig().setKeepAlive(true);
+ acceptor.setCloseOnDeactivation(false);
+
+ threadPool = Executors.newCachedThreadPool();
+ acceptor.setHandler(new MinaHandler(dispatcher, threadPool, this, true, true));
+ acceptor.bind();
+ acceptorListener = new MinaSessionListener();
+ acceptor.addListener(acceptorListener);
+ }
+
+ // TODO reenable invm transport
+// boolean disableInvm = config.isInvmDisabled();
+// if (log.isDebugEnabled())
+// log.debug("invm optimization for remoting is " + (disableInvm ? "disabled" : "enabled"));
+ // if (!disableInvm)
+
+ log.info("Registering:" + config.getLocation());
+ REGISTRY.register(config.getLocation(), dispatcher);
+
+ started = true;
+ }
+
+ public void stop()
+ {
+ if (acceptor != null)
+ {
+ // remove the listener before disposing the acceptor
+ // so that we're not notified when the sessions are destroyed
+ acceptor.removeListener(acceptorListener);
+ acceptor.unbind();
+ acceptor.dispose();
+ acceptor = null;
+ threadPool.shutdown();
+ }
+
+ REGISTRY.unregister(config.getLocation());
+
+ started = false;
+ }
+
+ public PacketDispatcher getDispatcher()
+ {
+ return dispatcher;
+ }
+
+ public Configuration getConfiguration()
+ {
+ return config;
+ }
+
+ public ServerKeepAliveFactory getKeepAliveFactory()
+ {
+ return factory;
+ }
+
+ /**
+ * This method must only be called by tests that need
+ * to insert filters (e.g. to simulate network failures)
+ */
+ public DefaultIoFilterChainBuilder getFilterChain()
+ {
+ assert started == true;
+ assert acceptor != null;
+
+ return acceptor.getFilterChain();
+ }
+
+ // CleanUpNotifier implementation -------------------------------
+
+ public void fireCleanup(long sessionID, MessagingException me)
+ {
+ if (factory.getSessions().contains(sessionID))
+ {
+ for (RemotingSessionListener listener : listeners)
+ {
+ listener.sessionDestroyed(sessionID, me);
+ }
+ factory.getSessions().remove(sessionID);
+ }
+ }
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+ private final class MinaSessionListener implements IoServiceListener
+ {
+
+ public void serviceActivated(IoService service)
+ {
+ }
+
+ public void serviceDeactivated(IoService service)
+ {
+ }
+
+ public void serviceIdle(IoService service, IdleStatus idleStatus)
+ {
+ }
+
+ /**
+ * register a pinger for the new client
+ *
+ * @param session
+ */
+ public void sessionCreated(IoSession session)
+ {
+ //register pinger
+ if (config.getKeepAliveInterval() > 0)
+ {
+ Pinger pinger = new PingerImpl(getDispatcher(), new MinaSession(session, null), config.getKeepAliveTimeout(), RemotingServiceImpl.this);
+ ScheduledFuture<?> future = scheduledExecutor.scheduleAtFixedRate(pinger, config.getKeepAliveInterval(), config.getKeepAliveInterval(), TimeUnit.MILLISECONDS);
+ currentScheduledPingers.put(session, future);
+ currentPingers.put(session, pinger);
+ factory.getSessions().add(session.getId());
+ }
+ }
+
+ /**
+ * destroy the pinger and stop
+ *
+ * @param session
+ */
+ public void sessionDestroyed(IoSession session)
+ {
+ ScheduledFuture<?> future = currentScheduledPingers.remove(session);
+ if (future != null)
+ {
+ future.cancel(true);
+ }
+ Pinger pinger = currentPingers.remove(session);
+ if (pinger != null)
+ {
+ pinger.close();
+ }
+ fireCleanup(session.getId(), null);
+ }
+ }
+}
\ No newline at end of file
Modified: trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java 2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java 2008-06-10 11:49:22 UTC (rev 4417)
@@ -27,15 +27,16 @@
import org.jboss.messaging.core.deployers.DeploymentManager;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.remoting.PacketReturner;
import org.jboss.messaging.core.remoting.RemotingService;
-import org.jboss.messaging.core.remoting.PacketReturner;
import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionResponse;
+import org.jboss.messaging.core.security.JBMSecurityManager;
import org.jboss.messaging.core.security.Role;
import org.jboss.messaging.core.security.SecurityStore;
-import org.jboss.messaging.core.security.JBMSecurityManager;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.version.Version;
+import org.jboss.messaging.util.OrderedExecutorFactory;
/**
* This interface defines the internal interface of the Messaging Server exposed
@@ -96,4 +97,6 @@
PacketReturner sender) throws Exception;
DeploymentManager getDeploymentManager();
+
+ OrderedExecutorFactory getOrderedExecutorFactory();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-06-10 11:49:22 UTC (rev 4417)
@@ -22,8 +22,11 @@
package org.jboss.messaging.core.server.impl;
import java.util.HashSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
@@ -45,7 +48,7 @@
import org.jboss.messaging.core.remoting.PacketReturner;
import org.jboss.messaging.core.remoting.RemotingService;
import org.jboss.messaging.core.remoting.impl.mina.CleanUpNotifier;
-import org.jboss.messaging.core.remoting.impl.mina.MinaService;
+import org.jboss.messaging.core.remoting.impl.mina.RemotingServiceImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionResponse;
import org.jboss.messaging.core.security.JBMSecurityManager;
import org.jboss.messaging.core.security.Role;
@@ -62,8 +65,10 @@
import org.jboss.messaging.core.transaction.ResourceManager;
import org.jboss.messaging.core.transaction.impl.ResourceManagerImpl;
import org.jboss.messaging.core.version.Version;
+import org.jboss.messaging.util.OrderedExecutorFactory;
import org.jboss.messaging.util.VersionLoader;
+
/**
* A Messaging Server
*
@@ -100,6 +105,8 @@
private Deployer queueSettingsDeployer;
private JBMSecurityManager securityManager = new JBMSecurityManagerImpl(true);
private DeploymentManager deploymentManager = new FileDeploymentManager();
+ private OrderedExecutorFactory orderedExecutorFactory;
+ private ExecutorService threadPool;
// plugins
@@ -115,7 +122,6 @@
private ResourceManager resourceManager = new ResourceManagerImpl(0);
private ScheduledExecutorService scheduledExecutor;
private MessagingServerPacketHandler serverPacketHandler;
- private CleanUpNotifier cleanUpNotifier = null;
// Constructors ---------------------------------------------------------------------------------
/**
@@ -140,11 +146,12 @@
this();
this.configuration = configuration;
createTransport = true;
- remotingService = new MinaService(configuration);
- cleanUpNotifier = (CleanUpNotifier) remotingService;
+ remotingService = new RemotingServiceImpl(configuration);
}
// lifecycle methods ----------------------------------------------------------------
+
+
public synchronized void start() throws Exception
{
log.debug("starting MessagingServer");
@@ -164,12 +171,14 @@
securityStore.setSecurityManager(securityManager);
securityDeployer = new SecurityDeployer(securityRepository);
queueSettingsRepository.setDefault(new QueueSettings());
- scheduledExecutor = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize());
+ scheduledExecutor = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize(), new JBMThreadFactory("JBM-scheduled-threads"));
queueFactory = new QueueFactoryImpl(scheduledExecutor, queueSettingsRepository);
connectionManager = new ConnectionManagerImpl();
memoryManager = new SimpleMemoryManager();
postOffice = new PostOfficeImpl(storageManager, queueFactory, configuration.isRequireDestinations());
- queueSettingsDeployer = new QueueSettingsDeployer(queueSettingsRepository);
+ queueSettingsDeployer = new QueueSettingsDeployer(queueSettingsRepository);
+ threadPool = Executors.newFixedThreadPool(configuration.getThreadPoolMaxSize(), new JBMThreadFactory("JBM-session-threads"));
+ orderedExecutorFactory = new OrderedExecutorFactory(threadPool);
if (createTransport)
{
@@ -226,6 +235,9 @@
postOffice = null;
scheduledExecutor.shutdown();
scheduledExecutor = null;
+ threadPool.shutdown();
+ threadPool = null;
+ orderedExecutorFactory = null;
if (createTransport)
{
remotingService.stop();
@@ -285,11 +297,6 @@
this.storageManager = storageManager;
}
- public void setCleanUpNotifier(CleanUpNotifier cleanUpNotifier)
- {
- this.cleanUpNotifier = cleanUpNotifier;
- }
-
public PostOffice getPostOffice()
{
return postOffice;
@@ -300,7 +307,6 @@
this.postOffice = postOffice;
}
-
public HierarchicalRepository<HashSet<Role>> getSecurityRepository()
{
return securityRepository;
@@ -353,12 +359,17 @@
sender.getSessionID(), clientAddress,
remotingService.getDispatcher(), resourceManager, storageManager,
queueSettingsRepository,
- postOffice, securityStore, connectionManager);
+ postOffice, securityStore, connectionManager, orderedExecutorFactory);
remotingService.getDispatcher().register(new ServerConnectionPacketHandler(connection));
return new CreateConnectionResponse(connection.getID(), version);
}
+
+ public OrderedExecutorFactory getOrderedExecutorFactory()
+ {
+ return orderedExecutorFactory;
+ }
// Public ---------------------------------------------------------------------------------------
@@ -367,5 +378,22 @@
// Protected ------------------------------------------------------------------------------------
// Private --------------------------------------------------------------------------------------
+
+ // Inner classes --------------------------------------------------------------------------------
+
+ private static class JBMThreadFactory implements ThreadFactory
+ {
+ private ThreadGroup group;
+
+ JBMThreadFactory(final String groupName)
+ {
+ this.group = new ThreadGroup(groupName);
+ }
+
+ public Thread newThread(Runnable command)
+ {
+ return new Thread(group, command);
+ }
+ }
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-06-10 11:49:22 UTC (rev 4417)
@@ -98,6 +98,8 @@
private int pos;
+ private boolean locked;
+
private AtomicInteger sizeBytes = new AtomicInteger(0);
private AtomicInteger messagesAdded = new AtomicInteger(0);
@@ -159,52 +161,56 @@
return name;
}
- public HandleStatus addLast(final MessageReference ref)
+ public synchronized HandleStatus addLast(final MessageReference ref)
{
- lock.lock();
+ if (locked)
+ {
+ lock.lock();
+ }
try
{
return add(ref, false);
}
finally
{
- lock.unlock();
+ if (locked)
+ {
+ lock.unlock();
+ }
}
}
- public HandleStatus addFirst(final MessageReference ref)
+ public synchronized HandleStatus addFirst(final MessageReference ref)
{
- lock.lock();
+ if (locked)
+ {
+ lock.lock();
+ }
try
{
return add(ref, true);
}
finally
{
- lock.unlock();
+ if (locked)
+ {
+ lock.unlock();
+ }
}
}
public void addListFirst(final LinkedList<MessageReference> list)
{
- lock.lock();
- try
+ ListIterator<MessageReference> iter = list.listIterator(list.size());
+
+ while (iter.hasPrevious())
{
- ListIterator<MessageReference> iter = list.listIterator(list.size());
-
- while (iter.hasPrevious())
- {
- MessageReference ref = iter.previous();
-
- messageReferences.addFirst(ref, ref.getMessage().getPriority());
- }
-
- deliver();
+ MessageReference ref = iter.previous();
+
+ messageReferences.addFirst(ref, ref.getMessage().getPriority());
}
- finally
- {
- lock.unlock();
- }
+
+ deliver();
}
public void deliverAsync(final Executor executor)
@@ -222,75 +228,67 @@
*
* @see org.jboss.messaging.newcore.intf.Queue#deliver()
*/
- public void deliver()
+ public synchronized void deliver()
{
- lock.lock();
- try
+ MessageReference reference;
+
+ ListIterator<MessageReference> iterator = null;
+
+ while (true)
{
- MessageReference reference;
-
- ListIterator<MessageReference> iterator = null;
-
- while (true)
+ if (iterator == null)
{
- if (iterator == null)
+ reference = messageReferences.peekFirst();
+ }
+ else
+ {
+ if (iterator.hasNext())
{
- reference = messageReferences.peekFirst();
+ reference = iterator.next();
}
else
{
- if (iterator.hasNext())
- {
- reference = iterator.next();
- }
- else
- {
- reference = null;
- }
+ reference = null;
}
-
- if (reference == null)
+ }
+
+ if (reference == null)
+ {
+ if (iterator == null)
{
- if (iterator == null)
- {
- // We delivered all the messages - go into direct delivery
- direct = true;
-
- promptDelivery = false;
- }
- return;
+ // We delivered all the messages - go into direct delivery
+ direct = true;
+
+ promptDelivery = false;
}
-
- HandleStatus status = deliver(reference);
-
- if (status == HandleStatus.HANDLED)
+ return;
+ }
+
+ HandleStatus status = deliver(reference);
+
+ if (status == HandleStatus.HANDLED)
+ {
+ if (iterator == null)
{
- if (iterator == null)
- {
- messageReferences.removeFirst();
- }
- else
- {
- iterator.remove();
- }
+ messageReferences.removeFirst();
}
- else if (status == HandleStatus.BUSY)
+ else
{
- // All consumers busy - give up
- break;
+ iterator.remove();
}
- else if (status == HandleStatus.NO_MATCH && iterator == null)
- {
- // Consumers not all busy - but filter not accepting - iterate back
- // through the queue
- iterator = messageReferences.iterator();
- }
}
+ else if (status == HandleStatus.BUSY)
+ {
+ // All consumers busy - give up
+ break;
+ }
+ else if (status == HandleStatus.NO_MATCH && iterator == null)
+ {
+ // Consumers not all busy - but filter not accepting - iterate back
+ // through the queue
+ iterator = messageReferences.iterator();
+ }
}
- finally
- {
- lock.unlock();
- }
}
public synchronized void addConsumer(final Consumer consumer)
@@ -502,14 +500,18 @@
tx.commit();
}
- public void lock()
+ public synchronized void lock()
{
lock.lock();
+
+ locked = true;
}
- public void unlock()
- {
- lock.unlock();
+ public synchronized void unlock()
+ {
+ lock.unlock();
+
+ locked = false;
}
// Public
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionImpl.java 2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionImpl.java 2008-06-10 11:49:22 UTC (rev 4417)
@@ -41,6 +41,7 @@
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.ResourceManager;
import org.jboss.messaging.util.ConcurrentHashSet;
+import org.jboss.messaging.util.OrderedExecutorFactory;
import org.jboss.messaging.util.SimpleString;
/**
@@ -89,6 +90,8 @@
private final SecurityStore securityStore;
private final ConnectionManager connectionManager;
+
+ private final OrderedExecutorFactory orderedExecutorFactory;
private final long createdTime;
@@ -111,7 +114,8 @@
final StorageManager persistenceManager,
final HierarchicalRepository<QueueSettings> queueSettingsRepository,
final PostOffice postOffice, final SecurityStore securityStore,
- final ConnectionManager connectionManager)
+ final ConnectionManager connectionManager,
+ final OrderedExecutorFactory orderedExecutorFactory)
{
this.id = id;
@@ -137,6 +141,8 @@
this.connectionManager = connectionManager;
+ this.orderedExecutorFactory = orderedExecutorFactory;
+
started = false;
createdTime = System.currentTimeMillis();
@@ -158,7 +164,8 @@
long id = dispatcher.generateID();
ServerSession session =
new ServerSessionImpl(id, autoCommitSends, autoCommitAcks, xa, this, resourceManager,
- sender, dispatcher, persistenceManager, queueSettingsRepository, postOffice, securityStore);
+ sender, dispatcher, persistenceManager, queueSettingsRepository, postOffice, securityStore,
+ orderedExecutorFactory.getOrderedExecutor());
sessions.add(session);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-06-10 11:49:22 UTC (rev 4417)
@@ -27,8 +27,7 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import javax.transaction.xa.XAException;
@@ -131,12 +130,10 @@
private final AtomicLong deliveryIDSequence = new AtomicLong(0);
- private final ExecutorService executor = Executors.newSingleThreadExecutor();
+ private final Executor executor;
private Transaction tx;
- // private final Object rollbackCancelLock = new Object();
-
// Constructors
// ---------------------------------------------------------------------------------
@@ -146,7 +143,8 @@
final ResourceManager resourceManager, final PacketReturner sender,
final PacketDispatcher dispatcher, final StorageManager persistenceManager,
final HierarchicalRepository<QueueSettings> queueSettingsRepository,
- final PostOffice postOffice, final SecurityStore securityStore) throws Exception
+ final PostOffice postOffice, final SecurityStore securityStore,
+ final Executor executor) throws Exception
{
this.id = id;
@@ -174,6 +172,8 @@
this.postOffice = postOffice;
this.securityStore = securityStore;
+
+ this.executor = executor;
if (log.isTraceEnabled())
{
@@ -278,8 +278,6 @@
rollback();
- executor.shutdown();
-
deliveries.clear();
connection.removeSession(this);
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/QueueTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/QueueTest.java 2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/QueueTest.java 2008-06-10 11:49:22 UTC (rev 4417)
@@ -172,7 +172,7 @@
conn2 = cf.createConnection();
- Session s = conn1.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ Session s = conn1.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer p = s.createProducer(queue1);
@@ -183,7 +183,7 @@
s.commit();
- Session s2 = conn2.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ Session s2 = conn2.createSession(true, Session.SESSION_TRANSACTED);
// Create a consumer, start the session, close the consumer..
// This shouldn't cause any message to be lost
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerOrderingTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerOrderingTest.java 2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerOrderingTest.java 2008-06-10 11:49:22 UTC (rev 4417)
@@ -103,7 +103,7 @@
{
clientDispatcher = new PacketDispatcherImpl(null);
threadPool = Executors.newCachedThreadPool();
- handler = new MinaHandler(clientDispatcher, threadPool, null, true, true, 5000, 1 * 204 * 1024, 5 * 1024 * 1024);
+ handler = new MinaHandler(clientDispatcher, threadPool, null, true, true);
handler_1 = new TestPacketHandler(23);
clientDispatcher.register(handler_1);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerTest.java 2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerTest.java 2008-06-10 11:49:22 UTC (rev 4417)
@@ -79,7 +79,7 @@
{
clientDispatcher = new PacketDispatcherImpl(null);
threadPool = Executors.newCachedThreadPool();
- handler = new MinaHandler(clientDispatcher, threadPool, null, true, true, 5000, 1 * 204 * 1024, 5 * 1024 * 1024);
+ handler = new MinaHandler(clientDispatcher, threadPool, null, true, true);
packetHandler = new TestPacketHandler(23);
clientDispatcher.register(packetHandler);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaServiceTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaServiceTest.java 2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaServiceTest.java 2008-06-10 11:49:22 UTC (rev 4417)
@@ -15,7 +15,7 @@
import org.jboss.messaging.core.remoting.impl.PacketDispatcherImpl;
import org.jboss.messaging.core.remoting.impl.invm.INVMConnector;
import org.jboss.messaging.core.remoting.impl.mina.MinaConnector;
-import org.jboss.messaging.core.remoting.impl.mina.MinaService;
+import org.jboss.messaging.core.remoting.impl.mina.RemotingServiceImpl;
public class MinaServiceTest extends TestCase
{
@@ -66,7 +66,7 @@
ConfigurationImpl config = new ConfigurationImpl();
config.setTransport(INVM);
- invmService = new MinaService(config);
+ invmService = new RemotingServiceImpl(config);
invmService.start();
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaSessionTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaSessionTest.java 2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaSessionTest.java 2008-06-10 11:49:22 UTC (rev 4417)
@@ -13,7 +13,7 @@
import org.jboss.messaging.core.remoting.NIOConnector;
import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.impl.mina.MinaConnector;
-import org.jboss.messaging.core.remoting.impl.mina.MinaService;
+import org.jboss.messaging.core.remoting.impl.mina.RemotingServiceImpl;
import org.jboss.messaging.tests.unit.core.remoting.impl.ConfigurationHelper;
import org.jboss.messaging.tests.unit.core.remoting.impl.SessionTestBase;
@@ -26,7 +26,7 @@
public class MinaSessionTest extends SessionTestBase
{
- private MinaService service;
+ private RemotingServiceImpl service;
// Constants -----------------------------------------------------
@@ -55,7 +55,7 @@
@Override
protected PacketDispatcher startServer() throws Exception
{
- service = new MinaService(createRemotingConfiguration());
+ service = new RemotingServiceImpl(createRemotingConfiguration());
service.start();
return service.getDispatcher();
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/ServerKeepAliveTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/ServerKeepAliveTest.java 2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/ServerKeepAliveTest.java 2008-06-10 11:49:22 UTC (rev 4417)
@@ -13,7 +13,7 @@
import org.jboss.messaging.core.remoting.NIOSession;
import org.jboss.messaging.core.remoting.impl.PacketDispatcherImpl;
import org.jboss.messaging.core.remoting.impl.mina.MinaConnector;
-import org.jboss.messaging.core.remoting.impl.mina.MinaService;
+import org.jboss.messaging.core.remoting.impl.mina.RemotingServiceImpl;
import org.jboss.messaging.core.remoting.impl.mina.ServerKeepAliveFactory;
import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
@@ -33,7 +33,7 @@
// Attributes ----------------------------------------------------
- private MinaService service;
+ private RemotingServiceImpl service;
// Static --------------------------------------------------------
@@ -64,7 +64,7 @@
"localhost", TestSupport.PORT);
clientConfig.setKeepAliveInterval(TestSupport.KEEP_ALIVE_INTERVAL);
clientConfig.setKeepAliveTimeout(TestSupport.KEEP_ALIVE_TIMEOUT);
- service = new MinaService(config, new DummyServerKeepAliveFactory());
+ service = new RemotingServiceImpl(config, new DummyServerKeepAliveFactory());
service.start();
MinaConnector connector = new MinaConnector(clientConfig.getLocation(), clientConfig.getConnectionParams(), new PacketDispatcherImpl(null));
Modified: trunk/tests/src/org/jboss/messaging/tests/performance/remoting/impl/MeasureBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/remoting/impl/MeasureBase.java 2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/remoting/impl/MeasureBase.java 2008-06-10 11:49:22 UTC (rev 4417)
@@ -17,7 +17,7 @@
import org.jboss.messaging.core.remoting.PacketHandler;
import org.jboss.messaging.core.remoting.PacketReturner;
import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
-import org.jboss.messaging.core.remoting.impl.mina.MinaService;
+import org.jboss.messaging.core.remoting.impl.mina.RemotingServiceImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
@@ -28,7 +28,7 @@
*/
public abstract class MeasureBase extends TestCase
{
- protected MinaService service;
+ protected RemotingServiceImpl service;
protected PacketDispatcher serverDispatcher;
@Override
@@ -146,7 +146,7 @@
protected void startServer() throws Exception
{
- service = new MinaService(createConfiguration());
+ service = new RemotingServiceImpl(createConfiguration());
service.start();
serverDispatcher = service.getDispatcher();
System.out.println("Server Dispatcher = " + serverDispatcher);
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/network/ClientNetworkFailureTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/network/ClientNetworkFailureTest.java 2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/network/ClientNetworkFailureTest.java 2008-06-10 11:49:22 UTC (rev 4417)
@@ -32,7 +32,7 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.TransportType;
import static org.jboss.messaging.core.remoting.TransportType.TCP;
-import org.jboss.messaging.core.remoting.impl.mina.MinaService;
+import org.jboss.messaging.core.remoting.impl.mina.RemotingServiceImpl;
import org.jboss.messaging.core.server.ConnectionManager;
import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.server.impl.MessagingServerImpl;
@@ -53,7 +53,7 @@
// Constants -----------------------------------------------------
Logger log = Logger.getLogger(ClientNetworkFailureTest.class);
private MessagingServer server;
- private MinaService minaService;
+ private RemotingServiceImpl minaService;
private NetworkFailureFilter networkFailureFilter;
// Static --------------------------------------------------------
@@ -80,7 +80,7 @@
newConfig.setKeepAliveTimeout(KEEP_ALIVE_TIMEOUT);
server = new MessagingServerImpl(newConfig);
server.start();
- minaService = (MinaService) server.getRemotingService();
+ minaService = (RemotingServiceImpl) server.getRemotingService();
networkFailureFilter = new NetworkFailureFilter();
minaService.getFilterChain().addFirst("network-failure",
networkFailureFilter);
More information about the jboss-cvs-commits
mailing list