[jboss-cvs] JBoss Messaging SVN: r5630 - branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jan 13 21:28:22 EST 2009
Author: clebert.suconic at jboss.com
Date: 2009-01-13 21:28:21 -0500 (Tue, 13 Jan 2009)
New Revision: 5630
Modified:
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
Log:
tweaks
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-01-14 01:12:44 UTC (rev 5629)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-01-14 02:28:21 UTC (rev 5630)
@@ -26,6 +26,7 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -94,6 +95,8 @@
private final ServerSession session;
+ private final Executor executor;
+
private final Lock lock = new ReentrantLock();
private AtomicInteger availableCredits = new AtomicInteger(0);
@@ -133,7 +136,8 @@
final HierarchicalRepository<QueueSettings> queueSettingsRepository,
final PostOffice postOffice,
final Channel channel,
- final boolean preAcknowledge)
+ final boolean preAcknowledge,
+ final Executor executor)
{
this.id = id;
@@ -143,6 +147,8 @@
this.session = session;
+ this.executor = executor;
+
this.started = browseOnly || started;
this.browseOnly = browseOnly;
@@ -485,11 +491,7 @@
{
if (largeMessageSender != null)
{
- if (largeMessageSender.sendLargeMessage())
- {
- // prompt Delivery only if chunk was finished
- session.promptDelivery(messageQueue);
- }
+ resumeLargeMessage();
}
else
{
@@ -497,6 +499,25 @@
}
}
+ /**
+ *
+ */
+ private void resumeLargeMessage()
+ {
+ if (messageQueue.isBackup())
+ {
+ // We are supposed to finish largeMessageSender, or use all the possible credits before we return this method.
+ // If we play the commands on a different order than how they were generated on the live node, we will
+ // eventually still be running this largeMessage before the next message come, what would reject messages
+ // from the cluster
+ largeMessageSender.resumeLargeMessageRunnable.run();
+ }
+ else
+ {
+ executor.execute(largeMessageSender.resumeLargeMessageRunnable);
+ }
+ }
+
private HandleStatus doHandle(final MessageReference ref) throws Exception
{
if (availableCredits != null && availableCredits.get() <= 0)
@@ -540,8 +561,6 @@
// This has to be checked inside the lock as the set to null is done inside the lock
if (largeMessageSender != null)
{
- log.info("doHandle: LargeMessageSender != null," + messageQueue.isBackup() +
- " can't send another message while send is pending", new Exception("trace"));
if (trace)
{
trace("doHandle: LargeMessageSender != null, can't send another message while send is pending");
@@ -656,6 +675,18 @@
/** The current position on the message being processed */
private volatile long positionPendingLargeMessage;
+ final Runnable resumeLargeMessageRunnable = new Runnable()
+ {
+ public void run()
+ {
+ if (sendLargeMessage())
+ {
+ // prompt Delivery only if chunk was finished
+ session.promptDelivery(messageQueue);
+ }
+ }
+ };
+
public LargeMessageSender(final LargeServerMessage message, final MessageReference ref)
{
pendingLargeMessage = message;
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-01-14 01:12:44 UTC (rev 5629)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-01-14 02:28:21 UTC (rev 5630)
@@ -1301,7 +1301,8 @@
queueSettingsRepository,
postOffice,
channel,
- preAcknowledge);
+ preAcknowledge,
+ executor);
consumers.put(consumer.getID(), consumer);
More information about the jboss-cvs-commits
mailing list