Author: clebert.suconic(a)jboss.com
Date: 2011-12-19 12:57:47 -0500 (Mon, 19 Dec 2011)
New Revision: 11914
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-7161 - improving things
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-12-19 14:51:20 UTC (rev 11913)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-12-19 17:57:47 UTC (rev 11914)
@@ -1276,7 +1276,7 @@
{
if (msg.getRefCount() == 0)
{
- JournalStorageManager.log.debug("Large message: " + msg.getMessageID() +
+ JournalStorageManager.log.info("Large message: " + msg.getMessageID() +
" didn't have any associated reference, file will be deleted");
msg.decrementDelayDeletionCount();
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-12-19 14:51:20 UTC (rev 11913)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-12-19 17:57:47 UTC (rev 11914)
@@ -1187,6 +1187,15 @@
return "HornetQServerImpl::" + (nodeManager != null ? "serverUUID=" + nodeManager.getUUID() : "");
}
}
+
+ /**
+ * For tests only, don't use this method as it's not part of the API
+ * @param factory
+ */
+ public void replaceQueueFactory(QueueFactory factory)
+ {
+ this.queueFactory = factory;
+ }
// Package protected
// ----------------------------------------------------------------------------
@@ -1198,26 +1207,6 @@
* Protected so tests can change this behaviour
* @param backupConnector
*/
- // protected FailoverManagerImpl createBackupConnectionFailoverManager(final TransportConfiguration backupConnector,
- // final ExecutorService threadPool,
- // final ScheduledExecutorService scheduledPool)
- // {
- // return new FailoverManagerImpl((ClientSessionFactory)null,
- // backupConnector,
- // null,
- // false,
- // HornetQClient.DEFAULT_CALL_TIMEOUT,
- // HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- // HornetQClient.DEFAULT_CONNECTION_TTL,
- // 0,
- // 1.0d,
- // 0,
- // 1,
- // false,
- // threadPool,
- // scheduledPool,
- // null);
- // }
protected PagingManager createPagingManager()
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-12-19 14:51:20 UTC (rev 11913)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-12-19 17:57:47 UTC (rev 11914)
@@ -2203,7 +2203,8 @@
return status;
}
- private void postAcknowledge(final MessageReference ref)
+ // Protected as testcases may change this behaviour
+ protected void postAcknowledge(final MessageReference ref)
{
QueueImpl queue = (QueueImpl)ref.getQueue();
@@ -2219,6 +2220,15 @@
boolean durableRef = message.isDurable() && queue.durable;
+ try
+ {
+ message.decrementRefCount();
+ }
+ catch (Exception e)
+ {
+ QueueImpl.log.warn("Unable to decrement reference counting", e);
+ }
+
if (durableRef)
{
int count = message.decrementDurableRefCount();
@@ -2250,15 +2260,6 @@
}
}
}
-
- try
- {
- message.decrementRefCount();
- }
- catch (Exception e)
- {
- QueueImpl.log.warn("Unable to decrement reference counting", e);
- }
}
void postRollback(final LinkedList<MessageReference> refs)
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java 2011-12-19 14:51:20 UTC (rev 11913)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java 2011-12-19 17:57:47 UTC (rev 11914)
@@ -13,7 +13,10 @@
package org.hornetq.tests.integration.client;
+import java.io.IOException;
import java.util.HashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
@@ -29,14 +32,25 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.filter.Filter;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.cursor.PageSubscription;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
+import org.hornetq.core.server.MessageReference;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.QueueFactory;
import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.impl.HornetQServerImpl;
+import org.hornetq.core.server.impl.QueueImpl;
import org.hornetq.core.server.impl.ServerSessionImpl;
+import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.integration.largemessage.LargeMessageTestBase;
+import org.hornetq.utils.ExecutorFactory;
/**
* A LargeMessageTest
@@ -128,9 +142,9 @@
}
server.stop(false);
-
+
forceGC();
-
+
server.start();
server.stop();
@@ -254,8 +268,12 @@
try
{
- server = createServer(true, createDefaultConfig(isNetty()), 10000, 20000, new HashMap<String, AddressSettings>());
-
+ server = createServer(true,
+ createDefaultConfig(isNetty()),
+ 10000,
+ 20000,
+ new HashMap<String, AddressSettings>());
+
// server.getConfiguration()
// .getInterceptorClassNames()
// .add(LargeMessageTestInterceptorIgnoreLastPacket.class.getName());
@@ -270,7 +288,7 @@
session = sf.createSession(false, true, true);
session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
-
+
server.getPagingManager().getPageStore(ADDRESS).startPaging();
ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
@@ -282,19 +300,19 @@
producer.send(clientFile);
}
session.commit();
-
+
validateNoFilesOnLargeDir(10);
for (int h = 0; h < 5; h++)
{
session.close();
-
+
sf.close();
-
+
server.stop();
-
+
server.start();
-
+
sf = locator.createSessionFactory();
session = sf.createSession(false, false);
@@ -321,11 +339,11 @@
{
session.rollback();
}
-
+
session.close();
sf.close();
}
-
+
server.stop(false);
server.start();
@@ -363,8 +381,12 @@
try
{
- server = createServer(true, createDefaultConfig(isNetty()), 10000, 20000, new HashMap<String, AddressSettings>());
-
+ server = createServer(true,
+ createDefaultConfig(isNetty()),
+ 10000,
+ 20000,
+ new HashMap<String, AddressSettings>());
+
// server.getConfiguration()
// .getInterceptorClassNames()
// .add(LargeMessageTestInterceptorIgnoreLastPacket.class.getName());
@@ -377,7 +399,7 @@
ClientSessionFactory sf = locator.createSessionFactory();
session = sf.createSession(true, false, false);
-
+
Xid xid1 = newXID();
Xid xid2 = newXID();
@@ -394,13 +416,11 @@
producer.send(clientFile);
}
session.end(xid1, XAResource.TMSUCCESS);
-
+
session.prepare(xid1);
-
session.start(xid2, XAResource.TMNOFLAGS);
-
for (int i = 0; i < 10; i++)
{
Message clientFile = createLargeClientMessage(session, messageSize, true);
@@ -409,32 +429,32 @@
producer.send(clientFile);
}
session.end(xid2, XAResource.TMSUCCESS);
-
+
session.prepare(xid2);
-
+
session.close();
sf.close();
-
+
server.stop(false);
server.start();
-
- for (int start = 0 ; start < 2; start++)
+
+ for (int start = 0; start < 2; start++)
{
System.out.println("Start " + start);
-
+
sf = locator.createSessionFactory();
-
+
if (start == 0)
{
session = sf.createSession(true, false, false);
session.commit(xid1, false);
session.close();
}
-
+
session = sf.createSession(false, false, false);
ClientConsumer cons1 = session.createConsumer(ADDRESS);
session.start();
- for (int i = 0 ; i < 10; i++)
+ for (int i = 0; i < 10; i++)
{
log.info("I = " + i);
ClientMessage msg = cons1.receive(5000);
@@ -442,7 +462,7 @@
assertEquals(1, msg.getIntProperty("txid").intValue());
msg.acknowledge();
}
-
+
if (start == 1)
{
session.commit();
@@ -451,26 +471,26 @@
{
session.rollback();
}
-
+
session.close();
sf.close();
-
+
server.stop();
server.start();
}
server.stop();
-
+
validateNoFilesOnLargeDir(10);
-
+
server.start();
sf = locator.createSessionFactory();
-
+
session = sf.createSession(true, false, false);
session.rollback(xid2);
-
+
sf.close();
-
+
server.stop();
server.start();
server.stop();
@@ -497,6 +517,296 @@
}
}
+ public void testRestartBeforeDelete() throws Exception
+ {
+
+ class NoPostACKQueue extends QueueImpl
+ {
+
+ public NoPostACKQueue(long id,
+ SimpleString address,
+ SimpleString name,
+ Filter filter,
+ PageSubscription pageSubscription,
+ boolean durable,
+ boolean temporary,
+ ScheduledExecutorService scheduledExecutor,
+ PostOffice postOffice,
+ StorageManager storageManager,
+ HierarchicalRepository<AddressSettings> addressSettingsRepository,
+ Executor executor)
+ {
+ super(id,
+ address,
+ name,
+ filter,
+ pageSubscription,
+ durable,
+ temporary,
+ scheduledExecutor,
+ postOffice,
+ storageManager,
+ addressSettingsRepository,
+ executor);
+ }
+
+ protected void postAcknowledge(final MessageReference ref)
+ {
+ System.out.println("Ignoring postACK on message " + ref);
+ }
+ }
+
+ class NoPostACKQueueFactory implements QueueFactory
+ {
+
+ final StorageManager storageManager;
+
+ final PostOffice postOffice;
+
+ final ScheduledExecutorService scheduledExecutor;
+
+ final HierarchicalRepository<AddressSettings> addressSettingsRepository;
+
+ final ExecutorFactory execFactory;
+
+ public NoPostACKQueueFactory(StorageManager storageManager,
+ PostOffice postOffice,
+ ScheduledExecutorService scheduledExecutor,
+ HierarchicalRepository<AddressSettings> addressSettingsRepository,
+ final ExecutorFactory execFactory)
+ {
+ this.storageManager = storageManager;
+ this.postOffice = postOffice;
+ this.scheduledExecutor = scheduledExecutor;
+ this.addressSettingsRepository = addressSettingsRepository;
+ this.execFactory = execFactory;
+ }
+
+ public Queue createQueue(long persistenceID,
+ SimpleString address,
+ SimpleString name,
+ Filter filter,
+ PageSubscription pageSubscription,
+ boolean durable,
+ boolean temporary)
+ {
+
+ return new NoPostACKQueue(persistenceID,
+ address,
+ name,
+ filter,
+ pageSubscription,
+ durable,
+ temporary,
+ scheduledExecutor,
+ postOffice,
+ storageManager,
+ addressSettingsRepository,
+ execFactory.getExecutor());
+// return new QueueImpl(persistenceID,
+// address,
+// name,
+// filter,
+// pageSubscription,
+// durable,
+// temporary,
+// scheduledExecutor,
+// postOffice,
+// storageManager,
+// addressSettingsRepository,
+// execFactory.getExecutor());
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.QueueFactory#setPostOffice(org.hornetq.core.postoffice.PostOffice)
+ */
+ public void setPostOffice(PostOffice postOffice)
+ {
+ }
+
+ }
+ final int messageSize = 3 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+ ClientSession session = null;
+
+ LargeMessageTestInterceptorIgnoreLastPacket.interruptMessages = false;
+
+ try
+ {
+ server = createServer(true, isNetty());
+ server.start();
+
+ QueueFactory original = server.getQueueFactory();
+
+ ((HornetQServerImpl)server).replaceQueueFactory(new NoPostACKQueueFactory(server.getStorageManager(),
+ server.getPostOffice(),
+ server.getScheduledPool(),
+ server.getAddressSettingsRepository(),
+ server.getExecutorFactory()));
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, true, true);
+
+ session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+ for (int i = 0; i < 10; i++)
+ {
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+ producer.send(clientFile);
+ }
+ session.commit();
+
+ session.close();
+
+ session = sf.createSession(false, false);
+
+ ClientConsumer cons = session.createConsumer(LargeMessageTest.ADDRESS);
+
+ session.start();
+
+ for (int i = 0; i < 10; i++)
+ {
+ ClientMessage msg = cons.receive(5000);
+ assertNotNull(msg);
+ msg.saveToOutputStream(new java.io.OutputStream()
+ {
+ @Override
+ public void write(int b) throws IOException
+ {
+ }
+ });
+ msg.acknowledge();
+ session.commit();
+ }
+
+ ((HornetQServerImpl)server).replaceQueueFactory(original);
+ server.stop(false);
+ server.start();
+
+ server.stop();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
+ public void testConsumeAfterRestart() throws Exception
+ {
+ final int messageSize = 3 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+ ClientSession session = null;
+
+ LargeMessageTestInterceptorIgnoreLastPacket.interruptMessages = false;
+
+ try
+ {
+ server = createServer(true, isNetty());
+ server.start();
+
+ QueueFactory original = server.getQueueFactory();
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, true, true);
+
+ session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+ for (int i = 0; i < 10; i++)
+ {
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+ producer.send(clientFile);
+ }
+ session.commit();
+
+ session.close();
+ sf.close();
+
+ server.stop();
+ server.start();
+
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, false);
+
+ ClientConsumer cons = session.createConsumer(LargeMessageTest.ADDRESS);
+
+ session.start();
+
+ for (int i = 0; i < 10; i++)
+ {
+ ClientMessage msg = cons.receive(5000);
+ assertNotNull(msg);
+ msg.saveToOutputStream(new java.io.OutputStream()
+ {
+ @Override
+ public void write(int b) throws IOException
+ {
+ }
+ });
+ msg.acknowledge();
+ session.commit();
+ }
+
+ ((HornetQServerImpl)server).replaceQueueFactory(original);
+ server.stop(false);
+ server.start();
+
+ server.stop();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
public static class LargeMessageTestInterceptorIgnoreLastPacket implements Interceptor
{