[hornetq-commits] JBoss hornetq SVN: r11405 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/persistence/impl/journal and 10 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Sat Sep 24 02:22:26 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-09-24 02:22:26 -0400 (Sat, 24 Sep 2011)
New Revision: 11405

Added:
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/LargeServerMessage.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
JBPAPP-7161 - fixed partial streamed large messages

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java	2011-09-23 16:09:48 UTC (rev 11404)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java	2011-09-24 06:22:26 UTC (rev 11405)
@@ -15,6 +15,7 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Executor;
 
 import javax.transaction.xa.Xid;
@@ -39,6 +40,7 @@
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.group.impl.GroupBinding;
 import org.hornetq.core.transaction.ResourceManager;
+import org.hornetq.core.transaction.Transaction;
 
 /**
  * 
@@ -95,6 +97,12 @@
 
    long getCurrentUniqueID();
 
+   // Confirms, as part of the given transaction, that a pending large message was finished
+   void confirmPendingLargeMessageTX(Transaction transaction, long messageID, long recordID) throws Exception;
+   
+   // Confirms, outside of any transaction, that a pending large message was finished
+   void confirmPendingLargeMessage(long recordID) throws Exception;
+
    void storeMessage(ServerMessage message) throws Exception;
 
    void storeReference(long queueID, long messageID, boolean last) throws Exception;
@@ -125,8 +133,6 @@
 
    void updateScheduledDeliveryTimeTransactional(long txID, MessageReference ref) throws Exception;
 
-   void deleteMessageTransactional(long txID, long queueID, long messageID) throws Exception;
-
    void storeDuplicateIDTransactional(long txID, SimpleString address, byte[] duplID, long recordID) throws Exception;
 
    void updateDuplicateIDTransactional(long txID, SimpleString address, byte[] duplID, long recordID) throws Exception;
@@ -141,8 +147,9 @@
     * @param message This is a temporary message that holds the parsed properties. 
     *        The remoting layer can't create a ServerMessage directly, then this will be replaced.
     * @return
+    * @throws Exception 
     */
-   LargeServerMessage createLargeMessage(long id, MessageInternal message);
+   LargeServerMessage createLargeMessage(long id, MessageInternal message) throws Exception;
 
    void prepare(long txID, Xid xid) throws Exception;
 
@@ -169,7 +176,8 @@
                                              final ResourceManager resourceManager,
                                              final Map<Long, Queue> queues,
                                              Map<Long, QueueBindingInfo> queueInfos,
-                                             final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception;
+                                             final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
+                                             final Set<Pair<Long, Long>> pendingLargeMessages) throws Exception;
 
    long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception;
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-09-23 16:09:48 UTC (rev 11404)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-09-24 06:22:26 UTC (rev 11405)
@@ -53,7 +53,6 @@
 import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.journal.TransactionFailureCallback;
 import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
-import org.hornetq.core.journal.impl.ExportJournal;
 import org.hornetq.core.journal.impl.JournalFile;
 import org.hornetq.core.journal.impl.JournalImpl;
 import org.hornetq.core.journal.impl.JournalReaderCallback;
@@ -98,7 +97,6 @@
 import org.hornetq.utils.DataConstants;
 import org.hornetq.utils.ExecutorFactory;
 import org.hornetq.utils.HornetQThreadFactory;
-import org.hornetq.utils.UUID;
 import org.hornetq.utils.XidCodecSupport;
 
 /**
@@ -130,6 +128,10 @@
    public static final byte SECURITY_RECORD = 26;
 
    // Message journal record types
+   
+   // Marker record written when a large message is created but not yet fully stored on the system.
+   // On reload, markers that were never confirmed identify incomplete large messages to be deleted.
+   public static final byte ADD_LARGE_MESSAGE_PENDING = 29;
 
    public static final byte ADD_LARGE_MESSAGE = 30;
 
@@ -155,8 +157,6 @@
 
    public static final byte PAGE_CURSOR_COUNTER_INC = 41;
 
-   private UUID persistentID;
-
    private final BatchingIDGenerator idGenerator;
 
    private final ReplicationManager replicator;
@@ -453,7 +453,7 @@
       }
    }
 
-   public LargeServerMessage createLargeMessage(final long id, final MessageInternal message)
+   public LargeServerMessage createLargeMessage(final long id, final MessageInternal message) throws Exception
    {
       if (isReplicated())
       {
@@ -466,11 +466,45 @@
 
       largeMessage.setMessageID(id);
 
+      if (largeMessage.isDurable())
+      {
+         // We store a marker on the journal that the large file is pending
+         long pendingRecordID = storePendingLargeMessage(id);
+         
+         largeMessage.setPendingRecordID(pendingRecordID);
+      }
+
       return largeMessage;
    }
 
    // Non transactional operations
-
+   
+   public long storePendingLargeMessage(final long messageID) throws Exception
+   {
+      long recordID = generateUniqueID();
+       
+      messageJournal.appendAddRecord(recordID,
+                                     ADD_LARGE_MESSAGE_PENDING,
+                                     new PendingLargeMessageEncoding(messageID),
+                                     true,
+                                     getContext(true));
+      
+      return recordID;
+   }
+   
+   public void confirmPendingLargeMessageTX(final Transaction tx, long messageID, long recordID) throws Exception
+   {
+      installLargeMessageConfirmationOnTX(tx, recordID);
+      messageJournal.appendDeleteRecordTransactional(tx.getID(), recordID, new DeleteEncoding(ADD_LARGE_MESSAGE_PENDING, messageID));
+   }
+   
+   
+   /** We don't need messageID now but we are likely to need it if we ever decide to support a database */
+   public void confirmPendingLargeMessage(long recordID) throws Exception
+   {
+      messageJournal.appendDeleteRecord(recordID, true, getContext());
+   }
+   
    public void storeMessage(final ServerMessage message) throws Exception
    {
       if (message.getMessageID() <= 0)
@@ -690,11 +724,6 @@
                                                      encoding);
    }
 
-   public void deleteMessageTransactional(final long txID, final long queueID, final long messageID) throws Exception
-   {
-      messageJournal.appendDeleteRecordTransactional(txID, messageID, new DeleteEncoding(queueID));
-   }
-
    public void prepare(final long txID, final Xid xid) throws Exception
    {
       messageJournal.appendPrepareRecord(txID, new XidEncoding(xid), syncTransactional, getContext(syncTransactional));
@@ -830,7 +859,8 @@
                                                     final ResourceManager resourceManager,
                                                     final Map<Long, Queue> queues,
                                                     Map<Long, QueueBindingInfo> queueInfos,
-                                                    final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
+                                                    final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
+                                                    final Set<Pair<Long, Long>> pendingLargeMessages) throws Exception
    {
       List<RecordInfo> records = new ArrayList<RecordInfo>();
 
@@ -869,6 +899,19 @@
 
          switch (recordType)
          {
+            case ADD_LARGE_MESSAGE_PENDING:
+            {
+               PendingLargeMessageEncoding pending = new PendingLargeMessageEncoding();
+               
+               pending.decode(buff);
+               
+               if (pendingLargeMessages != null)
+               {
+                  // it could be null in tests, and we don't need to do anything in that case
+                  pendingLargeMessages.add(new Pair<Long, Long>(record.id, pending.largeMessageID));
+               }
+               break;
+            }
             case ADD_LARGE_MESSAGE:
             {
                LargeServerMessage largeMessage = parseLargeMessage(messages, buff);
@@ -1188,7 +1231,8 @@
                                queueInfos,
                                preparedTransactions,
                                duplicateIDMap,
-                               pageSubscriptions);
+                               pageSubscriptions,
+                               pendingLargeMessages);
 
       for (PageSubscription sub : pageSubscriptions.values())
       {
@@ -1547,7 +1591,7 @@
    // Package protected ---------------------------------------------
 
    // This should be accessed from this package only
-   void deleteFile(final SequentialFile file)
+   void deleteLargeMessage(final SequentialFile file)
    {
       Runnable deleteAction = new Runnable()
       {
@@ -1656,7 +1700,8 @@
                                          final Map<Long, QueueBindingInfo> queueInfos,
                                          final List<PreparedTransactionInfo> preparedTransactions,
                                          final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
-                                         final Map<Long, PageSubscription> pageSubscriptions) throws Exception
+                                         final Map<Long, PageSubscription> pageSubscriptions,
+                                         final Set<Pair<Long, Long>> pendingLargeMessages) throws Exception
    {
       // recover prepared transactions
       for (PreparedTransactionInfo preparedTransaction : preparedTransactions)
@@ -1746,7 +1791,6 @@
                      throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
                   }
 
-                  // TODO - this involves a scan - we should find a quicker way of doing it
                   MessageReference removed = queue.removeReferenceWithID(messageID);
                   
                   if (removed == null)
@@ -1867,32 +1911,29 @@
             }
          }
 
-         for (RecordInfo record : preparedTransaction.recordsToDelete)
+         for (RecordInfo recordDeleted : preparedTransaction.recordsToDelete)
          {
-            byte[] data = record.data;
-
-            HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
-
-            long messageID = record.id;
-
-            DeleteEncoding encoding = new DeleteEncoding();
-
-            encoding.decode(buff);
-
-            Queue queue = queues.get(encoding.queueID);
-
-            if (queue == null)
+            byte[] data = recordDeleted.data;
+            
+            if (data.length > 0)
             {
-               throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
+               HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
+               byte b = buff.readByte();
+               
+               switch (b)
+               {
+                  case ADD_LARGE_MESSAGE_PENDING:
+                  {
+                     long messageID = buff.readLong();
+                     pendingLargeMessages.remove(new Pair<Long, Long>(recordDeleted.id, messageID));
+                     installLargeMessageConfirmationOnTX(tx, recordDeleted.id);
+                     break;
+                  }
+                  default:
+                     log.warn("can't locate recordType=" + b + " on loadPreparedTransaction//deleteRecords");
+               }
             }
-
-            MessageReference removed = queue.removeReferenceWithID(messageID);
-
-            if (removed != null)
-            {
-               referencesToAck.add(removed);
-            }
-
+            
          }
 
          for (MessageReference ack : referencesToAck)
@@ -2301,6 +2342,50 @@
 
    }
 
+   public static class PendingLargeMessageEncoding implements EncodingSupport
+   {
+      public long largeMessageID;
+
+      public PendingLargeMessageEncoding(final long pendingLargeMessageID)
+      {
+         this.largeMessageID = pendingLargeMessageID;
+      }
+
+      public PendingLargeMessageEncoding()
+      {
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.EncodingSupport#decode(org.hornetq.spi.core.remoting.HornetQBuffer)
+       */
+      public void decode(final HornetQBuffer buffer)
+      {
+         largeMessageID = buffer.readLong();
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.spi.core.remoting.HornetQBuffer)
+       */
+      public void encode(final HornetQBuffer buffer)
+      {
+         buffer.writeLong(largeMessageID);
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
+       */
+      public int getEncodeSize()
+      {
+         return DataConstants.SIZE_LONG;
+      }
+      
+      public String toString()
+      {
+         return "PendingLargeMessageEncoding::MessageID=" + largeMessageID;
+      }
+
+   }
+
    public static class DeliveryCountUpdateEncoding implements EncodingSupport
    {
       public long queueID;
@@ -2388,17 +2473,48 @@
 
    }
 
-   public static class DeleteEncoding extends QueueEncoding
+   public static class DeleteEncoding implements EncodingSupport
    {
+      public byte recordType;
+      
+      public long id;
+      
       public DeleteEncoding()
       {
          super();
       }
 
-      public DeleteEncoding(final long queueID)
+      public DeleteEncoding(final byte recordType, final long id)
       {
-         super(queueID);
+         this.recordType = recordType;
+         this.id = id;
       }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
+       */
+      public int getEncodeSize()
+      {
+         return DataConstants.SIZE_BYTE + DataConstants.SIZE_LONG;
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.api.core.HornetQBuffer)
+       */
+      public void encode(HornetQBuffer buffer)
+      {
+         buffer.writeByte(recordType);
+         buffer.writeLong(id);
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.EncodingSupport#decode(org.hornetq.api.core.HornetQBuffer)
+       */
+      public void decode(HornetQBuffer buffer)
+      {
+         recordType = buffer.readByte();
+         id = buffer.readLong();
+      }
    }
 
    public static class RefEncoding extends QueueEncoding
@@ -2866,7 +2982,15 @@
 
       switch (rec)
       {
+         case ADD_LARGE_MESSAGE_PENDING:
+         {
+            PendingLargeMessageEncoding lmEncoding = new PendingLargeMessageEncoding();
+            lmEncoding.decode(buffer);
+            
+            return lmEncoding;
+         }
          case ADD_LARGE_MESSAGE:
+         {
 
             LargeServerMessage largeMessage = new LargeServerMessageImpl(null);
 
@@ -2875,19 +2999,20 @@
             messageEncoding.decode(buffer);
 
             return new MessageDescribe(largeMessage);
-
+         }
          case ADD_MESSAGE:
+         {
             ServerMessage message = new ServerMessageImpl(rec, 50);
 
             message.decode(buffer);
 
             return new MessageDescribe(message);
-
+         }
          case ADD_REF:
          {
             final RefEncoding encoding = new RefEncoding();
             encoding.decode(buffer);
-            return new ReferenceDescribe(encoding);
+            return encoding;
          }
 
          case ACKNOWLEDGE_REF:
@@ -3341,5 +3466,90 @@
 
       journal.stop();
    }
+   
+   private void installLargeMessageConfirmationOnTX(Transaction tx, long recordID)
+   {
+      TXLargeMessageConfirmationOperation txoper = (TXLargeMessageConfirmationOperation)tx.getProperty(TransactionPropertyIndexes.LARGE_MESSAGE_CONFIRMATIONS);
+      if (txoper == null)
+      {
+         txoper = new TXLargeMessageConfirmationOperation();
+         tx.putProperty(TransactionPropertyIndexes.LARGE_MESSAGE_CONFIRMATIONS, txoper);
+      }
+      txoper.confirmedMessages.add(recordID);
+   }
+   
+   
+   
+   class TXLargeMessageConfirmationOperation implements TransactionOperation
+   {
+      
+      public List<Long> confirmedMessages = new LinkedList<Long>(); 
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.transaction.TransactionOperation#beforePrepare(org.hornetq.core.transaction.Transaction)
+       */
+      public void beforePrepare(Transaction tx) throws Exception
+      {
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.transaction.TransactionOperation#afterPrepare(org.hornetq.core.transaction.Transaction)
+       */
+      public void afterPrepare(Transaction tx)
+      {
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.transaction.TransactionOperation#beforeCommit(org.hornetq.core.transaction.Transaction)
+       */
+      public void beforeCommit(Transaction tx) throws Exception
+      {
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.transaction.TransactionOperation#afterCommit(org.hornetq.core.transaction.Transaction)
+       */
+      public void afterCommit(Transaction tx)
+      {
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.transaction.TransactionOperation#beforeRollback(org.hornetq.core.transaction.Transaction)
+       */
+      public void beforeRollback(Transaction tx) throws Exception
+      {
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.transaction.TransactionOperation#afterRollback(org.hornetq.core.transaction.Transaction)
+       */
+      public void afterRollback(Transaction tx)
+      {
+         for (Long msg : confirmedMessages)
+         {
+            try
+            {
+               JournalStorageManager.this.confirmPendingLargeMessage(msg);
+            }
+            catch (Throwable e)
+            {
+               log.warn("Error while confirming large message completion on rollback for recordID=" + msg +
+                                 "->" +
+                                 e.getMessage(),
+                        e);
+            }
+         }
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.transaction.TransactionOperation#getRelatedMessageReferences()
+       */
+      public List<MessageReference> getRelatedMessageReferences()
+      {
+         return null;
+      }
+      
+   }
+
+
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java	2011-09-23 16:09:48 UTC (rev 11404)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java	2011-09-24 06:22:26 UTC (rev 11405)
@@ -51,6 +51,8 @@
 
    private LargeServerMessage linkMessage;
    
+   private long pendingRecordID = -1;
+   
    private boolean paged;
 
    // We should only use the NIO implementation on the Journal
@@ -87,6 +89,19 @@
 
    // Public --------------------------------------------------------
 
+   /**
+    * @param pendingRecordID
+    */
+   public void setPendingRecordID(long pendingRecordID)
+   {
+      this.pendingRecordID = pendingRecordID;
+   }
+   
+   public long getPendingRecordID()
+   {
+      return this.pendingRecordID;
+   }
+
    public void setPaged()
    {
       paged = true;
@@ -228,7 +243,12 @@
    {
       validateFile();
       releaseResources();
-      storageManager.deleteFile(file);
+      storageManager.deleteLargeMessage(file);
+      if (pendingRecordID >= 0)
+      {
+         storageManager.confirmPendingLargeMessage(pendingRecordID);
+         pendingRecordID = -1;
+      }
    }
 
    public boolean isFileExists() throws Exception

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java	2011-09-23 16:09:48 UTC (rev 11404)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java	2011-09-24 06:22:26 UTC (rev 11405)
@@ -164,7 +164,22 @@
    {
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.server.LargeServerMessage#setPendingRecordID(long)
+    */
+   public void setPendingRecordID(long pendingRecordID)
+   {
+   }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.server.LargeServerMessage#getPendingRecordID()
+    */
+   public long getPendingRecordID()
+   {
+      return -1;
+   }
+
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2011-09-23 16:09:48 UTC (rev 11404)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2011-09-24 06:22:26 UTC (rev 11405)
@@ -16,6 +16,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -45,6 +46,7 @@
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.group.impl.GroupBinding;
 import org.hornetq.core.transaction.ResourceManager;
+import org.hornetq.core.transaction.Transaction;
 
 /**
  * 
@@ -296,7 +298,8 @@
                                                     final ResourceManager resourceManager,
                                                     final Map<Long, Queue> queues,
                                                     Map<Long, QueueBindingInfo> queueInfos,
-                                                    final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
+                                                    final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
+                                                    Set<Pair<Long, Long>> pendingLM) throws Exception
    {
       return new JournalLoadInformation();
    }
@@ -569,4 +572,18 @@
       
    }
 
-}
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#confirmPendingLargeMessageTX(org.hornetq.core.transaction.Transaction, long, long)
+    */
+   public void confirmPendingLargeMessageTX(Transaction transaction, long messageID, long recordID) throws Exception
+   {
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#confirmPendingLargeMessage(long)
+    */
+   public void confirmPendingLargeMessage(long recordID) throws Exception
+   {
+   }
+
+ }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2011-09-23 16:09:48 UTC (rev 11404)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2011-09-24 06:22:26 UTC (rev 11405)
@@ -47,6 +47,7 @@
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.postoffice.QueueInfo;
 import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.QueueFactory;
@@ -931,6 +932,10 @@
 
          if (store.page(message, context, entry.getValue()))
          {
+            if (message.isLargeMessage())
+            {
+               confirmLargeMessageSend(tx, message);
+            }
 
             // We need to kick delivery so the Queues may check for the cursors case they are empty
             schedulePageDelivery(tx, entry);
@@ -984,6 +989,11 @@
                   {
                      storageManager.storeMessage(message);
                   }
+
+                  if (message.isLargeMessage())
+                  {
+                     confirmLargeMessageSend(tx, message);
+                  }
                }
 
                if (tx != null)
@@ -1040,6 +1050,31 @@
    }
 
    /**
+    * @param tx
+    * @param message
+    * @throws Exception
+    */
+   private void confirmLargeMessageSend(Transaction tx, final ServerMessage message) throws Exception
+   {
+      LargeServerMessage largeServerMessage = (LargeServerMessage)message;
+      if (largeServerMessage.getPendingRecordID() >= 0)
+      {
+         if (tx == null)
+         {
+            storageManager.confirmPendingLargeMessage(largeServerMessage.getPendingRecordID());
+         }
+         else
+         {
+  
+            storageManager.confirmPendingLargeMessageTX(tx,
+                                                        largeServerMessage.getMessageID(),
+                                                        largeServerMessage.getPendingRecordID());
+         }
+         largeServerMessage.setPendingRecordID(-1);
+      }
+   }
+
+   /**
     * This will kick a delivery async on the queue, so the queue may have a chance to depage messages
     * @param tx
     * @param entry

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/LargeServerMessage.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/LargeServerMessage.java	2011-09-23 16:09:48 UTC (rev 11404)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/LargeServerMessage.java	2011-09-24 06:22:26 UTC (rev 11405)
@@ -28,6 +28,10 @@
 
    /** When a large message is copied (e.g. ExpiryQueue) instead of copying the file, we specify a link between the messages */
    void setLinkedMessage(LargeServerMessage message);
+   
+   void setPendingRecordID(long pendingRecordID);
+   
+   long getPendingRecordID();
 
    boolean isFileExists() throws Exception;
    

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-09-23 16:09:48 UTC (rev 11404)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-09-24 06:22:26 UTC (rev 11405)
@@ -92,6 +92,7 @@
 import org.hornetq.core.server.Divert;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.JournalType;
+import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.MemoryManager;
 import org.hornetq.core.server.NodeManager;
 import org.hornetq.core.server.Queue;
@@ -1545,13 +1546,16 @@
       }
 
       Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new HashMap<SimpleString, List<Pair<byte[], Long>>>();
+      
+      HashSet<Pair<Long, Long>> pendingLargeMessages = new HashSet<Pair<Long, Long>>();
 
       journalInfo[1] = storageManager.loadMessageJournal(postOffice,
                                                          pagingManager,
                                                          resourceManager,
                                                          queues,
                                                          queueBindingInfosMap,
-                                                         duplicateIDMap);
+                                                         duplicateIDMap,
+                                                         pendingLargeMessages);
 
       for (Map.Entry<SimpleString, List<Pair<byte[], Long>>> entry : duplicateIDMap.entrySet())
       {
@@ -1564,6 +1568,16 @@
             cache.load(entry.getValue());
          }
       }
+      
+      for (Pair<Long, Long> msgToDelete : pendingLargeMessages)
+      {
+         log.info("Deleting pending large message as it wasn't completed:" + msgToDelete);
+         LargeServerMessage msg = storageManager.createLargeMessage();
+         msg.setMessageID(msgToDelete.b);
+         msg.setPendingRecordID(msgToDelete.a);
+         msg.setDurable(true);
+         msg.deleteFile();
+      }
 
       return journalInfo;
    }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2011-09-23 16:09:48 UTC (rev 11404)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2011-09-24 06:22:26 UTC (rev 11405)
@@ -1292,6 +1292,11 @@
 
    // Public
    // ----------------------------------------------------------------------------
+   
+   public void clearLargeMessage()
+   {
+      currentLargeMessage = null;
+   }
 
    // Private
    // ----------------------------------------------------------------------------

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java	2011-09-23 16:09:48 UTC (rev 11404)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java	2011-09-24 06:22:26 UTC (rev 11405)
@@ -24,6 +24,8 @@
  */
 public class TransactionPropertyIndexes
 {
+   
+   public static final int LARGE_MESSAGE_CONFIRMATIONS = 1;
 
    public static final int PAGE_SYNC = 2;
    

Added: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java	                        (rev 0)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java	2011-09-24 06:22:26 UTC (rev 11405)
@@ -0,0 +1,539 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.*;
+import org.hornetq.core.client.impl.ClientConsumerInternal;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.persistence.impl.journal.LargeServerMessageImpl;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.impl.ServerSessionImpl;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.tests.integration.largemessage.LargeMessageTestBase;
+import org.hornetq.tests.util.RandomUtil;
+import org.hornetq.tests.util.UnitTestCase;
+
+/**
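+ * An InterruptedLargeMessageTest: checks the large-message directory after interrupted sends, rollbacks and server restarts.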
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * 
+ * Created 29-Sep-08 4:04:10 PM
+ *
+ *
+ */
+public class InterruptedLargeMessageTest extends LargeMessageTestBase
+{
+   // Constants -----------------------------------------------------
+
+   final static int RECEIVE_WAIT_TIME = 60000;
+
+   private final int LARGE_MESSAGE_SIZE = HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE * 3;
+
+   // Attributes ----------------------------------------------------
+
+   static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
+
+   // Static --------------------------------------------------------
+   private static final Logger log = Logger.getLogger(InterruptedLargeMessageTest.class);
+
+   protected ServerLocator locator;
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      LargeMessageTestInterceptorIgnoreLastPacket.interruptMessages = true;
+      clearData();
+      locator = createFactory(isNetty());
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      locator.close();
+      super.tearDown();
+   }
+
+   protected boolean isNetty()
+   {
+      return false;
+   }
+
+   public void testInterruptLargeMessageSend() throws Exception
+   {
+      final int messageSize = 3 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+      ClientSession session = null;
+
+      LargeMessageTestInterceptorIgnoreLastPacket.interruptMessages = true;
+
+      try
+      {
+         server = createServer(true, isNetty());
+
+         server.getConfiguration()
+               .getInterceptorClassNames()
+               .add(LargeMessageTestInterceptorIgnoreLastPacket.class.getName());
+
+         server.start();
+
+         locator.setBlockOnNonDurableSend(false);
+         locator.setBlockOnDurableSend(false);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         session = sf.createSession(false, true, true);
+
+         session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
+
+         ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+         Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+         clientFile.setExpiration(System.currentTimeMillis());
+
+         producer.send(clientFile);
+
+         Thread.sleep(500);
+
+         for (ServerSession srvSession : server.getSessions())
+         {
+            ((ServerSessionImpl)srvSession).clearLargeMessage();
+         }
+
+         server.stop(false);
+         server.start();
+
+         server.stop();
+
+         validateNoFilesOnLargeDir();
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+
+         try
+         {
+            session.close();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+   }
+
+   public void testSendNonPersistentQueue() throws Exception
+   {
+      final int messageSize = 3 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+      ClientSession session = null;
+
+      LargeMessageTestInterceptorIgnoreLastPacket.interruptMessages = false;
+
+      try
+      {
+         server = createServer(true, isNetty());
+
+         // server.getConfiguration()
+         // .getInterceptorClassNames()
+         // .add(LargeMessageTestInterceptorIgnoreLastPacket.class.getName());
+
+         server.start();
+
+         locator.setBlockOnNonDurableSend(true);
+         locator.setBlockOnDurableSend(true);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         session = sf.createSession(false, true, true);
+
+         session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, false);
+
+         ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+         for (int i = 0; i < 10; i++)
+         {
+            Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+            producer.send(clientFile);
+         }
+         session.commit();
+
+         session.close();
+
+         session = sf.createSession(false, false);
+
+         ClientConsumer cons = session.createConsumer(LargeMessageTest.ADDRESS);
+
+         session.start();
+
+         for (int h = 0; h < 5; h++)
+         {
+            for (int i = 0; i < 10; i++)
+            {
+               ClientMessage clientMessage = cons.receive(5000);
+               assertNotNull(clientMessage);
+               for (int countByte = 0; countByte < messageSize; countByte++)
+               {
+                  assertEquals(getSamplebyte(countByte), clientMessage.getBodyBuffer().readByte());
+               }
+               clientMessage.acknowledge();
+            }
+            session.rollback();
+         }
+
+         server.stop(false);
+         server.start();
+
+         server.stop();
+
+         validateNoFilesOnLargeDir();
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+
+         try
+         {
+            session.close();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+   }
+
+   public void testSendPaging() throws Exception
+   {
+      final int messageSize = 3 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+      ClientSession session = null;
+
+      LargeMessageTestInterceptorIgnoreLastPacket.interruptMessages = false;
+
+      try
+      {
+         server = createServer(true, createDefaultConfig(isNetty()), 10000, 20000, new HashMap<String, AddressSettings>());
+         
+         // server.getConfiguration()
+         // .getInterceptorClassNames()
+         // .add(LargeMessageTestInterceptorIgnoreLastPacket.class.getName());
+
+         server.start();
+
+         locator.setBlockOnNonDurableSend(true);
+         locator.setBlockOnDurableSend(true);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         session = sf.createSession(false, true, true);
+
+         session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
+         
+         server.getPagingManager().getPageStore(ADDRESS).startPaging();
+
+         ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+         for (int i = 0; i < 10; i++)
+         {
+            Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+            producer.send(clientFile);
+         }
+         session.commit();
+
+         for (int h = 0; h < 5; h++)
+         {
+            session.close();
+            
+            sf.close();
+            
+            server.stop();
+            
+            server.start();
+            
+            sf = locator.createSessionFactory();
+
+            session = sf.createSession(false, false);
+
+            ClientConsumer cons = session.createConsumer(LargeMessageTest.ADDRESS);
+
+            session.start();
+
+            for (int i = 0; i < 10; i++)
+            {
+               ClientMessage clientMessage = cons.receive(5000);
+               
+               System.out.println("msg " + clientMessage);
+               assertNotNull(clientMessage);
+               for (int countByte = 0; countByte < messageSize; countByte++)
+               {
+                  assertEquals(getSamplebyte(countByte), clientMessage.getBodyBuffer().readByte());
+               }
+               clientMessage.acknowledge();
+            }
+            if (h == 4)
+            {
+               session.commit();
+            }
+            else
+            {
+               session.rollback();
+            }
+         }
+
+         server.stop(false);
+         server.start();
+
+         server.stop();
+
+         validateNoFilesOnLargeDir();
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+
+         try
+         {
+            session.close();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+   }
+
+   public void testSendPreparedXA() throws Exception
+   {
+      final int messageSize = 3 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+      ClientSession session = null;
+
+      LargeMessageTestInterceptorIgnoreLastPacket.interruptMessages = false;
+
+      try
+      {
+         server = createServer(true, createDefaultConfig(isNetty()), 10000, 20000, new HashMap<String, AddressSettings>());
+         
+         // server.getConfiguration()
+         // .getInterceptorClassNames()
+         // .add(LargeMessageTestInterceptorIgnoreLastPacket.class.getName());
+
+         server.start();
+
+         locator.setBlockOnNonDurableSend(true);
+         locator.setBlockOnDurableSend(true);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         session = sf.createSession(true, false, false);
+         
+         Xid xid1 = newXID();
+         Xid xid2 = newXID();
+
+         session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
+
+         ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+         session.start(xid1, XAResource.TMNOFLAGS);
+
+         for (int i = 0; i < 10; i++)
+         {
+            Message clientFile = createLargeClientMessage(session, messageSize, true);
+            clientFile.putIntProperty("txid", 1);
+            producer.send(clientFile);
+         }
+         session.end(xid1, XAResource.TMSUCCESS);
+         
+         session.prepare(xid1);
+
+
+         session.start(xid2, XAResource.TMNOFLAGS);
+
+
+         for (int i = 0; i < 10; i++)
+         {
+            Message clientFile = createLargeClientMessage(session, messageSize, true);
+            clientFile.putIntProperty("txid", 2);
+            producer.send(clientFile);
+         }
+         session.end(xid2, XAResource.TMSUCCESS);
+         
+         session.prepare(xid2);
+         
+         session.close();
+         sf.close();
+         
+         server.stop(false);
+         server.start();
+         
+         for (int start = 0 ; start < 2; start++)
+         {
+            
+            sf = locator.createSessionFactory();
+            
+            if (start == 0)
+            {
+               session = sf.createSession(true, false, false);
+               session.commit(xid1, false);
+               session.close();
+            }
+            
+            session = sf.createSession(false, false, false);
+            ClientConsumer cons1 = session.createConsumer(ADDRESS);
+            session.start();
+            for (int i = 0 ; i < 10; i++)
+            {
+               ClientMessage msg = cons1.receive(5000);
+               assertNotNull(msg);
+               assertEquals(1, msg.getIntProperty("txid").intValue());
+               msg.acknowledge();
+            }
+            
+            if (start == 1)
+            {
+               session.commit();
+            }
+            else
+            {
+               session.rollback();
+            }
+            
+            session.close();
+            sf.close();
+   
+            server.stop();
+            server.start();
+         }
+         server.stop();
+         
+         validateNoFilesOnLargeDir(10);
+         
+         server.start();
+
+         sf = locator.createSessionFactory();
+         
+         session = sf.createSession(true, false, false);
+         session.rollback(xid2);
+         
+         sf.close();
+         
+         server.stop();
+         server.start();
+         server.stop();
+
+         validateNoFilesOnLargeDir();
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+
+         try
+         {
+            session.close();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+   }
+
+   /**
+    * Interceptor registered on the server by testInterruptLargeMessageSend. It rejects
+    * the last chunk of a large message so that the message is left incomplete.
+    * <p>
+    * A {@link SessionSendContinuationMessage} whose {@code isContinues()} is false is
+    * rejected while {@link #interruptMessages} is set; every other packet passes.
+    */
+   public static class LargeMessageTestInterceptorIgnoreLastPacket implements Interceptor
+   {
+
+      /**
+       * Toggled by the tests and read from {@link #intercept}; volatile so a change is
+       * visible if the interceptor is invoked on another thread (presumably a
+       * server-side remoting thread - TODO confirm).
+       */
+      public static volatile boolean interruptMessages = false;
+
+      /**
+       * Rejects the final continuation packet of a large message while interruption is on.
+       *
+       * @param packet the incoming packet
+       * @param connection the connection the packet arrived on (not used here)
+       * @return {@code false} to reject the packet, {@code true} to let it through
+       * @see org.hornetq.api.core.Interceptor#intercept(org.hornetq.core.protocol.core.Packet, org.hornetq.spi.core.protocol.RemotingConnection)
+       */
+      public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
+      {
+         if (packet instanceof SessionSendContinuationMessage)
+         {
+            SessionSendContinuationMessage msg = (SessionSendContinuationMessage)packet;
+            // Only the packet that does not "continue" (the last chunk) is rejected.
+            if (!msg.isContinues() && interruptMessages)
+            {
+               System.out.println("Ignored a message");
+               return false;
+            }
+         }
+         return true;
+      }
+
+   }
+
+}

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2011-09-23 16:09:48 UTC (rev 11404)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2011-09-24 06:22:26 UTC (rev 11405)
@@ -2973,15 +2973,11 @@
       super.setUp();
       clearData();
       locator = createFactory(isNetty());
-      log.info("\n*********************************************************************************\n Starting " + getName() +
-               "\n*********************************************************************************");
    }
 
    @Override
    protected void tearDown() throws Exception
    {
-      log.info("\n*********************************************************************************\nDone with  " + getName() +
-               "\n*********************************************************************************");
       locator.close();
       super.tearDown();
    }

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java	2011-09-23 16:09:48 UTC (rev 11404)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java	2011-09-24 06:22:26 UTC (rev 11405)
@@ -76,7 +76,7 @@
 
       journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
       
-      journal.loadMessageJournal(new FakePostOffice(), null, null, queues, null, null);
+      journal.loadMessageJournal(new FakePostOffice(), null, null, queues, null, null, null);
 
       assertEquals(98, deletedMessage.size());
       

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java	2011-09-23 16:09:48 UTC (rev 11404)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java	2011-09-24 06:22:26 UTC (rev 11405)
@@ -101,7 +101,7 @@
 
          Map<Long, Queue> queues = new HashMap<Long, Queue>();
 
-         journal.loadMessageJournal(postOffice, null, null, queues, null, null);
+         journal.loadMessageJournal(postOffice, null, null, queues, null, null, null);
 
          journal.stop();
 
@@ -111,7 +111,7 @@
 
          queues = new HashMap<Long, Queue>();
 
-         journal.loadMessageJournal(postOffice, null, null, queues, null, null);
+         journal.loadMessageJournal(postOffice, null, null, queues, null, null, null);
 
          queueBindingInfos = new ArrayList<QueueBindingInfo>();
 

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java	2011-09-23 16:09:48 UTC (rev 11404)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java	2011-09-24 06:22:26 UTC (rev 11405)
@@ -128,7 +128,7 @@
       
       Map<Long, Queue> queues = new HashMap<Long, Queue>();
 
-      journal.loadMessageJournal(new FakePostOffice(), null, null, queues, null, null);
+      journal.loadMessageJournal(new FakePostOffice(), null, null, queues, null, null, null);
    }
 
    /**

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2011-09-23 16:09:48 UTC (rev 11404)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2011-09-24 06:22:26 UTC (rev 11405)
@@ -18,6 +18,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
@@ -72,6 +73,7 @@
 import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.core.transaction.ResourceManager;
+import org.hornetq.core.transaction.Transaction;
 import org.hornetq.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
 import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
 import org.hornetq.tests.util.RandomUtil;
@@ -1231,7 +1233,8 @@
                                                        final ResourceManager resourceManager,
                                                        final Map<Long, Queue> queues,
                                                        Map<Long, QueueBindingInfo> queueInfos,
-                                                       final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
+                                                       final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
+                                                       Set<Pair<Long, Long>> pendingLargeMessages) throws Exception
       {
          return new JournalLoadInformation();
       }
@@ -1680,8 +1683,22 @@
          
       }
 
-   }
+      /* (non-Javadoc)
+       * @see org.hornetq.core.persistence.StorageManager#confirmPendingLargeMessageTX(org.hornetq.core.transaction.Transaction, long, long)
+       */
+      public void confirmPendingLargeMessageTX(Transaction transaction, long messageID, long recordID) throws Exception
+      {
+      }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.persistence.StorageManager#confirmPendingLargeMessage(long)
+       */
+      public void confirmPendingLargeMessage(long recordID) throws Exception
+      {
+      }
+
+    }
+
    class FakeStoreFactory implements PagingStoreFactory
    {
 

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java	2011-09-23 16:09:48 UTC (rev 11404)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java	2011-09-24 06:22:26 UTC (rev 11405)
@@ -112,7 +112,8 @@
                                     new ResourceManagerImpl(0, 0, scheduledThreadPool),
                                     new HashMap<Long, Queue>(),
                                     null,
-                                    mapDups);
+                                    mapDups,
+                                    null);
 
          Assert.assertEquals(0, mapDups.size());
 
@@ -134,7 +135,8 @@
                                     new ResourceManagerImpl(0, 0, scheduledThreadPool),
                                     new HashMap<Long, Queue>(),
                                     null,
-                                    mapDups);
+                                    mapDups,
+                                    null);
 
          Assert.assertEquals(1, mapDups.size());
 
@@ -163,7 +165,8 @@
                                     new ResourceManagerImpl(0, 0, scheduledThreadPool),
                                     new HashMap<Long, Queue>(),
                                     null,
-                                    mapDups);
+                                    mapDups,
+                                    null);
 
          Assert.assertEquals(1, mapDups.size());
 

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java	2011-09-23 16:09:48 UTC (rev 11404)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java	2011-09-24 06:22:26 UTC (rev 11405)
@@ -306,9 +306,12 @@
          server = HornetQServers.newHornetQServer(configuration, false);
       }
 
-      for (Map.Entry<String, AddressSettings> setting : settings.entrySet())
+      if (settings != null)
       {
-         server.getAddressSettingsRepository().addMatch(setting.getKey(), setting.getValue());
+         for (Map.Entry<String, AddressSettings> setting : settings.entrySet())
+         {
+            server.getAddressSettingsRepository().addMatch(setting.getKey(), setting.getValue());
+         }
       }
 
       AddressSettings defaultSetting = new AddressSettings();



More information about the hornetq-commits mailing list