[jboss-cvs] JBoss Messaging SVN: r5627 - branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jan 13 18:37:49 EST 2009
Author: clebert.suconic at jboss.com
Date: 2009-01-13 18:37:49 -0500 (Tue, 13 Jan 2009)
New Revision: 5627
Modified:
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
Log:
Change on SendLargeMessage
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-01-13 22:56:14 UTC (rev 5626)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-01-13 23:37:49 UTC (rev 5627)
@@ -26,7 +26,7 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -94,6 +94,8 @@
private final int minLargeMessageSize;
private final ServerSession session;
+
+ private final Executor executor;
private final Lock lock = new ReentrantLock();
@@ -134,7 +136,8 @@
final HierarchicalRepository<QueueSettings> queueSettingsRepository,
final PostOffice postOffice,
final Channel channel,
- final boolean preAcknowledge)
+ final boolean preAcknowledge,
+ final Executor executor)
{
this.id = id;
@@ -143,6 +146,8 @@
this.filter = filter;
this.session = session;
+
+ this.executor = executor;
this.started = browseOnly || started;
@@ -486,11 +491,20 @@
{
if (largeMessageSender != null)
{
- if (largeMessageSender.sendLargeMessage())
- {
- // prompt Delivery only if chunk was finished
- session.promptDelivery(messageQueue);
- }
+ // If there is a LargeMessageSender, it needs to first resume & finalize sending the current message...
+ // and go for session.promptDelivery only after the message was finished.
+ // We also need to use an executor, or else the LargeMessageSend will be performed at the caller's thread
+ executor.execute(new Runnable(){
+
+ public void run()
+ {
+ if (largeMessageSender.sendLargeMessage())
+ {
+ // prompt Delivery only if chunk was finished
+ session.promptDelivery(messageQueue);
+ }
+ }
+ });
}
else
{
@@ -593,7 +607,7 @@
// TODO: Should we block until the replication is done?
channel.replicatePacket(new SessionReplicateDeliveryMessage(id, message.getMessageID(), message.getDestination()));
- // SendLargeMessage has to be done on the same thread used on the QueueImpl, or we would have problems.
+ // SendLargeMessage has to be done on the same thread used on the QueueImpl or we would have problems with ordering and flow control
largeMessageSender = new LargeMessageSender((LargeServerMessage)message, ref);
largeMessageSender.sendLargeMessage();
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-01-13 22:56:14 UTC (rev 5626)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-01-13 23:37:49 UTC (rev 5627)
@@ -1309,7 +1309,8 @@
queueSettingsRepository,
postOffice,
channel,
- preAcknowledge);
+ preAcknowledge,
+ this.executor);
consumers.put(consumer.getID(), consumer);
More information about the jboss-cvs-commits
mailing list