Author: clebert.suconic(a)jboss.com
Date: 2009-11-27 16:20:56 -0500 (Fri, 27 Nov 2009)
New Revision: 8433
Modified:
trunk/src/main/org/hornetq/core/persistence/OperationContext.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/tests/src/org/hornetq/tests/integration/divert/PersistentDivertTest.java
trunk/tests/src/org/hornetq/tests/integration/paging/PagingSendTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-226 - Large Message and Diverts
Modified: trunk/src/main/org/hornetq/core/persistence/OperationContext.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/OperationContext.java 2009-11-27 21:15:05
UTC (rev 8432)
+++ trunk/src/main/org/hornetq/core/persistence/OperationContext.java 2009-11-27 21:20:56
UTC (rev 8433)
@@ -13,8 +13,6 @@
package org.hornetq.core.persistence;
-import java.util.concurrent.Executor;
-
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.IOCompletion;
@@ -30,10 +28,6 @@
public interface OperationContext extends IOCompletion
{
- /** The executor used on responses.
- * If this is not set, it will use the current thread. */
- void setExecutor(Executor executor);
-
/** Execute the task when all IO operations are complete,
* Or execute it immediately if nothing is pending. */
void executeOnCompletion(IOAsyncTask runnable);
Modified:
trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-11-27
21:15:05 UTC (rev 8432)
+++
trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-11-27
21:20:56 UTC (rev 8433)
@@ -81,7 +81,7 @@
private String errorMessage = null;
- private Executor executor;
+ private final Executor executor;
private final AtomicInteger executorsPending = new AtomicInteger(0);
@@ -102,12 +102,6 @@
replicationLineUp++;
}
- /** this method needs to be called before the executor became operational */
- public void setExecutor(Executor executor)
- {
- this.executor = executor;
- }
-
public synchronized void replicationDone()
{
replicated++;
@@ -137,25 +131,18 @@
// On this case, we can just execute the context directly
if (replicationLineUp == replicated && storeLineUp == stored)
{
- if (executor != null)
+ // We want to avoid the executor if everything is complete...
+ // However, we can't execute the context if there are executions pending
+ // We need to use the executor on this case
+ if (executorsPending.get() == 0)
{
- // We want to avoid the executor if everything is complete...
- // However, we can't execute the context if there are executions
pending
- // We need to use the executor on this case
- if (executorsPending.get() == 0)
- {
- // No need to use an executor here or a context switch
- // there are no actions pending.. hence we can just execute the task
directly on the same thread
- executeNow = true;
- }
- else
- {
- execute(completion);
- }
+ // No need to use an executor here or a context switch
+ // there are no actions pending.. hence we can just execute the task
directly on the same thread
+ executeNow = true;
}
else
{
- executeNow = true;
+ execute(completion);
}
}
else
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-27
21:15:05 UTC (rev 8432)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-27
21:20:56 UTC (rev 8433)
@@ -550,11 +550,6 @@
throw new IllegalStateException("Message cannot be routed more than
once");
}
- if (message.getMessageID() == 0l)
- {
- generateID(message);
- }
-
RoutingContext context = new RoutingContextImpl(tx);
SimpleString address = message.getDestination();
Modified: trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java 2009-11-27 21:15:05 UTC
(rev 8432)
+++ trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java 2009-11-27 21:20:56 UTC
(rev 8433)
@@ -17,6 +17,7 @@
import org.hornetq.core.filter.Filter;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.Divert;
import org.hornetq.core.server.RoutingContext;
@@ -50,6 +51,8 @@
private final Filter filter;
private final Transformer transformer;
+
+ private final StorageManager storageManager;
public DivertImpl(final SimpleString forwardAddress,
final SimpleString uniqueName,
@@ -57,7 +60,8 @@
final boolean exclusive,
final Filter filter,
final Transformer transformer,
- final PostOffice postOffice)
+ final PostOffice postOffice,
+ final StorageManager storageManager)
{
this.forwardAddress = forwardAddress;
@@ -72,6 +76,8 @@
this.transformer = transformer;
this.postOffice = postOffice;
+
+ this.storageManager = storageManager;
}
public void route(final ServerMessage message, final RoutingContext context) throws
Exception
@@ -81,17 +87,16 @@
// We must make a copy of the message, otherwise things like returning credits to
the page won't work
// properly on ack, since the original destination will be overwritten
- // TODO we can optimise this so it doesn't copy if it's not routed anywhere
else
+ // TODO we can optimise this so it doesn't copy if it's not routed anywhere
else
- ServerMessage copy = message.copy();
+ long id = storageManager.generateUniqueID();
+ ServerMessage copy = message.copy(id);
- // Setting the messageID to 0. The postOffice should set a new one
- copy.setMessageID(0);
-
+ // This will set the original MessageId, and the original destination
+ copy.setOriginalHeaders(message, false);
+
copy.setDestination(forwardAddress);
- copy.putStringProperty(HDR_ORIGINAL_DESTINATION, originalDestination);
-
if (transformer != null)
{
copy = transformer.transform(copy);
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-27 21:15:05
UTC (rev 8432)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-27 21:20:56
UTC (rev 8433)
@@ -1414,7 +1414,8 @@
config.isExclusive(),
filter,
transformer,
- postOffice);
+ postOffice,
+ storageManager);
// pagingManager,
// storageManager);
Modified: trunk/tests/src/org/hornetq/tests/integration/divert/PersistentDivertTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/divert/PersistentDivertTest.java 2009-11-27
21:15:05 UTC (rev 8432)
+++
trunk/tests/src/org/hornetq/tests/integration/divert/PersistentDivertTest.java 2009-11-27
21:20:56 UTC (rev 8433)
@@ -45,8 +45,20 @@
{
private static final Logger log = Logger.getLogger(DivertTest.class);
+ final int minLargeMessageSize =
ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE * 2;
+
public void testPersistentDivert() throws Exception
{
+ doTestPersistentDivert(false);
+ }
+
+ public void testPersistentDiverLargeMessage() throws Exception
+ {
+ doTestPersistentDivert(true);
+ }
+
+ public void doTestPersistentDivert(boolean largeMessage) throws Exception
+ {
Configuration conf = createDefaultConfig();
conf.setClustered(true);
@@ -121,6 +133,11 @@
{
ClientMessage message = session.createClientMessage(true);
+ if (largeMessage)
+ {
+ message.setBodyInputStream(createFakeLargeStream(minLargeMessageSize));
+ }
+
message.putIntProperty(propKey, i);
producer.send(message);
@@ -128,12 +145,17 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer1.receive(200);
+ ClientMessage message = consumer1.receive(5000);
assertNotNull(message);
assertEquals((Integer)i, (Integer)message.getObjectProperty(propKey));
+ if (largeMessage)
+ {
+ checkLargeMessage(message);
+ }
+
message.acknowledge();
}
@@ -141,12 +163,17 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer2.receive(200);
+ ClientMessage message = consumer2.receive(5000);
assertNotNull(message);
assertEquals((Integer)i, (Integer)message.getObjectProperty(propKey));
+ if (largeMessage)
+ {
+ checkLargeMessage(message);
+ }
+
message.acknowledge();
}
@@ -154,12 +181,17 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer3.receive(200);
+ ClientMessage message = consumer3.receive(5000);
assertNotNull(message);
assertEquals((Integer)i, (Integer)message.getObjectProperty(propKey));
+ if (largeMessage)
+ {
+ checkLargeMessage(message);
+ }
+
message.acknowledge();
}
@@ -167,12 +199,17 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer4.receive(200);
+ ClientMessage message = consumer4.receive(5000);
assertNotNull(message);
assertEquals((Integer)i, (Integer)message.getObjectProperty(propKey));
+ if (largeMessage)
+ {
+ checkLargeMessage(message);
+ }
+
message.acknowledge();
}
@@ -183,9 +220,30 @@
messagingService.stop();
}
+
+ /**
+ * @param message
+ */
+ private void checkLargeMessage(ClientMessage message)
+ {
+ for (int j = 0 ; j < minLargeMessageSize; j++)
+ {
+ assertEquals(getSamplebyte(j), message.getBodyBuffer().readByte());
+ }
+ }
public void testPersistentDivertRestartBeforeConsume() throws Exception
{
+ doTestPersistentDivertRestartBeforeConsume(false);
+ }
+
+ public void testPersistentDivertRestartBeforeConsumeLargeMessage() throws Exception
+ {
+ doTestPersistentDivertRestartBeforeConsume(true);
+ }
+
+ public void doTestPersistentDivertRestartBeforeConsume(boolean largeMessage) throws
Exception
+ {
Configuration conf = createDefaultConfig();
conf.setClustered(true);
@@ -251,6 +309,12 @@
ClientMessage message = session.createClientMessage(true);
message.putIntProperty(propKey, i);
+
+ if (largeMessage)
+ {
+ message.setBodyInputStream(createFakeLargeStream(minLargeMessageSize));
+ }
+
producer.send(message);
}
@@ -281,10 +345,15 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer1.receive(200);
+ ClientMessage message = consumer1.receive(5000);
assertNotNull(message);
+ if (largeMessage)
+ {
+ checkLargeMessage(message);
+ }
+
assertEquals((Integer)i, (Integer)message.getObjectProperty(propKey));
message.acknowledge();
@@ -294,10 +363,15 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer2.receive(200);
+ ClientMessage message = consumer2.receive(5000);
assertNotNull(message);
+ if (largeMessage)
+ {
+ checkLargeMessage(message);
+ }
+
assertEquals((Integer)i, (Integer)message.getObjectProperty(propKey));
message.acknowledge();
@@ -307,10 +381,15 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer3.receive(200);
+ ClientMessage message = consumer3.receive(5000);
assertNotNull(message);
+ if (largeMessage)
+ {
+ checkLargeMessage(message);
+ }
+
assertEquals((Integer)i, (Integer)message.getObjectProperty(propKey));
message.acknowledge();
@@ -320,10 +399,15 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer4.receive(200);
+ ClientMessage message = consumer4.receive(5000);
assertNotNull(message);
+ if (largeMessage)
+ {
+ checkLargeMessage(message);
+ }
+
assertEquals((Integer)i, (Integer)message.getObjectProperty(propKey));
message.acknowledge();
Modified: trunk/tests/src/org/hornetq/tests/integration/paging/PagingSendTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/paging/PagingSendTest.java 2009-11-27
21:15:05 UTC (rev 8432)
+++ trunk/tests/src/org/hornetq/tests/integration/paging/PagingSendTest.java 2009-11-27
21:20:56 UTC (rev 8433)
@@ -112,7 +112,6 @@
for (int i = 0; i < 200; i++)
{
- System.out.println("Sent " + i);
producer.send(message);
}