[hornetq-commits] JBoss hornetq SVN: r10650 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/journal/impl and 2 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Fri May 13 01:14:48 EDT 2011
Author: clebert.suconic at jboss.com
Date: 2011-05-13 01:14:47 -0400 (Fri, 13 May 2011)
New Revision: 10650
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/SequentialFile.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
JBPAPP-6466 - improving IO on depaging
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/SequentialFile.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/SequentialFile.java 2011-05-13 04:10:25 UTC (rev 10649)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/SequentialFile.java 2011-05-13 05:14:47 UTC (rev 10650)
@@ -95,7 +95,7 @@
SequentialFile copy();
- void copyTo(SequentialFile newFileName);
+ void copyTo(SequentialFile newFileName) throws Exception;
void setTimedBuffer(TimedBuffer buffer);
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2011-05-13 04:10:25 UTC (rev 10649)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2011-05-13 05:14:47 UTC (rev 10650)
@@ -105,35 +105,27 @@
file.delete();
}
- public void copyTo(SequentialFile newFileName)
+ public void copyTo(SequentialFile newFileName) throws Exception
{
- try
+ log.debug("Copying " + this + " as " + newFileName);
+ newFileName.open();
+ this.open();
+
+
+ ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);
+
+ for (;;)
{
- log.debug("Copying " + this + " as " + newFileName);
- newFileName.open();
- this.open();
-
-
- ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);
-
- for (;;)
+ buffer.rewind();
+ int size = this.read(buffer);
+ newFileName.writeInternal(buffer);
+ if (size < 10 * 1024)
{
- buffer.rewind();
- int size = this.read(buffer);
- newFileName.writeInternal(buffer);
- if (size < 10 * 1024)
- {
- break;
- }
+ break;
}
- newFileName.close();
- this.close();
}
- catch (Exception e)
- {
- log.warn("Error on copying file " + this + " as " + newFileName, e);
- }
-
+ newFileName.close();
+ this.close();
}
public void position(final long pos) throws Exception
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-05-13 04:10:25 UTC (rev 10649)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-05-13 05:14:47 UTC (rev 10650)
@@ -307,19 +307,29 @@
}
else
{
- SequentialFile file = this.file;
-
- SequentialFile newFile = storageManager.createFileForLargeMessage(newID, durable);
-
- file.copyTo(newFile);
-
- LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, newFile, newID);
-
- newMessage.linkMessage = null;
-
- newMessage.setPaged();
-
- return newMessage;
+ try
+ {
+ validateFile();
+
+ SequentialFile file = this.file;
+
+ SequentialFile newFile = storageManager.createFileForLargeMessage(newID, durable);
+
+ file.copyTo(newFile);
+
+ LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, newFile, newID);
+
+ newMessage.linkMessage = null;
+
+ newMessage.setPaged();
+
+ return newMessage;
+ }
+ catch (Exception e)
+ {
+ log.warn("Error on copying large message this for DLA or Expiry", e);
+ return null;
+ }
}
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-05-13 04:10:25 UTC (rev 10649)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-05-13 05:14:47 UTC (rev 10650)
@@ -120,7 +120,7 @@
{
locator.close();
- super.tearDown();
+ //super.tearDown();
}
public void testPreparePersistent() throws Exception
@@ -3741,6 +3741,163 @@
}
}
+ public void testExpireLargeMessageOnPaging() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ Map<String, AddressSettings> settings = new HashMap<String, AddressSettings>();
+ AddressSettings dla = new AddressSettings();
+ dla.setMaxDeliveryAttempts(5);
+ dla.setDeadLetterAddress(new SimpleString("DLA"));
+ dla.setExpiryAddress(new SimpleString("DLA"));
+ settings.put(ADDRESS.toString(), dla);
+
+ final HornetQServer server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, settings);
+
+ server.start();
+
+ final int messageSize = 20;
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+ session.createQueue("DLA", "DLA");
+
+ PagingStore pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS);
+ pgStoreAddress.startPaging();
+ PagingStore pgStoreDLA = server.getPagingManager().getPageStore(new SimpleString("DLA"));
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ for (int i = 0; i < 500; i++)
+ {
+ log.info("send message #" + i);
+ message = session.createMessage(true);
+
+ message.putStringProperty("id", "str" + i);
+
+ message.setExpiration(System.currentTimeMillis() + 2000);
+
+ if (i % 2 == 0)
+ {
+ message.setBodyInputStream(createFakeLargeStream(messageSize));
+ }
+ else
+ {
+ byte bytes[] = new byte[messageSize];
+ for (int s = 0 ; s < bytes.length; s++)
+ {
+ bytes[s] = getSamplebyte(s);
+ }
+ message.getBodyBuffer().writeBytes(bytes);
+ }
+
+ producer.send(message);
+
+ if ((i + 1) % 2 == 0)
+ {
+ session.commit();
+ if (i < 400)
+ {
+ pgStoreAddress.forceAnotherPage();
+ }
+ }
+ }
+
+ session.commit();
+
+ sf.close();
+
+ locator.close();
+
+ server.stop();
+
+ Thread.sleep(3000);
+
+ server.start();
+
+ locator = createInVMNonHALocator();
+
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, false);
+
+ session.start();
+
+ ClientConsumer cons = session.createConsumer(ADDRESS);
+
+ assertNull(cons.receive(1000));
+
+ cons.close();
+
+ cons = session.createConsumer("DLA");
+
+ for (int i = 0; i < 500; i++)
+ {
+ log.info("Received message " + i);
+ message = cons.receive(5000);
+ assertNotNull(message);
+ message.acknowledge();
+
+ message.saveToOutputStream(new OutputStream()
+ {
+ @Override
+ public void write(int b) throws IOException
+ {
+
+ }
+ });
+ }
+
+ assertNull(cons.receiveImmediate());
+
+ session.commit();
+
+ cons.close();
+
+ long timeout = System.currentTimeMillis() + 5000;
+
+ pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS);
+
+ while (timeout > System.currentTimeMillis() && pgStoreAddress.isPaging())
+ {
+ Thread.sleep(50);
+ }
+
+ assertFalse(pgStoreAddress.isPaging());
+
+ session.close();
+ }
+ finally
+ {
+ locator.close();
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
More information about the hornetq-commits mailing list