Author: clebert.suconic(a)jboss.com
Date: 2010-11-01 20:24:34 -0400 (Mon, 01 Nov 2010)
New Revision: 9829
Added:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
Removed:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Renaming PageCursor to PageSubscription
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java 2010-11-02 00:20:27 UTC (rev 9828)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java 2010-11-02 00:24:34 UTC (rev 9829)
@@ -14,7 +14,7 @@
package org.hornetq.core.paging;
import org.hornetq.core.journal.EncodingSupport;
-import org.hornetq.core.paging.cursor.PageCursor;
+import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.transaction.Transaction;
@@ -58,5 +58,5 @@
* @param cursorPos
* @return true if the message will be delivered later, false if it should be
delivered right away
*/
- boolean deliverAfterCommit(PageCursor cursor, PagePosition cursorPos);
+ boolean deliverAfterCommit(PageSubscription cursor, PagePosition cursorPos);
}
Deleted:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-11-02 00:20:27 UTC (rev 9828)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-11-02 00:24:34 UTC (rev 9829)
@@ -1,99 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.paging.cursor;
-
-import org.hornetq.api.core.Pair;
-import org.hornetq.core.paging.PagedMessage;
-import org.hornetq.core.transaction.Transaction;
-import org.hornetq.utils.LinkedListIterator;
-
-/**
- * A PageCursor
- *
- * @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
- *
- *
- */
-public interface PageCursor
-{
-
- // Cursor query operations --------------------------------------
-
- // To be called before the server is down
- void stop();
-
- void bookmark(PagePosition position) throws Exception;
-
- /** It will be 0 if non persistent cursor */
- public long getId();
-
- public LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator();
-
- // To be called when the cursor is closed for good. Most likely when the queue is
deleted
- void close() throws Exception;
-
- void scheduleCleanupCheck();
-
- void cleanupEntries() throws Exception;
-
- void disableAutoCleanup();
-
- void enableAutoCleanup();
-
- void ack(PagePosition position) throws Exception;
-
- void ackTx(Transaction tx, PagePosition position) throws Exception;
- /**
- *
- * @return the first page in use or MAX_LONG if none is in use
- */
- long getFirstPage();
-
- // Reload operations
-
- /**
- * @param position
- */
- void reloadACK(PagePosition position);
-
- /**
- * To be called when the cursor decided to ignore a position.
- * @param position
- */
- void positionIgnored(PagePosition position);
-
- /**
- * To be used to avoid a redelivery of a prepared ACK after load
- * @param position
- */
- void reloadPreparedACK(Transaction tx, PagePosition position);
-
- void processReload() throws Exception;
-
- /**
- * To be used on redeliveries
- * @param position
- */
- void redeliver(PagePosition position);
-
- void printDebug();
-
- /**
- * @param minPage
- * @return
- */
- boolean isComplete(long minPage);
-
- void flushExecutors();
-}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-11-02 00:20:27 UTC (rev 9828)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-11-02 00:24:34 UTC (rev 9829)
@@ -49,17 +49,17 @@
* @param queueId The cursorID should be the same as the queueId associated for
persistance
* @return
*/
- PageCursor getPersistentCursor(long queueId);
+ PageSubscription getPersistentCursor(long queueId);
- PageCursor createPersistentCursor(long queueId, Filter filter);
+ PageSubscription createPersistentSubscription(long queueId, Filter filter);
/**
* Create a non persistent cursor, usually associated with browsing
* @return
*/
- PageCursor createNonPersistentCursor(Filter filter);
+ PageSubscription createNonPersistentSubscription(Filter filter);
- Pair<PagePosition, PagedMessage> getNext(PageCursor cursor, PagePosition pos)
throws Exception;
+ Pair<PagePosition, PagedMessage> getNext(PageSubscription cursor, PagePosition
pos) throws Exception;
PagedMessage getMessage(PagePosition pos) throws Exception;
@@ -77,7 +77,7 @@
/**
* @param pageCursorImpl
*/
- void close(PageCursor pageCursorImpl);
+ void close(PageSubscription pageCursorImpl);
// to be used on tests -------------------------------------------
Copied:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
(from rev 9827,
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java)
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java (rev 0)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2010-11-02 00:24:34 UTC (rev 9829)
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.paging.cursor;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.transaction.Transaction;
+import org.hornetq.utils.LinkedListIterator;
+
+/**
+ * A PageCursor
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
+ *
+ *
+ */
+public interface PageSubscription
+{
+
+ // Cursor query operations --------------------------------------
+
+ // To be called before the server is down
+ void stop();
+
+ void bookmark(PagePosition position) throws Exception;
+
+ /** It will be 0 if non persistent cursor */
+ public long getId();
+
+ public LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator();
+
+ // To be called when the cursor is closed for good. Most likely when the queue is
deleted
+ void close() throws Exception;
+
+ void scheduleCleanupCheck();
+
+ void cleanupEntries() throws Exception;
+
+ void disableAutoCleanup();
+
+ void enableAutoCleanup();
+
+ void ack(PagePosition position) throws Exception;
+
+ void ackTx(Transaction tx, PagePosition position) throws Exception;
+ /**
+ *
+ * @return the first page in use or MAX_LONG if none is in use
+ */
+ long getFirstPage();
+
+ // Reload operations
+
+ /**
+ * @param position
+ */
+ void reloadACK(PagePosition position);
+
+ /**
+ * To be called when the cursor decided to ignore a position.
+ * @param position
+ */
+ void positionIgnored(PagePosition position);
+
+ /**
+ * To be used to avoid a redelivery of a prepared ACK after load
+ * @param position
+ */
+ void reloadPreparedACK(Transaction tx, PagePosition position);
+
+ void processReload() throws Exception;
+
+ /**
+ * To be used on redeliveries
+ * @param position
+ */
+ void redeliver(PagePosition position);
+
+ void printDebug();
+
+ /**
+ * @param minPage
+ * @return
+ */
+ boolean isComplete(long minPage);
+
+ void flushExecutors();
+}
Deleted:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-11-02 00:20:27 UTC (rev 9828)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-11-02 00:24:34 UTC (rev 9829)
@@ -1,943 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.paging.cursor.impl;
-
-import java.lang.ref.WeakReference;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.hornetq.api.core.Pair;
-import org.hornetq.core.filter.Filter;
-import org.hornetq.core.journal.IOAsyncTask;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.paging.PagedMessage;
-import org.hornetq.core.paging.PagingStore;
-import org.hornetq.core.paging.cursor.PageCache;
-import org.hornetq.core.paging.cursor.PageCursor;
-import org.hornetq.core.paging.cursor.PageCursorProvider;
-import org.hornetq.core.paging.cursor.PagePosition;
-import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.transaction.Transaction;
-import org.hornetq.core.transaction.TransactionOperationAbstract;
-import org.hornetq.core.transaction.TransactionPropertyIndexes;
-import org.hornetq.core.transaction.impl.TransactionImpl;
-import org.hornetq.utils.Future;
-import org.hornetq.utils.LinkedListImpl;
-import org.hornetq.utils.LinkedListIterator;
-
-/**
- * A PageCursorImpl
- *
- * A page cursor will always store its
- * @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
- *
- *
- */
-public class PageCursorImpl implements PageCursor
-{
- // Constants -----------------------------------------------------
- private static final Logger log = Logger.getLogger(PageCursorImpl.class);
-
- // Attributes ----------------------------------------------------
-
- private final boolean isTrace = false; // PageCursorImpl.log.isTraceEnabled();
-
- private static void trace(final String message)
- {
- // PageCursorImpl.log.info(message);
- System.out.println(message);
- }
-
- private volatile boolean autoCleanup = true;
-
- private final StorageManager store;
-
- private final long cursorId;
-
- private final Filter filter;
-
- private final PagingStore pageStore;
-
- private final PageCursorProvider cursorProvider;
-
- private final Executor executor;
-
- private volatile PagePosition lastPosition;
-
- private volatile PagePosition lastAckedPosition;
-
- private List<PagePosition> recoveredACK;
-
- private final SortedMap<Long, PageCursorInfo> consumedPages =
Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
-
- // We only store the position for redeliveries. They will be read from the SoftCache
again during delivery.
- private final org.hornetq.utils.LinkedList<PagePosition> redeliveries = new
LinkedListImpl<PagePosition>();
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public PageCursorImpl(final PageCursorProvider cursorProvider,
- final PagingStore pageStore,
- final StorageManager store,
- final Executor executor,
- final Filter filter,
- final long cursorId)
- {
- this.pageStore = pageStore;
- this.store = store;
- this.cursorProvider = cursorProvider;
- this.cursorId = cursorId;
- this.executor = executor;
- this.filter = filter;
- }
-
- // Public --------------------------------------------------------
-
- public void disableAutoCleanup()
- {
- autoCleanup = false;
- }
-
- public void enableAutoCleanup()
- {
- autoCleanup = true;
- }
-
- public PageCursorProvider getProvider()
- {
- return cursorProvider;
- }
-
- public void bookmark(PagePosition position) throws Exception
- {
- if (lastPosition != null)
- {
- throw new RuntimeException("Bookmark can only be done at the time of the
cursor's creation");
- }
-
- lastPosition = position;
-
- PageCursorInfo cursorInfo = getPageInfo(position);
-
- if (position.getMessageNr() > 0)
- {
- cursorInfo.confirmed.addAndGet(position.getMessageNr());
- }
-
- ack(position);
- }
-
- class CursorIterator implements LinkedListIterator<Pair<PagePosition,
PagedMessage>>
- {
- PagePosition position = getLastPosition();
-
- PagePosition lastOperation = null;
-
- LinkedListIterator<PagePosition> redeliveryIterator =
redeliveries.iterator();
-
- boolean isredelivery = false;
-
- public void repeat()
- {
- if (isredelivery)
- {
- redeliveryIterator.repeat();
- }
- else
- {
- if (lastOperation == null)
- {
- position = getLastPosition();
- }
- else
- {
- position = lastOperation;
- }
- }
- }
-
- /* (non-Javadoc)
- * @see java.util.Iterator#next()
- */
- public Pair<PagePosition, PagedMessage> next()
- {
- try
- {
- if (redeliveryIterator.hasNext())
- {
- isredelivery = true;
- return getMessage(redeliveryIterator.next());
- }
- else
- {
- isredelivery = false;
- }
-
- Pair<PagePosition, PagedMessage> nextPos = moveNext(position);
- if (nextPos != null)
- {
- position = nextPos.a;
- }
- return nextPos;
- }
- catch (Exception e)
- {
- throw new RuntimeException(e.getMessage(), e);
- }
- }
-
- public boolean hasNext()
- {
- return true;
- }
-
- /* (non-Javadoc)
- * @see java.util.Iterator#remove()
- */
- public void remove()
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.utils.LinkedListIterator#close()
- */
- public void close()
- {
- }
- }
-
- private Pair<PagePosition, PagedMessage> getMessage(PagePosition pos) throws
Exception
- {
- return new Pair<PagePosition, PagedMessage>(pos,
cursorProvider.getMessage(pos));
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.paging.cursor.PageCursor#iterator()
- */
- public LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator()
- {
- return new CursorIterator();
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
- */
- public synchronized Pair<PagePosition, PagedMessage> moveNext(PagePosition
position) throws Exception
- {
- boolean match = false;
-
- Pair<PagePosition, PagedMessage> message = null;
-
- PagePosition tmpPosition = position;
-
- do
- {
- message = cursorProvider.getNext(this, tmpPosition);
-
- if (message != null)
- {
- tmpPosition = message.a;
-
- match = match(message.b.getMessage());
-
- if (!match)
- {
- processACK(message.a);
- }
- }
-
- }
- while (message != null && !match);
-
- return message;
- }
-
- /**
- *
- */
- private PagePosition getLastPosition()
- {
- if (lastPosition == null)
- {
- // it will start at the first available page
- long firstPage = pageStore.getFirstPage();
- lastPosition = new PagePositionImpl(firstPage, -1);
- }
-
- return lastPosition;
- }
-
- /* (non-Javadoc)
- * @see
org.hornetq.core.paging.cursor.PageCursor#confirm(org.hornetq.core.paging.cursor.PagePosition)
- */
- public void ack(final PagePosition position) throws Exception
- {
-
- // if we are dealing with a persistent cursor
- if (cursorId != 0)
- {
- store.storeCursorAcknowledge(cursorId, position);
- }
-
- store.afterCompleteOperations(new IOAsyncTask()
- {
-
- public void onError(final int errorCode, final String errorMessage)
- {
- }
-
- public void done()
- {
- processACK(position);
- }
- });
- }
-
- public void ackTx(final Transaction tx, final PagePosition position) throws Exception
- {
- // if the cursor is persistent
- if (cursorId != 0)
- {
- store.storeCursorAcknowledgeTransactional(tx.getID(), cursorId, position);
- }
- installTXCallback(tx, position);
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.paging.cursor.PageCursor#getFirstPage()
- */
- public long getFirstPage()
- {
- if (consumedPages.isEmpty())
- {
- return 0;
- }
- else
- {
- return consumedPages.firstKey();
- }
- }
-
- /* (non-Javadoc)
- * @see
org.hornetq.core.paging.cursor.PageCursor#returnElement(org.hornetq.core.paging.cursor.PagePosition)
- */
- public synchronized void redeliver(final PagePosition position)
- {
- redeliveries.addTail(position);
- }
-
- /**
- * Theres no need to synchronize this method as it's only called from journal load
on startup
- */
- public void reloadACK(final PagePosition position)
- {
- if (recoveredACK == null)
- {
- recoveredACK = new LinkedList<PagePosition>();
- }
-
- recoveredACK.add(position);
- }
-
- /* (non-Javadoc)
- * @see
org.hornetq.core.paging.cursor.PageCursor#recoverPreparedACK(org.hornetq.core.paging.cursor.PagePosition)
- */
- public void reloadPreparedACK(final Transaction tx, final PagePosition position)
- {
- installTXCallback(tx, position);
- }
-
- /* (non-Javadoc)
- * @see
org.hornetq.core.paging.cursor.PageCursor#positionIgnored(org.hornetq.core.paging.cursor.PagePosition)
- */
- public void positionIgnored(final PagePosition position)
- {
- processACK(position);
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.paging.cursor.PageCursor#isComplete(long)
- */
- public boolean isComplete(long page)
- {
- PageCursorInfo info = consumedPages.get(page);
- return info != null && info.isDone();
- }
-
- /**
- * All the data associated with the cursor should go away here
- */
- public void close() throws Exception
- {
- final long tx = store.generateUniqueID();
-
- final ArrayList<Exception> ex = new ArrayList<Exception>();
-
- final AtomicBoolean isPersistent = new AtomicBoolean(false);
-
- // We can't delete the records at the caller's thread
- // because an executor may be holding the synchronized on PageCursorImpl
- // what would lead to a dead lock
- // so, we delete it inside the executor also
- // and wait for the result
- // The caller will be treating eventual IO exceptions and dispatching to the
original thread's caller
- executor.execute(new Runnable()
- {
-
- public void run()
- {
- try
- {
- synchronized (PageCursorImpl.this)
- {
- for (PageCursorInfo cursor : consumedPages.values())
- {
- for (PagePosition info : cursor.acks)
- {
- if (info.getRecordID() != 0)
- {
- isPersistent.set(true);
- store.deleteCursorAcknowledgeTransactional(tx,
info.getRecordID());
- }
- }
- }
- }
- }
- catch (Exception e)
- {
- ex.add(e);
- PageCursorImpl.log.warn(e.getMessage(), e);
- }
- }
- });
-
- Future future = new Future();
-
- executor.execute(future);
-
- while (!future.await(5000))
- {
- PageCursorImpl.log.warn("Timeout on waiting cursor " + this + "
to be closed");
- }
-
- if (isPersistent.get())
- {
- // Another reason to perform the commit at the main thread is because the
OperationContext may only send the
- // result to the client when
- // the IO on commit is done
- if (ex.size() == 0)
- {
- store.commit(tx);
- }
- else
- {
- store.rollback(tx);
- throw ex.get(0);
- }
- }
-
- cursorProvider.close(this);
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.paging.cursor.PageCursor#getId()
- */
- public long getId()
- {
- return cursorId;
- }
-
- public void processReload() throws Exception
- {
- if (recoveredACK != null)
- {
- if (isTrace)
- {
- PageCursorImpl.trace("********** processing reload!!!!!!!");
- }
- Collections.sort(recoveredACK);
-
- boolean first = true;
-
- PagePosition previousPos = null;
- for (PagePosition pos : recoveredACK)
- {
- PageCursorInfo positions = getPageInfo(pos);
- if (first)
- {
- first = false;
- if (pos.getMessageNr() > 0)
- {
- positions.confirmed.addAndGet(pos.getMessageNr());
- }
- }
-
- positions.addACK(pos);
-
- lastPosition = pos;
- if (previousPos != null)
- {
- if (!previousPos.isRightAfter(previousPos))
- {
- PagePosition tmpPos = previousPos;
- // looking for holes on the ack list for redelivery
- while (true)
- {
- Pair<PagePosition, PagedMessage> msgCheck =
cursorProvider.getNext(this, tmpPos);
-
- positions = getPageInfo(tmpPos);
-
- // end of the hole, we can finish processing here
- // It may be also that the next was just a next page, so we just
ignore it
- if (msgCheck == null || msgCheck.a.equals(pos))
- {
- break;
- }
- else
- {
- if (match(msgCheck.b.getMessage()))
- {
- redeliver(msgCheck.a);
- }
- else
- {
- // The reference was ignored. But we must take a count from
the reference count
- // otherwise the page will never be deleted hence we would
never leave paging even if
- // everything was consumed
- positions.confirmed.incrementAndGet();
- }
- }
- tmpPos = msgCheck.a;
- }
- }
- }
-
- previousPos = pos;
- }
-
- lastAckedPosition = lastPosition;
-
- recoveredACK.clear();
- recoveredACK = null;
- }
- }
-
- public void flushExecutors()
- {
- Future future = new Future();
- executor.execute(future);
- while (!future.await(1000))
- {
- PageCursorImpl.log.warn("Waiting page cursor to finish executors - " +
this);
- }
- }
-
- public void stop()
- {
- flushExecutors();
- }
-
- public void printDebug()
- {
- printDebug(toString());
- }
-
- public void printDebug(final String msg)
- {
- System.out.println("Debug information on PageCurorImpl- " + msg);
- for (PageCursorInfo info : consumedPages.values())
- {
- System.out.println(info);
- }
- }
-
- /**
- * @param page
- * @return
- */
- private synchronized PageCursorInfo getPageInfo(final PagePosition pos)
- {
- PageCursorInfo pageInfo = consumedPages.get(pos.getPageNr());
-
- if (pageInfo == null)
- {
- PageCache cache = cursorProvider.getPageCache(pos);
- pageInfo = new PageCursorInfo(pos.getPageNr(), cache.getNumberOfMessages(),
cache);
- consumedPages.put(pos.getPageNr(), pageInfo);
- }
-
- return pageInfo;
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- protected boolean match(final ServerMessage message)
- {
- if (filter == null)
- {
- return true;
- }
- else
- {
- return filter.match(message);
- }
- }
-
- // Private -------------------------------------------------------
-
- // To be called only after the ACK has been processed and guaranteed to be on storae
- // The only exception is on non storage events such as not matching messages
- private void processACK(final PagePosition pos)
- {
- if (lastAckedPosition == null || pos.compareTo(lastAckedPosition) > 0)
- {
- if (lastAckedPosition != null && lastAckedPosition.getPageNr() !=
pos.getPageNr())
- {
- // there's a different page being acked, we will do the check right away
- if (autoCleanup)
- {
- scheduleCleanupCheck();
- }
- }
- lastAckedPosition = pos;
- }
- PageCursorInfo info = getPageInfo(pos);
-
- info.addACK(pos);
- }
-
- /**
- * @param tx
- * @param position
- */
- private void installTXCallback(final Transaction tx, final PagePosition position)
- {
- if (position.getRecordID() > 0)
- {
- // It needs to persist, otherwise the cursor will return to the fist page
position
- tx.setContainsPersistent();
- }
-
- PageCursorTX cursorTX =
(PageCursorTX)tx.getProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS);
-
- if (cursorTX == null)
- {
- cursorTX = new PageCursorTX();
- tx.putProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS, cursorTX);
- tx.addOperation(cursorTX);
- }
-
- cursorTX.addPositionConfirmation(this, position);
-
- }
-
- /**
- * A callback from the PageCursorInfo. It will be called when all the messages on a
page have been acked
- * @param info
- */
- private void onPageDone(final PageCursorInfo info)
- {
- if (autoCleanup)
- {
- scheduleCleanupCheck();
- }
- }
-
- public void scheduleCleanupCheck()
- {
- if (autoCleanup)
- {
- executor.execute(new Runnable()
- {
-
- public void run()
- {
- try
- {
- cleanupEntries();
- }
- catch (Exception e)
- {
- PageCursorImpl.log.warn("Error on cleaning up cursor pages",
e);
- }
- }
- });
- }
- }
-
- /**
- * It will cleanup all the records for completed pages
- * */
- public void cleanupEntries() throws Exception
- {
- Transaction tx = new TransactionImpl(store);
-
- boolean persist = false;
-
- final ArrayList<PageCursorInfo> completedPages = new
ArrayList<PageCursorInfo>();
-
- // First get the completed pages using a lock
- synchronized (this)
- {
- for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet())
- {
- PageCursorInfo info = entry.getValue();
- if (info.isDone() && !info.isPendingDelete() &&
lastAckedPosition != null)
- {
- if (entry.getKey() == lastAckedPosition.getPageNr())
- {
- PageCursorImpl.trace("We can't clear page " +
entry.getKey() + " now since it's the current page");
- }
- else
- {
- info.setPendingDelete();
- completedPages.add(entry.getValue());
- }
- }
- }
- }
-
- for (int i = 0; i < completedPages.size(); i++)
- {
- PageCursorInfo info = completedPages.get(i);
-
- for (PagePosition pos : info.acks)
- {
- if (pos.getRecordID() > 0)
- {
- store.deleteCursorAcknowledgeTransactional(tx.getID(),
pos.getRecordID());
- if (!persist)
- {
- // only need to set it once
- tx.setContainsPersistent();
- persist = true;
- }
- }
- }
- }
-
- tx.addOperation(new TransactionOperationAbstract()
- {
-
- @Override
- public void afterCommit(final Transaction tx)
- {
- executor.execute(new Runnable()
- {
-
- public void run()
- {
- synchronized (PageCursorImpl.this)
- {
- for (PageCursorInfo completePage : completedPages)
- {
- if (isTrace)
- {
- PageCursorImpl.trace("Removing page " +
completePage.getPageId());
- }
- if (consumedPages.remove(completePage.getPageId()) == null)
- {
- PageCursorImpl.log.warn("Couldn't remove page "
+ completePage.getPageId() +
- " from consumed pages on cursor
for address " +
- pageStore.getAddress());
- }
- }
- }
-
- cursorProvider.scheduleCleanup();
- }
- });
- }
- });
-
- tx.commit();
-
- }
-
- // Inner classes -------------------------------------------------
-
- private class PageCursorInfo
- {
- // Number of messages existent on this page
- private final int numberOfMessages;
-
- private final long pageId;
-
- // Confirmed ACKs on this page
- private final List<PagePosition> acks = Collections.synchronizedList(new
LinkedList<PagePosition>());
-
- private WeakReference<PageCache> cache;
-
- // The page was live at the time of the creation
- private final boolean wasLive;
-
- // There's a pending delete on the async IO pipe
- // We're holding this object to avoid delete the pages before the IO is
complete,
- // however we can't delete these records again
- private boolean pendingDelete;
-
- // We need a separate counter as the cursor may be ignoring certain values because
of incomplete transactions or
- // expressions
- private final AtomicInteger confirmed = new AtomicInteger(0);
-
- @Override
- public String toString()
- {
- return "PageCursorInfo::PageID=" + pageId +
- " numberOfMessage = " +
- numberOfMessages +
- ", confirmed = " +
- confirmed;
- }
-
- public PageCursorInfo(final long pageId, final int numberOfMessages, final
PageCache cache)
- {
- this.pageId = pageId;
- this.numberOfMessages = numberOfMessages;
- wasLive = cache.isLive();
- if (wasLive)
- {
- this.cache = new WeakReference<PageCache>(cache);
- }
- }
-
- public boolean isDone()
- {
- return getNumberOfMessages() == confirmed.get();
- }
-
- public boolean isPendingDelete()
- {
- return pendingDelete;
- }
-
- public void setPendingDelete()
- {
- pendingDelete = true;
- }
-
- /**
- * @return the pageId
- */
- public long getPageId()
- {
- return pageId;
- }
-
- public void addACK(final PagePosition posACK)
- {
- if (posACK.getRecordID() > 0)
- {
- // We store these elements for later cleanup
- acks.add(posACK);
- }
-
- if (isTrace)
- {
- PageCursorImpl.trace("numberOfMessages = " + getNumberOfMessages()
+
- " confirmed = " +
- (confirmed.get() + 1) +
- ", page = " +
- pageId);
- }
-
- // Negative could mean a bookmark on the first element for the page (example
-1)
- if (posACK.getMessageNr() >= 0)
- {
- if (getNumberOfMessages() == confirmed.incrementAndGet())
- {
- onPageDone(this);
- }
- }
- }
-
- private int getNumberOfMessages()
- {
- if (wasLive)
- {
- PageCache cache = this.cache.get();
- if (cache != null)
- {
- return cache.getNumberOfMessages();
- }
- else
- {
- cache = cursorProvider.getPageCache(new PagePositionImpl(pageId, 0));
- this.cache = new WeakReference<PageCache>(cache);
- return cache.getNumberOfMessages();
- }
- }
- else
- {
- return numberOfMessages;
- }
- }
-
- }
-
- static class PageCursorTX extends TransactionOperationAbstract
- {
- HashMap<PageCursorImpl, List<PagePosition>> pendingPositions = new
HashMap<PageCursorImpl, List<PagePosition>>();
-
- public void addPositionConfirmation(final PageCursorImpl cursor, final PagePosition
position)
- {
- List<PagePosition> list = pendingPositions.get(cursor);
-
- if (list == null)
- {
- list = new LinkedList<PagePosition>();
- pendingPositions.put(cursor, list);
- }
-
- list.add(position);
- }
-
- /* (non-Javadoc)
- * @see
org.hornetq.core.transaction.TransactionOperation#afterCommit(org.hornetq.core.transaction.Transaction)
- */
- @Override
- public void afterCommit(final Transaction tx)
- {
- for (Entry<PageCursorImpl, List<PagePosition>> entry :
pendingPositions.entrySet())
- {
- PageCursorImpl cursor = entry.getKey();
-
- List<PagePosition> positions = entry.getValue();
-
- for (PagePosition confirmed : positions)
- {
- cursor.processACK(confirmed);
- }
-
- }
- }
-
- }
-}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-11-02 00:20:27 UTC (rev 9828)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-11-02 00:24:34 UTC (rev 9829)
@@ -29,7 +29,7 @@
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.cursor.PageCache;
-import org.hornetq.core.paging.cursor.PageCursor;
+import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.persistence.StorageManager;
@@ -70,9 +70,9 @@
private Map<Long, PageCache> softCache = new SoftValueHashMap<Long,
PageCache>();
- private ConcurrentMap<Long, PageCursor> activeCursors = new
ConcurrentHashMap<Long, PageCursor>();
+ private ConcurrentMap<Long, PageSubscription> activeCursors = new
ConcurrentHashMap<Long, PageSubscription>();
- private ConcurrentSet<PageCursor> nonPersistentCursors = new
ConcurrentHashSet<PageCursor>();
+ private ConcurrentSet<PageSubscription> nonPersistentCursors = new
ConcurrentHashSet<PageSubscription>();
// Static --------------------------------------------------------
@@ -96,15 +96,15 @@
return pagingStore;
}
- public synchronized PageCursor createPersistentCursor(long cursorID, Filter filter)
+ public synchronized PageSubscription createPersistentSubscription(long cursorID,
Filter filter)
{
- PageCursor activeCursor = activeCursors.get(cursorID);
+ PageSubscription activeCursor = activeCursors.get(cursorID);
if (activeCursor != null)
{
throw new IllegalStateException("Cursor " + cursorID + " had
already been created");
}
- activeCursor = new PageCursorImpl(this,
+ activeCursor = new PageSubscriptionImpl(this,
pagingStore,
storageManager,
executorFactory.getExecutor(),
@@ -117,7 +117,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursorProvider#createCursor()
*/
- public synchronized PageCursor getPersistentCursor(long cursorID)
+ public synchronized PageSubscription getPersistentCursor(long cursorID)
{
return activeCursors.get(cursorID);
}
@@ -125,9 +125,9 @@
/**
* this will create a non-persistent cursor
*/
- public synchronized PageCursor createNonPersistentCursor(Filter filter)
+ public synchronized PageSubscription createNonPersistentSubscription(Filter filter)
{
- PageCursor cursor = new PageCursorImpl(this,
+ PageSubscription cursor = new PageSubscriptionImpl(this,
pagingStore,
storageManager,
executorFactory.getExecutor(),
@@ -140,7 +140,7 @@
/* (non-Javadoc)
* @see
org.hornetq.core.paging.cursor.PageCursorProvider#getAfter(org.hornetq.core.paging.cursor.PagePosition)
*/
- public Pair<PagePosition, PagedMessage> getNext(final PageCursor cursor,
PagePosition cursorPos) throws Exception
+ public Pair<PagePosition, PagedMessage> getNext(final PageSubscription cursor,
PagePosition cursorPos) throws Exception
{
while (true)
@@ -260,7 +260,7 @@
public void processReload() throws Exception
{
- for (PageCursor cursor : this.activeCursors.values())
+ for (PageSubscription cursor : this.activeCursors.values())
{
cursor.processReload();
}
@@ -271,12 +271,12 @@
public void stop()
{
- for (PageCursor cursor : activeCursors.values())
+ for (PageSubscription cursor : activeCursors.values())
{
cursor.stop();
}
- for (PageCursor cursor : nonPersistentCursors)
+ for (PageSubscription cursor : nonPersistentCursors)
{
cursor.stop();
}
@@ -294,12 +294,12 @@
public void flushExecutors()
{
- for (PageCursor cursor : activeCursors.values())
+ for (PageSubscription cursor : activeCursors.values())
{
cursor.flushExecutors();
}
- for (PageCursor cursor : nonPersistentCursors)
+ for (PageSubscription cursor : nonPersistentCursors)
{
cursor.flushExecutors();
}
@@ -315,7 +315,7 @@
}
- public void close(PageCursor cursor)
+ public void close(PageSubscription cursor)
{
if (cursor.getId() != 0)
{
@@ -359,7 +359,7 @@
return;
}
- ArrayList<PageCursor> cursorList = new ArrayList<PageCursor>();
+ ArrayList<PageSubscription> cursorList = new
ArrayList<PageSubscription>();
cursorList.addAll(activeCursors.values());
cursorList.addAll(nonPersistentCursors);
@@ -369,7 +369,7 @@
{
boolean complete = true;
- for (PageCursor cursor : cursorList)
+ for (PageSubscription cursor : cursorList)
{
if (!cursor.isComplete(minPage))
{
@@ -389,7 +389,7 @@
try
{
// First step: Move every cursor to the next bookmarked page (that
was just created)
- for (PageCursor cursor : cursorList)
+ for (PageSubscription cursor : cursorList)
{
cursor.ack(new PagePositionImpl(currentPage.getPageId(), -1));
}
@@ -398,7 +398,7 @@
}
finally
{
- for (PageCursor cursor : cursorList)
+ for (PageSubscription cursor : cursorList)
{
cursor.enableAutoCleanup();
}
@@ -407,7 +407,7 @@
pagingStore.stopPaging();
// This has to be called after we stopped paging
- for (PageCursor cursor : cursorList)
+ for (PageSubscription cursor : cursorList)
{
cursor.scheduleCleanupCheck();
}
@@ -481,11 +481,11 @@
/**
* This method is synchronized because we want it to be atomic with the cursors being
used
*/
- private long checkMinPage(List<PageCursor> cursorList)
+ private long checkMinPage(List<PageSubscription> cursorList)
{
long minPage = Long.MAX_VALUE;
- for (PageCursor cursor : cursorList)
+ for (PageSubscription cursor : cursorList)
{
long firstPage = cursor.getFirstPage();
if (firstPage < minPage)
Copied:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
(from rev 9828,
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java)
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
(rev 0)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-02
00:24:34 UTC (rev 9829)
@@ -0,0 +1,943 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.paging.cursor.impl;
+
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.core.filter.Filter;
+import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.paging.cursor.PageCache;
+import org.hornetq.core.paging.cursor.PageSubscription;
+import org.hornetq.core.paging.cursor.PageCursorProvider;
+import org.hornetq.core.paging.cursor.PagePosition;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.TransactionOperationAbstract;
+import org.hornetq.core.transaction.TransactionPropertyIndexes;
+import org.hornetq.core.transaction.impl.TransactionImpl;
+import org.hornetq.utils.Future;
+import org.hornetq.utils.LinkedListImpl;
+import org.hornetq.utils.LinkedListIterator;
+
+/**
+ * A PageSubscriptionImpl
+ *
+ * A page subscription keeps, for each page it has consumed, the positions that were acknowledged on that page.
+ * @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
+ *
+ *
+ */
+public class PageSubscriptionImpl implements PageSubscription
+{
+ // Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(PageSubscriptionImpl.class);
+
+ // Attributes ----------------------------------------------------
+
+ private final boolean isTrace = false; // PageCursorImpl.log.isTraceEnabled();
+
+ private static void trace(final String message)
+ {
+ // PageCursorImpl.log.info(message);
+ System.out.println(message);
+ }
+
+ private volatile boolean autoCleanup = true;
+
+ private final StorageManager store;
+
+ private final long cursorId;
+
+ private final Filter filter;
+
+ private final PagingStore pageStore;
+
+ private final PageCursorProvider cursorProvider;
+
+ private final Executor executor;
+
+ private volatile PagePosition lastPosition;
+
+ private volatile PagePosition lastAckedPosition;
+
+ private List<PagePosition> recoveredACK;
+
+ private final SortedMap<Long, PageCursorInfo> consumedPages =
Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
+
+ // We only store the position for redeliveries. They will be read from the SoftCache
again during delivery.
+ private final org.hornetq.utils.LinkedList<PagePosition> redeliveries = new
LinkedListImpl<PagePosition>();
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public PageSubscriptionImpl(final PageCursorProvider cursorProvider,
+ final PagingStore pageStore,
+ final StorageManager store,
+ final Executor executor,
+ final Filter filter,
+ final long cursorId)
+ {
+ this.pageStore = pageStore;
+ this.store = store;
+ this.cursorProvider = cursorProvider;
+ this.cursorId = cursorId;
+ this.executor = executor;
+ this.filter = filter;
+ }
+
+ // Public --------------------------------------------------------
+
+ public void disableAutoCleanup()
+ {
+ autoCleanup = false;
+ }
+
+ public void enableAutoCleanup()
+ {
+ autoCleanup = true;
+ }
+
+ public PageCursorProvider getProvider()
+ {
+ return cursorProvider;
+ }
+
+ public void bookmark(PagePosition position) throws Exception
+ {
+ if (lastPosition != null)
+ {
+ throw new RuntimeException("Bookmark can only be done at the time of the
cursor's creation");
+ }
+
+ lastPosition = position;
+
+ PageCursorInfo cursorInfo = getPageInfo(position);
+
+ if (position.getMessageNr() > 0)
+ {
+ cursorInfo.confirmed.addAndGet(position.getMessageNr());
+ }
+
+ ack(position);
+ }
+
+ class CursorIterator implements LinkedListIterator<Pair<PagePosition,
PagedMessage>>
+ {
+ PagePosition position = getLastPosition();
+
+ PagePosition lastOperation = null;
+
+ LinkedListIterator<PagePosition> redeliveryIterator =
redeliveries.iterator();
+
+ boolean isredelivery = false;
+
+ public void repeat()
+ {
+ if (isredelivery)
+ {
+ redeliveryIterator.repeat();
+ }
+ else
+ {
+ if (lastOperation == null)
+ {
+ position = getLastPosition();
+ }
+ else
+ {
+ position = lastOperation;
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see java.util.Iterator#next()
+ */
+ public Pair<PagePosition, PagedMessage> next()
+ {
+ try
+ {
+ if (redeliveryIterator.hasNext())
+ {
+ isredelivery = true;
+ return getMessage(redeliveryIterator.next());
+ }
+ else
+ {
+ isredelivery = false;
+ }
+
+ Pair<PagePosition, PagedMessage> nextPos = moveNext(position);
+ if (nextPos != null)
+ {
+ position = nextPos.a;
+ }
+ return nextPos;
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ public boolean hasNext()
+ {
+ return true;
+ }
+
+ /* (non-Javadoc)
+ * @see java.util.Iterator#remove()
+ */
+ public void remove()
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.utils.LinkedListIterator#close()
+ */
+ public void close()
+ {
+ }
+ }
+
+ private Pair<PagePosition, PagedMessage> getMessage(PagePosition pos) throws
Exception
+ {
+ return new Pair<PagePosition, PagedMessage>(pos,
cursorProvider.getMessage(pos));
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageCursor#iterator()
+ */
+ public LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator()
+ {
+ return new CursorIterator();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
+ */
+ public synchronized Pair<PagePosition, PagedMessage> moveNext(PagePosition
position) throws Exception
+ {
+ boolean match = false;
+
+ Pair<PagePosition, PagedMessage> message = null;
+
+ PagePosition tmpPosition = position;
+
+ do
+ {
+ message = cursorProvider.getNext(this, tmpPosition);
+
+ if (message != null)
+ {
+ tmpPosition = message.a;
+
+ match = match(message.b.getMessage());
+
+ if (!match)
+ {
+ processACK(message.a);
+ }
+ }
+
+ }
+ while (message != null && !match);
+
+ return message;
+ }
+
+ /**
+ *
+ */
+ private PagePosition getLastPosition()
+ {
+ if (lastPosition == null)
+ {
+ // it will start at the first available page
+ long firstPage = pageStore.getFirstPage();
+ lastPosition = new PagePositionImpl(firstPage, -1);
+ }
+
+ return lastPosition;
+ }
+
+ /**
+  * Acknowledges a position outside of any transaction.
+  *
+  * For a persistent cursor (cursorId != 0) the acknowledge is first handed to the storage manager.
+  * The in-memory accounting ({@link #processACK(PagePosition)}) only runs from the completion
+  * callback registered on the storage manager, never directly from this method.
+  *
+  * @param position the position being acknowledged
+  * @throws Exception if the storage manager rejects the operation
+  */
+ public void ack(final PagePosition position) throws Exception
+ {
+
+    // if we are dealing with a persistent cursor
+    if (cursorId != 0)
+    {
+       store.storeCursorAcknowledge(cursorId, position);
+    }
+
+    store.afterCompleteOperations(new IOAsyncTask()
+    {
+
+       public void onError(final int errorCode, final String errorMessage)
+       {
+          // The ACK is not applied in memory on failure; report it instead of dropping it silently
+          PageSubscriptionImpl.log.warn("Error on acknowledging position " + position +
+                                        " on cursor " +
+                                        cursorId +
+                                        ": errorCode=" +
+                                        errorCode +
+                                        ", errorMessage=" +
+                                        errorMessage);
+       }
+
+       public void done()
+       {
+          processACK(position);
+       }
+    });
+ }
+
+ public void ackTx(final Transaction tx, final PagePosition position) throws Exception
+ {
+ // if the cursor is persistent
+ if (cursorId != 0)
+ {
+ store.storeCursorAcknowledgeTransactional(tx.getID(), cursorId, position);
+ }
+ installTXCallback(tx, position);
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageCursor#getFirstPage()
+ */
+ public long getFirstPage()
+ {
+ if (consumedPages.isEmpty())
+ {
+ return 0;
+ }
+ else
+ {
+ return consumedPages.firstKey();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.paging.cursor.PageCursor#returnElement(org.hornetq.core.paging.cursor.PagePosition)
+ */
+ public synchronized void redeliver(final PagePosition position)
+ {
+ redeliveries.addTail(position);
+ }
+
+ /**
+ * There's no need to synchronize this method as it's only called from journal load on startup
+ */
+ public void reloadACK(final PagePosition position)
+ {
+ if (recoveredACK == null)
+ {
+ recoveredACK = new LinkedList<PagePosition>();
+ }
+
+ recoveredACK.add(position);
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.paging.cursor.PageCursor#recoverPreparedACK(org.hornetq.core.paging.cursor.PagePosition)
+ */
+ public void reloadPreparedACK(final Transaction tx, final PagePosition position)
+ {
+ installTXCallback(tx, position);
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.paging.cursor.PageCursor#positionIgnored(org.hornetq.core.paging.cursor.PagePosition)
+ */
+ public void positionIgnored(final PagePosition position)
+ {
+ processACK(position);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageCursor#isComplete(long)
+ */
+ public boolean isComplete(long page)
+ {
+ PageCursorInfo info = consumedPages.get(page);
+ return info != null && info.isDone();
+ }
+
+ /**
+ * All the data associated with the cursor should go away here
+ */
+ public void close() throws Exception
+ {
+ final long tx = store.generateUniqueID();
+
+ final ArrayList<Exception> ex = new ArrayList<Exception>();
+
+ final AtomicBoolean isPersistent = new AtomicBoolean(false);
+
+ // We can't delete the records at the caller's thread
+ // because an executor may be holding the synchronized on PageCursorImpl
+ // what would lead to a dead lock
+ // so, we delete it inside the executor also
+ // and wait for the result
+ // The caller will be treating eventual IO exceptions and dispatching to the
original thread's caller
+ executor.execute(new Runnable()
+ {
+
+ public void run()
+ {
+ try
+ {
+ synchronized (PageSubscriptionImpl.this)
+ {
+ for (PageCursorInfo cursor : consumedPages.values())
+ {
+ for (PagePosition info : cursor.acks)
+ {
+ if (info.getRecordID() != 0)
+ {
+ isPersistent.set(true);
+ store.deleteCursorAcknowledgeTransactional(tx,
info.getRecordID());
+ }
+ }
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ ex.add(e);
+ PageSubscriptionImpl.log.warn(e.getMessage(), e);
+ }
+ }
+ });
+
+ Future future = new Future();
+
+ executor.execute(future);
+
+ while (!future.await(5000))
+ {
+ PageSubscriptionImpl.log.warn("Timeout on waiting cursor " + this +
" to be closed");
+ }
+
+ if (isPersistent.get())
+ {
+ // Another reason to perform the commit at the main thread is because the
OperationContext may only send the
+ // result to the client when
+ // the IO on commit is done
+ if (ex.size() == 0)
+ {
+ store.commit(tx);
+ }
+ else
+ {
+ store.rollback(tx);
+ throw ex.get(0);
+ }
+ }
+
+ cursorProvider.close(this);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageCursor#getId()
+ */
+ public long getId()
+ {
+ return cursorId;
+ }
+
+ /**
+  * Replays the ACK positions collected through {@link #reloadACK(PagePosition)}.
+  *
+  * The recovered positions are sorted and registered on their page's PageCursorInfo. Whenever two
+  * consecutive recovered positions are not adjacent, the messages in between are scanned: the ones
+  * matching this subscription's filter are queued for redelivery, the others are only counted as
+  * confirmed so the page can still complete. The recovered list is discarded at the end.
+  *
+  * @throws Exception if reading a message through the cursor provider fails
+  */
+ public void processReload() throws Exception
+ {
+    if (recoveredACK != null)
+    {
+       if (isTrace)
+       {
+          PageSubscriptionImpl.trace("********** processing reload!!!!!!!");
+       }
+       Collections.sort(recoveredACK);
+
+       boolean first = true;
+
+       PagePosition previousPos = null;
+       for (PagePosition pos : recoveredACK)
+       {
+          PageCursorInfo positions = getPageInfo(pos);
+          if (first)
+          {
+             first = false;
+             // Everything before the first recovered ACK on its page is taken as already confirmed
+             if (pos.getMessageNr() > 0)
+             {
+                positions.confirmed.addAndGet(pos.getMessageNr());
+             }
+          }
+
+          positions.addACK(pos);
+
+          lastPosition = pos;
+          if (previousPos != null)
+          {
+             // The current position has to be tested against the previous one.
+             // (The guard used to compare previousPos with itself, so it never looked at pos.)
+             if (!pos.isRightAfter(previousPos))
+             {
+                PagePosition tmpPos = previousPos;
+                // looking for holes on the ack list for redelivery
+                while (true)
+                {
+                   Pair<PagePosition, PagedMessage> msgCheck = cursorProvider.getNext(this, tmpPos);
+
+                   positions = getPageInfo(tmpPos);
+
+                   // end of the hole, we can finish processing here
+                   // It may be also that the next was just a next page, so we just ignore it
+                   if (msgCheck == null || msgCheck.a.equals(pos))
+                   {
+                      break;
+                   }
+                   else
+                   {
+                      if (match(msgCheck.b.getMessage()))
+                      {
+                         redeliver(msgCheck.a);
+                      }
+                      else
+                      {
+                         // The reference was ignored. But we must take a count from the reference count
+                         // otherwise the page will never be deleted hence we would never leave paging even if
+                         // everything was consumed
+                         positions.confirmed.incrementAndGet();
+                      }
+                   }
+                   tmpPos = msgCheck.a;
+                }
+             }
+          }
+
+          previousPos = pos;
+       }
+
+       lastAckedPosition = lastPosition;
+
+       recoveredACK.clear();
+       recoveredACK = null;
+    }
+ }
+
+ public void flushExecutors()
+ {
+ Future future = new Future();
+ executor.execute(future);
+ while (!future.await(1000))
+ {
+ PageSubscriptionImpl.log.warn("Waiting page cursor to finish executors -
" + this);
+ }
+ }
+
+ public void stop()
+ {
+ flushExecutors();
+ }
+
+ public void printDebug()
+ {
+ printDebug(toString());
+ }
+
+ public void printDebug(final String msg)
+ {
+ System.out.println("Debug information on PageCurorImpl- " + msg);
+ for (PageCursorInfo info : consumedPages.values())
+ {
+ System.out.println(info);
+ }
+ }
+
+ /**
+ * @param page
+ * @return
+ */
+ private synchronized PageCursorInfo getPageInfo(final PagePosition pos)
+ {
+ PageCursorInfo pageInfo = consumedPages.get(pos.getPageNr());
+
+ if (pageInfo == null)
+ {
+ PageCache cache = cursorProvider.getPageCache(pos);
+ pageInfo = new PageCursorInfo(pos.getPageNr(), cache.getNumberOfMessages(),
cache);
+ consumedPages.put(pos.getPageNr(), pageInfo);
+ }
+
+ return pageInfo;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected boolean match(final ServerMessage message)
+ {
+ if (filter == null)
+ {
+ return true;
+ }
+ else
+ {
+ return filter.match(message);
+ }
+ }
+
+ // Private -------------------------------------------------------
+
+ // To be called only after the ACK has been processed and guaranteed to be on storage
+ // The only exception is on non storage events such as not matching messages
+ private void processACK(final PagePosition pos)
+ {
+ if (lastAckedPosition == null || pos.compareTo(lastAckedPosition) > 0)
+ {
+ if (lastAckedPosition != null && lastAckedPosition.getPageNr() !=
pos.getPageNr())
+ {
+ // there's a different page being acked, we will do the check right away
+ if (autoCleanup)
+ {
+ scheduleCleanupCheck();
+ }
+ }
+ lastAckedPosition = pos;
+ }
+ PageCursorInfo info = getPageInfo(pos);
+
+ info.addACK(pos);
+ }
+
+ /**
+ * @param tx
+ * @param position
+ */
+ private void installTXCallback(final Transaction tx, final PagePosition position)
+ {
+ if (position.getRecordID() > 0)
+ {
+ // It needs to persist, otherwise the cursor will return to the first page position
+ tx.setContainsPersistent();
+ }
+
+ PageCursorTX cursorTX =
(PageCursorTX)tx.getProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS);
+
+ if (cursorTX == null)
+ {
+ cursorTX = new PageCursorTX();
+ tx.putProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS, cursorTX);
+ tx.addOperation(cursorTX);
+ }
+
+ cursorTX.addPositionConfirmation(this, position);
+
+ }
+
+ /**
+ * A callback from the PageCursorInfo. It will be called when all the messages on a
page have been acked
+ * @param info
+ */
+ private void onPageDone(final PageCursorInfo info)
+ {
+ if (autoCleanup)
+ {
+ scheduleCleanupCheck();
+ }
+ }
+
+ public void scheduleCleanupCheck()
+ {
+ if (autoCleanup)
+ {
+ executor.execute(new Runnable()
+ {
+
+ public void run()
+ {
+ try
+ {
+ cleanupEntries();
+ }
+ catch (Exception e)
+ {
+ PageSubscriptionImpl.log.warn("Error on cleaning up cursor
pages", e);
+ }
+ }
+ });
+ }
+ }
+
+ /**
+  * It will cleanup all the records for completed pages.
+  *
+  * Works in three steps:
+  * 1. under the subscription lock, pick every page that is done, not yet pending delete and not the
+  *    page of the last acked position, and mark it as pending delete;
+  * 2. delete the stored ACK records of those pages within a single transaction;
+  * 3. after commit, on the executor, remove the pages from consumedPages and ask the provider to
+  *    schedule its own cleanup.
+  *
+  * @throws Exception if deleting the records or committing the transaction fails
+  */
+ public void cleanupEntries() throws Exception
+ {
+    Transaction tx = new TransactionImpl(store);
+
+    boolean persist = false;
+
+    final ArrayList<PageCursorInfo> completedPages = new ArrayList<PageCursorInfo>();
+
+    // First get the completed pages using a lock
+    synchronized (this)
+    {
+       for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet())
+       {
+          PageCursorInfo info = entry.getValue();
+          if (info.isDone() && !info.isPendingDelete() && lastAckedPosition != null)
+          {
+             if (entry.getKey() == lastAckedPosition.getPageNr())
+             {
+                // trace() writes to System.out, so it has to be guarded like every other trace call
+                if (isTrace)
+                {
+                   PageSubscriptionImpl.trace("We can't clear page " + entry.getKey() + " now since it's the current page");
+                }
+             }
+             else
+             {
+                info.setPendingDelete();
+                completedPages.add(entry.getValue());
+             }
+          }
+       }
+    }
+
+    for (int i = 0; i < completedPages.size(); i++)
+    {
+       PageCursorInfo info = completedPages.get(i);
+
+       for (PagePosition pos : info.acks)
+       {
+          if (pos.getRecordID() > 0)
+          {
+             store.deleteCursorAcknowledgeTransactional(tx.getID(), pos.getRecordID());
+             if (!persist)
+             {
+                // only need to set it once
+                tx.setContainsPersistent();
+                persist = true;
+             }
+          }
+       }
+    }
+
+    tx.addOperation(new TransactionOperationAbstract()
+    {
+
+       @Override
+       public void afterCommit(final Transaction tx)
+       {
+          // The in-memory removal is deferred to the executor and done under the subscription lock
+          executor.execute(new Runnable()
+          {
+
+             public void run()
+             {
+                synchronized (PageSubscriptionImpl.this)
+                {
+                   for (PageCursorInfo completePage : completedPages)
+                   {
+                      if (isTrace)
+                      {
+                         PageSubscriptionImpl.trace("Removing page " + completePage.getPageId());
+                      }
+                      if (consumedPages.remove(completePage.getPageId()) == null)
+                      {
+                         PageSubscriptionImpl.log.warn("Couldn't remove page " + completePage.getPageId() +
+                                                       " from consumed pages on cursor for address " +
+                                                       pageStore.getAddress());
+                      }
+                   }
+                }
+
+                cursorProvider.scheduleCleanup();
+             }
+          });
+       }
+    });
+
+    tx.commit();
+
+ }
+
+ // Inner classes -------------------------------------------------
+
+ private class PageCursorInfo
+ {
+ // Number of messages existent on this page
+ private final int numberOfMessages;
+
+ private final long pageId;
+
+ // Confirmed ACKs on this page
+ private final List<PagePosition> acks = Collections.synchronizedList(new
LinkedList<PagePosition>());
+
+ private WeakReference<PageCache> cache;
+
+ // The page was live at the time of the creation
+ private final boolean wasLive;
+
+ // There's a pending delete on the async IO pipe
+ // We're holding this object to avoid delete the pages before the IO is
complete,
+ // however we can't delete these records again
+ private boolean pendingDelete;
+
+ // We need a separate counter as the cursor may be ignoring certain values because
of incomplete transactions or
+ // expressions
+ private final AtomicInteger confirmed = new AtomicInteger(0);
+
+ @Override
+ public String toString()
+ {
+ return "PageCursorInfo::PageID=" + pageId +
+ " numberOfMessage = " +
+ numberOfMessages +
+ ", confirmed = " +
+ confirmed;
+ }
+
+ public PageCursorInfo(final long pageId, final int numberOfMessages, final
PageCache cache)
+ {
+ this.pageId = pageId;
+ this.numberOfMessages = numberOfMessages;
+ wasLive = cache.isLive();
+ if (wasLive)
+ {
+ this.cache = new WeakReference<PageCache>(cache);
+ }
+ }
+
+ public boolean isDone()
+ {
+ return getNumberOfMessages() == confirmed.get();
+ }
+
+ public boolean isPendingDelete()
+ {
+ return pendingDelete;
+ }
+
+ public void setPendingDelete()
+ {
+ pendingDelete = true;
+ }
+
+ /**
+ * @return the pageId
+ */
+ public long getPageId()
+ {
+ return pageId;
+ }
+
+ public void addACK(final PagePosition posACK)
+ {
+ if (posACK.getRecordID() > 0)
+ {
+ // We store these elements for later cleanup
+ acks.add(posACK);
+ }
+
+ if (isTrace)
+ {
+ PageSubscriptionImpl.trace("numberOfMessages = " +
getNumberOfMessages() +
+ " confirmed = " +
+ (confirmed.get() + 1) +
+ ", page = " +
+ pageId);
+ }
+
+ // Negative could mean a bookmark on the first element for the page (example
-1)
+ if (posACK.getMessageNr() >= 0)
+ {
+ if (getNumberOfMessages() == confirmed.incrementAndGet())
+ {
+ onPageDone(this);
+ }
+ }
+ }
+
+ private int getNumberOfMessages()
+ {
+ if (wasLive)
+ {
+ PageCache cache = this.cache.get();
+ if (cache != null)
+ {
+ return cache.getNumberOfMessages();
+ }
+ else
+ {
+ cache = cursorProvider.getPageCache(new PagePositionImpl(pageId, 0));
+ this.cache = new WeakReference<PageCache>(cache);
+ return cache.getNumberOfMessages();
+ }
+ }
+ else
+ {
+ return numberOfMessages;
+ }
+ }
+
+ }
+
+ static class PageCursorTX extends TransactionOperationAbstract
+ {
+ HashMap<PageSubscriptionImpl, List<PagePosition>> pendingPositions =
new HashMap<PageSubscriptionImpl, List<PagePosition>>();
+
+ public void addPositionConfirmation(final PageSubscriptionImpl cursor, final
PagePosition position)
+ {
+ List<PagePosition> list = pendingPositions.get(cursor);
+
+ if (list == null)
+ {
+ list = new LinkedList<PagePosition>();
+ pendingPositions.put(cursor, list);
+ }
+
+ list.add(position);
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.transaction.TransactionOperation#afterCommit(org.hornetq.core.transaction.Transaction)
+ */
+ @Override
+ public void afterCommit(final Transaction tx)
+ {
+ for (Entry<PageSubscriptionImpl, List<PagePosition>> entry :
pendingPositions.entrySet())
+ {
+ PageSubscriptionImpl cursor = entry.getKey();
+
+ List<PagePosition> positions = entry.getValue();
+
+ for (PagePosition confirmed : positions)
+ {
+ cursor.processACK(confirmed);
+ }
+
+ }
+ }
+
+ }
+}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-11-02
00:20:27 UTC (rev 9828)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-11-02
00:24:34 UTC (rev 9829)
@@ -22,7 +22,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagingManager;
-import org.hornetq.core.paging.cursor.PageCursor;
+import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.transaction.Transaction;
@@ -52,7 +52,7 @@
private AtomicInteger numberOfMessages = new AtomicInteger(0);
- private List<Pair<PageCursor, PagePosition>> lateDeliveries;
+ private List<Pair<PageSubscription, PagePosition>> lateDeliveries;
// Static --------------------------------------------------------
@@ -141,7 +141,7 @@
committed = true;
if (lateDeliveries != null)
{
- for (Pair<PageCursor, PagePosition> pos : lateDeliveries)
+ for (Pair<PageSubscription, PagePosition> pos : lateDeliveries)
{
pos.a.redeliver(pos.b);
}
@@ -210,7 +210,7 @@
if (lateDeliveries != null)
{
- for (Pair<PageCursor, PagePosition> pos : lateDeliveries)
+ for (Pair<PageSubscription, PagePosition> pos : lateDeliveries)
{
pos.a.positionIgnored(pos.b);
}
@@ -230,7 +230,7 @@
/* (non-Javadoc)
* @see
org.hornetq.core.paging.PageTransactionInfo#deliverAfterCommit(org.hornetq.core.paging.cursor.PageCursor,
org.hornetq.core.paging.cursor.PagePosition)
*/
- public synchronized boolean deliverAfterCommit(PageCursor cursor, PagePosition
cursorPos)
+ public synchronized boolean deliverAfterCommit(PageSubscription cursor, PagePosition
cursorPos)
{
if (committed)
{
@@ -246,9 +246,9 @@
{
if (lateDeliveries == null)
{
- lateDeliveries = new LinkedList<Pair<PageCursor,
PagePosition>>();
+ lateDeliveries = new LinkedList<Pair<PageSubscription,
PagePosition>>();
}
- lateDeliveries.add(new Pair<PageCursor, PagePosition>(cursor,
cursorPos));
+ lateDeliveries.add(new Pair<PageSubscription, PagePosition>(cursor,
cursorPos));
return true;
}
}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-11-02
00:20:27 UTC (rev 9828)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-11-02
00:24:34 UTC (rev 9829)
@@ -51,7 +51,7 @@
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
-import org.hornetq.core.paging.cursor.PageCursor;
+import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
@@ -1028,7 +1028,7 @@
{
SimpleString address = queueInfo.getAddress();
PagingStore store = pagingManager.getPageStore(address);
- PageCursor cursor =
store.getCursorProvier().getPersistentCursor(encoding.queueID);
+ PageSubscription cursor =
store.getCursorProvier().getPersistentCursor(encoding.queueID);
cursor.reloadACK(encoding.position);
}
else
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-11-02
00:20:27 UTC (rev 9828)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-11-02
00:24:34 UTC (rev 9829)
@@ -1214,7 +1214,7 @@
managementService.registerAddress(queueBindingInfo.getAddress());
managementService.registerQueue(queue, queueBindingInfo.getAddress(),
storageManager);
-
pagingManager.getPageStore(queueBindingInfo.getAddress()).getCursorProvier().createPersistentCursor(queue.getID(),
filter);
+
pagingManager.getPageStore(queueBindingInfo.getAddress()).getCursorProvier().createPersistentSubscription(queue.getID(),
filter);
}
for (GroupingInfo groupingInfo : groupingInfos)
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-02
00:20:27 UTC (rev 9828)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-02
00:24:34 UTC (rev 9829)
@@ -29,10 +29,10 @@
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.cursor.PageCache;
-import org.hornetq.core.paging.cursor.PageCursor;
+import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.PagePosition;
-import org.hornetq.core.paging.cursor.impl.PageCursorImpl;
+import org.hornetq.core.paging.cursor.impl.PageSubscriptionImpl;
import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
@@ -117,7 +117,7 @@
System.out.println("NumberOfPages = " + numberOfPages);
- PageCursor cursor = createNonPersistentCursor();
+ PageSubscription cursor = createNonPersistentCursor();
Pair<PagePosition, PagedMessage> msg;
@@ -157,7 +157,7 @@
System.out.println("NumberOfPages = " + numberOfPages);
- PageCursor cursorEven = createNonPersistentCursor(new Filter()
+ PageSubscription cursorEven = createNonPersistentCursor(new Filter()
{
public boolean match(ServerMessage message)
@@ -180,7 +180,7 @@
});
- PageCursor cursorOdd = createNonPersistentCursor(new Filter()
+ PageSubscription cursorOdd = createNonPersistentCursor(new Filter()
{
public boolean match(ServerMessage message)
@@ -273,10 +273,10 @@
// need to change this after some integration
// PageCursor cursor =
//
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
- PageCursor cursor = this.server.getPagingManager()
+ PageSubscription cursor = this.server.getPagingManager()
.getPageStore(ADDRESS)
.getCursorProvier()
- .createPersistentCursor(queue.getID(), null);
+ .createPersistentSubscription(queue.getID(), null);
PageCache firstPage = cursorProvider.getPageCache(new
PagePositionImpl(server.getPagingManager()
.getPageStore(ADDRESS)
@@ -370,10 +370,10 @@
// need to change this after some integration
// PageCursor cursor =
//
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
- PageCursor cursor = this.server.getPagingManager()
+ PageSubscription cursor = this.server.getPagingManager()
.getPageStore(ADDRESS)
.getCursorProvier()
- .createPersistentCursor(queue.getID(), null);
+ .createPersistentSubscription(queue.getID(), null);
System.out.println("Cursor: " + cursor);
LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator =
cursor.iterator();
@@ -436,10 +436,10 @@
// need to change this after some integration
// PageCursor cursor =
//
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
- PageCursor cursor = this.server.getPagingManager()
+ PageSubscription cursor = this.server.getPagingManager()
.getPageStore(ADDRESS)
.getCursorProvier()
- .createPersistentCursor(queue.getID(), null);
+ .createPersistentSubscription(queue.getID(), null);
System.out.println("Cursor: " + cursor);
@@ -514,10 +514,10 @@
// need to change this after some integration
// PageCursor cursor =
//
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
- PageCursor cursor = this.server.getPagingManager()
+ PageSubscription cursor = this.server.getPagingManager()
.getPageStore(ADDRESS)
.getCursorProvier()
- .createPersistentCursor(queue.getID(), null);
+ .createPersistentSubscription(queue.getID(), null);
System.out.println("Cursor: " + cursor);
@@ -684,10 +684,10 @@
// need to change this after some integration
// PageCursor cursor =
//
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
- PageCursor cursor = this.server.getPagingManager()
+ PageSubscription cursor = this.server.getPagingManager()
.getPageStore(ADDRESS)
.getCursorProvier()
- .createPersistentCursor(queue.getID(), null);
+ .createPersistentSubscription(queue.getID(), null);
LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator =
cursor.iterator();
System.out.println("Cursor: " + cursor);
@@ -760,8 +760,8 @@
PageCursorProvider cursorProvider = lookupCursorProvider();
- PageCursor cursor = cursorProvider.createNonPersistentCursor(null);
- PageCursorImpl cursor2 = (PageCursorImpl)cursorProvider.createNonPersistentCursor(null);
+ PageSubscription cursor = cursorProvider.createNonPersistentSubscription(null);
+ PageSubscriptionImpl cursor2 = (PageSubscriptionImpl)cursorProvider.createNonPersistentSubscription(null);
Pair<PagePosition, PagedMessage> msg;
LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator =
cursor.iterator();
@@ -832,7 +832,7 @@
PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(5, 0));
- PageCursor cursor = cursorProvider.createNonPersistentCursor(null);
+ PageSubscription cursor = cursorProvider.createNonPersistentSubscription(null);
PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages() /
2);
cursor.bookmark(startingPos);
PagedMessage msg = cache.getMessage(startingPos.getMessageNr() + 1);
@@ -880,7 +880,7 @@
// need to change this after some integration
// PageCursor cursor =
//
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
- PageCursor cursor = cursorProvider.createPersistentCursor(queue.getID(), null);
+ PageSubscription cursor = cursorProvider.createPersistentSubscription(queue.getID(), null);
PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages() /
2);
cursor.bookmark(startingPos);
PagedMessage msg = cache.getMessage(startingPos.getMessageNr() + 1);
@@ -1013,18 +1013,18 @@
* @return
* @throws Exception
*/
- private PageCursor createNonPersistentCursor() throws Exception
+ private PageSubscription createNonPersistentCursor() throws Exception
{
- return lookupCursorProvider().createNonPersistentCursor(null);
+ return lookupCursorProvider().createNonPersistentSubscription(null);
}
/**
* @return
* @throws Exception
*/
- private PageCursor createNonPersistentCursor(Filter filter) throws Exception
+ private PageSubscription createNonPersistentCursor(Filter filter) throws Exception
{
- return lookupCursorProvider().createNonPersistentCursor(filter);
+ return lookupCursorProvider().createNonPersistentSubscription(filter);
}
/**