Author: clebert.suconic(a)jboss.com
Date: 2010-11-02 23:13:23 -0400 (Tue, 02 Nov 2010)
New Revision: 9834
Added:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReference.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
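Replace Pair<PagePosition, PagedMessage> with the new PagedReference / PagedReferenceImpl types across the paging cursor API and its tests, and add ack/ackTx overloads that take a PagedReference.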
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-11-02
22:44:05 UTC (rev 9833)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-11-03
03:13:23 UTC (rev 9834)
@@ -53,7 +53,7 @@
PageSubscription createSubscription(long queueId, Filter filter, boolean durable);
- Pair<PagePosition, PagedMessage> getNext(PageSubscription cursor, PagePosition
pos) throws Exception;
+ PagedReferenceImpl getNext(PageSubscription cursor, PagePosition pos) throws
Exception;
PagedMessage getMessage(PagePosition pos) throws Exception;
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2010-11-02
22:44:05 UTC (rev 9833)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2010-11-03
03:13:23 UTC (rev 9834)
@@ -39,7 +39,7 @@
boolean isPersistent();
- public LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator();
+ public LinkedListIterator<PagedReferenceImpl> iterator();
// To be called when the cursor is closed for good. Most likely when the queue is
deleted
void close() throws Exception;
@@ -52,8 +52,14 @@
void enableAutoCleanup();
- void ack(PagePosition position) throws Exception;
+ void ack(PagedReference ref) throws Exception;
+ // for internal (cursor) classes
+ void ack(PagePosition ref) throws Exception;
+
+ void ackTx(Transaction tx, PagedReference position) throws Exception;
+
+ // for internal (cursor) classes
void ackTx(Transaction tx, PagePosition position) throws Exception;
/**
Added:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReference.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReference.java
(rev 0)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReference.java 2010-11-03
03:13:23 UTC (rev 9834)
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.paging.cursor;
+
+import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.server.MessageReference;
+
+/**
+ * A {@link MessageReference} that additionally exposes a PagePosition and a PagedMessage.
+ *
+ * @author clebert
+ *
+ *
+ */
+public interface PagedReference extends MessageReference
+{
+ PagePosition getPosition(); // the PagePosition associated with this reference
+
+ PagedMessage getPagedMessage(); // the PagedMessage associated with this reference
+}
Added:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
(rev 0)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2010-11-03
03:13:23 UTC (rev 9834)
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.paging.cursor;
+
+import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.server.MessageReference;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.ServerMessage;
+
+/**
+ * {@link PagedReference} implementation that holds a PagePosition together with a PagedMessage.
+ * Most MessageReference methods below are still auto-generated stubs (see the TODO markers).
+ * @author clebert
+ *
+ *
+ */
+public class PagedReferenceImpl implements PagedReference
+{
+
+ private static final long serialVersionUID = -8640232251318264710L;
+
+ private PagePosition a; // position passed to the constructor; returned by getPosition()
+ private PagedMessage b; // paged message passed to the constructor; returned by getPagedMessage()
+
+ /** Returns the ServerMessage obtained from the held PagedMessage. */
+ public ServerMessage getMessage()
+ {
+ return b.getMessage();
+ }
+ /** Returns the PagedMessage passed to the constructor. */
+ public PagedMessage getPagedMessage()
+ {
+ return b;
+ }
+ /** Returns the PagePosition passed to the constructor. */
+ public PagePosition getPosition()
+ {
+ return a;
+ }
+ /** Stores both arguments as given; neither is copied or null-checked. */
+ public PagedReferenceImpl(PagePosition a, PagedMessage b)
+ {
+ this.a = a;
+ this.b = b;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.MessageReference#copy(org.hornetq.core.server.Queue)
+ */
+ public MessageReference copy(Queue queue)
+ {
+ // TODO: stub - not implemented yet; ignores the queue argument and always returns null
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.MessageReference#getScheduledDeliveryTime()
+ */
+ public long getScheduledDeliveryTime()
+ {
+ // TODO: stub - not implemented yet; always returns 0
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.MessageReference#setScheduledDeliveryTime(long)
+ */
+ public void setScheduledDeliveryTime(long scheduledDeliveryTime)
+ {
+ // TODO: stub - not implemented yet; the supplied value is discarded
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.MessageReference#getDeliveryCount()
+ */
+ public int getDeliveryCount()
+ {
+ // TODO: stub - not implemented yet; always returns 0
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.MessageReference#setDeliveryCount(int)
+ */
+ public void setDeliveryCount(int deliveryCount)
+ {
+ // TODO: stub - not implemented yet; the supplied value is discarded
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.MessageReference#incrementDeliveryCount()
+ */
+ public void incrementDeliveryCount()
+ {
+ // TODO: stub - not implemented yet; the call is a no-op
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.MessageReference#decrementDeliveryCount()
+ */
+ public void decrementDeliveryCount()
+ {
+ // TODO: stub - not implemented yet; the call is a no-op
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.MessageReference#getQueue()
+ */
+ public Queue getQueue()
+ {
+ // TODO: stub - not implemented yet; always returns null
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.MessageReference#handled()
+ */
+ public void handled()
+ {
+ // TODO: stub - not implemented yet; the call is a no-op
+
+ }
+}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-11-02
22:44:05 UTC (rev 9833)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-11-03
03:13:23 UTC (rev 9834)
@@ -14,13 +14,11 @@
package org.hornetq.core.paging.cursor.impl;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
-import org.hornetq.api.core.Pair;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.Page;
@@ -29,12 +27,11 @@
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.cursor.PageCache;
-import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.PagePosition;
+import org.hornetq.core.paging.cursor.PageSubscription;
+import org.hornetq.core.paging.cursor.PagedReferenceImpl;
import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.utils.ConcurrentHashSet;
-import org.hornetq.utils.ConcurrentSet;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.Future;
import org.hornetq.utils.SoftValueHashMap;
@@ -124,12 +121,12 @@
/* (non-Javadoc)
* @see
org.hornetq.core.paging.cursor.PageCursorProvider#getAfter(org.hornetq.core.paging.cursor.PagePosition)
*/
- public Pair<PagePosition, PagedMessage> getNext(final PageSubscription cursor,
PagePosition cursorPos) throws Exception
+ public PagedReferenceImpl getNext(final PageSubscription cursor, PagePosition
cursorPos) throws Exception
{
while (true)
{
- Pair<PagePosition, PagedMessage> retPos = internalGetNext(cursorPos);
+ PagedReferenceImpl retPos = internalGetNext(cursorPos);
if (retPos == null)
{
@@ -137,15 +134,15 @@
}
else if (retPos != null)
{
- cursorPos = retPos.a;
- if (retPos.b.getTransactionID() != 0)
+ cursorPos = retPos.getPosition();
+ if (retPos.getPagedMessage().getTransactionID() != 0)
{
- PageTransactionInfo tx =
pagingManager.getTransaction(retPos.b.getTransactionID());
+ PageTransactionInfo tx =
pagingManager.getTransaction(retPos.getPagedMessage().getTransactionID());
if (tx == null)
{
- log.warn("Couldn't locate page transaction " +
retPos.b.getTransactionID() +
+ log.warn("Couldn't locate page transaction " +
retPos.getPagedMessage().getTransactionID() +
", ignoring message on position " +
- retPos.a);
+ retPos.getPosition());
cursor.positionIgnored(cursorPos);
}
else
@@ -164,7 +161,7 @@
}
}
- private Pair<PagePosition, PagedMessage> internalGetNext(final PagePosition
pos)
+ private PagedReferenceImpl internalGetNext(final PagePosition pos)
{
PagePosition retPos = pos.nextMessage();
@@ -191,7 +188,7 @@
if (serverMessage != null)
{
- return new Pair<PagePosition, PagedMessage>(retPos,
cache.getMessage(retPos.getMessageNr()));
+ return new PagedReferenceImpl(retPos, cache.getMessage(retPos.getMessageNr()));
}
else
{
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-02
22:44:05 UTC (rev 9833)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-03
03:13:23 UTC (rev 9834)
@@ -17,29 +17,26 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import org.hornetq.api.core.Pair;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.cursor.PageCache;
-import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.PagePosition;
+import org.hornetq.core.paging.cursor.PageSubscription;
+import org.hornetq.core.paging.cursor.PagedReference;
+import org.hornetq.core.paging.cursor.PagedReferenceImpl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.transaction.Transaction;
@@ -158,7 +155,7 @@
ack(position);
}
- class CursorIterator implements LinkedListIterator<Pair<PagePosition,
PagedMessage>>
+ class CursorIterator implements LinkedListIterator<PagedReferenceImpl>
{
PagePosition position = getLastPosition();
@@ -170,7 +167,7 @@
/** next element taken on hasNext test.
* it has to be delivered on next next operation */
- Pair<PagePosition, PagedMessage> cachedNext;
+ PagedReferenceImpl cachedNext;
public void repeat()
{
@@ -194,12 +191,12 @@
/* (non-Javadoc)
* @see java.util.Iterator#next()
*/
- public Pair<PagePosition, PagedMessage> next()
+ public PagedReferenceImpl next()
{
if (cachedNext != null)
{
- Pair<PagePosition, PagedMessage> retPos = cachedNext;
+ PagedReferenceImpl retPos = cachedNext;
cachedNext = null;
return retPos;
}
@@ -215,10 +212,10 @@
isredelivery = false;
}
- Pair<PagePosition, PagedMessage> nextPos = moveNext(position);
+ PagedReferenceImpl nextPos = moveNext(position);
if (nextPos != null)
{
- position = nextPos.a;
+ position = nextPos.getPosition();
}
return nextPos;
}
@@ -257,15 +254,15 @@
}
}
- private Pair<PagePosition, PagedMessage> getMessage(PagePosition pos) throws
Exception
+ private PagedReferenceImpl getMessage(PagePosition pos) throws Exception
{
- return new Pair<PagePosition, PagedMessage>(pos,
cursorProvider.getMessage(pos));
+ return new PagedReferenceImpl(pos, cursorProvider.getMessage(pos));
}
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#iterator()
*/
- public LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator()
+ public LinkedListIterator<PagedReferenceImpl> iterator()
{
return new CursorIterator();
}
@@ -275,11 +272,11 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
*/
- public synchronized Pair<PagePosition, PagedMessage> moveNext(PagePosition
position) throws Exception
+ public synchronized PagedReferenceImpl moveNext(PagePosition position) throws
Exception
{
boolean match = false;
- Pair<PagePosition, PagedMessage> message = null;
+ PagedReferenceImpl message = null;
PagePosition tmpPosition = position;
@@ -294,25 +291,24 @@
}
else
{
- PageCursorInfo info = getPageInfo(message.a, false);
- if (info != null && info.isRemoved(message.a))
+ PageCursorInfo info = getPageInfo(message.getPosition(), false);
+ if (info != null && info.isRemoved(message.getPosition()))
{
- tmpPosition = message.a;
+ tmpPosition = message.getPosition();
valid = false;
}
}
if (valid)
{
- tmpPosition = message.a;
+ tmpPosition = message.getPosition();
- match = match(message.b.getMessage());
+ match = match(message.getMessage());
if (!match)
{
- processACK(message.a);
+ processACK(message.getPosition());
}
}
-
}
while (message != null && !match);
@@ -337,9 +333,13 @@
/* (non-Javadoc)
* @see
org.hornetq.core.paging.cursor.PageCursor#confirm(org.hornetq.core.paging.cursor.PagePosition)
*/
+ public void ack(final PagedReference position) throws Exception
+ {
+ ack(position.getPosition());
+ }
+
public void ack(final PagePosition position) throws Exception
{
-
// if we are dealing with a persistent cursor
if (persistent)
{
@@ -371,6 +371,12 @@
}
+
+ public void ackTx(final Transaction tx, final PagedReference position) throws
Exception
+ {
+ ackTx(tx, position.getPosition());
+ }
+
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#getFirstPage()
*/
@@ -556,21 +562,21 @@
// looking for holes on the ack list for redelivery
while (true)
{
- Pair<PagePosition, PagedMessage> msgCheck =
cursorProvider.getNext(this, tmpPos);
+ PagedReferenceImpl msgCheck = cursorProvider.getNext(this, tmpPos);
positions = getPageInfo(tmpPos);
// end of the hole, we can finish processing here
// It may be also that the next was just a next page, so we just
ignore it
- if (msgCheck == null || msgCheck.a.equals(pos))
+ if (msgCheck == null || msgCheck.getPosition().equals(pos))
{
break;
}
else
{
- if (match(msgCheck.b.getMessage()))
+ if (match(msgCheck.getMessage()))
{
- redeliver(msgCheck.a);
+ redeliver(msgCheck.getPosition());
}
else
{
@@ -580,7 +586,7 @@
positions.confirmed.incrementAndGet();
}
}
- tmpPos = msgCheck.a;
+ tmpPos = msgCheck.getPosition();
}
}
}
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-02
22:44:05 UTC (rev 9833)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-03
03:13:23 UTC (rev 9834)
@@ -21,7 +21,6 @@
import junit.framework.Assert;
import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
@@ -30,12 +29,13 @@
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.cursor.PageCache;
-import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.PagePosition;
-import org.hornetq.core.paging.cursor.impl.PageSubscriptionImpl;
+import org.hornetq.core.paging.cursor.PageSubscription;
+import org.hornetq.core.paging.cursor.PagedReferenceImpl;
import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
+import org.hornetq.core.paging.cursor.impl.PageSubscriptionImpl;
import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.paging.impl.PagingStoreImpl;
import org.hornetq.core.persistence.StorageManager;
@@ -120,14 +120,14 @@
PageSubscription cursor =
lookupPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
- Pair<PagePosition, PagedMessage> msg;
+ PagedReferenceImpl msg;
- LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator =
cursor.iterator();
+ LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
int key = 0;
while ((msg = iterator.next()) != null)
{
- assertEquals(key++,
msg.b.getMessage().getIntProperty("key").intValue());
- cursor.ack(msg.a);
+ assertEquals(key++,
msg.getMessage().getIntProperty("key").intValue());
+ cursor.ack(msg.getPosition());
}
assertEquals(NUM_MESSAGES, key);
@@ -205,30 +205,30 @@
queue.getPageSubscription().close();
- Pair<PagePosition, PagedMessage> msg;
+ PagedReferenceImpl msg;
- LinkedListIterator<Pair<PagePosition, PagedMessage>> iteratorEven =
cursorEven.iterator();
+ LinkedListIterator<PagedReferenceImpl> iteratorEven = cursorEven.iterator();
- LinkedListIterator<Pair<PagePosition, PagedMessage>> iteratorOdd =
cursorOdd.iterator();
+ LinkedListIterator<PagedReferenceImpl> iteratorOdd = cursorOdd.iterator();
int key = 0;
while ((msg = iteratorEven.next()) != null)
{
System.out.println("Received" + msg);
- assertEquals(key,
msg.b.getMessage().getIntProperty("key").intValue());
-
assertTrue(msg.b.getMessage().getBooleanProperty("even").booleanValue());
+ assertEquals(key, msg.getMessage().getIntProperty("key").intValue());
+
assertTrue(msg.getMessage().getBooleanProperty("even").booleanValue());
key += 2;
- cursorEven.ack(msg.a);
+ cursorEven.ack(msg.getPosition());
}
assertEquals(NUM_MESSAGES, key);
key = 1;
while ((msg = iteratorOdd.next()) != null)
{
- assertEquals(key,
msg.b.getMessage().getIntProperty("key").intValue());
-
assertFalse(msg.b.getMessage().getBooleanProperty("even").booleanValue());
+ assertEquals(key, msg.getMessage().getIntProperty("key").intValue());
+
assertFalse(msg.getMessage().getBooleanProperty("even").booleanValue());
key += 2;
- cursorOdd.ack(msg.a);
+ cursorOdd.ack(msg.getPosition());
}
assertEquals(NUM_MESSAGES + 1, key);
@@ -285,18 +285,18 @@
System.out.println("Cursor: " + cursor);
cursorProvider.printDebug();
- LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator =
cursor.iterator();
+ LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
for (int i = 0; i < 1000; i++)
{
System.out.println("Reading Msg : " + i);
- Pair<PagePosition, PagedMessage> msg = iterator.next();
+ PagedReferenceImpl msg = iterator.next();
assertNotNull(msg);
- assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
+ assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
if (i < firstPageSize)
{
- cursor.ack(msg.a);
+ cursor.ack(msg);
}
}
cursorProvider.printDebug();
@@ -319,11 +319,11 @@
for (int i = firstPageSize; i < NUM_MESSAGES; i++)
{
System.out.println("Received " + i);
- Pair<PagePosition, PagedMessage> msg = iterator.next();
+ PagedReferenceImpl msg = iterator.next();
assertNotNull(msg);
- assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
+ assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
- cursor.ack(msg.a);
+ cursor.ack(msg);
OperationContextImpl.getContext(null).waitCompletion();
@@ -361,14 +361,14 @@
.getSubscription(queue.getID());
System.out.println("Cursor: " + cursor);
- LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator =
cursor.iterator();
+ LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
for (int i = 0; i < 100; i++)
{
- Pair<PagePosition, PagedMessage> msg = iterator.next();
- assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
+ PagedReferenceImpl msg = iterator.next();
+ assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
if (i < 10 || i > 20)
{
- cursor.ack(msg.a);
+ cursor.ack(msg);
}
}
@@ -383,16 +383,16 @@
for (int i = 10; i <= 20; i++)
{
- Pair<PagePosition, PagedMessage> msg = iterator.next();
- assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
- cursor.ack(msg.a);
+ PagedReferenceImpl msg = iterator.next();
+ assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
+ cursor.ack(msg);
}
for (int i = 100; i < NUM_MESSAGES; i++)
{
- Pair<PagePosition, PagedMessage> msg = iterator.next();
- assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
- cursor.ack(msg.a);
+ PagedReferenceImpl msg = iterator.next();
+ assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
+ cursor.ack(msg);
}
server.stop();
@@ -422,15 +422,15 @@
Transaction tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
- LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator =
cursor.iterator();
+ LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
for (int i = 0; i < 100; i++)
{
- Pair<PagePosition, PagedMessage> msg = iterator.next();
- assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
+ PagedReferenceImpl msg = iterator.next();
+ assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
if (i < 10 || i > 20)
{
- cursor.ackTx(tx, msg.a);
+ cursor.ackTx(tx, msg);
}
}
@@ -449,16 +449,16 @@
for (int i = 10; i <= 20; i++)
{
- Pair<PagePosition, PagedMessage> msg = iterator.next();
- assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
- cursor.ackTx(tx, msg.a);
+ PagedReferenceImpl msg = iterator.next();
+ assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
+ cursor.ackTx(tx, msg);
}
for (int i = 100; i < NUM_MESSAGES; i++)
{
- Pair<PagePosition, PagedMessage> msg = iterator.next();
- assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
- cursor.ackTx(tx, msg.a);
+ PagedReferenceImpl msg = iterator.next();
+ assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
+ cursor.ackTx(tx, msg);
}
tx.commit();
@@ -490,7 +490,7 @@
System.out.println("Cursor: " + cursor);
- LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator =
cursor.iterator();
+ LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
for (int i = 0; i < NUM_MESSAGES; i++)
{
@@ -506,11 +506,11 @@
Assert.assertTrue(pageStore.page(msg));
- Pair<PagePosition, PagedMessage> readMessage = iterator.next();
+ PagedReferenceImpl readMessage = iterator.next();
assertNotNull(readMessage);
- assertEquals(i,
readMessage.b.getMessage().getIntProperty("key").intValue());
+ assertEquals(i,
readMessage.getMessage().getIntProperty("key").intValue());
assertNull(iterator.next());
}
@@ -544,11 +544,11 @@
Assert.assertTrue(pageStore.page(msg));
}
- Pair<PagePosition, PagedMessage> readMessage = iterator.next();
+ PagedReferenceImpl readMessage = iterator.next();
assertNotNull(readMessage);
- assertEquals(i,
readMessage.b.getMessage().getIntProperty("key").intValue());
+ assertEquals(i,
readMessage.getMessage().getIntProperty("key").intValue());
}
server.stop();
@@ -580,20 +580,20 @@
Assert.assertTrue(pageStore.page(msg));
}
- Pair<PagePosition, PagedMessage> readMessage = iterator.next();
+ PagedReferenceImpl readMessage = iterator.next();
assertNotNull(readMessage);
- cursor.ack(readMessage.a);
+ cursor.ack(readMessage);
- assertEquals(i,
readMessage.b.getMessage().getIntProperty("key").intValue());
+ assertEquals(i,
readMessage.getMessage().getIntProperty("key").intValue());
}
- Pair<PagePosition, PagedMessage> readMessage = iterator.next();
+ PagedReferenceImpl readMessage = iterator.next();
- assertEquals(NUM_MESSAGES * 3,
readMessage.b.getMessage().getIntProperty("key").intValue());
+ assertEquals(NUM_MESSAGES * 3,
readMessage.getMessage().getIntProperty("key").intValue());
- cursor.ack(readMessage.a);
+ cursor.ack(readMessage);
server.getStorageManager().waitOnOperations();
@@ -647,7 +647,7 @@
.getPageStore(ADDRESS)
.getCursorProvier()
.getSubscription(queue.getID());
- LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator =
cursor.iterator();
+ LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
System.out.println("Cursor: " + cursor);
@@ -676,10 +676,10 @@
// First consume what's already there without any tx as nothing was committed
for (int i = 300; i < 400; i++)
{
- Pair<PagePosition, PagedMessage> pos = iterator.next();
+ PagedReferenceImpl pos = iterator.next();
assertNotNull("Null at position " + i, pos);
- assertEquals(i, pos.b.getMessage().getIntProperty("key").intValue());
- cursor.ack(pos.a);
+ assertEquals(i, pos.getMessage().getIntProperty("key").intValue());
+ cursor.ack(pos);
}
assertNull(iterator.next());
@@ -693,10 +693,10 @@
// Second:after pgtxCommit was done
for (int i = 200; i < 300; i++)
{
- Pair<PagePosition, PagedMessage> pos = iterator.next();
+ PagedReferenceImpl pos = iterator.next();
assertNotNull(pos);
- assertEquals(i, pos.b.getMessage().getIntProperty("key").intValue());
- cursor.ack(pos.a);
+ assertEquals(i, pos.getMessage().getIntProperty("key").intValue());
+ cursor.ack(pos);
}
assertNull(iterator.next());
@@ -724,15 +724,15 @@
queue.getPageSubscription().close();
- Pair<PagePosition, PagedMessage> msg;
- LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator =
cursor.iterator();
- LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator2 =
cursor.iterator();
+ PagedReferenceImpl msg;
+ LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
+ LinkedListIterator<PagedReferenceImpl> iterator2 = cursor.iterator();
int key = 0;
while ((msg = iterator.next()) != null)
{
- assertEquals(key++,
msg.b.getMessage().getIntProperty("key").intValue());
- cursor.ack(msg.a);
+ assertEquals(key++,
msg.getMessage().getIntProperty("key").intValue());
+ cursor.ack(msg);
}
assertEquals(NUM_MESSAGES, key);
@@ -741,7 +741,7 @@
for (int i = 0; i < 10; i++)
{
msg = iterator2.next();
- assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
+ assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
}
assertSame(cursor2.getProvider(), cursorProvider);
@@ -803,13 +803,13 @@
msg = null;
cache = null;
- LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator =
cursor.iterator();
+ LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
- Pair<PagePosition, PagedMessage> msgCursor = null;
+ PagedReferenceImpl msgCursor = null;
while ((msgCursor = iterator.next()) != null)
{
- assertEquals(key++,
msgCursor.b.getMessage().getIntProperty("key").intValue());
- cursor.ack(msgCursor.a);
+ assertEquals(key++,
msgCursor.getMessage().getIntProperty("key").intValue());
+ cursor.ack(msgCursor);
}
assertEquals(NUM_MESSAGES, key);
@@ -848,12 +848,12 @@
cache = null;
- LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator =
cursor.iterator();
+ LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
- Pair<PagePosition, PagedMessage> msgCursor = null;
+ PagedReferenceImpl msgCursor = null;
while ((msgCursor = iterator.next()) != null)
{
- assertEquals(key++,
msgCursor.b.getMessage().getIntProperty("key").intValue());
+ assertEquals(key++,
msgCursor.getMessage().getIntProperty("key").intValue());
}
assertEquals(NUM_MESSAGES, key);
@@ -869,8 +869,8 @@
iterator = cursor.iterator();
while ((msgCursor = iterator.next()) != null)
{
- assertEquals(key++,
msgCursor.b.getMessage().getIntProperty("key").intValue());
- cursor.ack(msgCursor.a);
+ assertEquals(key++,
msgCursor.getMessage().getIntProperty("key").intValue());
+ cursor.ack(msgCursor);
}
forceGC();
@@ -902,29 +902,29 @@
PageSubscription cursor = cursorProvider.getSubscription(queue.getID());
- Iterator<Pair<PagePosition, PagedMessage>> iter = cursor.iterator();
+ Iterator<PagedReferenceImpl> iter = cursor.iterator();
- Iterator<Pair<PagePosition, PagedMessage>> iter2 = cursor.iterator();
+ Iterator<PagedReferenceImpl> iter2 = cursor.iterator();
assertTrue(iter.hasNext());
- Pair<PagePosition, PagedMessage> msg1 = iter.next();
+ PagedReferenceImpl msg1 = iter.next();
- Pair<PagePosition, PagedMessage> msg2 = iter2.next();
+ PagedReferenceImpl msg2 = iter2.next();
- assertEquals(tstProperty(msg1.b.getMessage()), tstProperty(msg2.b.getMessage()));
+ assertEquals(tstProperty(msg1.getMessage()), tstProperty(msg2.getMessage()));
- System.out.println("property = " + tstProperty(msg1.b.getMessage()));
+ System.out.println("property = " + tstProperty(msg1.getMessage()));
msg1 = iter.next();
- assertEquals(1, tstProperty(msg1.b.getMessage()));
+ assertEquals(1, tstProperty(msg1.getMessage()));
iter.remove();
msg2 = iter2.next();
- assertEquals(2, tstProperty(msg2.b.getMessage()));
+ assertEquals(2, tstProperty(msg2.getMessage()));
assertTrue(iter2.hasNext());