[hornetq-commits] JBoss hornetq SVN: r10044 - in trunk: src/main/org/hornetq/core/paging/cursor/impl and 8 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Dec 16 10:49:32 EST 2010


Author: clebert.suconic at jboss.com
Date: 2010-12-16 10:49:32 -0500 (Thu, 16 Dec 2010)
New Revision: 10044

Added:
   trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
   trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
   trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java
Removed:
   trunk/tests/src/org/hornetq/tests/integration/paging/PagePositionTest.java
Modified:
   trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
   trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
   trunk/src/main/org/hornetq/core/persistence/StorageManager.java
   trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
   trunk/src/main/org/hornetq/core/server/HornetQServer.java
   trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   trunk/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
   trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
PageCounters first commit

Modified: trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java	2010-12-16 15:13:18 UTC (rev 10043)
+++ trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java	2010-12-16 15:49:32 UTC (rev 10044)
@@ -33,7 +33,10 @@
    // To be called before the server is down
    void stop();
 
+   // TODO: this method is only used on testcases and can go away
    void bookmark(PagePosition position) throws Exception;
+   
+   PageSubscriptionCounter getCounter();
 
    long getId();
 

Added: trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java	2010-12-16 15:49:32 UTC (rev 10044)
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.paging.cursor;
+
+import org.hornetq.core.transaction.Transaction;
+
+/**
+ * A PagingSubscriptionCounterInterface
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public interface PageSubscriptionCounter
+{
+
+   public abstract long getValue();
+
+   public abstract void increment(Transaction tx, int add) throws Exception;
+
+   public abstract void loadValue(final long recordValueID, final long value);
+
+   public abstract void incrementProcessed(long id, int variance);
+
+   /**
+    * 
+    * This method is also used by Journal.loadMessageJournal
+    * @param id
+    * @param variance
+    */
+   public abstract void addInc(long id, int variance);
+
+}
\ No newline at end of file

Added: trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java	2010-12-16 15:49:32 UTC (rev 10044)
@@ -0,0 +1,287 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.paging.cursor.impl;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.cursor.PageSubscriptionCounter;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.server.MessageReference;
+import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.TransactionOperation;
+import org.hornetq.core.transaction.TransactionPropertyIndexes;
+
+/**
+ * This class will encapsulate the persistent counters for the PagingSubscription
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class PageSubscriptionCounterImpl implements PageSubscriptionCounter
+{
+
+   // Constants -----------------------------------------------------
+   static final Logger log = Logger.getLogger(PageSubscriptionCounterImpl.class);
+
+
+   // Attributes ----------------------------------------------------
+   
+   // TODO: making this configurable
+   private static final int FLUSH_COUNTER = 1000;
+
+   private final long subscriptionID;
+   
+   // the journal record id that is holding the current value
+   private long recordID = -1;
+   
+   private final boolean persistent;
+
+   private final StorageManager storage;
+
+   private final AtomicLong value = new AtomicLong(0);
+
+   private final LinkedList<Long> incrementRecords = new LinkedList<Long>();
+
+   private final Executor executor;
+   
+   private final Runnable cleanupCheck = new Runnable()
+   {
+      public void run()
+      {
+         cleanup();
+      }
+   };
+   
+   // protected LinkedList
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public PageSubscriptionCounterImpl(final StorageManager storage, final boolean persistent, final long subscriptionID, final Executor executor)
+   {
+      this.subscriptionID = subscriptionID;
+      this.storage = storage;
+      this.executor = executor;
+      this.persistent = persistent;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.paging.cursor.impl.PagingSubscriptionCounterInterface#getValue()
+    */
+   public long getValue()
+   {
+      return value.get();
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.paging.cursor.impl.PagingSubscriptionCounterInterface#increment(org.hornetq.core.transaction.Transaction, int)
+    */
+   public void increment(Transaction tx, int add) throws Exception
+   {
+      CounterOperations oper = (CounterOperations)tx.getProperty(TransactionPropertyIndexes.PAGE_COUNT_INC);
+
+      if (oper == null)
+      {
+         oper = new CounterOperations();
+         tx.putProperty(TransactionPropertyIndexes.PAGE_COUNT_INC, oper);
+         tx.addOperation(oper);
+      }
+
+      long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add);
+
+      oper.operations.add(new ItemOper(this, id, add));
+
+   }
+   
+   /* (non-Javadoc)
+    * @see org.hornetq.core.paging.cursor.impl.PagingSubscriptionCounterInterface#loadValue(long, long)
+    */
+   public synchronized void loadValue(final long recordValueID, final long value)
+   {
+      this.value.set(value);
+   }
+   
+   
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.paging.cursor.impl.PagingSubscriptionCounterInterface#incrementProcessed(long, int)
+    */
+   public synchronized void incrementProcessed(long id, int variance)
+   {
+      addInc(id, variance);
+      if (incrementRecords.size() > FLUSH_COUNTER)
+      {
+         executor.execute(cleanupCheck);
+      }
+
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.paging.cursor.impl.PagingSubscriptionCounterInterface#addInc(long, int)
+    */
+   public void addInc(long id, int variance)
+   {
+      value.addAndGet(variance);
+      incrementRecords.add(id);
+   }
+
+   /** This method sould alwas be called from a single threaded executor */
+   protected void cleanup()
+   {
+      ArrayList<Long> deleteList;
+      
+      long valueReplace;
+      synchronized (this)
+      {
+         valueReplace = value.get();
+         deleteList = new ArrayList<Long>(incrementRecords.size());
+         deleteList.addAll(incrementRecords);
+         incrementRecords.clear();
+      }
+      
+      long newRecordID = -1;
+
+      long txCleanup = storage.generateUniqueID();
+      
+      try
+      {
+         for (Long value : deleteList)
+         {
+            storage.deleteIncrementRecord(txCleanup, value);
+         }
+         
+         if (recordID >= 0)
+         {
+            storage.deletePageCounter(txCleanup, recordID);
+         }
+         
+         newRecordID = storage.storePageCounter(txCleanup, subscriptionID,  valueReplace);
+         
+         storage.commit(txCleanup);
+         
+         storage.waitOnOperations();
+      }
+      catch (Exception e)
+      {
+         newRecordID = recordID;
+         
+         log.warn(e.getMessage(), e);
+         try
+         {
+            storage.rollback(txCleanup);
+         }
+         catch (Exception ignored)
+         {
+         }
+      }
+      finally
+      {
+         recordID = newRecordID;
+      }
+   }
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+   static class ItemOper
+   {
+
+      public ItemOper(PageSubscriptionCounter counter, long id, int add)
+      {
+         this.counter = counter;
+         this.id = id;
+         this.ammount = add;
+      }
+
+      PageSubscriptionCounter counter;
+
+      long id;
+
+      int ammount;
+   }
+
+   static class CounterOperations implements TransactionOperation
+   {
+      LinkedList<ItemOper> operations = new LinkedList<ItemOper>();
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.transaction.TransactionOperation#beforePrepare(org.hornetq.core.transaction.Transaction)
+       */
+      public void beforePrepare(Transaction tx) throws Exception
+      {
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.transaction.TransactionOperation#afterPrepare(org.hornetq.core.transaction.Transaction)
+       */
+      public void afterPrepare(Transaction tx)
+      {
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.transaction.TransactionOperation#beforeCommit(org.hornetq.core.transaction.Transaction)
+       */
+      public void beforeCommit(Transaction tx) throws Exception
+      {
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.transaction.TransactionOperation#afterCommit(org.hornetq.core.transaction.Transaction)
+       */
+      public void afterCommit(Transaction tx)
+      {
+         for (ItemOper oper : operations)
+         {
+            oper.counter.incrementProcessed(oper.id, oper.ammount);
+         }
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.transaction.TransactionOperation#beforeRollback(org.hornetq.core.transaction.Transaction)
+       */
+      public void beforeRollback(Transaction tx) throws Exception
+      {
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.transaction.TransactionOperation#afterRollback(org.hornetq.core.transaction.Transaction)
+       */
+      public void afterRollback(Transaction tx)
+      {
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.transaction.TransactionOperation#getRelatedMessageReferences()
+       */
+      public List<MessageReference> getRelatedMessageReferences()
+      {
+         return null;
+      }
+   }
+
+}

Modified: trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2010-12-16 15:13:18 UTC (rev 10043)
+++ trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2010-12-16 15:49:32 UTC (rev 10044)
@@ -39,6 +39,7 @@
 import org.hornetq.core.paging.cursor.PageCursorProvider;
 import org.hornetq.core.paging.cursor.PagePosition;
 import org.hornetq.core.paging.cursor.PageSubscription;
+import org.hornetq.core.paging.cursor.PageSubscriptionCounter;
 import org.hornetq.core.paging.cursor.PagedReference;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.server.MessageReference;
@@ -98,6 +99,8 @@
    private List<PagePosition> recoveredACK;
 
    private final SortedMap<Long, PageCursorInfo> consumedPages = Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
+   
+   private final PageSubscriptionCounter counter;
 
    // We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
    private final ConcurrentLinkedQueue<PagePosition> redeliveries = new ConcurrentLinkedQueue<PagePosition>();
@@ -121,6 +124,7 @@
       this.executor = executor;
       this.filter = filter;
       this.persistent = persistent;
+      this.counter = new PageSubscriptionCounterImpl(store, persistent, cursorId, executor);
    }
 
    // Public --------------------------------------------------------
@@ -167,6 +171,11 @@
       ack(position);
    }
 
+   public PageSubscriptionCounter getCounter()
+   {
+      return counter;
+   }
+   
    public void scheduleCleanupCheck()
    {
       if (autoCleanup)

Modified: trunk/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/StorageManager.java	2010-12-16 15:13:18 UTC (rev 10043)
+++ trunk/src/main/org/hornetq/core/persistence/StorageManager.java	2010-12-16 15:49:32 UTC (rev 10044)
@@ -39,7 +39,6 @@
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.group.impl.GroupBinding;
 import org.hornetq.core.transaction.ResourceManager;
-import org.hornetq.utils.UUID;
 
 /**
  * 
@@ -190,4 +189,21 @@
    void deleteSecurityRoles(SimpleString addressMatch) throws Exception;
 
    List<PersistedRoles> recoverPersistedRoles() throws Exception;
+   
+   /** 
+    * @return The ID with the stored counter
+    */
+   long storePageCounter(long txID, long queueID, long value) throws Exception;
+   
+   void deleteIncrementRecord(long txID, long recordID) throws Exception;
+   
+   void deletePageCounter(long txID, long recordID) throws Exception;
+
+   /**
+    * @return the ID with the increment record
+    * @throws Exception 
+    */
+   long storePageCounterInc(long txID, long queueID, int add) throws Exception;
+   
+   
 }

Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2010-12-16 15:13:18 UTC (rev 10043)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2010-12-16 15:49:32 UTC (rev 10044)
@@ -52,8 +52,8 @@
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.PagingManager;
 import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.paging.cursor.PagePosition;
 import org.hornetq.core.paging.cursor.PageSubscription;
-import org.hornetq.core.paging.cursor.PagePosition;
 import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
 import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
 import org.hornetq.core.persistence.GroupingInfo;
@@ -75,9 +75,9 @@
 import org.hornetq.core.server.impl.ServerMessageImpl;
 import org.hornetq.core.transaction.ResourceManager;
 import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.Transaction.State;
 import org.hornetq.core.transaction.TransactionOperation;
 import org.hornetq.core.transaction.TransactionPropertyIndexes;
-import org.hornetq.core.transaction.Transaction.State;
 import org.hornetq.core.transaction.impl.TransactionImpl;
 import org.hornetq.utils.DataConstants;
 import org.hornetq.utils.ExecutorFactory;
@@ -138,6 +138,10 @@
    public static final byte HEURISTIC_COMPLETION = 38;
 
    public static final byte ACKNOWLEDGE_CURSOR = 39;
+   
+   public static final byte PAGE_CURSOR_COUNTER_VALUE = 40;
+   
+   public static final byte PAGE_CURSOR_COUNTER_INC = 41;
 
    private UUID persistentID;
 
@@ -1166,7 +1170,48 @@
    {
       bindingsJournal.appendDeleteRecord(queueBindingID, true);
    }
+   
+   
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#storePageCounterAdd(long, long, int)
+    */
+   public long storePageCounterInc(long txID, long queueID, int value) throws Exception
+   {
+      long recordID = idGenerator.generateID();
+      messageJournal.appendAddRecordTransactional(txID, recordID, JournalStorageManager.PAGE_CURSOR_COUNTER_INC, new PageCountRecord(queueID, value));
+      return recordID;
+   }
+
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#storePageCounter(long, long, long)
+    */
+   public long storePageCounter(long txID, long queueID, long value) throws Exception
+   {
+      long recordID = idGenerator.generateID();
+      messageJournal.appendAddRecordTransactional(txID, recordID, JournalStorageManager.PAGE_CURSOR_COUNTER_VALUE, new PageCountRecord(queueID, value));
+      return recordID;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#deleteIncrementRecord(long, long)
+    */
+   public void deleteIncrementRecord(long txID, long recordID) throws Exception
+   {
+      messageJournal.appendDeleteRecordTransactional(txID, recordID);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#deletePageCounter(long, long)
+    */
+   public void deletePageCounter(long txID, long recordID) throws Exception
+   {
+      messageJournal.appendDeleteRecordTransactional(txID, recordID);
+   }
+   
+   
+   
    public JournalLoadInformation loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos,
                                                     final List<GroupingInfo> groupingInfos) throws Exception
    {
@@ -2253,7 +2298,53 @@
 
    }
    
+   private static final class PageCountRecord implements EncodingSupport
+   {
+      
+      PageCountRecord()
+      {
+         
+      }
+      
+      PageCountRecord(long queueID, long value)
+      {
+         this.queueID = queueID;
+         this.value = value;
+      }
+      
+      long queueID;
+      
+      long value;
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
+       */
+      public int getEncodeSize()
+      {
+         return DataConstants.SIZE_LONG * 2;
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.api.core.HornetQBuffer)
+       */
+      public void encode(HornetQBuffer buffer)
+      {
+         buffer.writeLong(queueID);
+         buffer.writeLong(value);
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.EncodingSupport#decode(org.hornetq.api.core.HornetQBuffer)
+       */
+      public void decode(HornetQBuffer buffer)
+      {
+         queueID = buffer.readLong();
+         value = buffer.readLong();
+      }
+      
+      
+   }
+
    private static final class AddMessageRecord
    {
       public AddMessageRecord(final ServerMessage message)

Modified: trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2010-12-16 15:13:18 UTC (rev 10043)
+++ trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2010-12-16 15:49:32 UTC (rev 10044)
@@ -471,4 +471,40 @@
       
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#storePageCounter(long, long, long)
+    */
+   public long storePageCounter(long txID, long queueID, long value) throws Exception
+   {
+      // TODO Auto-generated method stub
+      return 0;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#deleteIncrementRecord(long, long)
+    */
+   public void deleteIncrementRecord(long txID, long recordID) throws Exception
+   {
+      // TODO Auto-generated method stub
+      
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#deletePageCounter(long, long)
+    */
+   public void deletePageCounter(long txID, long recordID) throws Exception
+   {
+      // TODO Auto-generated method stub
+      
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#storePageCounterInc(long, long, int)
+    */
+   public long storePageCounterInc(long txID, long queueID, int add) throws Exception
+   {
+      // TODO Auto-generated method stub
+      return 0;
+   }
+
 }

Modified: trunk/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/HornetQServer.java	2010-12-16 15:13:18 UTC (rev 10043)
+++ trunk/src/main/org/hornetq/core/server/HornetQServer.java	2010-12-16 15:49:32 UTC (rev 10044)
@@ -135,6 +135,8 @@
                      SimpleString filterString,
                      boolean durable,
                      boolean temporary) throws Exception;
+   
+   Queue locateQueue(SimpleString queueName) throws Exception;
 
    void destroyQueue(SimpleString queueName, ServerSession session) throws Exception;
 

Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-12-16 15:13:18 UTC (rev 10043)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-12-16 15:49:32 UTC (rev 10044)
@@ -22,9 +22,8 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-	import java.util.Map.Entry;
+import java.util.Map.Entry;
 import java.util.Set;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -83,7 +82,15 @@
 import org.hornetq.core.security.Role;
 import org.hornetq.core.security.SecurityStore;
 import org.hornetq.core.security.impl.SecurityStoreImpl;
-import org.hornetq.core.server.*;
+import org.hornetq.core.server.ActivateCallback;
+import org.hornetq.core.server.Bindable;
+import org.hornetq.core.server.Divert;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.MemoryManager;
+import org.hornetq.core.server.NodeManager;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.QueueFactory;
+import org.hornetq.core.server.ServerSession;
 import org.hornetq.core.server.cluster.ClusterManager;
 import org.hornetq.core.server.cluster.Transformer;
 import org.hornetq.core.server.cluster.impl.ClusterManagerImpl;
@@ -956,6 +963,20 @@
    {
       return createQueue(address, queueName, filterString, durable, temporary, false);
    }
+   
+   public Queue locateQueue(SimpleString queueName) throws Exception
+   {
+      Binding binding = postOffice.getBinding(queueName);
+      
+      Bindable queue = binding.getBindable();
+      
+      if (!(queue instanceof Queue))
+      {
+         throw new IllegalStateException("locateQueue should only be used to locate queues");
+      }
+      
+      return (Queue) binding.getBindable();
+   }
 
    public Queue deployQueue(final SimpleString address,
                             final SimpleString queueName,

Modified: trunk/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java	2010-12-16 15:13:18 UTC (rev 10043)
+++ trunk/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java	2010-12-16 15:49:32 UTC (rev 10044)
@@ -25,6 +25,8 @@
 public class TransactionPropertyIndexes
 {
 
+   public static final int PAGE_COUNT_INC = 3;
+   
    public static final int PAGE_TRANSACTION_UPDATE = 4;
    
    public static final int PAGE_TRANSACTION = 5;

Deleted: trunk/tests/src/org/hornetq/tests/integration/paging/PagePositionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/paging/PagePositionTest.java	2010-12-16 15:13:18 UTC (rev 10043)
+++ trunk/tests/src/org/hornetq/tests/integration/paging/PagePositionTest.java	2010-12-16 15:49:32 UTC (rev 10044)
@@ -1,67 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.integration.paging;
-
-import org.hornetq.tests.util.ServiceTestBase;
-
-/**
- * A PageCursorTest
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class PagePositionTest extends ServiceTestBase
-{
-
-   // Test what would happen on redelivery situations
-   public void testRedeliverLike()
-   {
-      
-   }
-   
-   public void testRedeliverPersistence()
-   {
-      
-   }
-   
-   public void testDeletePagesAfterRedelivery()
-   {
-      
-   }
-   
-   public void testNextAfterPosition()
-   {
-      
-   }
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
-}

Added: trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java	2010-12-16 15:49:32 UTC (rev 10044)
@@ -0,0 +1,215 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.paging;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.paging.cursor.PageSubscription;
+import org.hornetq.core.paging.cursor.PageSubscriptionCounter;
+import org.hornetq.core.paging.cursor.impl.PageSubscriptionCounterImpl;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.impl.TransactionImpl;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * A PagingCounterTest
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class PagingCounterTest extends ServiceTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private HornetQServer server;
+
+   private ServerLocator sl;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testCounter() throws Exception
+   {
+      ClientSessionFactory sf = sl.createSessionFactory();
+      ClientSession session = sf.createSession();
+
+      try
+      {
+         Queue queue = server.createQueue(new SimpleString("A1"), new SimpleString("A1"), null, true, false);
+
+         PageSubscriptionCounter counter = locateCounter(queue);
+
+         StorageManager storage = server.getStorageManager();
+
+         Transaction tx = new TransactionImpl(server.getStorageManager());
+
+         counter.increment(tx, 1);
+
+         assertEquals(0, counter.getValue());
+
+         tx.commit();
+
+         storage.waitOnOperations();
+
+         assertEquals(1, counter.getValue());
+      }
+      finally
+      {
+         sf.close();
+         session.close();
+      }
+   }
+
+   public void testRestartCounter() throws Exception
+   {
+      Queue queue = server.createQueue(new SimpleString("A1"), new SimpleString("A1"), null, true, false);
+
+      PageSubscriptionCounter counter = locateCounter(queue);
+
+      StorageManager storage = server.getStorageManager();
+
+      Transaction tx = new TransactionImpl(server.getStorageManager());
+
+      counter.increment(tx, 1);
+
+      assertEquals(0, counter.getValue());
+
+      tx.commit();
+
+      storage.waitOnOperations();
+
+      assertEquals(1, counter.getValue());
+
+      sl.close();
+
+      server.stop();
+
+      server = newHornetQServer();
+
+      server.start();
+      
+      queue = server.locateQueue(new SimpleString("A1"));
+      
+      assertNotNull(queue);
+      
+      counter = locateCounter(queue);
+      
+      //assertEquals(1, counter.getValue());
+
+   }
+
+   /**
+    * @param queue
+    * @return
+    * @throws Exception
+    */
+   private PageSubscriptionCounter locateCounter(Queue queue) throws Exception
+   {
+      PageSubscription subscription = server.getPagingManager()
+                                            .getPageStore(new SimpleString("A1"))
+                                            .getCursorProvier()
+                                            .getSubscription(queue.getID());
+
+      PageSubscriptionCounter counter = subscription.getCounter();
+      return counter;
+   }
+
+   public void testPrepareCounter() throws Exception
+   {
+      ClientSessionFactory sf = sl.createSessionFactory();
+      ClientSession session = sf.createSession();
+
+      try
+      {
+         Queue queue = server.createQueue(new SimpleString("A1"), new SimpleString("A1"), null, true, false);
+
+         PageSubscriptionCounter counter = locateCounter(queue);
+
+         StorageManager storage = server.getStorageManager();
+
+         Transaction tx = new TransactionImpl(server.getStorageManager());
+
+         counter.increment(tx, 1);
+
+         assertEquals(0, counter.getValue());
+
+         tx.commit();
+
+         storage.waitOnOperations();
+
+         assertEquals(1, counter.getValue());
+      }
+      finally
+      {
+         sf.close();
+         session.close();
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      server = newHornetQServer();
+
+      server.start();
+
+      sl = createInVMNonHALocator();
+   }
+
+   protected void tearDown() throws Exception
+   {
+      sl.close();
+
+      server.stop();
+
+      super.tearDown();
+   }
+
+   // Private -------------------------------------------------------
+
+   private HornetQServer newHornetQServer()
+   {
+      HornetQServer server = super.createServer(true, false);
+
+      AddressSettings defaultSetting = new AddressSettings();
+      defaultSetting.setPageSizeBytes(10 * 1024);
+      defaultSetting.setMaxSizeBytes(20 * 1024);
+
+      server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+      return server;
+   }
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2010-12-16 15:13:18 UTC (rev 10043)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2010-12-16 15:49:32 UTC (rev 10044)
@@ -1590,6 +1590,42 @@
 
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.persistence.StorageManager#storePageCounter(long, long, long)
+       */
+      public long storePageCounter(long txID, long queueID, long value) throws Exception
+      {
+         // TODO Auto-generated method stub
+         return 0;
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.persistence.StorageManager#deleteIncrementRecord(long, long)
+       */
+      public void deleteIncrementRecord(long txID, long recordID) throws Exception
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.persistence.StorageManager#deletePageCounter(long, long)
+       */
+      public void deletePageCounter(long txID, long recordID) throws Exception
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.persistence.StorageManager#storePageCounterInc(long, long, int)
+       */
+      public long storePageCounterInc(long txID, long queueID, int add) throws Exception
+      {
+         // TODO Auto-generated method stub
+         return 0;
+      }
+
    }
 
    class FakeStoreFactory implements PagingStoreFactory



More information about the hornetq-commits mailing list