[jboss-cvs] JBoss Messaging SVN: r4866 - in branches/Branch_JBMESSAGING-1314: src/main/org/jboss/messaging/core/paging/impl and 7 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat Aug 23 01:37:52 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-08-23 01:37:51 -0400 (Sat, 23 Aug 2008)
New Revision: 4866
Added:
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/LastPageRecord.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/LastPageRecordImpl.java
Modified:
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageMessage.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageTransactionImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/StorageManager.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
Log:
Removing PageTransaction, adding duplicate detection on depage, processing transacted-paged-message in memory
Added: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/LastPageRecord.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/LastPageRecord.java (rev 0)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/LastPageRecord.java 2008-08-23 05:37:51 UTC (rev 4866)
@@ -0,0 +1,69 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.paging;
+
+import org.jboss.messaging.core.journal.EncodingSupport;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ *
+ * Stores the last pageID processed during depage, to detect duplications during the delete
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public interface LastPageRecord extends EncodingSupport
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ long getRecordId();
+
+ void setRecordId(long recordId);
+
+ SimpleString getDestination();
+
+ void setDestination(SimpleString destination);
+
+ long getLastId();
+
+ void setLastId(long lastId);
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java 2008-08-22 21:40:00 UTC (rev 4865)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java 2008-08-23 05:37:51 UTC (rev 4866)
@@ -23,6 +23,8 @@
package org.jboss.messaging.core.paging;
+import java.util.Collection;
+
import org.jboss.messaging.core.paging.impl.PageMessage;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.util.SimpleString;
@@ -32,36 +34,17 @@
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
* @param <T> An Encoding Support.
- * TODO: After we have the Paging system stable, maybe we can remove the generic part
*/
public interface Pager
{
/**
+ * @param pagingStoreImpl
* @return false if the listener can't handle more pages
*/
- boolean onDepage(SimpleString destination, PageMessage[] data) throws Exception;
+ boolean onDepage(int pageId, SimpleString destination, PagingStore pagingStoreImpl, PageMessage[] data) throws Exception;
- /**
- * Depage could depage messages for transactions not committed yet,
- * So we need to eventually send pending messages
- * */
- void beginTransaction(long transactionID, PageTransaction pageTransaction);
-
- /**
- * Depage could depage messages for transactions not committed yet,
- * So we need to eventually send pending messages
- * */
- void commitTransaction(long transactionID, PageTransaction pageTransaction);
-
/**
- * Need to clear any information about the transaction, any eventual data will be ignored
- * */
- void rollbackTransaction(long transactionID);
-
-
-
- /**
* To be used by transactions only.
* If you're sure you will page if isPaging, just call the method page and look at its return.
* @param destination
@@ -77,14 +60,10 @@
boolean page(ServerMessage message) throws Exception;
/**
- * Page, only if destination is in page mode.
*
- * page is an atomic operation. It's better to call page and test the return.
- *
- * @param message
- * @return false if destination is not on page mode
- */
- boolean page(ServerMessage message, long transactionID) throws Exception;
+ * Duplication detection for paging processing
+ * */
+ void loadLastPage(LastPageRecord lastPage) throws Exception;
/**
*
@@ -96,7 +75,14 @@
/** To be called when a rollback is called after messageDone was called */
long addSize(ServerMessage message) throws Exception;
- void sync(PageTransaction pageTransaction) throws Exception;
+ void sync(Collection<SimpleString> destinationsToSync) throws Exception;
+
+ /**
+ * When we stop depaging, The Last page record needs to removed.
+ * Or else the record could live forever on the journal.
+ * @throws Exception
+ * */
+ void clearLastRecord(LastPageRecord lastRecord) throws Exception;
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java 2008-08-22 21:40:00 UTC (rev 4865)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java 2008-08-23 05:37:51 UTC (rev 4866)
@@ -50,7 +50,7 @@
void sync() throws Exception;
- boolean page(PageMessage message) throws Exception;
+ boolean page(PageMessage message, Pager pageListener) throws Exception;
/**
* Remove the first page from the Writing Queue.
@@ -69,4 +69,8 @@
*/
boolean startDequeueThread(Pager listener) throws Exception;
+ LastPageRecord getLastRecord();
+
+ void setLastRecord(LastPageRecord record);
+
}
Added: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/LastPageRecordImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/LastPageRecordImpl.java (rev 0)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/LastPageRecordImpl.java 2008-08-23 05:37:51 UTC (rev 4866)
@@ -0,0 +1,120 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.paging.impl;
+
+import org.jboss.messaging.core.paging.LastPageRecord;
+import org.jboss.messaging.core.remoting.MessagingBuffer;
+import org.jboss.messaging.util.SimpleString;
+
+public class LastPageRecordImpl implements LastPageRecord
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+
+ private long recordId;
+ private SimpleString destination;
+ private long lastId;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+
+
+ // Public --------------------------------------------------------
+
+ public LastPageRecordImpl(long recordId)
+ {
+ }
+
+ public LastPageRecordImpl(long recordId, long lastId,
+ SimpleString destination)
+ {
+ super();
+ this.recordId = recordId;
+ this.lastId = lastId;
+ this.destination = destination;
+ }
+
+ public long getRecordId()
+ {
+ return recordId;
+ }
+
+ public void setRecordId(final long recordId)
+ {
+ this.recordId = recordId;
+ }
+
+ public SimpleString getDestination()
+ {
+ return destination;
+ }
+
+ public void setDestination(final SimpleString destination)
+ {
+ this.destination = destination;
+ }
+
+ public long getLastId()
+ {
+ return lastId;
+ }
+
+ public void setLastId(final long lastId)
+ {
+ this.lastId = lastId;
+ }
+
+
+ public void decode(final MessagingBuffer buffer)
+ {
+ lastId = buffer.getLong();
+ destination = buffer.getSimpleString();
+ }
+
+ public void encode(final MessagingBuffer buffer)
+ {
+ buffer.putLong(lastId);
+ buffer.putSimpleString(destination);
+ }
+
+ public int getEncodeSize()
+ {
+ return 8 + SimpleString.sizeofString(destination);
+ }
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageMessage.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageMessage.java 2008-08-22 21:40:00 UTC (rev 4865)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageMessage.java 2008-08-23 05:37:51 UTC (rev 4866)
@@ -49,13 +49,7 @@
// Public --------------------------------------------------------
private final ServerMessage message;
- private long transactionID;
-
- public PageMessage(ServerMessage message, long transactionID)
- {
- this.message = message;
- }
-
+
public PageMessage(ServerMessage message)
{
this.message = message;
@@ -71,17 +65,11 @@
return message;
}
- public long getTransactionID()
- {
- return transactionID;
- }
-
// EncodingSupport implementation --------------------------------
public void decode(MessagingBuffer buffer)
{
- transactionID = buffer.getLong();
final long messageID = buffer.getLong();
message.decode(buffer);
message.setMessageID(messageID);
@@ -89,7 +77,6 @@
public void encode(MessagingBuffer buffer)
{
- buffer.putLong(transactionID);
buffer.putLong(message.getMessageID());
message.encode(buffer);
}
@@ -97,7 +84,7 @@
public int getEncodeSize()
{
- return 8 + 8 + message.getEncodeSize();
+ return 8 + message.getEncodeSize();
}
// Package protected ---------------------------------------------
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageTransactionImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageTransactionImpl.java 2008-08-22 21:40:00 UTC (rev 4865)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageTransactionImpl.java 2008-08-23 05:37:51 UTC (rev 4866)
@@ -32,6 +32,8 @@
/**
*
+ * TODO: delete this class!
+ *
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-08-22 21:40:00 UTC (rev 4865)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-08-23 05:37:51 UTC (rev 4866)
@@ -33,6 +33,7 @@
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.paging.LastPageRecord;
import org.jboss.messaging.core.paging.Page;
import org.jboss.messaging.core.paging.Pager;
import org.jboss.messaging.core.paging.PagingStoreFactory;
@@ -72,11 +73,13 @@
private volatile Page currentPage;
// This is supposed to perform better than synchronized methods
- // globalLock protects opening/closing and messing up with IDs
- private final Semaphore globalLock = new Semaphore(1);
+ // synchronizedBlockLock protects opening/closing and messing up with IDs
+ private final Semaphore synchronizedBlockLock = new Semaphore(1);
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private volatile boolean initialized = false;
+
+ private volatile LastPageRecord lastRecord;
// Static --------------------------------------------------------
@@ -131,7 +134,7 @@
validateInit();
// Read needs both global and writeLock
- globalLock.acquire(); // This is a replacement synchronized.
+ synchronizedBlockLock.acquire(); // This is a replacement synchronized.
// Can't change any IDs while depaging.
lock.writeLock().lock(); // Wait pending writes to finish before depage.
@@ -190,12 +193,12 @@
finally
{
lock.writeLock().unlock();
- globalLock.release();
+ synchronizedBlockLock.release();
}
}
- public boolean page(PageMessage message) throws Exception
+ public boolean page(PageMessage message, Pager pageListener) throws Exception
{
validateInit();
@@ -203,13 +206,21 @@
// This would be a synchronized block... (but using a Semaphore)
-
- globalLock.acquire();
+ synchronizedBlockLock.acquire();
try
{
if (currentPage == null)
{
+ if (this.lastRecord != null)
+ {
+ if (pageListener != null)
+ {
+ pageListener.clearLastRecord(lastRecord);
+ }
+ lastRecord = null;
+ }
+
return false;
}
@@ -227,7 +238,7 @@
lock.writeLock().unlock();
}
}
- // we must get the readLock before we release the globalLock
+ // we must get the readLock before we release the synchronizedBlockLock
// or else we could end up with files records being added to the currentPage even if the max size was already achieved.
// (Condition tested by PagingStoreTestPage::testConcurrentPaging, The test would eventually fail, 1 in 100)
lock.readLock().lock();
@@ -235,7 +246,7 @@
}
finally
{
- globalLock.release();
+ synchronizedBlockLock.release();
}
// End of a synchronized block..
@@ -299,6 +310,18 @@
}
+ public LastPageRecord getLastRecord()
+ {
+ return lastRecord;
+ }
+
+ public void setLastRecord(LastPageRecord record)
+ {
+ this.lastRecord = record;
+ }
+
+
+
// MessagingComponent implementation
public synchronized boolean isStarted()
@@ -384,7 +407,7 @@
{
validateInit();
- globalLock.acquire();
+ synchronizedBlockLock.acquire();
try
{
if (currentPage == null)
@@ -399,7 +422,7 @@
}
finally
{
- globalLock.release();
+ synchronizedBlockLock.release();
}
}
@@ -533,7 +556,7 @@
}
page.open();
PageMessage messages[] = page.read();
- needMorePages = listener.onDepage(PagingStoreImpl.this.storeName, messages);
+ needMorePages = listener.onDepage(page.getPageId(), PagingStoreImpl.this.storeName, PagingStoreImpl.this, messages);
page.delete();
}
while (needMorePages);
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/StorageManager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/StorageManager.java 2008-08-22 21:40:00 UTC (rev 4865)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/StorageManager.java 2008-08-23 05:37:51 UTC (rev 4866)
@@ -25,6 +25,7 @@
import java.util.List;
import java.util.Map;
+import org.jboss.messaging.core.paging.LastPageRecord;
import org.jboss.messaging.core.paging.PageTransaction;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.PostOffice;
@@ -78,7 +79,9 @@
void updatePageTransaction(long txID, PageTransaction pageTransaction) throws Exception;
+ void storeLastPage(long txID, LastPageRecord pageTransaction) throws Exception;
+
void updateDeliveryCount(MessageReference ref) throws Exception;
void loadMessages(PostOffice postOffice, Map<Long, Queue> queues) throws Exception;
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-08-22 21:40:00 UTC (rev 4865)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-08-23 05:37:51 UTC (rev 4866)
@@ -47,7 +47,9 @@
import org.jboss.messaging.core.journal.impl.JournalImpl;
import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.paging.LastPageRecord;
import org.jboss.messaging.core.paging.PageTransaction;
+import org.jboss.messaging.core.paging.impl.LastPageRecordImpl;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.PostOffice;
@@ -99,6 +101,8 @@
public static final byte PAGE_TRANSACTION = 34;
+ public static final byte LAST_PAGE = 35;
+
public static final byte SET_SCHEDULED_DELIVERY_TIME = 44;
private final AtomicLong messageIDSequence = new AtomicLong(0);
@@ -229,6 +233,11 @@
{
messageJournal.appendUpdateRecordTransactional(txID, pageTransaction.getRecordID(), PAGE_TRANSACTION, pageTransaction);
}
+
+ public void storeLastPage(long txID, LastPageRecord pageTransaction) throws Exception
+ {
+ messageJournal.appendAddRecordTransactional(txID, pageTransaction.getRecordId(), LAST_PAGE, pageTransaction);
+ }
public void storeAcknowledgeTransactional(long txID, long queueID, long messageID) throws Exception
{
@@ -298,6 +307,18 @@
switch (recordType)
{
+ case LAST_PAGE:
+ {
+ MessagingBuffer buff = new ByteBufferWrapper(bb);
+
+ LastPageRecordImpl recordImpl = new LastPageRecordImpl(record.id);
+
+ recordImpl.decode(buff);
+
+ postOffice.getPager().loadLastPage(recordImpl);
+
+ break;
+ }
case ADD_MESSAGE:
{
MessagingBuffer buff = new ByteBufferWrapper(bb);
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java 2008-08-22 21:40:00 UTC (rev 4865)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java 2008-08-23 05:37:51 UTC (rev 4866)
@@ -26,6 +26,7 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
+import org.jboss.messaging.core.paging.LastPageRecord;
import org.jboss.messaging.core.paging.PageTransaction;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
@@ -165,4 +166,9 @@
return started;
}
+ public void storeLastPage(long txID, LastPageRecord pageTransaction) throws Exception
+ {
+ }
+
+
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-08-22 21:40:00 UTC (rev 4865)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-08-23 05:37:51 UTC (rev 4866)
@@ -23,9 +23,9 @@
package org.jboss.messaging.core.postoffice.impl;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -38,10 +38,11 @@
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.paging.LastPageRecord;
import org.jboss.messaging.core.paging.Pager;
-import org.jboss.messaging.core.paging.PageTransaction;
import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.paging.impl.LastPageRecordImpl;
import org.jboss.messaging.core.paging.impl.PageMessage;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
@@ -60,12 +61,13 @@
* A PostOfficeImpl
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
public class PostOfficeImpl implements PostOffice
{
- private static final long MAX_SIZE = 10 * 1024 * 1024;
+ private static final long MAX_SIZE = 100 * 1024 * 1024;
private static final Logger log = Logger.getLogger(PostOfficeImpl.class);
@@ -443,9 +445,9 @@
}
}
- private void startDepageThread(PagingStore store) throws Exception
+ private boolean startDepageThread(PagingStore store) throws Exception
{
- store.startDequeueThread(new PagerImpl());
+ return store.startDequeueThread(new PagerImpl());
}
@@ -453,107 +455,73 @@
private class PagerImpl implements Pager
{
- private final ConcurrentMap</*TransactionID*/ Long , PageTransaction> transactions = new ConcurrentHashMap<Long, PageTransaction>();
+// private final ConcurrentMap</*TransactionID*/ Long , PageTransaction> transactions = new ConcurrentHashMap<Long, PageTransaction>();
+ public void clearLastRecord(LastPageRecord lastRecord) throws Exception
+ {
+ System.out.println("Clearing lastRecord information!!!!!!");
+ long transactionID = storageManager.generateTransactionID();
+ storageManager.storeDeleteTransactional(transactionID, lastRecord.getRecordId());
+ }
+
/**
* This method will remove files from the page system and add them into the journal, doing it transactionally
*
* A Transaction will be opened only if persistent messages are used.
* If persistent messages are also used, it will update eventual PageTransactions
*/
- public boolean onDepage(final SimpleString destination, final PageMessage[] data) throws Exception
+ public boolean onDepage(int pageId, final SimpleString destination, PagingStore pagingStore, final PageMessage[] data) throws Exception
{
log.info("Depaging....");
+
long transactionID = storageManager.generateTransactionID();
- boolean usedTransaction = false;
- HashSet<PageTransaction> pageTransactionsToUpdate = new HashSet<PageTransaction>();
+ LastPageRecord lastPage = pagingStore.getLastRecord();
- final List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
-
- for (PageMessage msg: data)
+ if (lastPage != null)
{
- PageTransaction trUsed = null;
- if (msg.getTransactionID() > 0)
+ if (pageId <= lastPage.getLastId())
{
- trUsed = transactions.get(msg.getTransactionID());
- if (trUsed == null)
- {
- // TODO make it .trace
- log.info("Transaction " + msg.getTransactionID() + " not found, ignoring message " + msg.getMessage().getMessageID());
- continue;
- }
- else if (!trUsed.isCommitted())
- {
-
- // this would affect any of the solutions we have in mind now..
- // we have some options here.. we just need to find the best option.
- log.info("Transaction " + msg.getTransactionID() + " is pending... we don't know what to do yet... ignoring the message for now but this is not acceptable");
- continue;
- }
+ log.info("Page " + pageId + " was already processed, ignoring the page");
+ return true;
}
-
- if (msg.getMessage().isDurable() && trUsed != null)
+ else
{
- pageTransactionsToUpdate.add(trUsed);
- trUsed.decrement();
+ storageManager.storeDeleteTransactional(transactionID, lastPage.getRecordId());
}
-
+ }
+
+ LastPageRecord record = new LastPageRecordImpl(storageManager.generateMessageID(), pageId, destination);
+ storageManager.storeLastPage(transactionID, record);
+ pagingStore.setLastRecord(record);
+
+ final List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
+
+ for (PageMessage msg: data)
+ {
refsToAdd.addAll(PostOfficeImpl.this.route(msg.getMessage()));
if (msg.getMessage().getDurableRefCount() != 0)
{
- usedTransaction = true;
storageManager.storeMessageTransactional(transactionID, msg.getMessage());
}
}
- for (PageTransaction pageTrans: pageTransactionsToUpdate)
- {
- if (pageTrans.getNumberOfMessages() == 0)
- {
- storageManager.storeDelete(pageTrans.getRecordID());
- transactions.remove(pageTrans.getTransactionID());
- }
- else
- {
- storageManager.updatePageTransaction(transactionID, pageTrans);
- }
- usedTransaction = true;
- }
+ storageManager.commit(transactionID);
- if (usedTransaction)
- {
- storageManager.commit(transactionID);
- }
-
for (MessageReference ref : refsToAdd)
{
ref.getQueue().addLast(ref);
}
-
return PostOfficeImpl.this.getQueueSize(destination).get() < MAX_SIZE;
}
-
- // Transaction
- public void beginTransaction(long transactionID, PageTransaction pageTransaction)
+ public void loadLastPage(LastPageRecord lastPage) throws Exception
{
- transactions.putIfAbsent(transactionID, pageTransaction);
+ pagingManager.getPageStore(lastPage.getDestination()).setLastRecord(lastPage);
}
-
- public void commitTransaction(long transactionID, PageTransaction pageTransaction)
- {
- transactions.putIfAbsent(transactionID, pageTransaction);
- // TODO: What to do with pending transactions during depage?
- }
-
- public void rollbackTransaction(long transactionID)
- {
- transactions.remove(transactionID);
- }
-
+
public boolean isPaging(SimpleString destination) throws Exception
{
return pagingManager.getPageStore(destination).isPaging();
@@ -565,10 +533,12 @@
if (size < MAX_SIZE)
{
- System.out.println("Starting depage Thread, size = " + size);
PagingStore store = pagingManager.getPageStore(message.getDestination());
- startDepageThread(store);
+ if (startDepageThread(store))
+ {
+ log.info("Starting depaging Thread, size = " + size);
+ }
}
}
@@ -586,25 +556,18 @@
public boolean page(ServerMessage message) throws Exception
{
- return pagingManager.getPageStore(message.getDestination()).page(new PageMessage(message));
+ return pagingManager.getPageStore(message.getDestination()).page(new PageMessage(message), this);
}
- public boolean page(ServerMessage message, long transactionID) throws Exception
+ public void sync(Collection<SimpleString> destinationsToSync) throws Exception
{
- return pagingManager.getPageStore(message.getDestination()).page(new PageMessage(message, transactionID));
- }
-
-
- public void sync(PageTransaction pageTransaction) throws Exception
- {
- SimpleString[] destinations = pageTransaction.getDestinations();
- for (SimpleString destination: destinations)
+ for (SimpleString destination: destinationsToSync)
{
pagingManager.getPageStore(destination).sync();
}
}
-
-
+
+
}
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-08-22 21:40:00 UTC (rev 4865)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-08-23 05:37:51 UTC (rev 4866)
@@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -44,6 +45,7 @@
import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.core.paging.impl.PageMessage;
import org.jboss.messaging.core.paging.impl.PageTransactionImpl;
+import org.jboss.messaging.util.SimpleString;
/**
* A TransactionImpl
@@ -64,9 +66,9 @@
private final List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
private final List<MessageReference> acknowledgements = new ArrayList<MessageReference>();
+
+ private final List<ServerMessage> pagedMessages = new ArrayList<ServerMessage>();
- private PageTransaction pageTransaction;
-
private final Xid xid;
private final long id;
@@ -127,30 +129,14 @@
throw new IllegalStateException("Transaction is in invalid state " + state);
}
- if (pager.page(message, this.id))
+ if (pager.isPaging(message.getDestination()))
{
- if (message.isDurable())
- {
- containsPersistent = true;
- getPageTransaction().addMessage(message.getDestination());
- }
- else
- {
- getPageTransaction();
- }
+
+ pagedMessages.add(message);
}
else
{
- List<MessageReference> refs = postOffice.route(message);
-
- refsToAdd.addAll(refs);
-
- if (message.getDurableRefCount() != 0)
- {
- storageManager.storeMessageTransactional(id, message);
-
- containsPersistent = true;
- }
+ route(message);
}
}
@@ -222,10 +208,6 @@
if (containsPersistent)
{
- if (this.pageTransaction != null)
- {
- storageManager.storePageTransaction(this.id, pageTransaction);
- }
storageManager.prepare(id);
}
@@ -260,26 +242,35 @@
throw new IllegalStateException("Transaction is in invalid state " + state);
}
}
+
+ HashSet<SimpleString> pagedDestinationsToSync = new HashSet<SimpleString>();
+ for (ServerMessage message: pagedMessages)
+ {
+
+ if (pager.page(message))
+ {
+ if (message.isDurable())
+ {
+ pagedDestinationsToSync.add(message.getDestination());
+ }
+ }
+ else
+ {
+ // This could happen when the PageStore left the pageState
+ route(message);
+ }
+ }
if (containsPersistent)
{
- if (this.pageTransaction != null && state != State.PREPARED)
+ if (pagedDestinationsToSync.size() > 0)
{
- storageManager.storePageTransaction(this.id, pageTransaction);
- pager.sync(pageTransaction);
+ pager.sync(pagedDestinationsToSync);
}
storageManager.commit(id);
}
-
- // TODO: What to do if depage happen on the middle of transaction not committed yet?
- // This would be a problem on any solution applied on transactions & paging
- if (pageTransaction != null)
- {
- pager.commitTransaction(id, pageTransaction);
- }
-
for (MessageReference ref : refsToAdd)
{
ref.getQueue().addLast(ref);
@@ -317,11 +308,6 @@
storageManager.rollback(id);
}
- if (pageTransaction != null)
- {
- pager.rollbackTransaction(id);
- }
-
Map<Queue, LinkedList<MessageReference>> queueMap = new HashMap<Queue, LinkedList<MessageReference>>();
// We sort into lists - one for each queue involved.
@@ -434,19 +420,20 @@
// Private
// -------------------------------------------------------------------
- private PageTransaction getPageTransaction()
+ private void route(final ServerMessage message) throws Exception
{
- if (pageTransaction == null)
+ List<MessageReference> refs = postOffice.route(message);
+
+ refsToAdd.addAll(refs);
+
+ if (message.getDurableRefCount() != 0)
{
- long pageTRID = storageManager.generateMessageID();
- pageTransaction = new PageTransactionImpl(id);
- pageTransaction.setRecordID(pageTRID);
- pager.beginTransaction(id, pageTransaction);
+ storageManager.storeMessageTransactional(id, message);
+
+ containsPersistent = true;
}
-
- return pageTransaction;
}
-
+
private void clear()
{
refsToAdd.clear();
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java 2008-08-22 21:40:00 UTC (rev 4865)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java 2008-08-23 05:37:51 UTC (rev 4866)
@@ -68,11 +68,11 @@
ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(10));
- assertFalse(store.page(new PageMessage(msg)));
+ assertFalse(store.page(new PageMessage(msg), null));
store.startPaging();
- assertTrue(store.page(new PageMessage(msg)));
+ assertTrue(store.page(new PageMessage(msg), null));
Page page = store.depage();
@@ -90,7 +90,7 @@
assertNull(store.depage());
- assertFalse(store.page(new PageMessage(msg)));
+ assertFalse(store.page(new PageMessage(msg), null));
}
// Package protected ---------------------------------------------
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2008-08-22 21:40:00 UTC (rev 4865)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2008-08-23 05:37:51 UTC (rev 4866)
@@ -98,7 +98,7 @@
assertTrue(storeImpl.isPaging());
- assertTrue(storeImpl.page(msg));
+ assertTrue(storeImpl.page(msg, null));
assertEquals(1, storeImpl.getNumberOfPages());
@@ -137,7 +137,7 @@
PageMessage msg = createMessage(i+1l, destination, buffer);
- assertTrue(storeImpl.page(msg));
+ assertTrue(storeImpl.page(msg, null));
}
@@ -201,7 +201,7 @@
PageMessage msg = createMessage(i+1l, destination, buffer);
- assertTrue(storeImpl.page(msg));
+ assertTrue(storeImpl.page(msg, null));
}
@@ -234,7 +234,7 @@
PageMessage msg = createMessage(100, destination, buffers.get(0));
- assertTrue(storeImpl.page(msg));
+ assertTrue(storeImpl.page(msg, null));
Page newPage = storeImpl.depage();
@@ -252,11 +252,11 @@
assertFalse(storeImpl.isPaging());
- assertFalse(storeImpl.page(msg));
+ assertFalse(storeImpl.page(msg, null));
storeImpl.startPaging();
- assertTrue(storeImpl.page(msg));
+ assertTrue(storeImpl.page(msg, null));
Page page = storeImpl.depage();
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java 2008-08-22 21:40:00 UTC (rev 4865)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java 2008-08-23 05:37:51 UTC (rev 4866)
@@ -112,7 +112,7 @@
{
long id = messageIdGenerator.incrementAndGet();
PageMessage msg = createMessage(id, destination, createRandomBuffer(5));
- if (storeImpl.page(msg))
+ if (storeImpl.page(msg, null))
{
buffers.put(id, msg);
}
@@ -246,7 +246,7 @@
long lastMessageId = messageIdGenerator.incrementAndGet();
PageMessage lastMsg = createMessage(lastMessageId, destination, createRandomBuffer(5));
- storeImpl2.page(lastMsg);
+ storeImpl2.page(lastMsg, null);
buffers2.put(lastMessageId, lastMsg);
Page lastPage = null;
More information about the jboss-cvs-commits
mailing list