[jboss-cvs] JBoss Messaging SVN: r6615 - in trunk: src/config/jboss-as/non-clustered and 31 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Apr 29 02:26:48 EDT 2009
Author: timfox
Date: 2009-04-29 02:26:47 -0400 (Wed, 29 Apr 2009)
New Revision: 6615
Removed:
trunk/src/main/org/jboss/messaging/core/memory/
Modified:
trunk/src/config/jboss-as/clustered/jbm-configuration.xml
trunk/src/config/jboss-as/non-clustered/jbm-configuration.xml
trunk/src/config/stand-alone/clustered/jbm-configuration.xml
trunk/src/config/stand-alone/non-clustered/jbm-configuration.xml
trunk/src/main/org/jboss/messaging/core/config/Configuration.java
trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java
trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
trunk/src/main/org/jboss/messaging/utils/OrderedExecutorFactory.java
trunk/src/schemas/jbm-configuration.xsd
trunk/tests/config/ConfigurationTest-config.xml
trunk/tests/config/ConfigurationTest-full-config.xml
trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/journal/AIOSequentialFileFactoryTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/management/MessagingServerControlTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/management/MessagingServerControlUsingCoreTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/paging/PageCrashTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/persistence/JournalStorageManagerIntegrationTest.java
trunk/tests/src/org/jboss/messaging/tests/performance/journal/RealJournalImplAIOTest.java
trunk/tests/src/org/jboss/messaging/tests/performance/persistence/StorageManagerTimingTest.java
trunk/tests/src/org/jboss/messaging/tests/stress/journal/AddAndRemoveStressTest.java
trunk/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java
trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/RealJournalImplAIOTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/DefaultsFileConfigurationTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/deployers/impl/QueueDeployerTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AIOSequentialFileTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingManagerITest.java
Log:
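Mainly https://jira.jboss.org/jira/browse/JBMESSAGING-1525
Summary of the changes below: the server now creates two pools (one general-purpose thread pool and one scheduled pool) and passes them to the journal storage manager and paging store factory instead of each creating its own executor; adds the thread-pool-max-size setting (default -1); renames scheduled-max-pool-size to scheduled-thread-pool-max-size (default lowered from 30 to 10); removes paging-max-threads; replaces the message-expiry scheduled executor with a dedicated "JBM-expiry-reaper" thread.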
Modified: trunk/src/config/jboss-as/clustered/jbm-configuration.xml
===================================================================
--- trunk/src/config/jboss-as/clustered/jbm-configuration.xml 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/src/config/jboss-as/clustered/jbm-configuration.xml 2009-04-29 06:26:47 UTC (rev 6615)
@@ -6,7 +6,7 @@
<clustered>true</clustered>
<!-- Maximum number of threads to use for scheduled deliveries -->
- <scheduled-max-pool-size>30</scheduled-max-pool-size>
+ <scheduled-thread-pool-max-size>30</scheduled-thread-pool-max-size>
<security-enabled>true</security-enabled>
Modified: trunk/src/config/jboss-as/non-clustered/jbm-configuration.xml
===================================================================
--- trunk/src/config/jboss-as/non-clustered/jbm-configuration.xml 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/src/config/jboss-as/non-clustered/jbm-configuration.xml 2009-04-29 06:26:47 UTC (rev 6615)
@@ -6,7 +6,7 @@
<clustered>false</clustered>
<!-- Maximum number of threads to use for scheduled deliveries -->
- <scheduled-max-pool-size>30</scheduled-max-pool-size>
+ <scheduled-thread-pool-max-size>30</scheduled-thread-pool-max-size>
<security-enabled>true</security-enabled>
Modified: trunk/src/config/stand-alone/clustered/jbm-configuration.xml
===================================================================
--- trunk/src/config/stand-alone/clustered/jbm-configuration.xml 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/src/config/stand-alone/clustered/jbm-configuration.xml 2009-04-29 06:26:47 UTC (rev 6615)
@@ -6,7 +6,7 @@
<clustered>true</clustered>
<!-- Maximum number of threads to use for scheduled deliveries -->
- <scheduled-max-pool-size>30</scheduled-max-pool-size>
+ <scheduled-thread-pool-max-size>30</scheduled-thread-pool-max-size>
<security-enabled>true</security-enabled>
Modified: trunk/src/config/stand-alone/non-clustered/jbm-configuration.xml
===================================================================
--- trunk/src/config/stand-alone/non-clustered/jbm-configuration.xml 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/src/config/stand-alone/non-clustered/jbm-configuration.xml 2009-04-29 06:26:47 UTC (rev 6615)
@@ -6,7 +6,7 @@
<clustered>false</clustered>
<!-- Maximum number of threads to use for scheduled deliveries -->
- <scheduled-max-pool-size>30</scheduled-max-pool-size>
+ <scheduled-thread-pool-max-size>30</scheduled-thread-pool-max-size>
<security-enabled>true</security-enabled>
Modified: trunk/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2009-04-29 06:26:47 UTC (rev 6615)
@@ -75,7 +75,11 @@
long getQueueActivationTimeout();
void setQueueActivationTimeout(long timeout);
+
+ int getThreadPoolMaxSize();
+ void setThreadPoolMaxSize(int maxSize);
+
int getScheduledThreadPoolMaxSize();
void setScheduledThreadPoolMaxSize(int maxSize);
@@ -206,10 +210,6 @@
// Paging Properties --------------------------------------------------------------------
- int getPagingMaxThreads();
-
- void setPagingMaxThread(int pagingMaxThreads);
-
String getPagingDirectory();
void setPagingDirectory(String dir);
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2009-04-29 06:26:47 UTC (rev 6615)
@@ -55,7 +55,9 @@
public static final long DEFAULT_QUEUE_ACTIVATION_TIMEOUT = 30000;
- public static final int DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE = 30;
+ public static final int DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE = 10;
+
+ public static final int DEFAULT_THREAD_POOL_MAX_SIZE = -1;
public static final long DEFAULT_SECURITY_INVALIDATION_INTERVAL = 10000;
@@ -158,6 +160,8 @@
protected long queueActivationTimeout = DEFAULT_QUEUE_ACTIVATION_TIMEOUT;
protected int scheduledThreadPoolMaxSize = DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
+
+ protected int threadPoolMaxSize = DEFAULT_THREAD_POOL_MAX_SIZE;
protected long securityInvalidationInterval = DEFAULT_SECURITY_INVALIDATION_INTERVAL;
@@ -205,8 +209,6 @@
protected String pagingDirectory = DEFAULT_PAGING_DIR;
- protected int pagingMaxThreads = DEFAULT_PAGE_MAX_THREADS;
-
// File related attributes -----------------------------------------------------------
protected String largeMessagesDirectory = DEFAULT_LARGE_MESSAGES_DIR;
@@ -355,7 +357,17 @@
{
scheduledThreadPoolMaxSize = maxSize;
}
+
+ public int getThreadPoolMaxSize()
+ {
+ return threadPoolMaxSize;
+ }
+ public void setThreadPoolMaxSize(final int maxSize)
+ {
+ threadPoolMaxSize = maxSize;
+ }
+
public long getSecurityInvalidationInterval()
{
return securityInvalidationInterval;
@@ -531,16 +543,6 @@
return journalType;
}
- public int getPagingMaxThreads()
- {
- return pagingMaxThreads;
- }
-
- public void setPagingMaxThread(final int pagingMaxThreads)
- {
- this.pagingMaxThreads = pagingMaxThreads;
- }
-
public void setPagingDirectory(final String dir)
{
pagingDirectory = dir;
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2009-04-29 06:26:47 UTC (rev 6615)
@@ -107,7 +107,9 @@
// NOTE! All the defaults come from the super class
- scheduledThreadPoolMaxSize = getInteger(e, "scheduled-max-pool-size", scheduledThreadPoolMaxSize);
+ scheduledThreadPoolMaxSize = getInteger(e, "scheduled-thread-pool-max-size", scheduledThreadPoolMaxSize);
+
+ threadPoolMaxSize = getInteger(e, "thread-pool-max-size", threadPoolMaxSize);
securityEnabled = getBoolean(e, "security-enabled", securityEnabled);
@@ -265,8 +267,6 @@
journalDirectory = getString(e, "journal-directory", journalDirectory);
- pagingMaxThreads = getInteger(e, "paging-max-threads", pagingMaxThreads);
-
pagingDirectory = getString(e, "paging-directory", pagingDirectory);
pagingMaxGlobalSize = getLong(e, "paging-max-global-size-bytes", pagingMaxGlobalSize);
Modified: trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java 2009-04-29 06:26:47 UTC (rev 6615)
@@ -30,7 +30,6 @@
import javax.management.openmbean.TabularData;
import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.utils.SimpleString;
/**
* This interface describes the core management interface exposed by the server
@@ -52,6 +51,8 @@
boolean isClustered();
int getScheduledThreadPoolMaxSize();
+
+ int getThreadPoolMaxSize();
long getSecurityInvalidationInterval();
@@ -186,8 +187,6 @@
boolean isPersistIDCache();
- int getPagingMaxThreads();
-
int getPagingGlobalWatermarkSize();
String getLargeMessagesDirectory();
Modified: trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java 2009-04-29 06:26:47 UTC (rev 6615)
@@ -203,6 +203,11 @@
{
return configuration.getScheduledThreadPoolMaxSize();
}
+
+ public int getThreadPoolMaxSize()
+ {
+ return configuration.getThreadPoolMaxSize();
+ }
public long getSecurityInvalidationInterval()
{
@@ -584,11 +589,6 @@
return configuration.getPagingGlobalWatermarkSize();
}
- public int getPagingMaxThreads()
- {
- return configuration.getPagingMaxThreads();
- }
-
public long getQueueActivationTimeout()
{
return configuration.getQueueActivationTimeout();
Modified: trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java 2009-04-29 06:26:47 UTC (rev 6615)
@@ -148,6 +148,11 @@
{
return localControl.getScheduledThreadPoolMaxSize();
}
+
+ public int getThreadPoolMaxSize()
+ {
+ return localControl.getThreadPoolMaxSize();
+ }
public long getSecurityInvalidationInterval()
{
@@ -364,11 +369,6 @@
return localControl.getPagingGlobalWatermarkSize();
}
- public int getPagingMaxThreads()
- {
- return localControl.getPagingMaxThreads();
- }
-
public long getQueueActivationTimeout()
{
return localControl.getQueueActivationTimeout();
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java 2009-04-29 06:26:47 UTC (rev 6615)
@@ -33,9 +33,6 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
@@ -47,7 +44,7 @@
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.AddressSettings;
-import org.jboss.messaging.utils.OrderedExecutorFactory;
+import org.jboss.messaging.utils.ExecutorFactory;
import org.jboss.messaging.utils.SimpleString;
import org.jboss.messaging.utils.UUIDGenerator;
@@ -69,10 +66,8 @@
private final String directory;
- private final ExecutorService parentExecutor;
+ private final ExecutorFactory executorFactory;
- private final OrderedExecutorFactory executorFactory;
-
private final Executor globalDepagerExecutor;
private PagingManager pagingManager;
@@ -85,14 +80,12 @@
// Constructors --------------------------------------------------
- public PagingStoreFactoryNIO(final String directory, final int maxThreads)
+ public PagingStoreFactoryNIO(final String directory, final ExecutorFactory executorFactory)
{
this.directory = directory;
- parentExecutor = Executors.newFixedThreadPool(maxThreads, new org.jboss.messaging.utils.JBMThreadFactory("JBM-depaging-threads"));
+ this.executorFactory = executorFactory;
- executorFactory = new org.jboss.messaging.utils.OrderedExecutorFactory(parentExecutor);
-
globalDepagerExecutor = executorFactory.getExecutor();
}
@@ -103,11 +96,8 @@
return globalDepagerExecutor;
}
- public void stop() throws InterruptedException
+ public void stop()
{
- parentExecutor.shutdown();
-
- parentExecutor.awaitTermination(30, TimeUnit.SECONDS);
}
public synchronized PagingStore newStore(final SimpleString destinationName, final AddressSettings settings) throws Exception
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-04-29 06:26:47 UTC (rev 6615)
@@ -32,9 +32,8 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.transaction.xa.Xid;
@@ -76,7 +75,6 @@
import org.jboss.messaging.core.transaction.impl.TransactionImpl;
import org.jboss.messaging.utils.DataConstants;
import org.jboss.messaging.utils.IDGenerator;
-import org.jboss.messaging.utils.JBMThreadFactory;
import org.jboss.messaging.utils.Pair;
import org.jboss.messaging.utils.SimpleString;
import org.jboss.messaging.utils.UUID;
@@ -93,16 +91,15 @@
public class JournalStorageManager implements StorageManager
{
private static final Logger log = Logger.getLogger(JournalStorageManager.class);
-
+
private static final long CHECKPOINT_BATCH_SIZE = 2 ^ 32;
-
// Bindings journal record type
public static final byte QUEUE_BINDING_RECORD = 21;
public static final byte PERSISTENT_ID_RECORD = 23;
-
+
public static final byte ID_COUNTER_RECORD = 24;
// type + expiration + timestamp + priority
@@ -125,7 +122,7 @@
public static final byte SET_SCHEDULED_DELIVERY_TIME = 36;
public static final byte DUPLICATE_ID = 37;
-
+
private UUID persistentID;
private final BatchingIDGenerator idGenerator = new BatchingIDGenerator(0, CHECKPOINT_BATCH_SIZE);
@@ -138,17 +135,17 @@
private volatile boolean started;
- private final ExecutorService executor;
-
- public JournalStorageManager(final Configuration config)
+ private final Executor executor;
+
+ public JournalStorageManager(final Configuration config, final Executor executor)
{
- this.executor = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-journal-storage-manager"));
+ this.executor = executor;
if (config.getJournalType() != JournalType.NIO && config.getJournalType() != JournalType.ASYNCIO)
{
throw new IllegalArgumentException("Only NIO and AsyncIO are supported journals");
}
-
+
String bindingsDir = config.getBindingsDirectory();
if (bindingsDir == null)
@@ -214,41 +211,30 @@
largeMessagesFactory = new NIOSequentialFileFactory(config.getLargeMessagesDirectory());
}
- /* This constructor is only used for testing */
- public JournalStorageManager(final Journal messageJournal,
- final Journal bindingsJournal,
- final SequentialFileFactory largeMessagesFactory)
- {
- this.executor = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-journal-storage-manager"));
- this.messageJournal = messageJournal;
- this.bindingsJournal = bindingsJournal;
- this.largeMessagesFactory = largeMessagesFactory;
- }
-
public UUID getPersistentID()
- {
+ {
return persistentID;
}
-
+
public void setPersistentID(UUID id) throws Exception
{
long recordID = generateUniqueID();
-
+
if (id != null)
{
- bindingsJournal.appendAddRecord(recordID, PERSISTENT_ID_RECORD, new PersistentIDEncoding(id), true);
+ bindingsJournal.appendAddRecord(recordID, PERSISTENT_ID_RECORD, new PersistentIDEncoding(id), true);
}
-
+
this.persistentID = id;
}
public long generateUniqueID()
{
long id = idGenerator.generateID();
-
+
return id;
}
-
+
public long getCurrentUniqueID()
{
return idGenerator.getCurrentID();
@@ -258,7 +244,7 @@
{
idGenerator.setID(id);
}
-
+
public LargeServerMessage createLargeMessage()
{
return new JournalLargeServerMessage(this);
@@ -466,7 +452,7 @@
final ResourceManager resourceManager,
final Map<Long, Queue> queues,
final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
- {
+ {
List<RecordInfo> records = new ArrayList<RecordInfo>();
List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
@@ -679,11 +665,7 @@
}
}
- loadPreparedTransactions(pagingManager,
- resourceManager,
- queues,
- preparedTransactions,
- duplicateIDMap);
+ loadPreparedTransactions(pagingManager, resourceManager, queues, preparedTransactions, duplicateIDMap);
}
private void loadPreparedTransactions(final PagingManager pagingManager,
@@ -713,7 +695,7 @@
{
byte[] data = record.data;
- MessagingBuffer buff = ChannelBuffers.wrappedBuffer(data);
+ MessagingBuffer buff = ChannelBuffers.wrappedBuffer(data);
byte recordType = record.getUserRecordType();
@@ -838,7 +820,7 @@
{
byte[] data = record.data;
- MessagingBuffer buff = ChannelBuffers.wrappedBuffer(data);
+ MessagingBuffer buff = ChannelBuffers.wrappedBuffer(data);
long messageID = record.id;
@@ -899,13 +881,13 @@
}
public void loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos) throws Exception
- {
+ {
List<RecordInfo> records = new ArrayList<RecordInfo>();
List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
bindingsJournal.load(records, preparedTransactions);
-
+
long lastID = -1;
for (RecordInfo record : records)
@@ -925,21 +907,21 @@
bindingEncoding.setPersistenceID(id);
queueBindingInfos.add(bindingEncoding);
- }
+ }
else if (rec == PERSISTENT_ID_RECORD)
{
PersistentIDEncoding encoding = new PersistentIDEncoding();
-
+
encoding.decode(buffer);
-
+
persistentID = encoding.uuid;
}
else if (rec == ID_COUNTER_RECORD)
{
IDCounterEncoding encoding = new IDCounterEncoding();
-
+
encoding.decode(buffer);
-
+
lastID = encoding.id;
}
else
@@ -950,8 +932,6 @@
idGenerator.setID(lastID + 1);
}
-
-
// MessagingComponent implementation
// ------------------------------------------------------
@@ -962,9 +942,9 @@
{
return;
}
-
+
cleanupIncompleteFiles();
-
+
bindingsJournal.start();
messageJournal.start();
@@ -978,18 +958,14 @@
{
return;
}
-
- //Must call close to make sure last id is persisted
+
+ // Must call close to make sure last id is persisted
idGenerator.close();
- executor.shutdown();
-
bindingsJournal.stop();
messageJournal.stop();
- executor.awaitTermination(60, TimeUnit.SECONDS);
-
persistentID = null;
started = false;
@@ -1019,7 +995,6 @@
{
this.executor.execute(new Runnable()
{
-
public void run()
{
try
@@ -1108,11 +1083,11 @@
nextID = start + checkpointSize;
}
-
+
public void setID(final long id)
{
this.counter.set(id);
-
+
nextID = id + checkpointSize;
}
@@ -1146,12 +1121,12 @@
{
return counter.get();
}
-
+
public void close()
{
storeID(counter.get());
}
-
+
private void storeID(final long id)
{
try
@@ -1164,7 +1139,7 @@
}
}
}
-
+
private static class XidEncoding implements EncodingSupport
{
final Xid xid;
@@ -1293,7 +1268,7 @@
}
}
-
+
private static class PersistentIDEncoding implements EncodingSupport
{
UUID uuid;
@@ -1310,9 +1285,9 @@
public void decode(final MessagingBuffer buffer)
{
byte[] bytes = new byte[16];
-
+
buffer.readBytes(bytes);
-
+
uuid = new UUID(UUID.TYPE_TIME_BASED, bytes);
}
@@ -1327,7 +1302,7 @@
}
}
-
+
private static class IDCounterEncoding implements EncodingSupport
{
long id;
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-04-29 06:26:47 UTC (rev 6615)
@@ -30,8 +30,6 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import org.jboss.messaging.core.buffers.ChannelBuffers;
import org.jboss.messaging.core.client.management.impl.ManagementHelper;
@@ -100,11 +98,13 @@
private final ManagementService managementService;
- private ScheduledThreadPoolExecutor messageExpiryExecutor;
+ private Thread expiryReaper;
- private final long messageExpiryScanPeriod;
+ private final long reaperPeriod;
- private final int messageExpiryThreadPriority;
+ private final int reaperPriority;
+
+ private Reaper reaper;
private final ConcurrentMap<SimpleString, DuplicateIDCache> duplicateIDCaches = new ConcurrentHashMap<SimpleString, DuplicateIDCache>();
@@ -135,8 +135,8 @@
final PagingManager pagingManager,
final QueueFactory bindableFactory,
final ManagementService managementService,
- final long messageExpiryScanPeriod,
- final int messageExpiryThreadPriority,
+ final long reaperPeriod,
+ final int reaperPriority,
final boolean enableWildCardRouting,
final boolean backup,
final int idCacheSize,
@@ -155,9 +155,9 @@
this.pagingManager = pagingManager;
- this.messageExpiryScanPeriod = messageExpiryScanPeriod;
+ this.reaperPeriod = reaperPeriod;
- this.messageExpiryThreadPriority = messageExpiryThreadPriority;
+ this.reaperPriority = reaperPriority;
if (enableWildCardRouting)
{
@@ -181,7 +181,7 @@
// MessagingComponent implementation ---------------------------------------
- public void start() throws Exception
+ public synchronized void start() throws Exception
{
managementService.addNotificationListener(this);
@@ -200,31 +200,16 @@
started = true;
}
-
- private void startExpiryScanner()
+
+ public synchronized void stop() throws Exception
{
- if (messageExpiryScanPeriod > 0)
- {
- MessageExpiryRunner messageExpiryRunner = new MessageExpiryRunner();
- messageExpiryExecutor = new ScheduledThreadPoolExecutor(1,
- new org.jboss.messaging.utils.JBMThreadFactory("JBM-scheduled-threads",
- messageExpiryThreadPriority));
- messageExpiryExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
- messageExpiryExecutor.scheduleWithFixedDelay(messageExpiryRunner,
- messageExpiryScanPeriod,
- messageExpiryScanPeriod,
- TimeUnit.MILLISECONDS);
- }
- }
-
- public void stop() throws Exception
- {
managementService.removeNotificationListener(this);
- if (messageExpiryExecutor != null)
+ if (reaper != null)
{
- messageExpiryExecutor.shutdown();
- messageExpiryExecutor.awaitTermination(60, TimeUnit.SECONDS);
+ reaper.stop();
+
+ expiryReaper.join();
}
addressManager.clear();
@@ -820,6 +805,20 @@
// Private -----------------------------------------------------------------
+ private synchronized void startExpiryScanner()
+ {
+ if (reaperPeriod > 0)
+ {
+ reaper = new Reaper();
+
+ expiryReaper = new Thread(reaper, "JBM-expiry-reaper");
+
+ expiryReaper.setPriority(reaperPriority);
+
+ expiryReaper.start();
+ }
+ }
+
private void routeDirect(final Queue queue, final ServerMessage message) throws Exception
{
if (queue.getFilter() == null || queue.getFilter().match(message))
@@ -890,38 +889,75 @@
return oper;
}
}
-
- private class MessageExpiryRunner implements Runnable
+
+ private class Reaper implements Runnable
{
+ private boolean closed;
+
+ public synchronized void stop()
+ {
+ closed = true;
+
+ notify();
+ }
+
public synchronized void run()
{
- Map<SimpleString, Binding> nameMap = addressManager.getBindings();
-
- List<Queue> queues = new ArrayList<Queue>();
-
- for (Binding binding : nameMap.values())
+ while (true)
{
- if (binding.getType() == BindingType.LOCAL_QUEUE)
+ long toWait = reaperPeriod;
+
+ long start = System.currentTimeMillis();
+
+ while (!closed && toWait > 0)
{
- Queue queue = (Queue)binding.getBindable();
-
- queues.add(queue);
+ try
+ {
+ wait(toWait);
+ }
+ catch (InterruptedException e)
+ {
+ }
+
+ long now = System.currentTimeMillis();
+
+ toWait -= now - start;
+
+ start = now;
}
- }
-
- for (Queue queue : queues)
- {
- try
+
+ if (closed)
{
- queue.expireReferences();
+ return;
}
- catch (Exception e)
+
+ Map<SimpleString, Binding> nameMap = addressManager.getBindings();
+
+ List<Queue> queues = new ArrayList<Queue>();
+
+ for (Binding binding : nameMap.values())
{
- log.error("failed to expire messages for queue " + queue.getName(), e);
+ if (binding.getType() == BindingType.LOCAL_QUEUE)
+ {
+ Queue queue = (Queue)binding.getBindable();
+
+ queues.add(queue);
+ }
}
+
+ for (Queue queue : queues)
+ {
+ try
+ {
+ queue.expireReferences();
+ }
+ catch (Exception e)
+ {
+ log.error("failed to expire messages for queue " + queue.getName(), e);
+ }
+ }
}
}
-
}
private class PageMessageOperation implements TransactionOperation
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java 2009-04-29 06:26:47 UTC (rev 6615)
@@ -113,7 +113,9 @@
private boolean backup;
private volatile boolean started;
-
+
+ private int replicationCount;
+
/*
* Constructor using static list of connectors
*/
@@ -505,8 +507,6 @@
waitForReplicationsToComplete(3000);
}
- private int replicationCount;
-
private synchronized void waitForReplicationsToComplete(long timeout)
{
long toWait = timeout;
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-04-29 06:26:47 UTC (rev 6615)
@@ -24,6 +24,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.management.MBeanServer;
@@ -96,7 +97,9 @@
import org.jboss.messaging.core.transaction.impl.ResourceManagerImpl;
import org.jboss.messaging.core.transaction.impl.TransactionImpl;
import org.jboss.messaging.core.version.Version;
+import org.jboss.messaging.utils.ExecutorFactory;
import org.jboss.messaging.utils.Future;
+import org.jboss.messaging.utils.OrderedExecutorFactory;
import org.jboss.messaging.utils.Pair;
import org.jboss.messaging.utils.SimpleString;
import org.jboss.messaging.utils.UUID;
@@ -143,18 +146,18 @@
private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
- private ScheduledExecutorService scheduledExecutor;
-
private QueueFactory queueFactory;
private PagingManager pagingManager;
private PostOffice postOffice;
- private ExecutorService asyncDeliveryPool;
+ private ExecutorService threadPool;
- private org.jboss.messaging.utils.ExecutorFactory executorFactory;
+ private ScheduledExecutorService scheduledPool;
+ private ExecutorFactory executorFactory;
+
private HierarchicalRepository<Set<Role>> securityRepository;
private ResourceManager resourceManager;
@@ -270,10 +273,10 @@
{
clusterManager.stop();
}
-
- //Need to flush all sessions to make sure all confirmations get sent back to client
-
- for (ServerSession session: sessions.values())
+
+ // Need to flush all sessions to make sure all confirmations get sent back to client
+
+ for (ServerSession session : sessions.values())
{
session.getChannel().flushConfirmations();
}
@@ -300,20 +303,6 @@
securityManager.stop();
- asyncDeliveryPool.shutdown();
-
- try
- {
- if (!asyncDeliveryPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
- {
- log.warn("Timed out waiting for pool to terminate");
- }
- }
- catch (InterruptedException e)
- {
- // Ignore
- }
-
if (replicatingConnection != null)
{
try
@@ -329,19 +318,33 @@
}
pagingManager.stop();
+ resourceManager.stop();
+ postOffice.stop();
+
pagingManager = null;
securityStore = null;
- resourceManager.stop();
resourceManager = null;
- postOffice.stop();
postOffice = null;
securityRepository = null;
securityStore = null;
- scheduledExecutor.shutdown();
queueFactory = null;
resourceManager = null;
messagingServerControl = null;
+ scheduledPool.shutdown();
+ threadPool.shutdown();
+ try
+ {
+ if (!threadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
+ {
+ log.warn("Timed out waiting for pool to terminate");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // Ignore
+ }
+
sessions.clear();
started = false;
@@ -561,14 +564,14 @@
this.nodeID = new SimpleString(uuid.toString());
initialisePart2();
-
+
long backupID = storageManager.getCurrentUniqueID();
if (liveUniqueID != backupID)
{
initialised = false;
-
- throw new IllegalStateException("Live and backup unique ids different. You're probably trying to restart a live backup pair after a crash");
+
+ throw new IllegalStateException("Live and backup unique ids different. You're probably trying to restart a live backup pair after a crash");
}
log.info("Backup server is now operational");
@@ -588,7 +591,7 @@
ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
ClientSessionFactoryImpl.DEFAULT_PING_PERIOD,
ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
- scheduledExecutor,
+ scheduledPool,
listener);
if (replicatingConnection == null)
@@ -771,8 +774,7 @@
protected PagingManager createPagingManager()
{
- return new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory(),
- configuration.getPagingMaxThreads()),
+ return new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory(), executorFactory),
storageManager,
addressSettingsRepository,
configuration.getPagingMaxGlobalSizeBytes(),
@@ -804,9 +806,9 @@
for (Queue queue : toActivate)
{
- scheduledExecutor.schedule(new ActivateRunner(queue),
- configuration.getQueueActivationTimeout(),
- TimeUnit.MILLISECONDS);
+ scheduledPool.schedule(new ActivateRunner(queue),
+ configuration.getQueueActivationTimeout(),
+ TimeUnit.MILLISECONDS);
}
configuration.setBackup(false);
@@ -865,14 +867,24 @@
private void initialisePart2() throws Exception
{
- // Create the pools and executor related objects
- asyncDeliveryPool = Executors.newCachedThreadPool(new org.jboss.messaging.utils.JBMThreadFactory("JBM-async-session-delivery-threads"));
+ // Create the pools - we have two pools - one for non scheduled - and another for scheduled
- executorFactory = new org.jboss.messaging.utils.OrderedExecutorFactory(asyncDeliveryPool);
+ ThreadFactory tFactory = new org.jboss.messaging.utils.JBMThreadFactory("JBM-threads");
- scheduledExecutor = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize(),
- new org.jboss.messaging.utils.JBMThreadFactory("JBM-scheduled-threads"));
+ if (configuration.getThreadPoolMaxSize() == -1)
+ {
+ threadPool = Executors.newCachedThreadPool(tFactory);
+ }
+ else
+ {
+ threadPool = Executors.newFixedThreadPool(configuration.getThreadPoolMaxSize(), tFactory);
+ }
+ executorFactory = new OrderedExecutorFactory(threadPool);
+
+ scheduledPool = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize(),
+ new org.jboss.messaging.utils.JBMThreadFactory("JBM-scheduled-threads"));
+
// Create the hard-wired components
if (configuration.isEnableFileDeployment())
@@ -882,7 +894,7 @@
if (configuration.isEnablePersistence())
{
- storageManager = new JournalStorageManager(configuration);
+ storageManager = new JournalStorageManager(configuration, threadPool);
}
else
{
@@ -899,7 +911,7 @@
configuration.getManagementClusterPassword(),
managementService);
- queueFactory = new QueueFactoryImpl(scheduledExecutor, addressSettingsRepository, storageManager);
+ queueFactory = new QueueFactoryImpl(scheduledPool, addressSettingsRepository, storageManager);
pagingManager = createPagingManager();
@@ -999,7 +1011,7 @@
clusterManager = new ClusterManagerImpl(executorFactory,
this,
postOffice,
- scheduledExecutor,
+ scheduledPool,
managementService,
configuration,
uuid,
@@ -1389,7 +1401,7 @@
public void run()
{
- queue.activateNow(asyncDeliveryPool);
+ queue.activateNow(threadPool);
}
}
@@ -1416,13 +1428,13 @@
if (conn != null)
{
// Execute on different thread to avoid deadlocks
- new Thread()
+ threadPool.execute(new Runnable()
{
public void run()
{
conn.fail(me);
}
- }.start();
+ });
}
}
}
Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java 2009-04-29 06:26:47 UTC (rev 6615)
@@ -102,7 +102,7 @@
private ClientBootstrap bootstrap;
- ChannelGroup channelGroup;
+ private ChannelGroup channelGroup;
private final BufferHandler handler;
Modified: trunk/src/main/org/jboss/messaging/utils/OrderedExecutorFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/utils/OrderedExecutorFactory.java 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/src/main/org/jboss/messaging/utils/OrderedExecutorFactory.java 2009-04-29 06:26:47 UTC (rev 6615)
@@ -47,7 +47,7 @@
{
this.parent = parent;
}
-
+
/**
* Get an executor that always executes tasks in order.
*
Modified: trunk/src/schemas/jbm-configuration.xsd
===================================================================
--- trunk/src/schemas/jbm-configuration.xsd 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/src/schemas/jbm-configuration.xsd 2009-04-29 06:26:47 UTC (rev 6615)
@@ -28,15 +28,22 @@
minOccurs="0" />
<xsd:element ref="enable-persistence" maxOccurs="1"
minOccurs="0" />
- <xsd:element name="scheduled-max-pool-size"
+ <xsd:element name="scheduled-thread-pool-max-size"
type="xsd:int" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
- Maximum number of threads to use for
- scheduled deliveries
+ Maximum number of threads to use for the scheduled thread pool
</xsd:documentation>
</xsd:annotation>
</xsd:element>
+ <xsd:element name="thread-pool-max-size"
+ type="xsd:int" maxOccurs="1" minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ Maximum number of threads to use for the thread pool
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
<xsd:element name="security-enabled" type="xsd:boolean"
maxOccurs="1" minOccurs="0">
</xsd:element>
Modified: trunk/tests/config/ConfigurationTest-config.xml
===================================================================
--- trunk/tests/config/ConfigurationTest-config.xml 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/config/ConfigurationTest-config.xml 2009-04-29 06:26:47 UTC (rev 6615)
@@ -2,7 +2,7 @@
<configuration>
<clustered>true</clustered>
<enable-file-deployment>true</enable-file-deployment>
- <scheduled-max-pool-size>12345</scheduled-max-pool-size>
+ <scheduled-thread-pool-max-size>12345</scheduled-thread-pool-max-size>
<security-enabled>false</security-enabled>
<security-invalidation-interval>5423</security-invalidation-interval>
<wild-card-routing-enabled>true</wild-card-routing-enabled>
Modified: trunk/tests/config/ConfigurationTest-full-config.xml
===================================================================
--- trunk/tests/config/ConfigurationTest-full-config.xml 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/config/ConfigurationTest-full-config.xml 2009-04-29 06:26:47 UTC (rev 6615)
@@ -2,7 +2,8 @@
<configuration>
<clustered>true</clustered>
<enable-file-deployment>true</enable-file-deployment>
- <scheduled-max-pool-size>12345</scheduled-max-pool-size>
+ <scheduled-thread-pool-max-size>12345</scheduled-thread-pool-max-size>
+ <thread-pool-max-size>54321</thread-pool-max-size>
<security-enabled>false</security-enabled>
<security-invalidation-interval>5423</security-invalidation-interval>
<wild-card-routing-enabled>true</wild-card-routing-enabled>
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java 2009-04-29 06:26:47 UTC (rev 6615)
@@ -1,935 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.tests.integration.client;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.MessageHandler;
-import org.jboss.messaging.core.client.impl.ClientConsumerInternal;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.server.MessagingServer;
-import org.jboss.messaging.tests.util.ServiceTestBase;
-import org.jboss.messaging.utils.SimpleString;
-
-/**
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- */
-public class ConsumerWindowSizeTest extends ServiceTestBase
-{
- private final SimpleString addressA = new SimpleString("addressA");
-
- private final SimpleString queueA = new SimpleString("queueA");
-
- private final int TIMEOUT = 5;
-
- private static final Logger log = Logger.getLogger(ConsumerWindowSizeTest.class);
-
- private static final boolean isTrace = log.isTraceEnabled();
-
- /*
- * tests send window size. we do this by having 2 receivers on the q. since we roundrobin the consumer for delivery we
- * know if consumer 1 has received n messages then consumer 2 must have also have received n messages or at least up
- * to its window size
- * */
- public void testSendWindowSize() throws Exception
- {
- MessagingServer messagingService = createServer(false);
- ClientSessionFactory cf = createInVMFactory();
- try
- {
- messagingService.start();
- cf.setBlockOnNonPersistentSend(false);
- ClientSession sendSession = cf.createSession(false, true, true);
- ClientSession receiveSession = cf.createSession(false, true, true);
- sendSession.createQueue(addressA, queueA, false);
- ClientConsumer receivingConsumer = receiveSession.createConsumer(queueA);
- ClientMessage cm = sendSession.createClientMessage(false);
- cm.setDestination(addressA);
- int encodeSize = cm.getEncodeSize();
- int numMessage = 100;
- cf.setConsumerWindowSize(numMessage * encodeSize);
- ClientSession session = cf.createSession(false, true, true);
- ClientProducer cp = sendSession.createProducer(addressA);
- ClientConsumer cc = session.createConsumer(queueA);
- session.start();
- receiveSession.start();
- for (int i = 0; i < numMessage * 4; i++)
- {
- cp.send(sendSession.createClientMessage(false));
- }
-
- for (int i = 0; i < numMessage * 2; i++)
- {
- ClientMessage m = receivingConsumer.receive(5000);
- assertNotNull(m);
- m.acknowledge();
- }
- receiveSession.close();
-
- for (int i = 0; i < numMessage * 2; i++)
- {
- ClientMessage m = cc.receive(5000);
- assertNotNull(m);
- m.acknowledge();
- }
-
- session.close();
- sendSession.close();
-
- assertEquals(0, getMessageCount(messagingService, queueA.toString()));
-
- }
- finally
- {
- if (messagingService.isStarted())
- {
- messagingService.stop();
- }
- }
- }
-
- public void testSlowConsumerBufferingOne() throws Exception
- {
- MessagingServer server = createServer(false);
-
- ClientSession sessionB = null;
- ClientSession session = null;
-
- try
- {
- final int numberOfMessages = 100;
-
- server.start();
-
- ClientSessionFactory sf = createInVMFactory();
- sf.setConsumerWindowSize(1);
-
- session = sf.createSession(false, true, true);
-
- SimpleString ADDRESS = addressA;
-
- session.createQueue(ADDRESS, ADDRESS, true);
-
- sessionB = sf.createSession(false, true, true);
- sessionB.start();
-
- session.start();
-
- ClientConsumer consNeverUsed = sessionB.createConsumer(ADDRESS);
-
- ClientConsumer cons1 = session.createConsumer(ADDRESS);
-
- ClientProducer prod = session.createProducer(ADDRESS);
-
- for (int i = 0; i < numberOfMessages; i++)
- {
- prod.send(createTextMessage(session, "Msg" + i));
- }
-
- for (int i = 0; i < numberOfMessages - 1; i++)
- {
- ClientMessage msg = cons1.receive(1000);
- assertNotNull("expected message at i = " + i, msg);
- msg.acknowledge();
- }
-
- ClientMessage msg = consNeverUsed.receive(500);
- assertNotNull(msg);
- msg.acknowledge();
-
- session.close();
- session = null;
-
- sessionB.close();
- sessionB = null;
-
- assertEquals(0, getMessageCount(server, ADDRESS.toString()));
-
- }
- finally
- {
- try
- {
- if (session != null)
- {
- session.close();
- }
- if (sessionB != null)
- {
- sessionB.close();
- }
- }
- catch (Exception ignored)
- {
- }
-
- if (server.isStarted())
- {
- server.stop();
- }
- }
- }
-
- public void testSlowConsumerNoBuffer() throws Exception
- {
- internalTestSlowConsumerNoBuffer(false);
- }
-
- public void testSlowConsumerNoBufferLargeMessages() throws Exception
- {
- internalTestSlowConsumerNoBuffer(true);
- }
-
- private void internalTestSlowConsumerNoBuffer(final boolean largeMessages) throws Exception
- {
- MessagingServer server = createServer(false);
-
- ClientSession sessionB = null;
- ClientSession session = null;
-
- try
- {
- final int numberOfMessages = 100;
-
- server.start();
-
- ClientSessionFactory sf = createInVMFactory();
- sf.setConsumerWindowSize(0);
-
- if (largeMessages)
- {
- sf.setMinLargeMessageSize(100);
- }
-
- session = sf.createSession(false, true, true);
-
- SimpleString ADDRESS = addressA;
-
- session.createQueue(ADDRESS, ADDRESS, true);
-
- sessionB = sf.createSession(false, true, true);
- sessionB.start();
-
- session.start();
-
- ClientConsumerInternal consNeverUsed = (ClientConsumerInternal)sessionB.createConsumer(ADDRESS);
-
- ClientConsumer cons1 = session.createConsumer(ADDRESS);
-
- ClientProducer prod = session.createProducer(ADDRESS);
-
- for (int i = 0; i < numberOfMessages; i++)
- {
- ClientMessage msg = createTextMessage(session, "Msg" + i);
-
- if (largeMessages)
- {
- msg.getBody().writeBytes(new byte[600]);
- }
-
- prod.send(msg);
- }
-
- for (int i = 0; i < numberOfMessages; i++)
- {
- ClientMessage msg = cons1.receive(1000);
- assertNotNull("expected message at i = " + i, msg);
- assertEquals("Msg" + i, getTextMessage(msg));
- msg.acknowledge();
- }
-
- assertEquals(0, consNeverUsed.getBufferSize());
-
- session.close();
- session = null;
-
- sessionB.close();
- sessionB = null;
-
- assertEquals(0, getMessageCount(server, ADDRESS.toString()));
-
- }
- finally
- {
- try
- {
- if (session != null)
- {
- session.close();
- }
- if (sessionB != null)
- {
- sessionB.close();
- }
- }
- catch (Exception ignored)
- {
- }
-
- if (server.isStarted())
- {
- server.stop();
- }
- }
- }
-
- public void testSlowConsumerNoBuffer2() throws Exception
- {
- internalTestSlowConsumerNoBuffer2(false);
- }
-
- public void testSlowConsumerNoBuffer2LargeMessages() throws Exception
- {
- internalTestSlowConsumerNoBuffer2(true);
- }
-
- private void internalTestSlowConsumerNoBuffer2(final boolean largeMessages) throws Exception
- {
- MessagingServer server = createServer(false);
-
- ClientSession session1 = null;
- ClientSession session2 = null;
-
- try
- {
- final int numberOfMessages = 100;
-
- server.start();
-
- ClientSessionFactory sf = createInVMFactory();
-
- sf.setConsumerWindowSize(0);
-
- if (largeMessages)
- {
- sf.setMinLargeMessageSize(100);
- }
-
- session1 = sf.createSession(false, true, true);
-
- session2 = sf.createSession(false, true, true);
-
- session1.start();
-
- session2.start();
-
- SimpleString ADDRESS = new SimpleString("some-queue");
-
- session1.createQueue(ADDRESS, ADDRESS, true);
-
- ClientConsumerInternal cons1 = (ClientConsumerInternal)session1.createConsumer(ADDRESS);
-
- // Note we make sure we send the messages *before* cons2 is created
-
- ClientProducer prod = session1.createProducer(ADDRESS);
-
- for (int i = 0; i < numberOfMessages; i++)
- {
- ClientMessage msg = createTextMessage(session1, "Msg" + i);
- if (largeMessages)
- {
- msg.getBody().writeBytes(new byte[600]);
- }
- prod.send(msg);
- }
-
- ClientConsumerInternal cons2 = (ClientConsumerInternal)session2.createConsumer(ADDRESS);
-
- for (int i = 0; i < numberOfMessages / 2; i++)
- {
- ClientMessage msg = cons1.receive(1000);
- assertNotNull("expected message at i = " + i, msg);
-
- String str = getTextMessage(msg);
- assertEquals("Msg" + i, str);
-
- msg.acknowledge();
-
- assertEquals("A slow consumer shouldn't buffer anything on the client side!", 0, cons1.getBufferSize());
- }
-
- for (int i = numberOfMessages / 2; i < numberOfMessages; i++)
- {
- ClientMessage msg = cons2.receive(1000);
-
- assertNotNull("expected message at i = " + i, msg);
-
- assertEquals("Msg" + i, msg.getBody().readString());
-
- msg.acknowledge();
-
- assertEquals("A slow consumer shouldn't buffer anything on the client side!", 0, cons2.getBufferSize());
- }
-
- session1.close(); // just to make sure everything is flushed and no pending packets on the sending buffer, or
- // the getMessageCount would fail
- session2.close();
-
- session1 = sf.createSession(false, true, true);
- session1.start();
- session2 = sf.createSession(false, true, true);
- session2.start();
-
- prod = session1.createProducer(ADDRESS);
-
- assertEquals(0, getMessageCount(server, ADDRESS.toString()));
-
- // This should also work the other way around
-
- cons1.close();
-
- cons2.close();
-
- cons1 = (ClientConsumerInternal)session1.createConsumer(ADDRESS);
-
- // Note we make sure we send the messages *before* cons2 is created
-
- for (int i = 0; i < numberOfMessages; i++)
- {
- ClientMessage msg = createTextMessage(session1, "Msg" + i);
- if (largeMessages)
- {
- msg.getBody().writeBytes(new byte[600]);
- }
- prod.send(msg);
- }
-
- cons2 = (ClientConsumerInternal)session2.createConsumer(ADDRESS);
-
- // Now we receive on cons2 first
-
- for (int i = 0; i < numberOfMessages / 2; i++)
- {
- ClientMessage msg = cons2.receive(1000);
- assertNotNull("expected message at i = " + i, msg);
-
- assertEquals("Msg" + i, msg.getBody().readString());
-
- msg.acknowledge();
-
- assertEquals("A slow consumer shouldn't buffer anything on the client side!", 0, cons2.getBufferSize());
-
- }
-
- for (int i = numberOfMessages / 2; i < numberOfMessages; i++)
- {
- ClientMessage msg = cons1.receive(1000);
-
- assertNotNull("expected message at i = " + i, msg);
-
- assertEquals("Msg" + i, msg.getBody().readString());
-
- msg.acknowledge();
-
- assertEquals("A slow consumer shouldn't buffer anything on the client side!", 0, cons1.getBufferSize());
- }
-
- session1.close();
- session1 = null;
- session2.close();
- session2 = null;
- assertEquals(0, getMessageCount(server, ADDRESS.toString()));
-
- }
- finally
- {
- try
- {
- if (session1 != null)
- {
- session1.close();
- }
- if (session2 != null)
- {
- session2.close();
- }
- }
- catch (Exception ignored)
- {
- }
-
- if (server.isStarted())
- {
- server.stop();
- }
- }
- }
-
- public void testSlowConsumerOnMessageHandlerNoBuffers() throws Exception
- {
- internalTestSlowConsumerOnMessageHandlerNoBuffers(false);
- }
-
- public void testSlowConsumerOnMessageHandlerNoBuffersLargeMessage() throws Exception
- {
- internalTestSlowConsumerOnMessageHandlerNoBuffers(true);
- }
-
- public void internalTestSlowConsumerOnMessageHandlerNoBuffers(final boolean largeMessages) throws Exception
- {
-
- MessagingServer server = createServer(false);
-
- ClientSession sessionB = null;
- ClientSession session = null;
-
- try
- {
- final int numberOfMessages = 100;
-
- server.start();
-
- ClientSessionFactory sf = createInVMFactory();
- sf.setConsumerWindowSize(0);
-
- if (largeMessages)
- {
- sf.setMinLargeMessageSize(100);
- }
-
- session = sf.createSession(false, true, true);
-
- SimpleString ADDRESS = new SimpleString("some-queue");
-
- session.createQueue(ADDRESS, ADDRESS, true);
-
- sessionB = sf.createSession(false, true, true);
- sessionB.start();
-
- session.start();
-
- ClientConsumerInternal consReceiveOneAndHold = (ClientConsumerInternal)sessionB.createConsumer(ADDRESS);
-
- final CountDownLatch latchReceived = new CountDownLatch(2);
-
- final CountDownLatch latchDone = new CountDownLatch(1);
-
- // It can't close the session while the large message is being read
- final CountDownLatch latchRead = new CountDownLatch(1);
-
- // It should receive two messages and then give up
- class LocalHandler implements MessageHandler
- {
- boolean failed = false;
-
- int count = 0;
-
- /* (non-Javadoc)
- * @see org.jboss.messaging.core.client.MessageHandler#onMessage(org.jboss.messaging.core.client.ClientMessage)
- */
- public synchronized void onMessage(final ClientMessage message)
- {
- try
- {
- String str = getTextMessage(message);
-
- failed = failed || !str.equals("Msg" + count);
-
- message.acknowledge();
- latchReceived.countDown();
-
- if (count++ == 1)
- {
- // it will hold here for a while
- if (!latchDone.await(TIMEOUT, TimeUnit.SECONDS)) // a timed wait, so if the test fails, one less
- // thread around
- {
- new Exception("ClientConsuemrWindowSizeTest Handler couldn't receive signal in less than 5 seconds").printStackTrace();
- failed = true;
- }
-
- if (largeMessages)
- {
- message.getBody().readBytes(new byte[600]);
- }
-
- latchRead.countDown();
- }
- }
- catch (Exception e)
- {
- e.printStackTrace(); // Hudson / JUnit report
- failed = true;
- }
- }
- }
-
- LocalHandler handler = new LocalHandler();
-
- ClientConsumer cons1 = session.createConsumer(ADDRESS);
-
- ClientProducer prod = session.createProducer(ADDRESS);
-
- for (int i = 0; i < numberOfMessages; i++)
- {
- ClientMessage msg = createTextMessage(session, "Msg" + i);
- if (largeMessages)
- {
- msg.getBody().writeBytes(new byte[600]);
- }
- prod.send(msg);
- }
-
- consReceiveOneAndHold.setMessageHandler(handler);
-
- assertTrue(latchReceived.await(TIMEOUT, TimeUnit.SECONDS));
-
- assertEquals(0, consReceiveOneAndHold.getBufferSize());
-
- for (int i = 2; i < numberOfMessages; i++)
- {
- ClientMessage msg = cons1.receive(1000);
- assertNotNull("expected message at i = " + i, msg);
- assertEquals("Msg" + i, getTextMessage(msg));
- msg.acknowledge();
- }
-
- assertEquals(0, consReceiveOneAndHold.getBufferSize());
-
- latchDone.countDown();
-
- // The test can' t close the session while the message is still being read, or it could interrupt the data
- assertTrue(latchRead.await(10, TimeUnit.SECONDS));
-
- session.close();
- session = null;
-
- sessionB.close();
- sessionB = null;
-
- assertEquals(0, getMessageCount(server, ADDRESS.toString()));
-
- assertFalse("MessageHandler received a failure", handler.failed);
-
- }
- finally
- {
- try
- {
- if (session != null)
- {
- session.close();
- }
- if (sessionB != null)
- {
- sessionB.close();
- }
- }
- catch (Exception ignored)
- {
- }
-
- if (server.isStarted())
- {
- server.stop();
- }
- }
- }
-
- public void testSlowConsumerOnMessageHandlerBufferOne() throws Exception
- {
- internalTestSlowConsumerOnMessageHandlerBufferOne(false);
- }
-
- private void internalTestSlowConsumerOnMessageHandlerBufferOne(final boolean largeMessage) throws Exception
- {
- MessagingServer server = createServer(false);
-
- ClientSession sessionB = null;
- ClientSession session = null;
-
- try
- {
- final int numberOfMessages = 100;
-
- server.start();
-
- ClientSessionFactory sf = createInVMFactory();
- sf.setConsumerWindowSize(1);
-
- if (largeMessage)
- {
- sf.setMinLargeMessageSize(100);
- }
-
- session = sf.createSession(false, true, true);
-
- SimpleString ADDRESS = new SimpleString("some-queue");
-
- session.createQueue(ADDRESS, ADDRESS, true);
-
- sessionB = sf.createSession(false, true, true);
- sessionB.start();
-
- session.start();
-
- ClientConsumerInternal consReceiveOneAndHold = (ClientConsumerInternal)sessionB.createConsumer(ADDRESS);
-
- final CountDownLatch latchReceived = new CountDownLatch(2);
- final CountDownLatch latchReceivedBuffered = new CountDownLatch(3);
-
- final CountDownLatch latchDone = new CountDownLatch(1);
-
- // It should receive two messages and then give up
- class LocalHandler implements MessageHandler
- {
- boolean failed = false;
-
- int count = 0;
-
- /* (non-Javadoc)
- * @see org.jboss.messaging.core.client.MessageHandler#onMessage(org.jboss.messaging.core.client.ClientMessage)
- */
- public synchronized void onMessage(final ClientMessage message)
- {
- try
- {
- String str = getTextMessage(message);
- if (isTrace)
- {
- log.trace("Received message " + str);
- }
-
- failed = failed || !str.equals("Msg" + count);
-
- message.acknowledge();
- latchReceived.countDown();
- latchReceivedBuffered.countDown();
-
- if (count++ == 1)
- {
- // it will hold here for a while
- if (!latchDone.await(TIMEOUT, TimeUnit.SECONDS))
- {
- new Exception("ClientConsuemrWindowSizeTest Handler couldn't receive signal in less than 5 seconds").printStackTrace();
- failed = true;
- }
- }
- }
- catch (Exception e)
- {
- e.printStackTrace(); // Hudson / JUnit report
- failed = true;
- }
- }
- }
-
- LocalHandler handler = new LocalHandler();
-
- ClientProducer prod = session.createProducer(ADDRESS);
-
- for (int i = 0; i < numberOfMessages; i++)
- {
- ClientMessage msg = createTextMessage(session, "Msg" + i);
- if (largeMessage)
- {
- msg.getBody().writeBytes(new byte[600]);
- }
- prod.send(msg);
- }
-
- consReceiveOneAndHold.setMessageHandler(handler);
-
- assertTrue(latchReceived.await(TIMEOUT, TimeUnit.SECONDS));
-
- long timeout = System.currentTimeMillis() + 1000 * TIMEOUT;
- while (consReceiveOneAndHold.getBufferSize() == 0 && System.currentTimeMillis() < timeout)
- {
- Thread.sleep(10);
- }
-
- assertEquals(1, consReceiveOneAndHold.getBufferSize());
-
- ClientConsumer cons1 = session.createConsumer(ADDRESS);
-
- for (int i = 3; i < numberOfMessages; i++)
- {
- ClientMessage msg = cons1.receive(1000);
- assertNotNull("expected message at i = " + i, msg);
- String text = getTextMessage(msg);
- assertEquals("Msg" + i, text);
- msg.acknowledge();
- }
-
- latchDone.countDown();
-
- assertTrue(latchReceivedBuffered.await(TIMEOUT, TimeUnit.SECONDS));
-
- session.close();
- session = null;
-
- sessionB.close();
- sessionB = null;
-
- assertEquals(0, getMessageCount(server, ADDRESS.toString()));
-
- assertFalse("MessageHandler received a failure", handler.failed);
-
- }
- finally
- {
- try
- {
- if (session != null)
- {
- session.close();
- }
- if (sessionB != null)
- {
- sessionB.close();
- }
- }
- catch (Exception ignored)
- {
- ignored.printStackTrace();
- }
-
- if (server.isStarted())
- {
- server.stop();
- }
- }
- }
-
- public void testNoWindowRoundRobin() throws Exception
- {
- testNoWindowRoundRobin(false);
- }
-
- public void testNoWindowRoundRobinLargeMessage() throws Exception
- {
- testNoWindowRoundRobin(true);
- }
-
- private void testNoWindowRoundRobin(final boolean largeMessages) throws Exception
- {
-
- MessagingServer server = createServer(false);
-
- ClientSession sessionA = null;
- ClientSession sessionB = null;
-
- try
- {
- final int numberOfMessages = 100;
-
- server.start();
-
- ClientSessionFactory sf = createInVMFactory();
- sf.setConsumerWindowSize(-1);
-
- if (largeMessages)
- {
- sf.setMinLargeMessageSize(100);
- }
-
- sessionA = sf.createSession(false, true, true);
-
- SimpleString ADDRESS = new SimpleString("some-queue");
-
- sessionA.createQueue(ADDRESS, ADDRESS, true);
-
- sessionB = sf.createSession(false, true, true);
-
- sessionA.start();
- sessionB.start();
-
- ClientConsumerInternal consA = (ClientConsumerInternal)sessionA.createConsumer(ADDRESS);
-
- ClientConsumerInternal consB = (ClientConsumerInternal)sessionB.createConsumer(ADDRESS);
-
- ClientProducer prod = sessionA.createProducer(ADDRESS);
-
- for (int i = 0; i < numberOfMessages; i++)
- {
- ClientMessage msg = createTextMessage(sessionA, "Msg" + i);
- if (largeMessages)
- {
- msg.getBody().writeBytes(new byte[600]);
- }
- prod.send(msg);
- }
-
- long timeout = System.currentTimeMillis() + TIMEOUT * 1000;
-
- boolean foundA = false;
- boolean foundB = false;
-
- do
- {
- foundA = consA.getBufferSize() == numberOfMessages / 2;
- foundB = consB.getBufferSize() == numberOfMessages / 2;
-
- Thread.sleep(10);
- }
- while ((!foundA || !foundB) && System.currentTimeMillis() < timeout);
-
- assertTrue("ConsumerA didn't receive the expected number of messages on buffer (consA=" + consA.getBufferSize() +
- ", consB=" +
- consB.getBufferSize() +
- ") foundA = " +
- foundA +
- " foundB = " +
- foundB,
- foundA);
- assertTrue("ConsumerB didn't receive the expected number of messages on buffer (consA=" + consA.getBufferSize() +
- ", consB=" +
- consB.getBufferSize() +
- ") foundA = " +
- foundA +
- " foundB = " +
- foundB,
- foundB);
-
- }
- finally
- {
- try
- {
- if (sessionA != null)
- {
- sessionA.close();
- }
- if (sessionB != null)
- {
- sessionB.close();
- }
- }
- catch (Exception ignored)
- {
- }
-
- if (server.isStarted())
- {
- server.stop();
- }
- }
- }
-
-}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/journal/AIOSequentialFileFactoryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/journal/AIOSequentialFileFactoryTest.java 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/journal/AIOSequentialFileFactoryTest.java 2009-04-29 06:26:47 UTC (rev 6615)
@@ -25,6 +25,7 @@
import java.io.File;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java 2009-04-29 06:26:47 UTC (rev 6615)
@@ -23,6 +23,7 @@
package org.jboss.messaging.tests.integration.journal;
import java.io.File;
+import java.util.concurrent.Executors;
import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
import org.jboss.messaging.core.journal.SequentialFileFactory;
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/management/MessagingServerControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/management/MessagingServerControlTest.java 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/management/MessagingServerControlTest.java 2009-04-29 06:26:47 UTC (rev 6615)
@@ -81,6 +81,7 @@
assertEquals(conf.isBackup(), serverControl.isBackup());
assertEquals(conf.getQueueActivationTimeout(), serverControl.getQueueActivationTimeout());
assertEquals(conf.getScheduledThreadPoolMaxSize(), serverControl.getScheduledThreadPoolMaxSize());
+ assertEquals(conf.getThreadPoolMaxSize(), serverControl.getThreadPoolMaxSize());
assertEquals(conf.getSecurityInvalidationInterval(), serverControl.getSecurityInvalidationInterval());
assertEquals(conf.isSecurityEnabled(), serverControl.isSecurityEnabled());
assertEquals(conf.getInterceptorClassNames(), serverControl.getInterceptorClassNames());
@@ -102,8 +103,7 @@
assertEquals(conf.getJournalMaxAIO(), serverControl.getJournalMaxAIO());
assertEquals(conf.getJournalBufferReuseSize(), serverControl.getJournalBufferReuseSize());
assertEquals(conf.isCreateBindingsDir(), serverControl.isCreateBindingsDir());
- assertEquals(conf.isCreateJournalDir(), serverControl.isCreateJournalDir());
- assertEquals(conf.getPagingMaxThreads(), serverControl.getPagingMaxThreads());
+ assertEquals(conf.isCreateJournalDir(), serverControl.isCreateJournalDir());
assertEquals(conf.getPagingDirectory(), serverControl.getPagingDirectory());
assertEquals(conf.getPagingMaxGlobalSizeBytes(), serverControl.getPagingMaxGlobalSizeBytes());
assertEquals(conf.getPagingGlobalWatermarkSize(), serverControl.getPagingGlobalWatermarkSize());
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/management/MessagingServerControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/management/MessagingServerControlUsingCoreTest.java 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/management/MessagingServerControlUsingCoreTest.java 2009-04-29 06:26:47 UTC (rev 6615)
@@ -260,11 +260,6 @@
return (Long)proxy.retrieveAttributeValue("PagingMaxGlobalSizeBytes");
}
- public int getPagingMaxThreads()
- {
- return (Integer)proxy.retrieveAttributeValue("PagingMaxThreads");
- }
-
public long getQueueActivationTimeout()
{
return (Long)proxy.retrieveAttributeValue("QueueActivationTimeout");
@@ -274,6 +269,11 @@
{
return (Integer)proxy.retrieveAttributeValue("ScheduledThreadPoolMaxSize");
}
+
+ public int getThreadPoolMaxSize()
+ {
+ return (Integer)proxy.retrieveAttributeValue("ThreadPoolMaxSize");
+ }
public long getSecurityInvalidationInterval()
{
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PageCrashTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/PageCrashTest.java 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/PageCrashTest.java 2009-04-29 06:26:47 UTC (rev 6615)
@@ -28,6 +28,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
import org.jboss.messaging.core.client.ClientConsumer;
import org.jboss.messaging.core.client.ClientMessage;
@@ -251,8 +252,7 @@
@Override
protected PagingManager createPagingManager()
{
- return new PagingManagerImpl(new FailurePagingStoreFactoryNIO(super.getConfiguration().getPagingDirectory(),
- super.getConfiguration().getPagingMaxThreads()),
+ return new PagingManagerImpl(new FailurePagingStoreFactoryNIO(super.getConfiguration().getPagingDirectory()),
super.getStorageManager(),
super.getAddressSettingsRepository(),
super.getConfiguration().getPagingMaxGlobalSizeBytes(),
@@ -268,9 +268,9 @@
* @param directory
* @param maxThreads
*/
- public FailurePagingStoreFactoryNIO(final String directory, final int maxThreads)
+ public FailurePagingStoreFactoryNIO(final String directory)
{
- super(directory, maxThreads);
+ super(directory, new OrderedExecutorFactory(Executors.newCachedThreadPool()));
}
// Constants -----------------------------------------------------
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/persistence/JournalStorageManagerIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/persistence/JournalStorageManagerIntegrationTest.java 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/persistence/JournalStorageManagerIntegrationTest.java 2009-04-29 06:26:47 UTC (rev 6615)
@@ -22,6 +22,8 @@
package org.jboss.messaging.tests.integration.persistence;
+import java.util.concurrent.Executors;
+
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.persistence.impl.journal.JournalStorageManager;
import org.jboss.messaging.core.server.JournalType;
@@ -60,7 +62,7 @@
configuration.setJournalType(JournalType.NIO);
- final JournalStorageManager journal = new JournalStorageManager(configuration);
+ final JournalStorageManager journal = new JournalStorageManager(configuration, Executors.newCachedThreadPool());
journal.start();
LargeServerMessage msg = journal.createLargeMessage();
Modified: trunk/tests/src/org/jboss/messaging/tests/performance/journal/RealJournalImplAIOTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/journal/RealJournalImplAIOTest.java 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/journal/RealJournalImplAIOTest.java 2009-04-29 06:26:47 UTC (rev 6615)
@@ -23,6 +23,7 @@
package org.jboss.messaging.tests.performance.journal;
import java.io.File;
+import java.util.concurrent.Executors;
import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
import org.jboss.messaging.core.journal.SequentialFileFactory;
Modified: trunk/tests/src/org/jboss/messaging/tests/performance/persistence/StorageManagerTimingTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/persistence/StorageManagerTimingTest.java 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/persistence/StorageManagerTimingTest.java 2009-04-29 06:26:47 UTC (rev 6615)
@@ -24,6 +24,7 @@
import java.io.File;
import java.util.HashMap;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
@@ -125,7 +126,7 @@
configuration.setJournalType(journalType);
- final JournalStorageManager journal = new JournalStorageManager(configuration);
+ final JournalStorageManager journal = new JournalStorageManager(configuration, Executors.newCachedThreadPool());
journal.start();
HashMap<Long, Queue> queues = new HashMap<Long, Queue>();
Modified: trunk/tests/src/org/jboss/messaging/tests/stress/journal/AddAndRemoveStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/journal/AddAndRemoveStressTest.java 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/journal/AddAndRemoveStressTest.java 2009-04-29 06:26:47 UTC (rev 6615)
@@ -24,6 +24,7 @@
import java.io.File;
import java.util.ArrayList;
+import java.util.concurrent.Executors;
import org.jboss.messaging.core.journal.LoadManager;
import org.jboss.messaging.core.journal.PreparedTransactionInfo;
Modified: trunk/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java 2009-04-29 06:26:47 UTC (rev 6615)
@@ -23,6 +23,7 @@
package org.jboss.messaging.tests.stress.journal.remote;
import java.nio.ByteBuffer;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import org.jboss.messaging.core.journal.LoadManager;
Modified: trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/RealJournalImplAIOTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/RealJournalImplAIOTest.java 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/RealJournalImplAIOTest.java 2009-04-29 06:26:47 UTC (rev 6615)
@@ -23,6 +23,7 @@
package org.jboss.messaging.tests.timing.core.journal.impl;
import java.io.File;
+import java.util.concurrent.Executors;
import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
import org.jboss.messaging.core.journal.SequentialFileFactory;
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/DefaultsFileConfigurationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/DefaultsFileConfigurationTest.java 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/DefaultsFileConfigurationTest.java 2009-04-29 06:26:47 UTC (rev 6615)
@@ -49,6 +49,8 @@
assertEquals(ConfigurationImpl.DEFAULT_QUEUE_ACTIVATION_TIMEOUT, conf.getQueueActivationTimeout());
assertEquals(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, conf.getScheduledThreadPoolMaxSize());
+
+ assertEquals(ConfigurationImpl.DEFAULT_THREAD_POOL_MAX_SIZE, conf.getThreadPoolMaxSize());
assertEquals(ConfigurationImpl.DEFAULT_SECURITY_INVALIDATION_INTERVAL, conf.getSecurityInvalidationInterval());
@@ -115,8 +117,6 @@
assertEquals(ConfigurationImpl.DEFAULT_CREATE_JOURNAL_DIR, conf.isCreateJournalDir());
- assertEquals(ConfigurationImpl.DEFAULT_PAGE_MAX_THREADS, conf.getPagingMaxThreads());
-
assertEquals(ConfigurationImpl.DEFAULT_PAGING_DIR, conf.getPagingDirectory());
assertEquals(ConfigurationImpl.DEFAULT_PAGE_MAX_GLOBAL_SIZE, conf.getPagingMaxGlobalSizeBytes());
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java 2009-04-29 06:26:47 UTC (rev 6615)
@@ -45,7 +45,8 @@
//Check they match the values from the test file
assertEquals(true, conf.isClustered());
assertEquals(true, conf.isEnableFileDeployment());
- assertEquals(12345, conf.getScheduledThreadPoolMaxSize());
+ assertEquals(12345, conf.getScheduledThreadPoolMaxSize());
+ assertEquals(54321, conf.getThreadPoolMaxSize());
assertEquals(false, conf.isSecurityEnabled());
assertEquals(5423, conf.getSecurityInvalidationInterval());
assertEquals(true, conf.isWildcardRoutingEnabled());
@@ -67,8 +68,7 @@
assertEquals(true, conf.isPersistIDCache());
assertEquals(12456, conf.getQueueActivationTimeout());
assertEquals(true, conf.isBackup());
- assertEquals(true, conf.isPersistDeliveryCountBeforeDelivery());
- assertEquals(2, conf.getPagingMaxThreads());
+ assertEquals(true, conf.isPersistDeliveryCountBeforeDelivery());
assertEquals("pagingdir", conf.getPagingDirectory());
assertEquals(123, conf.getPagingGlobalWatermarkSize());
assertEquals(4567, conf.getPagingMaxGlobalSizeBytes());
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/deployers/impl/QueueDeployerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/deployers/impl/QueueDeployerTest.java 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/deployers/impl/QueueDeployerTest.java 2009-04-29 06:26:47 UTC (rev 6615)
@@ -147,6 +147,12 @@
private class FakeServerControl implements MessagingServerControlMBean
{
+ public int getThreadPoolMaxSize()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
public boolean closeConnectionsForAddress(String ipAddress) throws Exception
{
// TODO Auto-generated method stub
@@ -360,12 +366,6 @@
return 0;
}
- public int getPagingMaxThreads()
- {
- // TODO Auto-generated method stub
- return 0;
- }
-
public long getQueueActivationTimeout()
{
// TODO Auto-generated method stub
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AIOSequentialFileTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AIOSequentialFileTest.java 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AIOSequentialFileTest.java 2009-04-29 06:26:47 UTC (rev 6615)
@@ -24,6 +24,7 @@
import java.io.File;
import java.nio.ByteBuffer;
+import java.util.concurrent.Executors;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingManagerITest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingManagerITest.java 2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingManagerITest.java 2009-04-29 06:26:47 UTC (rev 6615)
@@ -25,6 +25,7 @@
import java.io.File;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.concurrent.Executors;
import org.jboss.messaging.core.buffers.ChannelBuffers;
import org.jboss.messaging.core.paging.Page;
@@ -41,6 +42,7 @@
import org.jboss.messaging.core.settings.impl.HierarchicalObjectRepository;
import org.jboss.messaging.tests.util.RandomUtil;
import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.utils.OrderedExecutorFactory;
import org.jboss.messaging.utils.SimpleString;
/**
@@ -67,7 +69,7 @@
HierarchicalRepository<AddressSettings> addressSettings = new HierarchicalObjectRepository<AddressSettings>();
addressSettings.setDefault(new AddressSettings());
- PagingManagerImpl managerImpl = new PagingManagerImpl(new PagingStoreFactoryNIO(getPageDir(), 10),
+ PagingManagerImpl managerImpl = new PagingManagerImpl(new PagingStoreFactoryNIO(getPageDir(), new OrderedExecutorFactory(Executors.newCachedThreadPool())),
new NullStorageManager(),
addressSettings,
-1,
More information about the jboss-cvs-commits mailing list