[hornetq-commits] JBoss hornetq SVN: r10045 - in trunk: src/main/org/hornetq/core/paging/cursor/impl and 2 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Dec 16 14:47:44 EST 2010
Author: clebert.suconic at jboss.com
Date: 2010-12-16 14:47:43 -0500 (Thu, 16 Dec 2010)
New Revision: 10045
Modified:
trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java
Log:
Page Counters commit 2
Modified: trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java 2010-12-16 15:49:32 UTC (rev 10044)
+++ trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java 2010-12-16 19:47:43 UTC (rev 10045)
@@ -25,20 +25,25 @@
public interface PageSubscriptionCounter
{
- public abstract long getValue();
+ long getValue();
- public abstract void increment(Transaction tx, int add) throws Exception;
+ void increment(Transaction tx, int add) throws Exception;
- public abstract void loadValue(final long recordValueID, final long value);
+ void loadValue(final long recordValueID, final long value);
+
+   /**
+    * Queues an increment record found while the journal is being reloaded.
+    * The amount is only folded into the counter when {@link #processReload()} is called.
+    * @param recordID the journal record id of the increment
+    * @param add the amount carried by the increment record
+    */
+   void loadInc(final long recordID, final int add);
+
+   /**
+    * Registers, on the given transaction, an increment that is already stored in the journal
+    * under {@code recordID}; no new record is written. Used when prepared transactions are reloaded.
+    * @param tx the transaction the increment belongs to
+    * @param recordID the journal record id of the existing increment
+    * @param add the amount carried by the increment
+    */
+   void replayIncrement(Transaction tx, long recordID, int add);
+
+   /**
+    * Completes the reload: applies to the counter the increments that were
+    * queued through {@link #loadInc(long, int)}.
+    */
+   void processReload();
- public abstract void incrementProcessed(long id, int variance);
-
/**
*
* This method is also used by Journal.loadMessageJournal
* @param id
* @param variance
*/
- public abstract void addInc(long id, int variance);
+ void addInc(long id, int variance);
}
\ No newline at end of file
Modified: trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2010-12-16 15:49:32 UTC (rev 10044)
+++ trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2010-12-16 19:47:43 UTC (rev 10045)
@@ -19,6 +19,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
+import org.hornetq.api.core.Pair;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.cursor.PageSubscriptionCounter;
import org.hornetq.core.persistence.StorageManager;
@@ -58,6 +59,8 @@
private final AtomicLong value = new AtomicLong(0);
private final LinkedList<Long> incrementRecords = new LinkedList<Long>();
+
+ private LinkedList<Pair<Long, Integer>> loadList;
private final Executor executor;
@@ -96,6 +99,22 @@
*/
public void increment(Transaction tx, int add) throws Exception
{
+ tx.setContainsPersistent();
+
+ long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add);
+
+ replayIncrement(tx, id, add);
+
+ }
+
+ /**
+ * This method will install the prepared TXs
+ * @param tx
+ * @param recordID
+ * @param add
+ */
+ public void replayIncrement(Transaction tx, long recordID, int add)
+ {
CounterOperations oper = (CounterOperations)tx.getProperty(TransactionPropertyIndexes.PAGE_COUNT_INC);
if (oper == null)
@@ -105,18 +124,16 @@
tx.addOperation(oper);
}
- long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add);
-
- oper.operations.add(new ItemOper(this, id, add));
-
+ oper.operations.add(new ItemOper(this, recordID, add));
}
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.impl.PagingSubscriptionCounterInterface#loadValue(long, long)
*/
- public synchronized void loadValue(final long recordValueID, final long value)
+ public synchronized void loadValue(final long recordID, final long value)
{
this.value.set(value);
+ this.recordID = recordID;
}
@@ -124,9 +141,9 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.impl.PagingSubscriptionCounterInterface#incrementProcessed(long, int)
*/
- public synchronized void incrementProcessed(long id, int variance)
+ public synchronized void incrementProcessed(long id, int add)
{
- addInc(id, variance);
+ addInc(id, add);
if (incrementRecords.size() > FLUSH_COUNTER)
{
executor.execute(cleanupCheck);
@@ -135,6 +152,33 @@
}
/* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageSubscriptionCounter#loadInc(long, int)
+ */
+   public void loadInc(long id, int add)
+   {
+      // The pending list is created on first use; processReload() drops it again
+      LinkedList<Pair<Long, Integer>> pending = loadList;
+
+      if (pending == null)
+      {
+         pending = new LinkedList<Pair<Long, Integer>>();
+         loadList = pending;
+      }
+
+      // Keep the record id together with its amount so both can be applied later
+      pending.add(new Pair<Long, Integer>(id, add));
+   }
+
+   /**
+    * Applies the increments collected by {@link #loadInc(long, int)} during the journal reload.
+    * <p>
+    * Each pending amount is added to the counter value and its record id is appended to
+    * {@code incrementRecords}. The pending list is dropped afterwards, so calling this method
+    * again, or calling it when no increment was ever loaded, does nothing.
+    * @see org.hornetq.core.paging.cursor.PageSubscriptionCounter#processReload()
+    */
+   public void processReload()
+   {
+      // loadList is created lazily by loadInc(), so it is still null when nothing was loaded
+      if (loadList == null)
+      {
+         return;
+      }
+
+      for (Pair<Long, Integer> incElement : loadList)
+      {
+         value.addAndGet(incElement.b);
+         incrementRecords.add(incElement.a);
+      }
+
+      loadList.clear();
+      loadList = null;
+   }
+
+ /* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.impl.PagingSubscriptionCounterInterface#addInc(long, int)
*/
public void addInc(long id, int variance)
@@ -211,14 +255,14 @@
static class ItemOper
{
- public ItemOper(PageSubscriptionCounter counter, long id, int add)
+ public ItemOper(PageSubscriptionCounterImpl counter, long id, int add)
{
this.counter = counter;
this.id = id;
this.ammount = add;
}
- PageSubscriptionCounter counter;
+ PageSubscriptionCounterImpl counter;
long id;
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-12-16 15:49:32 UTC (rev 10044)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-12-16 19:47:43 UTC (rev 10045)
@@ -52,8 +52,10 @@
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.paging.cursor.PageSubscription;
+import org.hornetq.core.paging.cursor.PageSubscriptionCounter;
import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.persistence.GroupingInfo;
@@ -138,9 +140,9 @@
public static final byte HEURISTIC_COMPLETION = 38;
public static final byte ACKNOWLEDGE_CURSOR = 39;
-
+
public static final byte PAGE_CURSOR_COUNTER_VALUE = 40;
-
+
public static final byte PAGE_CURSOR_COUNTER_INC = 41;
private UUID persistentID;
@@ -278,7 +280,7 @@
}
else
{
- idGenerator = new BatchingIDGenerator(0, JournalStorageManager.CHECKPOINT_BATCH_SIZE, bindingsJournal);
+ idGenerator = new BatchingIDGenerator(0, JournalStorageManager.CHECKPOINT_BATCH_SIZE, bindingsJournal);
}
Journal localMessage = new JournalImpl(config.getJournalFileSize(),
config.getJournalMinFiles(),
@@ -440,7 +442,7 @@
}
LargeServerMessageImpl largeMessage = (LargeServerMessageImpl)createLargeMessage();
-
+
largeMessage.copyHeadersAndProperties(message);
largeMessage.setMessageID(id);
@@ -496,16 +498,18 @@
syncNonTransactional,
getContext(syncNonTransactional));
}
-
+
public void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception
{
- long ackID = idGenerator.generateID();
- position.setRecordID(ackID);
- messageJournal.appendAddRecord(ackID, ACKNOWLEDGE_CURSOR, new CursorAckRecordEncoding(queueID, position), syncNonTransactional, getContext(syncNonTransactional));
+ long ackID = idGenerator.generateID();
+ position.setRecordID(ackID);
+ messageJournal.appendAddRecord(ackID,
+ ACKNOWLEDGE_CURSOR,
+ new CursorAckRecordEncoding(queueID, position),
+ syncNonTransactional,
+ getContext(syncNonTransactional));
}
-
-
public void deleteMessage(final long messageID) throws Exception
{
// Messages are deleted on postACK, one after another.
@@ -592,8 +596,7 @@
{
messageJournal.appendUpdateRecord(pageTransaction.getRecordID(),
JournalStorageManager.PAGE_TRANSACTION,
- new PageUpdateTXEncoding(pageTransaction.getTransactionID(),
- depages),
+ new PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages),
syncNonTransactional,
getContext(syncNonTransactional));
}
@@ -621,9 +624,11 @@
{
long ackID = idGenerator.generateID();
position.setRecordID(ackID);
- messageJournal.appendAddRecordTransactional(txID, ackID, ACKNOWLEDGE_CURSOR, new CursorAckRecordEncoding(queueID, position));
+ messageJournal.appendAddRecordTransactional(txID,
+ ackID,
+ ACKNOWLEDGE_CURSOR,
+ new CursorAckRecordEncoding(queueID, position));
}
-
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#deleteCursorAcknowledgeTransactional(long, long)
@@ -633,7 +638,6 @@
messageJournal.appendDeleteRecordTransactional(txID, ackID);
}
-
public long storeHeuristicCompletion(final Xid xid, final boolean isCommit) throws Exception
{
long id = generateUniqueID();
@@ -807,6 +811,8 @@
Map<Long, Map<Long, AddMessageRecord>> queueMap = new HashMap<Long, Map<Long, AddMessageRecord>>();
+ Map<Long, PageSubscription> pageSubscriptions = new HashMap<Long, PageSubscription>();
+
final int totalSize = records.size();
for (int reccount = 0; reccount < totalSize; reccount++)
@@ -1011,25 +1017,62 @@
{
CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
encoding.decode(buff);
-
+
encoding.position.setRecordID(record.id);
-
- QueueBindingInfo queueInfo = queueInfos.get(encoding.queueID);
-
- if (queueInfo != null)
+
+ PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager);
+
+ if (sub != null)
{
- SimpleString address = queueInfo.getAddress();
- PagingStore store = pagingManager.getPageStore(address);
- PageSubscription cursor = store.getCursorProvier().getSubscription(encoding.queueID);
- cursor.reloadACK(encoding.position);
+ sub.reloadACK(encoding.position);
}
else
{
log.warn("Can't find queue " + encoding.queueID + " while reloading ACKNOWLEDGE_CURSOR");
}
-
+
break;
}
+            case PAGE_CURSOR_COUNTER_VALUE:
+            {
+               // Stored counter value: hand the value and its record id to the subscription counter
+               PageCountRecord encoding = new PageCountRecord();
+
+               encoding.decode(buff);
+
+               PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager);
+
+               if (sub != null)
+               {
+                  sub.getCounter().loadValue(record.id, encoding.value);
+               }
+               else
+               {
+                  // Name the record type actually being reloaded so the warning is not misleading
+                  log.warn("Can't find queue " + encoding.queueID + " while reloading PAGE_CURSOR_COUNTER_VALUE");
+               }
+
+               break;
+            }
+
+            case PAGE_CURSOR_COUNTER_INC:
+            {
+               // Counter increment record: it must be accumulated, not treated as an absolute value
+               PageCountRecordInc encoding = new PageCountRecordInc();
+
+               encoding.decode(buff);
+
+               PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager);
+
+               if (sub != null)
+               {
+                  // loadValue() would overwrite the counter's value and record id with this increment;
+                  // loadInc() queues it so that processReload() can add it to the counter
+                  sub.getCounter().loadInc(record.id, encoding.value);
+               }
+               else
+               {
+                  log.warn("Can't find queue " + encoding.queueID + " while reloading PAGE_CURSOR_COUNTER_INC");
+               }
+
+               break;
+            }
default:
{
throw new IllegalStateException("Invalid record type " + recordType);
@@ -1083,7 +1126,7 @@
}
}
- loadPreparedTransactions(postOffice, pagingManager, resourceManager, queues, preparedTransactions, duplicateIDMap);
+ loadPreparedTransactions(postOffice, pagingManager, resourceManager, queues, queueInfos, preparedTransactions, duplicateIDMap, pageSubscriptions);
for (LargeServerMessage msg : largeMessages)
{
@@ -1110,7 +1153,7 @@
}
}
}
-
+
// To recover positions on Iterators
if (pagingManager != null)
{
@@ -1132,6 +1175,35 @@
return info;
}
+   /**
+    * Finds the page subscription of a queue, using {@code pageSubscriptions} as a cache.
+    * On a cache miss the queue's address is taken from its binding info, the subscription is
+    * fetched from the cursor provider of that address' paging store, and the result is cached.
+    * @param queueID id of the queue whose subscription is wanted
+    * @param pageSubscriptions cache of subscriptions already located, keyed by queue id
+    * @param queueInfos binding information of the known queues, keyed by queue id
+    * @param pagingManager used to reach the paging store of the queue's address
+    * @return the subscription, or {@code null} when there is no binding info for the queue
+    */
+   private PageSubscription locateSubscription(final long queueID,
+                                               final Map<Long, PageSubscription> pageSubscriptions,
+                                               final Map<Long, QueueBindingInfo> queueInfos,
+                                               final PagingManager pagingManager) throws Exception
+   {
+      PageSubscription cached = pageSubscriptions.get(queueID);
+
+      if (cached != null)
+      {
+         return cached;
+      }
+
+      QueueBindingInfo binding = queueInfos.get(queueID);
+
+      if (binding == null)
+      {
+         // Unknown queue: the caller decides how to report it
+         return null;
+      }
+
+      PagingStore store = pagingManager.getPageStore(binding.getAddress());
+      PageSubscription located = store.getCursorProvier().getSubscription(queueID);
+
+      pageSubscriptions.put(queueID, located);
+
+      return located;
+   }
+
// grouping handler operations
public void addGrouping(final GroupBinding groupBinding) throws Exception
{
@@ -1170,8 +1242,6 @@
{
bindingsJournal.appendDeleteRecord(queueBindingID, true);
}
-
-
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#storePageCounterAdd(long, long, int)
@@ -1179,18 +1249,23 @@
public long storePageCounterInc(long txID, long queueID, int value) throws Exception
{
long recordID = idGenerator.generateID();
- messageJournal.appendAddRecordTransactional(txID, recordID, JournalStorageManager.PAGE_CURSOR_COUNTER_INC, new PageCountRecord(queueID, value));
+ messageJournal.appendAddRecordTransactional(txID,
+ recordID,
+ JournalStorageManager.PAGE_CURSOR_COUNTER_INC,
+ new PageCountRecordInc(queueID, value));
return recordID;
}
-
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#storePageCounter(long, long, long)
*/
public long storePageCounter(long txID, long queueID, long value) throws Exception
{
long recordID = idGenerator.generateID();
- messageJournal.appendAddRecordTransactional(txID, recordID, JournalStorageManager.PAGE_CURSOR_COUNTER_VALUE, new PageCountRecord(queueID, value));
+ messageJournal.appendAddRecordTransactional(txID,
+ recordID,
+ JournalStorageManager.PAGE_CURSOR_COUNTER_VALUE,
+ new PageCountRecord(queueID, value));
return recordID;
}
@@ -1209,9 +1284,7 @@
{
messageJournal.appendDeleteRecordTransactional(txID, recordID);
}
-
-
-
+
public JournalLoadInformation loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos,
final List<GroupingInfo> groupingInfos) throws Exception
{
@@ -1458,8 +1531,10 @@
final PagingManager pagingManager,
final ResourceManager resourceManager,
final Map<Long, Queue> queues,
+ final Map<Long, QueueBindingInfo> queueInfos,
final List<PreparedTransactionInfo> preparedTransactions,
- final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
+ final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
+ final Map<Long, PageSubscription> pageSubscriptions) throws Exception
{
// recover prepared transactions
for (PreparedTransactionInfo preparedTransaction : preparedTransactions)
@@ -1609,6 +1684,34 @@
// and make sure the rollback will work well also
break;
}
+               case PAGE_CURSOR_COUNTER_VALUE:
+               {
+                  // A counter value record is not expected inside a prepared transaction:
+                  // it is only reported here and otherwise ignored
+                  log.warn("PAGE_CURSOR_COUNTER_VALUE record used on a prepared statement, what shouldn't happen");
+
+                  break;
+               }
+
+               case PAGE_CURSOR_COUNTER_INC:
+               {
+                  // Increment stored by a prepared transaction: attach it to the reloaded
+                  // transaction again, reusing the existing journal record
+                  PageCountRecordInc encoding = new PageCountRecordInc();
+
+                  encoding.decode(buff);
+
+                  PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager);
+
+                  if (sub != null)
+                  {
+                     sub.getCounter().replayIncrement(tx, record.id, encoding.value);
+                  }
+                  else
+                  {
+                     // Name the record type actually being reloaded so the warning is not misleading
+                     log.warn("Can't find queue " + encoding.queueID + " while reloading PAGE_CURSOR_COUNTER_INC");
+                  }
+
+                  break;
+               }
+
default:
{
JournalStorageManager.log.warn("InternalError: Record type " + recordType +
@@ -2297,23 +2400,23 @@
}
}
-
+
private static final class PageCountRecord implements EncodingSupport
{
-
+
PageCountRecord()
{
-
+
}
-
+
PageCountRecord(long queueID, long value)
{
this.queueID = queueID;
this.value = value;
}
-
+
long queueID;
-
+
long value;
/* (non-Javadoc)
@@ -2341,10 +2444,55 @@
queueID = buffer.readLong();
value = buffer.readLong();
}
-
-
+
}
+   /**
+    * Journal encoding of a page counter increment.
+    * Layout: the queue id as a long, followed by the increment as an int.
+    */
+   private static final class PageCountRecordInc implements EncodingSupport
+   {
+      /** Id of the queue the increment belongs to */
+      long queueID;
+
+      /** Amount carried by the increment */
+      int value;
+
+      /** Creates an empty record, to be filled in by {@link #decode(HornetQBuffer)} */
+      PageCountRecordInc()
+      {
+      }
+
+      PageCountRecordInc(long queueID, int value)
+      {
+         this.value = value;
+         this.queueID = queueID;
+      }
+
+      /**
+       * @return the encoded size: one int plus one long
+       * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
+       */
+      public int getEncodeSize()
+      {
+         return DataConstants.SIZE_INT + DataConstants.SIZE_LONG;
+      }
+
+      /**
+       * Writes the queue id, then the increment.
+       * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.api.core.HornetQBuffer)
+       */
+      public void encode(HornetQBuffer buffer)
+      {
+         buffer.writeLong(this.queueID);
+         buffer.writeInt(this.value);
+      }
+
+      /**
+       * Reads the fields in the same order {@link #encode(HornetQBuffer)} writes them.
+       * @see org.hornetq.core.journal.EncodingSupport#decode(org.hornetq.api.core.HornetQBuffer)
+       */
+      public void decode(HornetQBuffer buffer)
+      {
+         this.queueID = buffer.readLong();
+         this.value = buffer.readInt();
+      }
+   }
+
private static final class AddMessageRecord
{
public AddMessageRecord(final ServerMessage message)
@@ -2366,7 +2514,7 @@
this.queueID = queueID;
this.position = position;
}
-
+
public CursorAckRecordEncoding()
{
this.position = new PagePositionImpl();
Modified: trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java 2010-12-16 15:49:32 UTC (rev 10044)
+++ trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java 2010-12-16 19:47:43 UTC (rev 10045)
@@ -119,7 +119,7 @@
counter = locateCounter(queue);
- //assertEquals(1, counter.getValue());
+ assertEquals(1, counter.getValue());
}
More information about the hornetq-commits
mailing list