[jboss-cvs] JBoss Messaging SVN: r5459 - in trunk: src/main/org/jboss/messaging/core/journal/impl and 7 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Dec 3 20:40:14 EST 2008


Author: clebert.suconic at jboss.com
Date: 2008-12-03 20:40:14 -0500 (Wed, 03 Dec 2008)
New Revision: 5459

Modified:
   trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
   trunk/src/main/org/jboss/messaging/core/paging/Page.java
   trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
   trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeMessageImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingIntegrationTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingStoreIntegrationTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
Log:
Paging tweaks

Modified: trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java	2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java	2008-12-04 01:40:14 UTC (rev 5459)
@@ -78,6 +78,6 @@
 
    long size() throws Exception;
    
-   void renameTo(SequentialFile file) throws Exception;
+   void renameTo(String newFileName) throws Exception;
 
 }

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2008-12-04 01:40:14 UTC (rev 5459)
@@ -182,7 +182,7 @@
    /* (non-Javadoc)
     * @see org.jboss.messaging.core.journal.SequentialFile#renameTo(org.jboss.messaging.core.journal.SequentialFile)
     */
-   public void renameTo(SequentialFile file) throws Exception
+   public void renameTo(String fileName) throws Exception
    {
       throw new IllegalStateException ("method rename not supported on AIO");
       

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-12-04 01:40:14 UTC (rev 5459)
@@ -1213,7 +1213,7 @@
                   }
                   else
                   {
-                     log.warn("Prepared transaction " + healthy + " wasn't considered completed, it will be ignored");
+                     log.warn("Prepared transaction " + transactionID + " wasn't considered completed, it will be ignored");
                      tx.invalid = true;
                   }
 
@@ -1779,7 +1779,7 @@
 
          if (counter == null)
          {
-             for (JournalFile lookupFile : orderedFiles)
+            for (JournalFile lookupFile : orderedFiles)
             {
                if (lookupFile.getOrderingID() == ref.a)
                {
@@ -1796,7 +1796,7 @@
          }
          else
          {
-            // (V) Missing a record... Transaction was not completed as stated.
+            // (IV) Missing a record... Transaction was not completed as stated.
             // we will ignore the whole transaction
             // This is probably a hole caused by a crash during commit/prepare.
             if (counter.get() != ref.b)

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	2008-12-04 01:40:14 UTC (rev 5459)
@@ -43,17 +43,21 @@
 public class NIOSequentialFile implements SequentialFile
 {
    private static final Logger log = Logger.getLogger(NIOSequentialFile.class);
+
    private File file;
 
+   private final String directory;
+
    private FileChannel channel;
 
    private RandomAccessFile rfile;
 
    BufferCallback bufferCallback;
 
-   public NIOSequentialFile(final String journalDir, final String fileName)
+   public NIOSequentialFile(final String directory, final String fileName)
    {
-       this.file = new File(journalDir + "/" + fileName);
+      this.directory = directory;
+      file = new File(directory + "/" + fileName);
    }
 
    public int getAlignment()
@@ -70,7 +74,7 @@
    {
       return file.getName();
    }
-   
+
    public synchronized boolean isOpen()
    {
       return channel != null;
@@ -114,7 +118,7 @@
    }
 
    public void close() throws Exception
-   {      
+   {
       if (channel != null)
       {
          channel.close();
@@ -210,8 +214,7 @@
          throw e;
       }
    }
-   
-   
+
    public void sync() throws Exception
    {
       channel.force(false);
@@ -232,20 +235,18 @@
       return channel.position();
    }
 
-   public void renameTo(SequentialFile newFile) throws Exception
+   public void renameTo(final String newFileName) throws Exception
    {
       close();
-      this.file.renameTo(((NIOSequentialFile)newFile).file);
-      file = ((NIOSequentialFile)newFile).file;
+      File newFile = new File(directory + "/" + newFileName);
+      file.renameTo(newFile);
+      file = newFile;
    }
-   
-   
-   
+
+   @Override
    public String toString()
    {
-      return "NIOSequentialFile " + this.file;
+      return "NIOSequentialFile " + file;
    }
 
-   
-   
 }

Modified: trunk/src/main/org/jboss/messaging/core/paging/Page.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/Page.java	2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/src/main/org/jboss/messaging/core/paging/Page.java	2008-12-04 01:40:14 UTC (rev 5459)
@@ -22,6 +22,8 @@
 
 package org.jboss.messaging.core.paging;
 
+import java.util.List;
+
 /**
  * 
  * <p>Look at the <a href="http://wiki.jboss.org/wiki/JBossMessaging2Paging">WIKI</a> for more information.</p>
@@ -36,7 +38,7 @@
 
    void write(PagedMessage message) throws Exception;
 
-   PagedMessage[] read() throws Exception;
+   List<PagedMessage> read() throws Exception;
 
    int getSize();
 

Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java	2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java	2008-12-04 01:40:14 UTC (rev 5459)
@@ -87,6 +87,7 @@
    /**
     * Page, only if destination is in page mode.
     * @param message
+    * @param sync - Sync should be called right after the write
     * @return false if destination is not on page mode
     */
    boolean page(ServerMessage message) throws Exception;

Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java	2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java	2008-12-04 01:40:14 UTC (rev 5459)
@@ -61,9 +61,7 @@
 
    void sync() throws Exception;
 
-   public boolean readPage() throws Exception;
-
-   boolean page(PagedMessage message) throws Exception;
+   boolean page(PagedMessage message, boolean sync) throws Exception;
    
    /**
     * 

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java	2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java	2008-12-04 01:40:14 UTC (rev 5459)
@@ -27,10 +27,12 @@
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.paging.Page;
 import org.jboss.messaging.core.paging.PagedMessage;
 import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
@@ -44,6 +46,8 @@
 {
    // Constants -----------------------------------------------------
 
+   private static final Logger log = Logger.getLogger(PageImpl.class);
+
    public static final int SIZE_RECORD = SIZE_BYTE + SIZE_INT + SIZE_BYTE;
 
    private static final byte START_BYTE = (byte)'{';
@@ -54,6 +58,8 @@
 
    private final int pageId;
 
+   private boolean suspiciousRecords = false;
+
    private final AtomicInteger numberOfMessages = new AtomicInteger(0);
 
    private final SequentialFile file;
@@ -82,7 +88,7 @@
       return pageId;
    }
 
-   public PagedMessage[] read() throws Exception
+   public List<PagedMessage> read() throws Exception
    {
       ArrayList<PagedMessage> messages = new ArrayList<PagedMessage>();
 
@@ -108,23 +114,31 @@
                {
                   PagedMessage msg = new PagedMessageImpl();
                   msg.decode(messageBuffer);
+                  if (buffer.get() != END_BYTE)
+                  {
+                     // Sanity Check: This would only happen if there is a bug on decode or any internal code, as this
+                     // constraint was already checked
+                     throw new IllegalStateException("Internal error, it wasn't possible to locate END_BYTE");
+                  }
                   messages.add(msg);
                }
                else
                {
-                  buffer.position(position + 1);
+                  markFileAsSuspect(position, messages.size());
+                  break;
                }
             }
          }
          else
          {
-            buffer.position(position + 1);
+            markFileAsSuspect(position, messages.size());
+            break;
          }
       }
 
       numberOfMessages.set(messages.size());
 
-      return messages.toArray(new PagedMessage[messages.size()]);
+      return messages;
    }
 
    public void write(final PagedMessage message) throws Exception
@@ -136,7 +150,7 @@
       buffer.put(END_BYTE);
       buffer.rewind();
 
-      file.write(buffer, false);      
+      file.write(buffer, false);
 
       numberOfMessages.incrementAndGet();
       size.addAndGet(buffer.limit());
@@ -144,7 +158,7 @@
 
    public void sync() throws Exception
    {
-      file.sync();      
+      file.sync();
    }
 
    public void open() throws Exception
@@ -161,7 +175,18 @@
 
    public void delete() throws Exception
    {
-      file.delete();
+      if (suspiciousRecords)
+      {
+         log.warn("File " + file.getFileName() +
+                  " being renamed to " +
+                  file.getFileName() +
+                  ".invalidPage as it was loaded partially. Please verify your data.");
+         file.renameTo(file.getFileName() + ".invalidPage");
+      }
+      else
+      {
+         file.delete();
+      }
    }
 
    public int getNumberOfMessages()
@@ -180,5 +205,15 @@
 
    // Private -------------------------------------------------------
 
+   /**
+    * @param position
+    * @param msgNumber
+    */
+   private void markFileAsSuspect(final int position, final int msgNumber)
+   {
+      log.warn("Page file had incomplete records at position " + position + " at record number " + msgNumber);
+      suspiciousRecords = true;
+   }
+
    // Inner classes -------------------------------------------------
 }

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2008-12-04 01:40:14 UTC (rev 5459)
@@ -73,6 +73,8 @@
 
    private final long defaultPageSize;
 
+   private final boolean syncNonTransactional;
+
    private final ConcurrentMap</*TransactionID*/Long, PageTransactionInfo> transactions = new ConcurrentHashMap<Long, PageTransactionInfo>();
 
    // Static
@@ -80,9 +82,6 @@
 
    private static final Logger log = Logger.getLogger(PagingManagerImpl.class);
 
-   // private static final boolean isTrace = log.isTraceEnabled();
-   private static final boolean isTrace = true;
-
    // This is just a debug tool method.
    // During debugs you could make log.trace as log.info, and change the
    // variable isTrace above
@@ -99,13 +98,15 @@
                             final StorageManager storageManager,
                             final HierarchicalRepository<QueueSettings> queueSettingsRepository,
                             final long maxGlobalSize,
-                            final long defaultPageSize)
+                            final long defaultPageSize,
+                            final boolean syncNonTransactional)
    {
       this.pagingSPI = pagingSPI;
       this.queueSettingsRepository = queueSettingsRepository;
       this.storageManager = storageManager;
       this.defaultPageSize = defaultPageSize;
       this.maxGlobalSize = maxGlobalSize;
+      this.syncNonTransactional = syncNonTransactional;
    }
 
    // Public
@@ -119,7 +120,7 @@
       return globalMode.get();
    }
 
-   public void setGlobalPageMode(boolean globalMode)
+   public void setGlobalPageMode(final boolean globalMode)
    {
       this.globalMode.set(globalMode);
    }
@@ -179,7 +180,7 @@
    public void setLastPageRecord(final LastPageRecord lastPage) throws Exception
    {
       trace("LastPage loaded was " + lastPage.getLastId() + " recordID = " + lastPage.getRecordId());
-      
+
       getPageStore(lastPage.getDestination()).setLastPageRecord(lastPage);
    }
 
@@ -200,29 +201,32 @@
 
    public boolean page(final ServerMessage message, final long transactionId) throws Exception
    {
-      return getPageStore(message.getDestination()).page(new PagedMessageImpl(message, transactionId));
+      // The sync on transactions is done on commit only
+      return getPageStore(message.getDestination()).page(new PagedMessageImpl(message, transactionId), false);
    }
 
    public boolean page(final ServerMessage message) throws Exception
    {
-      return getPageStore(message.getDestination()).page(new PagedMessageImpl(message));
+      // If non Durable, there is no need to sync as there is no requirement for persistence for those messages in case
+      // of crash
+      return getPageStore(message.getDestination()).page(new PagedMessageImpl(message),
+                                                         syncNonTransactional && message.isDurable());
    }
 
    public void addTransaction(final PageTransactionInfo pageTransaction)
    {
       transactions.put(pageTransaction.getTransactionID(), pageTransaction);
    }
-   
+
    public void removeTransaction(final long id)
    {
       transactions.remove(id);
    }
-   
+
    public PageTransactionInfo getTransaction(final long id)
    {
       return transactions.get(id);
    }
-   
 
    public void sync(final Collection<SimpleString> destinationsToSync) throws Exception
    {
@@ -246,11 +250,11 @@
       {
          return;
       }
-      
+
       pagingSPI.setPagingManager(this);
-      
+
       pagingSPI.setStorageManager(storageManager);
-      
+
       started = true;
    }
 
@@ -260,7 +264,7 @@
       {
          return;
       }
-      
+
       started = false;
 
       for (PagingStore store : stores.values())
@@ -270,10 +274,10 @@
 
       pagingSPI.stop();
    }
-   
+
    public synchronized void startGlobalDepage()
    {
-      for (PagingStore store: stores.values())
+      for (PagingStore store : stores.values())
       {
          store.startDepaging(pagingSPI.getGlobalDepagerExecutor());
       }
@@ -285,19 +289,17 @@
     */
    public long getGlobalSize()
    {
-      return this.globalSize.get();
+      return globalSize.get();
    }
-   
 
    /* (non-Javadoc)
     * @see org.jboss.messaging.core.paging.PagingManager#addGlobalSize(long)
     */
-   public long addGlobalSize(long size)
+   public long addGlobalSize(final long size)
    {
       return globalSize.addAndGet(size);
    }
 
-
    /* (non-Javadoc)
     * @see org.jboss.messaging.core.paging.PagingManager#getMaxGlobalSize()
     */
@@ -305,7 +307,6 @@
    {
       return maxGlobalSize;
    }
-   
 
    // Package protected ---------------------------------------------
 

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-12-04 01:40:14 UTC (rev 5459)
@@ -90,8 +90,7 @@
    // Bytes consumed by the queue on the memory
    private final AtomicLong sizeInBytes = new AtomicLong();
 
-   //FIXME - don't call this a thread - it's a Runnable not a Thread
-   private volatile Runnable dequeueThread;
+   private volatile Runnable depageAction;
 
    private volatile int numberOfPages;
 
@@ -153,11 +152,11 @@
 
       if (queueSettings.getPageSizeBytes() != null)
       {
-         this.pageSize = queueSettings.getPageSizeBytes();
+         pageSize = queueSettings.getPageSizeBytes();
       }
       else
       {
-         this.pageSize = pagingManager.getDefaultPageSize();
+         pageSize = pagingManager.getDefaultPageSize();
       }
 
       dropMessagesWhenFull = queueSettings.isDropMessagesWhenFull();
@@ -215,39 +214,6 @@
       return storeName;
    }
 
-   /**
-    * Depage one page-file, read it and send it to the pagingManager / postoffice
-    * @return
-    * @throws Exception
-    */
-   // FIXME - why is this public?
-   public boolean readPage() throws Exception
-   {
-      Page page = depage();
-
-      if (page == null)
-      {
-         if (lastPageRecord != null)
-         {
-            clearLastPageRecord(lastPageRecord);
-         }
-
-         lastPageRecord = null;
-
-         return false;
-      }
-
-      page.open();
-
-      PagedMessage messages[] = page.read();
-
-      boolean addressNotFull = onDepage(page.getPageId(), storeName, messages);
-
-      page.delete();
-
-      return addressNotFull;
-   }
-
    /** 
     *  It returns a Page out of the Page System without reading it. 
     *  The method calling this method will remove the page and will start reading it outside of any locks. 
@@ -269,6 +235,9 @@
             numberOfPages--;
 
             final Page returnPage;
+
+            // We are out of old pages, all that is left now is the current page.
+            // On that case we need to replace it by a new empty page, and return the current page immediately
             if (currentPageId == firstPageId)
             {
                firstPageId = Integer.MAX_VALUE;
@@ -285,14 +254,19 @@
                   throw new IllegalStateException("CurrentPage is null");
                }
 
+               // The current page is empty... what means we achieved the end of the pages
                if (returnPage.getNumberOfMessages() == 0)
                {
                   returnPage.open();
                   returnPage.delete();
+
+                  // This will trigger this Destination to exit the page mode,
+                  // and this will make JBM start using the journal again
                   return null;
                }
                else
                {
+                  // We need to create a new page, as we can't lock the address until we finish depaging.
                   openNewPage();
                }
 
@@ -409,21 +383,21 @@
       }
    }
 
-   public boolean page(final PagedMessage message) throws Exception
+   public boolean page(final PagedMessage message, final boolean sync) throws Exception
    {
-      
+
       if (!running)
       {
-         throw new IllegalStateException ("PagingStore(" + this.getStoreName() + ") not initialized");
+         throw new IllegalStateException("PagingStore(" + getStoreName() + ") not initialized");
       }
-      
-      // Max-size is set, but reject is activated, what means.. never page on
-      // this address
+
+      // We should never page when drop-messages is activated.
       if (dropMessagesWhenFull)
       {
          return false;
       }
 
+      // We need to ensure a read lock, as depage could change the paging state
       currentPageLock.readLock().lock();
 
       try
@@ -457,6 +431,8 @@
             try
             {
                openNewPage();
+
+               // openNewPage will set pageUsedSize to zero, we need to set it again
                pageUsedSize.addAndGet(bytesToWrite);
             }
             finally
@@ -472,6 +448,10 @@
             if (currentPage != null)
             {
                currentPage.write(message);
+               if (sync)
+               {
+                  currentPage.sync();
+               }
                return true;
             }
             else
@@ -489,6 +469,7 @@
       {
          writeLock.unlock();
       }
+
    }
 
    public void sync() throws Exception
@@ -508,13 +489,12 @@
       }
    }
 
-
    public boolean startDepaging()
    {
       return startDepaging(executor);
    }
 
-   public boolean startDepaging(Executor executor)
+   public boolean startDepaging(final Executor executor)
    {
       currentPageLock.readLock().lock();
       try
@@ -527,10 +507,10 @@
          {
             synchronized (this)
             {
-               if (dequeueThread == null)
+               if (depageAction == null)
                {
-                  dequeueThread = new DepageRunnable(executor);
-                  executor.execute(dequeueThread);
+                  depageAction = new DepageRunnable(executor);
+                  executor.execute(depageAction);
                   return true;
                }
                else
@@ -573,7 +553,7 @@
          try
          {
             running = false;
-            
+
             if (currentPage != null)
             {
                currentPage.close();
@@ -606,7 +586,7 @@
          else
          {
             currentPageLock.writeLock().lock();
-            
+
             fileFactory.createDirs();
 
             firstPageId = Integer.MAX_VALUE;
@@ -659,7 +639,7 @@
       {
          return false;
       }
-      
+
       // First check without any global locks.
       // (Faster)
       currentPageLock.readLock().lock();
@@ -719,7 +699,7 @@
     * If persistent messages are also used, it will update eventual PageTransactions
     */
 
-   private boolean onDepage(final int pageId, final SimpleString destination, final PagedMessage[] data) throws Exception
+   private boolean onDepage(final int pageId, final SimpleString destination, final List<PagedMessage> data) throws Exception
    {
       trace("Depaging....");
 
@@ -756,7 +736,7 @@
       {
          ServerMessage pagedMessage = null;
 
-         pagedMessage = (ServerMessage)msg.getMessage(storageManager);
+         pagedMessage = msg.getMessage(storageManager);
 
          final long transactionIdDuringPaging = msg.getTransactionID();
 
@@ -846,7 +826,7 @@
 
    private synchronized void clearDequeueThread()
    {
-      dequeueThread = null;
+      depageAction = null;
    }
 
    private void openNewPage() throws Exception
@@ -927,13 +907,45 @@
       return Integer.parseInt(fileName.substring(0, fileName.indexOf('.')));
    }
 
+   /**
+    * Depage one page-file, read it and send it to the pagingManager / postoffice
+    * @return
+    * @throws Exception
+    */
+   private boolean readPage() throws Exception
+   {
+      Page page = depage();
+
+      if (page == null)
+      {
+         if (lastPageRecord != null)
+         {
+            clearLastPageRecord(lastPageRecord);
+         }
+
+         lastPageRecord = null;
+
+         return false;
+      }
+
+      page.open();
+
+      List<PagedMessage> messages = page.read();
+
+      boolean addressNotFull = onDepage(page.getPageId(), storeName, messages);
+
+      page.delete();
+
+      return addressNotFull;
+   }
+
    // Inner classes -------------------------------------------------
 
    private class DepageRunnable implements Runnable
    {
       private final Executor followingExecutor;
-      
-      public DepageRunnable(Executor followingExecutor)
+
+      public DepageRunnable(final Executor followingExecutor)
       {
          this.followingExecutor = followingExecutor;
       }

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeMessageImpl.java	2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeMessageImpl.java	2008-12-04 01:40:14 UTC (rev 5459)
@@ -46,7 +46,7 @@
    // Constants -----------------------------------------------------
 
    private static final Logger log = Logger.getLogger(JournalLargeMessageImpl.class);
-   
+
    private static boolean isTrace = log.isTraceEnabled();
 
    // Attributes ----------------------------------------------------
@@ -55,14 +55,14 @@
 
    // We should only use the NIO implementation on the Journal
    private volatile SequentialFile file;
-   
+
    private volatile boolean complete = false;
 
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public JournalLargeMessageImpl(JournalStorageManager storageManager)
+   public JournalLargeMessageImpl(final JournalStorageManager storageManager)
    {
       this.storageManager = storageManager;
    }
@@ -187,7 +187,7 @@
 
    public void deleteFile() throws MessagingException
    {
-      this.storageManager.deleteFile(file);
+      storageManager.deleteFile(file);
    }
 
    @Override
@@ -196,15 +196,15 @@
       // The body won't be on memory (aways on-file), so we don't consider this for paging
       return super.getPropertiesEncodeSize();
    }
-   
+
    public synchronized void complete() throws Exception
    {
       releaseResources();
-      
+
       if (!complete)
       {
-         SequentialFile fileToRename = storageManager.createFileForLargeMessage(this.getMessageID(), true);
-         file.renameTo(fileToRename);         
+         SequentialFile fileToRename = storageManager.createFileForLargeMessage(getMessageID(), true);
+         file.renameTo(fileToRename.getFileName());
       }
    }
 
@@ -233,17 +233,16 @@
    {
       if (file == null)
       {
-         if (this.messageID <= 0)
+         if (messageID <= 0)
          {
             throw new RuntimeException("MessageID not set on LargeMessage");
          }
-         
-         file = storageManager.createFileForLargeMessage(this.getMessageID(), complete);
-         
+
+         file = storageManager.createFileForLargeMessage(getMessageID(), complete);
+
       }
    }
 
-
    // Inner classes -------------------------------------------------
 
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-12-04 01:40:14 UTC (rev 5459)
@@ -214,7 +214,8 @@
                                             storageManager,
                                             queueSettingsRepository,
                                             configuration.getPagingMaxGlobalSizeBytes(),
-                                            configuration.getPagingDefaultSize());
+                                            configuration.getPagingDefaultSize(),
+                                            configuration.isJournalSyncNonTransactional());
       pagingManager.start();
 
       resourceManager = new ResourceManagerImpl((int)configuration.getTransactionTimeout() / 1000,

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingIntegrationTest.java	2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingIntegrationTest.java	2008-12-04 01:40:14 UTC (rev 5459)
@@ -50,6 +50,11 @@
       testAdd(new NIOSequentialFileFactory(getTestDir()), 1000);
    }
 
+   public void testDamagedDataWithNIO() throws Exception
+   {
+      testDamagedPage(new NIOSequentialFileFactory(getTestDir()), 1000);
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java	2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java	2008-12-04 01:40:14 UTC (rev 5459)
@@ -24,6 +24,7 @@
 
 import java.io.File;
 import java.nio.ByteBuffer;
+import java.util.List;
 
 import org.jboss.messaging.core.paging.Page;
 import org.jboss.messaging.core.paging.PagedMessage;
@@ -60,7 +61,7 @@
 
    // Public --------------------------------------------------------
 
-   public void testPagingManagerNIO() throws Exception
+   public void testPagingManager() throws Exception
    {
       HierarchicalRepository<QueueSettings> queueSettings = new HierarchicalObjectRepository<QueueSettings>();
       queueSettings.setDefault(new QueueSettings());
@@ -69,7 +70,8 @@
                                                             null,
                                                             queueSettings,
                                                             -1,
-                                                            1024 * 1024);
+                                                            1024 * 1024,
+                                                            true);
 
       managerImpl.start();
 
@@ -77,31 +79,32 @@
 
       ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(10));
 
-      assertFalse(store.page(new PagedMessageImpl(msg)));
+      assertFalse(store.page(new PagedMessageImpl(msg), true));
 
       store.startPaging();
 
-      assertTrue(store.page(new PagedMessageImpl(msg)));
+      assertTrue(store.page(new PagedMessageImpl(msg), true));
 
       Page page = store.depage();
 
       page.open();
 
-      PagedMessage msgs[] = page.read();
+      List<PagedMessage> msgs = page.read();
 
       page.close();
 
-      assertEquals(1, msgs.length);
+      assertEquals(1, msgs.size());
 
-      assertEqualsByteArrays(msg.getBody().array(), (msgs[0].getMessage(null)).getBody().array());
+      assertEqualsByteArrays(msg.getBody().array(), (msgs.get(0).getMessage(null)).getBody().array());
 
       assertTrue(store.isPaging());
 
       assertNull(store.depage());
 
-      assertFalse(store.page(new PagedMessageImpl(msg)));
+      assertFalse(store.page(new PagedMessageImpl(msg), true));
    }
 
+
    public void testPagingManagerAddressFull() throws Exception
    {
       HierarchicalRepository<QueueSettings> queueSettings = new HierarchicalObjectRepository<QueueSettings>();
@@ -117,7 +120,8 @@
                                                             null,
                                                             queueSettings,
                                                             -1,
-                                                            1024 * 1024);
+                                                            1024 * 1024,
+                                                            false);
       managerImpl.start();
 
       managerImpl.createPageStore(new SimpleString("simple-test"));

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingStoreIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingStoreIntegrationTest.java	2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingStoreIntegrationTest.java	2008-12-04 01:40:14 UTC (rev 5459)
@@ -24,8 +24,6 @@
 
 import java.io.File;
 
-import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
-import org.jboss.messaging.core.journal.impl.AIOSequentialFileFactory;
 import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
 import org.jboss.messaging.tests.unit.core.paging.impl.PagingStoreTestBase;
 
@@ -46,18 +44,6 @@
 
    // Public --------------------------------------------------------
 
-   public void testPageStoreWithAIO() throws Exception
-   {
-      if (!AsynchronousFileImpl.isLoaded())
-      {
-         fail(String.format("libAIO is not loaded on %s %s %s",
-                            System.getProperty("os.name"),
-                            System.getProperty("os.arch"),
-                            System.getProperty("os.version")));
-      }
-      testConcurrentPaging(new AIOSequentialFileFactory(getTestDir()), 10);
-   }
-
    public void testPageWithNIO() throws Exception
    {
       // This integration test could fail 1 in 100 due to race conditions.

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2008-12-04 01:40:14 UTC (rev 5459)
@@ -304,7 +304,7 @@
    {
       private volatile boolean open;
 
-      private final String fileName;
+      private String fileName;
 
       private ByteBuffer data;
 
@@ -557,9 +557,11 @@
       /* (non-Javadoc)
        * @see org.jboss.messaging.core.journal.SequentialFile#renameTo(org.jboss.messaging.core.journal.SequentialFile)
        */
-      public void renameTo(SequentialFile file) throws Exception
+      public void renameTo(String newFileName) throws Exception
       {
-         throw new IllegalStateException("Method rename not supoprted on FakeSequentialFile");
+         fileMap.remove(this.fileName);
+         this.fileName = newFileName;
+         fileMap.put(newFileName, this);
       }
 
    }

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java	2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java	2008-12-04 01:40:14 UTC (rev 5459)
@@ -46,6 +46,12 @@
       testAdd(new FakeSequentialFileFactory(1, false), 10);
    }
 
+   /** Validate that the records written before a damaged record are still recovered */
+   public void testDamagedPage() throws Exception
+   {
+      testDamagedPage(new FakeSequentialFileFactory(1, false), 100);
+   }
+   
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java	2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java	2008-12-04 01:40:14 UTC (rev 5459)
@@ -24,6 +24,7 @@
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.List;
 
 import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.journal.SequentialFileFactory;
@@ -73,65 +74,156 @@
 
       assertEquals(1, factory.listFiles("page").size());
 
-      ArrayList<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
-
       SimpleString simpleDestination = new SimpleString("Test");
 
-      for (int i = 0; i < numberOfElements; i++)
-      {
-         ByteBuffer buffer = ByteBuffer.allocate(10);
+      ArrayList<ByteBuffer> buffers = addPageElements(simpleDestination, impl, numberOfElements);
 
-         for (int j = 0; j < buffer.limit(); j++)
-         {
-            buffer.put(RandomUtil.randomByte());
-         }
+      impl.sync();
+      impl.close();
 
-         buffers.add(buffer);
+      file = factory.createSequentialFile("00010.page", 1);
+      file.open();
+      impl = new PageImpl(factory, file, 10);
 
-         ServerMessage msg = new ServerMessageImpl((byte)1,
-                                                   true,
-                                                   0,
-                                                   System.currentTimeMillis(),
-                                                   (byte)0,
-                                                   new ByteBufferWrapper(buffer));
+      List<PagedMessage> msgs = impl.read();
 
-         msg.setMessageID(i);
+      assertEquals(numberOfElements, msgs.size());
 
-         msg.setDestination(simpleDestination);
+      assertEquals(numberOfElements, impl.getNumberOfMessages());
 
-         impl.write(new PagedMessageImpl(msg));
+      for (int i = 0; i < msgs.size(); i++)
+      {
+         assertEquals(i, (msgs.get(i).getMessage(null)).getMessageID());
 
-         assertEquals(i + 1, impl.getNumberOfMessages());
+         assertEquals(simpleDestination, (msgs.get(i).getMessage(null)).getDestination());
+
+         assertEqualsByteArrays(buffers.get(i).array(), (msgs.get(i).getMessage(null)).getBody().array());
       }
 
+      impl.delete();
+
+      assertEquals(0, factory.listFiles(".page").size());
+
+   }
+
+   
+   
+   public void testDamagedPage(final SequentialFileFactory factory, final int numberOfElements) throws Exception
+   {
+
+      SequentialFile file = factory.createSequentialFile("00010.page", 1);
+
+      PageImpl impl = new PageImpl(factory, file, 10);
+
+      assertEquals(10, impl.getPageId());
+
+      impl.open();
+
+      assertEquals(1, factory.listFiles("page").size());
+
+      SimpleString simpleDestination = new SimpleString("Test");
+
+      ArrayList<ByteBuffer> buffers = addPageElements(simpleDestination, impl, numberOfElements);
+
       impl.sync();
+
+      long positionA = file.position();
+
+      // Add one record that will be damaged
+      addPageElements(simpleDestination, impl, 1);
+      
+      long positionB = file.position();
+      
+      // Add 10 more records after the damaged one; they will need to be ignored
+      addPageElements(simpleDestination, impl, 10);
+      
+
+      // Damage data... position the file at the midpoint between positions A and B
+      file.position(positionA + (positionB - positionA) / 2);
+      
+      ByteBuffer buffer = ByteBuffer.allocate((int)(positionB - file.position()));
+      
+      for (int i = 0; i< buffer.capacity(); i++)
+      {
+         buffer.put((byte)'Z');
+      }
+      
+      buffer.rewind();
+      
+      file.write(buffer, true);
+      
       impl.close();
 
       file = factory.createSequentialFile("00010.page", 1);
       file.open();
       impl = new PageImpl(factory, file, 10);
 
-      PagedMessage msgs[] = impl.read();
+      List<PagedMessage> msgs = impl.read();
 
-      assertEquals(numberOfElements, msgs.length);
+      assertEquals(numberOfElements, msgs.size());
 
       assertEquals(numberOfElements, impl.getNumberOfMessages());
 
-      for (int i = 0; i < msgs.length; i++)
+      for (int i = 0; i < msgs.size(); i++)
       {
-         assertEquals(i, (msgs[i].getMessage(null)).getMessageID());
+         assertEquals(i, (msgs.get(i).getMessage(null)).getMessageID());
 
-         assertEquals(simpleDestination, (msgs[i].getMessage(null)).getDestination());
+         assertEquals(simpleDestination, (msgs.get(i).getMessage(null)).getDestination());
 
-         assertEqualsByteArrays(buffers.get(i).array(), (msgs[i].getMessage(null)).getBody().array());
+         assertEqualsByteArrays(buffers.get(i).array(), (msgs.get(i).getMessage(null)).getBody().array());
       }
 
       impl.delete();
 
-      assertEquals(0, factory.listFiles(".page").size());
+      assertEquals(0, factory.listFiles("page").size());
 
+      assertEquals(1, factory.listFiles("invalidPage").size());
+
    }
+   
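+   /**
+    * @param simpleDestination destination set on every generated message
+    * @param page page the generated messages are written to
+    * @param numberOfElements number of messages to generate and write
+    * @return the 10-byte random body buffers, in the order the messages were written
+    * @throws Exception on any failure while building or writing the messages
+    */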
+   protected ArrayList<ByteBuffer> addPageElements(SimpleString simpleDestination, PageImpl page, int numberOfElements) throws Exception
+   {
+      ArrayList<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+      
+      int initialNumberOfMessages = page.getNumberOfMessages();
 
+      for (int i = 0; i < numberOfElements; i++)
+      {
+         ByteBuffer buffer = ByteBuffer.allocate(10);
+
+         for (int j = 0; j < buffer.limit(); j++)
+         {
+            buffer.put(RandomUtil.randomByte());
+         }
+
+         buffers.add(buffer);
+
+         ServerMessage msg = new ServerMessageImpl((byte)1,
+                                                   true,
+                                                   0,
+                                                   System.currentTimeMillis(),
+                                                   (byte)0,
+                                                   new ByteBufferWrapper(buffer));
+
+         msg.setMessageID(i);
+
+         msg.setDestination(simpleDestination);
+
+         page.write(new PagedMessageImpl(msg));
+
+         assertEquals(initialNumberOfMessages + i + 1, page.getNumberOfMessages());
+      }
+      return buffers;
+   }
+
+
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java	2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java	2008-12-04 01:40:14 UTC (rev 5459)
@@ -111,7 +111,7 @@
 
       assertTrue(storeImpl.isPaging());
 
-      assertTrue(storeImpl.page(msg));
+      assertTrue(storeImpl.page(msg, true));
 
       assertEquals(1, storeImpl.getNumberOfPages());
 
@@ -162,7 +162,7 @@
 
          PagedMessageImpl msg = createMessage(destination, buffer);
 
-         assertTrue(storeImpl.page(msg));
+         assertTrue(storeImpl.page(msg, true));
       }
 
       assertEquals(1, storeImpl.getNumberOfPages());
@@ -173,9 +173,9 @@
 
       page.open();
 
-      PagedMessage msg[] = page.read();
+      List<PagedMessage> msg = page.read();
 
-      assertEquals(10, msg.length);
+      assertEquals(10, msg.size());
       assertEquals(1, storeImpl.getNumberOfPages());
 
       page = storeImpl.depage();
@@ -186,8 +186,8 @@
 
       for (int i = 0; i < 10; i++)
       {
-         assertEquals(0, (msg[i].getMessage(null)).getMessageID());
-         assertEqualsByteArrays(buffers.get(i).array(), (msg[i].getMessage(null)).getBody().array());
+         assertEquals(0, (msg.get(i).getMessage(null)).getMessageID());
+         assertEqualsByteArrays(buffers.get(i).array(), (msg.get(i).getMessage(null)).getBody().array());
       }
 
    }
@@ -230,7 +230,7 @@
 
          PagedMessageImpl msg = createMessage(destination, buffer);
 
-         assertTrue(storeImpl.page(msg));
+         assertTrue(storeImpl.page(msg, true));
       }
 
       assertEquals(2, storeImpl.getNumberOfPages());
@@ -243,16 +243,16 @@
 
          page.open();
 
-         PagedMessage msg[] = page.read();
+         List<PagedMessage> msg = page.read();
 
          page.close();
 
-         assertEquals(5, msg.length);
+         assertEquals(5, msg.size());
 
          for (int i = 0; i < 5; i++)
          {
-            assertEquals(0, (msg[i].getMessage(null)).getMessageID());
-            assertEqualsByteArrays(buffers.get(pageNr * 5 + i).array(), (msg[i].getMessage(null)).getBody().array());
+            assertEquals(0, (msg.get(i).getMessage(null)).getMessageID());
+            assertEqualsByteArrays(buffers.get(pageNr * 5 + i).array(), (msg.get(i).getMessage(null)).getBody().array());
          }
       }
 
@@ -262,13 +262,13 @@
 
       PagedMessageImpl msg = createMessage(destination, buffers.get(0));
 
-      assertTrue(storeImpl.page(msg));
+      assertTrue(storeImpl.page(msg, true));
 
       Page newPage = storeImpl.depage();
 
       newPage.open();
 
-      assertEquals(1, newPage.read().length);
+      assertEquals(1, newPage.read().size());
 
       newPage.delete();
 
@@ -280,23 +280,23 @@
 
       assertFalse(storeImpl.isPaging());
 
-      assertFalse(storeImpl.page(msg));
+      assertFalse(storeImpl.page(msg, true));
 
       storeImpl.startPaging();
 
-      assertTrue(storeImpl.page(msg));
+      assertTrue(storeImpl.page(msg, true));
 
       Page page = storeImpl.depage();
 
       page.open();
 
-      PagedMessage msgs[] = page.read();
+      List<PagedMessage> msgs = page.read();
 
-      assertEquals(1, msgs.length);
+      assertEquals(1, msgs.size());
 
-      assertEquals(0l, (msgs[0].getMessage(null)).getMessageID());
+      assertEquals(0l, (msgs.get(0).getMessage(null)).getMessageID());
 
-      assertEqualsByteArrays(buffers.get(0).array(), (msgs[0].getMessage(null)).getBody().array());
+      assertEqualsByteArrays(buffers.get(0).array(), (msgs.get(0).getMessage(null)).getBody().array());
 
       assertEquals(1, storeImpl.getNumberOfPages());
 

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java	2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java	2008-12-04 01:40:14 UTC (rev 5459)
@@ -141,7 +141,7 @@
                {
                   long id = messageIdGenerator.incrementAndGet();
                   PagedMessageImpl msg = createMessage(destination, createRandomBuffer(id, 5));
-                  if (storeImpl.page(msg))
+                  if (storeImpl.page(msg, false))
                   {
                      buffers.put(id, msg);
                   }
@@ -231,7 +231,7 @@
       for (Page page : readPages)
       {
          page.open();
-         PagedMessage msgs[] = page.read();
+         List<PagedMessage> msgs = page.read();
          page.close();
 
          for (PagedMessage msg : msgs)
@@ -278,7 +278,7 @@
       long lastMessageId = messageIdGenerator.incrementAndGet();
       PagedMessage lastMsg = createMessage(destination, createRandomBuffer(lastMessageId, 5));
 
-      storeImpl2.page(lastMsg);
+      storeImpl2.page(lastMsg, false);
       buffers2.put(lastMessageId, lastMsg);
 
       Page lastPage = null;
@@ -294,7 +294,7 @@
 
          page.open();
 
-         PagedMessage[] msgs = page.read();
+         List<PagedMessage> msgs = page.read();
 
          page.close();
 
@@ -312,13 +312,13 @@
       }
 
       lastPage.open();
-      PagedMessage lastMessages[] = lastPage.read();
+      List<PagedMessage> lastMessages = lastPage.read();
       lastPage.close();
-      assertEquals(1, lastMessages.length);
+      assertEquals(1, lastMessages.size());
 
-      (lastMessages[0].getMessage(null)).getBody().rewind();
-      assertEquals((lastMessages[0].getMessage(null)).getBody().getLong(), lastMessageId);
-      assertEqualsByteArrays((lastMessages[0].getMessage(null)).getBody().array(), (lastMsg.getMessage(null)).getBody()
+      (lastMessages.get(0).getMessage(null)).getBody().rewind();
+      assertEquals((lastMessages.get(0).getMessage(null)).getBody().getLong(), lastMessageId);
+      assertEqualsByteArrays((lastMessages.get(0).getMessage(null)).getBody().array(), (lastMsg.getMessage(null)).getBody()
                                                                                                              .array());
 
       assertEquals(0, buffers2.size());




More information about the jboss-cvs-commits mailing list