[hornetq-commits] JBoss hornetq SVN: r11012 - in branches/Branch_2_2_AS7: src/main/org/hornetq/core/server/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Jul 20 21:16:46 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-07-20 21:16:45 -0400 (Wed, 20 Jul 2011)
New Revision: 11012

Added:
   branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java
Modified:
   branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
   branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
 HORNETQ-725, HORNETQ-744, HORNETQ-743 - a few paging fixes

Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2011-07-21 01:04:51 UTC (rev 11011)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2011-07-21 01:16:45 UTC (rev 11012)
@@ -89,6 +89,10 @@
 
    public synchronized PageSubscription createSubscription(long cursorID, Filter filter, boolean persistent)
    {
+      if (log.isDebugEnabled())
+      {
+         log.debug(this.pagingStore.getAddress() + " creating subscription " + cursorID + " with filter " + filter, new Exception ("trace"));
+      }
       PageSubscription activeCursor = activeCursors.get(cursorID);
       if (activeCursor != null)
       {
@@ -330,6 +334,11 @@
             {
                return;
             }
+            
+            if (log.isDebugEnabled())
+            {
+               log.debug("Asserting cleanup for address " + this.pagingStore.getAddress());
+            }
 
             ArrayList<PageSubscription> cursorList = new ArrayList<PageSubscription>();
             cursorList.addAll(activeCursors.values());
@@ -344,9 +353,21 @@
                {
                   if (!cursor.isComplete(minPage))
                   {
+                     if (log.isDebugEnabled())
+                     {
+                        log.debug("Cursor " + cursor + " was considered incomplete at page " + minPage);
+                     }
+                     
                      complete = false;
                      break;
                   }
+                  else
+                  {
+                     if (log.isDebugEnabled())
+                     {
+                        log.debug("Cursor " + cursor + "was considered **complete** at page " + minPage);
+                     }
+                  }
                }
 
                if (complete)
@@ -516,12 +537,21 @@
       for (PageSubscription cursor : cursorList)
       {
          long firstPage = cursor.getFirstPage();
+         if (log.isDebugEnabled())
+         {
+            log.debug(this.pagingStore.getAddress() + " has a cursor " + cursor + " with first page=" + firstPage);
+         }
          if (firstPage < minPage)
          {
             minPage = firstPage;
          }
       }
 
+      if (log.isDebugEnabled())
+      {
+         log.debug(this.pagingStore.getAddress() + " has minPage=" + minPage);
+      }
+
       return minPage;
 
    }

Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2011-07-21 01:04:51 UTC (rev 11011)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2011-07-21 01:16:45 UTC (rev 11012)
@@ -308,7 +308,7 @@
    @Override
    public String toString()
    {
-      return "PageSubscriptionImpl [cursorId=" + cursorId + ", queue=" + queue + "]";
+      return "PageSubscriptionImpl [cursorId=" + cursorId + ", queue=" + queue + ", filter = " + filter + "]";
    }
 
 
@@ -648,22 +648,42 @@
          Collections.sort(recoveredACK);
 
          boolean first = true;
+         
+         long txDeleteCursorOnReload = -1;
 
          for (PagePosition pos : recoveredACK)
          {
             lastAckedPosition = pos;
-            PageCursorInfo positions = getPageInfo(pos);
-            if (first)
+            PageCursorInfo pageInfo = getPageInfo(pos);
+            
+            if (pageInfo == null)
             {
-               first = false;
-               if (pos.getMessageNr() > 0)
+               log.warn("Couldn't find page cache for page " + pos + ", removing it from the journal");
+               if (txDeleteCursorOnReload == -1)
                {
-                  positions.confirmed.addAndGet(pos.getMessageNr());
+                  txDeleteCursorOnReload = store.generateUniqueID();
                }
+               store.deleteCursorAcknowledgeTransactional(txDeleteCursorOnReload, pos.getRecordID());
+             }
+            else
+            {
+               if (first)
+               {
+                  first = false;
+                  if (pos.getMessageNr() > 0)
+                  {
+                     pageInfo.confirmed.addAndGet(pos.getMessageNr());
+                  }
+               }
+   
+               pageInfo.addACK(pos);
             }
-
-            positions.addACK(pos);
          }
+         
+         if (txDeleteCursorOnReload >= 0)
+         {
+            store.commit(txDeleteCursorOnReload);
+         }
 
          recoveredACK.clear();
          recoveredACK = null;
@@ -723,6 +743,10 @@
       if (create && pageInfo == null)
       {
          PageCache cache = cursorProvider.getPageCache(pos);
+         if (cache == null)
+         {
+            return null;
+         }
          pageInfo = new PageCursorInfo(pos.getPageNr(), cache.getNumberOfMessages(), cache);
          consumedPages.put(pos.getPageNr(), pageInfo);
       }

Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-07-21 01:04:51 UTC (rev 11011)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-07-21 01:16:45 UTC (rev 11012)
@@ -1582,28 +1582,31 @@
       {
          queueBindingInfosMap.put(queueBindingInfo.getId(), queueBindingInfo);
          
-         Filter filter = FilterImpl.createFilter(queueBindingInfo.getFilterString());
-
-         PageSubscription subscription = pagingManager.getPageStore(queueBindingInfo.getAddress()).getCursorProvier().createSubscription(queueBindingInfo.getId(), filter, true);
+         if (queueBindingInfo.getFilterString() == null || !queueBindingInfo.getFilterString().toString().equals(GENERIC_IGNORED_FILTER))
+         {
+            Filter filter = FilterImpl.createFilter(queueBindingInfo.getFilterString());
+   
+            PageSubscription subscription = pagingManager.getPageStore(queueBindingInfo.getAddress()).getCursorProvier().createSubscription(queueBindingInfo.getId(), filter, true);
+            
+            Queue queue = queueFactory.createQueue(queueBindingInfo.getId(),
+                                                   queueBindingInfo.getAddress(),
+                                                   queueBindingInfo.getQueueName(),
+                                                   filter,
+                                                   subscription,
+                                                   true,
+                                                   false);
+   
+            Binding binding = new LocalQueueBinding(queueBindingInfo.getAddress(), queue, nodeManager.getNodeId());
+   
+            queues.put(queueBindingInfo.getId(), queue);
+   
+            postOffice.addBinding(binding);
+   
+            managementService.registerAddress(queueBindingInfo.getAddress());
+            managementService.registerQueue(queue, queueBindingInfo.getAddress(), storageManager);
+         }
          
-         Queue queue = queueFactory.createQueue(queueBindingInfo.getId(),
-                                                queueBindingInfo.getAddress(),
-                                                queueBindingInfo.getQueueName(),
-                                                filter,
-                                                subscription,
-                                                true,
-                                                false);
-
-         Binding binding = new LocalQueueBinding(queueBindingInfo.getAddress(), queue, nodeManager.getNodeId());
-
-         queues.put(queueBindingInfo.getId(), queue);
-
-         postOffice.addBinding(binding);
-
-         managementService.registerAddress(queueBindingInfo.getAddress());
-         managementService.registerQueue(queue, queueBindingInfo.getAddress(), storageManager);
          
-         
       }
 
       for (GroupingInfo groupingInfo : groupingInfos)

Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2011-07-21 01:04:51 UTC (rev 11011)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2011-07-21 01:16:45 UTC (rev 11012)
@@ -260,7 +260,7 @@
 
             // If updateDeliveries = false (set by strict-update),
             // the updateDeliveryCount would still be updated after c
-            if (strictUpdateDeliveryCount)
+            if (strictUpdateDeliveryCount && !ref.isPaged())
             {
                if (ref.getMessage().isDurable() && ref.getQueue().isDurable() && !ref.getQueue().isInternalQueue())
                {

Added: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java
===================================================================
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java	                        (rev 0)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java	2011-07-21 01:16:45 UTC (rev 11012)
@@ -0,0 +1,197 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.util.JMSTestBase;
+
+public class JMSPagingFileDeleteTest extends JMSTestBase
+{
+   static Logger log = Logger.getLogger(JMSPagingFileDeleteTest.class);
+   
+   Topic topic1;
+
+   Connection connection;
+
+   Session session;
+
+   MessageConsumer subscriber1;
+
+   MessageConsumer subscriber2;
+
+   PagingStore pagingStore;
+
+   private static final int MESSAGE_SIZE = 1024;
+
+   private static final int PAGE_SIZE = 10 * 1024;
+
+   private static final int PAGE_MAX = 20 * 1024;
+
+   private static final int RECEIVE_TIMEOUT = 500;
+
+   private static final int MESSAGE_NUM = 50;
+
+   @Override
+   protected boolean usePersistence()
+   {
+      return true;
+   }
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      clearData();
+      super.setUp();
+
+      topic1 = createTopic("topic1");
+
+      // Paging Setting
+      AddressSettings setting = new AddressSettings();
+      setting.setPageSizeBytes(JMSPagingFileDeleteTest.PAGE_SIZE);
+      setting.setMaxSizeBytes(JMSPagingFileDeleteTest.PAGE_MAX);
+      server.getAddressSettingsRepository().addMatch("#", setting);
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      topic1 = null;
+      super.tearDown();
+   }
+   
+   public void testTopics() throws Exception
+   {
+      connection = null;
+
+      try
+      {
+         connection = cf.createConnection();
+         connection.setClientID("cid");
+
+         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer producer = session.createProducer(topic1);
+         subscriber1 = session.createDurableSubscriber(topic1, "subscriber-1");
+         subscriber2 = session.createDurableSubscriber(topic1, "subscriber-2");
+
+         // -----------------(Step1) Publish Messages to make Paging Files. --------------------
+         System.out.println("---------- Send messages. ----------");
+         BytesMessage bytesMessage = session.createBytesMessage();
+         bytesMessage.writeBytes(new byte[JMSPagingFileDeleteTest.MESSAGE_SIZE]);
+         for (int i = 0; i < JMSPagingFileDeleteTest.MESSAGE_NUM; i++)
+         {
+            producer.send(bytesMessage);
+         }
+         System.out.println("Sent " + JMSPagingFileDeleteTest.MESSAGE_NUM + " messages.");
+
+         pagingStore = server.getPagingManager().getPageStore(new SimpleString("jms.topic.topic1"));
+         printPageStoreInfo(pagingStore);
+
+         assertTrue(pagingStore.isPaging());
+
+         connection.start();
+
+         // -----------------(Step2) Restart the server. --------------------------------------
+         stopAndStartServer(); // If try this test without restarting server, please comment out this line;
+
+         // -----------------(Step3) Subscribe to all the messages from the topic.--------------
+         System.out.println("---------- Receive all messages. ----------");
+         for (int i = 0; i < JMSPagingFileDeleteTest.MESSAGE_NUM; i++)
+         {
+            Message message1 = subscriber1.receive(JMSPagingFileDeleteTest.RECEIVE_TIMEOUT);
+            assertNotNull(message1);
+            Message message2 = subscriber2.receive(JMSPagingFileDeleteTest.RECEIVE_TIMEOUT);
+            assertNotNull(message2);
+         }
+
+         pagingStore = server.getPagingManager().getPageStore(new SimpleString("jms.topic.topic1"));
+         long timeout = System.currentTimeMillis() + 5000;
+         while (timeout > System.currentTimeMillis() && pagingStore.isPaging())
+         {
+            Thread.sleep(100);
+         }
+         assertFalse(pagingStore.isPaging());
+         
+         printPageStoreInfo(pagingStore);
+
+         assertEquals(0, pagingStore.getAddressSize());
+         // assertEquals(1, pagingStore.getNumberOfPages()); //I expected number of the page is 1, but It was not.
+         assertFalse(pagingStore.isPaging()); // I expected IsPaging is false, but It was true.
+         // If the server is not restarted, this test passes.
+
+         // -----------------(Step4) Publish a message. The message is stored in the paging file.
+         producer = session.createProducer(topic1);
+         bytesMessage = session.createBytesMessage();
+         bytesMessage.writeBytes(new byte[JMSPagingFileDeleteTest.MESSAGE_SIZE]);
+         producer.send(bytesMessage);
+
+         printPageStoreInfo(pagingStore);
+
+         assertEquals(1, pagingStore.getNumberOfPages()); //I expected number of the page is 1, but It was not.
+      }
+      finally
+      {
+         if (connection != null)
+         {
+            connection.close();
+         }
+      }
+   }
+
+   private void stopAndStartServer() throws Exception
+   {
+      System.out.println("---------- Restart server. ----------");
+      connection.close();
+
+      jmsServer.stop();
+
+      jmsServer.start();
+      jmsServer.activated();
+      registerConnectionFactory();
+
+      printPageStoreInfo(pagingStore);
+      reconnect();
+   }
+
+   private void reconnect() throws Exception
+   {
+      connection = cf.createConnection();
+      connection.setClientID("cid");
+      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      subscriber1 = session.createDurableSubscriber(topic1, "subscriber-1");
+      subscriber2 = session.createDurableSubscriber(topic1, "subscriber-2");
+      connection.start();
+   }
+
+   private void printPageStoreInfo(PagingStore pagingStore) throws Exception
+   {
+      System.out.println("---------- Paging Store Info ----------");
+      System.out.println(" CurrentPage = " + pagingStore.getCurrentPage());
+      System.out.println(" FirstPage = " + pagingStore.getFirstPage());
+      System.out.println(" Number of Pages = " + pagingStore.getNumberOfPages());
+      System.out.println(" Address Size = " + pagingStore.getAddressSize());
+      System.out.println(" Is Paging = " + pagingStore.isPaging());
+   }
+}
\ No newline at end of file

Modified: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2011-07-21 01:04:51 UTC (rev 11011)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2011-07-21 01:16:45 UTC (rev 11012)
@@ -13,6 +13,7 @@
 
 package org.hornetq.tests.integration.client;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
@@ -497,6 +498,248 @@
 
    }
 
+   /**
+    * This test will remove all the page directories during a restart, simulating a crash scenario. The server should still start after this
+    */
+   public void testDeletePhisicalPages() throws Exception
+   {
+      clearData();
+
+      Configuration config = createDefaultConfig();
+      config.setPersistDeliveryCountBeforeDelivery(true);
+
+      config.setJournalSyncNonTransactional(false);
+
+      HornetQServer server = createServer(true,
+                                          config,
+                                          PagingTest.PAGE_SIZE,
+                                          PagingTest.PAGE_MAX,
+                                          new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      final int messageSize = 1024;
+
+      final int numberOfMessages = 1000;
+
+      try
+      {
+         ServerLocator locator = createInVMNonHALocator();
+
+         locator.setBlockOnNonDurableSend(true);
+         locator.setBlockOnDurableSend(true);
+         locator.setBlockOnAcknowledge(true);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         ClientSession session = sf.createSession(false, false, false);
+
+         session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+         ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+         ClientMessage message = null;
+
+         byte[] body = new byte[messageSize];
+
+         ByteBuffer bb = ByteBuffer.wrap(body);
+
+         for (int j = 1; j <= messageSize; j++)
+         {
+            bb.put(getSamplebyte(j));
+         }
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            message = session.createMessage(true);
+
+            HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+            bodyLocal.writeBytes(body);
+
+            message.putIntProperty(new SimpleString("id"), i);
+
+            producer.send(message);
+            if (i % 1000 == 0)
+            {
+               session.commit();
+            }
+         }
+         session.commit();
+         session.close();
+
+         session = null;
+
+         sf.close();
+         locator.close();
+
+         server.stop();
+
+         server = createServer(true,
+                               config,
+                               PagingTest.PAGE_SIZE,
+                               PagingTest.PAGE_MAX,
+                               new HashMap<String, AddressSettings>());
+         server.start();
+
+         locator = createInVMNonHALocator();
+         sf = locator.createSessionFactory();
+
+         Queue queue = server.locateQueue(ADDRESS);
+
+         assertEquals(numberOfMessages, queue.getMessageCount());
+
+         LinkedList<Xid> xids = new LinkedList<Xid>();
+
+         int msgReceived = 0;
+         ClientSession sessionConsumer = sf.createSession(false, false, false);
+         sessionConsumer.start();
+         ClientConsumer consumer = sessionConsumer.createConsumer(PagingTest.ADDRESS);
+         for (int msgCount = 0; msgCount < numberOfMessages; msgCount++)
+         {
+            log.info("Received " + msgCount);
+            msgReceived++;
+            ClientMessage msg = consumer.receiveImmediate();
+            if (msg == null)
+            {
+               log.info("It's null. leaving now");
+               sessionConsumer.commit();
+               fail("Didn't receive a message");
+            }
+            msg.acknowledge();
+
+            if (msgCount % 5 == 0)
+            {
+               log.info("commit");
+               sessionConsumer.commit();
+            }
+         }
+
+         sessionConsumer.commit();
+
+         sessionConsumer.close();
+
+         sf.close();
+
+         locator.close();
+
+         assertEquals(0, queue.getMessageCount());
+
+         long timeout = System.currentTimeMillis() + 5000;
+         while (timeout > System.currentTimeMillis() && queue.getPageSubscription().getPagingStore().isPaging())
+         {
+            Thread.sleep(100);
+         }
+         assertFalse(queue.getPageSubscription().getPagingStore().isPaging());
+
+         server.stop();
+
+         // Deleting the paging data to simulate a disk failure,
+         // a careless user, or anything else that removes the data
+         deleteDirectory(new File(getPageDir()));
+
+         server = createServer(true,
+                               config,
+                               PagingTest.PAGE_SIZE,
+                               PagingTest.PAGE_MAX,
+                               new HashMap<String, AddressSettings>());
+         server.start();
+         
+         
+         locator = createInVMNonHALocator();
+         locator.setBlockOnNonDurableSend(true);
+         locator.setBlockOnDurableSend(true);
+         locator.setBlockOnAcknowledge(true);
+
+         sf = locator.createSessionFactory();
+
+         queue = server.locateQueue(ADDRESS);
+
+         sf = locator.createSessionFactory();
+         session = sf.createSession(false, false, false);
+
+         producer = session.createProducer(PagingTest.ADDRESS);
+         
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            message = session.createMessage(true);
+
+            HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+            bodyLocal.writeBytes(body);
+
+            message.putIntProperty(new SimpleString("id"), i);
+
+            producer.send(message);
+            if (i % 1000 == 0)
+            {
+               session.commit();
+            }
+         }
+         
+         session.commit();
+         
+         server.stop();
+
+         server = createServer(true,
+                               config,
+                               PagingTest.PAGE_SIZE,
+                               PagingTest.PAGE_MAX,
+                               new HashMap<String, AddressSettings>());
+         server.start();
+
+         locator = createInVMNonHALocator();
+         sf = locator.createSessionFactory();
+
+         queue = server.locateQueue(ADDRESS);
+
+        // assertEquals(numberOfMessages, queue.getMessageCount());
+
+         xids = new LinkedList<Xid>();
+
+         msgReceived = 0;
+         sessionConsumer = sf.createSession(false, false, false);
+         sessionConsumer.start();
+         consumer = sessionConsumer.createConsumer(PagingTest.ADDRESS);
+         for (int msgCount = 0; msgCount < numberOfMessages; msgCount++)
+         {
+            log.info("Received " + msgCount);
+            msgReceived++;
+            ClientMessage msg = consumer.receiveImmediate();
+            if (msg == null)
+            {
+               log.info("It's null. leaving now");
+               sessionConsumer.commit();
+               fail("Didn't receive a message");
+            }
+            msg.acknowledge();
+
+            if (msgCount % 5 == 0)
+            {
+               log.info("commit");
+               sessionConsumer.commit();
+            }
+         }
+
+         sessionConsumer.commit();
+
+         sessionConsumer.close();
+
+
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
    public void testMissingTXEverythingAcked() throws Exception
    {
       clearData();
@@ -1281,7 +1524,7 @@
             sf.close();
             locator.close();
          }
-         
+
          server = createServer(true,
                                config,
                                PagingTest.PAGE_SIZE,
@@ -1353,7 +1596,6 @@
          t.start();
          t.join();
 
-         
          assertEquals(0, errors.get());
 
          for (int i = 0; i < 20 && server.getPostOffice().getPagingManager().getPageStore(ADDRESS).isPaging(); i++)
@@ -1361,9 +1603,8 @@
             // The delete may be asynchronous, giving some time case it eventually happen asynchronously
             Thread.sleep(500);
          }
-         
-         assertFalse (server.getPostOffice().getPagingManager().getPageStore(ADDRESS).isPaging());
 
+         assertFalse(server.getPostOffice().getPagingManager().getPageStore(ADDRESS).isPaging());
 
          for (int i = 0; i < 20 && server.getPostOffice().getPagingManager().getTransactions().size() != 0; i++)
          {
@@ -1372,7 +1613,6 @@
          }
 
          assertEquals(0, server.getPostOffice().getPagingManager().getTransactions().size());
-         
 
       }
       finally



More information about the hornetq-commits mailing list