[hornetq-commits] JBoss hornetq SVN: r10047 - trunk/src/main/org/hornetq/core/paging/cursor/impl.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Dec 16 17:29:28 EST 2010


Author: clebert.suconic at jboss.com
Date: 2010-12-16 17:29:27 -0500 (Thu, 16 Dec 2010)
New Revision: 10047

Modified:
   trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
Log:
HORNETQ-574 more pagecounters stuff

Modified: trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java	2010-12-16 22:14:18 UTC (rev 10046)
+++ trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java	2010-12-16 22:29:27 UTC (rev 10047)
@@ -41,17 +41,15 @@
    // Constants -----------------------------------------------------
    static final Logger log = Logger.getLogger(PageSubscriptionCounterImpl.class);
 
+   // Attributes ----------------------------------------------------
 
-   // Attributes ----------------------------------------------------
-   
-   // TODO: making this configurable
    private static final int FLUSH_COUNTER = 1000;
 
    private final long subscriptionID;
-   
+
    // the journal record id that is holding the current value
    private long recordID = -1;
-   
+
    private final boolean persistent;
 
    private final StorageManager storage;
@@ -59,11 +57,11 @@
    private final AtomicLong value = new AtomicLong(0);
 
    private final LinkedList<Long> incrementRecords = new LinkedList<Long>();
-   
+
    private LinkedList<Pair<Long, Integer>> loadList;
 
    private final Executor executor;
-   
+
    private final Runnable cleanupCheck = new Runnable()
    {
       public void run()
@@ -71,14 +69,17 @@
          cleanup();
       }
    };
-   
+
    // protected LinkedList
 
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public PageSubscriptionCounterImpl(final StorageManager storage, final boolean persistent, final long subscriptionID, final Executor executor)
+   public PageSubscriptionCounterImpl(final StorageManager storage,
+                                      final boolean persistent,
+                                      final long subscriptionID,
+                                      final Executor executor)
    {
       this.subscriptionID = subscriptionID;
       this.storage = storage;
@@ -100,11 +101,18 @@
    public void increment(Transaction tx, int add) throws Exception
    {
       tx.setContainsPersistent();
-      
-      long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add);
 
-      replayIncrement(tx, id, add);
+      if (!persistent)
+      {
+         replayIncrement(tx, -1, add);
+      }
+      else
+      {
+         long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add);
 
+         replayIncrement(tx, id, add);
+      }
+
    }
 
    /**
@@ -126,7 +134,7 @@
 
       oper.operations.add(new ItemOper(this, recordID, add));
    }
-   
+
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.impl.PagingSubscriptionCounterInterface#loadValue(long, long)
     */
@@ -135,8 +143,6 @@
       this.value.set(value);
       this.recordID = recordID;
    }
-   
-   
 
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.impl.PagingSubscriptionCounterInterface#incrementProcessed(long, int)
@@ -158,9 +164,9 @@
    {
       if (loadList == null)
       {
-         loadList = new LinkedList<Pair<Long,Integer>>();
+         loadList = new LinkedList<Pair<Long, Integer>>();
       }
-      
+
       loadList.add(new Pair<Long, Integer>(id, add));
    }
 
@@ -187,14 +193,18 @@
    public void addInc(long id, int variance)
    {
       value.addAndGet(variance);
-      incrementRecords.add(id);
+      
+      if (id >= 0)
+      {
+         incrementRecords.add(id);
+      }
    }
 
    /** This method sould alwas be called from a single threaded executor */
    protected void cleanup()
    {
       ArrayList<Long> deleteList;
-      
+
       long valueReplace;
       synchronized (this)
       {
@@ -203,33 +213,33 @@
          deleteList.addAll(incrementRecords);
          incrementRecords.clear();
       }
-      
+
       long newRecordID = -1;
 
       long txCleanup = storage.generateUniqueID();
-      
+
       try
       {
          for (Long value : deleteList)
          {
             storage.deleteIncrementRecord(txCleanup, value);
          }
-         
+
          if (recordID >= 0)
          {
             storage.deletePageCounter(txCleanup, recordID);
          }
-         
-         newRecordID = storage.storePageCounter(txCleanup, subscriptionID,  valueReplace);
-         
+
+         newRecordID = storage.storePageCounter(txCleanup, subscriptionID, valueReplace);
+
          storage.commit(txCleanup);
-         
+
          storage.waitOnOperations();
       }
       catch (Exception e)
       {
          newRecordID = recordID;
-         
+
          log.warn(e.getMessage(), e);
          try
          {



More information about the hornetq-commits mailing list