[hornetq-commits] JBoss hornetq SVN: r10044 - in trunk: src/main/org/hornetq/core/paging/cursor/impl and 8 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Dec 16 10:49:32 EST 2010
Author: clebert.suconic at jboss.com
Date: 2010-12-16 10:49:32 -0500 (Thu, 16 Dec 2010)
New Revision: 10044
Added:
trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java
Removed:
trunk/tests/src/org/hornetq/tests/integration/paging/PagePositionTest.java
Modified:
trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
trunk/src/main/org/hornetq/core/persistence/StorageManager.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
trunk/src/main/org/hornetq/core/server/HornetQServer.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
PageCounters first commit
Modified: trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2010-12-16 15:13:18 UTC (rev 10043)
+++ trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2010-12-16 15:49:32 UTC (rev 10044)
@@ -33,7 +33,10 @@
// To be called before the server is down
void stop();
+ // TODO: this method is only used on testcases and can go away
void bookmark(PagePosition position) throws Exception;
+
+ PageSubscriptionCounter getCounter();
long getId();
Added: trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java (rev 0)
+++ trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java 2010-12-16 15:49:32 UTC (rev 10044)
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.paging.cursor;
+
+import org.hornetq.core.transaction.Transaction;
+
+/**
+ * A PagingSubscriptionCounterInterface
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public interface PageSubscriptionCounter
+{
+
+ public abstract long getValue();
+
+ public abstract void increment(Transaction tx, int add) throws Exception;
+
+ public abstract void loadValue(final long recordValueID, final long value);
+
+ public abstract void incrementProcessed(long id, int variance);
+
+ /**
+ *
+ * This method is also used by Journal.loadMessageJournal
+ * @param id
+ * @param variance
+ */
+ public abstract void addInc(long id, int variance);
+
+}
\ No newline at end of file
Added: trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java (rev 0)
+++ trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2010-12-16 15:49:32 UTC (rev 10044)
@@ -0,0 +1,287 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.paging.cursor.impl;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.cursor.PageSubscriptionCounter;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.server.MessageReference;
+import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.TransactionOperation;
+import org.hornetq.core.transaction.TransactionPropertyIndexes;
+
+/**
+ * This class will encapsulate the persistent counters for the PagingSubscription
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class PageSubscriptionCounterImpl implements PageSubscriptionCounter
+{
+
+ // Constants -----------------------------------------------------
+ static final Logger log = Logger.getLogger(PageSubscriptionCounterImpl.class);
+
+
+ // Attributes ----------------------------------------------------
+
+ // TODO: making this configurable
+ private static final int FLUSH_COUNTER = 1000;
+
+ private final long subscriptionID;
+
+ // the journal record id that is holding the current value
+ private long recordID = -1;
+
+ private final boolean persistent;
+
+ private final StorageManager storage;
+
+ private final AtomicLong value = new AtomicLong(0);
+
+ private final LinkedList<Long> incrementRecords = new LinkedList<Long>();
+
+ private final Executor executor;
+
+ private final Runnable cleanupCheck = new Runnable()
+ {
+ public void run()
+ {
+ cleanup();
+ }
+ };
+
+ // protected LinkedList
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public PageSubscriptionCounterImpl(final StorageManager storage, final boolean persistent, final long subscriptionID, final Executor executor)
+ {
+ this.subscriptionID = subscriptionID;
+ this.storage = storage;
+ this.executor = executor;
+ this.persistent = persistent;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.impl.PagingSubscriptionCounterInterface#getValue()
+ */
+ public long getValue()
+ {
+ return value.get();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.impl.PagingSubscriptionCounterInterface#increment(org.hornetq.core.transaction.Transaction, int)
+ */
+ public void increment(Transaction tx, int add) throws Exception
+ {
+ CounterOperations oper = (CounterOperations)tx.getProperty(TransactionPropertyIndexes.PAGE_COUNT_INC);
+
+ if (oper == null)
+ {
+ oper = new CounterOperations();
+ tx.putProperty(TransactionPropertyIndexes.PAGE_COUNT_INC, oper);
+ tx.addOperation(oper);
+ }
+
+ long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add);
+
+ oper.operations.add(new ItemOper(this, id, add));
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.impl.PagingSubscriptionCounterInterface#loadValue(long, long)
+ */
+ public synchronized void loadValue(final long recordValueID, final long value)
+ {
+ this.value.set(value);
+ }
+
+
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.impl.PagingSubscriptionCounterInterface#incrementProcessed(long, int)
+ */
+ public synchronized void incrementProcessed(long id, int variance)
+ {
+ addInc(id, variance);
+ if (incrementRecords.size() > FLUSH_COUNTER)
+ {
+ executor.execute(cleanupCheck);
+ }
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.impl.PagingSubscriptionCounterInterface#addInc(long, int)
+ */
+ public void addInc(long id, int variance)
+ {
+ value.addAndGet(variance);
+ incrementRecords.add(id);
+ }
+
+ /** This method sould alwas be called from a single threaded executor */
+ protected void cleanup()
+ {
+ ArrayList<Long> deleteList;
+
+ long valueReplace;
+ synchronized (this)
+ {
+ valueReplace = value.get();
+ deleteList = new ArrayList<Long>(incrementRecords.size());
+ deleteList.addAll(incrementRecords);
+ incrementRecords.clear();
+ }
+
+ long newRecordID = -1;
+
+ long txCleanup = storage.generateUniqueID();
+
+ try
+ {
+ for (Long value : deleteList)
+ {
+ storage.deleteIncrementRecord(txCleanup, value);
+ }
+
+ if (recordID >= 0)
+ {
+ storage.deletePageCounter(txCleanup, recordID);
+ }
+
+ newRecordID = storage.storePageCounter(txCleanup, subscriptionID, valueReplace);
+
+ storage.commit(txCleanup);
+
+ storage.waitOnOperations();
+ }
+ catch (Exception e)
+ {
+ newRecordID = recordID;
+
+ log.warn(e.getMessage(), e);
+ try
+ {
+ storage.rollback(txCleanup);
+ }
+ catch (Exception ignored)
+ {
+ }
+ }
+ finally
+ {
+ recordID = newRecordID;
+ }
+ }
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+ static class ItemOper
+ {
+
+ public ItemOper(PageSubscriptionCounter counter, long id, int add)
+ {
+ this.counter = counter;
+ this.id = id;
+ this.ammount = add;
+ }
+
+ PageSubscriptionCounter counter;
+
+ long id;
+
+ int ammount;
+ }
+
+ static class CounterOperations implements TransactionOperation
+ {
+ LinkedList<ItemOper> operations = new LinkedList<ItemOper>();
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.TransactionOperation#beforePrepare(org.hornetq.core.transaction.Transaction)
+ */
+ public void beforePrepare(Transaction tx) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.TransactionOperation#afterPrepare(org.hornetq.core.transaction.Transaction)
+ */
+ public void afterPrepare(Transaction tx)
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.TransactionOperation#beforeCommit(org.hornetq.core.transaction.Transaction)
+ */
+ public void beforeCommit(Transaction tx) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.TransactionOperation#afterCommit(org.hornetq.core.transaction.Transaction)
+ */
+ public void afterCommit(Transaction tx)
+ {
+ for (ItemOper oper : operations)
+ {
+ oper.counter.incrementProcessed(oper.id, oper.ammount);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.TransactionOperation#beforeRollback(org.hornetq.core.transaction.Transaction)
+ */
+ public void beforeRollback(Transaction tx) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.TransactionOperation#afterRollback(org.hornetq.core.transaction.Transaction)
+ */
+ public void afterRollback(Transaction tx)
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.TransactionOperation#getRelatedMessageReferences()
+ */
+ public List<MessageReference> getRelatedMessageReferences()
+ {
+ return null;
+ }
+ }
+
+}
Modified: trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-12-16 15:13:18 UTC (rev 10043)
+++ trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-12-16 15:49:32 UTC (rev 10044)
@@ -39,6 +39,7 @@
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.paging.cursor.PageSubscription;
+import org.hornetq.core.paging.cursor.PageSubscriptionCounter;
import org.hornetq.core.paging.cursor.PagedReference;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.MessageReference;
@@ -98,6 +99,8 @@
private List<PagePosition> recoveredACK;
private final SortedMap<Long, PageCursorInfo> consumedPages = Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
+
+ private final PageSubscriptionCounter counter;
// We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
private final ConcurrentLinkedQueue<PagePosition> redeliveries = new ConcurrentLinkedQueue<PagePosition>();
@@ -121,6 +124,7 @@
this.executor = executor;
this.filter = filter;
this.persistent = persistent;
+ this.counter = new PageSubscriptionCounterImpl(store, persistent, cursorId, executor);
}
// Public --------------------------------------------------------
@@ -167,6 +171,11 @@
ack(position);
}
+ public PageSubscriptionCounter getCounter()
+ {
+ return counter;
+ }
+
public void scheduleCleanupCheck()
{
if (autoCleanup)
Modified: trunk/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/StorageManager.java 2010-12-16 15:13:18 UTC (rev 10043)
+++ trunk/src/main/org/hornetq/core/persistence/StorageManager.java 2010-12-16 15:49:32 UTC (rev 10044)
@@ -39,7 +39,6 @@
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.transaction.ResourceManager;
-import org.hornetq.utils.UUID;
/**
*
@@ -190,4 +189,21 @@
void deleteSecurityRoles(SimpleString addressMatch) throws Exception;
List<PersistedRoles> recoverPersistedRoles() throws Exception;
+
+ /**
+ * @return The ID with the stored counter
+ */
+ long storePageCounter(long txID, long queueID, long value) throws Exception;
+
+ void deleteIncrementRecord(long txID, long recordID) throws Exception;
+
+ void deletePageCounter(long txID, long recordID) throws Exception;
+
+ /**
+ * @return the ID with the increment record
+ * @throws Exception
+ */
+ long storePageCounterInc(long txID, long queueID, int add) throws Exception;
+
+
}
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-12-16 15:13:18 UTC (rev 10043)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-12-16 15:49:32 UTC (rev 10044)
@@ -52,8 +52,8 @@
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.paging.cursor.PageSubscription;
-import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.persistence.GroupingInfo;
@@ -75,9 +75,9 @@
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.transaction.ResourceManager;
import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.TransactionOperation;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
-import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.utils.DataConstants;
import org.hornetq.utils.ExecutorFactory;
@@ -138,6 +138,10 @@
public static final byte HEURISTIC_COMPLETION = 38;
public static final byte ACKNOWLEDGE_CURSOR = 39;
+
+ public static final byte PAGE_CURSOR_COUNTER_VALUE = 40;
+
+ public static final byte PAGE_CURSOR_COUNTER_INC = 41;
private UUID persistentID;
@@ -1166,7 +1170,48 @@
{
bindingsJournal.appendDeleteRecord(queueBindingID, true);
}
+
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#storePageCounterAdd(long, long, int)
+ */
+ public long storePageCounterInc(long txID, long queueID, int value) throws Exception
+ {
+ long recordID = idGenerator.generateID();
+ messageJournal.appendAddRecordTransactional(txID, recordID, JournalStorageManager.PAGE_CURSOR_COUNTER_INC, new PageCountRecord(queueID, value));
+ return recordID;
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#storePageCounter(long, long, long)
+ */
+ public long storePageCounter(long txID, long queueID, long value) throws Exception
+ {
+ long recordID = idGenerator.generateID();
+ messageJournal.appendAddRecordTransactional(txID, recordID, JournalStorageManager.PAGE_CURSOR_COUNTER_VALUE, new PageCountRecord(queueID, value));
+ return recordID;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#deleteIncrementRecord(long, long)
+ */
+ public void deleteIncrementRecord(long txID, long recordID) throws Exception
+ {
+ messageJournal.appendDeleteRecordTransactional(txID, recordID);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#deletePageCounter(long, long)
+ */
+ public void deletePageCounter(long txID, long recordID) throws Exception
+ {
+ messageJournal.appendDeleteRecordTransactional(txID, recordID);
+ }
+
+
+
public JournalLoadInformation loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos,
final List<GroupingInfo> groupingInfos) throws Exception
{
@@ -2253,7 +2298,53 @@
}
+ private static final class PageCountRecord implements EncodingSupport
+ {
+
+ PageCountRecord()
+ {
+
+ }
+
+ PageCountRecord(long queueID, long value)
+ {
+ this.queueID = queueID;
+ this.value = value;
+ }
+
+ long queueID;
+
+ long value;
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
+ */
+ public int getEncodeSize()
+ {
+ return DataConstants.SIZE_LONG * 2;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void encode(HornetQBuffer buffer)
+ {
+ buffer.writeLong(queueID);
+ buffer.writeLong(value);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#decode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void decode(HornetQBuffer buffer)
+ {
+ queueID = buffer.readLong();
+ value = buffer.readLong();
+ }
+
+
+ }
+
private static final class AddMessageRecord
{
public AddMessageRecord(final ServerMessage message)
Modified: trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2010-12-16 15:13:18 UTC (rev 10043)
+++ trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2010-12-16 15:49:32 UTC (rev 10044)
@@ -471,4 +471,40 @@
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#storePageCounter(long, long, long)
+ */
+ public long storePageCounter(long txID, long queueID, long value) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#deleteIncrementRecord(long, long)
+ */
+ public void deleteIncrementRecord(long txID, long recordID) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#deletePageCounter(long, long)
+ */
+ public void deletePageCounter(long txID, long recordID) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#storePageCounterInc(long, long, int)
+ */
+ public long storePageCounterInc(long txID, long queueID, int add) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
}
Modified: trunk/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/HornetQServer.java 2010-12-16 15:13:18 UTC (rev 10043)
+++ trunk/src/main/org/hornetq/core/server/HornetQServer.java 2010-12-16 15:49:32 UTC (rev 10044)
@@ -135,6 +135,8 @@
SimpleString filterString,
boolean durable,
boolean temporary) throws Exception;
+
+ Queue locateQueue(SimpleString queueName) throws Exception;
void destroyQueue(SimpleString queueName, ServerSession session) throws Exception;
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-12-16 15:13:18 UTC (rev 10043)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-12-16 15:49:32 UTC (rev 10044)
@@ -22,9 +22,8 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
- import java.util.Map.Entry;
+import java.util.Map.Entry;
import java.util.Set;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -83,7 +82,15 @@
import org.hornetq.core.security.Role;
import org.hornetq.core.security.SecurityStore;
import org.hornetq.core.security.impl.SecurityStoreImpl;
-import org.hornetq.core.server.*;
+import org.hornetq.core.server.ActivateCallback;
+import org.hornetq.core.server.Bindable;
+import org.hornetq.core.server.Divert;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.MemoryManager;
+import org.hornetq.core.server.NodeManager;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.QueueFactory;
+import org.hornetq.core.server.ServerSession;
import org.hornetq.core.server.cluster.ClusterManager;
import org.hornetq.core.server.cluster.Transformer;
import org.hornetq.core.server.cluster.impl.ClusterManagerImpl;
@@ -956,6 +963,20 @@
{
return createQueue(address, queueName, filterString, durable, temporary, false);
}
+
+ public Queue locateQueue(SimpleString queueName) throws Exception
+ {
+ Binding binding = postOffice.getBinding(queueName);
+
+ Bindable queue = binding.getBindable();
+
+ if (!(queue instanceof Queue))
+ {
+ throw new IllegalStateException("locateQueue should only be used to locate queues");
+ }
+
+ return (Queue) binding.getBindable();
+ }
public Queue deployQueue(final SimpleString address,
final SimpleString queueName,
Modified: trunk/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java 2010-12-16 15:13:18 UTC (rev 10043)
+++ trunk/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java 2010-12-16 15:49:32 UTC (rev 10044)
@@ -25,6 +25,8 @@
public class TransactionPropertyIndexes
{
+ public static final int PAGE_COUNT_INC = 3;
+
public static final int PAGE_TRANSACTION_UPDATE = 4;
public static final int PAGE_TRANSACTION = 5;
Deleted: trunk/tests/src/org/hornetq/tests/integration/paging/PagePositionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/paging/PagePositionTest.java 2010-12-16 15:13:18 UTC (rev 10043)
+++ trunk/tests/src/org/hornetq/tests/integration/paging/PagePositionTest.java 2010-12-16 15:49:32 UTC (rev 10044)
@@ -1,67 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.integration.paging;
-
-import org.hornetq.tests.util.ServiceTestBase;
-
-/**
- * A PageCursorTest
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class PagePositionTest extends ServiceTestBase
-{
-
- // Test what would happen on redelivery situations
- public void testRedeliverLike()
- {
-
- }
-
- public void testRedeliverPersistence()
- {
-
- }
-
- public void testDeletePagesAfterRedelivery()
- {
-
- }
-
- public void testNextAfterPosition()
- {
-
- }
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Added: trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java 2010-12-16 15:49:32 UTC (rev 10044)
@@ -0,0 +1,215 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.paging;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.paging.cursor.PageSubscription;
+import org.hornetq.core.paging.cursor.PageSubscriptionCounter;
+import org.hornetq.core.paging.cursor.impl.PageSubscriptionCounterImpl;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.impl.TransactionImpl;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * A PagingCounterTest
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class PagingCounterTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private HornetQServer server;
+
+ private ServerLocator sl;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testCounter() throws Exception
+ {
+ ClientSessionFactory sf = sl.createSessionFactory();
+ ClientSession session = sf.createSession();
+
+ try
+ {
+ Queue queue = server.createQueue(new SimpleString("A1"), new SimpleString("A1"), null, true, false);
+
+ PageSubscriptionCounter counter = locateCounter(queue);
+
+ StorageManager storage = server.getStorageManager();
+
+ Transaction tx = new TransactionImpl(server.getStorageManager());
+
+ counter.increment(tx, 1);
+
+ assertEquals(0, counter.getValue());
+
+ tx.commit();
+
+ storage.waitOnOperations();
+
+ assertEquals(1, counter.getValue());
+ }
+ finally
+ {
+ sf.close();
+ session.close();
+ }
+ }
+
+ public void testRestartCounter() throws Exception
+ {
+ Queue queue = server.createQueue(new SimpleString("A1"), new SimpleString("A1"), null, true, false);
+
+ PageSubscriptionCounter counter = locateCounter(queue);
+
+ StorageManager storage = server.getStorageManager();
+
+ Transaction tx = new TransactionImpl(server.getStorageManager());
+
+ counter.increment(tx, 1);
+
+ assertEquals(0, counter.getValue());
+
+ tx.commit();
+
+ storage.waitOnOperations();
+
+ assertEquals(1, counter.getValue());
+
+ sl.close();
+
+ server.stop();
+
+ server = newHornetQServer();
+
+ server.start();
+
+ queue = server.locateQueue(new SimpleString("A1"));
+
+ assertNotNull(queue);
+
+ counter = locateCounter(queue);
+
+ //assertEquals(1, counter.getValue());
+
+ }
+
+ /**
+ * @param queue
+ * @return
+ * @throws Exception
+ */
+ private PageSubscriptionCounter locateCounter(Queue queue) throws Exception
+ {
+ PageSubscription subscription = server.getPagingManager()
+ .getPageStore(new SimpleString("A1"))
+ .getCursorProvier()
+ .getSubscription(queue.getID());
+
+ PageSubscriptionCounter counter = subscription.getCounter();
+ return counter;
+ }
+
+ public void testPrepareCounter() throws Exception
+ {
+ ClientSessionFactory sf = sl.createSessionFactory();
+ ClientSession session = sf.createSession();
+
+ try
+ {
+ Queue queue = server.createQueue(new SimpleString("A1"), new SimpleString("A1"), null, true, false);
+
+ PageSubscriptionCounter counter = locateCounter(queue);
+
+ StorageManager storage = server.getStorageManager();
+
+ Transaction tx = new TransactionImpl(server.getStorageManager());
+
+ counter.increment(tx, 1);
+
+ assertEquals(0, counter.getValue());
+
+ tx.commit();
+
+ storage.waitOnOperations();
+
+ assertEquals(1, counter.getValue());
+ }
+ finally
+ {
+ sf.close();
+ session.close();
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ server = newHornetQServer();
+
+ server.start();
+
+ sl = createInVMNonHALocator();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ sl.close();
+
+ server.stop();
+
+ super.tearDown();
+ }
+
+ // Private -------------------------------------------------------
+
+ private HornetQServer newHornetQServer()
+ {
+ HornetQServer server = super.createServer(true, false);
+
+ AddressSettings defaultSetting = new AddressSettings();
+ defaultSetting.setPageSizeBytes(10 * 1024);
+ defaultSetting.setMaxSizeBytes(20 * 1024);
+
+ server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+ return server;
+ }
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-12-16 15:13:18 UTC (rev 10043)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-12-16 15:49:32 UTC (rev 10044)
@@ -1590,6 +1590,42 @@
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#storePageCounter(long, long, long)
+ */
+ public long storePageCounter(long txID, long queueID, long value) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#deleteIncrementRecord(long, long)
+ */
+ public void deleteIncrementRecord(long txID, long recordID) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#deletePageCounter(long, long)
+ */
+ public void deletePageCounter(long txID, long recordID) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#storePageCounterInc(long, long, int)
+ */
+ public long storePageCounterInc(long txID, long queueID, int add) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
}
class FakeStoreFactory implements PagingStoreFactory
More information about the hornetq-commits
mailing list