JBoss hornetq SVN: r9883 - branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-12 15:24:00 -0500 (Fri, 12 Nov 2010)
New Revision: 9883
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
adding new test
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-12 19:37:52 UTC (rev 9882)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-12 20:24:00 UTC (rev 9883)
@@ -17,6 +17,7 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
@@ -621,6 +622,116 @@
}
+
+ public void testConsumeLivePageMultiThread() throws Exception
+ {
+ final PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
+
+ pageStore.startPaging();
+
+ final int NUM_TX = 100;
+
+ final int MSGS_TX = 100;
+
+ final int TOTAL_MSG = NUM_TX * MSGS_TX;
+
+ final int messageSize = 1024;
+
+ PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+ System.out.println("cursorProvider = " + cursorProvider);
+
+ PageSubscription cursor = this.server.getPagingManager()
+ .getPageStore(ADDRESS)
+ .getCursorProvier()
+ .getSubscription(queue.getID());
+
+ System.out.println("Cursor: " + cursor);
+
+ final StorageManager storage = this.server.getStorageManager();
+
+ final AtomicInteger exceptions = new AtomicInteger(0);
+
+ Thread t1 = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ int count = 0;
+
+ for (int txCount = 0; txCount < NUM_TX; txCount++)
+ {
+
+ Transaction tx = null;
+
+ if (txCount % 2 == 0)
+ {
+ tx = new TransactionImpl(storage);
+ }
+
+ RoutingContext ctx = generateCTX(tx);
+
+ for (int i = 0 ; i < MSGS_TX; i++)
+ {
+ //System.out.println("Sending " + count);
+ HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, count);
+
+ ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
+ msg.putIntProperty("key", count++);
+
+ msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
+
+ Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)));
+ }
+
+ if (tx != null)
+ {
+ tx.commit();
+ }
+
+ }
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ exceptions.incrementAndGet();
+ }
+ }
+ };
+
+ t1.start();
+
+
+ LinkedListIterator<PagedReference> iterator = cursor.iterator();
+
+ for (int i = 0 ; i < TOTAL_MSG; i++ )
+ {
+ assertEquals(0, exceptions.get());
+ PagedReference ref = null;
+ for (int repeat = 0 ; repeat < 5; repeat++)
+ {
+ ref = iterator.next();
+ if (ref == null)
+ {
+ Thread.sleep(1000);
+ }
+ else
+ {
+ break;
+ }
+ }
+ assertNotNull(ref);
+
+ ref.acknowledge();
+ assertNotNull(ref);
+
+ System.out.println("Consuming " + ref.getMessage().getIntProperty("key"));
+ //assertEquals(i, ref.getMessage().getIntProperty("key").intValue());
+ }
+
+ assertEquals(0, exceptions.get());
+ }
+
private RoutingContextImpl generateCTX()
{
return generateCTX(null);
13 years, 6 months
JBoss hornetq SVN: r9882 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/impl and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-12 14:37:52 -0500 (Fri, 12 Nov 2010)
New Revision: 9882
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
tweak on ordering
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-12 16:56:31 UTC (rev 9881)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-12 19:37:52 UTC (rev 9882)
@@ -296,7 +296,7 @@
PagePosition retPos = pos.nextMessage();
PageCache cache = cursorProvider.getPageCache(pos);
-
+
if (cache == null)
{
return null;
@@ -1014,40 +1014,19 @@
try
{
- synchronized (redeliveries)
- {
- if (redeliveryIterator.hasNext())
- {
- // There's a redelivery pending, we will get it out of that pool instead
- isredelivery = true;
- return getReference(redeliveryIterator.next());
- }
- else
- {
- isredelivery = false;
- }
- }
-
if (position == null)
{
position = getStartPosition();
}
- PagePosition previousPos = position;
- PagedReference nextPos = moveNext();
- if (nextPos != null)
- {
- lastOperation = previousPos;
- position = nextPos.getPosition();
- }
- return nextPos;
+ return moveNext();
}
catch (Exception e)
{
throw new RuntimeException(e.getMessage(), e);
}
}
-
+
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
*/
@@ -1057,18 +1036,32 @@
PagedReference message = null;
+ PagePosition lastPosition = position;
PagePosition tmpPosition = position;
do
{
- message = internalGetNext(tmpPosition);
-
+ synchronized (redeliveries)
+ {
+ if (redeliveryIterator.hasNext())
+ {
+ // There's a redelivery pending, we will get it out of that pool instead
+ isredelivery = true;
+ return getReference(redeliveryIterator.next());
+ }
+ else
+ {
+ isredelivery = false;
+ }
+ message = internalGetNext(tmpPosition);
+ }
+
if (message == null)
{
break;
}
-
+
tmpPosition = message.getPosition();
boolean valid = true;
@@ -1079,13 +1072,14 @@
// 1st... is it routed?
valid = routed(message.getPagedMessage());
- if (!valid) ignored = true;
+ if (!valid)
+ ignored = true;
// 2nd ... if TX, is it committed?
if (valid && message.getPagedMessage().getTransactionID() != 0)
{
PageTransactionInfo tx = pageStore.getPagingManager().getTransaction(message.getPagedMessage()
- .getTransactionID());
+ .getTransactionID());
if (tx == null)
{
log.warn("Couldn't locate page transaction " + message.getPagedMessage().getTransactionID() +
@@ -1108,7 +1102,8 @@
if (valid)
{
// We don't create a PageCursorInfo unless we are doing a write operation (ack or removing)
- // Say you have a Browser that will only read the files... there's no need to control PageCursors is nothing
+ // Say you have a Browser that will only read the files... there's no need to control PageCursors is
+ // nothing
// is being changed. That's why the false is passed as a parameter here
PageCursorInfo info = getPageInfo(message.getPosition(), false);
if (info != null && info.isRemoved(message.getPosition()))
@@ -1116,7 +1111,7 @@
valid = false;
}
}
-
+
if (!ignored)
{
position = message.getPosition();
@@ -1138,10 +1133,14 @@
}
while (message != null && !match);
+ if (message != null)
+ {
+ lastOperation = lastPosition;
+ }
+
return message;
}
-
/** QueueImpl::deliver could be calling hasNext while QueueImpl.depage could be using next and hasNext as well.
* It would be a rare race condition but I would prefer avoiding that scenario */
public synchronized boolean hasNext()
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-11-12 16:56:31 UTC (rev 9881)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-11-12 19:37:52 UTC (rev 9882)
@@ -144,7 +144,6 @@
public synchronized void commit()
{
- committed = true;
if (lateDeliveries != null)
{
for (Pair<PageSubscription, PagePosition> pos : lateDeliveries)
@@ -153,6 +152,7 @@
}
lateDeliveries.clear();
}
+ committed = true;
lateDeliveries = null;
}
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-11-12 16:56:31 UTC (rev 9881)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-11-12 19:37:52 UTC (rev 9882)
@@ -922,7 +922,7 @@
msg.putIntProperty("count", i);
producer.send(msg);
- if (i % 50 == 0 && i != 0)
+ if (i % 100 == 0 && i != 0)
{
sessionProducer.commit();
// Thread.sleep(500);
@@ -967,16 +967,16 @@
for (int i = 0; i < numberOfMessages; i++)
{
- ClientMessage msg = consumer.receive(500000);
+ ClientMessage msg = consumer.receive(5000);
assertNotNull(msg);
assertEquals(i, msg.getIntProperty("count").intValue());
msg.acknowledge();
if (i > 0 && i % 10 == 0)
{
- // session.commit();
+ session.commit();
}
}
- // session.commit();
+ session.commit();
session.close();
13 years, 6 months
JBoss hornetq SVN: r9881 - branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-12 11:56:31 -0500 (Fri, 12 Nov 2010)
New Revision: 9881
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
Log:
fix TX Ordering on paging
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-12 06:51:54 UTC (rev 9880)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-12 16:56:31 UTC (rev 9881)
@@ -291,94 +291,6 @@
return new CursorIterator();
}
- /* (non-Javadoc)
- * @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
- */
- public synchronized PagedReference moveNext(PagePosition position) throws Exception
- {
- boolean match = false;
-
- PagedReference message = null;
-
- PagePosition tmpPosition = position;
-
- do
- {
- message = internalGetNext(tmpPosition);
-
-
- if (message == null)
- {
- break;
- }
-
- tmpPosition = message.getPosition();
-
- boolean valid = true;
- boolean ignored = false;
-
- // Validate the scenarios where the message should be considered not valid even to be considered
-
- // 1st... is it routed?
-
- valid = routed(message.getPagedMessage());
- if (!valid) ignored = true;
-
- // 2nd ... if TX, is it committed?
- if (valid && message.getPagedMessage().getTransactionID() != 0)
- {
- PageTransactionInfo tx = pageStore.getPagingManager().getTransaction(message.getPagedMessage()
- .getTransactionID());
- if (tx == null)
- {
- log.warn("Couldn't locate page transaction " + message.getPagedMessage().getTransactionID() +
- ", ignoring message on position " +
- message.getPosition());
- valid = false;
- ignored = true;
- }
- else
- {
- if (tx.deliverAfterCommit(this, message.getPosition()))
- {
- valid = false;
- ignored = false;
- }
- }
- }
-
- // 3rd... was it previously removed?
- if (valid)
- {
- // We don't create a PageCursorInfo unless we are doing a write operation (ack or removing)
- // Say you have a Browser that will only read the files... there's no need to control PageCursors is nothing
- // is being changed. That's why the false is passed as a parameter here
- PageCursorInfo info = getPageInfo(message.getPosition(), false);
- if (info != null && info.isRemoved(message.getPosition()))
- {
- valid = false;
- }
- }
-
- if (valid)
- {
- match = match(message.getMessage());
-
- if (!match)
- {
- processACK(message.getPosition());
- }
- }
- else if (ignored)
- {
- positionIgnored(message.getPosition());
- }
- }
- while (message != null && !match);
-
- return message;
- }
-
private PagedReference internalGetNext(final PagePosition pos)
{
PagePosition retPos = pos.nextMessage();
@@ -1121,10 +1033,11 @@
position = getStartPosition();
}
- PagedReference nextPos = moveNext(position);
+ PagePosition previousPos = position;
+ PagedReference nextPos = moveNext();
if (nextPos != null)
{
- lastOperation = position;
+ lastOperation = previousPos;
position = nextPos.getPosition();
}
return nextPos;
@@ -1134,7 +1047,101 @@
throw new RuntimeException(e.getMessage(), e);
}
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
+ */
+ public synchronized PagedReference moveNext() throws Exception
+ {
+ boolean match = false;
+ PagedReference message = null;
+
+ PagePosition tmpPosition = position;
+
+ do
+ {
+ message = internalGetNext(tmpPosition);
+
+
+ if (message == null)
+ {
+ break;
+ }
+
+ tmpPosition = message.getPosition();
+
+ boolean valid = true;
+ boolean ignored = false;
+
+ // Validate the scenarios where the message should be considered not valid even to be considered
+
+ // 1st... is it routed?
+
+ valid = routed(message.getPagedMessage());
+ if (!valid) ignored = true;
+
+ // 2nd ... if TX, is it committed?
+ if (valid && message.getPagedMessage().getTransactionID() != 0)
+ {
+ PageTransactionInfo tx = pageStore.getPagingManager().getTransaction(message.getPagedMessage()
+ .getTransactionID());
+ if (tx == null)
+ {
+ log.warn("Couldn't locate page transaction " + message.getPagedMessage().getTransactionID() +
+ ", ignoring message on position " +
+ message.getPosition());
+ valid = false;
+ ignored = true;
+ }
+ else
+ {
+ if (tx.deliverAfterCommit(PageSubscriptionImpl.this, message.getPosition()))
+ {
+ valid = false;
+ ignored = false;
+ }
+ }
+ }
+
+ // 3rd... was it previously removed?
+ if (valid)
+ {
+ // We don't create a PageCursorInfo unless we are doing a write operation (ack or removing)
+ // Say you have a Browser that will only read the files... there's no need to control PageCursors is nothing
+ // is being changed. That's why the false is passed as a parameter here
+ PageCursorInfo info = getPageInfo(message.getPosition(), false);
+ if (info != null && info.isRemoved(message.getPosition()))
+ {
+ valid = false;
+ }
+ }
+
+ if (!ignored)
+ {
+ position = message.getPosition();
+ }
+
+ if (valid)
+ {
+ match = match(message.getMessage());
+
+ if (!match)
+ {
+ processACK(message.getPosition());
+ }
+ }
+ else if (ignored)
+ {
+ positionIgnored(message.getPosition());
+ }
+ }
+ while (message != null && !match);
+
+ return message;
+ }
+
+
/** QueueImpl::deliver could be calling hasNext while QueueImpl.depage could be using next and hasNext as well.
* It would be a rare race condition but I would prefer avoiding that scenario */
public synchronized boolean hasNext()
13 years, 6 months
JBoss hornetq SVN: r9880 - branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-11-12 01:51:54 -0500 (Fri, 12 Nov 2010)
New Revision: 9880
Modified:
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/HornetQBufferInputStream.java
Log:
added a convenient method for creating GZIPInputStream
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java 2010-11-12 05:23:35 UTC (rev 9879)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java 2010-11-12 06:51:54 UTC (rev 9880)
@@ -162,7 +162,7 @@
{
InputStream input = new HornetQBufferInputStream(bufferDelegate);
- dataInput = new DataInputStream(GZipUtil.pipeGZip(input, false, threadPool));
+ dataInput = new DataInputStream(GZipUtil.createUnZipInputStream(input));
}
catch (Exception e)
{
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java 2010-11-12 05:23:35 UTC (rev 9879)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java 2010-11-12 06:51:54 UTC (rev 9880)
@@ -179,6 +179,18 @@
throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY, e.getMessage(), e);
}
}
+
+ public static InputStream createUnZipInputStream(InputStream input) throws HornetQException
+ {
+ try
+ {
+ return new GZIPInputStream(input);
+ }
+ catch (IOException e)
+ {
+ throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY, e.getMessage(), e);
+ }
+ }
/*
* we keep a list of byte arrays. when writing, we start with the first array.
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/HornetQBufferInputStream.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/HornetQBufferInputStream.java 2010-11-12 05:23:35 UTC (rev 9879)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/HornetQBufferInputStream.java 2010-11-12 06:51:54 UTC (rev 9880)
@@ -61,7 +61,7 @@
}
else
{
- return bb.readByte();
+ return bb.readByte() & 0xFF;
}
}
13 years, 6 months
JBoss hornetq SVN: r9879 - branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-11-12 00:23:35 -0500 (Fri, 12 Nov 2010)
New Revision: 9879
Modified:
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
Log:
test
Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2010-11-12 04:51:39 UTC (rev 9878)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2010-11-12 05:23:35 UTC (rev 9879)
@@ -2697,6 +2697,7 @@
server.start();
ClientSessionFactory sf = createFactory(isNetty());
+ sf.setCompressLargeMessages(true);
session = sf.createSession(false, false, false);
@@ -2714,21 +2715,22 @@
ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
ClientMessage msg1 = consumer.receive(1000);
+ Assert.assertNotNull(msg1);
+
+ for (int i = 0 ; i < messageSize; i++)
+ {
+ //System.out.print(msg1.getBodyBuffer().readByte() + " ");
+ //if (i % 100 == 0) System.out.println();
+ byte b = msg1.getBodyBuffer().readByte();
+ //System.out.println("Byte read: " + (char)b + " i " + i);
+ assertEquals("position = " + i, getSamplebyte(i), b);
+ }
+
msg1.acknowledge();
session.commit();
- Assert.assertNotNull(msg1);
consumer.close();
- try
- {
- msg1.getBodyBuffer().readByte();
- Assert.fail("Exception was expected");
- }
- catch (Throwable ignored)
- {
- }
-
session.close();
validateNoFilesOnLargeDir();
13 years, 6 months
JBoss hornetq SVN: r9878 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor/impl and 2 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-11 23:51:39 -0500 (Thu, 11 Nov 2010)
New Revision: 9878
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
moving method moveNext from PageCursor to PageSubscription
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-11-11 22:23:30 UTC (rev 9877)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-11-12 04:51:39 UTC (rev 9878)
@@ -54,8 +54,6 @@
PageSubscription createSubscription(long queueId, Filter filter, boolean durable);
- PagedReference getNext(PageSubscription cursor, PagePosition pos) throws Exception;
-
PagedMessage getMessage(PagePosition pos) throws Exception;
void processReload() throws Exception;
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-11-11 22:23:30 UTC (rev 9877)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-11-12 04:51:39 UTC (rev 9878)
@@ -22,7 +22,6 @@
import org.hornetq.core.filter.Filter;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.Page;
-import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
@@ -33,7 +32,6 @@
import org.hornetq.core.paging.cursor.PagedReference;
import org.hornetq.core.paging.cursor.PagedReferenceImpl;
import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.server.ServerMessage;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.Future;
import org.hornetq.utils.SoftValueHashMap;
@@ -59,8 +57,6 @@
private final PagingStore pagingStore;
- private final PagingManager pagingManager;
-
private final StorageManager storageManager;
private final ExecutorFactory executorFactory;
@@ -80,7 +76,6 @@
final ExecutorFactory executorFactory)
{
this.pagingStore = pagingStore;
- this.pagingManager = pagingStore.getPagingManager();
this.storageManager = storageManager;
this.executorFactory = executorFactory;
this.executor = executorFactory.getExecutor();
@@ -120,104 +115,6 @@
return activeCursors.get(cursorID);
}
- /* (non-Javadoc)
- * @see org.hornetq.core.paging.cursor.PageCursorProvider#getAfter(org.hornetq.core.paging.cursor.PagePosition)
- */
- public PagedReference getNext(final PageSubscription cursor, PagePosition cursorPos) throws Exception
- {
-
- while (true)
- {
- PagedReference retPos = internalGetNext(cursorPos, cursor);
-
- if (retPos == null)
- {
- return null;
- }
- else if (retPos != null)
- {
- cursorPos = retPos.getPosition();
-
- if (!routed(retPos.getPagedMessage(), cursor))
- {
- cursor.positionIgnored(cursorPos);
- }
- else
- if (retPos.getPagedMessage().getTransactionID() != 0)
- {
- PageTransactionInfo tx = pagingManager.getTransaction(retPos.getPagedMessage().getTransactionID());
- if (tx == null)
- {
- log.warn("Couldn't locate page transaction " + retPos.getPagedMessage().getTransactionID() +
- ", ignoring message on position " +
- retPos.getPosition());
- cursor.positionIgnored(cursorPos);
- }
- else
- {
- if (!tx.deliverAfterCommit(cursor, cursorPos))
- {
- return retPos;
- }
- }
- }
- else
- {
- return retPos;
- }
- }
- }
- }
-
- private boolean routed(PagedMessage message, PageSubscription subs)
- {
- long id = subs.getId();
-
- for (long qid : message.getQueueIDs())
- {
- if (qid == id)
- {
- return true;
- }
- }
- return false;
- }
-
- private PagedReference internalGetNext(final PagePosition pos, final PageSubscription sub)
- {
- PagePosition retPos = pos.nextMessage();
-
- PageCache cache = getPageCache(pos);
-
- if (!cache.isLive() && retPos.getMessageNr() >= cache.getNumberOfMessages())
- {
- retPos = pos.nextPage();
-
- cache = getPageCache(retPos);
-
- if (cache == null)
- {
- return null;
- }
-
- if (retPos.getMessageNr() >= cache.getNumberOfMessages())
- {
- return null;
- }
- }
-
- PagedMessage serverMessage = cache.getMessage(retPos.getMessageNr());
-
- if (serverMessage != null)
- {
- return newReference(retPos, serverMessage, sub);
- }
- else
- {
- return null;
- }
- }
-
public PagedMessage getMessage(final PagePosition pos) throws Exception
{
PageCache cache = getPageCache(pos);
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-11 22:23:30 UTC (rev 9877)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-12 04:51:39 UTC (rev 9878)
@@ -79,9 +79,9 @@
private final StorageManager store;
private final long cursorId;
-
+
private Queue queue;
-
+
private final boolean persistent;
private final Filter filter;
@@ -106,12 +106,12 @@
// Constructors --------------------------------------------------
public PageSubscriptionImpl(final PageCursorProvider cursorProvider,
- final PagingStore pageStore,
- final StorageManager store,
- final Executor executor,
- final Filter filter,
- final long cursorId,
- final boolean persistent)
+ final PagingStore pageStore,
+ final StorageManager store,
+ final Executor executor,
+ final Filter filter,
+ final long cursorId,
+ final boolean persistent)
{
this.pageStore = pageStore;
this.store = store;
@@ -128,17 +128,17 @@
{
return queue;
}
-
+
public boolean isPaging()
{
return pageStore.isPaging();
}
-
+
public void setQueue(Queue queue)
{
this.queue = queue;
}
-
+
public void disableAutoCleanup()
{
autoCleanup = false;
@@ -209,7 +209,8 @@
{
if (entry.getKey() == lastAckedPosition.getPageNr())
{
- PageSubscriptionImpl.trace("We can't clear page " + entry.getKey() + " now since it's the current page");
+ PageSubscriptionImpl.trace("We can't clear page " + entry.getKey() +
+ " now since it's the current page");
}
else
{
@@ -261,8 +262,8 @@
if (consumedPages.remove(completePage.getPageId()) == null)
{
PageSubscriptionImpl.log.warn("Couldn't remove page " + completePage.getPageId() +
- " from consumed pages on cursor for address " +
- pageStore.getAddress());
+ " from consumed pages on cursor for address " +
+ pageStore.getAddress());
}
}
}
@@ -303,30 +304,64 @@
do
{
- message = cursorProvider.getNext(this, tmpPosition);
+ message = internalGetNext(tmpPosition);
- boolean valid = true;
-
+
if (message == null)
{
- valid = false;
+ break;
}
- else
+
+ tmpPosition = message.getPosition();
+
+ boolean valid = true;
+ boolean ignored = false;
+
+ // Validate the scenarios where the message should be considered not valid even to be considered
+
+ // 1st... is it routed?
+
+ valid = routed(message.getPagedMessage());
+ if (!valid) ignored = true;
+
+ // 2nd ... if TX, is it committed?
+ if (valid && message.getPagedMessage().getTransactionID() != 0)
{
+ PageTransactionInfo tx = pageStore.getPagingManager().getTransaction(message.getPagedMessage()
+ .getTransactionID());
+ if (tx == null)
+ {
+ log.warn("Couldn't locate page transaction " + message.getPagedMessage().getTransactionID() +
+ ", ignoring message on position " +
+ message.getPosition());
+ valid = false;
+ ignored = true;
+ }
+ else
+ {
+ if (tx.deliverAfterCommit(this, message.getPosition()))
+ {
+ valid = false;
+ ignored = false;
+ }
+ }
+ }
+
+ // 3rd... was it previously removed?
+ if (valid)
+ {
// We don't create a PageCursorInfo unless we are doing a write operation (ack or removing)
- // Say you have a Browser that will only read the files... there's no need to control PageCursors is nothing
+ // Say you have a Browser that will only read the files... there's no need to control PageCursors is nothing
// is being changed. That's why the false is passed as a parameter here
PageCursorInfo info = getPageInfo(message.getPosition(), false);
if (info != null && info.isRemoved(message.getPosition()))
{
- tmpPosition = message.getPosition();
valid = false;
}
}
+
if (valid)
{
- tmpPosition = message.getPosition();
-
match = match(message.getMessage());
if (!match)
@@ -334,12 +369,70 @@
processACK(message.getPosition());
}
}
+ else if (ignored)
+ {
+ positionIgnored(message.getPosition());
+ }
}
while (message != null && !match);
return message;
}
+ private PagedReference internalGetNext(final PagePosition pos)
+ {
+ PagePosition retPos = pos.nextMessage();
+
+ PageCache cache = cursorProvider.getPageCache(pos);
+
+ if (cache == null)
+ {
+ return null;
+ }
+
+ if (!cache.isLive() && retPos.getMessageNr() >= cache.getNumberOfMessages())
+ {
+ retPos = pos.nextPage();
+
+ cache = cursorProvider.getPageCache(retPos);
+
+ if (cache == null)
+ {
+ return null;
+ }
+
+ if (retPos.getMessageNr() >= cache.getNumberOfMessages())
+ {
+ return null;
+ }
+ }
+
+ PagedMessage serverMessage = cache.getMessage(retPos.getMessageNr());
+
+ if (serverMessage != null)
+ {
+ return cursorProvider.newReference(retPos, serverMessage, this);
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ private boolean routed(PagedMessage message)
+ {
+ long id = getId();
+
+ for (long qid : message.getQueueIDs())
+ {
+ if (qid == id)
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
/**
*
*/
@@ -360,7 +453,7 @@
// The list is not ordered...
// This is only done at creation of the queue, so we just scan instead of keeping the list ordened
PagePosition retValue = null;
-
+
for (PagePosition pos : entry.getValue().acks)
{
System.out.println("Analizing " + pos);
@@ -369,9 +462,9 @@
retValue = pos;
}
}
-
+
System.out.println("Returning initial position " + retValue);
-
+
return retValue;
}
}
@@ -391,11 +484,10 @@
}
-
public void ackTx(final Transaction tx, final PagedReference reference) throws Exception
{
ackTx(tx, reference.getPosition());
-
+
PageTransactionInfo txInfo = getPageTransaction(reference);
if (txInfo != null)
{
@@ -403,7 +495,6 @@
}
}
-
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#confirm(org.hornetq.core.paging.cursor.PagePosition)
*/
@@ -416,7 +507,7 @@
txInfo.storeUpdate(this.store, pageStore.getPagingManager());
}
}
-
+
public void ack(final PagePosition position) throws Exception
{
// if we are dealing with a persistent cursor
@@ -465,7 +556,6 @@
}
}
-
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageSubscription#queryMessage(org.hornetq.core.paging.cursor.PagePosition)
*/
@@ -481,7 +571,6 @@
}
}
-
/**
* Theres no need to synchronize this method as it's only called from journal load on startup
*/
@@ -602,7 +691,7 @@
{
return cursorId;
}
-
+
public boolean isPersistent()
{
return persistent;
@@ -619,7 +708,7 @@
Collections.sort(recoveredACK);
boolean first = true;
-
+
for (PagePosition pos : recoveredACK)
{
lastAckedPosition = pos;
@@ -669,7 +758,7 @@
System.out.println(info);
}
}
-
+
private synchronized PageCursorInfo getPageInfo(final PagePosition pos)
{
return getPageInfo(pos, true);
@@ -756,7 +845,7 @@
cursorTX.addPositionConfirmation(this, position);
}
-
+
private PageTransactionInfo getPageTransaction(final PagedReference reference)
{
if (reference.getPagedMessage().getTransactionID() != 0)
@@ -780,6 +869,7 @@
scheduleCleanupCheck();
}
}
+
// Inner classes -------------------------------------------------
/**
@@ -798,7 +888,7 @@
private final List<PagePosition> acks = Collections.synchronizedList(new LinkedList<PagePosition>());
private WeakReference<PageCache> cache;
-
+
private Set<PagePosition> removedReferences = new ConcurrentHashSet<PagePosition>();
// The page was live at the time of the creation
@@ -856,12 +946,12 @@
{
return pageId;
}
-
+
public boolean isRemoved(final PagePosition pos)
{
return removedReferences.contains(pos);
}
-
+
public void remove(final PagePosition position)
{
removedReferences.add(position);
@@ -875,10 +965,10 @@
if (isTrace)
{
PageSubscriptionImpl.trace("numberOfMessages = " + getNumberOfMessages() +
- " confirmed = " +
- (confirmed.get() + 1) +
- ", page = " +
- pageId);
+ " confirmed = " +
+ (confirmed.get() + 1) +
+ ", page = " +
+ pageId);
}
// Negative could mean a bookmark on the first element for the page (example -1)
@@ -952,7 +1042,6 @@
}
}
}
-
class CursorIterator implements LinkedListIterator<PagedReference>
{
@@ -963,11 +1052,11 @@
private final LinkedListIterator<PagePosition> redeliveryIterator;
private volatile boolean isredelivery = false;
-
+
/** next element taken on hasNext test.
* it has to be delivered on next next operation */
private volatile PagedReference cachedNext;
-
+
public CursorIterator()
{
synchronized (redeliveries)
@@ -975,7 +1064,6 @@
redeliveryIterator = redeliveries.iterator();
}
}
-
public void repeat()
{
@@ -1004,14 +1092,14 @@
*/
public synchronized PagedReference next()
{
-
+
if (cachedNext != null)
{
PagedReference retPos = cachedNext;
cachedNext = null;
return retPos;
}
-
+
try
{
synchronized (redeliveries)
@@ -1027,7 +1115,7 @@
isredelivery = false;
}
}
-
+
if (position == null)
{
position = getStartPosition();
@@ -1056,12 +1144,12 @@
{
return true;
}
-
+
if (!pageStore.isPaging())
{
return false;
}
-
+
cachedNext = next();
return cachedNext != null;
@@ -1083,5 +1171,4 @@
}
}
-
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-11-11 22:23:30 UTC (rev 9877)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-11-12 04:51:39 UTC (rev 9878)
@@ -25,11 +25,10 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagingManager;
-import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.paging.cursor.PagePosition;
+import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.transaction.Transaction;
-import org.hornetq.core.transaction.TransactionOperation;
import org.hornetq.core.transaction.TransactionOperationAbstract;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
import org.hornetq.utils.DataConstants;
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-11 22:23:30 UTC (rev 9877)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-12 04:51:39 UTC (rev 9878)
@@ -26,7 +26,6 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.filter.Filter;
-import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.cursor.PageCache;
import org.hornetq.core.paging.cursor.PageCursorProvider;
@@ -36,7 +35,6 @@
import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
import org.hornetq.core.paging.cursor.impl.PageSubscriptionImpl;
-import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.paging.impl.PagingStoreImpl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
@@ -55,7 +53,7 @@
import org.hornetq.utils.LinkedListIterator;
/**
- * A PageCacheTest
+ * A PageCursorTest
*
* @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
*
@@ -726,10 +724,18 @@
}
assertNull(iterator.next());
+
+ server.getStorageManager().waitOnOperations();
server.stop();
createServer();
- waitCleanup();
+
+ long timeout = System.currentTimeMillis() + 10000;
+
+ while (System.currentTimeMillis() < timeout && lookupPageStore(ADDRESS).getNumberOfPages() != 1)
+ {
+ Thread.sleep(500);
+ }
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
13 years, 6 months
JBoss hornetq SVN: r9877 - in branches/Branch_New_Paging: tests/src/org/hornetq/tests/integration/paging and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-11 17:23:30 -0500 (Thu, 11 Nov 2010)
New Revision: 9877
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
tweaks
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-11-11 17:28:05 UTC (rev 9876)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-11-11 22:23:30 UTC (rev 9877)
@@ -176,6 +176,8 @@
tx.addOperation(pgtxUpdate);
}
+ tx.setContainsPersistent();
+
pgtxUpdate.addUpdate(this);
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-11-11 17:28:05 UTC (rev 9876)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-11-11 22:23:30 UTC (rev 9877)
@@ -953,8 +953,6 @@
pagingManager.addTransaction(pgTX);
tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pgTX);
tx.addOperation(new FinishPageMessageOperation(pgTX));
-
- tx.setContainsPersistent();
}
pgTX.increment(listCtx.getNumberOfQueues());
@@ -1013,7 +1011,8 @@
{
if (!stored)
{
- storageManager.storePageTransaction(tx.getID(), pageTransaction);
+ tx.setContainsPersistent();
+ pageTransaction.store(storageManager, pagingManager, tx);
stored = true;
}
}
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-11 17:28:05 UTC (rev 9876)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-11 22:23:30 UTC (rev 9877)
@@ -682,20 +682,15 @@
StorageManager storage = this.server.getStorageManager();
- PageTransactionInfoImpl pgtxRollback = new PageTransactionInfoImpl(storage.generateUniqueID());
- PageTransactionInfoImpl pgtxForgotten = new PageTransactionInfoImpl(storage.generateUniqueID());
- PageTransactionInfoImpl pgtxCommit = new PageTransactionInfoImpl(storage.generateUniqueID());
+ long pgtxRollback = storage.generateUniqueID();
+ long pgtxForgotten = storage.generateUniqueID();
+ long pgtxCommit = storage.generateUniqueID();
- System.out.println("Forgetting tx " + pgtxForgotten.getTransactionID());
-
- this.server.getPagingManager().addTransaction(pgtxRollback);
- this.server.getPagingManager().addTransaction(pgtxCommit);
-
- pgMessages(storage, pageStore, pgtxRollback, 0, NUM_MESSAGES, messageSize);
+ Transaction txRollback = pgMessages(storage, pageStore, pgtxRollback, 0, NUM_MESSAGES, messageSize);
pageStore.forceAnotherPage();
- pgMessages(storage, pageStore, pgtxForgotten, 100, NUM_MESSAGES, messageSize);
+ Transaction txForgotten = pgMessages(storage, pageStore, pgtxForgotten, 100, NUM_MESSAGES, messageSize);
pageStore.forceAnotherPage();
- pgMessages(storage, pageStore, pgtxCommit, 200, NUM_MESSAGES, messageSize);
+ Transaction txCommit = pgMessages(storage, pageStore, pgtxCommit, 200, NUM_MESSAGES, messageSize);
pageStore.forceAnotherPage();
addMessages(300, NUM_MESSAGES, messageSize);
@@ -714,10 +709,12 @@
assertNull(iterator.next());
cursor.printDebug();
- pgtxRollback.rollback();
+
+ txCommit.commit();
- this.server.getPagingManager().removeTransaction(pgtxRollback.getTransactionID());
- pgtxCommit.commit();
+ txRollback.rollback();
+
+ storage.waitOnOperations();
// Second:after pgtxCommit was done
for (int i = 200; i < 300; i++)
@@ -761,11 +758,9 @@
StorageManager storage = this.server.getStorageManager();
- PageTransactionInfoImpl txLazy = new PageTransactionInfoImpl(storage.generateUniqueID());
-
- server.getPagingManager().addTransaction(txLazy);
+ long pgtxLazy = storage.generateUniqueID();
- pgMessages(storage, pageStore, txLazy, 0, NUM_MESSAGES, messageSize);
+ Transaction txLazy = pgMessages(storage, pageStore, pgtxLazy, 0, NUM_MESSAGES, messageSize);
addMessages(100, NUM_MESSAGES, messageSize);
@@ -783,6 +778,8 @@
assertNull(iterator.next());
txLazy.commit();
+
+ storage.waitOnOperations();
for (int i = 0; i < 100; i++)
{
@@ -1184,15 +1181,15 @@
* @param messageSize
* @throws Exception
*/
- private void pgMessages(StorageManager storage,
+ private Transaction pgMessages(StorageManager storage,
PagingStoreImpl pageStore,
- PageTransactionInfo pgParameter,
+ long pgParameter,
int start,
final int NUM_MESSAGES,
final int messageSize) throws Exception
{
- TransactionImpl txImpl = new TransactionImpl(pgParameter.getTransactionID(), null, null);
+ TransactionImpl txImpl = new TransactionImpl(pgParameter, null, storage);
RoutingContext ctx = generateCTX(txImpl);
@@ -1204,6 +1201,8 @@
msg.putIntProperty("key", i);
pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS));
}
+
+ return txImpl;
}
13 years, 6 months
JBoss hornetq SVN: r9876 - in branches/Branch_New_Paging/src/main/org/hornetq/core: paging/cursor/impl and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-11 12:28:05 -0500 (Thu, 11 Nov 2010)
New Revision: 9876
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
Log:
Scheduling on paging
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2010-11-11 14:44:07 UTC (rev 9875)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2010-11-11 17:28:05 UTC (rev 9876)
@@ -13,6 +13,7 @@
package org.hornetq.core.paging.cursor;
+import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.server.Queue;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.utils.LinkedListIterator;
@@ -111,4 +112,11 @@
void setQueue(Queue queue);
Queue getQueue();
+
+ /**
+ * Re-queries the paged message in case garbage collection has cleared it from the PagedReference, which holds it through a WeakReference
+ * @param pos
+ * @return
+ */
+ PagedMessage queryMessage(PagePosition pos);
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2010-11-11 14:44:07 UTC (rev 9875)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2010-11-11 17:28:05 UTC (rev 9876)
@@ -13,6 +13,9 @@
package org.hornetq.core.paging.cursor;
+import java.lang.ref.WeakReference;
+
+import org.hornetq.api.core.Message;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
@@ -33,18 +36,32 @@
private final PagePosition position;
- private final PagedMessage message;
+ private WeakReference<PagedMessage> message;
+ private Long deliveryTime = null;
+
private final PageSubscription subscription;
public ServerMessage getMessage()
{
- return message.getMessage();
+ return getPagedMessage().getMessage();
}
- public PagedMessage getPagedMessage()
+ public synchronized PagedMessage getPagedMessage()
{
- return message;
+ PagedMessage returnMessage = message.get();
+
+ // We only keep a few references on the Queue from paging...
+ // Besides, those references are SoftReferenced on the page cache...
+ // So this is unlikely to be null,
+ // unless the Queue has stalled for some time after paging
+ if (returnMessage == null)
+ {
+ // reference is gone, we will reconstruct it
+ returnMessage = subscription.queryMessage(position);
+ message = new WeakReference<PagedMessage>(returnMessage);
+ }
+ return returnMessage;
}
public PagePosition getPosition()
@@ -55,7 +72,7 @@
public PagedReferenceImpl(final PagePosition position, final PagedMessage message, final PageSubscription subscription)
{
this.position = position;
- this.message = message;
+ this.message = new WeakReference<PagedMessage>(message);
this.subscription = subscription;
}
@@ -78,8 +95,19 @@
*/
public long getScheduledDeliveryTime()
{
- // TODO Auto-generated method stub
- return 0;
+ if (deliveryTime == null)
+ {
+ ServerMessage msg = getMessage();
+ if (msg.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME))
+ {
+ deliveryTime = getMessage().getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
+ }
+ else
+ {
+ deliveryTime = 0l;
+ }
+ }
+ return deliveryTime;
}
/* (non-Javadoc)
@@ -87,8 +115,7 @@
*/
public void setScheduledDeliveryTime(final long scheduledDeliveryTime)
{
- // TODO Auto-generated method stub
-
+ deliveryTime = scheduledDeliveryTime;
}
/* (non-Javadoc)
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-11-11 14:44:07 UTC (rev 9875)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-11-11 17:28:05 UTC (rev 9876)
@@ -522,9 +522,10 @@
// the page stays locked until the entire reading is finished
if (needToRead)
{
+ Page page = null;
try
{
- Page page = pagingStore.createPage((int)pageId);
+ page = pagingStore.createPage((int)pageId);
page.open();
@@ -540,6 +541,16 @@
}
finally
{
+ try
+ {
+ if (page != null)
+ {
+ page.close();
+ }
+ }
+ catch (Throwable ignored)
+ {
+ }
cache.unlock();
}
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-11 14:44:07 UTC (rev 9875)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-11 17:28:05 UTC (rev 9876)
@@ -32,6 +32,7 @@
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PageTransactionInfo;
+import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.cursor.PageCache;
import org.hornetq.core.paging.cursor.PageCursorProvider;
@@ -464,6 +465,23 @@
}
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageSubscription#queryMessage(org.hornetq.core.paging.cursor.PagePosition)
+ */
+ public PagedMessage queryMessage(PagePosition pos)
+ {
+ try
+ {
+ return cursorProvider.getMessage(pos);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+
/**
* Theres no need to synchronize this method as it's only called from journal load on startup
*/
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-11-11 14:44:07 UTC (rev 9875)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-11-11 17:28:05 UTC (rev 9876)
@@ -874,49 +874,8 @@
if (store.page(message, context, entry.getValue()))
{
- if (tx != null)
- {
- PageDelivery delivery = (PageDelivery)tx.getProperty(TransactionPropertyIndexes.PAGE_DELIVERY);
- if (delivery == null)
- {
- delivery = new PageDelivery();
- tx.putProperty(TransactionPropertyIndexes.PAGE_DELIVERY, delivery);
- tx.addOperation(delivery);
- }
-
- delivery.addQueues(entry.getValue().getDurableQueues());
- delivery.addQueues(entry.getValue().getNonDurableQueues());
- }
- else
- {
-
- List<Queue> durableQueues = entry.getValue().getDurableQueues();
- List<Queue> nonDurableQueues = entry.getValue().getNonDurableQueues();
-
- final List<Queue> queues = new ArrayList<Queue>(durableQueues.size() + nonDurableQueues.size());
-
- queues.addAll(durableQueues);
- queues.addAll(nonDurableQueues);
-
- storageManager.afterCompleteOperations(new IOAsyncTask()
- {
-
- public void onError(int errorCode, String errorMessage)
- {
- }
-
- public void done()
- {
- for (Queue queue : queues)
- {
- // in case of paging, we need to kick asynchronous delivery to try delivering
- queue.deliverAsync();
- }
- }
- });
- }
-
-
+ // We need to kick delivery so the Queues may check the cursors in case they are empty
+ schedulePageDelivery(tx, entry);
continue;
}
@@ -1023,6 +982,56 @@
}
/**
+ * This will kick a delivery async on the queue, so the queue may have a chance to depage messages
+ * @param tx
+ * @param entry
+ */
+ private void schedulePageDelivery(Transaction tx, Map.Entry<SimpleString, RouteContextList> entry)
+ {
+ if (tx != null)
+ {
+ PageDelivery delivery = (PageDelivery)tx.getProperty(TransactionPropertyIndexes.PAGE_DELIVERY);
+ if (delivery == null)
+ {
+ delivery = new PageDelivery();
+ tx.putProperty(TransactionPropertyIndexes.PAGE_DELIVERY, delivery);
+ tx.addOperation(delivery);
+ }
+
+ delivery.addQueues(entry.getValue().getDurableQueues());
+ delivery.addQueues(entry.getValue().getNonDurableQueues());
+ }
+ else
+ {
+
+ List<Queue> durableQueues = entry.getValue().getDurableQueues();
+ List<Queue> nonDurableQueues = entry.getValue().getNonDurableQueues();
+
+ final List<Queue> queues = new ArrayList<Queue>(durableQueues.size() + nonDurableQueues.size());
+
+ queues.addAll(durableQueues);
+ queues.addAll(nonDurableQueues);
+
+ storageManager.afterCompleteOperations(new IOAsyncTask()
+ {
+
+ public void onError(int errorCode, String errorMessage)
+ {
+ }
+
+ public void done()
+ {
+ for (Queue queue : queues)
+ {
+ // in case of paging, we need to kick asynchronous delivery to try delivering
+ queue.deliverAsync();
+ }
+ }
+ });
+ }
+ }
+
+ /**
* @param refs
*/
private void addReferences(final List<MessageReference> refs, final boolean direct)
13 years, 6 months
JBoss hornetq SVN: r9875 - in branches/Branch_Large_Message_Compression/src/main/org/hornetq: utils and 1 other directory.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-11-11 09:44:07 -0500 (Thu, 11 Nov 2010)
New Revision: 9875
Modified:
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java
Log:
Let the GZipPipe util class extend InputStream
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2010-11-11 12:32:35 UTC (rev 9874)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2010-11-11 14:44:07 UTC (rev 9875)
@@ -411,7 +411,8 @@
if (session.isCompressLargeMessages())
{
- input = GZipUtil.pipeGZip(inputStreamParameter, true, session.getThreadPool());
+ //input = GZipUtil.pipeGZip(inputStreamParameter, true, session.getThreadPool());
+ input = GZipUtil.createZipInputStream(inputStreamParameter);
}
while (!lastPacket)
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java 2010-11-11 12:32:35 UTC (rev 9874)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java 2010-11-11 14:44:07 UTC (rev 9875)
@@ -16,6 +16,7 @@
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -166,6 +167,18 @@
}
});
}
+
+ public static InputStream createZipInputStream(InputStream input) throws HornetQException
+ {
+ try
+ {
+ return new GZipPipe(input, 1024);
+ }
+ catch (IOException e)
+ {
+ throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY, e.getMessage(), e);
+ }
+ }
/*
* we keep a list of byte arrays. when writing, we start with the first array.
@@ -249,15 +262,16 @@
}
return result;
}
-
}
- public static class GZipPipe
+ public static class GZipPipe extends InputStream
{
private InputStream input;
private byte[] readBuffer;
private GZIPOutputStream zipOut;
private DynamicOutputStream receiver;
+ private int readPointer;
+ private byte[] buffer;
public GZipPipe(InputStream raw, int size) throws IOException
{
@@ -265,10 +279,30 @@
readBuffer = new byte[size];
receiver = new DynamicOutputStream(size, 50);
zipOut = new GZIPOutputStream(receiver);
+ readPointer = 0;
+ buffer = read1();
}
- public byte[] read() throws IOException
+ public int read() throws IOException
{
+ if (buffer == null)
+ {
+ return -1;
+ }
+
+ int val = buffer[readPointer] & 0xFF;
+ readPointer++;
+ if (readPointer == buffer.length)
+ {
+ buffer = read1();
+ readPointer = 0;
+ }
+
+ return val;
+ }
+
+ public byte[] read1() throws IOException
+ {
byte[] result = receiver.getBuffer();
if (result == null)
{
@@ -327,24 +361,7 @@
System.out.println("----total output: " + counter);
*/
-
- FileInputStream input = new FileInputStream("/home/howard/tmp/jbm.log.1");
- FileOutputStream output = new FileOutputStream("/home/howard/tmp/myzip.zip");
- GZipPipe pipe = new GZipPipe(input, 2048);
-
- byte[] buffer;
-
- buffer = pipe.read();
-
- while (buffer != null)
- {
- //System.out.println("buffer size: " + buffer.length);
- output.write(buffer);
- buffer = pipe.read();
- }
-
- output.close();
-
+ unzip();
/*
FileInputStream input = new FileInputStream("/home/howard/tmp/jbm.log.1");
FileOutputStream output = new FileOutputStream("/home/howard/tmp/output.zip");
@@ -366,5 +383,48 @@
System.out.println("done. time: " + (end - begin));
}
+
+ public static void zip() throws IOException
+ {
+ FileInputStream input = new FileInputStream("/home/howard/tmp/jbm.log.1");
+ FileOutputStream output = new FileOutputStream("/home/howard/tmp/myzip.zip");
+ GZipPipe pipe = new GZipPipe(input, 2048);
+
+ byte[] buffer = new byte[2048];
+
+ int n = pipe.read(buffer);
+
+ while (n != -1)
+ {
+ if (n > 0)
+ {
+ output.write(buffer, 0, n);
+ }
+ n = pipe.read(buffer);
+ }
+ output.close();
+ }
+
+ public static void unzip() throws IOException
+ {
+ FileInputStream input = new FileInputStream("/home/howard/tmp/myzip.zip");
+ FileOutputStream output = new FileOutputStream("/home/howard/tmp/myzip.out");
+
+ GZIPInputStream zipIn = new GZIPInputStream(input);
+
+ byte[] buffer = new byte[1024];
+
+ int n = zipIn.read(buffer);
+
+ while (n > 0)
+ {
+ //System.out.println("buffer size: " + buffer.length);
+ output.write(buffer, 0, n);
+ n = zipIn.read(buffer);
+ }
+
+ output.close();
+ }
+
}
13 years, 6 months
JBoss hornetq SVN: r9874 - branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-11-11 07:32:35 -0500 (Thu, 11 Nov 2010)
New Revision: 9874
Modified:
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java
Log:
clean up
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java 2010-11-11 12:03:18 UTC (rev 9873)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java 2010-11-11 12:32:35 UTC (rev 9874)
@@ -16,7 +16,6 @@
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.FileInputStream;
-import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -26,8 +25,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
@@ -183,7 +180,7 @@
private List<byte[]> writeBuffer;
private int bufferSize;
private int counter, index;
- private int readIndex, nextIndex;
+ private int readIndex;
private boolean closed;
public DynamicOutputStream(int size, int cache)
@@ -197,7 +194,6 @@
counter = 0;
index = 0;
readIndex = 0;
- nextIndex = 1;
closed = false;
}
@@ -206,8 +202,7 @@
writeBuffer.get(index)[counter++] = (byte)b;
if (counter == bufferSize)
{
- index = nextIndex;
- nextIndex++;
+ index++;
if (index == writeBuffer.size())
{
writeBuffer.add(new byte[bufferSize]);
@@ -224,8 +219,7 @@
/*
* logic:
* if index > readIndex, return readIndex, then readIndex++
- * if index == readIndex, then return zero length byte[]. adjust nextIndex; if closed, return the remaining.
- * if index < readIndex, then return readIndex, readIndex = 0
+ * if index == readIndex, then return zero length byte[]; if closed, return the remaining.
*
* if closed and no more data, returns null.
*/
@@ -253,11 +247,6 @@
}
}
}
- else if (index < readIndex)
- {
- result = writeBuffer.get(readIndex);
- readIndex = 0;
- }
return result;
}
13 years, 6 months