[jboss-cvs] JBoss Messaging SVN: r4866 - in branches/Branch_JBMESSAGING-1314: src/main/org/jboss/messaging/core/paging/impl and 7 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sat Aug 23 01:37:52 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-08-23 01:37:51 -0400 (Sat, 23 Aug 2008)
New Revision: 4866

Added:
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/LastPageRecord.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/LastPageRecordImpl.java
Modified:
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageMessage.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageTransactionImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/StorageManager.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
Log:
Removing PageTransaction, adding duplicate detection on depage, processing transacted-paged-message in memory

Added: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/LastPageRecord.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/LastPageRecord.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/LastPageRecord.java	2008-08-23 05:37:51 UTC (rev 4866)
@@ -0,0 +1,69 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.paging;
+
+import org.jboss.messaging.core.journal.EncodingSupport;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * 
+ * Stores the last pageID processed during depage, to detect duplications during the delete
+ * 
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public interface LastPageRecord extends EncodingSupport
+{
+   
+   // Constants -----------------------------------------------------
+   
+   // Attributes ----------------------------------------------------
+   
+   // Static --------------------------------------------------------
+   
+   // Constructors --------------------------------------------------
+   
+   // Public --------------------------------------------------------
+   
+   long getRecordId();
+
+   void setRecordId(long recordId);
+
+   SimpleString getDestination();
+
+   void setDestination(SimpleString destination);
+
+   long getLastId();
+
+   void setLastId(long lastId);
+   
+   // Package protected ---------------------------------------------
+   
+   // Protected -----------------------------------------------------
+   
+   // Private -------------------------------------------------------
+   
+   // Inner classes -------------------------------------------------
+   
+}

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java	2008-08-22 21:40:00 UTC (rev 4865)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java	2008-08-23 05:37:51 UTC (rev 4866)
@@ -23,6 +23,8 @@
 
 package org.jboss.messaging.core.paging;
 
+import java.util.Collection;
+
 import org.jboss.messaging.core.paging.impl.PageMessage;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.util.SimpleString;
@@ -32,36 +34,17 @@
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  * @param <T> An Encoding Support.
- * TODO: After we have the Paging system stable, maybe we can remove the generic part
  */
 public interface Pager
 {
    
    /**
+    * @param pagingStoreImpl 
     * @return false if the listener can't handle more pages
     */
-   boolean onDepage(SimpleString destination, PageMessage[] data) throws Exception;
+   boolean onDepage(int pageId, SimpleString destination, PagingStore pagingStoreImpl, PageMessage[] data) throws Exception;
    
-   /** 
-    * Depage could depage messages for transactions not committed yet,
-    * So we need to eventually send pending messages
-    * */
-   void beginTransaction(long transactionID, PageTransaction pageTransaction);
-   
-   /** 
-    * Depage could depage messages for transactions not committed yet,
-    * So we need to eventually send pending messages
-    * */
-   void commitTransaction(long transactionID, PageTransaction pageTransaction);
-   
    /**
-    * Need to clear any information about the transaction, any eventual data will be ignored 
-    * */
-   void rollbackTransaction(long transactionID);
-   
-   
-   
-   /**
     * To be used by transactions only.
     * If you're sure you will page if isPaging, just call the method page and look at its return. 
     * @param destination
@@ -77,14 +60,10 @@
    boolean page(ServerMessage message) throws Exception;
    
    /**
-    * Page, only if destination is in page mode.
     * 
-    * page is an atomic operation. It's better to call page and test the return.
-    * 
-    * @param message
-    * @return false if destination is not on page mode
-    */
-   boolean page(ServerMessage message, long transactionID) throws Exception;
+    * Duplication detection for paging processing
+    *  */
+   void loadLastPage(LastPageRecord lastPage) throws Exception;
    
    /** 
     * 
@@ -96,7 +75,14 @@
    /** To be called when a rollback is called after messageDone was called */
    long addSize(ServerMessage message) throws Exception;
 
-   void sync(PageTransaction pageTransaction) throws Exception;
+   void sync(Collection<SimpleString> destinationsToSync) throws Exception;
+
+   /**
+    * When we stop depaging, The Last page record needs to removed.
+    * Or else the record could live forever on the journal. 
+    * @throws Exception 
+    * */
+   void clearLastRecord(LastPageRecord lastRecord) throws Exception;
    
    
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java	2008-08-22 21:40:00 UTC (rev 4865)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java	2008-08-23 05:37:51 UTC (rev 4866)
@@ -50,7 +50,7 @@
    
    void sync() throws Exception;
    
-   boolean page(PageMessage message) throws Exception;
+   boolean page(PageMessage message, Pager pageListener) throws Exception;
    
    /** 
     * Remove the first page from the Writing Queue.
@@ -69,4 +69,8 @@
     */
    boolean startDequeueThread(Pager listener) throws Exception;
 
+   LastPageRecord getLastRecord();
+
+   void setLastRecord(LastPageRecord record);
+
 }

Added: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/LastPageRecordImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/LastPageRecordImpl.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/LastPageRecordImpl.java	2008-08-23 05:37:51 UTC (rev 4866)
@@ -0,0 +1,120 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.paging.impl;
+
+import org.jboss.messaging.core.paging.LastPageRecord;
+import org.jboss.messaging.core.remoting.MessagingBuffer;
+import org.jboss.messaging.util.SimpleString;
+
+public class LastPageRecordImpl implements LastPageRecord
+{
+   
+    // Constants -----------------------------------------------------
+   
+   // Attributes ----------------------------------------------------
+   
+   
+   private long recordId;
+   private SimpleString destination;
+   private long lastId;
+   
+   // Static --------------------------------------------------------
+   
+   // Constructors --------------------------------------------------
+   
+   
+   
+   // Public --------------------------------------------------------
+
+   public LastPageRecordImpl(long recordId)
+   {
+   }
+   
+   public LastPageRecordImpl(long recordId, long lastId,
+         SimpleString destination)
+   {
+      super();
+      this.recordId = recordId;
+      this.lastId = lastId;
+      this.destination = destination;
+   }
+
+   public long getRecordId()
+   {
+      return recordId;
+   }
+
+   public void setRecordId(final long recordId)
+   {
+      this.recordId = recordId;
+   }
+
+   public SimpleString getDestination()
+   {
+      return destination;
+   }
+
+   public void setDestination(final SimpleString destination)
+   {
+      this.destination = destination;
+   }
+
+   public long getLastId()
+   {
+      return lastId;
+   }
+
+   public void setLastId(final long lastId)
+   {
+      this.lastId = lastId;
+   }
+   
+   
+   public void decode(final MessagingBuffer buffer)
+   {
+      lastId = buffer.getLong();
+      destination = buffer.getSimpleString();
+   }
+   
+   public void encode(final MessagingBuffer buffer)
+   {
+      buffer.putLong(lastId);
+      buffer.putSimpleString(destination);
+   }
+   
+   public int getEncodeSize()
+   {
+      return 8  + SimpleString.sizeofString(destination);
+   }
+
+
+   // Package protected ---------------------------------------------
+   
+   // Protected -----------------------------------------------------
+   
+   // Private -------------------------------------------------------
+   
+   // Inner classes -------------------------------------------------
+   
+}

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageMessage.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageMessage.java	2008-08-22 21:40:00 UTC (rev 4865)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageMessage.java	2008-08-23 05:37:51 UTC (rev 4866)
@@ -49,13 +49,7 @@
    // Public --------------------------------------------------------
 
    private final ServerMessage message;
-   private long transactionID;
-   
-   public PageMessage(ServerMessage message, long transactionID)
-   {
-      this.message = message;
-   }
-   
+
    public PageMessage(ServerMessage message)
    {
       this.message = message;
@@ -71,17 +65,11 @@
       return message;
    }
 
-   public long getTransactionID()
-   {
-      return transactionID;
-   }
    
-   
    // EncodingSupport implementation --------------------------------
 
    public void decode(MessagingBuffer buffer)
    {
-      transactionID = buffer.getLong();
       final long messageID = buffer.getLong();
       message.decode(buffer);
       message.setMessageID(messageID);
@@ -89,7 +77,6 @@
 
    public void encode(MessagingBuffer buffer)
    {
-      buffer.putLong(transactionID);
       buffer.putLong(message.getMessageID());
       message.encode(buffer);
    }
@@ -97,7 +84,7 @@
    public int getEncodeSize()
    {
       
-      return 8 + 8 + message.getEncodeSize();
+      return 8 + message.getEncodeSize();
    }
    
    // Package protected ---------------------------------------------

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageTransactionImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageTransactionImpl.java	2008-08-22 21:40:00 UTC (rev 4865)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageTransactionImpl.java	2008-08-23 05:37:51 UTC (rev 4866)
@@ -32,6 +32,8 @@
 
 /**
  * 
+ * TODO: delete this class!
+ * 
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-08-22 21:40:00 UTC (rev 4865)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-08-23 05:37:51 UTC (rev 4866)
@@ -33,6 +33,7 @@
 import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.journal.SequentialFileFactory;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.paging.LastPageRecord;
 import org.jboss.messaging.core.paging.Page;
 import org.jboss.messaging.core.paging.Pager;
 import org.jboss.messaging.core.paging.PagingStoreFactory;
@@ -72,11 +73,13 @@
    private volatile Page currentPage;
 
    // This is supposed to perform better than synchronized methods
-   // globalLock protects opening/closing and messing up with IDs
-   private final Semaphore globalLock = new Semaphore(1); 
+   // synchronizedBlockLock protects opening/closing and messing up with IDs
+   private final Semaphore synchronizedBlockLock = new Semaphore(1); 
 
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private volatile boolean initialized = false;
+
+   private volatile LastPageRecord lastRecord;
    
    
    // Static --------------------------------------------------------
@@ -131,7 +134,7 @@
       validateInit();
       
       // Read needs both global and writeLock
-      globalLock.acquire(); // This is a replacement synchronized.
+      synchronizedBlockLock.acquire(); // This is a replacement synchronized.
                             // Can't change any IDs while depaging.
       lock.writeLock().lock();  // Wait pending writes to finish before depage.
       
@@ -190,12 +193,12 @@
       finally
       {
          lock.writeLock().unlock();
-         globalLock.release();
+         synchronizedBlockLock.release();
       }
       
    }
 
-   public boolean page(PageMessage message) throws Exception
+   public boolean page(PageMessage message, Pager pageListener) throws Exception
    {
       validateInit();
       
@@ -203,13 +206,21 @@
       
       
       // This would be a synchronized block... (but using a Semaphore)
-      
-      globalLock.acquire();
+      synchronizedBlockLock.acquire();
 
       try
       {
          if (currentPage == null)
          {
+            if (this.lastRecord != null)
+            {
+               if (pageListener != null)
+               {
+                  pageListener.clearLastRecord(lastRecord);
+               }
+               lastRecord = null;
+            }
+            
             return false;
          }
          
@@ -227,7 +238,7 @@
                lock.writeLock().unlock();
             }
          }
-         // we must get the readLock before we release the globalLock
+         // we must get the readLock before we release the synchronizedBlockLock
          // or else we could end up with files records being added to the currentPage even if the max size was already achieved.
          // (Condition tested by PagingStoreTestPage::testConcurrentPaging, The test would eventually fail, 1 in 100)
          lock.readLock().lock();
@@ -235,7 +246,7 @@
       }
       finally
       {
-         globalLock.release();
+         synchronizedBlockLock.release();
       }
       
       // End of a synchronized block..
@@ -299,6 +310,18 @@
    }
    
    
+   public LastPageRecord getLastRecord()
+   {
+      return lastRecord;
+   }
+
+   public void setLastRecord(LastPageRecord record)
+   {
+      this.lastRecord = record;
+   }
+
+   
+   
    // MessagingComponent implementation
    
    public synchronized boolean isStarted()
@@ -384,7 +407,7 @@
    {
       validateInit();
 
-      globalLock.acquire();
+      synchronizedBlockLock.acquire();
       try
       {
          if (currentPage == null)
@@ -399,7 +422,7 @@
       }
       finally
       {
-         globalLock.release();
+         synchronizedBlockLock.release();
       }
    }
    
@@ -533,7 +556,7 @@
                }
                page.open();
                PageMessage messages[] = page.read();
-               needMorePages = listener.onDepage(PagingStoreImpl.this.storeName, messages);
+               needMorePages = listener.onDepage(page.getPageId(), PagingStoreImpl.this.storeName, PagingStoreImpl.this, messages);
                page.delete();
             }
             while (needMorePages);

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/StorageManager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/StorageManager.java	2008-08-22 21:40:00 UTC (rev 4865)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/StorageManager.java	2008-08-23 05:37:51 UTC (rev 4866)
@@ -25,6 +25,7 @@
 import java.util.List;
 import java.util.Map;
 
+import org.jboss.messaging.core.paging.LastPageRecord;
 import org.jboss.messaging.core.paging.PageTransaction;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.PostOffice;
@@ -78,7 +79,9 @@
    void updatePageTransaction(long txID, PageTransaction pageTransaction) throws Exception;
 
    
+   void storeLastPage(long txID, LastPageRecord pageTransaction) throws Exception;
    
+   
    void updateDeliveryCount(MessageReference ref) throws Exception;     
    
    void loadMessages(PostOffice postOffice, Map<Long, Queue> queues) throws Exception;

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-08-22 21:40:00 UTC (rev 4865)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-08-23 05:37:51 UTC (rev 4866)
@@ -47,7 +47,9 @@
 import org.jboss.messaging.core.journal.impl.JournalImpl;
 import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.paging.LastPageRecord;
 import org.jboss.messaging.core.paging.PageTransaction;
+import org.jboss.messaging.core.paging.impl.LastPageRecordImpl;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.PostOffice;
@@ -99,6 +101,8 @@
    
    public static final byte PAGE_TRANSACTION = 34;
    
+   public static final byte LAST_PAGE = 35;
+   
    public static final byte SET_SCHEDULED_DELIVERY_TIME = 44;
   	
 	private final AtomicLong messageIDSequence = new AtomicLong(0);
@@ -229,6 +233,11 @@
    {
       messageJournal.appendUpdateRecordTransactional(txID, pageTransaction.getRecordID(), PAGE_TRANSACTION, pageTransaction);
    }
+   
+   public void storeLastPage(long txID, LastPageRecord pageTransaction) throws Exception
+   {
+      messageJournal.appendAddRecordTransactional(txID, pageTransaction.getRecordId(), LAST_PAGE, pageTransaction);
+   }
 
    public void storeAcknowledgeTransactional(long txID, long queueID, long messageID) throws Exception
    {
@@ -298,6 +307,18 @@
 			
 			switch (recordType)
 			{
+			   case LAST_PAGE:
+			   {
+               MessagingBuffer buff = new ByteBufferWrapper(bb);
+               
+			      LastPageRecordImpl recordImpl = new LastPageRecordImpl(record.id);
+			      
+			      recordImpl.decode(buff);
+			      
+			      postOffice.getPager().loadLastPage(recordImpl);
+			      
+			      break;
+			   }
 				case ADD_MESSAGE:
 				{
 					MessagingBuffer buff = new ByteBufferWrapper(bb);

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java	2008-08-22 21:40:00 UTC (rev 4865)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java	2008-08-23 05:37:51 UTC (rev 4866)
@@ -26,6 +26,7 @@
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.jboss.messaging.core.paging.LastPageRecord;
 import org.jboss.messaging.core.paging.PageTransaction;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
@@ -165,4 +166,9 @@
 	   return started;
 	}
 
+   public void storeLastPage(long txID, LastPageRecord pageTransaction) throws Exception
+   {
+   }
+
+
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-08-22 21:40:00 UTC (rev 4865)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-08-23 05:37:51 UTC (rev 4866)
@@ -23,9 +23,9 @@
 package org.jboss.messaging.core.postoffice.impl;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -38,10 +38,11 @@
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.paging.LastPageRecord;
 import org.jboss.messaging.core.paging.Pager;
-import org.jboss.messaging.core.paging.PageTransaction;
 import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.paging.impl.LastPageRecordImpl;
 import org.jboss.messaging.core.paging.impl.PageMessage;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
@@ -60,12 +61,13 @@
  * A PostOfficeImpl
  * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
 public class PostOfficeImpl implements PostOffice
 {  
    
-   private static final long MAX_SIZE = 10 * 1024 * 1024;
+   private static final long MAX_SIZE = 100 * 1024 * 1024;
    
    private static final Logger log = Logger.getLogger(PostOfficeImpl.class);
    
@@ -443,9 +445,9 @@
       }
    }
 
-   private void startDepageThread(PagingStore store) throws Exception
+   private boolean startDepageThread(PagingStore store) throws Exception
    {
-      store.startDequeueThread(new PagerImpl());
+      return store.startDequeueThread(new PagerImpl());
    }
    
    
@@ -453,107 +455,73 @@
    private class PagerImpl implements Pager
    {
       
-      private final ConcurrentMap</*TransactionID*/ Long , PageTransaction> transactions = new ConcurrentHashMap<Long, PageTransaction>();
+//      private final ConcurrentMap</*TransactionID*/ Long , PageTransaction> transactions = new ConcurrentHashMap<Long, PageTransaction>();
 
+      public void clearLastRecord(LastPageRecord lastRecord) throws Exception
+      {
+         System.out.println("Clearing lastRecord information!!!!!!");
+         long transactionID = storageManager.generateTransactionID();
+         storageManager.storeDeleteTransactional(transactionID, lastRecord.getRecordId());
+      }
+      
       /**
        * This method will remove files from the page system and add them into the journal, doing it transactionally
        * 
        * A Transaction will be opened only if persistent messages are used.
        * If persistent messages are also used, it will update eventual PageTransactions
        */
-      public boolean onDepage(final SimpleString destination, final PageMessage[] data) throws Exception
+      public boolean onDepage(int pageId, final SimpleString destination, PagingStore pagingStore, final PageMessage[] data) throws Exception
       {
          log.info("Depaging....");
+         
          long transactionID = storageManager.generateTransactionID();
-         boolean usedTransaction = false;
          
-         HashSet<PageTransaction> pageTransactionsToUpdate = new HashSet<PageTransaction>();
+         LastPageRecord lastPage = pagingStore.getLastRecord(); 
          
-         final List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
-         
-         for (PageMessage msg: data)
+         if (lastPage != null)
          {
-            PageTransaction trUsed = null;
-            if (msg.getTransactionID() > 0)
+            if (pageId <= lastPage.getLastId())
             {
-               trUsed = transactions.get(msg.getTransactionID());
-               if (trUsed == null)
-               {
-                  // TODO make it .trace
-                  log.info("Transaction " + msg.getTransactionID() + " not found, ignoring message " + msg.getMessage().getMessageID());
-                  continue;
-               }
-               else if (!trUsed.isCommitted())
-               {
-                  
-                  // this would affect any of the solutions we have in mind now..
-                  // we have some options here.. we just need to find the best option.
-                  log.info("Transaction " + msg.getTransactionID() + " is pending... we don't know what to do yet... ignoring the message for now but this is not acceptable");
-                  continue;
-               }
+               log.info("Page " + pageId + " was already processed, ignoring the page");
+               return true;
             }
-
-            if (msg.getMessage().isDurable() && trUsed != null)
+            else
             {
-               pageTransactionsToUpdate.add(trUsed);
-               trUsed.decrement();
+               storageManager.storeDeleteTransactional(transactionID, lastPage.getRecordId());
             }
-            
+         }
+         
+         LastPageRecord record = new LastPageRecordImpl(storageManager.generateMessageID(), pageId, destination);
+         storageManager.storeLastPage(transactionID, record);
+         pagingStore.setLastRecord(record);
+         
+         final List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
+         
+         for (PageMessage msg: data)
+         {
             refsToAdd.addAll(PostOfficeImpl.this.route(msg.getMessage()));
             
             if (msg.getMessage().getDurableRefCount() != 0)
             {
-               usedTransaction = true;
                storageManager.storeMessageTransactional(transactionID, msg.getMessage());
             }
          }
          
-         for (PageTransaction pageTrans: pageTransactionsToUpdate)
-         {
-            if (pageTrans.getNumberOfMessages() == 0)
-            {
-               storageManager.storeDelete(pageTrans.getRecordID());
-               transactions.remove(pageTrans.getTransactionID());
-            }
-            else
-            {
-               storageManager.updatePageTransaction(transactionID, pageTrans);
-            }
-            usedTransaction = true;
-         }
+         storageManager.commit(transactionID);
          
-         if (usedTransaction)
-         {
-            storageManager.commit(transactionID);
-         }
-         
          for (MessageReference ref : refsToAdd)
          {
             ref.getQueue().addLast(ref);
          }
-
          
          return PostOfficeImpl.this.getQueueSize(destination).get() < MAX_SIZE; 
       }
       
-
-      // Transaction
-      public void beginTransaction(long transactionID, PageTransaction pageTransaction)
+      public void loadLastPage(LastPageRecord lastPage) throws Exception
       {
-         transactions.putIfAbsent(transactionID, pageTransaction);
+         pagingManager.getPageStore(lastPage.getDestination()).setLastRecord(lastPage);
       }
-      
-      public void commitTransaction(long transactionID, PageTransaction pageTransaction)
-      {
-         transactions.putIfAbsent(transactionID, pageTransaction);
-         // TODO: What to do with pending transactions during depage?
-      }
-      
-      public void rollbackTransaction(long transactionID)
-      {
-         transactions.remove(transactionID);
-      }
-      
+
       public boolean isPaging(SimpleString destination) throws Exception
       {
          return pagingManager.getPageStore(destination).isPaging();
@@ -565,10 +533,12 @@
 
          if (size < MAX_SIZE)
          {
-            System.out.println("Starting depage Thread, size = " + size);
             PagingStore store = pagingManager.getPageStore(message.getDestination());
             
-            startDepageThread(store);
+            if (startDepageThread(store))
+            {
+               log.info("Starting depaging Thread, size = " + size);
+            }
          }
       }
       
@@ -586,25 +556,18 @@
       
       public boolean page(ServerMessage message) throws Exception
       {
-         return pagingManager.getPageStore(message.getDestination()).page(new PageMessage(message));
+         return pagingManager.getPageStore(message.getDestination()).page(new PageMessage(message), this);
       }
 
-      public boolean page(ServerMessage message, long transactionID) throws Exception
+      public void sync(Collection<SimpleString> destinationsToSync) throws Exception
       {
-         return pagingManager.getPageStore(message.getDestination()).page(new PageMessage(message, transactionID));
-      }
-
-
-      public void sync(PageTransaction pageTransaction) throws Exception
-      {
-         SimpleString[] destinations = pageTransaction.getDestinations();
-         for (SimpleString destination: destinations)
+         for (SimpleString destination: destinationsToSync)
          {
             pagingManager.getPageStore(destination).sync();
          }
       }
-      
-      
 
+
+
    }
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-08-22 21:40:00 UTC (rev 4865)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-08-23 05:37:51 UTC (rev 4866)
@@ -24,6 +24,7 @@
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -44,6 +45,7 @@
 import org.jboss.messaging.core.transaction.Transaction;
 import org.jboss.messaging.core.paging.impl.PageMessage;
 import org.jboss.messaging.core.paging.impl.PageTransactionImpl;
+import org.jboss.messaging.util.SimpleString;
 
 /**
  * A TransactionImpl
@@ -64,9 +66,9 @@
    private final List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
 
    private final List<MessageReference> acknowledgements = new ArrayList<MessageReference>();
+   
+   private final List<ServerMessage> pagedMessages = new ArrayList<ServerMessage>();
 
-   private PageTransaction pageTransaction; 
-
    private final Xid xid;
 
    private final long id;
@@ -127,30 +129,14 @@
          throw new IllegalStateException("Transaction is in invalid state " + state);
       }
       
-      if (pager.page(message, this.id))
+      if (pager.isPaging(message.getDestination()))
       {
-         if (message.isDurable())
-         {
-            containsPersistent = true;
-            getPageTransaction().addMessage(message.getDestination());
-         }
-         else
-         {
-            getPageTransaction();
-         }
+
+         pagedMessages.add(message);
       }
       else
       {
-         List<MessageReference> refs = postOffice.route(message);
-
-         refsToAdd.addAll(refs);
-
-         if (message.getDurableRefCount() != 0)
-         {
-            storageManager.storeMessageTransactional(id, message);
-
-            containsPersistent = true;
-         }
+         route(message);
       }
    }
 
@@ -222,10 +208,6 @@
 
       if (containsPersistent)
       {
-         if (this.pageTransaction != null)
-         {
-            storageManager.storePageTransaction(this.id, pageTransaction);
-         }
          storageManager.prepare(id);
       }
 
@@ -260,26 +242,35 @@
             throw new IllegalStateException("Transaction is in invalid state " + state);
          }
       }
+      
+      HashSet<SimpleString> pagedDestinationsToSync = new HashSet<SimpleString>();
 
+      for (ServerMessage message: pagedMessages)
+      {
+       
+         if (pager.page(message))
+         {
+            if (message.isDurable())
+            {
+               pagedDestinationsToSync.add(message.getDestination());
+            }
+         }
+         else
+         {
+            // This could happen when the PageStore left the pageState 
+            route(message);
+         }
+      }
       
       if (containsPersistent)
       {
-         if (this.pageTransaction != null && state != State.PREPARED)
+         if (pagedDestinationsToSync.size() > 0)
          {
-            storageManager.storePageTransaction(this.id, pageTransaction);
-            pager.sync(pageTransaction);
+            pager.sync(pagedDestinationsToSync);
          }
          storageManager.commit(id);
       }
 
-
-      // TODO: What to do if depage happen on the middle of transaction not committed yet?
-      //       This would be a problem on any solution applied on transactions & paging
-      if (pageTransaction != null)
-      {
-         pager.commitTransaction(id, pageTransaction);
-      }      
-      
       for (MessageReference ref : refsToAdd)
       {
          ref.getQueue().addLast(ref);
@@ -317,11 +308,6 @@
          storageManager.rollback(id);
       }
       
-      if (pageTransaction != null)
-      {
-         pager.rollbackTransaction(id);
-      }
-
       Map<Queue, LinkedList<MessageReference>> queueMap = new HashMap<Queue, LinkedList<MessageReference>>();
 
       // We sort into lists - one for each queue involved.
@@ -434,19 +420,20 @@
    // Private
    // -------------------------------------------------------------------
 
-   private PageTransaction getPageTransaction()
+   private void route(final ServerMessage message) throws Exception
    {
-      if (pageTransaction == null)
+      List<MessageReference> refs = postOffice.route(message);
+
+      refsToAdd.addAll(refs);
+
+      if (message.getDurableRefCount() != 0)
       {
-         long pageTRID = storageManager.generateMessageID();
-         pageTransaction = new PageTransactionImpl(id);
-         pageTransaction.setRecordID(pageTRID);
-         pager.beginTransaction(id, pageTransaction);
+         storageManager.storeMessageTransactional(id, message);
+
+         containsPersistent = true;
       }
-      
-      return pageTransaction;
    }
-   
+
    private void clear()
    {
       refsToAdd.clear();

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java	2008-08-22 21:40:00 UTC (rev 4865)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java	2008-08-23 05:37:51 UTC (rev 4866)
@@ -68,11 +68,11 @@
       
       ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(10));
       
-      assertFalse(store.page(new PageMessage(msg)));
+      assertFalse(store.page(new PageMessage(msg), null));
       
       store.startPaging();
       
-      assertTrue(store.page(new PageMessage(msg)));
+      assertTrue(store.page(new PageMessage(msg), null));
       
       Page page = store.depage();
       
@@ -90,7 +90,7 @@
       
       assertNull(store.depage());
       
-      assertFalse(store.page(new PageMessage(msg)));
+      assertFalse(store.page(new PageMessage(msg), null));
    }
    
    // Package protected ---------------------------------------------

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java	2008-08-22 21:40:00 UTC (rev 4865)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java	2008-08-23 05:37:51 UTC (rev 4866)
@@ -98,7 +98,7 @@
       
       assertTrue(storeImpl.isPaging());
       
-      assertTrue(storeImpl.page(msg));
+      assertTrue(storeImpl.page(msg, null));
       
       assertEquals(1, storeImpl.getNumberOfPages());
       
@@ -137,7 +137,7 @@
    
          PageMessage msg = createMessage(i+1l, destination, buffer);
 
-         assertTrue(storeImpl.page(msg));
+         assertTrue(storeImpl.page(msg, null));
       }
       
       
@@ -201,7 +201,7 @@
          
          PageMessage msg = createMessage(i+1l, destination, buffer);
 
-         assertTrue(storeImpl.page(msg));
+         assertTrue(storeImpl.page(msg, null));
       }
       
       
@@ -234,7 +234,7 @@
 
       PageMessage msg = createMessage(100, destination, buffers.get(0));
       
-      assertTrue(storeImpl.page(msg));
+      assertTrue(storeImpl.page(msg, null));
       
       Page newPage = storeImpl.depage();
       
@@ -252,11 +252,11 @@
       
       assertFalse(storeImpl.isPaging());
       
-      assertFalse(storeImpl.page(msg));
+      assertFalse(storeImpl.page(msg, null));
       
       storeImpl.startPaging();
 
-      assertTrue(storeImpl.page(msg));
+      assertTrue(storeImpl.page(msg, null));
       
       Page page = storeImpl.depage();
       

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java	2008-08-22 21:40:00 UTC (rev 4865)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java	2008-08-23 05:37:51 UTC (rev 4866)
@@ -112,7 +112,7 @@
                {
                   long id = messageIdGenerator.incrementAndGet();
                   PageMessage msg = createMessage(id, destination, createRandomBuffer(5));
-                  if (storeImpl.page(msg))
+                  if (storeImpl.page(msg, null))
                   {
                      buffers.put(id, msg);
                   }
@@ -246,7 +246,7 @@
       long lastMessageId = messageIdGenerator.incrementAndGet();
       PageMessage lastMsg = createMessage(lastMessageId, destination, createRandomBuffer(5));
       
-      storeImpl2.page(lastMsg);
+      storeImpl2.page(lastMsg, null);
       buffers2.put(lastMessageId, lastMsg);
       
       Page lastPage = null;




More information about the jboss-cvs-commits mailing list