JBoss hornetq SVN: r9873 - in branches/Branch_Large_Message_Compression: tests/src/org/hornetq/tests/integration/client and 1 other directory.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-11-11 07:03:18 -0500 (Thu, 11 Nov 2010)
New Revision: 9873
Modified:
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
Log:
a working compression prototype (non-thread)
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java 2010-11-11 02:23:43 UTC (rev 9872)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java 2010-11-11 12:03:18 UTC (rev 9873)
@@ -15,12 +15,19 @@
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
@@ -99,7 +106,7 @@
}
while ((size = input.read(readBytes)) > 0)
{
- System.out.println("Read " + size + " bytes on compressing thread");
+// System.out.println("Read " + size + " bytes on compressing thread");
out.write(readBytes, 0, size);
}
System.out.println("Finished compressing");
@@ -163,5 +170,212 @@
});
}
+ /**
+  * An in-memory {@link OutputStream} that stores the bytes written to it in fixed-size
+  * chunks and hands the chunks back, oldest first, through {@link #getBuffer()}.
+  * <p>
+  * While the stream is open only completely filled chunks are handed out; once it has
+  * been closed the partially filled last chunk is returned as well. A chunk that has been
+  * handed out is no longer referenced by this stream, so the bytes retained are the ones
+  * not yet read rather than everything ever written (the list keeps one empty slot per
+  * consumed chunk).
+  * <p>
+  * The class contains no synchronization; callers sharing an instance between threads
+  * must provide their own.
+  */
+ public static class DynamicOutputStream extends OutputStream
+ {
+    /** Returned by {@link #getBuffer()} when the stream is open and no chunk is full yet. */
+    private static final byte[] NOTHING_READY = new byte[0];
+
+    /** Chunks in write order; slots below {@code readIndex} have been handed out and are null. */
+    private final List<byte[]> writeBuffer;
+
+    /** Size in bytes of every chunk. */
+    private final int bufferSize;
+
+    /** Number of bytes already written into the chunk at {@code index}. */
+    private int counter;
+
+    /** Slot of the chunk currently being filled. */
+    private int index;
+
+    /** Slot of the oldest full chunk that has not been handed out yet. */
+    private int readIndex;
+
+    private boolean closed;
+
+    /**
+     * @param size  size in bytes of each chunk; must be positive
+     * @param cache number of chunks to allocate up front (at least one is always allocated)
+     * @throws IllegalArgumentException if {@code size} is not positive or {@code cache} is negative
+     */
+    public DynamicOutputStream(int size, int cache)
+    {
+       if (size <= 0)
+       {
+          throw new IllegalArgumentException("chunk size must be positive: " + size);
+       }
+       bufferSize = size;
+       writeBuffer = new ArrayList<byte[]>(cache);
+       for (int i = 0; i < cache; i++)
+       {
+          writeBuffer.add(new byte[size]);
+       }
+       if (writeBuffer.isEmpty())
+       {
+          // cache == 0: the first write still needs a chunk to land in
+          writeBuffer.add(new byte[size]);
+       }
+    }
+
+    /**
+     * Appends one byte, moving on to the next chunk when the current one becomes full.
+     *
+     * @throws IOException if the stream has been closed
+     */
+    @Override
+    public void write(int b) throws IOException
+    {
+       ensureOpen();
+       writeBuffer.get(index)[counter++] = (byte)b;
+       if (counter == bufferSize)
+       {
+          startNextChunk();
+       }
+    }
+
+    /**
+     * Appends {@code len} bytes with bulk copies instead of the inherited byte-by-byte loop.
+     *
+     * @throws IOException if the stream has been closed
+     * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not fit inside {@code b}
+     */
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException
+    {
+       if (b == null)
+       {
+          throw new NullPointerException("b");
+       }
+       if (off < 0 || len < 0 || len > b.length - off)
+       {
+          throw new IndexOutOfBoundsException("off=" + off + ", len=" + len + ", length=" + b.length);
+       }
+       ensureOpen();
+       while (len > 0)
+       {
+          int n = Math.min(len, bufferSize - counter);
+          System.arraycopy(b, off, writeBuffer.get(index), counter, n);
+          counter += n;
+          off += n;
+          len -= n;
+          if (counter == bufferSize)
+          {
+             startNextChunk();
+          }
+       }
+    }
+
+    /** Marks the end of the data so that {@link #getBuffer()} also releases the last, partial chunk. */
+    @Override
+    public void close() throws IOException
+    {
+       closed = true;
+    }
+
+    /**
+     * Returns the next piece of buffered data.
+     *
+     * @return the oldest full chunk if there is one; otherwise, while the stream is open,
+     *         an empty array; otherwise the remaining bytes of the last chunk, or
+     *         {@code null} once the stream is closed and everything has been returned
+     */
+    public byte[] getBuffer()
+    {
+       if (readIndex < index)
+       {
+          // hand over the chunk and drop our reference so it can be garbage collected
+          return writeBuffer.set(readIndex++, null);
+       }
+       if (!closed)
+       {
+          return NOTHING_READY;
+       }
+       if (counter == 0)
+       {
+          return null;
+       }
+       byte[] remaining = new byte[counter];
+       System.arraycopy(writeBuffer.get(index), 0, remaining, 0, counter);
+       counter = 0;
+       return remaining;
+    }
+
+    /** Makes the following slot the current chunk, allocating it if it was not pre-allocated. */
+    private void startNextChunk()
+    {
+       index++;
+       if (index == writeBuffer.size())
+       {
+          writeBuffer.add(new byte[bufferSize]);
+       }
+       counter = 0;
+    }
+
+    private void ensureOpen() throws IOException
+    {
+       if (closed)
+       {
+          throw new IOException("Stream closed");
+       }
+    }
+ }
+
+ /**
+  * Pull-style gzip compressor: each {@link #read()} call returns the next piece of the
+  * compressed form of the wrapped input stream, reading and compressing more raw data
+  * only when no compressed piece is ready yet.
+  */
+ public static class GZipPipe
+ {
+    private InputStream input;
+    private byte[] readBuffer;
+    private GZIPOutputStream zipOut;
+    private DynamicOutputStream receiver;
+
+    /**
+     * @param raw  the uncompressed data
+     * @param size size of the raw read buffer and of each compressed chunk
+     * @throws IOException if the gzip stream cannot be set up
+     */
+    public GZipPipe(InputStream raw, int size) throws IOException
+    {
+       input = raw;
+       readBuffer = new byte[size];
+       // the receiver has to exist before the gzip stream is created on top of it
+       receiver = new DynamicOutputStream(size, 50);
+       zipOut = new GZIPOutputStream(receiver);
+    }
+
+    /**
+     * @return the next non-empty piece of compressed data, or {@code null} when the input
+     *         is exhausted and all compressed data has been returned
+     * @throws IOException if reading the input or compressing fails
+     */
+    public byte[] read() throws IOException
+    {
+       byte[] chunk = receiver.getBuffer();
+       if (chunk == null || chunk.length > 0)
+       {
+          // either everything has been delivered or a piece is already waiting
+          return chunk;
+       }
+
+       for (int count = input.read(readBuffer); count > 0; count = input.read(readBuffer))
+       {
+          zipOut.write(readBuffer, 0, count);
+          chunk = receiver.getBuffer();
+          if (chunk != null && chunk.length > 0)
+          {
+             return chunk;
+          }
+       }
+
+       // no more raw data: finish the gzip stream so the receiver releases what is left
+       zipOut.close();
+       return receiver.getBuffer();
+    }
+ }
+
+ /**
+  * Manual timing harness: gzips one file into another through {@link GZipPipe} and prints
+  * the elapsed milliseconds.
+  *
+  * @param args optional; {@code args[0]} is the file to compress and {@code args[1]} the
+  *             file to write. When absent the original developer paths are used.
+  * @throws IOException if either file cannot be opened, read or written
+  */
+ public static void main(String[] args) throws HornetQException, IOException
+ {
+    String source = (args != null && args.length > 0) ? args[0] : "/home/howard/tmp/jbm.log.1";
+    String target = (args != null && args.length > 1) ? args[1] : "/home/howard/tmp/myzip.zip";
+
+    long begin = System.currentTimeMillis();
+
+    FileInputStream input = new FileInputStream(source);
+    try
+    {
+       FileOutputStream output = new FileOutputStream(target);
+       try
+       {
+          GZipPipe pipe = new GZipPipe(input, 2048);
+
+          // read() returns null once the compressed stream has been fully delivered
+          for (byte[] buffer = pipe.read(); buffer != null; buffer = pipe.read())
+          {
+             output.write(buffer);
+          }
+       }
+       finally
+       {
+          output.close();
+       }
+    }
+    finally
+    {
+       // the pipe does not close its source, so release it here even on failure
+       input.close();
+    }
+
+    long end = System.currentTimeMillis();
+
+    System.out.println("done. time: " + (end - begin));
+ }
+
+
}
Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2010-11-11 02:23:43 UTC (rev 9872)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2010-11-11 12:03:18 UTC (rev 9873)
@@ -2684,6 +2684,75 @@
}
}
+ /**
+  * Sends one large message (3.5 times the default large-message threshold) to a temporary
+  * queue, receives and acknowledges it, then checks that the body can no longer be read
+  * after the consumer is closed and that no files are left in the large-message directory.
+  */
+ public void testLargeMessageCompression() throws Exception
+ {
+    final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+    ClientSession session = null;
+
+    try
+    {
+       server = createServer(true, isNetty());
+
+       server.start();
+
+       ClientSessionFactory sf = createFactory(isNetty());
+
+       session = sf.createSession(false, false, false);
+
+       session.createTemporaryQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS);
+
+       ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+       Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+       producer.send(clientFile);
+
+       session.commit();
+
+       session.start();
+
+       ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
+       ClientMessage msg1 = consumer.receive(1000);
+       // check before dereferencing, so a timeout is reported as an assertion failure
+       Assert.assertNotNull(msg1);
+       msg1.acknowledge();
+       session.commit();
+
+       consumer.close();
+
+       // Assert.fail() must not sit inside the try: its error would be caught by the
+       // catch (Throwable) below and the check could never fail.
+       boolean readFailed = false;
+       try
+       {
+          msg1.getBodyBuffer().readByte();
+       }
+       catch (Throwable expected)
+       {
+          readFailed = true;
+       }
+       Assert.assertTrue("Exception was expected", readFailed);
+
+       session.close();
+
+       validateNoFilesOnLargeDir();
+    }
+    finally
+    {
+       try
+       {
+          server.stop();
+       }
+       catch (Throwable ignored)
+       {
+          // best-effort cleanup; must not mask the test result
+       }
+
+       try
+       {
+          if (session != null)
+          {
+             session.close();
+          }
+       }
+       catch (Throwable ignored)
+       {
+          // best-effort cleanup; must not mask the test result
+       }
+    }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
14 years, 1 month
JBoss hornetq SVN: r9872 - branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-10 21:23:43 -0500 (Wed, 10 Nov 2010)
New Revision: 9872
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
adding new test
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-11-11 02:11:52 UTC (rev 9871)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-11-11 02:23:43 UTC (rev 9872)
@@ -113,6 +113,16 @@
+ /** Runs the shared multi-queue paging scenario with its divert flag set to true. */
 public void testWithDiverts() throws Exception
 {
+ internalMultiQueuesTest(true);
+ }
+
+ /** Runs the shared multi-queue paging scenario with its divert flag set to false. */
+ public void testWithMultiQueues() throws Exception
+ {
+ internalMultiQueuesTest(false);
+ }
+
+ public void internalMultiQueuesTest(final boolean divert) throws Exception
+ {
clearData();
Configuration config = createDefaultConfig();
@@ -123,28 +133,31 @@
PagingTest.PAGE_MAX,
new HashMap<String, AddressSettings>());
- DivertConfiguration divert1 = new DivertConfiguration("dv1",
- "nm1",
- PagingTest.ADDRESS.toString(),
- PagingTest.ADDRESS.toString() + "-1",
- true,
- null,
- null);
+ if (divert)
+ {
+ DivertConfiguration divert1 = new DivertConfiguration("dv1",
+ "nm1",
+ PagingTest.ADDRESS.toString(),
+ PagingTest.ADDRESS.toString() + "-1",
+ true,
+ null,
+ null);
+
+ DivertConfiguration divert2 = new DivertConfiguration("dv2",
+ "nm2",
+ PagingTest.ADDRESS.toString(),
+ PagingTest.ADDRESS.toString() + "-2",
+ true,
+ null,
+ null);
+
+ ArrayList<DivertConfiguration> divertList = new ArrayList<DivertConfiguration>();
+ divertList.add(divert1);
+ divertList.add(divert2);
+
+ config.setDivertConfigurations(divertList);
+ }
- DivertConfiguration divert2 = new DivertConfiguration("dv2",
- "nm2",
- PagingTest.ADDRESS.toString(),
- PagingTest.ADDRESS.toString() + "-2",
- true,
- null,
- null);
-
- ArrayList<DivertConfiguration> divertList = new ArrayList<DivertConfiguration>();
- divertList.add(divert1);
- divertList.add(divert2);
-
- config.setDivertConfigurations(divertList);
-
server.start();
final int messageSize = 1024;
@@ -171,10 +184,19 @@
ClientSession session = sf.createSession(false, false, false);
- session.createQueue(PagingTest.ADDRESS + "-1", PagingTest.ADDRESS + "-1", null, true);
+ if (divert)
+ {
+ session.createQueue(PagingTest.ADDRESS + "-1", PagingTest.ADDRESS + "-1", null, true);
- session.createQueue(PagingTest.ADDRESS + "-2", PagingTest.ADDRESS + "-2", null, true);
+ session.createQueue(PagingTest.ADDRESS + "-2", PagingTest.ADDRESS + "-2", null, true);
+ }
+ else
+ {
+ session.createQueue(PagingTest.ADDRESS.toString(), PagingTest.ADDRESS + "-1", null, true);
+ session.createQueue(PagingTest.ADDRESS.toString(), PagingTest.ADDRESS + "-2", null, true);
+ }
+
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
ClientMessage message = null;
14 years, 1 month
JBoss hornetq SVN: r9871 - branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-10 21:11:52 -0500 (Wed, 10 Nov 2010)
New Revision: 9871
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
small tweak
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-11-11 02:09:12 UTC (rev 9870)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-11-11 02:11:52 UTC (rev 9871)
@@ -51,7 +51,6 @@
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
-import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.HornetQServerImpl;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
14 years, 1 month
JBoss hornetq SVN: r9870 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor/impl and 9 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-10 21:09:12 -0500 (Wed, 10 Nov 2010)
New Revision: 9870
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/RouteContextList.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
update page transactions
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java 2010-11-11 01:11:10 UTC (rev 9869)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java 2010-11-11 02:09:12 UTC (rev 9870)
@@ -42,12 +42,16 @@
void store(StorageManager storageManager, PagingManager pagingManager, Transaction tx) throws Exception;
- void storeUpdate(StorageManager storageManager, PagingManager pagingManager, Transaction tx, int depages) throws Exception;
+ void storeUpdate(StorageManager storageManager, PagingManager pagingManager, Transaction tx) throws Exception;
+
+ void storeUpdate(StorageManager storageManager, PagingManager pagingManager) throws Exception;
// To be used after the update was stored or reload
void onUpdate(int update, StorageManager storageManager, PagingManager pagingManager);
void increment();
+
+ void increment(int size);
int getNumberOfMessages();
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-11 01:11:10 UTC (rev 9869)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-11 02:09:12 UTC (rev 9870)
@@ -31,6 +31,7 @@
import org.hornetq.core.filter.Filter;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.cursor.PageCache;
import org.hornetq.core.paging.cursor.PageCursorProvider;
@@ -164,133 +165,115 @@
ack(position);
}
- class CursorIterator implements LinkedListIterator<PagedReference>
+ public void scheduleCleanupCheck()
{
- private PagePosition position = null;
-
- private PagePosition lastOperation = null;
-
- private final LinkedListIterator<PagePosition> redeliveryIterator;
-
- private volatile boolean isredelivery = false;
-
- /** next element taken on hasNext test.
- * it has to be delivered on next next operation */
- private volatile PagedReference cachedNext;
-
- public CursorIterator()
+ if (autoCleanup)
{
- synchronized (redeliveries)
+ executor.execute(new Runnable()
{
- redeliveryIterator = redeliveries.iterator();
- }
- }
-
- public void repeat()
- {
- if (isredelivery)
- {
- synchronized (redeliveries)
+ public void run()
{
- redeliveryIterator.repeat();
+ try
+ {
+ cleanupEntries();
+ }
+ catch (Exception e)
+ {
+ PageSubscriptionImpl.log.warn("Error on cleaning up cursor pages", e);
+ }
}
- }
- else
- {
- if (lastOperation == null)
- {
- position = null;
- }
- else
- {
- position = lastOperation;
- }
- }
+ });
}
+ }
- /* (non-Javadoc)
- * @see java.util.Iterator#next()
- */
- public synchronized PagedReference next()
+ /**
+ * It will cleanup all the records for completed pages
+ * */
+ public void cleanupEntries() throws Exception
+ {
+ Transaction tx = new TransactionImpl(store);
+
+ boolean persist = false;
+
+ final ArrayList<PageCursorInfo> completedPages = new ArrayList<PageCursorInfo>();
+
+ // First get the completed pages using a lock
+ synchronized (this)
{
-
- if (cachedNext != null)
+ for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet())
{
- PagedReference retPos = cachedNext;
- cachedNext = null;
- return retPos;
- }
-
- try
- {
- synchronized (redeliveries)
+ PageCursorInfo info = entry.getValue();
+ if (info.isDone() && !info.isPendingDelete() && lastAckedPosition != null)
{
- if (redeliveryIterator.hasNext())
+ if (entry.getKey() == lastAckedPosition.getPageNr())
{
- // There's a redelivery pending, we will get it out of that pool instead
- isredelivery = true;
- return getReference(redeliveryIterator.next());
+ PageSubscriptionImpl.trace("We can't clear page " + entry.getKey() + " now since it's the current page");
}
else
{
- isredelivery = false;
+ info.setPendingDelete();
+ completedPages.add(entry.getValue());
}
}
-
- if (position == null)
- {
- position = getStartPosition();
- }
+ }
+ }
- PagedReference nextPos = moveNext(position);
- if (nextPos != null)
+ for (int i = 0; i < completedPages.size(); i++)
+ {
+ PageCursorInfo info = completedPages.get(i);
+
+ for (PagePosition pos : info.acks)
+ {
+ if (pos.getRecordID() > 0)
{
- lastOperation = position;
- position = nextPos.getPosition();
+ store.deleteCursorAcknowledgeTransactional(tx.getID(), pos.getRecordID());
+ if (!persist)
+ {
+ // only need to set it once
+ tx.setContainsPersistent();
+ persist = true;
+ }
}
- return nextPos;
}
- catch (Exception e)
- {
- throw new RuntimeException(e.getMessage(), e);
- }
}
- /** QueueImpl::deliver could be calling hasNext while QueueImpl.depage could be using next and hasNext as well.
- * It would be a rare race condition but I would prefer avoiding that scenario */
- public synchronized boolean hasNext()
+ tx.addOperation(new TransactionOperationAbstract()
{
- // if an unbehaved program called hasNext twice before next, we only cache it once.
- if (cachedNext != null)
+
+ @Override
+ public void afterCommit(final Transaction tx)
{
- return true;
+ executor.execute(new Runnable()
+ {
+
+ public void run()
+ {
+ synchronized (PageSubscriptionImpl.this)
+ {
+ for (PageCursorInfo completePage : completedPages)
+ {
+ if (isTrace)
+ {
+ PageSubscriptionImpl.trace("Removing page " + completePage.getPageId());
+ }
+ if (consumedPages.remove(completePage.getPageId()) == null)
+ {
+ PageSubscriptionImpl.log.warn("Couldn't remove page " + completePage.getPageId() +
+ " from consumed pages on cursor for address " +
+ pageStore.getAddress());
+ }
+ }
+ }
+
+ cursorProvider.scheduleCleanup();
+ }
+ });
}
-
- if (!pageStore.isPaging())
- {
- return false;
- }
-
- cachedNext = next();
+ });
- return cachedNext != null;
- }
+ tx.commit();
- /* (non-Javadoc)
- * @see java.util.Iterator#remove()
- */
- public void remove()
- {
- PageSubscriptionImpl.this.getPageInfo(position).remove(position);
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.utils.LinkedListIterator#close()
- */
- public void close()
- {
- }
}
private PagedReference getReference(PagePosition pos) throws Exception
@@ -396,12 +379,41 @@
return new PagePositionImpl(pageStore.getFirstPage(), -1);
}
+ /**
+ * Acknowledges a page position as part of the given transaction.
+ * When this cursor is persistent a transactional cursor-acknowledge record is stored
+ * under the transaction's id; in every case the position is then passed to
+ * installTXCallback together with the transaction.
+ *
+ * @param tx the transaction the acknowledgement belongs to
+ * @param position the page position being acknowledged
+ * @throws Exception if storing the record or installing the callback fails
+ */
+ public void ackTx(final Transaction tx, final PagePosition position) throws Exception
+ {
+ // if the cursor is persistent
+ if (persistent)
+ {
+ store.storeCursorAcknowledgeTransactional(tx.getID(), cursorId, position);
+ }
+ // done for persistent and non-persistent cursors alike
+ installTXCallback(tx, position);
+
+ }
+
+
+ /**
+ * Acknowledges a paged reference as part of the given transaction: first acknowledges
+ * the reference's position, then, if getPageTransaction finds a page transaction for
+ * the reference, asks that page transaction to store an update within the same transaction.
+ *
+ * @param tx the transaction the acknowledgement belongs to
+ * @param reference the paged reference being acknowledged
+ * @throws Exception if the acknowledgement or the page-transaction update fails
+ */
+ public void ackTx(final Transaction tx, final PagedReference reference) throws Exception
+ {
+ ackTx(tx, reference.getPosition());
+
+ // null when the paged message carries transaction id 0 or no page transaction is found
+ PageTransactionInfo txInfo = getPageTransaction(reference);
+ if (txInfo != null)
+ {
+ txInfo.storeUpdate(store, pageStore.getPagingManager(), tx);
+ }
+ }
+
+
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#confirm(org.hornetq.core.paging.cursor.PagePosition)
*/
- public void ack(final PagedReference position) throws Exception
+ public void ack(final PagedReference reference) throws Exception
{
- ack(position.getPosition());
+ ack(reference.getPosition());
+ PageTransactionInfo txInfo = getPageTransaction(reference);
+ if (txInfo != null)
+ {
+ txInfo.storeUpdate(this.store, pageStore.getPagingManager());
+ }
}
public void ack(final PagePosition position) throws Exception
@@ -426,23 +438,6 @@
});
}
- public void ackTx(final Transaction tx, final PagePosition position) throws Exception
- {
- // if the cursor is persistent
- if (persistent)
- {
- store.storeCursorAcknowledgeTransactional(tx.getID(), cursorId, position);
- }
- installTXCallback(tx, position);
-
- }
-
-
- public void ackTx(final Transaction tx, final PagedReference position) throws Exception
- {
- ackTx(tx, position.getPosition());
- }
-
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#getFirstPage()
*/
@@ -743,6 +738,18 @@
cursorTX.addPositionConfirmation(this, position);
}
+
+ /**
+  * Looks up the page transaction the referenced message was paged under.
+  *
+  * @param reference the paged reference to inspect
+  * @return whatever the paging manager returns for the message's transaction id, or
+  *         {@code null} when that id is 0
+  */
+ private PageTransactionInfo getPageTransaction(final PagedReference reference)
+ {
+    // transaction id 0 is treated as "no page transaction": nothing to look up
+    if (reference.getPagedMessage().getTransactionID() == 0)
+    {
+       return null;
+    }
+
+    return pageStore.getPagingManager().getTransaction(reference.getPagedMessage().getTransactionID());
+ }
/**
* A callback from the PageCursorInfo. It will be called when all the messages on a page have been acked
@@ -755,118 +762,6 @@
scheduleCleanupCheck();
}
}
-
- public void scheduleCleanupCheck()
- {
- if (autoCleanup)
- {
- executor.execute(new Runnable()
- {
-
- public void run()
- {
- try
- {
- cleanupEntries();
- }
- catch (Exception e)
- {
- PageSubscriptionImpl.log.warn("Error on cleaning up cursor pages", e);
- }
- }
- });
- }
- }
-
- /**
- * It will cleanup all the records for completed pages
- * */
- public void cleanupEntries() throws Exception
- {
- Transaction tx = new TransactionImpl(store);
-
- boolean persist = false;
-
- final ArrayList<PageCursorInfo> completedPages = new ArrayList<PageCursorInfo>();
-
- // First get the completed pages using a lock
- synchronized (this)
- {
- for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet())
- {
- PageCursorInfo info = entry.getValue();
- if (info.isDone() && !info.isPendingDelete() && lastAckedPosition != null)
- {
- if (entry.getKey() == lastAckedPosition.getPageNr())
- {
- PageSubscriptionImpl.trace("We can't clear page " + entry.getKey() + " now since it's the current page");
- }
- else
- {
- info.setPendingDelete();
- completedPages.add(entry.getValue());
- }
- }
- }
- }
-
- for (int i = 0; i < completedPages.size(); i++)
- {
- PageCursorInfo info = completedPages.get(i);
-
- for (PagePosition pos : info.acks)
- {
- if (pos.getRecordID() > 0)
- {
- store.deleteCursorAcknowledgeTransactional(tx.getID(), pos.getRecordID());
- if (!persist)
- {
- // only need to set it once
- tx.setContainsPersistent();
- persist = true;
- }
- }
- }
- }
-
- tx.addOperation(new TransactionOperationAbstract()
- {
-
- @Override
- public void afterCommit(final Transaction tx)
- {
- executor.execute(new Runnable()
- {
-
- public void run()
- {
- synchronized (PageSubscriptionImpl.this)
- {
- for (PageCursorInfo completePage : completedPages)
- {
- if (isTrace)
- {
- PageSubscriptionImpl.trace("Removing page " + completePage.getPageId());
- }
- if (consumedPages.remove(completePage.getPageId()) == null)
- {
- PageSubscriptionImpl.log.warn("Couldn't remove page " + completePage.getPageId() +
- " from consumed pages on cursor for address " +
- pageStore.getAddress());
- }
- }
- }
-
- cursorProvider.scheduleCleanup();
- }
- });
- }
- });
-
- tx.commit();
-
- }
-
// Inner classes -------------------------------------------------
/**
@@ -1038,6 +933,137 @@
}
}
+ }
+
+ /**
+ * Iterator over the paged references of the enclosing subscription.
+ * Entries pending in the enclosing subscription's redeliveries list are returned first;
+ * after that the iterator walks forward from its current page position using moveNext.
+ * hasNext() pre-fetches the next element and keeps it in cachedNext until next() is called.
+ */
+ class CursorIterator implements LinkedListIterator<PagedReference>
+ {
+ // position of the last reference obtained through moveNext; null until the first such call
+ private PagePosition position = null;
+
+ // value 'position' had before the last successful moveNext; repeat() rewinds to it
+ private PagePosition lastOperation = null;
+
+ // iterator over the enclosing subscription's redeliveries; only used under synchronized (redeliveries)
+ private final LinkedListIterator<PagePosition> redeliveryIterator;
+
+ // true when the last element looked up by next() came from redeliveryIterator
+ private volatile boolean isredelivery = false;
+
+ /** next element taken on hasNext test.
+ * it has to be delivered on next next operation */
+ private volatile PagedReference cachedNext;
+
+ /** Creates the iterator, obtaining the redelivery iterator while holding the redeliveries lock. */
+ public CursorIterator()
+ {
+ synchronized (redeliveries)
+ {
+ redeliveryIterator = redeliveries.iterator();
+ }
+ }
+
+
+ /**
+ * Rewinds by one element: repeats the redelivery iterator if the last element was a
+ * redelivery, otherwise resets 'position' to 'lastOperation' (null makes next() start
+ * again from getStartPosition()).
+ * NOTE(review): unlike next() and hasNext() this method is not synchronized - confirm
+ * that it is never called concurrently with them.
+ */
+ public void repeat()
+ {
+ if (isredelivery)
+ {
+ synchronized (redeliveries)
+ {
+ redeliveryIterator.repeat();
+ }
+ }
+ else
+ {
+ if (lastOperation == null)
+ {
+ position = null;
+ }
+ else
+ {
+ position = lastOperation;
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see java.util.Iterator#next()
+ *
+ * Returns, in this order: the element pre-fetched by hasNext(), the next pending
+ * redelivery, or the result of moveNext(position), which may be null.
+ * Any Exception is rethrown wrapped in a RuntimeException that keeps it as the cause.
+ */
+ public synchronized PagedReference next()
+ {
+
+ // hand out what hasNext() already fetched, exactly once
+ if (cachedNext != null)
+ {
+ PagedReference retPos = cachedNext;
+ cachedNext = null;
+ return retPos;
+ }
+
+ try
+ {
+ synchronized (redeliveries)
+ {
+ if (redeliveryIterator.hasNext())
+ {
+ // There's a redelivery pending, we will get it out of that pool instead
+ isredelivery = true;
+ return getReference(redeliveryIterator.next());
+ }
+ else
+ {
+ isredelivery = false;
+ }
+ }
+
+ // first call, or repeat() rewound to the beginning
+ if (position == null)
+ {
+ position = getStartPosition();
+ }
+
+ PagedReference nextPos = moveNext(position);
+ if (nextPos != null)
+ {
+ // remember where we came from so repeat() can step back
+ lastOperation = position;
+ position = nextPos.getPosition();
+ }
+ return nextPos;
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ /** QueueImpl::deliver could be calling hasNext while QueueImpl.depage could be using next and hasNext as well.
+ * It would be a rare race condition but I would prefer avoiding that scenario */
+ public synchronized boolean hasNext()
+ {
+ // if an unbehaved program called hasNext twice before next, we only cache it once.
+ if (cachedNext != null)
+ {
+ return true;
+ }
+
+ // when the store is not paging, report no element without fetching anything
+ if (!pageStore.isPaging())
+ {
+ return false;
+ }
+
+ // pre-fetch; next() will return this cached element
+ cachedNext = next();
+
+ return cachedNext != null;
+ }
+
+ /* (non-Javadoc)
+ * @see java.util.Iterator#remove()
+ *
+ * Removes the current 'position' from the page info the enclosing subscription holds for it.
+ */
+ public void remove()
+ {
+ PageSubscriptionImpl.this.getPageInfo(position).remove(position);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.utils.LinkedListIterator#close()
+ *
+ * Intentionally empty: this method does nothing.
+ */
+ public void close()
+ {
+ }
 }
+
+
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-11-11 01:11:10 UTC (rev 9869)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-11-11 02:09:12 UTC (rev 9870)
@@ -13,12 +13,15 @@
package org.hornetq.core.paging.impl;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.Pair;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagingManager;
@@ -27,6 +30,8 @@
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.TransactionOperation;
+import org.hornetq.core.transaction.TransactionOperationAbstract;
+import org.hornetq.core.transaction.TransactionPropertyIndexes;
import org.hornetq.utils.DataConstants;
/**
@@ -98,10 +103,7 @@
{
log.warn("Can't delete page transaction id=" + this.recordID);
}
- }
-
- if (sizeAfterUpdate == 0 && pagingManager != null)
- {
+
pagingManager.removeTransaction(this.transactionID);
}
}
@@ -110,6 +112,11 @@
{
numberOfMessages.incrementAndGet();
}
+
+ /**
+ * Adds the given amount to this page transaction's message counter with a single
+ * addAndGet call.
+ *
+ * @param size the number of messages to add
+ */
+ public void increment(final int size)
+ {
+ numberOfMessages.addAndGet(size);
+ }
public int getNumberOfMessages()
{
@@ -158,40 +165,37 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.PageTransactionInfo#storeUpdate(org.hornetq.core.persistence.StorageManager, org.hornetq.core.transaction.Transaction, int)
*/
- public void storeUpdate(final StorageManager storageManager, final PagingManager pagingManager, final Transaction tx, final int depages) throws Exception
+ public void storeUpdate(final StorageManager storageManager, final PagingManager pagingManager, final Transaction tx) throws Exception
{
- storageManager.updatePageTransaction(tx.getID(), this, depages);
+ UpdatePageTXOperation pgtxUpdate = (UpdatePageTXOperation)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION_UPDATE);
- final PageTransactionInfo pgToUpdate = this;
+ if (pgtxUpdate == null)
+ {
+ pgtxUpdate = new UpdatePageTXOperation(storageManager, pagingManager);
+ tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION_UPDATE, pgtxUpdate);
+ tx.addOperation(pgtxUpdate);
+ }
- tx.addOperation(new TransactionOperation()
+ pgtxUpdate.addUpdate(this);
+ }
+
+ public void storeUpdate(final StorageManager storageManager, final PagingManager pagingManager) throws Exception
+ {
+ storageManager.updatePageTransaction(this, 1);
+ storageManager.afterCompleteOperations(new IOAsyncTask()
{
- public void beforeRollback(Transaction tx) throws Exception
+ public void onError(int errorCode, String errorMessage)
{
}
- public void beforePrepare(Transaction tx) throws Exception
+ public void done()
{
+ PageTransactionInfoImpl.this.onUpdate(1, storageManager, pagingManager);
}
-
- public void beforeCommit(Transaction tx) throws Exception
- {
- }
-
- public void afterRollback(Transaction tx)
- {
- }
-
- public void afterPrepare(Transaction tx)
- {
- }
-
- public void afterCommit(Transaction tx)
- {
- pgToUpdate.onUpdate(depages, storageManager, pagingManager);
- }
});
}
+
+
public boolean isCommit()
{
@@ -260,4 +264,68 @@
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
+
+
+ /**
+  * Transaction operation that collects, per page transaction, how many updates were
+  * requested during a transaction. The collected counts are written to storage once,
+  * before prepare or before commit (whichever comes first), and reported to each page
+  * transaction through onUpdate after the commit.
+  */
+ static class UpdatePageTXOperation extends TransactionOperationAbstract
+ {
+    private HashMap<PageTransactionInfo, AtomicInteger> countsToUpdate = new HashMap<PageTransactionInfo, AtomicInteger>();
+
+    /** Set as soon as the counts start being written, so they are written only once. */
+    private boolean stored = false;
+
+    private final StorageManager storageManager;
+
+    private final PagingManager pagingManager;
+
+    public UpdatePageTXOperation(final StorageManager storageManager, final PagingManager pagingManager)
+    {
+       this.storageManager = storageManager;
+       this.pagingManager = pagingManager;
+    }
+
+    /** Records one more update for the given page transaction. */
+    public void addUpdate(PageTransactionInfo info)
+    {
+       AtomicInteger pending = countsToUpdate.get(info);
+
+       if (pending != null)
+       {
+          pending.incrementAndGet();
+          return;
+       }
+
+       // first update seen for this page transaction
+       countsToUpdate.put(info, new AtomicInteger(1));
+    }
+
+    public void beforePrepare(Transaction tx) throws Exception
+    {
+       storeUpdates(tx);
+    }
+
+    public void beforeCommit(Transaction tx) throws Exception
+    {
+       storeUpdates(tx);
+    }
+
+    /** Reports every collected count to its page transaction. */
+    public void afterCommit(Transaction tx)
+    {
+       for (Map.Entry<PageTransactionInfo, AtomicInteger> update : countsToUpdate.entrySet())
+       {
+          PageTransactionInfo pageTx = update.getKey();
+          int count = update.getValue().intValue();
+          pageTx.onUpdate(count, storageManager, pagingManager);
+       }
+    }
+
+    /** Writes the collected counts under the transaction's id; does nothing on later calls. */
+    private void storeUpdates(Transaction tx) throws Exception
+    {
+       if (stored)
+       {
+          return;
+       }
+
+       // flagged before writing, exactly as a single attempt: a failure is not retried
+       stored = true;
+
+       for (Map.Entry<PageTransactionInfo, AtomicInteger> update : countsToUpdate.entrySet())
+       {
+          storageManager.updatePageTransaction(tx.getID(), update.getKey(), update.getValue().get());
+       }
+    }
+ }
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-11-11 01:11:10 UTC (rev 9869)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-11-11 02:09:12 UTC (rev 9870)
@@ -14,7 +14,6 @@
package org.hornetq.core.paging.impl;
import java.text.DecimalFormat;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -51,8 +50,8 @@
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.Transaction;
-import org.hornetq.core.transaction.TransactionOperation;
import org.hornetq.core.transaction.Transaction.State;
+import org.hornetq.core.transaction.TransactionOperation;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.utils.ExecutorFactory;
@@ -700,59 +699,6 @@
* @return
* @throws Exception
*/
- protected boolean readPage() throws Exception
- {
- Page page = depage();
-
- // It's important that only depage should happen while locked
- // or we would be holding a lock for a long time
- // The reading (IO part) should happen outside of any locks
-
- if (page == null)
- {
- return false;
- }
-
- page.open();
-
- List<PagedMessage> messages = null;
-
- try
- {
- messages = page.read();
- }
- finally
- {
- try
- {
- page.close();
- }
- catch (Throwable ignored)
- {
- }
- }
-
- if (onDepage(page.getPageId(), storeName, messages))
- {
- if (page.delete())
- {
- // DuplicateCache could be null during replication
- // however the deletes on the journal will happen through replicated journal
- if (duplicateCache != null)
- {
- duplicateCache.deleteFromCache(generateDuplicateID(page.getPageId()));
- }
- }
-
- return true;
- }
- else
- {
- return false;
- }
-
- }
-
private Queue<OurRunnable> onMemoryFreedRunnables = new ConcurrentLinkedQueue<OurRunnable>();
private class MemoryFreedRunnablesExecutor implements Runnable
@@ -951,7 +897,7 @@
Transaction tx = ctx.getTransaction();
- pagedMessage = new PagedMessageImpl(message, getQueueIDs(listCtx), getTransactionID(tx));
+ pagedMessage = new PagedMessageImpl(message, getQueueIDs(listCtx), getTransactionID(tx, listCtx));
int bytesToWrite = pagedMessage.getEncodeSize() + PageImpl.SIZE_RECORD;
@@ -991,7 +937,7 @@
return ids;
}
- private long getTransactionID(Transaction tx) throws Exception
+ private long getTransactionID(final Transaction tx, final RouteContextList listCtx) throws Exception
{
if (tx == null)
{
@@ -999,34 +945,42 @@
}
else
{
- if (tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION) == null)
+ PageTransactionInfo pgTX = (PageTransactionInfo) tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
+ if (pgTX == null)
{
- PageTransactionInfo pgTX = new PageTransactionInfoImpl(tx.getID());
+ pgTX = new PageTransactionInfoImpl(tx.getID());
System.out.println("Creating pageTransaction " + pgTX.getTransactionID());
- storageManager.storePageTransaction(tx.getID(), pgTX);
pagingManager.addTransaction(pgTX);
tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pgTX);
- tx.addOperation(new FinishPageMessageOperation());
+ tx.addOperation(new FinishPageMessageOperation(pgTX));
tx.setContainsPersistent();
}
+ pgTX.increment(listCtx.getNumberOfQueues());
+
return tx.getID();
}
}
- private static class FinishPageMessageOperation implements TransactionOperation
+ private class FinishPageMessageOperation implements TransactionOperation
{
+ private final PageTransactionInfo pageTransaction;
+
+ private boolean stored = false;
+ public FinishPageMessageOperation(final PageTransactionInfo pageTransaction)
+ {
+ this.pageTransaction = pageTransaction;
+ }
+
public void afterCommit(final Transaction tx)
{
// If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the
// transaction until all the messages were added to the queue
// or else we could deliver the messages out of order
- PageTransactionInfo pageTransaction = (PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
-
if (pageTransaction != null)
{
pageTransaction.commit();
@@ -1039,8 +993,6 @@
public void afterRollback(final Transaction tx)
{
- PageTransactionInfo pageTransaction = (PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
-
if (tx.getState() == State.PREPARED && pageTransaction != null)
{
pageTransaction.rollback();
@@ -1049,11 +1001,22 @@
public void beforeCommit(final Transaction tx) throws Exception
{
+ storePageTX(tx);
}
public void beforePrepare(final Transaction tx) throws Exception
{
+ storePageTX(tx);
}
+
+ private void storePageTX(final Transaction tx) throws Exception
+ {
+ if (!stored)
+ {
+ storageManager.storePageTransaction(tx.getID(), pageTransaction);
+ stored = true;
+ }
+ }
public void beforeRollback(final Transaction tx) throws Exception
{
@@ -1067,157 +1030,7 @@
* If persistent messages are also used, it will update eventual PageTransactions
*/
- private boolean onDepage(final int pageId, final SimpleString address, final List<PagedMessage> pagedMessages) throws Exception
- {
- if (PagingStoreImpl.isTrace)
- {
- PagingStoreImpl.trace("Depaging....");
- }
-
- if (pagedMessages.size() == 0)
- {
- // nothing to be done on this case.
- return true;
- }
-
- // Depage has to be done atomically, in case of failure it should be
- // back to where it was
-
- byte[] duplicateIdForPage = generateDuplicateID(pageId);
-
- Transaction depageTransaction = new TransactionImpl(storageManager);
-
- // DuplicateCache could be null during replication
- if (duplicateCache != null)
- {
- if (duplicateCache.contains(duplicateIdForPage))
- {
- log.warn("Page " + pageId +
- " had been processed already but the file wasn't removed as a crash happened. Ignoring this page");
- return true;
- }
-
- duplicateCache.addToCache(duplicateIdForPage, depageTransaction);
- }
-
- depageTransaction.putProperty(TransactionPropertyIndexes.IS_DEPAGE, Boolean.valueOf(true));
-
- HashMap<PageTransactionInfo, AtomicInteger> pageTransactionsToUpdate = new HashMap<PageTransactionInfo, AtomicInteger>();
-
- for (PagedMessage pagedMessage : pagedMessages)
- {
- ServerMessage message = pagedMessage.getMessage();
-
- if (message.isLargeMessage())
- {
- LargeServerMessage largeMsg = (LargeServerMessage)message;
- if (!largeMsg.isFileExists())
- {
- PagingStoreImpl.log.warn("File for large message " + largeMsg.getMessageID() +
- " doesn't exist, so ignoring depage for this large message");
- continue;
- }
- }
-
- final long transactionIdDuringPaging = pagedMessage.getTransactionID();
-
- PageTransactionInfo pageUserTransaction = null;
- AtomicInteger countPageTX = null;
-
- if (transactionIdDuringPaging >= 0)
- {
- pageUserTransaction = pagingManager.getTransaction(transactionIdDuringPaging);
-
- if (pageUserTransaction == null)
- {
- // This is not supposed to happen
- PagingStoreImpl.log.warn("Transaction " + pagedMessage.getTransactionID() +
- " used during paging not found");
- continue;
- }
- else
- {
- countPageTX = pageTransactionsToUpdate.get(pageUserTransaction);
- if (countPageTX == null)
- {
- countPageTX = new AtomicInteger();
- pageTransactionsToUpdate.put(pageUserTransaction, countPageTX);
- }
-
- // This is to avoid a race condition where messages are depaged
- // before the commit arrived
-
- while (running)
- {
- // This is just to give us a chance to interrupt the process..
- // if we start a shutdown in the middle of transactions, the commit/rollback may never come, delaying
- // the shutdown of the server
- if (PagingStoreImpl.isTrace)
- {
- PagingStoreImpl.trace("Waiting pageTransaction to complete");
- }
- }
-
- if (!running)
- {
- break;
- }
-
- if (!pageUserTransaction.isCommit())
- {
- if (PagingStoreImpl.isTrace)
- {
- PagingStoreImpl.trace("Rollback was called after prepare, ignoring message " + message);
- }
- continue;
- }
- }
-
- }
-
- postOffice.route(message, depageTransaction, false);
-
- // This means the page is duplicated. So we need to ignore this
- if (depageTransaction.getState() == State.ROLLBACK_ONLY)
- {
- break;
- }
-
- // Update information about transactions
- // This needs to be done after routing because of duplication detection
- if (pageUserTransaction != null && message.isDurable())
- {
- countPageTX.incrementAndGet();
- }
- }
-
- if (!running)
- {
- depageTransaction.rollback();
- return false;
- }
-
- for (Map.Entry<PageTransactionInfo, AtomicInteger> entry : pageTransactionsToUpdate.entrySet())
- {
- // This will set the journal transaction to commit;
- depageTransaction.setContainsPersistent();
-
- entry.getKey().storeUpdate(storageManager, this.pagingManager, depageTransaction, entry.getValue().intValue());
- }
-
- depageTransaction.commit();
-
- storageManager.waitOnOperations();
-
- if (PagingStoreImpl.isTrace)
- {
- PagingStoreImpl.trace("Depage committed, running = " + running);
- }
-
- return true;
- }
-
- /**
+ /**
* @param pageId
* @return
*/
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java 2010-11-11 01:11:10 UTC (rev 9869)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java 2010-11-11 02:09:12 UTC (rev 9870)
@@ -142,6 +142,8 @@
void storePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception;
void updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int depage) throws Exception;
+
+ void updatePageTransaction(PageTransactionInfo pageTransaction, int depage) throws Exception;
void deletePageTransactional(long recordID) throws Exception;
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-11-11 01:11:10 UTC (rev 9869)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-11-11 02:09:12 UTC (rev 9870)
@@ -606,6 +606,16 @@
depages));
}
+ public void updatePageTransaction(final PageTransactionInfo pageTransaction, final int depages) throws Exception
+ {
+ messageJournal.appendUpdateRecord(pageTransaction.getRecordID(),
+ JournalStorageManager.PAGE_TRANSACTION,
+ new PageUpdateTXEncoding(pageTransaction.getTransactionID(),
+ depages),
+ syncNonTransactional,
+ getContext(syncNonTransactional));
+ }
+
public void storeReferenceTransactional(final long txID, final long queueID, final long messageID) throws Exception
{
messageJournal.appendUpdateRecordTransactional(txID,
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2010-11-11 01:11:10 UTC (rev 9869)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2010-11-11 02:09:12 UTC (rev 9870)
@@ -479,4 +479,13 @@
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#updatePageTransaction(org.hornetq.core.paging.PageTransactionInfo, int)
+ */
+ public void updatePageTransaction(PageTransactionInfo pageTransaction, int depage) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/RouteContextList.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/RouteContextList.java 2010-11-11 01:11:10 UTC (rev 9869)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/RouteContextList.java 2010-11-11 02:09:12 UTC (rev 9870)
@@ -24,6 +24,8 @@
*/
public interface RouteContextList
{
+
+ int getNumberOfQueues();
List<Queue> getDurableQueues();
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-11-11 01:11:10 UTC (rev 9869)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-11-11 02:09:12 UTC (rev 9870)
@@ -1272,7 +1272,8 @@
if (msgsToDeliver > 0)
{
- System.out.println("Depaging " + msgsToDeliver + " messages");
+ //System.out.println("Depaging " + msgsToDeliver + " messages");
+ System.out.println("Depage " + msgsToDeliver + " now.. there are msgRef = " + messageReferences.size() + " scheduled = " + getScheduledCount() + " concurrentQueue.size() = " + concurrentQueue.size());
int nmessages = 0;
while (nmessages < msgsToDeliver && pageIterator.hasNext())
@@ -1281,7 +1282,13 @@
addTail(pageIterator.next(), false);
pageIterator.remove();
}
+
+ System.out.println("Depaged " + nmessages);
}
+ else
+ {
+ System.out.println("Depaging not being done now.. there are msgRef = " + messageReferences.size() + " scheduled = " + getScheduledCount() + " concurrentQueue.size() = " + concurrentQueue.size());
+ }
deliverAsync();
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java 2010-11-11 01:11:10 UTC (rev 9869)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java 2010-11-11 02:09:12 UTC (rev 9870)
@@ -124,6 +124,11 @@
private List<Queue> durableQueue = new ArrayList<Queue>(1);
private List<Queue> nonDurableQueue = new ArrayList<Queue>(1);
+
+ public int getNumberOfQueues()
+ {
+ return durableQueue.size() + nonDurableQueue.size();
+ }
/* (non-Javadoc)
* @see org.hornetq.core.server.RouteContextList#getDurableQueues()
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java 2010-11-11 01:11:10 UTC (rev 9869)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java 2010-11-11 02:09:12 UTC (rev 9870)
@@ -24,8 +24,9 @@
*/
public class TransactionPropertyIndexes
{
- public static final int IS_DEPAGE = 3;
+ public static final int PAGE_TRANSACTION_UPDATE = 4;
+
public static final int PAGE_TRANSACTION = 5;
public static final int REFS_OPERATION = 6;
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-11-11 01:11:10 UTC (rev 9869)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-11-11 02:09:12 UTC (rev 9870)
@@ -1012,37 +1012,6 @@
syncNonTransactional);
}
- protected boolean page(ServerMessage message, org.hornetq.core.server.RoutingContext ctx, org.hornetq.core.server.RouteContextList listCtx, boolean sync) throws Exception
- {
- boolean paged = super.page(message, ctx, listCtx, sync);
-
- if (paged)
- {
-
- if (countDepage.incrementAndGet() == 1)
- {
- countDepage.set(0);
-
- executor.execute(new Runnable()
- {
- public void run()
- {
- try
- {
- while (isStarted() && readPage());
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- });
- }
- }
-
- return paged;
- }
-
public boolean startDepaging()
{
// do nothing, we are hacking depage right in between paging
@@ -1306,8 +1275,9 @@
for (int i = 0; i < numberOfMessages; i++)
{
- ClientMessage msg = consumer.receive(500000);
+ ClientMessage msg = consumer.receive(5000);
assertNotNull(msg);
+ System.out.println("Received " + i);
assertEquals(i, msg.getIntProperty("count").intValue());
msg.acknowledge();
}
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-11-11 01:11:10 UTC (rev 9869)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-11-11 02:09:12 UTC (rev 9870)
@@ -1553,6 +1553,15 @@
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#updatePageTransaction(org.hornetq.core.paging.PageTransactionInfo, int)
+ */
+ public void updatePageTransaction(PageTransactionInfo pageTransaction, int depage) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
class FakeStoreFactory implements PagingStoreFactory
14 years, 1 month
JBoss hornetq SVN: r9869 - in trunk/src/config: jboss-as-4/non-clustered and 4 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-10 20:11:10 -0500 (Wed, 10 Nov 2010)
New Revision: 9869
Modified:
trunk/src/config/jboss-as-4/clustered/hornetq-jms.xml
trunk/src/config/jboss-as-4/non-clustered/hornetq-jms.xml
trunk/src/config/jboss-as-5/clustered/hornetq-jms.xml
trunk/src/config/jboss-as-5/non-clustered/hornetq-jms.xml
trunk/src/config/stand-alone/clustered/hornetq-jms.xml
trunk/src/config/stand-alone/non-clustered/hornetq-jms.xml
Log:
tweaks on configs
Modified: trunk/src/config/jboss-as-4/clustered/hornetq-jms.xml
===================================================================
--- trunk/src/config/jboss-as-4/clustered/hornetq-jms.xml 2010-11-11 01:10:29 UTC (rev 9868)
+++ trunk/src/config/jboss-as-4/clustered/hornetq-jms.xml 2010-11-11 01:11:10 UTC (rev 9869)
@@ -3,38 +3,65 @@
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<connection-factory name="NettyConnectionFactory">
- <xa>true<xa>
+ <xa>true</xa>
<connectors>
<connector-ref connector-name="netty"/>
</connectors>
<entries>
- <entry name="/ConnectionFactory"/>
<entry name="/XAConnectionFactory"/>
</entries>
</connection-factory>
+ <connection-factory name="NettyConnectionFactory">
+ <xa>false</xa>
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
+ <entries>
+ <entry name="/ConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
<connection-factory name="NettyThroughputConnectionFactory">
<xa>true</xa>
<connectors>
<connector-ref connector-name="netty-throughput"/>
</connectors>
<entries>
- <entry name="/ThroughputConnectionFactory"/>
<entry name="/XAThroughputConnectionFactory"/>
</entries>
</connection-factory>
+ <connection-factory name="NettyThroughputConnectionFactory">
+ <xa>false</xa>
+ <connectors>
+ <connector-ref connector-name="netty-throughput"/>
+ </connectors>
+ <entries>
+ <entry name="/ThroughputConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
<connection-factory name="InVMConnectionFactory">
<xa>true</xa>
<connectors>
<connector-ref connector-name="in-vm"/>
</connectors>
<entries>
- <entry name="java:/ConnectionFactory"/>
<entry name="java:/XAConnectionFactory"/>
</entries>
</connection-factory>
+ <connection-factory name="InVMConnectionFactory">
+ <xa>false</xa>
+ <connectors>
+ <connector-ref connector-name="in-vm"/>
+ </connectors>
+ <entries>
+ <entry name="java:/ConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
<queue name="DLQ">
<entry name="/queue/DLQ"/>
</queue>
Modified: trunk/src/config/jboss-as-4/non-clustered/hornetq-jms.xml
===================================================================
--- trunk/src/config/jboss-as-4/non-clustered/hornetq-jms.xml 2010-11-11 01:10:29 UTC (rev 9868)
+++ trunk/src/config/jboss-as-4/non-clustered/hornetq-jms.xml 2010-11-11 01:11:10 UTC (rev 9869)
@@ -8,33 +8,60 @@
<connector-ref connector-name="netty"/>
</connectors>
<entries>
- <entry name="/ConnectionFactory"/>
<entry name="/XAConnectionFactory"/>
</entries>
</connection-factory>
+ <connection-factory name="NettyConnectionFactory">
+ <xa>false</xa>
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
+ <entries>
+ <entry name="/ConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
<connection-factory name="NettyThroughputConnectionFactory">
<xa>true</xa>
- <connectors>
+ <connectors>
<connector-ref connector-name="netty-throughput"/>
- </connectors>
- <entries>
- <entry name="/ThroughputConnectionFactory"/>
- <entry name="/XAThroughputConnectionFactory"/>
- </entries>
- </connection-factory>
+ </connectors>
+ <entries>
+ <entry name="/XAThroughputConnectionFactory"/>
+ </entries>
+ </connection-factory>
+ <connection-factory name="NettyThroughputConnectionFactory">
+ <xa>false</xa>
+ <connectors>
+ <connector-ref connector-name="netty-throughput"/>
+ </connectors>
+ <entries>
+ <entry name="/ThroughputConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
<connection-factory name="InVMConnectionFactory">
<xa>true</xa>
<connectors>
<connector-ref connector-name="in-vm"/>
</connectors>
<entries>
- <entry name="java:/ConnectionFactory"/>
<entry name="java:/XAConnectionFactory"/>
</entries>
</connection-factory>
-
+
+ <connection-factory name="InVMConnectionFactory">
+ <xa>false</xa>
+ <connectors>
+ <connector-ref connector-name="in-vm"/>
+ </connectors>
+ <entries>
+ <entry name="java:/ConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
<queue name="DLQ">
<entry name="/queue/DLQ"/>
</queue>
Modified: trunk/src/config/jboss-as-5/clustered/hornetq-jms.xml
===================================================================
--- trunk/src/config/jboss-as-5/clustered/hornetq-jms.xml 2010-11-11 01:10:29 UTC (rev 9868)
+++ trunk/src/config/jboss-as-5/clustered/hornetq-jms.xml 2010-11-11 01:11:10 UTC (rev 9869)
@@ -8,33 +8,60 @@
<connector-ref connector-name="netty"/>
</connectors>
<entries>
- <entry name="/ConnectionFactory"/>
<entry name="/XAConnectionFactory"/>
</entries>
</connection-factory>
+ <connection-factory name="NettyConnectionFactory">
+ <xa>false</xa>
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
+ <entries>
+ <entry name="/ConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
<connection-factory name="NettyThroughputConnectionFactory">
<xa>true</xa>
<connectors>
<connector-ref connector-name="netty-throughput"/>
</connectors>
<entries>
- <entry name="/ThroughputConnectionFactory"/>
<entry name="/XAThroughputConnectionFactory"/>
</entries>
</connection-factory>
+ <connection-factory name="NettyThroughputConnectionFactory">
+ <xa>false</xa>
+ <connectors>
+ <connector-ref connector-name="netty-throughput"/>
+ </connectors>
+ <entries>
+ <entry name="/ThroughputConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
<connection-factory name="InVMConnectionFactory">
<xa>true</xa>
<connectors>
<connector-ref connector-name="in-vm"/>
</connectors>
<entries>
- <entry name="java:/ConnectionFactory"/>
<entry name="java:/XAConnectionFactory"/>
</entries>
</connection-factory>
+ <connection-factory name="InVMConnectionFactory">
+ <xa>false</xa>
+ <connectors>
+ <connector-ref connector-name="in-vm"/>
+ </connectors>
+ <entries>
+ <entry name="java:/ConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
<queue name="DLQ">
<entry name="/queue/DLQ"/>
</queue>
Modified: trunk/src/config/jboss-as-5/non-clustered/hornetq-jms.xml
===================================================================
--- trunk/src/config/jboss-as-5/non-clustered/hornetq-jms.xml 2010-11-11 01:10:29 UTC (rev 9868)
+++ trunk/src/config/jboss-as-5/non-clustered/hornetq-jms.xml 2010-11-11 01:11:10 UTC (rev 9869)
@@ -8,33 +8,60 @@
<connector-ref connector-name="netty"/>
</connectors>
<entries>
- <entry name="/ConnectionFactory"/>
<entry name="/XAConnectionFactory"/>
</entries>
</connection-factory>
+ <connection-factory name="NettyConnectionFactory">
+ <xa>false</xa>
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
+ <entries>
+ <entry name="/ConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
<connection-factory name="NettyThroughputConnectionFactory">
<xa>true</xa>
- <connectors>
+ <connectors>
<connector-ref connector-name="netty-throughput"/>
- </connectors>
- <entries>
- <entry name="/ThroughputConnectionFactory"/>
- <entry name="/XAThroughputConnectionFactory"/>
- </entries>
- </connection-factory>
+ </connectors>
+ <entries>
+ <entry name="/XAThroughputConnectionFactory"/>
+ </entries>
+ </connection-factory>
+ <connection-factory name="NettyThroughputConnectionFactory">
+ <xa>false</xa>
+ <connectors>
+ <connector-ref connector-name="netty-throughput"/>
+ </connectors>
+ <entries>
+ <entry name="/ThroughputConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
<connection-factory name="InVMConnectionFactory">
<xa>true</xa>
<connectors>
<connector-ref connector-name="in-vm"/>
</connectors>
<entries>
- <entry name="java:/ConnectionFactory"/>
<entry name="java:/XAConnectionFactory"/>
</entries>
</connection-factory>
-
+
+ <connection-factory name="InVMConnectionFactory">
+ <xa>false</xa>
+ <connectors>
+ <connector-ref connector-name="in-vm"/>
+ </connectors>
+ <entries>
+ <entry name="java:/ConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
<queue name="DLQ">
<entry name="/queue/DLQ"/>
</queue>
Modified: trunk/src/config/stand-alone/clustered/hornetq-jms.xml
===================================================================
--- trunk/src/config/stand-alone/clustered/hornetq-jms.xml 2010-11-11 01:10:29 UTC (rev 9868)
+++ trunk/src/config/stand-alone/clustered/hornetq-jms.xml 2010-11-11 01:11:10 UTC (rev 9869)
@@ -1,38 +1,53 @@
<configuration xmlns="urn:hornetq"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
-
- <connection-factory name="NettyConnectionFactory">
- <connectors>
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
+
+ <connection-factory name="NettyConnectionFactory">
+ <xa>true</xa>
+ <connectors>
<connector-ref connector-name="netty"/>
- </connectors>
- <entries>
- <entry name="/ConnectionFactory"/>
- <entry name="/XAConnectionFactory"/>
- </entries>
- </connection-factory>
-
- <connection-factory name="NettyThroughputConnectionFactory">
- <connectors>
+ </connectors>
+ <entries>
+ <entry name="/XAConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
+ <connection-factory name="NettyConnectionFactory">
+ <xa>false</xa>
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
+ <entries>
+ <entry name="/ConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
+ <connection-factory name="NettyThroughputConnectionFactory">
+ <xa>true</xa>
+ <connectors>
<connector-ref connector-name="netty-throughput"/>
- </connectors>
- <entries>
- <entry name="/ThroughputConnectionFactory"/>
- <entry name="/XAThroughputConnectionFactory"/>
- </entries>
- </connection-factory>
-
- <queue name="DLQ">
- <entry name="/queue/DLQ"/>
- </queue>
- <queue name="ExpiryQueue">
- <entry name="/queue/ExpiryQueue"/>
- </queue>
- <queue name="ExampleQueue">
- <entry name="/queue/ExampleQueue"/>
- </queue>
- <topic name="ExampleTopic">
- <entry name="/topic/ExampleTopic"/>
- </topic>
-
-</configuration>
\ No newline at end of file
+ </connectors>
+ <entries>
+ <entry name="/XAThroughputConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
+ <connection-factory name="NettyThroughputConnectionFactory">
+ <xa>false</xa>
+ <connectors>
+ <connector-ref connector-name="netty-throughput"/>
+ </connectors>
+ <entries>
+ <entry name="/ThroughputConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
+ <queue name="DLQ">
+ <entry name="/queue/DLQ"/>
+ </queue>
+
+ <queue name="ExpiryQueue">
+ <entry name="/queue/ExpiryQueue"/>
+ </queue>
+
+</configuration>
Modified: trunk/src/config/stand-alone/non-clustered/hornetq-jms.xml
===================================================================
--- trunk/src/config/stand-alone/non-clustered/hornetq-jms.xml 2010-11-11 01:10:29 UTC (rev 9868)
+++ trunk/src/config/stand-alone/non-clustered/hornetq-jms.xml 2010-11-11 01:11:10 UTC (rev 9869)
@@ -1,38 +1,53 @@
<configuration xmlns="urn:hornetq"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
-
- <connection-factory name="NettyConnectionFactory">
- <connectors>
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
+
+ <connection-factory name="NettyConnectionFactory">
+ <xa>true</xa>
+ <connectors>
<connector-ref connector-name="netty"/>
- </connectors>
- <entries>
- <entry name="/ConnectionFactory"/>
- <entry name="/XAConnectionFactory"/>
- </entries>
- </connection-factory>
-
- <connection-factory name="NettyThroughputConnectionFactory">
- <connectors>
+ </connectors>
+ <entries>
+ <entry name="/XAConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
+ <connection-factory name="NettyConnectionFactory">
+ <xa>false</xa>
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
+ <entries>
+ <entry name="/ConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
+ <connection-factory name="NettyThroughputConnectionFactory">
+ <xa>true</xa>
+ <connectors>
<connector-ref connector-name="netty-throughput"/>
- </connectors>
- <entries>
- <entry name="/ThroughputConnectionFactory"/>
- <entry name="/XAThroughputConnectionFactory"/>
- </entries>
- </connection-factory>
-
- <queue name="DLQ">
- <entry name="/queue/DLQ"/>
- </queue>
- <queue name="ExpiryQueue">
- <entry name="/queue/ExpiryQueue"/>
- </queue>
- <queue name="ExampleQueue">
- <entry name="/queue/ExampleQueue"/>
- </queue>
- <topic name="ExampleTopic">
- <entry name="/topic/ExampleTopic"/>
- </topic>
-
-</configuration>
\ No newline at end of file
+ </connectors>
+ <entries>
+ <entry name="/XAThroughputConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
+ <connection-factory name="NettyThroughputConnectionFactory">
+ <xa>false</xa>
+ <connectors>
+ <connector-ref connector-name="netty-throughput"/>
+ </connectors>
+ <entries>
+ <entry name="/ThroughputConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
+ <queue name="DLQ">
+ <entry name="/queue/DLQ"/>
+ </queue>
+
+ <queue name="ExpiryQueue">
+ <entry name="/queue/ExpiryQueue"/>
+ </queue>
+
+</configuration>
14 years, 1 month
JBoss hornetq SVN: r9868 - trunk/hornetq-rest/docbook.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-10 20:10:29 -0500 (Wed, 10 Nov 2010)
New Revision: 9868
Modified:
trunk/hornetq-rest/docbook/
Log:
svn:ignore
Property changes on: trunk/hornetq-rest/docbook
___________________________________________________________________
Name: svn:ignore
+ target
14 years, 1 month
JBoss hornetq SVN: r9867 - branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-10 15:34:02 -0500 (Wed, 10 Nov 2010)
New Revision: 9867
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
Log:
tweak
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-10 19:25:04 UTC (rev 9866)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-10 20:34:02 UTC (rev 9867)
@@ -37,7 +37,6 @@
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.paging.cursor.PagedReference;
-import org.hornetq.core.paging.cursor.PagedReferenceImpl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
@@ -220,7 +219,6 @@
{
PagedReference retPos = cachedNext;
cachedNext = null;
- System.out.println("Returning cached next " + retPos);
return retPos;
}
14 years, 1 month
JBoss hornetq SVN: r9866 - in branches/Branch_New_Paging/src/main/org/hornetq/core: postoffice/impl and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-10 14:25:04 -0500 (Wed, 10 Nov 2010)
New Revision: 9866
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionOperationAbstract.java
branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
Log:
async deliveries after paging
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-11-10 17:28:31 UTC (rev 9865)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-11-10 19:25:04 UTC (rev 9866)
@@ -948,8 +948,10 @@
// This will force everything to be persisted
message.bodyChanged();
}
+
+ Transaction tx = ctx.getTransaction();
- pagedMessage = new PagedMessageImpl(message, getQueueIDs(listCtx), getTransactionID(ctx));
+ pagedMessage = new PagedMessageImpl(message, getQueueIDs(listCtx), getTransactionID(tx));
int bytesToWrite = pagedMessage.getEncodeSize() + PageImpl.SIZE_RECORD;
@@ -989,9 +991,8 @@
return ids;
}
- private long getTransactionID(RoutingContext ctx) throws Exception
+ private long getTransactionID(Transaction tx) throws Exception
{
- Transaction tx = ctx.getTransaction();
if (tx == null)
{
return 0l;
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-11-10 17:28:31 UTC (rev 9865)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-11-10 19:25:04 UTC (rev 9866)
@@ -15,9 +15,11 @@
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -59,6 +61,7 @@
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.TransactionOperation;
+import org.hornetq.core.transaction.TransactionOperationAbstract;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.impl.TransactionImpl;
@@ -834,6 +837,28 @@
processRoute(message, context, false);
}
}
+
+
+ private class PageDelivery extends TransactionOperationAbstract
+ {
+ private Set<Queue> queues = new HashSet<Queue>();
+
+ public void addQueues(List<Queue> queueList)
+ {
+ queues.addAll(queueList);
+ }
+
+ public void afterCommit(Transaction tx)
+ {
+ // We need to try delivering asynchronously after paging; otherwise nothing may start a delivery, since nothing is going towards the queues.
+ // The queue will try to depage in case it is empty.
+ for (Queue queue : queues)
+ {
+ queue.deliverAsync();
+ }
+ }
+
+ }
private void processRoute(final ServerMessage message, final RoutingContext context, final boolean direct) throws Exception
{
@@ -848,6 +873,50 @@
if (store.page(message, context, entry.getValue()))
{
+
+ if (tx != null)
+ {
+ PageDelivery delivery = (PageDelivery)tx.getProperty(TransactionPropertyIndexes.PAGE_DELIVERY);
+ if (delivery == null)
+ {
+ delivery = new PageDelivery();
+ tx.putProperty(TransactionPropertyIndexes.PAGE_DELIVERY, delivery);
+ tx.addOperation(delivery);
+ }
+
+ delivery.addQueues(entry.getValue().getDurableQueues());
+ delivery.addQueues(entry.getValue().getNonDurableQueues());
+ }
+ else
+ {
+
+ List<Queue> durableQueues = entry.getValue().getDurableQueues();
+ List<Queue> nonDurableQueues = entry.getValue().getNonDurableQueues();
+
+ final List<Queue> queues = new ArrayList<Queue>(durableQueues.size() + nonDurableQueues.size());
+
+ queues.addAll(durableQueues);
+ queues.addAll(nonDurableQueues);
+
+ storageManager.afterCompleteOperations(new IOAsyncTask()
+ {
+
+ public void onError(int errorCode, String errorMessage)
+ {
+ }
+
+ public void done()
+ {
+ for (Queue queue : queues)
+ {
+ // in case of paging, we need to kick asynchronous delivery to try delivering
+ queue.deliverAsync();
+ }
+ }
+ });
+ }
+
+
continue;
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionOperationAbstract.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionOperationAbstract.java 2010-11-10 17:28:31 UTC (rev 9865)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionOperationAbstract.java 2010-11-10 19:25:04 UTC (rev 9866)
@@ -50,16 +50,16 @@
/** After commit shouldn't throw any exception. Any verification has to be done on before commit */
public void afterCommit(Transaction tx)
{
- };
+ }
public void beforeRollback(Transaction tx) throws Exception
{
- };
+ }
/** After rollback shouldn't throw any exception. Any verification has to be done on before rollback */
public void afterRollback(Transaction tx)
{
- };
+ }
// Package protected ---------------------------------------------
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java 2010-11-10 17:28:31 UTC (rev 9865)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java 2010-11-10 19:25:04 UTC (rev 9866)
@@ -30,7 +30,7 @@
public static final int REFS_OPERATION = 6;
- public static final int PAGE_MESSAGES_OPERATION = 7;
+ public static final int PAGE_DELIVERY = 7;
public static final int PAGE_CURSOR_POSITIONS = 8;
}
14 years, 1 month
JBoss hornetq SVN: r9865 - branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-10 12:28:31 -0500 (Wed, 10 Nov 2010)
New Revision: 9865
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
Log:
fixing deadlock
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-10 02:49:26 UTC (rev 9864)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-10 17:28:31 UTC (rev 9865)
@@ -167,23 +167,35 @@
class CursorIterator implements LinkedListIterator<PagedReference>
{
- PagePosition position = null;
+ private PagePosition position = null;
- PagePosition lastOperation = null;
+ private PagePosition lastOperation = null;
- LinkedListIterator<PagePosition> redeliveryIterator = redeliveries.iterator();
+ private final LinkedListIterator<PagePosition> redeliveryIterator;
- boolean isredelivery = false;
+ private volatile boolean isredelivery = false;
/** next element taken on hasNext test.
* it has to be delivered on next next operation */
- PagedReference cachedNext;
+ private volatile PagedReference cachedNext;
+
+ public CursorIterator()
+ {
+ synchronized (redeliveries)
+ {
+ redeliveryIterator = redeliveries.iterator();
+ }
+ }
+
public void repeat()
{
if (isredelivery)
{
- redeliveryIterator.repeat();
+ synchronized (redeliveries)
+ {
+ redeliveryIterator.repeat();
+ }
}
else
{
@@ -214,16 +226,19 @@
try
{
- if (redeliveryIterator.hasNext())
+ synchronized (redeliveries)
{
- // There's a redelivery pending, we will get it out of that pool instead
- isredelivery = true;
- return getReference(redeliveryIterator.next());
+ if (redeliveryIterator.hasNext())
+ {
+ // There's a redelivery pending, we will get it out of that pool instead
+ isredelivery = true;
+ return getReference(redeliveryIterator.next());
+ }
+ else
+ {
+ isredelivery = false;
+ }
}
- else
- {
- isredelivery = false;
- }
if (position == null)
{
@@ -448,9 +463,12 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#returnElement(org.hornetq.core.paging.cursor.PagePosition)
*/
- public synchronized void redeliver(final PagePosition position)
+ public void redeliver(final PagePosition position)
{
- redeliveries.addTail(position);
+ synchronized (redeliveries)
+ {
+ redeliveries.addTail(position);
+ }
}
/**
14 years, 1 month
JBoss hornetq SVN: r9864 - in trunk/src/config: jboss-as-4/non-clustered and 4 other directories.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-11-09 21:49:26 -0500 (Tue, 09 Nov 2010)
New Revision: 9864
Modified:
trunk/src/config/jboss-as-4/clustered/hornetq-jms.xml
trunk/src/config/jboss-as-4/non-clustered/hornetq-jms.xml
trunk/src/config/jboss-as-5/clustered/hornetq-jms.xml
trunk/src/config/jboss-as-5/non-clustered/hornetq-jms.xml
trunk/src/config/jboss-as-6/clustered/hornetq-jms.xml
trunk/src/config/jboss-as-6/non-clustered/hornetq-jms.xml
Log:
HORNETQ-566
Modified: trunk/src/config/jboss-as-4/clustered/hornetq-jms.xml
===================================================================
--- trunk/src/config/jboss-as-4/clustered/hornetq-jms.xml 2010-11-10 02:27:37 UTC (rev 9863)
+++ trunk/src/config/jboss-as-4/clustered/hornetq-jms.xml 2010-11-10 02:49:26 UTC (rev 9864)
@@ -3,6 +3,7 @@
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<connection-factory name="NettyConnectionFactory">
+ <xa>true</xa>
<connectors>
<connector-ref connector-name="netty"/>
</connectors>
@@ -13,6 +14,7 @@
</connection-factory>
<connection-factory name="NettyThroughputConnectionFactory">
+ <xa>true</xa>
<connectors>
<connector-ref connector-name="netty-throughput"/>
</connectors>
@@ -23,6 +25,7 @@
</connection-factory>
<connection-factory name="InVMConnectionFactory">
+ <xa>true</xa>
<connectors>
<connector-ref connector-name="in-vm"/>
</connectors>
Modified: trunk/src/config/jboss-as-4/non-clustered/hornetq-jms.xml
===================================================================
--- trunk/src/config/jboss-as-4/non-clustered/hornetq-jms.xml 2010-11-10 02:27:37 UTC (rev 9863)
+++ trunk/src/config/jboss-as-4/non-clustered/hornetq-jms.xml 2010-11-10 02:49:26 UTC (rev 9864)
@@ -3,6 +3,7 @@
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<connection-factory name="NettyConnectionFactory">
+ <xa>true</xa>
<connectors>
<connector-ref connector-name="netty"/>
</connectors>
@@ -13,6 +14,7 @@
</connection-factory>
<connection-factory name="NettyThroughputConnectionFactory">
+ <xa>true</xa>
<connectors>
<connector-ref connector-name="netty-throughput"/>
</connectors>
@@ -23,6 +25,7 @@
</connection-factory>
<connection-factory name="InVMConnectionFactory">
+ <xa>true</xa>
<connectors>
<connector-ref connector-name="in-vm"/>
</connectors>
Modified: trunk/src/config/jboss-as-5/clustered/hornetq-jms.xml
===================================================================
--- trunk/src/config/jboss-as-5/clustered/hornetq-jms.xml 2010-11-10 02:27:37 UTC (rev 9863)
+++ trunk/src/config/jboss-as-5/clustered/hornetq-jms.xml 2010-11-10 02:49:26 UTC (rev 9864)
@@ -3,6 +3,7 @@
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<connection-factory name="NettyConnectionFactory">
+ <xa>true</xa>
<connectors>
<connector-ref connector-name="netty"/>
</connectors>
@@ -13,6 +14,7 @@
</connection-factory>
<connection-factory name="NettyThroughputConnectionFactory">
+ <xa>true</xa>
<connectors>
<connector-ref connector-name="netty-throughput"/>
</connectors>
@@ -23,6 +25,7 @@
</connection-factory>
<connection-factory name="InVMConnectionFactory">
+ <xa>true</xa>
<connectors>
<connector-ref connector-name="in-vm"/>
</connectors>
Modified: trunk/src/config/jboss-as-5/non-clustered/hornetq-jms.xml
===================================================================
--- trunk/src/config/jboss-as-5/non-clustered/hornetq-jms.xml 2010-11-10 02:27:37 UTC (rev 9863)
+++ trunk/src/config/jboss-as-5/non-clustered/hornetq-jms.xml 2010-11-10 02:49:26 UTC (rev 9864)
@@ -3,6 +3,7 @@
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<connection-factory name="NettyConnectionFactory">
+ <xa>true</xa>
<connectors>
<connector-ref connector-name="netty"/>
</connectors>
@@ -13,6 +14,7 @@
</connection-factory>
<connection-factory name="NettyThroughputConnectionFactory">
+ <xa>true</xa>
<connectors>
<connector-ref connector-name="netty-throughput"/>
</connectors>
@@ -23,6 +25,7 @@
</connection-factory>
<connection-factory name="InVMConnectionFactory">
+ <xa>true</xa>
<connectors>
<connector-ref connector-name="in-vm"/>
</connectors>
Modified: trunk/src/config/jboss-as-6/clustered/hornetq-jms.xml
===================================================================
--- trunk/src/config/jboss-as-6/clustered/hornetq-jms.xml 2010-11-10 02:27:37 UTC (rev 9863)
+++ trunk/src/config/jboss-as-6/clustered/hornetq-jms.xml 2010-11-10 02:49:26 UTC (rev 9864)
@@ -3,6 +3,7 @@
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<connection-factory name="NettyConnectionFactory">
+ <xa>true</xa>
<connectors>
<connector-ref connector-name="netty"/>
</connectors>
@@ -13,6 +14,7 @@
</connection-factory>
<connection-factory name="NettyThroughputConnectionFactory">
+ <xa>true</xa>
<connectors>
<connector-ref connector-name="netty-throughput"/>
</connectors>
@@ -23,6 +25,7 @@
</connection-factory>
<connection-factory name="InVMConnectionFactory">
+ <xa>true</xa>
<connectors>
<connector-ref connector-name="in-vm"/>
</connectors>
@@ -40,4 +43,5 @@
<entry name="/queue/ExpiryQueue"/>
</queue>
-</configuration>
\ No newline at end of file
+</configuration>
+
Modified: trunk/src/config/jboss-as-6/non-clustered/hornetq-jms.xml
===================================================================
--- trunk/src/config/jboss-as-6/non-clustered/hornetq-jms.xml 2010-11-10 02:27:37 UTC (rev 9863)
+++ trunk/src/config/jboss-as-6/non-clustered/hornetq-jms.xml 2010-11-10 02:49:26 UTC (rev 9864)
@@ -3,6 +3,7 @@
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<connection-factory name="NettyConnectionFactory">
+ <xa>true</xa>
<connectors>
<connector-ref connector-name="netty"/>
</connectors>
@@ -13,6 +14,7 @@
</connection-factory>
<connection-factory name="NettyThroughputConnectionFactory">
+ <xa>true</xa>
<connectors>
<connector-ref connector-name="netty-throughput"/>
</connectors>
@@ -23,6 +25,7 @@
</connection-factory>
<connection-factory name="InVMConnectionFactory">
+ <xa>true</xa>
<connectors>
<connector-ref connector-name="in-vm"/>
</connectors>
@@ -40,4 +43,4 @@
<entry name="/queue/ExpiryQueue"/>
</queue>
-</configuration>
\ No newline at end of file
+</configuration>
14 years, 1 month