[hornetq-commits] JBoss hornetq SVN: r8189 - in trunk: src/main/org/hornetq/core/postoffice/impl and 9 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Nov 3 03:45:43 EST 2009


Author: timfox
Date: 2009-11-03 03:45:42 -0500 (Tue, 03 Nov 2009)
New Revision: 8189

Removed:
   trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageCleanupTest.java
   trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageCrashTest.java
Modified:
   trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java
   trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargemessageEndMessage.java
   trunk/src/main/org/hornetq/core/replication/ReplicationManager.java
   trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   trunk/src/main/org/hornetq/core/server/ServerMessage.java
   trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
   trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
   trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
removed over-complex large message cleanup code

Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java	2009-11-02 23:23:19 UTC (rev 8188)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java	2009-11-03 08:45:42 UTC (rev 8189)
@@ -36,6 +36,9 @@
  *
  *
  */
+
+
+//FIXME - this class should be renamed to just large message
 public class JournalLargeServerMessage extends ServerMessageImpl implements LargeServerMessage
 {
    // Constants -----------------------------------------------------
@@ -54,6 +57,8 @@
    private SequentialFile file;
 
    private long bodySize = -1;
+   
+   private final AtomicInteger delayDeletionCount = new AtomicInteger(0);
 
    // Static --------------------------------------------------------
 
@@ -177,29 +182,18 @@
    public void decode(final HornetQBuffer buffer)
    {
       file = null;
-      try
-      {
-         this.setStored();
-      }
-      catch (Exception e)
-      {
-         // File still null, this wasn't supposed to happen ever.
-         log.warn(e.getMessage(), e);
-      }
       decodeHeadersAndProperties(buffer);
    }
 
-   private final AtomicInteger delayDeletionCount = new AtomicInteger(0);
-
    public synchronized void incrementDelayDeletionCount()
    {
       this.delayDeletionCount.incrementAndGet();
    }
-   
+
    public synchronized void decrementDelayDeletionCount() throws Exception
    {
       int count = this.delayDeletionCount.decrementAndGet();
-      
+
       if (count == 0)
       {
          checkDelete();
@@ -207,7 +201,7 @@
    }
 
    private void checkDelete() throws Exception
-   {
+   {      
       if (getRefCount() <= 0)
       {
          if (linkMessage != null)
@@ -265,7 +259,7 @@
 
    public boolean isFileExists() throws Exception
    {
-      SequentialFile localfile = storageManager.createFileForLargeMessage(getMessageID(), isStored());
+      SequentialFile localfile = storageManager.createFileForLargeMessage(getMessageID());
       return localfile.exists();
    }
 
@@ -284,18 +278,6 @@
       return memoryEstimate;
    }
 
-   @Override
-   public void setStored() throws Exception
-   {
-      super.setStored();
-      releaseResources();
-
-      if (file != null && linkMessage == null)
-      {
-         storageManager.completeLargeMessage(this);
-      }
-   }
-
    public synchronized void releaseResources()
    {
       if (file != null && file.isOpen())
@@ -323,12 +305,12 @@
          idToUse = linkMessage.getMessageID();
       }
 
-      SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, isStored());
+      SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse);
 
       ServerMessage newMessage = new JournalLargeServerMessage(linkMessage == null ? this
-                                                                                              : (JournalLargeServerMessage)linkMessage,
-                                                                           newfile,
-                                                                           newID);
+                                                                                  : (JournalLargeServerMessage)linkMessage,
+                                                               newfile,
+                                                               newID);
 
       return newMessage;
    }
@@ -360,7 +342,7 @@
             throw new RuntimeException("MessageID not set on LargeMessage");
          }
 
-         file = storageManager.createFileForLargeMessage(getMessageID(), isStored());
+         file = storageManager.createFileForLargeMessage(getMessageID());
 
          file.open();
 
@@ -369,14 +351,6 @@
       }
    }
 
-//   /* (non-Javadoc)
-//    * @see org.hornetq.core.server.LargeServerMessage#getLinkedMessage()
-//    */
-//   public LargeServerMessage getLinkedMessage()
-//   {
-//      return linkMessage;
-//   }
-
    /* (non-Javadoc)
     * @see org.hornetq.core.server.LargeServerMessage#setLinkedMessage(org.hornetq.core.server.LargeServerMessage)
     */
@@ -390,7 +364,7 @@
 
       this.linkMessage = message;
 
-      file = storageManager.createFileForLargeMessage(message.getMessageID(), true);
+      file = storageManager.createFileForLargeMessage(message.getMessageID());
       try
       {
          file.open();

Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-11-02 23:23:19 UTC (rev 8188)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-11-03 08:45:42 UTC (rev 8189)
@@ -95,8 +95,9 @@
 
    private static final long CHECKPOINT_BATCH_SIZE = Integer.MAX_VALUE;
 
-   //grouping journal record type
+   // grouping journal record type
    public static final byte GROUP_RECORD = 41;
+
    // Bindings journal record type
 
    public static final byte QUEUE_BINDING_RECORD = 21;
@@ -190,13 +191,13 @@
       SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir);
 
       Journal localBindings = new JournalImpl(1024 * 1024,
-                                        2,
-                                        config.getJournalCompactMinFiles(),
-                                        config.getJournalCompactPercentage(),
-                                        bindingsFF,
-                                        "hornetq-bindings",
-                                        "bindings",
-                                        1);
+                                              2,
+                                              config.getJournalCompactMinFiles(),
+                                              config.getJournalCompactPercentage(),
+                                              bindingsFF,
+                                              "hornetq-bindings",
+                                              "bindings",
+                                              1);
 
       if (replicator != null)
       {
@@ -254,17 +255,17 @@
       }
       else
       {
-      this.idGenerator = new BatchingIDGenerator(0, CHECKPOINT_BATCH_SIZE, bindingsJournal);
+         this.idGenerator = new BatchingIDGenerator(0, CHECKPOINT_BATCH_SIZE, bindingsJournal);
       }
 
       Journal localMessage = new JournalImpl(config.getJournalFileSize(),
-                                       config.getJournalMinFiles(),
-                                       config.getJournalCompactMinFiles(),
-                                       config.getJournalCompactPercentage(),
-                                       journalFF,
-                                       "hornetq-data",
-                                       "hq",
-                                       config.getJournalMaxAIO());
+                                             config.getJournalMinFiles(),
+                                             config.getJournalCompactMinFiles(),
+                                             config.getJournalCompactPercentage(),
+                                             journalFF,
+                                             "hornetq-data",
+                                             "hq",
+                                             config.getJournalMaxAIO());
 
       if (replicator != null)
       {
@@ -471,8 +472,8 @@
 
    public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception
    {
-      ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(),
-                                                                         ref.getQueue().getID());
+      ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue()
+                                                                                                            .getID());
 
       messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
                                         SET_SCHEDULED_DELIVERY_TIME,
@@ -548,12 +549,12 @@
       messageJournal.appendAddRecord(id, HEURISTIC_COMPLETION, new HeuristicCompletionEncoding(xid, isCommit), true);
       return id;
    }
-   
+
    public void deleteHeuristicCompletion(long id) throws Exception
    {
       messageJournal.appendDeleteRecord(id, true);
    }
-   
+
    public void deletePageTransactional(final long txID, final long recordID) throws Exception
    {
       messageJournal.appendDeleteRecordTransactional(txID, recordID);
@@ -561,8 +562,8 @@
 
    public void updateScheduledDeliveryTimeTransactional(final long txID, final MessageReference ref) throws Exception
    {
-      ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(),
-                                                                         ref.getQueue().getID());
+      ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue()
+                                                                                                            .getID());
 
       messageJournal.appendUpdateRecordTransactional(txID,
                                                      ref.getMessage().getMessageID(),
@@ -627,19 +628,6 @@
                                         updateInfo,
                                         syncNonTransactional);
    }
-   /**
-    * @param journalLargeServerMessage
-    * @throws Exception
-    */
-   public void completeLargeMessage(JournalLargeServerMessage message) throws Exception
-   {
-      if (isReplicated())
-      {
-         replicator.largeMessageEnd(message.getMessageID());
-      }
-      SequentialFile fileToRename = createFileForLargeMessage(message.getMessageID(), true);
-      message.getFile().renameTo(fileToRename.getFileName());
-   }
 
    private static final class AddMessageRecord
    {
@@ -654,11 +642,11 @@
 
       int deliveryCount;
    }
-   
+
    private class LargeMessageTXFailureCallback implements TransactionFailureCallback
    {
       private final Map<Long, ServerMessage> messages;
-      
+
       public LargeMessageTXFailureCallback(Map<Long, ServerMessage> messages)
       {
          super();
@@ -687,7 +675,7 @@
             }
          }
       }
-      
+
    }
 
    public void loadMessageJournal(final PostOffice postOffice,
@@ -701,7 +689,7 @@
       List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
 
       Map<Long, ServerMessage> messages = new HashMap<Long, ServerMessage>();
-      
+
       messageJournal.load(records, preparedTransactions, new LargeMessageTXFailureCallback(messages));
       
       ArrayList<LargeServerMessage> largeMessages = new ArrayList<LargeServerMessage>();
@@ -926,7 +914,7 @@
             msg.decrementDelayDeletionCount();
          }
       }
-      
+
       if (perfBlastPages != -1)
       {
          messageJournal.perfBlast(perfBlastPages);
@@ -946,25 +934,24 @@
       LargeMessageEncoding messageEncoding = new LargeMessageEncoding(largeMessage);
 
       messageEncoding.decode(buff);
-      
+
       Long originalMessageID = (Long)largeMessage.getProperties().getProperty(MessageImpl.HDR_ORIG_MESSAGE_ID);
-      
+
       // Using the linked file by the original file
       if (originalMessageID != null)
       {
          LargeServerMessage originalMessage = (LargeServerMessage)messages.get(originalMessageID);
-         
+
          if (originalMessage == null)
          {
             // this could happen if the message was deleted but the file still exists as the file still being used
             originalMessage = createLargeMessage();
             originalMessage.setMessageID(originalMessageID);
-            originalMessage.setStored();
             messages.put(originalMessageID, originalMessage);
          }
-         
+
          originalMessage.incrementDelayDeletionCount();
-         
+
          largeMessage.setLinkedMessage(originalMessage);
       }
       return largeMessage;
@@ -992,7 +979,7 @@
 
          // Use same method as load message journal to prune out acks, so they don't get added.
          // Then have reacknowledge(tx) methods on queue, which needs to add the page size
-
+  
          // first get any sent messages for this tx and recreate
          for (RecordInfo record : preparedTransaction.records)
          {
@@ -1001,13 +988,13 @@
             HornetQBuffer buff = ChannelBuffers.wrappedBuffer(data);
 
             byte recordType = record.getUserRecordType();
-
+            
             switch (recordType)
             {
                case ADD_LARGE_MESSAGE:
-               {
+               {                 
                   messages.put(record.id, parseLargeMessage(messages, buff));
-                  
+
                   break;
                }
                case ADD_MESSAGE:
@@ -1022,6 +1009,7 @@
                }
                case ADD_REF:
                {
+                
                   long messageID = record.id;
 
                   RefEncoding encoding = new RefEncoding();
@@ -1163,11 +1151,13 @@
          resourceManager.putTransaction(xid, tx);
       }
    }
-   
-   //grouping handler operations
+
+   // grouping handler operations
    public void addGrouping(GroupBinding groupBinding) throws Exception
    {
-      GroupingEncoding groupingEncoding = new GroupingEncoding(groupBinding.getId(), groupBinding.getGroupId(), groupBinding.getClusterName());
+      GroupingEncoding groupingEncoding = new GroupingEncoding(groupBinding.getId(),
+                                                               groupBinding.getGroupId(),
+                                                               groupBinding.getClusterName());
       bindingsJournal.appendAddRecord(groupBinding.getId(), GROUP_RECORD, groupingEncoding, true);
    }
 
@@ -1222,7 +1212,7 @@
 
             bindingEncoding.setId(id);
 
-            queueBindingInfos.add(bindingEncoding);          
+            queueBindingInfos.add(bindingEncoding);
          }
          else if (rec == PERSISTENT_ID_RECORD)
          {
@@ -1236,7 +1226,7 @@
          {
             idGenerator.loadState(record.id, buffer);
          }
-         else if(rec == GROUP_RECORD)
+         else if (rec == GROUP_RECORD)
          {
             GroupingEncoding encoding = new GroupingEncoding();
             encoding.decode(buffer);
@@ -1285,7 +1275,7 @@
       // Must call close to make sure last id is persisted
       if (idGenerator != null)
       {
-      idGenerator.close();
+         idGenerator.close();
       }
 
       bindingsJournal.stop();
@@ -1371,7 +1361,7 @@
       if (executor == null)
       {
          deleteAction.run();
-   }
+      }
       else
       {
          executor.execute(deleteAction);
@@ -1382,16 +1372,9 @@
     * @param messageID
     * @return
     */
-   SequentialFile createFileForLargeMessage(final long messageID, final boolean stored)
+   SequentialFile createFileForLargeMessage(final long messageID)
    {
-      if (stored)
-      {
-         return largeMessagesFactory.createSequentialFile(messageID + ".msg", -1);
-      }
-      else
-      {
-         return largeMessagesFactory.createSequentialFile(messageID + ".tmp", -1);
-      }
+      return largeMessagesFactory.createSequentialFile(messageID + ".msg", -1);      
    }
 
    // Private ----------------------------------------------------------------------------------
@@ -1464,13 +1447,13 @@
          return XidCodecSupport.getXidEncodeLength(xid);
       }
    }
-   
+
    private static class HeuristicCompletionEncoding implements EncodingSupport
    {
       Xid xid;
 
       boolean isCommit;
-      
+
       HeuristicCompletionEncoding(final Xid xid, final boolean isCommit)
       {
          this.xid = xid;
@@ -1921,5 +1904,4 @@
 
    }
 
-
 }

Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-11-02 23:23:19 UTC (rev 8188)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-11-03 08:45:42 UTC (rev 8189)
@@ -600,8 +600,6 @@
       {
          if (pagingManager.page(message, true))
          {
-            message.setStored();
-
             return;
          }
       }
@@ -668,7 +666,7 @@
    }
 
    public MessageReference reroute(final ServerMessage message, final Queue queue, final Transaction tx) throws Exception
-   {
+   {     
       MessageReference reference = message.createReference(queue);
 
       Long scheduledDeliveryTime = (Long)message.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
@@ -680,8 +678,6 @@
 
       message.incrementDurableRefCount();
 
-      message.setStored();
-
       PagingStore store = pagingManager.getPageStore(message.getDestination());
 
       message.incrementRefCount(store, reference);
@@ -865,11 +861,11 @@
          {
             reference.setScheduledDeliveryTime(scheduledDeliveryTime);
          }
-
+         
          if (message.isDurable() && queue.isDurable())
          {
             int durableRefCount = message.incrementDurableRefCount();
-
+            
             if (durableRefCount == 1)
             {
                if (tx != null)
@@ -880,8 +876,6 @@
                {
                   storageManager.storeMessage(message);
                }
-
-               message.setStored();
             }
 
             if (tx != null)
@@ -1163,7 +1157,6 @@
             {
                if (pagingManager.page(message, tx.getID(), first))
                {
-                  message.setStored();
                   if (message.isDurable())
                   {
                      // We only create pageTransactions if using persistent messages
@@ -1232,7 +1225,7 @@
       public void beforeRollback(Transaction tx) throws Exception
       {
          // Reverse the ref counts, and paging sizes
-
+         
          for (MessageReference ref : refs)
          {
             ServerMessage message = ref.getMessage();

Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargemessageEndMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargemessageEndMessage.java	2009-11-02 23:23:19 UTC (rev 8188)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargemessageEndMessage.java	2009-11-03 08:45:42 UTC (rev 8189)
@@ -31,8 +31,6 @@
 
    long messageId;
 
-   boolean isDelete;
-
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -42,11 +40,10 @@
       super(REPLICATION_LARGE_MESSAGE_END);
    }
 
-   public ReplicationLargemessageEndMessage(final long messageId, final boolean isDelete)
+   public ReplicationLargemessageEndMessage(final long messageId)
    {
       this();
       this.messageId = messageId;
-      this.isDelete = isDelete;
    }
 
    // Public --------------------------------------------------------
@@ -54,21 +51,19 @@
    @Override
    public int getRequiredBufferSize()
    {
-      return BASIC_PACKET_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_BOOLEAN;
+      return BASIC_PACKET_SIZE + DataConstants.SIZE_LONG;
    }
 
    @Override
    public void encodeBody(final HornetQBuffer buffer)
    {
       buffer.writeLong(messageId);
-      buffer.writeBoolean(isDelete);
    }
 
    @Override
    public void decodeBody(final HornetQBuffer buffer)
    {
       messageId = buffer.readLong();
-      isDelete = buffer.readBoolean();
    }
 
    /**
@@ -79,14 +74,6 @@
       return messageId;
    }
 
-   /**
-    * @return the isDelete
-    */
-   public boolean isDelete()
-   {
-      return isDelete;
-   }
-
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/ReplicationManager.java	2009-11-02 23:23:19 UTC (rev 8188)
+++ trunk/src/main/org/hornetq/core/replication/ReplicationManager.java	2009-11-03 08:45:42 UTC (rev 8189)
@@ -78,8 +78,6 @@
    
    void largeMessageWrite(long messageId, byte [] body);
    
-   void largeMessageEnd(long messageId);
-   
    void largeMessageDelete(long messageId);
 
 }

Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2009-11-02 23:23:19 UTC (rev 8188)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2009-11-03 08:45:42 UTC (rev 8189)
@@ -66,7 +66,7 @@
    // Attributes ----------------------------------------------------
 
    private static final boolean trace = log.isTraceEnabled();
-   
+
    private static void trace(String msg)
    {
       log.trace(msg);
@@ -85,7 +85,7 @@
    private PagingManager pageManager;
 
    private final ConcurrentMap<SimpleString, ConcurrentMap<Integer, Page>> pageIndex = new ConcurrentHashMap<SimpleString, ConcurrentMap<Integer, Page>>();
-   
+
    private final ConcurrentMap<Long, LargeServerMessage> largeMessages = new ConcurrentHashMap<Long, LargeServerMessage>();
 
    // Constructors --------------------------------------------------
@@ -201,7 +201,7 @@
    {
       channel.close();
       storage.stop();
-      
+
       for (ConcurrentMap<Integer, Page> map : pageIndex.values())
       {
          for (Page page : map.values())
@@ -216,15 +216,14 @@
             }
          }
       }
-      
+
       pageIndex.clear();
-      
-      
+
       for (LargeServerMessage largeMessage : largeMessages.values())
       {
          largeMessage.releaseResources();
       }
-      
+
       largeMessages.clear();
    }
 
@@ -254,30 +253,17 @@
     */
    private void handleLargeMessageEnd(ReplicationLargemessageEndMessage packet)
    {
-      LargeServerMessage message = lookupLargeMessage(packet.getMessageId(), packet.isDelete());
+      LargeServerMessage message = lookupLargeMessage(packet.getMessageId(), true);
+      
       if (message != null)
       {
-         if (packet.isDelete())
+         try
          {
-            try
-            {
-               message.deleteFile();
-            }
-            catch (Exception e)
-            {
-               log.warn("Error deleting large message ID = " + packet.getMessageId(), e);
-            }
+            message.deleteFile();
          }
-         else
+         catch (Exception e)
          {
-            try
-            {
-               message.setStored();
-            }
-            catch (Exception e)
-            {
-               log.warn("Error deleting large message ID = " + packet.getMessageId(), e);
-            }
+            log.warn("Error deleting large message ID = " + packet.getMessageId(), e);
          }
       }
    }
@@ -293,13 +279,12 @@
          message.addBytes(packet.getBody());
       }
    }
-   
-   
-   private LargeServerMessage lookupLargeMessage(long messageId, boolean isDelete)
+
+   private LargeServerMessage lookupLargeMessage(long messageId, boolean delete)
    {
       LargeServerMessage message;
-      
-      if (isDelete)
+
+      if (delete)
       {
          message = largeMessages.remove(messageId);
       }
@@ -307,12 +292,12 @@
       {
          message = largeMessages.get(messageId);
       }
-      
+
       if (message == null)
       {
          log.warn("Large MessageID " + messageId + "  is not available on backup server. Ignoring replication message");
       }
-      
+
       return message;
 
    }
@@ -328,7 +313,6 @@
       this.largeMessages.put(largeMessage.getMessageID(), largeMessage);
    }
 
-
    /**
     * @param packet
     */
@@ -433,12 +417,11 @@
       ConcurrentMap<Integer, Page> pages = getPageMap(packet.getStoreName());
 
       Page page = pages.remove(packet.getPageNumber());
-      
+
       if (page == null)
       {
          page = getPage(packet.getStoreName(), packet.getPageNumber());
       }
-      
 
       if (page != null)
       {

Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-11-02 23:23:19 UTC (rev 8188)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-11-03 08:45:42 UTC (rev 8189)
@@ -276,22 +276,11 @@
    {
       if (enabled)
       {
-         sendReplicatePacket(new ReplicationLargemessageEndMessage(messageId, true));
+         sendReplicatePacket(new ReplicationLargemessageEndMessage(messageId));
       }
    }
 
    /* (non-Javadoc)
-    * @see org.hornetq.core.replication.ReplicationManager#largeMessageEnd(long)
-    */
-   public void largeMessageEnd(long messageId)
-   {
-      if (enabled)
-      {
-         sendReplicatePacket(new ReplicationLargemessageEndMessage(messageId, false));
-      }
-   }
-
-   /* (non-Javadoc)
     * @see org.hornetq.core.replication.ReplicationManager#largeMessageWrite(long, byte[])
     */
    public void largeMessageWrite(long messageId, byte[] body)

Modified: trunk/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerMessage.java	2009-11-02 23:23:19 UTC (rev 8188)
+++ trunk/src/main/org/hornetq/core/server/ServerMessage.java	2009-11-03 08:45:42 UTC (rev 8189)
@@ -47,11 +47,6 @@
 
    int getMemoryEstimate();
 
-   //TODO - do we really need this? Can't we use durable ref count?
-   void setStored() throws Exception;
-
-   boolean isStored();
-
    int getRefCount();
 
    ServerMessage makeCopyForExpiryOrDLA(long newID, boolean expiry) throws Exception;

Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-11-02 23:23:19 UTC (rev 8188)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-11-03 08:45:42 UTC (rev 8189)
@@ -1379,9 +1379,10 @@
                                                         config.getName(),
                                                         config.getAddress(),
                                                         config.getTimeout());
-         }
-         log.info("deploying grouping handler: " + groupingHandler);
+         }        
+         
          this.groupingHandler = groupingHandler;
+         
          managementService.addNotificationListener(groupingHandler);
       }
    }

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2009-11-02 23:23:19 UTC (rev 8188)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2009-11-03 08:45:42 UTC (rev 8189)
@@ -42,8 +42,6 @@
    /** Global reference counts for paging control */
    private final AtomicInteger refCount = new AtomicInteger(0);
 
-   private volatile boolean stored;
-
    // We cache this
    private volatile int memoryEstimate = -1;
 
@@ -102,16 +100,6 @@
       return ref;
    }
 
-   public boolean isStored()
-   {
-      return stored;
-   }
-
-   public void setStored() throws Exception
-   {
-      stored = true;
-   }
-
    public int incrementRefCount(final PagingStore pagingStore, final MessageReference reference) throws Exception
    {
       int count = refCount.incrementAndGet();

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-02 23:23:19 UTC (rev 8188)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-03 08:45:42 UTC (rev 8189)
@@ -324,7 +324,7 @@
       remotingConnection.removeFailureListener(this);
 
       // Return any outstanding credits
-      
+
       closed = true;
 
       for (CreditManagerHolder holder : creditManagerHolders.values())
@@ -648,7 +648,7 @@
       {
          log.error("Failed to query consumer deliveries", e);
       }
-      
+
       sendResponse(message, null, false, false);
    }
 
@@ -1064,9 +1064,9 @@
             response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
          }
          else
-         {
+         {                     
             Transaction theTx = resourceManager.removeTransaction(xid);
-
+            
             if (theTx == null)
             {
                // checked heuristic committed transactions
@@ -1436,9 +1436,9 @@
       Packet response = null;
 
       ServerMessage message = packet.getServerMessage();
-            
+
       try
-      {         
+      {
          long id = storageManager.generateUniqueID();
 
          message.setMessageID(id);
@@ -1510,9 +1510,9 @@
             currentLargeMessage = null;
 
             message.releaseResources();
-                        
+
             send(message);
-            
+
             releaseOutStanding(message);
          }
 
@@ -1857,7 +1857,7 @@
    private void doRollback(final boolean lastMessageAsDelived, final Transaction theTx) throws Exception
    {
       boolean wasStarted = started;
-
+      
       List<MessageReference> toCancel = new ArrayList<MessageReference>();
 
       for (ServerConsumer consumer : consumers.values())
@@ -1874,7 +1874,7 @@
       {
          ref.getQueue().cancel(theTx, ref);
       }
-
+            
       theTx.rollback();
 
       if (wasStarted)
@@ -1899,7 +1899,7 @@
 
       tx = new TransactionImpl(storageManager);
    }
-   
+
    /*
     * The way flow producer flow control works is as follows:
     * The client can only send messages as long as it has credits. It requests credits from the server
@@ -1914,11 +1914,11 @@
    private void releaseOutStanding(final ServerMessage message) throws Exception
    {
       CreditManagerHolder holder = getCreditManagerHolder(message.getDestination());
-      
+
       int size = message.getEncodeSize();
-      
+
       holder.outstandingCredits -= size;
-      
+
       holder.store.returnProducerCredits(size);
    }
 
@@ -1945,6 +1945,6 @@
       else
       {
          postOffice.route(msg, tx);
-      }  
+      }
    }
 }

Modified: trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java	2009-11-02 23:23:19 UTC (rev 8188)
+++ trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java	2009-11-03 08:45:42 UTC (rev 8189)
@@ -62,7 +62,7 @@
    private final long createTime;
 
    public TransactionImpl(final StorageManager storageManager)
-   {
+   {    
       this.storageManager = storageManager;
 
       xid = null;
@@ -73,7 +73,7 @@
    }
 
    public TransactionImpl(final Xid xid, final StorageManager storageManager, final PostOffice postOffice)
-   {
+   {      
       this.storageManager = storageManager;
 
       this.xid = xid;
@@ -84,7 +84,7 @@
    }
 
    public TransactionImpl(final long id, final Xid xid, final StorageManager storageManager)
-   {
+   {    
       this.storageManager = storageManager;
 
       this.xid = xid;
@@ -276,7 +276,7 @@
          if (operations != null)
          {
             for (TransactionOperation operation : operations)
-            {
+            {              
                operation.beforeRollback(this);
             }
          }

Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2009-11-02 23:23:19 UTC (rev 8188)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2009-11-03 08:45:42 UTC (rev 8189)
@@ -37,6 +37,7 @@
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
 import org.hornetq.tests.integration.largemessage.LargeMessageTestBase;
+import org.hornetq.tests.util.RandomUtil;
 import org.hornetq.utils.DataConstants;
 import org.hornetq.utils.SimpleString;
 
@@ -436,8 +437,10 @@
          consumerExpiry = session.createConsumer(ADDRESS_DLA);
 
          msg1 = consumerExpiry.receive(5000);
+         
          assertNotNull(msg1);
-         // msg1.acknowledge();
+         
+         msg1.acknowledge();
 
          for (int i = 0; i < messageSize; i++)
          {
@@ -1348,19 +1351,28 @@
 
    }
 
-   public void testSendRollbackXA() throws Exception
+   public void testSendRollbackXADurable() throws Exception
    {
-      internalTestSendRollback(true);
+      internalTestSendRollback(true, true);
    }
+   
+   public void testSendRollbackXANonDurable() throws Exception
+   {
+      internalTestSendRollback(true, false);
+   }
 
-   public void testSendRollback() throws Exception
+   public void testSendRollbackDurable() throws Exception
    {
-      internalTestSendRollback(false);
+      internalTestSendRollback(false, true);
    }
+   
+   public void testSendRollbackNonDurable() throws Exception
+   {
+      internalTestSendRollback(false, false);
+   }
 
-   private void internalTestSendRollback(final boolean isXA) throws Exception
+   private void internalTestSendRollback(final boolean isXA, final boolean durable) throws Exception
    {
-
       ClientSession session = null;
 
       try
@@ -1379,13 +1391,13 @@
 
          if (isXA)
          {
-            xid = newXID();
+            xid = RandomUtil.randomXid();
             session.start(xid, XAResource.TMNOFLAGS);
          }
 
          ClientProducer producer = session.createProducer(ADDRESS);
 
-         Message clientFile = createLargeClientMessage(session, 50000, false);
+         Message clientFile = createLargeClientMessage(session, 50000, durable);
 
          for (int i = 0; i < 1; i++)
          {

Deleted: trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageCleanupTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageCleanupTest.java	2009-11-02 23:23:19 UTC (rev 8188)
+++ trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageCleanupTest.java	2009-11-03 08:45:42 UTC (rev 8189)
@@ -1,196 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.integration.largemessage;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.util.HashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.hornetq.core.client.ClientMessage;
-import org.hornetq.core.client.ClientProducer;
-import org.hornetq.core.client.ClientSession;
-import org.hornetq.core.client.ClientSessionFactory;
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
-import org.hornetq.core.client.impl.ClientSessionInternal;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.TransportConfiguration;
-import org.hornetq.core.exception.HornetQException;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.impl.RemotingConnectionImpl;
-import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
-import org.hornetq.core.remoting.spi.HornetQBuffer;
-import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.tests.integration.largemessage.mock.MockConnector;
-import org.hornetq.tests.integration.largemessage.mock.MockConnectorFactory;
-
-/**
- * A LargeMessageCleanupTest
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class LargeMessageCleanupTest extends LargeMessageTestBase
-{
-   // Constants -----------------------------------------------------
-   
-   private static final Logger log = Logger.getLogger(LargeMessageCleanupTest.class);
-
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-   
-   
-   public void testCleanup() throws Exception
-   {
-      clearData();
-      
-      FileOutputStream fileOut = new FileOutputStream(new File(getLargeMessagesDir(), "1234.tmp"));
-      
-      fileOut.write(new byte[1024]); // anything
-      
-      fileOut.close();
-
-      Configuration config = createDefaultConfig();
-
-      server = createServer(true, config, -1, -1, new HashMap<String, AddressSettings>());
-
-      server.start();
-
-      try
-      {
-
-         File directoryLarge = new File(getLargeMessagesDir());
-
-         assertEquals("The startup should have been deleted 1234.tmp", 0, directoryLarge.list().length);
-      }
-      finally
-      {
-         server.stop();
-      }
-   }
-
-   public void testFailureOnSendingFile() throws Exception
-   {
-      clearData();
-
-      Configuration config = createDefaultConfig();
-
-      server = createServer(true, config, 10 * 1024, 20 * 1024, new HashMap<String, AddressSettings>());
-
-      server.start();
-
-      final int numberOfBytes = 2 * 1024 * 1024;
-
-      ClientSession session = null;
-
-      class LocalCallback implements MockConnector.MockCallback
-      {
-         AtomicInteger counter = new AtomicInteger(0);
-
-         ClientSession session;
-
-         public void onWrite(final HornetQBuffer buffer)
-         {
-            log.info("calling cb onwrite** ");
-            if (counter.incrementAndGet() == 5)
-            {
-               RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionInternal)session).getConnection();
-               RemotingServiceImpl remotingServiceImpl = (RemotingServiceImpl)server.getRemotingService();
-               remotingServiceImpl.connectionException(conn.getID(),
-                                                       new HornetQException(HornetQException.NOT_CONNECTED, "blah!"));
-               conn.fail(new HornetQException(HornetQException.NOT_CONNECTED, "blah"));
-               throw new IllegalStateException("blah");
-            }
-         }
-      }
-
-      LocalCallback callback = new LocalCallback();
-
-      try
-      {
-         HashMap<String, Object> parameters = new HashMap<String, Object>();
-         parameters.put("callback", callback);
-
-         TransportConfiguration transport = new TransportConfiguration(MockConnectorFactory.class.getCanonicalName(),
-                                                                       parameters);
-
-         ClientSessionFactory mockFactory = new ClientSessionFactoryImpl(transport);
-
-         mockFactory.setBlockOnNonPersistentSend(false);
-         mockFactory.setBlockOnPersistentSend(false);
-         mockFactory.setBlockOnAcknowledge(false);
-
-         session = mockFactory.createSession(null, null, false, true, true, false, 0);
-
-         callback.session = session;
-
-         session.createQueue(ADDRESS, ADDRESS, null, true);
-
-         ClientProducer producer = session.createProducer(ADDRESS);
-
-         ClientMessage clientLarge = createLargeClientMessage(session, numberOfBytes);
-
-         try
-         {
-            producer.send(clientLarge);
-            
-            fail("Exception was expected!");
-         }
-         catch (Exception e)
-         {
-         }
-
-         validateNoFilesOnLargeDir();
-
-         session.close();
-      }
-      finally
-      {
-         try
-         {
-            server.stop();
-         }
-         catch (Exception ignored)
-         {
-            ignored.printStackTrace();
-         }
-      }
-
-   }
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   protected void setUp() throws Exception
-   {
-      super.setUp();
-   }
-
-   protected void tearDown() throws Exception
-   {
-      super.tearDown();
-   }
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
-}

Deleted: trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageCrashTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageCrashTest.java	2009-11-02 23:23:19 UTC (rev 8188)
+++ trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageCrashTest.java	2009-11-03 08:45:42 UTC (rev 8189)
@@ -1,567 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.integration.largemessage;
-
-import java.lang.management.ManagementFactory;
-import java.util.concurrent.Executor;
-
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
-import org.hornetq.core.buffers.ChannelBuffers;
-import org.hornetq.core.client.ClientConsumer;
-import org.hornetq.core.client.ClientMessage;
-import org.hornetq.core.client.ClientProducer;
-import org.hornetq.core.client.ClientSession;
-import org.hornetq.core.client.ClientSessionFactory;
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.persistence.impl.journal.JournalLargeServerMessage;
-import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
-import org.hornetq.core.security.HornetQSecurityManager;
-import org.hornetq.core.security.impl.HornetQSecurityManagerImpl;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.LargeServerMessage;
-import org.hornetq.core.server.impl.HornetQServerImpl;
-import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.tests.util.ServiceTestBase;
-import org.hornetq.tests.util.SpawnedVMSupport;
-
-/**
- * A LargeMessageCrashTest
- *
- * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class LargeMessageCrashTest extends ServiceTestBase
-{
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   static String QUEUE_NAME = "MY-QUEUE";
-
-   static int LARGE_MESSAGE_SIZE = 5 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
-
-   static int PAGED_MESSAGE_SIZE = 1024;
-
-   static int NUMBER_OF_PAGES_MESSAGES = 100;
-
-   boolean failAfterRename;
-
-   // Static --------------------------------------------------------
-
-   public static void main(String args[])
-   {
-      LargeMessageCrashTest serverTest = new LargeMessageCrashTest();
-
-      serverTest.failAfterRename = false;
-
-      for (String arg : args)
-      {
-         if (arg.equals("failAfterRename"))
-         {
-            serverTest.failAfterRename = true;
-         }
-      }
-      
-      for (String arg : args)
-      {
-         if (arg.equals("remoteJournalSendNonTransactional"))
-         {
-            serverTest.remoteJournalSendNonTransactional();
-         }
-         else if (arg.equals("remoteJournalSendTransactional"))
-         {
-            serverTest.remoteJournalSendTransactional();
-         }
-         else if (arg.equals("remotePreparedTransaction"))
-         {
-            serverTest.remotePreparedTransaction();
-         }
-         else if (arg.equals("remotePaging"))
-         {
-            serverTest.remotePaging();
-         }
-      }
-   }
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   // Commented out for https://jira.jboss.org/jira/browse/HORNETQ-49
-   public void testFoo()
-   {
-      
-   }
-   
-//   public void testJournalSendNonTransactional1() throws Exception
-//   {
-//      internalTestSend(false, false);
-//   }
-//
-//   public void testJournalSendNonTransactional2() throws Exception
-//   {
-//      internalTestSend(true, false);
-//   }
-//
-//   public void testJournalSendTransactional1() throws Exception
-//   {
-//      internalTestSend(false, true);
-//   }
-//
-//   public void testJournalSendTransactional2() throws Exception
-//   {
-//      internalTestSend(true, true);
-//   }
-
-   public void internalTestSend(boolean failureAfterRename, boolean transactional) throws Exception
-   {
-      if (transactional)
-      {
-         runExternalProcess(failureAfterRename, "remoteJournalSendTransactional");
-      }
-      else
-      {
-         runExternalProcess(failureAfterRename, "remoteJournalSendNonTransactional");
-      }
-
-      HornetQServer server = newServer(false);
-
-      try
-      {
-         server.start();
-
-         ClientSessionFactory cf = createInVMFactory();
-
-         ClientSession session = cf.createSession(true, true);
-
-         ClientConsumer cons = session.createConsumer(QUEUE_NAME);
-
-         session.start();
-
-         assertNull(cons.receive(100));
-
-         session.close();
-
-         validateNoFilesOnLargeDir();
-      }
-      finally
-      {
-         try
-         {
-            server.stop();
-         }
-         catch (Throwable ignored)
-         {
-         }
-      }
-   }
-
-   public void testPreparedTransaction() throws Exception
-   {
-      runExternalProcess(false, "remotePreparedTransaction");
-
-      HornetQServer server = newServer(false);
-
-      server.start();
-
-      ClientSessionFactory cf = createInVMFactory();
-
-      ClientSession session = cf.createSession(true, false, false);
-
-      Xid[] xids = session.recover(XAResource.TMSTARTRSCAN);
-
-      assertEquals(1, xids.length);
-
-      session.rollback(xids[0]);
-
-      session.close();
-
-      server.stop();
-
-      validateNoFilesOnLargeDir();
-
-   }
-
-   public void testPreparedTransactionAndCommit() throws Exception
-   {
-      runExternalProcess(false, "remotePreparedTransaction");
-
-      HornetQServer server = newServer(false);
-
-      server.start();
-
-      ClientSessionFactory cf = createInVMFactory();
-
-      ClientSession session = cf.createSession(true, false, false);
-
-      Xid[] xids = session.recover(XAResource.TMSTARTRSCAN);
-
-      assertEquals(1, xids.length);
-
-      session.commit(xids[0], false);
-
-      session.close();
-
-      session = cf.createSession(false, false);
-
-      ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
-
-      session.start();
-
-      ClientMessage msg = consumer.receive(5000);
-
-      assertNotNull(msg);
-
-      msg.acknowledge();
-
-      for (int i = 0; i < LARGE_MESSAGE_SIZE; i++)
-      {
-         assertEquals(getSamplebyte(i), msg.getBody().readByte());
-      }
-
-      session.commit();
-
-      session.close();
-
-      server.stop();
-
-      validateNoFilesOnLargeDir();
-
-   }
-   
-   
-   public void testPaging() throws Exception
-   {
-      runExternalProcess(false, "remotePaging");
-
-      HornetQServer server = newServer(false);
-
-      server.start();
-
-      ClientSessionFactory cf = createInVMFactory();
-
-      ClientSession session = cf.createSession(false, true, true);
-
-      ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
-
-      session.start();
-
-      for (int i = 0; i < NUMBER_OF_PAGES_MESSAGES; i++)
-      {
-         ClientMessage msg = consumer.receive(50000);
-         assertNotNull(msg);
-         msg.acknowledge();
-         session.commit();
-      }
-
-      ClientMessage msg = consumer.receiveImmediate();
-      assertNull(msg);
-
-      session.close();
-
-      server.stop();
-
-      validateNoFilesOnLargeDir();
-
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-   /**
-    * @throws Exception
-    * @throws InterruptedException
-    */
-   private void runExternalProcess(boolean failAfterRename, String methodName) throws Exception, InterruptedException
-   {
-      System.err.println("running external process...");
-
-      Process process = SpawnedVMSupport.spawnVM(this.getClass().getCanonicalName(),
-                                                 "-Xms128m -Xmx128m ",
-                                                 new String[] {},
-                                                 true,
-                                                 true,
-                                                 methodName,
-                                                 (failAfterRename ? "failAfterRename" : "regularFail"));
-
-      assertEquals(100, process.waitFor());
-   }
-
-   // Inner classes -------------------------------------------------
-
-   public void remoteJournalSendNonTransactional()
-   {
-
-      try
-      {
-         startServer(failAfterRename, true);
-
-         ClientSessionFactory factory = createInVMFactory();
-         ClientSession session = factory.createSession(true, true);
-
-         try
-         {
-            session.createQueue(QUEUE_NAME, QUEUE_NAME, true);
-         }
-         catch (Throwable ignored)
-         {
-         }
-
-         ClientProducer prod = session.createProducer(QUEUE_NAME);
-
-         prod.send(createLargeClientMessage(session, LARGE_MESSAGE_SIZE, true));
-      }
-      catch (Exception e)
-      {
-         e.printStackTrace();
-         System.exit(-1);
-      }
-
-   }
-
-   public void remoteJournalSendTransactional()
-   {
-      try
-      {
-         startServer(failAfterRename, true);
-
-         ClientSessionFactory factory = createInVMFactory();
-         ClientSession session = factory.createSession(false, false);
-
-         try
-         {
-            session.createQueue(QUEUE_NAME, QUEUE_NAME, true);
-         }
-         catch (Throwable ignored)
-         {
-         }
-
-         ClientProducer prod = session.createProducer(QUEUE_NAME);
-
-         prod.send(createLargeClientMessage(session, LARGE_MESSAGE_SIZE, true));
-      }
-      catch (Exception e)
-      {
-         e.printStackTrace();
-         System.exit(-1);
-      }
-
-   }
-
-   public void remotePreparedTransaction()
-   {
-      try
-      {
-         startServer(failAfterRename, false);
-
-         ClientSessionFactory factory = createInVMFactory();
-         ClientSession session = factory.createSession(true, false, false);
-
-         try
-         {
-            session.createQueue(QUEUE_NAME, QUEUE_NAME, true);
-         }
-         catch (Throwable ignored)
-         {
-         }
-
-         ClientProducer prod = session.createProducer(QUEUE_NAME);
-
-         Xid xid = newXID();
-         session.start(xid, XAResource.TMNOFLAGS);
-
-         prod.send(createLargeClientMessage(session, LARGE_MESSAGE_SIZE, true));
-
-         session.end(xid, XAResource.TMSUCCESS);
-         session.prepare(xid);
-
-         Runtime.getRuntime().halt(100);
-      }
-      catch (Exception e)
-      {
-         e.printStackTrace();
-         System.exit(-1);
-      }
-
-   }
-
-   public void remotePaging()
-   {
-      try
-      {
-         startServer(failAfterRename, true);
-
-         ClientSessionFactory factory = createInVMFactory();
-         ClientSession session = factory.createSession(false, false, false);
-
-         try
-         {
-            session.createQueue(QUEUE_NAME, QUEUE_NAME, true);
-         }
-         catch (Throwable ignored)
-         {
-         }
-
-         ClientProducer prod = session.createProducer(QUEUE_NAME);
-
-         byte body[] = new byte[PAGED_MESSAGE_SIZE];
-         for (int i = 0; i < body.length; i++)
-         {
-            body[i] = getSamplebyte(i);
-         }
-
-         ClientMessage msg = session.createClientMessage(true);
-
-         msg.setBody(ChannelBuffers.wrappedBuffer(body));
-
-         for (int i = 0; i < NUMBER_OF_PAGES_MESSAGES; i++)
-         {
-            prod.send(msg);
-         }
-
-         session.commit();
-         
-         session.close();
-         
-         session = factory.createSession(false, true, true);
-         prod = session.createProducer(QUEUE_NAME);
-
-         prod.send(createLargeClientMessage(session, LARGE_MESSAGE_SIZE, true));
-
-         Runtime.getRuntime().halt(100);
-      }
-      catch (Exception e)
-      {
-         e.printStackTrace();
-         System.exit(-1);
-      }
-
-   }
-
-   protected ClientMessage createLargeClientMessage(final ClientSession session,
-                                                    final long numberOfBytes,
-                                                    final boolean persistent) throws Exception
-   {
-
-      ClientMessage clientMessage = session.createClientMessage(persistent);
-
-      clientMessage.setBodyInputStream(createFakeLargeStream(numberOfBytes));
-
-      return clientMessage;
-   }
-
-   protected void startServer(boolean failAfterRename, boolean fail)
-   {
-      this.failAfterRename = failAfterRename;
-      try
-      {
-         HornetQServer server = newServer(fail);
-         server.start();
-      }
-      catch (Exception e)
-      {
-         e.printStackTrace();
-      }
-   }
-
-   private HornetQServer newServer(boolean failing)
-   {
-      Configuration configuration = createDefaultConfig(false);
-      HornetQSecurityManager securityManager = new HornetQSecurityManagerImpl();
-
-      HornetQServer server;
-
-      if (failing)
-      {
-         server = new FailingHornetQServer(configuration, securityManager);
-      }
-      else
-      {
-         server = new HornetQServerImpl(configuration, securityManager);
-      }
-
-      AddressSettings defaultSetting = new AddressSettings();
-      defaultSetting.setPageSizeBytes(10 * 1024);
-      defaultSetting.setMaxSizeBytes(100 * 1024);
-
-      server.getAddressSettingsRepository().addMatch("#", defaultSetting);
-
-      return server;
-   }
-
-   /** This is hacking HornetQServerImpl, 
-    *  to make sure the server will fail right 
-    *  before the page-file was removed */
-   private class FailingHornetQServer extends HornetQServerImpl
-   {
-      FailingHornetQServer(final Configuration config, final HornetQSecurityManager securityManager)
-      {
-         super(config, ManagementFactory.getPlatformMBeanServer(), securityManager);
-      }
-
-      @Override
-      protected StorageManager createStorageManager()
-      {
-         return new FailingStorageManager(getConfiguration(), getExecutor());
-      }
-
-   }
-
-   private class FailingStorageManager extends JournalStorageManager
-   {
-
-      public FailingStorageManager(final Configuration config, final Executor executor)
-      {
-         super(config, executor);
-      }
-
-      @Override
-      public LargeServerMessage createLargeMessage()
-      {
-         return new FailinJournalLargeServerMessage(this);
-      }
-
-   }
-
-   private class FailinJournalLargeServerMessage extends JournalLargeServerMessage
-   {
-      /**
-       * @param storageManager
-       */
-      public FailinJournalLargeServerMessage(final JournalStorageManager storageManager)
-      {
-         super(storageManager);
-      }
-
-      @Override
-      public void setStored() throws Exception
-      {
-         if (failAfterRename)
-         {
-            super.setStored();
-         }
-         Runtime.getRuntime().halt(100);
-      }
-
-   }
-
-}

Modified: trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-11-02 23:23:19 UTC (rev 8188)
+++ trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-11-03 08:45:42 UTC (rev 8189)
@@ -248,7 +248,7 @@
 
          manager.largeMessageWrite(500, new byte[1024]);
 
-         manager.largeMessageEnd(500);
+         manager.largeMessageDelete(500);
 
          blockOnReplication(manager);
 



More information about the hornetq-commits mailing list