[jboss-cvs] JBoss Messaging SVN: r3205 - in trunk/src/main/org/jboss: messaging/core/contract and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Oct 19 09:22:08 EDT 2007
Author: timfox
Date: 2007-10-19 09:22:08 -0400 (Fri, 19 Oct 2007)
New Revision: 3205
Modified:
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/messaging/core/contract/PostOffice.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
Log:
tweaks
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-10-19 12:42:17 UTC (rev 3204)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-10-19 13:22:08 UTC (rev 3205)
@@ -179,9 +179,9 @@
private Object waitLock = new Object();
- //debug
- private SynchronizedInt toDeliverCount = new SynchronizedInt(0);
+ private boolean dosync = true;
+
// Constructors ---------------------------------------------------------------------------------
ServerSessionEndpoint(String sessionID, ServerConnectionEndpoint connectionEndpoint) throws Exception
@@ -621,7 +621,7 @@
if (supportsFailover)
{
- postOffice.sendReplicateDeliveryMessage(queueName, id, del.getReference().getMessage().getMessageID(), deliveryId, false, true);
+ postOffice.sendReplicateDeliveryMessage(queueName, id, del.getReference().getMessage().getMessageID(), deliveryId, false, true, true);
}
}
}
@@ -911,8 +911,6 @@
while (iter.hasNext())
{
toDeliver.put(iter.next());
-
- this.toDeliverCount.increment();
}
}
@@ -1063,8 +1061,6 @@
{
toDeliver.take();
- this.toDeliverCount.decrement();
-
performDelivery(dr.del.getReference(), dr.deliveryID, dr.getConsumer());
delivered = true;
@@ -1300,7 +1296,6 @@
//Clear toDeliver
while (toDeliver.poll(0) != null)
{
- this.toDeliverCount.decrement();
}
log.warn("Timed out waiting for response to arrive");
@@ -1370,16 +1365,12 @@
//producer in flight (since np don't need to be replicated)
toDeliver.put(rec);
- this.toDeliverCount.increment();
-
//Race check (there's a small chance the message in the queue got removed between the empty check
//and the put so we do another check:
if (toDeliver.peek() == rec)
{
toDeliver.take();
- this.toDeliverCount.decrement();
-
performDelivery(delivery.getReference(), deliveryId, consumer);
}
}
@@ -1403,11 +1394,11 @@
toDeliver.put(rec);
- this.toDeliverCount.increment();
-
postOffice.sendReplicateDeliveryMessage(consumer.getQueueName(), id,
delivery.getReference().getMessage().getMessageID(),
- deliveryId, true, false);
+ deliveryId, true, false, dosync);
+
+ performDelivery(delivery.getReference(), deliveryId, consumer);
}
else
{
@@ -1420,10 +1411,7 @@
// Actually do the delivery now - we are only node in the cluster
performDelivery(delivery.getReference(), deliveryId, consumer);
}
- }
-
- //log.info("del count " + this.toDeliverCount.get());
-
+ }
}
void performDelivery(MessageReference ref, long deliveryID, ServerConsumerEndpoint consumer)
Modified: trunk/src/main/org/jboss/messaging/core/contract/PostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/contract/PostOffice.java 2007-10-19 12:42:17 UTC (rev 3204)
+++ trunk/src/main/org/jboss/messaging/core/contract/PostOffice.java 2007-10-19 13:22:08 UTC (rev 3205)
@@ -143,7 +143,7 @@
//FIXME - these do not belong here - only here temporarily until we implement generic Handler/Message abstraction
- void sendReplicateDeliveryMessage(String queueName, String sessionID, long messageID, long deliveryID, boolean reply, boolean sync)
+ void sendReplicateDeliveryMessage(String queueName, String sessionID, long messageID, long deliveryID, boolean reply, boolean sync, boolean bodge)
throws Exception;
void sendReplicateAckMessage(String queueName, long messageID) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java 2007-10-19 12:42:17 UTC (rev 3204)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java 2007-10-19 13:22:08 UTC (rev 3205)
@@ -28,6 +28,7 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Vector;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.contract.ChannelFactory;
@@ -44,10 +45,6 @@
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
-import EDU.oswego.cs.dl.util.concurrent.Executor;
-import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
/**
*
* This class handles the interface with JGroups
@@ -266,6 +263,37 @@
}
}
+ public void unicastControl(ClusterRequest request, Address address, boolean sync) throws Exception
+ {
+ if (startedState == STARTED)
+ {
+ if (trace) { log.trace(this + " multicasting " + request + " to control channel, sync=" + sync); }
+
+ Message message = new Message(address, null, writeRequest(request));
+
+ Vector v = new Vector();
+ v.add(address);
+
+ RspList rspList =
+ dispatcher.castMessage(v, message, sync ? GroupRequest.GET_ALL: GroupRequest.GET_NONE, castTimeout);
+
+ if (sync)
+ {
+ Iterator iter = rspList.values().iterator();
+
+ while (iter.hasNext())
+ {
+ Rsp rsp = (Rsp)iter.next();
+
+ if (!rsp.wasReceived())
+ {
+ throw new IllegalStateException(this + " response not received from " + rsp.getSender() + " - there may be others");
+ }
+ }
+ }
+ }
+ }
+
public void multicastData(ClusterRequest request) throws Exception
{
if (startedState == STARTED)
Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-10-19 12:42:17 UTC (rev 3204)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-10-19 13:22:08 UTC (rev 3205)
@@ -612,7 +612,7 @@
//TODO - these don't belong here
public void sendReplicateDeliveryMessage(String queueName, String sessionID, long messageID, long deliveryID,
- boolean reply, boolean sync)
+ boolean reply, boolean sync, boolean bodgesync)
throws Exception
{
//We use a semaphore to limit the number of outstanding replicates we can send without getting a response
@@ -631,7 +631,7 @@
Address replyAddress = null;
- if (reply)
+ if (reply && !bodgesync)
{
//TODO optimise this
@@ -648,8 +648,15 @@
Address address = getFailoverNodeDataChannelAddress();
if (address != null)
- {
- groupMember.unicastData(request, address);
+ {
+ if (bodgesync)
+ {
+ groupMember.unicastControl(request, address, true);
+ }
+ else
+ {
+ groupMember.unicastData(request, address);
+ }
}
}
catch (Exception e)
More information about the jboss-cvs-commits
mailing list