[jboss-cvs] JBoss Messaging SVN: r5597 - in trunk: src/main/org/jboss/messaging/core/server/impl and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jan 7 23:24:06 EST 2009
Author: clebert.suconic at jboss.com
Date: 2009-01-07 23:24:06 -0500 (Wed, 07 Jan 2009)
New Revision: 5597
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/paging/PageCrashTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/paging/remote/
trunk/tests/src/org/jboss/messaging/tests/integration/paging/remote/RemotePageCrashExecution.java
Modified:
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
Log:
Adding crash tests on Paging
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-01-08 04:09:34 UTC (rev 5596)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-01-08 04:24:06 UTC (rev 5597)
@@ -722,6 +722,37 @@
// Protected -----------------------------------------------------
+ // In order to test failures, we need to be able to extend this class
+ // and replace the Page for another Page that will fail before the file is removed
+ // That's why createPage is not a private method
+ protected Page createPage(final int page) throws Exception
+ {
+ String fileName = createFileName(page);
+
+ if (fileFactory == null)
+ {
+ fileFactory = storeFactory.newFileFactory(this.getStoreName());
+ }
+
+ SequentialFile file = fileFactory.createSequentialFile(fileName, 1000);
+
+ file.open();
+
+ long size = file.size();
+
+ if (fileFactory.isSupportsCallbacks() && size < pageSize)
+ {
+ file.fill((int)size, (int)(pageSize - size), (byte)0);
+ }
+
+ file.position(0);
+
+ file.close();
+
+ return new PageImpl(fileFactory, file, page);
+ }
+
+
// Private -------------------------------------------------------
/**
@@ -897,33 +928,6 @@
}
}
- private Page createPage(final int page) throws Exception
- {
- String fileName = createFileName(page);
-
- if (fileFactory == null)
- {
- fileFactory = storeFactory.newFileFactory(this.getStoreName());
- }
-
- SequentialFile file = fileFactory.createSequentialFile(fileName, 1000);
-
- file.open();
-
- long size = file.size();
-
- if (fileFactory.isSupportsCallbacks() && size < pageSize)
- {
- file.fill((int)size, (int)(pageSize - size), (byte)0);
- }
-
- file.position(0);
-
- file.close();
-
- return new PageImpl(fileFactory, file, page);
- }
-
/**
*
* Note: Decimalformat is not thread safe, Use synchronization before calling this method
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-01-08 04:09:34 UTC (rev 5596)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-01-08 04:24:06 UTC (rev 5597)
@@ -216,13 +216,8 @@
new JBMThreadFactory("JBM-scheduled-threads"));
queueFactory = new BindableFactoryImpl(scheduledExecutor, queueSettingsRepository, storageManager);
- pagingManager = new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory(),
- configuration.getPagingMaxThreads()),
- storageManager,
- queueSettingsRepository,
- configuration.getPagingMaxGlobalSizeBytes(),
- configuration.getPagingDefaultSize(),
- configuration.isJournalSyncNonTransactional());
+ pagingManager = createPagingManager();
+
pagingManager.start();
resourceManager = new ResourceManagerImpl((int)configuration.getTransactionTimeout() / 1000,
@@ -670,7 +665,22 @@
// Protected
// ------------------------------------------------------------------------------------
+
+ /**
+ * Method could be replaced for test purposes
+ */
+ protected PagingManager createPagingManager()
+ {
+ return new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory(),
+ configuration.getPagingMaxThreads()),
+ storageManager,
+ queueSettingsRepository,
+ configuration.getPagingMaxGlobalSizeBytes(),
+ configuration.getPagingDefaultSize(),
+ configuration.isJournalSyncNonTransactional());
+ }
+
// Private
// --------------------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-01-08 04:09:34 UTC (rev 5596)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-01-08 04:24:06 UTC (rev 5597)
@@ -198,7 +198,8 @@
startedTx = true;
}
- // TODO we can avoid these lookups in the Queue since all messsages in the Queue will be for the same store
+ // There is no way to cache the Store, since a Queue may belong to multiple addresses,
+ // so we aways need this lookup
PagingStore store = pagingManager.getPageStore(message.getDestination());
if (tx == null)
Added: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PageCrashTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/PageCrashTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/PageCrashTest.java 2009-01-08 04:24:06 UTC (rev 5597)
@@ -0,0 +1,127 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.paging;
+
+import java.io.File;
+import java.util.HashMap;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.tests.integration.paging.remote.RemotePageCrashExecution;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.tests.util.SpawnedVMSupport;
+
+/**
+ * This test will make sure that a failing depage won't cause duplicated messages
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created Jan 7, 2009 6:19:43 PM
+ *
+ *
+ */
+public class PageCrashTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testCrashDuringDeleteFile() throws Exception
+ {
+ clearData();
+
+ Process process = SpawnedVMSupport.spawnVM(RemotePageCrashExecution.class.getCanonicalName());
+ process.waitFor();
+ assertEquals(1, process.exitValue());
+
+ File pageDir = new File(getPageDir());
+
+ File directories[] = pageDir.listFiles();
+
+ assertEquals(1, directories.length);
+
+ // When depage happened, a new empty page was supposed to be opened, what will create 3 files
+ assertEquals("Missing a file, supposed to have address.txt, 1st page and 2nd page",
+ 3,
+ directories[0].list().length);
+
+ Configuration config = createDefaultConfig();
+
+ config.setPagingMaxGlobalSizeBytes(100 * 1024);
+ config.setPagingDefaultSize(10 * 1024);
+
+ MessagingService messagingService = createService(true, config, new HashMap<String, QueueSettings>());
+
+ messagingService.start();
+
+ try
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ sf.setBlockOnNonPersistentSend(true);
+ sf.setBlockOnPersistentSend(true);
+ sf.setBlockOnAcknowledge(true);
+
+ ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(RemotePageCrashExecution.ADDRESS);
+
+ assertNull(consumer.receive(200));
+
+ session.close();
+ }
+ finally
+ {
+ messagingService.stop();
+ }
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Property changes on: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PageCrashTest.java
___________________________________________________________________
Name: svn:mergeinfo
+
Added: trunk/tests/src/org/jboss/messaging/tests/integration/paging/remote/RemotePageCrashExecution.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/remote/RemotePageCrashExecution.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/remote/RemotePageCrashExecution.java 2009-01-08 04:24:06 UTC (rev 5597)
@@ -0,0 +1,425 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.paging.remote;
+
+import java.lang.management.ManagementFactory;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.Executor;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.management.ManagementService;
+import org.jboss.messaging.core.management.impl.ManagementServiceImpl;
+import org.jboss.messaging.core.paging.Page;
+import org.jboss.messaging.core.paging.PagedMessage;
+import org.jboss.messaging.core.paging.PagingManager;
+import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.paging.impl.PagingManagerImpl;
+import org.jboss.messaging.core.paging.impl.PagingStoreFactoryNIO;
+import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.persistence.impl.journal.JournalStorageManager;
+import org.jboss.messaging.core.remoting.RemotingService;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.impl.RemotingServiceImpl;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.security.JBMSecurityManager;
+import org.jboss.messaging.core.security.impl.JBMSecurityManagerImpl;
+import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServerImpl;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.util.OrderedExecutorFactory;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A RemotePageCrashExecution
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created Jan 7, 2009 6:41:41 PM
+ *
+ *
+ */
+public class RemotePageCrashExecution extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ public static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
+
+ private static final Logger log = Logger.getLogger(RemotePageCrashExecution.class);
+
+ public static void main(final String arg[])
+ {
+ try
+ {
+ RemotePageCrashExecution execution = new RemotePageCrashExecution();
+ execution.pageAndFail();
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+
+ }
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void pageAndFail() throws Exception
+ {
+ clearData();
+ Configuration config = createDefaultConfig();
+
+ config.setPagingMaxGlobalSizeBytes(100 * 1024);
+ config.setPagingDefaultSize(10 * 1024);
+
+ MessagingService service = newMessagingService(config);
+
+ service.start();
+
+ try
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ sf.setBlockOnNonPersistentSend(true);
+ sf.setBlockOnPersistentSend(true);
+ sf.setBlockOnAcknowledge(true);
+
+ ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+
+ session.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ ByteBuffer ioBuffer = ByteBuffer.allocate(1024);
+
+ ClientMessage message = null;
+
+ MessagingBuffer bodyLocal = new ByteBufferWrapper(ioBuffer);
+
+ message = session.createClientMessage(true);
+ message.setBody(bodyLocal);
+
+ PagingStore store = service.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
+
+ int messages = 0;
+ while (!store.isPaging())
+ {
+ producer.send(message);
+ messages++;
+ }
+
+ for (int i = 0; i < 2; i++)
+ {
+ messages++;
+ producer.send(message);
+ }
+
+ session.close();
+
+ assertTrue(service.getServer().getPostOffice().getPagingManager().getGlobalSize() > 0);
+
+ session = sf.createSession(null, null, false, true, true, false, 0);
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ session.start();
+
+ for (int i = 0; i < messages; i++)
+ {
+ ClientMessage message2 = consumer.receive(10000);
+
+ assertNotNull(message2);
+
+ message2.acknowledge();
+ }
+
+ consumer.close();
+
+ session.close();
+
+ assertEquals(0, service.getServer().getPostOffice().getPagingManager().getGlobalSize());
+
+ }
+ finally
+ {
+ try
+ {
+ service.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ private MessagingServiceImpl newMessagingService(final Configuration configuration)
+ {
+
+ StorageManager storageManager = new JournalStorageManager(configuration);
+
+ RemotingService remotingService = new RemotingServiceImpl(configuration);
+
+ JBMSecurityManager securityManager = new JBMSecurityManagerImpl(true);
+
+ ManagementService managementService = new ManagementServiceImpl(ManagementFactory.getPlatformMBeanServer(), false);
+
+ remotingService.setManagementService(managementService);
+
+ MessagingServer server = new FailingMessagingServiceImpl();
+
+ server.setConfiguration(configuration);
+
+ server.setStorageManager(storageManager);
+
+ server.setRemotingService(remotingService);
+
+ server.setSecurityManager(securityManager);
+
+ server.setManagementService(managementService);
+
+ return new MessagingServiceImpl(server, storageManager, remotingService);
+ }
+
+ // Inner classes -------------------------------------------------
+
+ /** This is hacking MessagingServerImpl,
+ * to make sure the server will fail right
+ * after before the page-file was removed */
+ class FailingMessagingServiceImpl extends MessagingServerImpl
+ {
+ /**
+ * Method could be replaced for test purposes
+ */
+ @Override
+ protected PagingManager createPagingManager()
+ {
+ return new PagingManagerImpl(new FailurePagingStoreFactoryNIO(super.getConfiguration().getPagingDirectory(),
+ super.getConfiguration().getPagingMaxThreads()),
+ super.getStorageManager(),
+ super.getQueueSettingsRepository(),
+ super.getConfiguration().getPagingMaxGlobalSizeBytes(),
+ super.getConfiguration().getPagingDefaultSize(),
+ super.getConfiguration().isJournalSyncNonTransactional());
+ }
+
+ class FailurePagingStoreFactoryNIO extends PagingStoreFactoryNIO
+
+ {
+ /**
+ * @param directory
+ * @param maxThreads
+ */
+ public FailurePagingStoreFactoryNIO(final String directory, final int maxThreads)
+ {
+ super(directory, maxThreads);
+ }
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ @Override
+ public synchronized PagingStore newStore(final SimpleString destinationName, final QueueSettings settings) throws Exception
+ {
+ Field factoryField = PagingStoreFactoryNIO.class.getDeclaredField("executorFactory");
+ factoryField.setAccessible(true);
+
+ OrderedExecutorFactory factory = (OrderedExecutorFactory)factoryField.get(this);
+ return new FailingPagingStore(destinationName, settings, factory.getExecutor());
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+ class FailingPagingStore extends PagingStoreImpl
+ {
+
+ /**
+ * @param pagingManager
+ * @param storageManager
+ * @param postOffice
+ * @param fileFactory
+ * @param storeFactory
+ * @param storeName
+ * @param queueSettings
+ * @param executor
+ */
+ public FailingPagingStore(final SimpleString storeName,
+ final QueueSettings queueSettings,
+ final Executor executor)
+ {
+ super(getPostOffice().getPagingManager(),
+ getStorageManager(),
+ getPostOffice(),
+ null,
+ FailurePagingStoreFactoryNIO.this,
+ storeName,
+ queueSettings,
+ executor);
+ }
+
+ @Override
+ protected Page createPage(final int page) throws Exception
+ {
+
+ Page originalPage = super.createPage(page);
+
+ return new FailingPage(originalPage);
+ }
+
+ }
+
+ }
+
+ class FailingPage implements Page
+ {
+ Page delegatedPage;
+
+ /**
+ * @throws Exception
+ * @see org.jboss.messaging.core.paging.Page#close()
+ */
+ public void close() throws Exception
+ {
+ delegatedPage.close();
+ }
+
+ /**
+ * @throws Exception
+ * @see org.jboss.messaging.core.paging.Page#delete()
+ */
+ public void delete() throws Exception
+ {
+ // We want the system to fail
+ System.out.println("Crash");
+ System.out.flush(); // System.exit may not let the System.out to be seen if flush is not called
+ System.exit(1);
+ }
+
+ /**
+ * @return
+ * @see org.jboss.messaging.core.paging.Page#getNumberOfMessages()
+ */
+ public int getNumberOfMessages()
+ {
+ return delegatedPage.getNumberOfMessages();
+ }
+
+ /**
+ * @return
+ * @see org.jboss.messaging.core.paging.Page#getPageId()
+ */
+ public int getPageId()
+ {
+ return delegatedPage.getPageId();
+ }
+
+ /**
+ * @return
+ * @see org.jboss.messaging.core.paging.Page#getSize()
+ */
+ public int getSize()
+ {
+ return delegatedPage.getSize();
+ }
+
+ /**
+ * @throws Exception
+ * @see org.jboss.messaging.core.paging.Page#open()
+ */
+ public void open() throws Exception
+ {
+ delegatedPage.open();
+ }
+
+ /**
+ * @return
+ * @throws Exception
+ * @see org.jboss.messaging.core.paging.Page#read()
+ */
+ public List<PagedMessage> read() throws Exception
+ {
+ return delegatedPage.read();
+ }
+
+ /**
+ * @throws Exception
+ * @see org.jboss.messaging.core.paging.Page#sync()
+ */
+ public void sync() throws Exception
+ {
+ delegatedPage.sync();
+ }
+
+ /**
+ * @param message
+ * @throws Exception
+ * @see org.jboss.messaging.core.paging.Page#write(org.jboss.messaging.core.paging.PagedMessage)
+ */
+ public void write(final PagedMessage message) throws Exception
+ {
+ delegatedPage.write(message);
+ }
+
+ public FailingPage(final Page delegatePage)
+ {
+ delegatedPage = delegatePage;
+ }
+ }
+
+ }
+
+}
Property changes on: trunk/tests/src/org/jboss/messaging/tests/integration/paging/remote/RemotePageCrashExecution.java
___________________________________________________________________
Name: svn:mergeinfo
+
More information about the jboss-cvs-commits
mailing list