[jboss-cvs] JBoss Messaging SVN: r5597 - in trunk: src/main/org/jboss/messaging/core/server/impl and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Jan 7 23:24:06 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-01-07 23:24:06 -0500 (Wed, 07 Jan 2009)
New Revision: 5597

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/paging/PageCrashTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/paging/remote/
   trunk/tests/src/org/jboss/messaging/tests/integration/paging/remote/RemotePageCrashExecution.java
Modified:
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
Log:
Adding crash tests on Paging

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-01-08 04:09:34 UTC (rev 5596)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-01-08 04:24:06 UTC (rev 5597)
@@ -722,6 +722,37 @@
 
    // Protected -----------------------------------------------------
 
+   // In order to test failures, we need to be able to extend this class 
+   // and replace the Page for another Page that will fail before the file is removed
+   // That's why createPage is not a private method
+   protected Page createPage(final int page) throws Exception
+   {
+      String fileName = createFileName(page);
+
+      if (fileFactory == null)
+      {
+         fileFactory = storeFactory.newFileFactory(this.getStoreName());
+      }
+
+      SequentialFile file = fileFactory.createSequentialFile(fileName, 1000);
+
+      file.open();
+
+      long size = file.size();
+
+      if (fileFactory.isSupportsCallbacks() && size < pageSize)
+      {
+         file.fill((int)size, (int)(pageSize - size), (byte)0);
+      }
+
+      file.position(0);
+
+      file.close();
+
+      return new PageImpl(fileFactory, file, page);
+   }
+
+
    // Private -------------------------------------------------------
 
    /**
@@ -897,33 +928,6 @@
       }
    }
 
-   private Page createPage(final int page) throws Exception
-   {
-      String fileName = createFileName(page);
-
-      if (fileFactory == null)
-      {
-         fileFactory = storeFactory.newFileFactory(this.getStoreName());
-      }
-
-      SequentialFile file = fileFactory.createSequentialFile(fileName, 1000);
-
-      file.open();
-
-      long size = file.size();
-
-      if (fileFactory.isSupportsCallbacks() && size < pageSize)
-      {
-         file.fill((int)size, (int)(pageSize - size), (byte)0);
-      }
-
-      file.position(0);
-
-      file.close();
-
-      return new PageImpl(fileFactory, file, page);
-   }
-
    /**
     * 
     * Note: Decimalformat is not thread safe, Use synchronization before calling this method

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-01-08 04:09:34 UTC (rev 5596)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-01-08 04:24:06 UTC (rev 5597)
@@ -216,13 +216,8 @@
                                                           new JBMThreadFactory("JBM-scheduled-threads"));
       queueFactory = new BindableFactoryImpl(scheduledExecutor, queueSettingsRepository, storageManager);
 
-      pagingManager = new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory(),
-                                                                      configuration.getPagingMaxThreads()),
-                                            storageManager,
-                                            queueSettingsRepository,
-                                            configuration.getPagingMaxGlobalSizeBytes(),
-                                            configuration.getPagingDefaultSize(),
-                                            configuration.isJournalSyncNonTransactional());
+      pagingManager = createPagingManager();
+      
       pagingManager.start();
 
       resourceManager = new ResourceManagerImpl((int)configuration.getTransactionTimeout() / 1000,
@@ -670,7 +665,22 @@
 
    // Protected
    // ------------------------------------------------------------------------------------
+   
+   /**
+    * Method could be replaced for test purposes 
+    */
+   protected PagingManager createPagingManager()
+   {
+      return new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory(),
+                                                                      configuration.getPagingMaxThreads()),
+                                            storageManager,
+                                            queueSettingsRepository,
+                                            configuration.getPagingMaxGlobalSizeBytes(),
+                                            configuration.getPagingDefaultSize(),
+                                            configuration.isJournalSyncNonTransactional());
+   }
 
+
    // Private
    // --------------------------------------------------------------------------------------
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-01-08 04:09:34 UTC (rev 5596)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-01-08 04:24:06 UTC (rev 5597)
@@ -198,7 +198,8 @@
          startedTx = true;
       }
       
-      // TODO we can avoid these lookups in the Queue since all messsages in the Queue will be for the same store
+      // There is no way to cache the Store, since a Queue may belong to multiple addresses,
+      // so we aways need this lookup
       PagingStore store = pagingManager.getPageStore(message.getDestination());
 
       if (tx == null)

Added: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PageCrashTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/PageCrashTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/PageCrashTest.java	2009-01-08 04:24:06 UTC (rev 5597)
@@ -0,0 +1,127 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.paging;
+
+import java.io.File;
+import java.util.HashMap;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.tests.integration.paging.remote.RemotePageCrashExecution;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.tests.util.SpawnedVMSupport;
+
+/**
+ * This test will make sure that a failing depage won't cause duplicated messages
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * 
+ * Created Jan 7, 2009 6:19:43 PM
+ *
+ *
+ */
+public class PageCrashTest extends ServiceTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testCrashDuringDeleteFile() throws Exception
+   {
+      clearData();
+
+      Process process = SpawnedVMSupport.spawnVM(RemotePageCrashExecution.class.getCanonicalName());
+      process.waitFor();
+      assertEquals(1, process.exitValue());
+
+      File pageDir = new File(getPageDir());
+
+      File directories[] = pageDir.listFiles();
+
+      assertEquals(1, directories.length);
+
+      // When depage happened, a new empty page was supposed to be opened, what will create 3 files
+      assertEquals("Missing a file, supposed to have address.txt, 1st page and 2nd page",
+                   3,
+                   directories[0].list().length);
+
+      Configuration config = createDefaultConfig();
+
+      config.setPagingMaxGlobalSizeBytes(100 * 1024);
+      config.setPagingDefaultSize(10 * 1024);
+
+      MessagingService messagingService = createService(true, config, new HashMap<String, QueueSettings>());
+
+      messagingService.start();
+
+      try
+      {
+         ClientSessionFactory sf = createInVMFactory();
+
+         sf.setBlockOnNonPersistentSend(true);
+         sf.setBlockOnPersistentSend(true);
+         sf.setBlockOnAcknowledge(true);
+
+         ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+
+         session.start();
+
+         ClientConsumer consumer = session.createConsumer(RemotePageCrashExecution.ADDRESS);
+
+         assertNull(consumer.receive(200));
+
+         session.close();
+      }
+      finally
+      {
+         messagingService.stop();
+      }
+
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}


Property changes on: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PageCrashTest.java
___________________________________________________________________
Name: svn:mergeinfo
   + 

Added: trunk/tests/src/org/jboss/messaging/tests/integration/paging/remote/RemotePageCrashExecution.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/remote/RemotePageCrashExecution.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/remote/RemotePageCrashExecution.java	2009-01-08 04:24:06 UTC (rev 5597)
@@ -0,0 +1,425 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.paging.remote;
+
+import java.lang.management.ManagementFactory;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.Executor;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.management.ManagementService;
+import org.jboss.messaging.core.management.impl.ManagementServiceImpl;
+import org.jboss.messaging.core.paging.Page;
+import org.jboss.messaging.core.paging.PagedMessage;
+import org.jboss.messaging.core.paging.PagingManager;
+import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.paging.impl.PagingManagerImpl;
+import org.jboss.messaging.core.paging.impl.PagingStoreFactoryNIO;
+import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.persistence.impl.journal.JournalStorageManager;
+import org.jboss.messaging.core.remoting.RemotingService;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.impl.RemotingServiceImpl;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.security.JBMSecurityManager;
+import org.jboss.messaging.core.security.impl.JBMSecurityManagerImpl;
+import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServerImpl;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.util.OrderedExecutorFactory;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A RemotePageCrashExecution
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * 
+ * Created Jan 7, 2009 6:41:41 PM
+ *
+ *
+ */
+public class RemotePageCrashExecution extends ServiceTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   public static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
+
+   private static final Logger log = Logger.getLogger(RemotePageCrashExecution.class);
+
+   public static void main(final String arg[])
+   {
+      try
+      {
+         RemotePageCrashExecution execution = new RemotePageCrashExecution();
+         execution.pageAndFail();
+      }
+      catch (Exception e)
+      {
+         log.warn(e.getMessage(), e);
+      }
+
+   }
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void pageAndFail() throws Exception
+   {
+      clearData();
+      Configuration config = createDefaultConfig();
+
+      config.setPagingMaxGlobalSizeBytes(100 * 1024);
+      config.setPagingDefaultSize(10 * 1024);
+
+      MessagingService service = newMessagingService(config);
+
+      service.start();
+
+      try
+      {
+         ClientSessionFactory sf = createInVMFactory();
+
+         sf.setBlockOnNonPersistentSend(true);
+         sf.setBlockOnPersistentSend(true);
+         sf.setBlockOnAcknowledge(true);
+
+         ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+
+         session.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+         ClientProducer producer = session.createProducer(ADDRESS);
+
+         ByteBuffer ioBuffer = ByteBuffer.allocate(1024);
+
+         ClientMessage message = null;
+
+         MessagingBuffer bodyLocal = new ByteBufferWrapper(ioBuffer);
+
+         message = session.createClientMessage(true);
+         message.setBody(bodyLocal);
+
+         PagingStore store = service.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
+
+         int messages = 0;
+         while (!store.isPaging())
+         {
+            producer.send(message);
+            messages++;
+         }
+
+         for (int i = 0; i < 2; i++)
+         {
+            messages++;
+            producer.send(message);
+         }
+
+         session.close();
+
+         assertTrue(service.getServer().getPostOffice().getPagingManager().getGlobalSize() > 0);
+
+         session = sf.createSession(null, null, false, true, true, false, 0);
+
+         ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+         session.start();
+
+         for (int i = 0; i < messages; i++)
+         {
+            ClientMessage message2 = consumer.receive(10000);
+
+            assertNotNull(message2);
+
+            message2.acknowledge();
+         }
+
+         consumer.close();
+
+         session.close();
+
+         assertEquals(0, service.getServer().getPostOffice().getPagingManager().getGlobalSize());
+
+      }
+      finally
+      {
+         try
+         {
+            service.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   private MessagingServiceImpl newMessagingService(final Configuration configuration)
+   {
+
+      StorageManager storageManager = new JournalStorageManager(configuration);
+
+      RemotingService remotingService = new RemotingServiceImpl(configuration);
+
+      JBMSecurityManager securityManager = new JBMSecurityManagerImpl(true);
+
+      ManagementService managementService = new ManagementServiceImpl(ManagementFactory.getPlatformMBeanServer(), false);
+
+      remotingService.setManagementService(managementService);
+
+      MessagingServer server = new FailingMessagingServiceImpl();
+
+      server.setConfiguration(configuration);
+
+      server.setStorageManager(storageManager);
+
+      server.setRemotingService(remotingService);
+
+      server.setSecurityManager(securityManager);
+
+      server.setManagementService(managementService);
+
+      return new MessagingServiceImpl(server, storageManager, remotingService);
+   }
+
+   // Inner classes -------------------------------------------------
+
+   /** This is hacking MessagingServerImpl, 
+    *  to make sure the server will fail right 
+    *  after before the page-file was removed */
+   class FailingMessagingServiceImpl extends MessagingServerImpl
+   {
+      /**
+       * Method could be replaced for test purposes 
+       */
+      @Override
+      protected PagingManager createPagingManager()
+      {
+         return new PagingManagerImpl(new FailurePagingStoreFactoryNIO(super.getConfiguration().getPagingDirectory(),
+                                                                       super.getConfiguration().getPagingMaxThreads()),
+                                      super.getStorageManager(),
+                                      super.getQueueSettingsRepository(),
+                                      super.getConfiguration().getPagingMaxGlobalSizeBytes(),
+                                      super.getConfiguration().getPagingDefaultSize(),
+                                      super.getConfiguration().isJournalSyncNonTransactional());
+      }
+
+      class FailurePagingStoreFactoryNIO extends PagingStoreFactoryNIO
+
+      {
+         /**
+          * @param directory
+          * @param maxThreads
+          */
+         public FailurePagingStoreFactoryNIO(final String directory, final int maxThreads)
+         {
+            super(directory, maxThreads);
+         }
+
+         // Constants -----------------------------------------------------
+
+         // Attributes ----------------------------------------------------
+
+         // Static --------------------------------------------------------
+
+         // Constructors --------------------------------------------------
+
+         // Public --------------------------------------------------------
+
+         @Override
+         public synchronized PagingStore newStore(final SimpleString destinationName, final QueueSettings settings) throws Exception
+         {
+            Field factoryField = PagingStoreFactoryNIO.class.getDeclaredField("executorFactory");
+            factoryField.setAccessible(true);
+
+            OrderedExecutorFactory factory = (OrderedExecutorFactory)factoryField.get(this);
+            return new FailingPagingStore(destinationName, settings, factory.getExecutor());
+         }
+
+         // Package protected ---------------------------------------------
+
+         // Protected -----------------------------------------------------
+
+         // Private -------------------------------------------------------
+
+         // Inner classes -------------------------------------------------
+         class FailingPagingStore extends PagingStoreImpl
+         {
+
+            /**
+             * @param pagingManager
+             * @param storageManager
+             * @param postOffice
+             * @param fileFactory
+             * @param storeFactory
+             * @param storeName
+             * @param queueSettings
+             * @param executor
+             */
+            public FailingPagingStore(final SimpleString storeName,
+                                      final QueueSettings queueSettings,
+                                      final Executor executor)
+            {
+               super(getPostOffice().getPagingManager(),
+                     getStorageManager(),
+                     getPostOffice(),
+                     null,
+                     FailurePagingStoreFactoryNIO.this,
+                     storeName,
+                     queueSettings,
+                     executor);
+            }
+
+            @Override
+            protected Page createPage(final int page) throws Exception
+            {
+
+               Page originalPage = super.createPage(page);
+
+               return new FailingPage(originalPage);
+            }
+
+         }
+
+      }
+
+      class FailingPage implements Page
+      {
+         Page delegatedPage;
+
+         /**
+          * @throws Exception
+          * @see org.jboss.messaging.core.paging.Page#close()
+          */
+         public void close() throws Exception
+         {
+            delegatedPage.close();
+         }
+
+         /**
+          * @throws Exception
+          * @see org.jboss.messaging.core.paging.Page#delete()
+          */
+         public void delete() throws Exception
+         {
+            // We want the system to fail
+            System.out.println("Crash");
+            System.out.flush(); // System.exit may not let the System.out to be seen if flush is not called
+            System.exit(1);
+         }
+
+         /**
+          * @return
+          * @see org.jboss.messaging.core.paging.Page#getNumberOfMessages()
+          */
+         public int getNumberOfMessages()
+         {
+            return delegatedPage.getNumberOfMessages();
+         }
+
+         /**
+          * @return
+          * @see org.jboss.messaging.core.paging.Page#getPageId()
+          */
+         public int getPageId()
+         {
+            return delegatedPage.getPageId();
+         }
+
+         /**
+          * @return
+          * @see org.jboss.messaging.core.paging.Page#getSize()
+          */
+         public int getSize()
+         {
+            return delegatedPage.getSize();
+         }
+
+         /**
+          * @throws Exception
+          * @see org.jboss.messaging.core.paging.Page#open()
+          */
+         public void open() throws Exception
+         {
+            delegatedPage.open();
+         }
+
+         /**
+          * @return
+          * @throws Exception
+          * @see org.jboss.messaging.core.paging.Page#read()
+          */
+         public List<PagedMessage> read() throws Exception
+         {
+            return delegatedPage.read();
+         }
+
+         /**
+          * @throws Exception
+          * @see org.jboss.messaging.core.paging.Page#sync()
+          */
+         public void sync() throws Exception
+         {
+            delegatedPage.sync();
+         }
+
+         /**
+          * @param message
+          * @throws Exception
+          * @see org.jboss.messaging.core.paging.Page#write(org.jboss.messaging.core.paging.PagedMessage)
+          */
+         public void write(final PagedMessage message) throws Exception
+         {
+            delegatedPage.write(message);
+         }
+
+         public FailingPage(final Page delegatePage)
+         {
+            delegatedPage = delegatePage;
+         }
+      }
+
+   }
+
+}


Property changes on: trunk/tests/src/org/jboss/messaging/tests/integration/paging/remote/RemotePageCrashExecution.java
___________________________________________________________________
Name: svn:mergeinfo
   + 




More information about the jboss-cvs-commits mailing list