[jboss-cvs] JBoss Messaging SVN: r5354 - in trunk: src/main/org/jboss/messaging/core/paging and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Nov 13 16:55:14 EST 2008
Author: clebert.suconic at jboss.com
Date: 2008-11-13 16:55:14 -0500 (Thu, 13 Nov 2008)
New Revision: 5354
Added:
trunk/tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java
Removed:
trunk/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServiceImpl.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplWildcardManagerTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1450 - proper shutdown/stop on Journal and Paging
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-11-13 19:27:36 UTC (rev 5353)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-11-13 21:55:14 UTC (rev 5354)
@@ -312,7 +312,14 @@
}
finally
{
- rwlock.readLock().unlock();
+ try
+ {
+ rwlock.readLock().unlock();
+ }
+ catch (Exception ignored)
+ {
+ // This could happen if the thread was interrupted
+ }
}
}
@@ -350,7 +357,14 @@
}
finally
{
- rwlock.readLock().unlock();
+ try
+ {
+ rwlock.readLock().unlock();
+ }
+ catch (Exception ignored)
+ {
+ // This could happen if the thread was interrupted
+ }
}
}
@@ -385,7 +399,14 @@
}
finally
{
- rwlock.readLock().unlock();
+ try
+ {
+ rwlock.readLock().unlock();
+ }
+ catch (Exception ignored)
+ {
+ // This could happen if the thread was interrupted
+ }
}
}
@@ -424,7 +445,14 @@
}
finally
{
- rwlock.readLock().unlock();
+ try
+ {
+ rwlock.readLock().unlock();
+ }
+ catch (Exception ignored)
+ {
+ // This could happen if the thread was interrupted
+ }
}
}
@@ -461,7 +489,14 @@
}
finally
{
- rwlock.readLock().unlock();
+ try
+ {
+ rwlock.readLock().unlock();
+ }
+ catch (Exception ignored)
+ {
+ // This could happen if the thread was interrupted
+ }
}
}
@@ -497,7 +532,14 @@
}
finally
{
- rwlock.readLock().unlock();
+ try
+ {
+ rwlock.readLock().unlock();
+ }
+ catch (Exception ignored)
+ {
+ // This could happen if the thread was interrupted
+ }
}
}
@@ -535,7 +577,14 @@
}
finally
{
- rwlock.readLock().unlock();
+ try
+ {
+ rwlock.readLock().unlock();
+ }
+ catch (Exception ignored)
+ {
+ // This could happen if the thread was interrupted
+ }
}
// We should wait this outside of the lock, to increase throuput
@@ -590,7 +639,14 @@
}
finally
{
- rwlock.readLock().unlock();
+ try
+ {
+ rwlock.readLock().unlock();
+ }
+ catch (Exception ignored)
+ {
+ // This could happen if the thread was interrupted
+ }
}
// We should wait this outside of the lock, to increase throuput
@@ -636,7 +692,14 @@
}
finally
{
- rwlock.readLock().unlock();
+ try
+ {
+ rwlock.readLock().unlock();
+ }
+ catch (Exception ignored)
+ {
+ // This could happen if the thread was interrupted
+ }
}
// We should wait this outside of the lock, to increase throuput
@@ -1513,37 +1576,50 @@
public synchronized void stop() throws Exception
{
+ trace("Stopping the journal");
+
if (state == STATE_STOPPED)
{
throw new IllegalStateException("Journal is already stopped");
}
- if (currentFile != null)
+ positionLock.acquire();
+ rwlock.writeLock().lock();
+
+ try
{
- currentFile.getFile().close();
- }
+ filesExecutor.shutdown();
- filesExecutor.shutdown();
+ if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS))
+ {
+ log.warn("Couldn't stop journal executor after 60 seconds");
+ }
- if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS))
- {
- log.warn("Couldn't stop journal executor after 60 seconds");
- }
+ if (currentFile != null)
+ {
+ currentFile.getFile().close();
+ }
- for (JournalFile file : openedFiles)
- {
- file.getFile().close();
- }
+ for (JournalFile file : openedFiles)
+ {
+ file.getFile().close();
+ }
- currentFile = null;
+ currentFile = null;
- dataFiles.clear();
+ dataFiles.clear();
- freeFiles.clear();
+ freeFiles.clear();
- openedFiles.clear();
+ openedFiles.clear();
- state = STATE_STOPPED;
+ state = STATE_STOPPED;
+ }
+ finally
+ {
+ positionLock.release();
+ rwlock.writeLock().unlock();
+ }
}
// Public
@@ -1855,6 +1931,11 @@
try
{
+ if (state != STATE_LOADED)
+ {
+ throw new IllegalStateException("The journal was stopped");
+ }
+
int size = bb.limit();
if (size % currentFile.getFile().getAlignment() != 0)
Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java 2008-11-13 19:27:36 UTC (rev 5353)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java 2008-11-13 21:55:14 UTC (rev 5354)
@@ -58,7 +58,11 @@
public interface PagingManager extends MessagingComponent
{
+ /** The system is paging because of global-page-mode */
boolean isGlobalPageMode();
+
+ /** During startup, PostOffice may set GlobalPageMode to true */
+ void setGlobalPageMode(boolean globalMode);
/** To return the PageStore associated with the address */
PagingStore getPageStore(SimpleString address) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java 2008-11-13 19:27:36 UTC (rev 5353)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java 2008-11-13 21:55:14 UTC (rev 5354)
@@ -38,6 +38,8 @@
PagingStore newStore(org.jboss.messaging.util.SimpleString destinationName, QueueSettings queueSettings);
Executor getPagingExecutor();
+
+ void stop() throws InterruptedException;
void setPagingManager(PagingManager manager);
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java 2008-11-13 19:27:36 UTC (rev 5353)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java 2008-11-13 21:55:14 UTC (rev 5354)
@@ -24,7 +24,9 @@
import java.io.File;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
@@ -50,7 +52,7 @@
private final String directory;
- private final Executor executor;
+ private final ExecutorService executor;
private PagingManager pagingManager;
@@ -64,18 +66,18 @@
executor = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-depaging-threads"));
}
- public PagingManagerFactoryNIO(final String directory, final Executor executor)
- {
- this.directory = directory;
- this.executor = executor;
- }
-
// Public --------------------------------------------------------
public Executor getPagingExecutor()
{
return executor;
}
+
+ public void stop() throws InterruptedException
+ {
+ executor.shutdown();
+ executor.awaitTermination(60, TimeUnit.SECONDS);
+ }
public PagingStore newStore(final SimpleString destinationName, final QueueSettings settings)
{
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-11-13 19:27:36 UTC (rev 5353)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-11-13 21:55:14 UTC (rev 5354)
@@ -125,8 +125,13 @@
{
return globalMode.get();
}
+
+ public void setGlobalPageMode(boolean globalMode)
+ {
+ this.globalMode.set(globalMode);
+ }
- //FIXME - this is not thread safe
+ // Synchronization of this method is provided by the ConcurrentHashMap
public PagingStore getPageStore(final SimpleString storeName) throws Exception
{
PagingStore store = stores.get(storeName);
@@ -261,6 +266,8 @@
}
storageManager.commit(depageTransactionID);
+
+ trace("Depage committed");
for (MessageReference ref : refsToAdd)
{
@@ -341,6 +348,8 @@
public void stop() throws Exception
{
started = false;
+
+ pagingSPI.stop();
for (PagingStore store : stores.values())
{
@@ -454,7 +463,7 @@
{
try
{
- while (globalSize.get() < maxGlobalSize)
+ while (globalSize.get() < maxGlobalSize && started)
{
boolean depaged = false;
// Round robin depaging one page at the time from each
@@ -483,7 +492,7 @@
}
}
- if (globalSize.get() < maxGlobalSize)
+ if (globalSize.get() < maxGlobalSize && started)
{
globalMode.set(false);
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-11-13 19:27:36 UTC (rev 5353)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-11-13 21:55:14 UTC (rev 5354)
@@ -25,7 +25,9 @@
import java.text.DecimalFormat;
import java.util.List;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
@@ -75,7 +77,7 @@
private final PagingManager pagingManager;
- private final Executor executor;
+ private final ExecutorService executor;
// Bytes consumed by the queue on the memory
private final AtomicLong sizeInBytes = new AtomicLong();
@@ -96,7 +98,7 @@
private final ReadWriteLock lock = new ReentrantReadWriteLock();
- private volatile boolean initialized = false;
+ private volatile boolean running = false;
private volatile LastPageRecord lastPageRecord;
@@ -108,7 +110,7 @@
final SequentialFileFactory fileFactory,
final SimpleString storeName,
final QueueSettings queueSettings,
- final Executor executor)
+ final ExecutorService executor)
{
this.fileFactory = fileFactory;
this.storeName = storeName;
@@ -418,18 +420,22 @@
public synchronized boolean isStarted()
{
- return initialized;
+ return running;
}
public synchronized void stop() throws Exception
{
- if (initialized)
+ if (running)
{
lock.writeLock().lock();
try
{
- initialized = false;
+ running = false;
+
+ executor.shutdown();
+ executor.awaitTermination(60, TimeUnit.SECONDS);
+
if (currentPage != null)
{
currentPage.close();
@@ -445,7 +451,7 @@
public synchronized void start() throws Exception
{
- if (initialized)
+ if (running)
{
// don't throw an exception.
// You could have two threads adding PagingStore to a
@@ -483,7 +489,7 @@
}
}
- initialized = true;
+ running = true;
if (numberOfPages != 0)
{
@@ -635,12 +641,11 @@
{
try
{
- boolean needMorePages = false;
- do
+ boolean needMorePages = true;
+ while (needMorePages && running)
{
needMorePages = readPage();
}
- while (needMorePages);
}
catch (Exception e)
{
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-11-13 19:27:36 UTC (rev 5353)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-11-13 21:55:14 UTC (rev 5354)
@@ -25,6 +25,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -453,13 +454,35 @@
}
storageManager.loadMessages(this, queues, resourceManager);
+
+ // TODO: This is related to http://www.jboss.com/index.html?module=bb&op=viewtopic&t=145597
+ HashSet<SimpleString> addresses = new HashSet<SimpleString>();
+
+ for (Binding binding: bindings)
+ {
+ addresses.add(binding.getAddress());
+ }
+
+ for (SimpleString destination: dests)
+ {
+ addresses.add(destination);
+ }
+
+ // End TODO -------------------------------------
- for (SimpleString destination : dests)
+ for (SimpleString destination : addresses)
{
if (!pagingManager.isGlobalPageMode())
{
PagingStore store = pagingManager.getPageStore(destination);
- store.startDepaging();
+ if (store.isPaging() && store.getMaxSizeBytes() < 0)
+ {
+ pagingManager.setGlobalPageMode(true);
+ }
+ else
+ {
+ store.startDepaging();
+ }
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServiceImpl.java 2008-11-13 19:27:36 UTC (rev 5353)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServiceImpl.java 2008-11-13 21:55:14 UTC (rev 5354)
@@ -140,8 +140,8 @@
public void stop() throws Exception
{
remotingService.stop();
+ server.stop();
storageManager.stop();
- server.stop();
}
public MessagingServer getServer()
Deleted: trunk/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java 2008-11-13 19:27:36 UTC (rev 5353)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java 2008-11-13 21:55:14 UTC (rev 5354)
@@ -1,193 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
- * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
- * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
- * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
- * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
- * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
- * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
- * site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.stress.paging;
-
-import java.util.HashMap;
-
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.core.settings.impl.QueueSettings;
-import org.jboss.messaging.tests.integration.base.IntegrationTestBase;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * This is an integration-tests that will take some time to run. TODO: Maybe this test belongs somewhere else?
- *
- * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
- */
-public class MultipleDestinationPagingTest extends IntegrationTestBase
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- MessagingService service;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testGlobalPage() throws Exception
- {
- testPage(true);
- }
-
- public void testRegularPage() throws Exception
- {
- testPage(false);
- }
-
- public void testPage(boolean globalPage) throws Exception
- {
- Configuration config = createDefaultConfig();
-
- HashMap<String, QueueSettings> settings = new HashMap<String, QueueSettings>();
-
- if (globalPage)
- {
- config.setPagingMaxGlobalSizeBytes(20 * 1024 * 1024);
- QueueSettings setting = new QueueSettings();
- setting.setMaxSizeBytes(-1);
- settings.put("page-adr", setting);
- }
- else
- {
- config.setPagingMaxGlobalSizeBytes(-1);
- QueueSettings setting = new QueueSettings();
- setting.setMaxSizeBytes(20 * 1024 * 1024);
- settings.put("page-adr", setting);
- }
-
- service = createService(true, false, config, settings);
- service.start();
-
- ClientSessionFactory factory = createInVMFactory();
- ClientSession session = null;
-
- try
- {
- session = factory.createSession(false, false, false);
-
- SimpleString address = new SimpleString("page-adr");
- SimpleString queue[] = new SimpleString[] { new SimpleString("queue1"), new SimpleString("queue2") };
-
- session.createQueue(address, queue[0], null, true, false, true);
- session.createQueue(address, queue[1], null, true, false, true);
-
- ClientProducer prod = session.createProducer(address);
-
- ClientMessage message = createBytesMessage(session, new byte[700], false);
-
- int NUMBER_OF_MESSAGES = 60000;
-
- for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
- {
- if (i % 10000 == 0)
- System.out.println(i);
- prod.send(message);
- }
-
- session.commit();
-
- session.start();
-
- int counters[] = new int[2];
-
- ClientConsumer consumers[] = new ClientConsumer[] { session.createConsumer(queue[0]),
- session.createConsumer(queue[1]) };
-
- int reads = 0;
-
- while (true)
- {
- int msgs1 = readMessages(session, consumers[0], queue[0]);
- if (reads++ == 0)
- {
- assertTrue(msgs1 > 0 && msgs1 < NUMBER_OF_MESSAGES);
- }
- int msgs2 = readMessages(session, consumers[1], queue[1]);
- counters[0] += msgs1;
- counters[1] += msgs2;
-
- System.out.println("msgs1 = " + msgs1 + " msgs2 = " + msgs2);
-
- if (msgs1 + msgs2 == 0)
- {
- break;
- }
- }
-
- consumers[0].close();
- consumers[1].close();
-
- assertEquals(NUMBER_OF_MESSAGES, counters[0]);
- assertEquals(NUMBER_OF_MESSAGES, counters[1]);
- }
- finally
- {
- session.close();
- service.stop();
- }
-
- }
-
- private int readMessages(ClientSession session, ClientConsumer consumer, SimpleString queue) throws MessagingException
- {
- session.start();
- int msgs = 0;
-
- ClientMessage msg = null;
- do
- {
- msg = consumer.receive(1000);
- if (msg != null)
- {
- msg.acknowledge();
- if (++msgs % 10000 == 0)
- {
- System.out.println("received " + msgs);
- session.commit();
-
- }
- }
- }
- while (msg != null);
-
- session.commit();
-
- return msgs;
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- protected void setUp() throws Exception
- {
- clearData();
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Copied: trunk/tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java (from rev 5347, trunk/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java 2008-11-13 21:55:14 UTC (rev 5354)
@@ -0,0 +1,325 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
+ * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
+ * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
+ * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
+ * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.stress.paging;
+
+import java.util.HashMap;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.tests.integration.base.IntegrationTestBase;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * This is an integration test that takes some time to run. TODO: Maybe this test belongs somewhere else?
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ */
+public class PageStressTest extends IntegrationTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ MessagingService service;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testStopDuringGlobalDepage() throws Exception
+ {
+ testStopDuringDepage(true);
+ }
+
+ public void testStopDuringRegularDepage() throws Exception
+ {
+ testStopDuringDepage(false);
+ }
+
+
+ public void testStopDuringDepage(boolean globalPage) throws Exception
+ {
+ HashMap<String, QueueSettings> settings = new HashMap<String, QueueSettings>();
+
+ Configuration config = createConfig(globalPage, settings);
+
+ service = createService(true, false, config, settings);
+ service.start();
+
+ ClientSessionFactory factory = createInVMFactory();
+ factory.setBlockOnAcknowledge(true);
+ ClientSession session = null;
+
+ try
+ {
+
+ final int NUMBER_OF_MESSAGES = 60000;
+
+ session = factory.createSession(null, null, false, false, true, 1024 * NUMBER_OF_MESSAGES);
+
+ SimpleString address = new SimpleString("page-adr");
+
+ session.createQueue(address, address, null, true, false, true);
+
+ ClientProducer prod = session.createProducer(address);
+
+ ClientMessage message = createBytesMessage(session, new byte[700], true);
+
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
+ {
+ if (i % 10000 == 0)
+ System.out.println("Sent " + i);
+ prod.send(message);
+ }
+
+ session.commit();
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(address);
+
+
+ int msgs = 0;
+ ClientMessage msg = null;
+ do
+ {
+ msg = consumer.receive(1000);
+ if (msg != null)
+ {
+ msg.acknowledge();
+ if ((++msgs) % 1000 == 0)
+ {
+ System.out.println("Received " + msgs);
+ }
+ }
+ } while (msg != null);
+
+ session.commit();
+
+ session.close();
+
+ service.stop();
+
+ System.out.println("server stopped, nr msgs: " + msgs);
+
+ settings = new HashMap<String, QueueSettings>();
+ config = createConfig(globalPage, settings);
+
+ service = createService(true, false, config, settings);
+ service.start();
+
+
+ factory = createInVMFactory();
+
+ session = factory.createSession(false, false, false);
+
+ consumer = session.createConsumer(address);
+
+ session.start();
+
+ msg = null;
+ do
+ {
+ msg = consumer.receive(1000);
+ if (msg != null)
+ {
+ msg.acknowledge();
+ session.commit();
+ if ((++msgs) % 1000 == 0)
+ {
+ System.out.println("Received " + msgs);
+ }
+ }
+ } while (msg != null);
+
+ System.out.println("msgs second time: " + msgs);
+
+ assertEquals(NUMBER_OF_MESSAGES, msgs);
+ }
+ finally
+ {
+ session.close();
+ service.stop();
+ }
+
+ }
+
+ public void testGlobalPageOnMultipleDestinations() throws Exception
+ {
+ testPageOnMultipleDestinations(true);
+ }
+
+ public void testRegularPageOnMultipleDestinations() throws Exception
+ {
+ testPageOnMultipleDestinations(false);
+ }
+
+ public void testPageOnMultipleDestinations(boolean globalPage) throws Exception
+ {
+ HashMap<String, QueueSettings> settings = new HashMap<String, QueueSettings>();
+
+ Configuration config = createConfig(globalPage, settings);
+
+ service = createService(true, false, config, settings);
+ service.start();
+
+ ClientSessionFactory factory = createInVMFactory();
+ ClientSession session = null;
+
+ try
+ {
+ session = factory.createSession(false, false, false);
+
+ SimpleString address = new SimpleString("page-adr");
+ SimpleString queue[] = new SimpleString[] { new SimpleString("queue1"), new SimpleString("queue2") };
+
+ session.createQueue(address, queue[0], null, true, false, true);
+ session.createQueue(address, queue[1], null, true, false, true);
+
+ ClientProducer prod = session.createProducer(address);
+
+ ClientMessage message = createBytesMessage(session, new byte[700], false);
+
+ int NUMBER_OF_MESSAGES = 60000;
+
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
+ {
+ if (i % 10000 == 0)
+ System.out.println(i);
+ prod.send(message);
+ }
+
+ session.commit();
+
+ session.start();
+
+ int counters[] = new int[2];
+
+ ClientConsumer consumers[] = new ClientConsumer[] { session.createConsumer(queue[0]),
+ session.createConsumer(queue[1]) };
+
+ int reads = 0;
+
+ while (true)
+ {
+ int msgs1 = readMessages(session, consumers[0], queue[0]);
+ if (reads++ == 0)
+ {
+ assertTrue(msgs1 > 0 && msgs1 < NUMBER_OF_MESSAGES);
+ }
+ int msgs2 = readMessages(session, consumers[1], queue[1]);
+ counters[0] += msgs1;
+ counters[1] += msgs2;
+
+ System.out.println("msgs1 = " + msgs1 + " msgs2 = " + msgs2);
+
+ if (msgs1 + msgs2 == 0)
+ {
+ break;
+ }
+ }
+
+ consumers[0].close();
+ consumers[1].close();
+
+ assertEquals(NUMBER_OF_MESSAGES, counters[0]);
+ assertEquals(NUMBER_OF_MESSAGES, counters[1]);
+ }
+ finally
+ {
+ session.close();
+ service.stop();
+ }
+
+ }
+
+ private int readMessages(ClientSession session, ClientConsumer consumer, SimpleString queue) throws MessagingException
+ {
+ session.start();
+ int msgs = 0;
+
+ ClientMessage msg = null;
+ do
+ {
+ msg = consumer.receive(1000);
+ if (msg != null)
+ {
+ msg.acknowledge();
+ if (++msgs % 10000 == 0)
+ {
+ System.out.println("received " + msgs);
+ session.commit();
+
+ }
+ }
+ }
+ while (msg != null);
+
+ session.commit();
+
+ return msgs;
+ }
+
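+ /**
+ * @param globalPage if true, sets a global paging size limit and no per-address limit; otherwise sets only a per-address limit
+ * @param settings map that receives the QueueSettings registered under "page-adr"
+ * @return the Configuration obtained from createDefaultConfig() with the global paging size applied
+ */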
+ private Configuration createConfig(boolean globalPage, HashMap<String, QueueSettings> settings)
+ {
+ Configuration config = createDefaultConfig();
+
+ if (globalPage)
+ {
+ config.setPagingMaxGlobalSizeBytes(20 * 1024 * 1024);
+ QueueSettings setting = new QueueSettings();
+ setting.setMaxSizeBytes(-1);
+ settings.put("page-adr", setting);
+ }
+ else
+ {
+ config.setPagingMaxGlobalSizeBytes(-1);
+ QueueSettings setting = new QueueSettings();
+ setting.setMaxSizeBytes(20 * 1024 * 1024);
+ settings.put("page-adr", setting);
+ }
+ return config;
+ }
+
+
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected void setUp() throws Exception
+ {
+ clearData();
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Property changes on: trunk/tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java
___________________________________________________________________
Name: svn:mergeinfo
+
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplTest.java 2008-11-13 19:27:36 UTC (rev 5353)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplTest.java 2008-11-13 21:55:14 UTC (rev 5354)
@@ -121,7 +121,9 @@
EasyMock.expect(queue.getName()).andStubReturn(queueName);
EasyMock.expect(queue.getPersistenceID()).andStubReturn(1);
ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
+
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
+ EasyMock.expect(pgm.isGlobalPageMode()).andStubReturn(true);
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
@@ -133,11 +135,11 @@
(Map<Long, Queue>)EasyMock.anyObject(),
(ResourceManager)EasyMock.anyObject());
- EasyMock.replay(pm, qf, binding, queue);
+ EasyMock.replay(pm, pgm, qf, binding, queue);
postOffice.start();
- EasyMock.verify(pm, qf, binding, queue);
+ EasyMock.verify(pm, pgm, qf, binding, queue);
assertTrue(postOffice.isStarted());
assertEquals(postOffice.getBinding(queueName), binding);
@@ -170,6 +172,7 @@
QueueFactory qf = EasyMock.createStrictMock(QueueFactory.class);
ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
+ EasyMock.expect(pgm.isGlobalPageMode()).andStubReturn(true);
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
@@ -267,6 +270,7 @@
ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
+ EasyMock.expect(pgm.isGlobalPageMode()).andStubReturn(true);
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplWildcardManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplWildcardManagerTest.java 2008-11-13 19:27:36 UTC (rev 5353)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplWildcardManagerTest.java 2008-11-13 21:55:14 UTC (rev 5354)
@@ -66,6 +66,7 @@
QueueFactory qf = EasyMock.createStrictMock(QueueFactory.class);
ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
+ EasyMock.expect(pgm.isGlobalPageMode()).andStubReturn(true);
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
More information about the jboss-cvs-commits
mailing list