[hornetq-commits] JBoss hornetq SVN: r10047 - trunk/src/main/org/hornetq/core/paging/cursor/impl.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Dec 16 17:29:28 EST 2010
Author: clebert.suconic at jboss.com
Date: 2010-12-16 17:29:27 -0500 (Thu, 16 Dec 2010)
New Revision: 10047
Modified:
trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
Log:
HORNETQ-574 more pagecounters stuff
Modified: trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2010-12-16 22:14:18 UTC (rev 10046)
+++ trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2010-12-16 22:29:27 UTC (rev 10047)
@@ -41,17 +41,15 @@
// Constants -----------------------------------------------------
static final Logger log = Logger.getLogger(PageSubscriptionCounterImpl.class);
+ // Attributes ----------------------------------------------------
- // Attributes ----------------------------------------------------
-
- // TODO: making this configurable
private static final int FLUSH_COUNTER = 1000;
private final long subscriptionID;
-
+
// the journal record id that is holding the current value
private long recordID = -1;
-
+
private final boolean persistent;
private final StorageManager storage;
@@ -59,11 +57,11 @@
private final AtomicLong value = new AtomicLong(0);
private final LinkedList<Long> incrementRecords = new LinkedList<Long>();
-
+
private LinkedList<Pair<Long, Integer>> loadList;
private final Executor executor;
-
+
private final Runnable cleanupCheck = new Runnable()
{
public void run()
@@ -71,14 +69,17 @@
cleanup();
}
};
-
+
// protected LinkedList
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public PageSubscriptionCounterImpl(final StorageManager storage, final boolean persistent, final long subscriptionID, final Executor executor)
+ public PageSubscriptionCounterImpl(final StorageManager storage,
+ final boolean persistent,
+ final long subscriptionID,
+ final Executor executor)
{
this.subscriptionID = subscriptionID;
this.storage = storage;
@@ -100,11 +101,18 @@
public void increment(Transaction tx, int add) throws Exception
{
tx.setContainsPersistent();
-
- long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add);
- replayIncrement(tx, id, add);
+ if (!persistent)
+ {
+ replayIncrement(tx, -1, add);
+ }
+ else
+ {
+ long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add);
+ replayIncrement(tx, id, add);
+ }
+
}
/**
@@ -126,7 +134,7 @@
oper.operations.add(new ItemOper(this, recordID, add));
}
-
+
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.impl.PagingSubscriptionCounterInterface#loadValue(long, long)
*/
@@ -135,8 +143,6 @@
this.value.set(value);
this.recordID = recordID;
}
-
-
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.impl.PagingSubscriptionCounterInterface#incrementProcessed(long, int)
@@ -158,9 +164,9 @@
{
if (loadList == null)
{
- loadList = new LinkedList<Pair<Long,Integer>>();
+ loadList = new LinkedList<Pair<Long, Integer>>();
}
-
+
loadList.add(new Pair<Long, Integer>(id, add));
}
@@ -187,14 +193,18 @@
public void addInc(long id, int variance)
{
value.addAndGet(variance);
- incrementRecords.add(id);
+
+ if (id >= 0)
+ {
+ incrementRecords.add(id);
+ }
}
/** This method sould alwas be called from a single threaded executor */
protected void cleanup()
{
ArrayList<Long> deleteList;
-
+
long valueReplace;
synchronized (this)
{
@@ -203,33 +213,33 @@
deleteList.addAll(incrementRecords);
incrementRecords.clear();
}
-
+
long newRecordID = -1;
long txCleanup = storage.generateUniqueID();
-
+
try
{
for (Long value : deleteList)
{
storage.deleteIncrementRecord(txCleanup, value);
}
-
+
if (recordID >= 0)
{
storage.deletePageCounter(txCleanup, recordID);
}
-
- newRecordID = storage.storePageCounter(txCleanup, subscriptionID, valueReplace);
-
+
+ newRecordID = storage.storePageCounter(txCleanup, subscriptionID, valueReplace);
+
storage.commit(txCleanup);
-
+
storage.waitOnOperations();
}
catch (Exception e)
{
newRecordID = recordID;
-
+
log.warn(e.getMessage(), e);
try
{
More information about the hornetq-commits
mailing list