[hornetq-commits] JBoss hornetq SVN: r7980 - in trunk: src/main/org/hornetq/core/journal/impl and 16 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Sep 22 19:39:48 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-09-22 19:39:47 -0400 (Tue, 22 Sep 2009)
New Revision: 7980

Added:
   trunk/src/main/org/hornetq/core/journal/TransactionFailureCallback.java
   trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
   trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageCrashTest.java
Modified:
   trunk/src/main/org/hornetq/core/journal/Journal.java
   trunk/src/main/org/hornetq/core/journal/LoaderCallback.java
   trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
   trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
   trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java
   trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java
   trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
   trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/hornetq/core/server/LargeServerMessage.java
   trunk/src/main/org/hornetq/core/server/ServerMessage.java
   trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   trunk/tests/src/org/hornetq/tests/integration/client/JournalCrashTest.java
   trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
   trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java
   trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
   trunk/tests/src/org/hornetq/tests/performance/journal/JournalImplTestUnit.java
   trunk/tests/src/org/hornetq/tests/stress/journal/AddAndRemoveStressTest.java
   trunk/tests/src/org/hornetq/tests/stress/journal/ValidateTransactionHealthTest.java
   trunk/tests/src/org/hornetq/tests/stress/journal/remote/RemoteJournalAppender.java
   trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/JournalImplTestUnit.java
   trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalAsyncTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
   trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java
   trunk/tests/src/org/hornetq/tests/util/JournalExample.java
   trunk/tests/src/org/hornetq/tests/util/ListJournal.java
   trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
HORNETQ-49 (Orphaned files) and HORNETQ-143 (Bug on LargeMessages & XA)

Modified: trunk/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/Journal.java	2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/src/main/org/hornetq/core/journal/Journal.java	2009-09-22 23:39:47 UTC (rev 7980)
@@ -75,7 +75,7 @@
 
    // Load
 
-   long load(List<RecordInfo> committedRecords, List<PreparedTransactionInfo> preparedTransactions) throws Exception;
+   long load(List<RecordInfo> committedRecords, List<PreparedTransactionInfo> preparedTransactions, TransactionFailureCallback transactionFailure) throws Exception;
 
    int getAlignment() throws Exception;
 

Modified: trunk/src/main/org/hornetq/core/journal/LoaderCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/LoaderCallback.java	2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/src/main/org/hornetq/core/journal/LoaderCallback.java	2009-09-22 23:39:47 UTC (rev 7980)
@@ -13,18 +13,19 @@
 
 package org.hornetq.core.journal;
 
+
 /**
  * 
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  * 
  */
-public interface LoaderCallback
+public interface LoaderCallback extends TransactionFailureCallback
 {
+   void addPreparedTransaction(PreparedTransactionInfo preparedTransaction);
+
    void addRecord(RecordInfo info);
 
    void deleteRecord(long id);
 
    void updateRecord(RecordInfo info);
-
-   void addPreparedTransaction(PreparedTransactionInfo preparedTransaction);
 }

Added: trunk/src/main/org/hornetq/core/journal/TransactionFailureCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/TransactionFailureCallback.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/TransactionFailureCallback.java	2009-09-22 23:39:47 UTC (rev 7980)
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal;
+
+import java.util.List;
+
+/**
+ * A TransactionFailureCallback
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface TransactionFailureCallback
+{
+   
+   /** To be used to inform about transactions without commit records.
+    *  This could be used to remove extra resources associated with the transactions (such as external files received during the transaction) */
+   void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete);
+
+}

Modified: trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java	2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java	2009-09-22 23:39:47 UTC (rev 7980)
@@ -28,7 +28,6 @@
 import org.hornetq.core.asyncio.impl.TimedBuffer;
 import org.hornetq.core.asyncio.impl.TimedBufferObserver;
 import org.hornetq.core.journal.IOCallback;
-import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
@@ -40,14 +39,10 @@
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
-public class AIOSequentialFile implements SequentialFile
+public class AIOSequentialFile extends AbstractSequentialFile
 {
    private static final Logger log = Logger.getLogger(AIOSequentialFile.class);
 
-   private final String directory;
-
-   private File file;
-
    private boolean opened = false;
 
    private final int maxIO;
@@ -87,9 +82,8 @@
                             final Executor executor,
                             final Executor pollerExecutor)
    {
+      super(directory, new File(directory + "/" + fileName));
       this.factory = factory;
-      this.directory = directory;
-      file = new File(directory + "/" + fileName);
       this.maxIO = maxIO;
       this.bufferCallback = bufferCallback;
       this.executor = executor;
@@ -108,11 +102,6 @@
       return aioFile.getBlockSize();
    }
 
-   public boolean exists()
-   {
-      return file.exists();
-   }
-
    public int calculateBlockStart(final int position) throws Exception
    {
       int alignment = getAlignment();
@@ -139,7 +128,10 @@
 
    public synchronized void close() throws Exception
    {
-      checkOpened();
+      if (!opened)
+      {
+         return;
+      }
       opened = false;
 
       timedBuffer = null;
@@ -156,8 +148,8 @@
 
       while (!donelatch.await(60, TimeUnit.SECONDS))
       {
-         log.warn("Executor on file " + file.getName() + " couldn't complete its tasks in 60 seconds.",
-                  new Exception("Warning: Executor on file " + file.getName() +
+         log.warn("Executor on file " + getFile().getName() + " couldn't complete its tasks in 60 seconds.",
+                  new Exception("Warning: Executor on file " + getFile().getName() +
                                 " couldn't complete its tasks in 60 seconds."));
       }
 
@@ -178,17 +170,6 @@
       }
    }
 
-   public void delete() throws Exception
-   {
-      if (aioFile != null)
-      {
-         aioFile.close();
-         aioFile = null;
-      }
-
-      file.delete();
-   }
-
    public void fill(final int position, final int size, final byte fillCharacter) throws Exception
    {
       checkOpened();
@@ -237,35 +218,16 @@
       this.fileSize = aioFile.size();
    }
 
-   public String getFileName()
-   {
-      return file.getName();
-   }
-
    public void open() throws Exception
    {
       open(maxIO);
    }
 
-   /* (non-Javadoc)
-    * @see org.hornetq.core.journal.SequentialFile#renameTo(org.hornetq.core.journal.SequentialFile)
-    */
-   public void renameTo(String newFileName) throws Exception
-   {
-      if (isOpen())
-      {
-         close();
-      }
-      File newFile = new File(directory + "/" + newFileName);
-      file.renameTo(newFile);
-      file = newFile;
-   }
-
    public synchronized void open(final int currentMaxIO) throws Exception
    {
       opened = true;
       aioFile = newFile();
-      aioFile.open(file.getAbsolutePath(), currentMaxIO);
+      aioFile.open(getFile().getAbsolutePath(), currentMaxIO);
       position.set(0);
       aioFile.setBufferCallback(bufferCallback);
       this.fileSize = aioFile.size();
@@ -376,7 +338,7 @@
    {
       if (aioFile == null)
       {
-         return file.length();
+         return getFile().length();
       }
       else
       {
@@ -387,7 +349,7 @@
    @Override
    public String toString()
    {
-      return "AIOSequentialFile:" + file.getAbsolutePath();
+      return "AIOSequentialFile:" + getFile().getAbsolutePath();
    }
 
    // Public methods
@@ -525,7 +487,7 @@
 
       public String toString()
       {
-         return "TimedBufferObserver on file (" + AIOSequentialFile.this.file.getName() + ")";
+         return "TimedBufferObserver on file (" + getFile().getName() + ")";
       }
 
    }

Added: trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java	2009-09-22 23:39:47 UTC (rev 7980)
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+import java.io.File;
+
+import org.hornetq.core.journal.SequentialFile;
+
+/**
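+ * Base class for SequentialFile implementations: holds the file and its directory, and provides the shared exists, getFileName, delete and renameTo operations.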
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public abstract class AbstractSequentialFile implements SequentialFile
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private File file;
+
+   private final String directory;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * @param file
+    * @param directory
+    */
+   public AbstractSequentialFile(String directory, File file)
+   {
+      super();
+      this.file = file;
+      this.directory = directory;
+   }
+
+   // Public --------------------------------------------------------
+
+   public final boolean exists()
+   {
+      return file.exists();
+   }
+
+   public final String getFileName()
+   {
+      return file.getName();
+   }
+
+
+   public final void delete() throws Exception
+   {
+      if (isOpen())
+      {
+         close();
+      }
+
+      file.delete();
+   }
+
+
+   public final void renameTo(final String newFileName) throws Exception
+   {
+      close();
+      File newFile = new File(directory + "/" + newFileName);
+      
+
+      if (!file.equals(newFile))
+      {
+         file.renameTo(newFile);
+         file = newFile;
+      }
+   }
+   
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   protected File getFile()
+   {
+      return file;
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-09-22 23:39:47 UTC (rev 7980)
@@ -50,6 +50,7 @@
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.journal.TestableJournal;
+import org.hornetq.core.journal.TransactionFailureCallback;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
 import org.hornetq.utils.DataConstants;
@@ -1353,7 +1354,8 @@
     * @see JournalImpl#load(LoaderCallback)
     */
    public synchronized long load(final List<RecordInfo> committedRecords,
-                                 final List<PreparedTransactionInfo> preparedTransactions) throws Exception
+                                 final List<PreparedTransactionInfo> preparedTransactions,
+                                 final TransactionFailureCallback failureCallback) throws Exception
    {
       final Set<Long> recordsToDelete = new HashSet<Long>();
       final List<RecordInfo> records = new ArrayList<RecordInfo>();
@@ -1399,6 +1401,14 @@
                recordsToDelete.clear();
             }
          }
+
+         public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+         {
+            if (failureCallback != null)
+            {
+               failureCallback.failedTransaction(transactionID, records, recordsToDelete);
+            }
+         }
       });
 
       for (RecordInfo record : records)
@@ -2015,6 +2025,8 @@
 
             // Remove the transactionInfo
             transactions.remove(transaction.transactionID);
+            
+            loadManager.failedTransaction(transaction.transactionID, transaction.recordInfos, transaction.recordsToDelete);
          }
          else
          {

Modified: trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java	2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java	2009-09-22 23:39:47 UTC (rev 7980)
@@ -32,16 +32,12 @@
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
-public class NIOSequentialFile implements SequentialFile
+public class NIOSequentialFile extends AbstractSequentialFile
 {
    private static final Logger log = Logger.getLogger(NIOSequentialFile.class);
 
-   private File file;
-
    private long fileSize = 0;
 
-   private final String directory;
-
    private FileChannel channel;
 
    private RandomAccessFile rfile;
@@ -50,15 +46,9 @@
 
    public NIOSequentialFile(final String directory, final String fileName)
    {
-      this.directory = directory;
-      file = new File(directory + "/" + fileName);
+      super(directory, new File(directory + "/" + fileName));
    }
 
-   public boolean exists()
-   {
-      return file.exists();
-   }
-
    public int getAlignment()
    {
       return 1;
@@ -78,11 +68,6 @@
       return this.position.get() + size <= fileSize;
    }
 
-   public String getFileName()
-   {
-      return file.getName();
-   }
-
    public synchronized boolean isOpen()
    {
       return channel != null;
@@ -90,7 +75,7 @@
 
    public synchronized void open() throws Exception
    {
-      rfile = new RandomAccessFile(file, "rw");
+      rfile = new RandomAccessFile(getFile(), "rw");
 
       channel = rfile.getChannel();
 
@@ -150,17 +135,7 @@
 
       notifyAll();
    }
-
-   public void delete() throws Exception
-   {
-      if (isOpen())
-      {
-         close();
-      }
-
-      file.delete();
-   }
-
+   
    public int read(final ByteBuffer bytes) throws Exception
    {
       return read(bytes, null);
@@ -249,7 +224,7 @@
    {
       if (channel == null)
       {
-         return file.length();
+         return getFile().length();
       }
       else
       {
@@ -268,18 +243,10 @@
       return position.get();
    }
 
-   public void renameTo(final String newFileName) throws Exception
-   {
-      close();
-      File newFile = new File(directory + "/" + newFileName);
-      file.renameTo(newFile);
-      file = newFile;
-   }
-
    @Override
    public String toString()
    {
-      return "NIOSequentialFile " + file;
+      return "NIOSequentialFile " + getFile();
    }
 
    /* (non-Javadoc)

Modified: trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java	2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java	2009-09-22 23:39:47 UTC (rev 7980)
@@ -153,6 +153,12 @@
 
       numberOfMessages.incrementAndGet();
       size.addAndGet(buffer.limit());
+      
+      if (message.getMessage(null).isLargeMessage())
+      {
+         // If we don't sync on large messages we risk leaving unattended files on disk
+         sync();
+      }
    }
 
    public void sync() throws Exception

Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2009-09-22 23:39:47 UTC (rev 7980)
@@ -37,6 +37,7 @@
 import org.hornetq.core.paging.PagingStoreFactory;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.core.transaction.Transaction;
@@ -769,6 +770,16 @@
          ServerMessage message = null;
 
          message = pagedMessage.getMessage(storageManager);
+         
+         if (message.isLargeMessage())
+         {
+            LargeServerMessage largeMsg = (LargeServerMessage)message;
+            if (!largeMsg.isFileExists())
+            {
+               log.warn("File for large message " + largeMsg.getMessageID() + " doesn't exist, so ignoring depage for this large message");
+               continue;
+            }
+         }
 
          final long transactionIdDuringPaging = pagedMessage.getTransactionID();
 

Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java	2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java	2009-09-22 23:39:47 UTC (rev 7980)
@@ -50,8 +50,6 @@
    // We should only use the NIO implementation on the Journal
    private SequentialFile file;
 
-   private boolean complete = false;
-
    private long bodySize = -1;
 
    // Static --------------------------------------------------------
@@ -76,7 +74,6 @@
       this.linkMessage = copy;
       storageManager = copy.storageManager;
       file = fileCopy;
-      complete = true;
       bodySize = copy.bodySize;
       setMessageID(newID);
    }
@@ -149,6 +146,7 @@
       return (int)Math.min(bodySize, Integer.MAX_VALUE);
    }
 
+   @Override
    public synchronized long getLargeBodySize()
    {
       try
@@ -178,32 +176,26 @@
    public void decode(final HornetQBuffer buffer)
    {
       file = null;
-      complete = true;
+      try
+      {
+         this.setStored();
+      }
+      catch (Exception e)
+      {
+         // File still null, this wasn't supposed to happen ever.
+         log.warn(e.getMessage(), e);
+      }
       decodeProperties(buffer);
    }
 
-   /**
-    * @return the complete
-    */
-   public boolean isComplete()
-   {
-      return complete;
-   }
-
-   /**
-    * @param complete the complete to set
-    */
-   public void setComplete(boolean complete)
-   {
-      this.complete = complete;
-   }
-
    @Override
    public int decrementRefCount()
    {
       int currentRefCount = super.decrementRefCount();
 
-      if (currentRefCount == 0)
+      // We use <= because this can also be called during load:
+      // after a failure no references were loaded, so the count is already 0, and we still need to delete the associated files
+      if (currentRefCount <= 0)
       {
          if (linkMessage != null)
          {
@@ -242,6 +234,12 @@
       validateFile();
       storageManager.deleteFile(file);
    }
+   
+   public boolean isFileExists() throws Exception
+   {
+      SequentialFile localfile = storageManager.createFileForLargeMessage(getMessageID(), isStored());
+      return localfile.exists();
+   }
 
    // We cache this
    private volatile int memoryEstimate = -1;
@@ -257,14 +255,16 @@
 
       return memoryEstimate;
    }
-
-   public synchronized void complete() throws Exception
+   
+   
+   @Override
+   public void setStored() throws Exception
    {
+      super.setStored();
       releaseResources();
-
-      if (!complete)
+      if (file != null && linkMessage == null)
       {
-         SequentialFile fileToRename = storageManager.createFileForLargeMessage(getMessageID(), true);
+         SequentialFile fileToRename = storageManager.createFileForLargeMessage(getMessageID(), isStored());
          file.renameTo(fileToRename.getFileName());
       }
    }
@@ -296,7 +296,7 @@
          idToUse = linkMessage.getMessageID();
       }
 
-      SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, true);
+      SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, isStored());
 
       file.open();
 
@@ -327,7 +327,7 @@
             throw new RuntimeException("MessageID not set on LargeMessage");
          }
 
-         file = storageManager.createFileForLargeMessage(getMessageID(), complete);
+         file = storageManager.createFileForLargeMessage(getMessageID(), isStored());
 
          file.open();
 

Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-09-22 23:39:47 UTC (rev 7980)
@@ -39,6 +39,7 @@
 import org.hornetq.core.journal.RecordInfo;
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.TransactionFailureCallback;
 import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
 import org.hornetq.core.journal.impl.JournalImpl;
 import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
@@ -465,7 +466,45 @@
 
       int deliveryCount;
    }
+   
+   
+   private class LargeMessageTXFailureCallback implements TransactionFailureCallback
+   {
+      private final Map<Long, ServerMessage> messages;
+      
+      public LargeMessageTXFailureCallback(Map<Long, ServerMessage> messages)
+      {
+         super();
+         this.messages = messages;
+      }
 
+
+
+      public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+      {
+         for (RecordInfo record : records)
+         {
+            if (record.userRecordType == ADD_LARGE_MESSAGE)
+            {
+               byte[] data = record.data;
+
+               HornetQBuffer buff = ChannelBuffers.wrappedBuffer(data);
+
+               try
+               {
+                  LargeServerMessage serverMessage = parseLargeMessage(messages, buff);
+                  serverMessage.decrementRefCount();
+               }
+               catch (Exception e)
+               {
+                  log.warn(e.getMessage(), e);
+               }
+            }
+         }
+      }
+      
+   }
+
    public void loadMessageJournal(final PagingManager pagingManager,
                                   final ResourceManager resourceManager,
                                   final Map<Long, Queue> queues,
@@ -475,9 +514,11 @@
 
       List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
 
-      messageJournal.load(records, preparedTransactions);
-
       Map<Long, ServerMessage> messages = new HashMap<Long, ServerMessage>();
+      
+      messageJournal.load(records, preparedTransactions, new LargeMessageTXFailureCallback(messages));
+      
+      ArrayList<LargeServerMessage> largeMessages = new ArrayList<LargeServerMessage>();
 
       Map<Long, Map<Long, AddMessageRecord>> queueMap = new HashMap<Long, Map<Long, AddMessageRecord>>();
 
@@ -493,37 +534,12 @@
          {
             case ADD_LARGE_MESSAGE:
             {
-               LargeServerMessage largeMessage = createLargeMessage();
+               LargeServerMessage largeMessage = parseLargeMessage(messages, buff);
 
-               LargeMessageEncoding messageEncoding = new LargeMessageEncoding(largeMessage);
-
-               messageEncoding.decode(buff);
+               messages.put(record.id, largeMessage);
                
-               Long originalMessageID = (Long)largeMessage.getProperties().getProperty(MessageImpl.HDR_ORIG_MESSAGE_ID);
-               
-               // Using the linked file by the original file
-               if (originalMessageID != null)
-               {
-                  LargeServerMessage originalMessage = (LargeServerMessage)messages.get(originalMessageID);
-                  
-                  if (originalMessage == null)
-                  {
-                     // this could happen if the message was deleted but the file still exists as the file still being used
-                      originalMessage = createLargeMessage();
-                     originalMessage.setMessageID(originalMessageID);
-                     originalMessage.setComplete(true);
-                     messages.put(originalMessageID, originalMessage);
-                  }
-                  
-                  originalMessage.incrementRefCount();
-                  
-                  largeMessage.setLinkedMessage(originalMessage);
-                  largeMessage.setComplete(true);
-               }
-               
+               largeMessages.add(largeMessage);
 
-               messages.put(record.id, largeMessage);
-
                break;
             }
             case ADD_MESSAGE:
@@ -708,12 +724,58 @@
 
       loadPreparedTransactions(pagingManager, resourceManager, queues, preparedTransactions, duplicateIDMap);
 
+      for (LargeServerMessage msg : largeMessages)
+      {
+         if (msg.getRefCount() == 0)
+         {
+            log.info("Large message: " + msg.getMessageID() + " didn't have any associated reference, file will be deleted");
+            msg.decrementRefCount();
+         }
+      }
+      
       if (perfBlastPages != -1)
       {
          messageJournal.perfBlast(perfBlastPages);
       }
    }
 
+   /**
+    * @param messages
+    * @param buff
+    * @return
+    * @throws Exception
+    */
+   private LargeServerMessage parseLargeMessage(Map<Long, ServerMessage> messages, HornetQBuffer buff) throws Exception
+   {
+      LargeServerMessage largeMessage = createLargeMessage();
+
+      LargeMessageEncoding messageEncoding = new LargeMessageEncoding(largeMessage);
+
+      messageEncoding.decode(buff);
+      
+      Long originalMessageID = (Long)largeMessage.getProperties().getProperty(MessageImpl.HDR_ORIG_MESSAGE_ID);
+      
+      // Using the linked file by the original file
+      if (originalMessageID != null)
+      {
+         LargeServerMessage originalMessage = (LargeServerMessage)messages.get(originalMessageID);
+         
+         if (originalMessage == null)
+         {
+            // this could happen if the message was deleted but the file still exists because it is still being used
+            originalMessage = createLargeMessage();
+            originalMessage.setMessageID(originalMessageID);
+            originalMessage.setStored();
+            messages.put(originalMessageID, originalMessage);
+         }
+         
+         originalMessage.incrementRefCount();
+         
+         largeMessage.setLinkedMessage(originalMessage);
+      }
+      return largeMessage;
+   }
+
    private void loadPreparedTransactions(final PagingManager pagingManager,
                                          final ResourceManager resourceManager,
                                          final Map<Long, Queue> queues,
@@ -747,6 +809,12 @@
 
             switch (recordType)
             {
+               case ADD_LARGE_MESSAGE:
+               {
+                  messages.put(record.id, parseLargeMessage(messages, buff));
+                  
+                  break;
+               }
                case ADD_MESSAGE:
                {
                   ServerMessage message = new ServerMessageImpl(record.id);
@@ -933,7 +1001,7 @@
 
       List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
 
-      bindingsJournal.load(records, preparedTransactions);
+      bindingsJournal.load(records, preparedTransactions, null);
 
       for (RecordInfo record : records)
       {
@@ -1059,9 +1127,9 @@
     * @param messageID
     * @return
     */
-   SequentialFile createFileForLargeMessage(final long messageID, final boolean completeFile)
+   SequentialFile createFileForLargeMessage(final long messageID, final boolean stored)
    {
-      if (completeFile)
+      if (stored)
       {
          return largeMessagesFactory.createSequentialFile(messageID + ".msg", -1);
       }

Modified: trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java	2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java	2009-09-22 23:39:47 UTC (rev 7980)
@@ -126,6 +126,15 @@
       // nothing to be done on null persistence
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.server.LargeServerMessage#isFileExists()
+    */
+   public boolean isFileExists() throws Exception
+   {
+      // There are no real files on null persistence
+      return true;
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-09-22 23:39:47 UTC (rev 7980)
@@ -610,6 +610,7 @@
       {
          if (pagingManager.page(message, true))
          {
+            message.setStored();
             return;
          }
       }
@@ -1049,6 +1050,7 @@
             {
                if (pagingManager.page(message, tx.getID(), first))
                {
+                  message.setStored();
                   if (message.isDurable())
                   {
                      // We only create pageTransactions if using persistent messages

Modified: trunk/src/main/org/hornetq/core/server/LargeServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/LargeServerMessage.java	2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/src/main/org/hornetq/core/server/LargeServerMessage.java	2009-09-22 23:39:47 UTC (rev 7980)
@@ -31,17 +31,13 @@
    
    /** When a large message is copied (e.g. ExpiryQueue) instead of copying the file, we specify a link between the messages */
    LargeServerMessage getLinkedMessage();
+   
+   boolean isFileExists() throws Exception;
 
    /** Close the files if opened */
    void releaseResources();
    
    long getLargeBodySize();
    
-   void complete() throws Exception;
-   
-   void setComplete(boolean isComplete);
-   
-   boolean isComplete();
-   
    void deleteFile() throws Exception;
 }

Modified: trunk/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerMessage.java	2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/src/main/org/hornetq/core/server/ServerMessage.java	2009-09-22 23:39:47 UTC (rev 7980)
@@ -44,11 +44,12 @@
 
    int getMemoryEstimate();
 
-   void setStored();
+   void setStored() throws Exception;
 
    boolean isStored();
 
    int getRefCount();
+
    
    //TODO - we might be able to put this in a better place
 }

Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-09-22 23:39:47 UTC (rev 7980)
@@ -794,6 +794,30 @@
                                    configuration.isBackup());
    }
 
+   /** For use by subclasses. */
+   protected ExecutorService getExecutor()
+   {
+      return threadPool;
+   }
+   
+   /**
+    * This method is protected so that it can be used as a hook for creating a custom storage manager (in tests, for instance).
+    * @return the storage manager this server will use
+    */
+   protected StorageManager createStorageManager()
+   {
+      if (configuration.isPersistenceEnabled())
+      {
+         return new JournalStorageManager(configuration, threadPool);
+      }
+      else
+      {
+         return new NullStorageManager();
+      }
+   }
+
+   
+
    // Private
    // --------------------------------------------------------------------------------------
 
@@ -921,14 +945,7 @@
          deploymentManager = new FileDeploymentManager(configuration.getFileDeployerScanPeriod());
       }
 
-      if (configuration.isPersistenceEnabled())
-      {
-         storageManager = new JournalStorageManager(configuration, threadPool);
-      }
-      else
-      {
-         storageManager = new NullStorageManager();
-      }
+      this.storageManager = createStorageManager();
 
       securityRepository = new HierarchicalObjectRepository<Set<Role>>();
       securityRepository.setDefault(new HashSet<Role>());

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-09-22 23:39:47 UTC (rev 7980)
@@ -130,7 +130,8 @@
    private final Binding binding;
 
    // Constructors ---------------------------------------------------------------------------------
-
+   
+   
    public ServerConsumerImpl(final long id,
                              final long replicatedSessionID,
                              final ServerSession session,
@@ -147,6 +148,7 @@
                              final Executor executor,
                              final ManagementService managementService) throws Exception
    {
+      
       this.id = id;
 
       this.replicatedSessionID = replicatedSessionID;

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2009-09-22 23:39:47 UTC (rev 7980)
@@ -105,7 +105,7 @@
       return stored;
    }
    
-   public void setStored()
+   public void setStored() throws Exception
    {
       stored = true;
    }

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-09-22 23:39:47 UTC (rev 7980)
@@ -2367,7 +2367,7 @@
 
             currentLargeMessage = null;
 
-            message.complete();
+            message.releaseResources();
 
             send(message);
          }

Modified: trunk/tests/src/org/hornetq/tests/integration/client/JournalCrashTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/JournalCrashTest.java	2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/tests/src/org/hornetq/tests/integration/client/JournalCrashTest.java	2009-09-22 23:39:47 UTC (rev 7980)
@@ -272,7 +272,7 @@
       ArrayList<PreparedTransactionInfo> transactions = new ArrayList<PreparedTransactionInfo>();
 
       journal.start();
-      journal.load(records, transactions);
+      journal.load(records, transactions, null);
       
       System.out.println("===============================================");
       System.out.println("Journal records at the end:");

Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2009-09-22 23:39:47 UTC (rev 7980)
@@ -67,20 +67,19 @@
    // Constructors --------------------------------------------------
 
    // Public --------------------------------------------------------
-   
-   
+
    public void testCloseConsumer() throws Exception
    {
       final int messageSize = (int)(3.5 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
 
-      ClientSession session = null;           
+      ClientSession session = null;
 
       try
       {
          server = createServer(true);
 
          server.start();
-         
+
          log.info("*********** starting test");
 
          ClientSessionFactory sf = createInVMFactory();
@@ -94,7 +93,7 @@
          Message clientFile = createLargeClientMessage(session, messageSize, true);
 
          log.info("*********** sending large message");
-         
+
          producer.send(clientFile);
 
          session.commit();
@@ -106,13 +105,13 @@
          msg1.acknowledge();
          session.commit();
          assertNotNull(msg1);
-         
+
          consumer.close();
 
          try
          {
             msg1.getBody().readByte();
-            fail ("Exception was expected");
+            fail("Exception was expected");
          }
          catch (Throwable ignored)
          {
@@ -142,20 +141,18 @@
       }
    }
 
-
-
    public void testDLALargeMessage() throws Exception
    {
       final int messageSize = (int)(3.5 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
 
-      ClientSession session = null;           
+      ClientSession session = null;
 
       try
       {
          server = createServer(true);
 
          server.start();
-         
+
          log.info("*********** starting test");
 
          ClientSessionFactory sf = createInVMFactory();
@@ -181,7 +178,7 @@
          Message clientFile = createLargeClientMessage(session, messageSize, true);
 
          log.info("*********** sending large message");
-         
+
          producer.send(clientFile);
 
          session.commit();
@@ -275,7 +272,6 @@
       }
    }
 
-   
    public void testDeliveryCount() throws Exception
    {
       final int messageSize = (int)(3.5 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
@@ -289,12 +285,11 @@
          server.start();
 
          ClientSessionFactory sf = createInVMFactory();
-         
+
          session = sf.createSession(false, false, false);
 
          session.createQueue(ADDRESS, ADDRESS, true);
 
- 
          ClientProducer producer = session.createProducer(ADDRESS);
 
          Message clientFile = createLargeClientMessage(session, messageSize, true);
@@ -305,19 +300,19 @@
          session.start();
 
          ClientConsumer consumer = session.createConsumer(ADDRESS);
-         
+
          ClientMessage msg = consumer.receive(10000);
          assertNotNull(msg);
          msg.acknowledge();
          assertEquals(1, msg.getDeliveryCount());
-         for (int i = 0 ; i < messageSize; i++)
+         for (int i = 0; i < messageSize; i++)
          {
             assertEquals(getSamplebyte(i), msg.getBody().readByte());
          }
          session.rollback();
-         
+
          session.close();
-         
+
          session = sf.createSession(false, false, false);
          session.start();
 
@@ -325,16 +320,16 @@
          msg = consumer.receive(10000);
          assertNotNull(msg);
          msg.acknowledge();
-         for (int i = 0 ; i < messageSize; i++)
+         for (int i = 0; i < messageSize; i++)
          {
             assertEquals(getSamplebyte(i), msg.getBody().readByte());
          }
          assertEquals(2, msg.getDeliveryCount());
          msg.acknowledge();
          consumer.close();
-         
-         session.commit();         
-         
+
+         session.commit();
+
          validateNoFilesOnLargeDir();
       }
       finally
@@ -357,7 +352,139 @@
       }
    }
 
-   
+   public void testDLAOnExpiryNonDurableMessage() throws Exception
+   {
+      final int messageSize = (int)(3.5 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+      ClientSession session = null;
+
+      try
+      {
+         server = createServer(true);
+
+         server.start();
+
+         ClientSessionFactory sf = createInVMFactory();
+
+         SimpleString ADDRESS_DLA = ADDRESS.concat("-dla");
+         SimpleString ADDRESS_EXPIRY = ADDRESS.concat("-expiry");
+
+         AddressSettings addressSettings = new AddressSettings();
+
+         addressSettings.setDeadLetterAddress(ADDRESS_DLA);
+         addressSettings.setExpiryAddress(ADDRESS_EXPIRY);
+         addressSettings.setMaxDeliveryAttempts(1);
+
+         server.getAddressSettingsRepository().addMatch("*", addressSettings);
+
+         session = sf.createSession(false, false, false);
+
+         session.createQueue(ADDRESS, ADDRESS, true);
+
+         session.createQueue(ADDRESS_DLA, ADDRESS_DLA, true);
+         session.createQueue(ADDRESS_EXPIRY, ADDRESS_EXPIRY, true);
+
+         ClientProducer producer = session.createProducer(ADDRESS);
+
+         Message clientFile = createLargeClientMessage(session, messageSize, false);
+         clientFile.setExpiration(System.currentTimeMillis());
+
+         producer.send(clientFile);
+
+         session.commit();
+
+         session.start();
+
+         ClientConsumer consumerExpired = session.createConsumer(ADDRESS);
+         // kicks expiry sooner than waiting for the reaper thread would
+         assertNull(consumerExpired.receive(1000));
+         consumerExpired.close();
+
+         ClientConsumer consumerExpiry = session.createConsumer(ADDRESS_EXPIRY);
+
+         ClientMessage msg1 = consumerExpiry.receive(5000);
+         assertNotNull(msg1);
+         msg1.acknowledge();
+
+         session.rollback();
+
+         for (int j = 0; j < messageSize; j++)
+         {
+            assertEquals(getSamplebyte(j), msg1.getBody().readByte());
+         }
+
+         consumerExpiry.close();
+
+         for (int i = 0; i < 10; i++)
+         {
+
+            consumerExpiry = session.createConsumer(ADDRESS_DLA);
+
+            msg1 = consumerExpiry.receive(5000);
+            assertNotNull(msg1);
+            msg1.acknowledge();
+
+            session.rollback();
+
+            for (int j = 0; j < messageSize; j++)
+            {
+               assertEquals(getSamplebyte(j), msg1.getBody().readByte());
+            }
+
+            consumerExpiry.close();
+         }
+
+         session.close();
+
+         session = sf.createSession(false, false, false);
+
+         session.start();
+
+         consumerExpiry = session.createConsumer(ADDRESS_DLA);
+
+         msg1 = consumerExpiry.receive(5000);
+         assertNotNull(msg1);
+         // msg1.acknowledge();
+
+         for (int i = 0; i < messageSize; i++)
+         {
+            assertEquals(getSamplebyte(i), msg1.getBody().readByte());
+         }
+
+         session.commit();
+
+         consumerExpiry.close();
+
+         session.commit();
+
+         session.close();
+
+         server.stop();
+
+         server.start();
+
+         validateNoFilesOnLargeDir();
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+
+         try
+         {
+            session.close();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+   }
+
    public void testDLAOnExpiry() throws Exception
    {
       final int messageSize = (int)(3.5 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
@@ -371,7 +498,7 @@
          server.start();
 
          ClientSessionFactory sf = createInVMFactory();
-         
+
          SimpleString ADDRESS_DLA = ADDRESS.concat("-dla");
          SimpleString ADDRESS_EXPIRY = ADDRESS.concat("-expiry");
 
@@ -387,7 +514,6 @@
 
          session.createQueue(ADDRESS, ADDRESS, true);
 
-        
          session.createQueue(ADDRESS_DLA, ADDRESS_DLA, true);
          session.createQueue(ADDRESS_EXPIRY, ADDRESS_EXPIRY, true);
 
@@ -506,11 +632,11 @@
          server = createServer(true);
 
          server.start();
-         
+
          AddressSettings addressSettings = new AddressSettings();
 
          SimpleString ADDRESS_EXPIRY = ADDRESS.concat("-expiry");
-         
+
          addressSettings.setExpiryAddress(ADDRESS_EXPIRY);
 
          server.getAddressSettingsRepository().addMatch("*", addressSettings);
@@ -520,7 +646,7 @@
          session = sf.createSession(false, false, false);
 
          session.createQueue(ADDRESS, ADDRESS, true);
-                 
+
          session.createQueue(ADDRESS_EXPIRY, ADDRESS_EXPIRY, true);
 
          ClientProducer producer = session.createProducer(ADDRESS);
@@ -1163,6 +1289,12 @@
          {
             session.end(xid, XAResource.TMSUCCESS);
             session.prepare(xid);
+            session.close();
+            server.stop();
+            server.start();
+
+            session = sf.createSession(isXA, false, false);
+
             session.rollback(xid);
          }
          else

Modified: trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java	2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java	2009-09-22 23:39:47 UTC (rev 7980)
@@ -95,7 +95,7 @@
          List<RecordInfo> committedRecords = new ArrayList<RecordInfo>();
          List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
          journal.start();
-         journal.load(committedRecords, preparedTransactions);
+         journal.load(committedRecords, preparedTransactions, null);
 
          assertEquals(0, committedRecords.size());
          assertEquals(0, preparedTransactions.size());

Added: trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageCrashTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageCrashTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageCrashTest.java	2009-09-22 23:39:47 UTC (rev 7980)
@@ -0,0 +1,561 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.largemessage;
+
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.Executor;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.impl.journal.JournalLargeServerMessage;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.security.HornetQSecurityManager;
+import org.hornetq.core.security.impl.HornetQSecurityManagerImpl;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.LargeServerMessage;
+import org.hornetq.core.server.impl.HornetQServerImpl;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.tests.util.SpawnedVMSupport;
+
+/**
+ * A LargeMessageCrashTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class LargeMessageCrashTest extends ServiceTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   static String QUEUE_NAME = "MY-QUEUE";
+
+   static int LARGE_MESSAGE_SIZE = 5 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+   static int PAGED_MESSAGE_SIZE = 1024;
+
+   static int NUMBER_OF_PAGES_MESSAGES = 100;
+
+   boolean failAfterRename;
+
+   // Static --------------------------------------------------------
+
+   public static void main(String args[])
+   {
+      LargeMessageCrashTest serverTest = new LargeMessageCrashTest();
+
+      serverTest.failAfterRename = false;
+
+      for (String arg : args)
+      {
+         if (arg.equals("failAfterRename"))
+         {
+            serverTest.failAfterRename = true;
+         }
+      }
+      
+      for (String arg : args)
+      {
+         if (arg.equals("remoteJournalSendNonTransactional"))
+         {
+            serverTest.remoteJournalSendNonTransactional();
+         }
+         else if (arg.equals("remoteJournalSendTransactional"))
+         {
+            serverTest.remoteJournalSendTransactional();
+         }
+         else if (arg.equals("remotePreparedTransaction"))
+         {
+            serverTest.remotePreparedTransaction();
+         }
+         else if (arg.equals("remotePaging"))
+         {
+            serverTest.remotePaging();
+         }
+      }
+   }
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testJournalSendNonTransactional1() throws Exception
+   {
+      internalTestSend(false, false);
+   }
+
+   public void testJournalSendNonTransactional2() throws Exception
+   {
+      internalTestSend(true, false);
+   }
+
+   public void testJournalSendTransactional1() throws Exception
+   {
+      internalTestSend(false, true);
+   }
+
+   public void testJournalSendTransactional2() throws Exception
+   {
+      internalTestSend(true, true);
+   }
+
+   public void internalTestSend(boolean failureAfterRename, boolean transactional) throws Exception
+   {
+      if (transactional)
+      {
+         runExternalProcess(failureAfterRename, "remoteJournalSendTransactional");
+      }
+      else
+      {
+         runExternalProcess(failureAfterRename, "remoteJournalSendNonTransactional");
+      }
+
+      HornetQServer server = newServer(false);
+
+      try
+      {
+         server.start();
+
+         ClientSessionFactory cf = createInVMFactory();
+
+         ClientSession session = cf.createSession(true, true);
+
+         ClientConsumer cons = session.createConsumer(QUEUE_NAME);
+
+         session.start();
+
+         assertNull(cons.receive(100));
+
+         session.close();
+
+         validateNoFilesOnLargeDir();
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+   }
+
+   public void testPreparedTransaction() throws Exception
+   {
+      runExternalProcess(false, "remotePreparedTransaction");
+
+      HornetQServer server = newServer(false);
+
+      server.start();
+
+      ClientSessionFactory cf = createInVMFactory();
+
+      ClientSession session = cf.createSession(true, false, false);
+
+      Xid[] xids = session.recover(XAResource.TMSTARTRSCAN);
+
+      assertEquals(1, xids.length);
+
+      session.rollback(xids[0]);
+
+      session.close();
+
+      server.stop();
+
+      validateNoFilesOnLargeDir();
+
+   }
+
+   public void testPreparedTransactionAndCommit() throws Exception
+   {
+      runExternalProcess(false, "remotePreparedTransaction");
+
+      HornetQServer server = newServer(false);
+
+      server.start();
+
+      ClientSessionFactory cf = createInVMFactory();
+
+      ClientSession session = cf.createSession(true, false, false);
+
+      Xid[] xids = session.recover(XAResource.TMSTARTRSCAN);
+
+      assertEquals(1, xids.length);
+
+      session.commit(xids[0], false);
+
+      session.close();
+
+      session = cf.createSession(false, false);
+
+      ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
+
+      session.start();
+
+      ClientMessage msg = consumer.receive(5000);
+
+      assertNotNull(msg);
+
+      msg.acknowledge();
+
+      for (int i = 0; i < LARGE_MESSAGE_SIZE; i++)
+      {
+         assertEquals(getSamplebyte(i), msg.getBody().readByte());
+      }
+
+      session.commit();
+
+      session.close();
+
+      server.stop();
+
+      validateNoFilesOnLargeDir();
+
+   }
+   
+   
+   public void testPaging() throws Exception
+   {
+      runExternalProcess(false, "remotePaging");
+
+      HornetQServer server = newServer(false);
+
+      server.start();
+
+      ClientSessionFactory cf = createInVMFactory();
+
+      ClientSession session = cf.createSession(false, true, true);
+
+      ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
+
+      session.start();
+
+      for (int i = 0; i < NUMBER_OF_PAGES_MESSAGES; i++)
+      {
+         ClientMessage msg = consumer.receive(50000);
+         assertNotNull(msg);
+         msg.acknowledge();
+         session.commit();
+      }
+
+      ClientMessage msg = consumer.receiveImmediate();
+      assertNull(msg);
+
+      session.close();
+
+      server.stop();
+
+      validateNoFilesOnLargeDir();
+
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+   /**
+    * @throws Exception
+    * @throws InterruptedException
+    */
+   private void runExternalProcess(boolean failAfterRename, String methodName) throws Exception, InterruptedException
+   {
+      System.err.println("running external process...");
+
+      Process process = SpawnedVMSupport.spawnVM(this.getClass().getCanonicalName(),
+                                                 "-Xms128m -Xmx128m ",
+                                                 new String[] {},
+                                                 true,
+                                                 true,
+                                                 methodName,
+                                                 (failAfterRename ? "failAfterRename" : "regularFail"));
+
+      assertEquals(100, process.waitFor());
+   }
+
+   // Inner classes -------------------------------------------------
+
+   public void remoteJournalSendNonTransactional()
+   {
+
+      try
+      {
+         startServer(failAfterRename, true);
+
+         ClientSessionFactory factory = createInVMFactory();
+         ClientSession session = factory.createSession(true, true);
+
+         try
+         {
+            session.createQueue(QUEUE_NAME, QUEUE_NAME, true);
+         }
+         catch (Throwable ignored)
+         {
+         }
+
+         ClientProducer prod = session.createProducer(QUEUE_NAME);
+
+         prod.send(createLargeClientMessage(session, LARGE_MESSAGE_SIZE, true));
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+         System.exit(-1);
+      }
+
+   }
+
+   public void remoteJournalSendTransactional()
+   {
+      try
+      {
+         startServer(failAfterRename, true);
+
+         ClientSessionFactory factory = createInVMFactory();
+         ClientSession session = factory.createSession(false, false);
+
+         try
+         {
+            session.createQueue(QUEUE_NAME, QUEUE_NAME, true);
+         }
+         catch (Throwable ignored)
+         {
+         }
+
+         ClientProducer prod = session.createProducer(QUEUE_NAME);
+
+         prod.send(createLargeClientMessage(session, LARGE_MESSAGE_SIZE, true));
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+         System.exit(-1);
+      }
+
+   }
+
+   public void remotePreparedTransaction()
+   {
+      try
+      {
+         startServer(failAfterRename, false);
+
+         ClientSessionFactory factory = createInVMFactory();
+         ClientSession session = factory.createSession(true, false, false);
+
+         try
+         {
+            session.createQueue(QUEUE_NAME, QUEUE_NAME, true);
+         }
+         catch (Throwable ignored)
+         {
+         }
+
+         ClientProducer prod = session.createProducer(QUEUE_NAME);
+
+         Xid xid = newXID();
+         session.start(xid, XAResource.TMNOFLAGS);
+
+         prod.send(createLargeClientMessage(session, LARGE_MESSAGE_SIZE, true));
+
+         session.end(xid, XAResource.TMSUCCESS);
+         session.prepare(xid);
+
+         Runtime.getRuntime().halt(100);
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+         System.exit(-1);
+      }
+
+   }
+
+   public void remotePaging()
+   {
+      try
+      {
+         startServer(failAfterRename, true);
+
+         ClientSessionFactory factory = createInVMFactory();
+         ClientSession session = factory.createSession(false, false, false);
+
+         try
+         {
+            session.createQueue(QUEUE_NAME, QUEUE_NAME, true);
+         }
+         catch (Throwable ignored)
+         {
+         }
+
+         ClientProducer prod = session.createProducer(QUEUE_NAME);
+
+         byte body[] = new byte[PAGED_MESSAGE_SIZE];
+         for (int i = 0; i < body.length; i++)
+         {
+            body[i] = getSamplebyte(i);
+         }
+
+         ClientMessage msg = session.createClientMessage(true);
+
+         msg.setBody(ChannelBuffers.wrappedBuffer(body));
+
+         for (int i = 0; i < NUMBER_OF_PAGES_MESSAGES; i++)
+         {
+            prod.send(msg);
+         }
+
+         session.commit();
+         
+         session.close();
+         
+         session = factory.createSession(false, true, true);
+         prod = session.createProducer(QUEUE_NAME);
+
+         prod.send(createLargeClientMessage(session, LARGE_MESSAGE_SIZE, true));
+
+         Runtime.getRuntime().halt(100);
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+         System.exit(-1);
+      }
+
+   }
+
+   protected ClientMessage createLargeClientMessage(final ClientSession session,
+                                                    final long numberOfBytes,
+                                                    final boolean persistent) throws Exception
+   {
+
+      ClientMessage clientMessage = session.createClientMessage(persistent);
+
+      clientMessage.setBodyInputStream(createFakeLargeStream(numberOfBytes));
+
+      return clientMessage;
+   }
+
+   protected void startServer(boolean failAfterRename, boolean fail)
+   {
+      this.failAfterRename = failAfterRename;
+      try
+      {
+         HornetQServer server = newServer(fail);
+         server.start();
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+   }
+
+   private HornetQServer newServer(boolean failing)
+   {
+      Configuration configuration = createDefaultConfig(false);
+      HornetQSecurityManager securityManager = new HornetQSecurityManagerImpl();
+
+      HornetQServer server;
+
+      if (failing)
+      {
+         server = new FailingHornetQServer(configuration, securityManager);
+      }
+      else
+      {
+         server = new HornetQServerImpl(configuration, securityManager);
+      }
+
+      AddressSettings defaultSetting = new AddressSettings();
+      defaultSetting.setPageSizeBytes(10 * 1024);
+      defaultSetting.setMaxSizeBytes(100 * 1024);
+
+      server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+      return server;
+   }
+
+   /** This hacks HornetQServerImpl
+    *  to make sure the server fails right
+    *  before the page file is removed. */
+   private class FailingHornetQServer extends HornetQServerImpl
+   {
+      FailingHornetQServer(final Configuration config, final HornetQSecurityManager securityManager)
+      {
+         super(config, ManagementFactory.getPlatformMBeanServer(), securityManager);
+      }
+
+      @Override
+      protected StorageManager createStorageManager()
+      {
+         return new FailingStorageManager(getConfiguration(), getExecutor());
+      }
+
+   }
+
+   private class FailingStorageManager extends JournalStorageManager
+   {
+
+      public FailingStorageManager(final Configuration config, final Executor executor)
+      {
+         super(config, executor);
+      }
+
+      @Override
+      public LargeServerMessage createLargeMessage()
+      {
+         return new FailinJournalLargeServerMessage(this);
+      }
+
+   }
+
+   private class FailinJournalLargeServerMessage extends JournalLargeServerMessage
+   {
+      /**
+       * @param storageManager
+       */
+      public FailinJournalLargeServerMessage(final JournalStorageManager storageManager)
+      {
+         super(storageManager);
+      }
+
+      @Override
+      public void setStored() throws Exception
+      {
+         if (failAfterRename)
+         {
+            super.setStored();
+         }
+         Runtime.getRuntime().halt(100);
+      }
+
+   }
+
+}

Modified: trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java	2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java	2009-09-22 23:39:47 UTC (rev 7980)
@@ -72,7 +72,7 @@
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
-   
+
    protected void tearDown() throws Exception
    {
       if (server != null && server.isStarted())
@@ -86,9 +86,9 @@
             log.warn(e.getMessage(), e);
          }
       }
-      
+
       server = null;
-      
+
       super.tearDown();
    }
 
@@ -180,8 +180,21 @@
 
             if (isXA)
             {
+
                session.end(xid, XAResource.TMSUCCESS);
+               session.prepare(xid);
+
+               session.close();
+
+               if (realFiles)
+               {
+                  server.stop();
+                  server.start();
+               }
+
+               session = sf.createSession(null, null, isXA, false, false, preAck, 0);
                session.rollback(xid);
+               producer = session.createProducer(ADDRESS);
                xid = newXID();
                session.start(xid, XAResource.TMNOFLAGS);
             }
@@ -198,6 +211,20 @@
          if (isXA)
          {
             session.end(xid, XAResource.TMSUCCESS);
+            session.prepare(xid);
+
+            session.close();
+
+            if (realFiles)
+            {
+               server.stop();
+               server.start();
+            }
+
+            session = sf.createSession(null, null, isXA, false, false, preAck, 0);
+
+            producer = session.createProducer(ADDRESS);
+
             session.commit(xid, true);
             xid = newXID();
             session.start(xid, XAResource.TMNOFLAGS);
@@ -325,7 +352,7 @@
                               {
                                  log.debug("Read " + b + " bytes");
                               }
-                              
+
                               assertEquals(getSamplebyte(b), buffer.readByte());
                            }
                         }
@@ -614,37 +641,6 @@
       consumer.close();
    }
 
-   /**
-    * Deleting a file on LargeDire is an asynchronous process. Wee need to keep looking for a while if the file hasn't been deleted yet
-    */
-   protected void validateNoFilesOnLargeDir(int expect) throws Exception
-   {
-      File largeMessagesFileDir = new File(getLargeMessagesDir());
-
-      // Deleting the file is async... we keep looking for a period of the time until the file is really gone
-      for (int i = 0; i < 100; i++)
-      {
-         if (largeMessagesFileDir.listFiles().length != expect)
-         {
-            Thread.sleep(10);
-         }
-         else
-         {
-            break;
-         }
-      }
-
-      assertEquals(expect, largeMessagesFileDir.listFiles().length);
-   }
-
-   /**
-    * Deleting a file on LargeDire is an asynchronous process. Wee need to keep looking for a while if the file hasn't been deleted yet
-    */
-   protected void validateNoFilesOnLargeDir() throws Exception
-   {
-      validateNoFilesOnLargeDir(0);
-   }
-
    protected OutputStream createFakeOutputStream() throws Exception
    {
 

Modified: trunk/tests/src/org/hornetq/tests/performance/journal/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/performance/journal/JournalImplTestUnit.java	2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/tests/src/org/hornetq/tests/performance/journal/JournalImplTestUnit.java	2009-09-22 23:39:47 UTC (rev 7980)
@@ -170,7 +170,7 @@
       stopJournal();
       createJournal();
       startJournal();
-      journal.load(new ArrayList<RecordInfo>(), new ArrayList<PreparedTransactionInfo>());
+      journal.load(new ArrayList<RecordInfo>(), new ArrayList<PreparedTransactionInfo>(), null);
 
       assertEquals(NUMBER_OF_RECORDS / 2, journal.getIDMapSize());
 
@@ -195,7 +195,7 @@
 
       journal.start();
 
-      journal.load(new ArrayList<RecordInfo>(), null);
+      journal.load(new ArrayList<RecordInfo>(), null, null);
 
       try
       {
@@ -256,7 +256,7 @@
 
       journal.start();
 
-      journal.load(new ArrayList<RecordInfo>(), null);
+      journal.load(new ArrayList<RecordInfo>(), null, null);
 
       log.debug("Adding data");
       SimpleEncoding data = new SimpleEncoding(700, (byte)'j');
@@ -279,7 +279,7 @@
       journal = new JournalImpl(10 * 1024 * 1024, numFiles, 0, 0, getFileFactory(), "hornetq-data", "hq", 5000);
 
       journal.start();
-      journal.load(new ArrayList<RecordInfo>(), null);
+      journal.load(new ArrayList<RecordInfo>(), null, null);
       journal.stop();
 
    }

Modified: trunk/tests/src/org/hornetq/tests/stress/journal/AddAndRemoveStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/AddAndRemoveStressTest.java	2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/AddAndRemoveStressTest.java	2009-09-22 23:39:47 UTC (rev 7980)
@@ -15,6 +15,7 @@
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.List;
 
 import org.hornetq.core.journal.LoaderCallback;
 import org.hornetq.core.journal.PreparedTransactionInfo;
@@ -53,6 +54,10 @@
       public void updateRecord(final RecordInfo info)
       {
       }
+      // Empty implementation: this callback ignores failed transactions
+      public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+      {
+      }
    };
 
    private static final long NUMBER_OF_MESSAGES = 210000l;
@@ -122,7 +127,7 @@
       ArrayList<RecordInfo> info = new ArrayList<RecordInfo>();
       ArrayList<PreparedTransactionInfo> trans = new ArrayList<PreparedTransactionInfo>();
 
-      impl.load(info, trans);
+      impl.load(info, trans, null);
 
       impl.forceMoveNextFile();
 
@@ -196,7 +201,7 @@
       ArrayList<RecordInfo> info = new ArrayList<RecordInfo>();
       ArrayList<PreparedTransactionInfo> trans = new ArrayList<PreparedTransactionInfo>();
 
-      impl.load(info, trans);
+      impl.load(info, trans, null);
 
       if (info.size() > 0)
       {

Modified: trunk/tests/src/org/hornetq/tests/stress/journal/ValidateTransactionHealthTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/ValidateTransactionHealthTest.java	2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/ValidateTransactionHealthTest.java	2009-09-22 23:39:47 UTC (rev 7980)
@@ -15,6 +15,7 @@
 
 import java.io.File;
 import java.nio.ByteBuffer;
+import java.util.List;
 
 import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
 import org.hornetq.core.journal.LoaderCallback;
@@ -234,6 +235,13 @@
 
       }
 
+      /* Empty implementation: failed transactions are ignored by this callback.
+       * @see org.hornetq.core.journal.TransactionFailureCallback#failedTransaction(long, java.util.List, java.util.List)
+       */
+      public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+      {
+      }
+
    }
 
 }

Modified: trunk/tests/src/org/hornetq/tests/stress/journal/remote/RemoteJournalAppender.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/remote/RemoteJournalAppender.java	2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/remote/RemoteJournalAppender.java	2009-09-22 23:39:47 UTC (rev 7980)
@@ -14,6 +14,7 @@
 package org.hornetq.tests.stress.journal.remote;
 
 import java.nio.ByteBuffer;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -98,6 +99,10 @@
          public void updateRecord(RecordInfo info)
          {
          }
+         // Empty implementation: this callback ignores failed transactions
+         public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+         {
+         }
       });
 
       LocalThreads threads[] = new LocalThreads[numberOfThreads];

Modified: trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/JournalImplTestUnit.java	2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/JournalImplTestUnit.java	2009-09-22 23:39:47 UTC (rev 7980)
@@ -167,7 +167,7 @@
       stopJournal();
       createJournal();
       startJournal();
-      journal.load(new ArrayList<RecordInfo>(), new ArrayList<PreparedTransactionInfo>());
+      journal.load(new ArrayList<RecordInfo>(), new ArrayList<PreparedTransactionInfo>(), null);
       
       assertEquals(NUMBER_OF_RECORDS / 2, journal.getIDMapSize());
       

Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java	2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java	2009-09-22 23:39:47 UTC (rev 7980)
@@ -15,6 +15,7 @@
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -27,6 +28,7 @@
 import org.hornetq.core.journal.RecordInfo;
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.TransactionFailureCallback;
 import org.hornetq.core.journal.impl.JournalImpl;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
@@ -61,6 +63,10 @@
       public void updateRecord(RecordInfo info)
       {
       }
+      // Empty implementation: this callback ignores failed transactions
+      public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+      {
+      }
    };
 
    // Attributes ----------------------------------------------------
@@ -70,6 +76,8 @@
    JournalImpl journalImpl = null;
 
    private ArrayList<RecordInfo> records = null;
+   
+   private ArrayList<Long> incompleteTransactions = null;
 
    private ArrayList<PreparedTransactionInfo> transactions = null;
 
@@ -436,6 +444,9 @@
 
       assertEquals(0, records.size());
       assertEquals(0, transactions.size());
+      assertEquals(2, incompleteTransactions.size());
+      assertEquals((Long)77l, incompleteTransactions.get(0));
+      assertEquals((Long)78l, incompleteTransactions.get(1));
 
       try
       {
@@ -1284,7 +1295,7 @@
       ArrayList<RecordInfo> info = new ArrayList<RecordInfo>();
       ArrayList<PreparedTransactionInfo> trans = new ArrayList<PreparedTransactionInfo>();
 
-      impl.load(info, trans);
+      impl.load(info, trans, null);
 
       assertEquals(0, info.size());
       assertEquals(0, trans.size());
@@ -1303,6 +1314,8 @@
       records = new ArrayList<RecordInfo>();
 
       transactions = new ArrayList<PreparedTransactionInfo>();
+      
+      incompleteTransactions = new ArrayList<Long>();
 
       factory = null;
 
@@ -1327,6 +1340,8 @@
       records = null;
 
       transactions = null;
+      
+      incompleteTransactions = null;
 
       factory = null;
 
@@ -1360,8 +1375,17 @@
 
       records.clear();
       transactions.clear();
+      incompleteTransactions.clear();
 
-      journalImpl.load(records, transactions);
+      journalImpl.load(records, transactions, new TransactionFailureCallback()
+      {
+         public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+         {
+            System.out.println("records.length = " + records.size());
+            incompleteTransactions.add(transactionID);
+         }
+         
+      });
    }
 
    // Inner classes -------------------------------------------------

Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalAsyncTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalAsyncTest.java	2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalAsyncTest.java	2009-09-22 23:39:47 UTC (rev 7980)
@@ -355,7 +355,7 @@
       records.clear();
       transactions.clear();
 
-      journalImpl.load(records, transactions);
+      journalImpl.load(records, transactions, null);
    }
 
    // Inner classes -------------------------------------------------

Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java	2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java	2009-09-22 23:39:47 UTC (rev 7980)
@@ -167,7 +167,7 @@
 
       List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
 
-      journal.load(committedRecords, preparedTransactions);
+      journal.load(committedRecords, preparedTransactions, null);
 
       checkRecordsEquivalent(records, committedRecords);
 
@@ -199,7 +199,7 @@
 
    protected void load() throws Exception
    {
-      journal.load(null, null);
+      journal.load(null, null, null);
    }
 
    protected void add(final long... arguments) throws Exception

Modified: trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java	2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java	2009-09-22 23:39:47 UTC (rev 7980)
@@ -53,7 +53,7 @@
 
       journal.start();
 
-      journal.load(new ArrayList<RecordInfo>(), new ArrayList<PreparedTransactionInfo>());
+      journal.load(new ArrayList<RecordInfo>(), new ArrayList<PreparedTransactionInfo>(), null);
 
       BatchingIDGenerator batch = new BatchingIDGenerator(0, 1000, journal);
       long id1 = batch.generateID();
@@ -134,7 +134,7 @@
       ArrayList<PreparedTransactionInfo> tx = new ArrayList<PreparedTransactionInfo>();
 
       journal.start();
-      journal.load(records, tx);
+      journal.load(records, tx, null);
 
       assertEquals(0, tx.size());
 

Modified: trunk/tests/src/org/hornetq/tests/util/JournalExample.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/JournalExample.java	2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/tests/src/org/hornetq/tests/util/JournalExample.java	2009-09-22 23:39:47 UTC (rev 7980)
@@ -73,7 +73,7 @@
          ArrayList<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
          journalExample.start();
          System.out.println("Loading records and creating data files");
-         journalExample.load(committedRecords, preparedTransactions);
+         journalExample.load(committedRecords, preparedTransactions, null);
 
          System.out.println("Loaded Record List:");
 

Modified: trunk/tests/src/org/hornetq/tests/util/ListJournal.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/ListJournal.java	2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/tests/src/org/hornetq/tests/util/ListJournal.java	2009-09-22 23:39:47 UTC (rev 7980)
@@ -69,7 +69,7 @@
          ArrayList<PreparedTransactionInfo> prepared = new ArrayList<PreparedTransactionInfo>();
 
          journal.start();
-         journal.load(records, prepared);
+         journal.load(records, prepared, null);
 
          if (prepared.size() > 0)
          {

Modified: trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java	2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java	2009-09-22 23:39:47 UTC (rev 7980)
@@ -13,6 +13,7 @@
 
 package org.hornetq.tests.util;
 
+import java.io.File;
 import java.lang.management.ManagementFactory;
 import java.lang.ref.WeakReference;
 import java.util.HashMap;
@@ -313,6 +314,39 @@
       message.getBody().writeBytes(b);
       return message;
    }
+   
+   /**
+    * Asserts that the large-messages directory holds exactly <code>expect</code> files. Deleting files there is asynchronous, so the directory is re-read for about a second before the assertion is made.
+    */
+   protected void validateNoFilesOnLargeDir(int expect) throws Exception
+   {
+      File largeMessagesFileDir = new File(getLargeMessagesDir());
+      int found = 0;
+
+      // Up to 101 looks, 10ms apart. listFiles() returns null for a missing or unreadable dir: count that as 0 files
+      for (int attempt = 0;; attempt++)
+      {
+         File[] files = largeMessagesFileDir.listFiles();
+         found = files == null ? 0 : files.length;
+         if (found == expect || attempt == 100)
+         {
+            break;
+         }
+         Thread.sleep(10);
+      }
+
+      assertEquals(expect, found);
+   }
+
+   /**
+    * Asserts that the large-messages directory contains no files; shorthand for <code>validateNoFilesOnLargeDir(0)</code>.
+    */
+   protected void validateNoFilesOnLargeDir() throws Exception
+   {
+      validateNoFilesOnLargeDir(0);
+   }
+
+   
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------



More information about the hornetq-commits mailing list