[jboss-cvs] JBoss Messaging SVN: r5628 - branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jan 13 19:57:44 EST 2009
Author: clebert.suconic at jboss.com
Date: 2009-01-13 19:57:44 -0500 (Tue, 13 Jan 2009)
New Revision: 5628
Modified:
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
Log:
tweaks
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-01-13 23:37:49 UTC (rev 5627)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-01-14 00:57:44 UTC (rev 5628)
@@ -26,7 +26,6 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -94,8 +93,6 @@
private final int minLargeMessageSize;
private final ServerSession session;
-
- private final Executor executor;
private final Lock lock = new ReentrantLock();
@@ -136,8 +133,7 @@
final HierarchicalRepository<QueueSettings> queueSettingsRepository,
final PostOffice postOffice,
final Channel channel,
- final boolean preAcknowledge,
- final Executor executor)
+ final boolean preAcknowledge)
{
this.id = id;
@@ -146,8 +142,6 @@
this.filter = filter;
this.session = session;
-
- this.executor = executor;
this.started = browseOnly || started;
@@ -489,26 +483,25 @@
private void promptDelivery()
{
- if (largeMessageSender != null)
+ lock.lock();
+ try
{
- // If there is a LargeMessageSender, it needs to first resume & finalize sending the current message...
- // and go for session.promptDelivery only after the message was finished.
- // We also need to use an executor, or else the LargeMessageSend will be performed at the caller's thread
- executor.execute(new Runnable(){
-
- public void run()
+ if (largeMessageSender != null)
+ {
+ if (largeMessageSender.sendLargeMessage())
{
- if (largeMessageSender.sendLargeMessage())
- {
- // prompt Delivery only if chunk was finished
- session.promptDelivery(messageQueue);
- }
+ // prompt Delivery only if chunk was finished
+ session.promptDelivery(messageQueue);
}
- });
+ }
+ else
+ {
+ session.promptDelivery(messageQueue);
+ }
}
- else
+ finally
{
- session.promptDelivery(messageQueue);
+ lock.unlock();
}
}
@@ -555,6 +548,8 @@
// This has to be checked inside the lock as the set to null is done inside the lock
if (largeMessageSender != null)
{
+ log.info("doHandle: LargeMessageSender != null," + messageQueue.isBackup() +
+ " can't send another message while send is pending", new Exception("trace"));
if (trace)
{
trace("doHandle: LargeMessageSender != null, can't send another message while send is pending");
@@ -607,7 +602,8 @@
// TODO: Should we block until the replication is done?
channel.replicatePacket(new SessionReplicateDeliveryMessage(id, message.getMessageID(), message.getDestination()));
- // SendLargeMessage has to be done on the same thread used on the QueueImpl or we would have problems with ordering and flow control
+ // SendLargeMessage has to be done on the same thread used on the QueueImpl or we would have problems with
+ // ordering and flow control
largeMessageSender = new LargeMessageSender((LargeServerMessage)message, ref);
largeMessageSender.sendLargeMessage();
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-01-13 23:37:49 UTC (rev 5627)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-01-14 00:57:44 UTC (rev 5628)
@@ -990,19 +990,20 @@
{
DelayedResult result = channel.replicatePacket(packet);
- if (result == null)
+ try
{
- try
- {
- consumers.get(packet.getConsumerID()).receiveCredits(packet.getCredits());
- }
- catch (Exception e)
- {
- log.error("Failed to receive credits", e);
- }
+ // Note we don't wait for response before handling this
- channel.confirm(packet);
+ consumers.get(packet.getConsumerID()).receiveCredits(packet.getCredits());
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to receive credits", e);
+ }
+ if (result == null)
+ {
+ channel.confirm(packet);
}
else
{
@@ -1010,16 +1011,7 @@
{
public void run()
{
- try
- {
- consumers.get(packet.getConsumerID()).receiveCredits(packet.getCredits());
- }
- catch (Exception e)
- {
- log.error("Failed to receive credits", e);
- }
channel.confirm(packet);
-
}
});
}
@@ -1309,8 +1301,7 @@
queueSettingsRepository,
postOffice,
channel,
- preAcknowledge,
- this.executor);
+ preAcknowledge);
consumers.put(consumer.getID(), consumer);
More information about the jboss-cvs-commits
mailing list