[jboss-cvs] JBoss Messaging SVN: r5571 - in trunk: src/main/org/jboss/messaging/core/persistence/impl/journal and 4 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Jan 2 17:50:41 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-01-02 17:50:41 -0500 (Fri, 02 Jan 2009)
New Revision: 5571

Modified:
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingStoreIntegrationTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaRecoveryTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
Log:
JBMESSAGING-1473 & JBMESSAGING-1474 - Fixing tests

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-01-02 15:55:00 UTC (rev 5570)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-01-02 22:50:41 UTC (rev 5571)
@@ -351,6 +351,23 @@
          {
             return false;
          }
+         
+         if (duplicateDetection)
+         {
+            //We set the duplicate detection header to prevent the message being depaged more than once in case of failure during depage
+            
+            byte[] bytes = new byte[8];
+            
+            ByteBuffer buff = ByteBuffer.wrap(bytes);
+            
+            ServerMessage msg = message.getMessage(storageManager);
+            
+            buff.putLong(msg.getMessageID());
+            
+            SimpleString duplID = new SimpleString(bytes);
+               
+            message.getMessage(storageManager).putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, duplID);
+         }
 
          int bytesToWrite = fileFactory.calculateBlockSize(message.getEncodeSize() + PageImpl.SIZE_RECORD);
 
@@ -377,22 +394,6 @@
          {
             if (currentPage != null)
             {               
-               if (duplicateDetection)
-               {
-                  //We set the duplicate detection header to prevent the message being depaged more than once in case of failure during depage
-                  
-                  byte[] bytes = new byte[8];
-                  
-                  ByteBuffer buff = ByteBuffer.wrap(bytes);
-                  
-                  ServerMessage msg = message.getMessage(storageManager);
-                  
-                  buff.putLong(msg.getMessageID());
-                  
-                  SimpleString duplID = new SimpleString(bytes);
-                     
-                  message.getMessage(storageManager).putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, duplID);
-               }
                
                currentPage.write(message);
                
@@ -717,7 +718,10 @@
 
    private void onDepage(final int pageId, final SimpleString destination, final List<PagedMessage> pagedMessages) throws Exception
    {
-      trace("Depaging....");
+      if (isTrace)
+      {
+         trace("Depaging....");
+      }
 
       // Depage has to be done atomically, in case of failure it should be
       // back to where it was
@@ -743,10 +747,7 @@
             // section
             if (pageTransactionInfo == null)
             {
-               if (isTrace)
-               {
-                  trace("Transaction " + pagedMessage.getTransactionID() + " not found, ignoring message " + message);
-               }
+               log.warn("Transaction " + pagedMessage.getTransactionID() + " used during paging not found, ignoring message " + message);
                continue;
             }
 
@@ -754,7 +755,10 @@
             // before the commit arrived
             if (!pageTransactionInfo.waitCompletion())
             {
-               trace("Rollback was called after prepare, ignoring message " + message);
+               if (isTrace)
+               {
+                  trace("Rollback was called after prepare, ignoring message " + message);
+               }
                continue;
             }
 

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-01-02 15:55:00 UTC (rev 5570)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-01-02 22:50:41 UTC (rev 5571)
@@ -834,6 +834,8 @@
                                          final List<PreparedTransactionInfo> preparedTransactions,
                                          final Map<SimpleString, List<Pair<SimpleString, Long>>> duplicateIDMap) throws Exception
    {
+      final PagingManager pagingManager = postOffice.getPagingManager();
+      
       // recover prepared transactions
       for (PreparedTransactionInfo preparedTransaction : preparedTransactions)
       {
@@ -905,6 +907,8 @@
                   pageTransactionInfo.markIncomplete();
 
                   tx.setPageTransaction(pageTransactionInfo);
+                  
+                  pagingManager.addTransaction(pageTransactionInfo);
 
                   break;
                }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-01-02 15:55:00 UTC (rev 5570)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-01-02 22:50:41 UTC (rev 5571)
@@ -241,7 +241,7 @@
 
          SimpleString destination = message.getDestination();
 
-         if (!tx.isDepage() && (tx.getPagingAddresses().contains(destination) || pagingManager.isPaging(destination)))
+         if (!tx.isDepage() && !message.isReload() && (tx.getPagingAddresses().contains(destination) || pagingManager.isPaging(destination)))
          {
             tx.addPagingAddress(destination);
 
@@ -253,7 +253,7 @@
 
             boolean first = message.getRefCount() == 1;
             
-            if (message.getRefCount() == 1)
+            if (!message.isReload() &&  message.getRefCount() == 1)
             {
                if (durableRef)
                {
@@ -272,7 +272,7 @@
             {
                ref.setScheduledDeliveryTime(scheduledDeliveryTime);
 
-               if (durableRef)
+               if (durableRef && !message.isReload())
                {
                   storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), ref);
                }

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingStoreIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingStoreIntegrationTest.java	2009-01-02 15:55:00 UTC (rev 5570)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingStoreIntegrationTest.java	2009-01-02 22:50:41 UTC (rev 5571)
@@ -44,20 +44,15 @@
 
    // Public --------------------------------------------------------
 
-   public void testFoo()
-   {      
+   public void testPageWithNIO() throws Exception
+   {
+      // This integration test could fail 1 in 100 due to race conditions.
+      for (int i = 0; i < 100; i++)
+      {
+         recreateDirectory();
+         testConcurrentPaging(new NIOSequentialFileFactory(getTestDir()), 1);
+      }
    }
-   
-//   public void testPageWithNIO() throws Exception
-//   {
-//      // This integration test could fail 1 in 100 due to race conditions.
-//      for (int i = 0; i < 100; i++)
-//      {
-//         recreateDirectory();
-//         System.out.println("Test " + i);
-//         testConcurrentPaging(new NIOSequentialFileFactory(getTestDir()), 1);
-//      }
-//   }
 
    // Package protected ---------------------------------------------
 

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaRecoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaRecoveryTest.java	2009-01-02 15:55:00 UTC (rev 5570)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaRecoveryTest.java	2009-01-02 22:50:41 UTC (rev 5571)
@@ -230,11 +230,10 @@
       testMultipleTxReceiveWithRollback(true);
    }
 
-// Uncomment when https://jira.jboss.org/jira/browse/JBMESSAGING-1473 is complete   
-//   public void testPagingServerRestarted() throws Exception
-//   {
-//      testPaging(true);
-//   }
+   public void testPagingServerRestarted() throws Exception
+   {
+      testPaging(true);
+   }
 
    public void testPaging() throws Exception
    {

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-01-02 15:55:00 UTC (rev 5570)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-01-02 22:50:41 UTC (rev 5571)
@@ -310,13 +310,13 @@
 
    }
 
-//   public void testConcurrentDepage() throws Exception
-//   {
-//      SequentialFileFactory factory = new FakeSequentialFileFactory(1, false);
-//
-//      testConcurrentPaging(factory, 10);
-//   }
+   public void testConcurrentDepage() throws Exception
+   {
+      SequentialFileFactory factory = new FakeSequentialFileFactory(1, false);
 
+      testConcurrentPaging(factory, 10);
+   }
+
    public void testFoo()
    {
    }

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java	2009-01-02 15:55:00 UTC (rev 5570)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java	2009-01-02 22:50:41 UTC (rev 5571)
@@ -88,246 +88,252 @@
       executor.shutdown();
    }
 
-   // Uncomment when https://jira.jboss.org/jira/browse/JBMESSAGING-1474 is complete
-//   protected void testConcurrentPaging(final SequentialFileFactory factory, final int numberOfThreads) throws Exception,
-//                                                                                                      InterruptedException
-//   {
-//
-//      final int MAX_SIZE = 1024 * 10;
-//
-//      final AtomicLong messageIdGenerator = new AtomicLong(0);
-//
-//      final AtomicInteger aliveProducers = new AtomicInteger(numberOfThreads);
-//
-//      final CountDownLatch latchStart = new CountDownLatch(numberOfThreads);
-//
-//      final ConcurrentHashMap<Long, PagedMessageImpl> buffers = new ConcurrentHashMap<Long, PagedMessageImpl>();
-//
-//      final ArrayList<Page> readPages = new ArrayList<Page>();
-//
-//      QueueSettings settings = new QueueSettings();
-//      settings.setPageSizeBytes(MAX_SIZE);
-//
-//      final TestSupportPageStore storeImpl = new PagingStoreImpl(createMockManager(),
-//                                                                 createStorageManagerMock(),
-//                                                                 createPostOfficeMock(),
-//                                                                 factory,
-//                                                                 new SimpleString("test"),
-//                                                                 settings,
-//                                                                 executor);
-//
-//      storeImpl.start();
-//
-//      assertEquals(0, storeImpl.getNumberOfPages());
-//
-//      storeImpl.startPaging();
-//
-//      assertEquals(1, storeImpl.getNumberOfPages());
-//
-//      final SimpleString destination = new SimpleString("test");
-//
-//      class ProducerThread extends Thread
-//      {
-//
-//         Exception e;
-//
-//         @Override
-//         public void run()
-//         {
-//
-//            try
-//            {
-//               boolean firstTime = true;
-//               while (true)
-//               {
-//                  long id = messageIdGenerator.incrementAndGet();
-//                  PagedMessageImpl msg = createMessage(destination, createRandomBuffer(id, 5));
-//                  if (storeImpl.page(msg, false, true))
-//                  {
-//                     buffers.put(id, msg);
-//                  }
-//                  else
-//                  {
-//                     break;
-//                  }
-//
-//                  if (firstTime)
-//                  {
-//                     latchStart.countDown();
-//                     firstTime = false;
-//                  }
-//               }
-//            }
-//            catch (Exception e)
-//            {
-//               e.printStackTrace();
-//               this.e = e;
-//            }
-//            finally
-//            {
-//               aliveProducers.decrementAndGet();
-//            }
-//         }
-//      }
-//
-//      class ConsumerThread extends Thread
-//      {
-//         Exception e;
-//
-//         @Override
-//         public void run()
-//         {
-//            try
-//            {
-//               // Wait every producer to produce at least one message
-//               latchStart.await();
-//               while (aliveProducers.get() > 0)
-//               {
-//                  Page page = storeImpl.depage();
-//                  if (page != null)
-//                  {
-//                     readPages.add(page);
-//                  }
-//               }
-//            }
-//            catch (Exception e)
-//            {
-//               e.printStackTrace();
-//               this.e = e;
-//            }
-//         }
-//      }
-//
-//      ProducerThread producerThread[] = new ProducerThread[numberOfThreads];
-//
-//      for (int i = 0; i < numberOfThreads; i++)
-//      {
-//         producerThread[i] = new ProducerThread();
-//         producerThread[i].start();
-//      }
-//
-//      ConsumerThread consumer = new ConsumerThread();
-//      consumer.start();
-//
-//      for (int i = 0; i < numberOfThreads; i++)
-//      {
-//         producerThread[i].join();
-//         if (producerThread[i].e != null)
-//         {
-//            throw producerThread[i].e;
-//         }
-//      }
-//
-//      consumer.join();
-//
-//      if (consumer.e != null)
-//      {
-//         throw consumer.e;
-//      }
-//
-//      System.out.println("Reading " + buffers.size() + " messages, " + readPages.size() + " pages");
-//
-//      final ConcurrentHashMap<Long, PagedMessage> buffers2 = new ConcurrentHashMap<Long, PagedMessage>();
-//
-//      for (Page page : readPages)
-//      {
-//         page.open();
-//         List<PagedMessage> msgs = page.read();
-//         page.close();
-//
-//         for (PagedMessage msg : msgs)
-//         {
-//            (msg.getMessage(null)).getBody().rewind();
-//            long id = (msg.getMessage(null)).getBody().getLong();
-//            (msg.getMessage(null)).getBody().rewind();
-//
-//            PagedMessageImpl msgWritten = buffers.remove(id);
-//            buffers2.put(id, msg);
-//            assertNotNull(msgWritten);
-//            assertEquals((msg.getMessage(null)).getDestination(), (msgWritten.getMessage(null)).getDestination());
-//            assertEqualsByteArrays((msgWritten.getMessage(null)).getBody().array(), (msg.getMessage(null)).getBody()
-//                                                                                                          .array());
-//         }
-//      }
-//
-//      assertEquals(0, buffers.size());
-//
-//      List<String> files = factory.listFiles("page");
-//
-//      assertTrue(files.size() != 0);
-//
-//      for (String file : files)
-//      {
-//         SequentialFile fileTmp = factory.createSequentialFile(file, 1);
-//         fileTmp.open();
-//         assertTrue(fileTmp.size() + " <= " + MAX_SIZE, fileTmp.size() <= MAX_SIZE);
-//         fileTmp.close();
-//      }
-//
-//      TestSupportPageStore storeImpl2 = new PagingStoreImpl(createMockManager(), createStorageManagerMock(), createPostOfficeMock(), factory, new SimpleString("test"), settings, executor);
-//      storeImpl2.start();
-//
-//      int numberOfPages = storeImpl2.getNumberOfPages();
-//      assertTrue(numberOfPages != 0);
-//
-//      storeImpl2.startPaging();
-//
-//      storeImpl2.startPaging();
-//
-//      assertEquals(numberOfPages, storeImpl2.getNumberOfPages());
-//
-//      long lastMessageId = messageIdGenerator.incrementAndGet();
-//      PagedMessage lastMsg = createMessage(destination, createRandomBuffer(lastMessageId, 5));
-//
-//      storeImpl2.page(lastMsg, false, true);
-//      buffers2.put(lastMessageId, lastMsg);
-//
-//      Page lastPage = null;
-//      while (true)
-//      {
-//         Page page = storeImpl2.depage();
-//         if (page == null)
-//         {
-//            break;
-//         }
-//
-//         lastPage = page;
-//
-//         page.open();
-//
-//         List<PagedMessage> msgs = page.read();
-//
-//         page.close();
-//
-//         for (PagedMessage msg : msgs)
-//         {
-//
-//            (msg.getMessage(null)).getBody().rewind();
-//            long id = (msg.getMessage(null)).getBody().getLong();
-//            PagedMessage msgWritten = buffers2.remove(id);
-//            assertNotNull(msgWritten);
-//            assertEquals((msg.getMessage(null)).getDestination(), (msgWritten.getMessage(null)).getDestination());
-//            assertEqualsByteArrays((msgWritten.getMessage(null)).getBody().array(), (msg.getMessage(null)).getBody()
-//                                                                                                          .array());
-//         }
-//      }
-//
-//      lastPage.open();
-//      List<PagedMessage> lastMessages = lastPage.read();
-//      lastPage.close();
-//      assertEquals(1, lastMessages.size());
-//
-//      (lastMessages.get(0).getMessage(null)).getBody().rewind();
-//      assertEquals((lastMessages.get(0).getMessage(null)).getBody().getLong(), lastMessageId);
-//      assertEqualsByteArrays((lastMessages.get(0).getMessage(null)).getBody().array(), (lastMsg.getMessage(null)).getBody()
-//                                                                                                             .array());
-//
-//      assertEquals(0, buffers2.size());
-//      
-//      assertEquals(0, storeImpl.getAddressSize());
-//
-//   }
+   protected void testConcurrentPaging(final SequentialFileFactory factory, final int numberOfThreads) throws Exception,
+                                                                                                      InterruptedException
+   {
 
+      final int MAX_SIZE = 1024 * 10;
+
+      final AtomicLong messageIdGenerator = new AtomicLong(0);
+
+      final AtomicInteger aliveProducers = new AtomicInteger(numberOfThreads);
+
+      final CountDownLatch latchStart = new CountDownLatch(numberOfThreads);
+
+      final ConcurrentHashMap<Long, PagedMessageImpl> buffers = new ConcurrentHashMap<Long, PagedMessageImpl>();
+
+      final ArrayList<Page> readPages = new ArrayList<Page>();
+
+      QueueSettings settings = new QueueSettings();
+      settings.setPageSizeBytes(MAX_SIZE);
+
+      final TestSupportPageStore storeImpl = new PagingStoreImpl(createMockManager(),
+                                                                 createStorageManagerMock(),
+                                                                 createPostOfficeMock(),
+                                                                 factory,
+                                                                 new SimpleString("test"),
+                                                                 settings,
+                                                                 executor);
+
+      storeImpl.start();
+
+      assertEquals(0, storeImpl.getNumberOfPages());
+
+      storeImpl.startPaging();
+
+      assertEquals(1, storeImpl.getNumberOfPages());
+
+      final SimpleString destination = new SimpleString("test");
+
+      class ProducerThread extends Thread
+      {
+
+         Exception e;
+
+         @Override
+         public void run()
+         {
+
+            try
+            {
+               boolean firstTime = true;
+               while (true)
+               {
+                  long id = messageIdGenerator.incrementAndGet();
+                  PagedMessageImpl msg = createMessage(destination, createRandomBuffer(id, 5));
+                  if (storeImpl.page(msg, false, true))
+                  {
+                     buffers.put(id, msg);
+                  }
+                  else
+                  {
+                     break;
+                  }
+
+                  if (firstTime)
+                  {
+                     latchStart.countDown();
+                     firstTime = false;
+                  }
+               }
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+               this.e = e;
+            }
+            finally
+            {
+               aliveProducers.decrementAndGet();
+            }
+         }
+      }
+
+      class ConsumerThread extends Thread
+      {
+         Exception e;
+
+         @Override
+         public void run()
+         {
+            try
+            {
+               // Wait every producer to produce at least one message
+               latchStart.await();
+               while (aliveProducers.get() > 0)
+               {
+                  Page page = storeImpl.depage();
+                  if (page != null)
+                  {
+                     readPages.add(page);
+                  }
+               }
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+               this.e = e;
+            }
+         }
+      }
+
+      ProducerThread producerThread[] = new ProducerThread[numberOfThreads];
+
+      for (int i = 0; i < numberOfThreads; i++)
+      {
+         producerThread[i] = new ProducerThread();
+         producerThread[i].start();
+      }
+
+      ConsumerThread consumer = new ConsumerThread();
+      consumer.start();
+
+      for (int i = 0; i < numberOfThreads; i++)
+      {
+         producerThread[i].join();
+         if (producerThread[i].e != null)
+         {
+            throw producerThread[i].e;
+         }
+      }
+
+      consumer.join();
+
+      if (consumer.e != null)
+      {
+         throw consumer.e;
+      }
+
+      final ConcurrentHashMap<Long, PagedMessage> buffers2 = new ConcurrentHashMap<Long, PagedMessage>();
+
+      for (Page page : readPages)
+      {
+         page.open();
+         List<PagedMessage> msgs = page.read();
+         page.close();
+
+         for (PagedMessage msg : msgs)
+         {
+            msg.getMessage(null).getBody().rewind();
+            long id = msg.getMessage(null).getBody().getLong();
+            msg.getMessage(null).getBody().rewind();
+
+            PagedMessageImpl msgWritten = buffers.remove(id);
+            buffers2.put(id, msg);
+            assertNotNull(msgWritten);
+            assertEquals(msg.getMessage(null).getDestination(), msgWritten.getMessage(null).getDestination());
+            assertEqualsByteArrays(msgWritten.getMessage(null).getBody().array(), msg.getMessage(null)
+                                                                                     .getBody()
+                                                                                     .array());
+         }
+      }
+
+      assertEquals(0, buffers.size());
+
+      List<String> files = factory.listFiles("page");
+
+      assertTrue(files.size() != 0);
+
+      for (String file : files)
+      {
+         SequentialFile fileTmp = factory.createSequentialFile(file, 1);
+         fileTmp.open();
+         assertTrue(fileTmp.size() + " <= " + MAX_SIZE, fileTmp.size() <= MAX_SIZE);
+         fileTmp.close();
+      }
+
+      TestSupportPageStore storeImpl2 = new PagingStoreImpl(createMockManager(),
+                                                            createStorageManagerMock(),
+                                                            createPostOfficeMock(),
+                                                            factory,
+                                                            new SimpleString("test"),
+                                                            settings,
+                                                            executor);
+      storeImpl2.start();
+
+      int numberOfPages = storeImpl2.getNumberOfPages();
+      assertTrue(numberOfPages != 0);
+
+      storeImpl2.startPaging();
+
+      storeImpl2.startPaging();
+
+      assertEquals(numberOfPages, storeImpl2.getNumberOfPages());
+
+      long lastMessageId = messageIdGenerator.incrementAndGet();
+      PagedMessage lastMsg = createMessage(destination, createRandomBuffer(lastMessageId, 5));
+
+      storeImpl2.page(lastMsg, false, true);
+      buffers2.put(lastMessageId, lastMsg);
+
+      Page lastPage = null;
+      while (true)
+      {
+         Page page = storeImpl2.depage();
+         if (page == null)
+         {
+            break;
+         }
+
+         lastPage = page;
+
+         page.open();
+
+         List<PagedMessage> msgs = page.read();
+
+         page.close();
+
+         for (PagedMessage msg : msgs)
+         {
+
+            msg.getMessage(null).getBody().rewind();
+            long id = msg.getMessage(null).getBody().getLong();
+            PagedMessage msgWritten = buffers2.remove(id);
+            assertNotNull(msgWritten);
+            assertEquals(msg.getMessage(null).getDestination(), msgWritten.getMessage(null).getDestination());
+            assertEqualsByteArrays(msgWritten.getMessage(null).getBody().array(), msg.getMessage(null)
+                                                                                     .getBody()
+                                                                                     .array());
+         }
+      }
+
+      lastPage.open();
+      List<PagedMessage> lastMessages = lastPage.read();
+      lastPage.close();
+      assertEquals(1, lastMessages.size());
+
+      lastMessages.get(0).getMessage(null).getBody().rewind();
+      assertEquals(lastMessages.get(0).getMessage(null).getBody().getLong(), lastMessageId);
+      assertEqualsByteArrays(lastMessages.get(0).getMessage(null).getBody().array(), lastMsg.getMessage(null)
+                                                                                            .getBody()
+                                                                                            .array());
+
+      assertEquals(0, buffers2.size());
+
+      assertEquals(0, storeImpl.getAddressSize());
+
+   }
+
    protected PagedMessageImpl createMessage(final SimpleString destination, final ByteBuffer buffer)
    {
       ServerMessage msg = new ServerMessageImpl((byte)1,
@@ -360,19 +366,18 @@
    protected PagingManager createMockManager()
    {
       PagingManager mockManager = EasyMock.createNiceMock(PagingManager.class);
-      EasyMock.expect(mockManager.getDefaultPageSize()).andStubReturn(ConfigurationImpl.DEFAULT_PAGE_SIZE);
+      org.easymock.EasyMock.expect(mockManager.getDefaultPageSize()).andStubReturn(ConfigurationImpl.DEFAULT_PAGE_SIZE);
       EasyMock.replay(mockManager);
       return mockManager;
    }
-   
+
    protected StorageManager createStorageManagerMock()
    {
       StorageManager storageManager = EasyMock.createNiceMock(StorageManager.class);
       EasyMock.replay(storageManager);
       return storageManager;
    }
-   
-   
+
    protected PostOffice createPostOfficeMock()
    {
       PostOffice postOffice = EasyMock.createNiceMock(PostOffice.class);




More information about the jboss-cvs-commits mailing list