[jboss-cvs] JBoss Messaging SVN: r1396 - in trunk/src/main/org/jboss: jms/client/remoting jms/server/connectionfactory jms/server/remoting messaging/core messaging/core/message messaging/core/plugin/postoffice messaging/core/plugin/postoffice/cluster
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Sep 29 06:25:57 EDT 2006
Author: timfox
Date: 2006-09-29 06:25:41 -0400 (Fri, 29 Sep 2006)
New Revision: 1396
Modified:
trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
trunk/src/main/org/jboss/jms/server/remoting/MessagingSerializationManager.java
trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java
trunk/src/main/org/jboss/messaging/core/message/MessageSupport.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CastMessagesCallback.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/NullMessagePullPolicy.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/QueueStats.java
Log:
Removed info logging and added trace logging
Modified: trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java 2006-09-29 08:46:00 UTC (rev 1395)
+++ trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java 2006-09-29 10:25:41 UTC (rev 1396)
@@ -28,7 +28,6 @@
import org.jboss.jms.server.endpoint.ClientDelivery;
import org.jboss.jms.server.remoting.MessagingMarshallable;
-import org.jboss.logging.Logger;
import org.jboss.remoting.InvocationRequest;
import org.jboss.remoting.ServerInvocationHandler;
import org.jboss.remoting.ServerInvoker;
@@ -50,9 +49,6 @@
*/
public class CallbackManager implements ServerInvocationHandler
{
- private static final Logger log = Logger.getLogger(CallbackManager.class);
-
-
protected Map callbackHandlers;
public CallbackManager()
Modified: trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java 2006-09-29 08:46:00 UTC (rev 1395)
+++ trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java 2006-09-29 10:25:41 UTC (rev 1396)
@@ -85,8 +85,6 @@
String locatorURI = (String)server.getAttribute(connectorObjectName, "InvokerLocator");
- log.info("******* LOCATOR URI IS " + locatorURI);
-
ServerPeer serverPeer = (ServerPeer)server.getAttribute(serverPeerObjectName, "Instance");
connectionFactoryManager = serverPeer.getConnectionFactoryManager();
Modified: trunk/src/main/org/jboss/jms/server/remoting/MessagingSerializationManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/MessagingSerializationManager.java 2006-09-29 08:46:00 UTC (rev 1395)
+++ trunk/src/main/org/jboss/jms/server/remoting/MessagingSerializationManager.java 2006-09-29 10:25:41 UTC (rev 1396)
@@ -29,7 +29,6 @@
import java.io.ObjectOutputStream;
import java.io.OutputStream;
-import org.jboss.logging.Logger;
import org.jboss.remoting.serialization.IMarshalledValue;
import org.jboss.remoting.serialization.SerializationManager;
@@ -52,9 +51,6 @@
*/
public class MessagingSerializationManager extends SerializationManager
{
- private static final Logger log = Logger.getLogger(MessagingSerializationManager.class);
-
-
public IMarshalledValue createdMarshalledValue(Object arg0) throws IOException
{
throw new UnsupportedOperationException();
Modified: trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java 2006-09-29 08:46:00 UTC (rev 1395)
+++ trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java 2006-09-29 10:25:41 UTC (rev 1396)
@@ -32,7 +32,6 @@
import org.jboss.messaging.core.plugin.contract.PersistenceManager;
import org.jboss.messaging.core.plugin.contract.PersistenceManager.InitialLoadInfo;
import org.jboss.messaging.core.plugin.contract.PersistenceManager.ReferenceInfo;
-import org.jboss.messaging.core.tx.Transaction;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
Modified: trunk/src/main/org/jboss/messaging/core/message/MessageSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/MessageSupport.java 2006-09-29 08:46:00 UTC (rev 1395)
+++ trunk/src/main/org/jboss/messaging/core/message/MessageSupport.java 2006-09-29 10:25:41 UTC (rev 1396)
@@ -28,7 +28,6 @@
import java.io.Serializable;
import java.util.Map;
-import org.jboss.logging.Logger;
import org.jboss.messaging.core.Message;
import org.jboss.messaging.util.StreamUtils;
@@ -45,9 +44,7 @@
public abstract class MessageSupport extends RoutableSupport implements Message
{
// Constants -----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(MessageSupport.class);
-
+
// Attributes ----------------------------------------------------
// Must be hidden from subclasses
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java 2006-09-29 08:46:00 UTC (rev 1395)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java 2006-09-29 10:25:41 UTC (rev 1396)
@@ -67,6 +67,8 @@
public class DefaultPostOffice extends JDBCSupport implements PostOffice
{
private static final Logger log = Logger.getLogger(DefaultPostOffice.class);
+
+ private boolean trace = log.isTraceEnabled();
private String officeName;
@@ -128,23 +130,30 @@
public void start() throws Exception
{
- log.info(this + " starting");
+ if (trace) { log.trace(this + " starting"); }
+
super.start();
loadBindings();
- log.info(this + " started");
+ if (trace) { log.trace(this + " started"); }
}
public void stop() throws Exception
{
+ if (trace) { log.trace(this + " stopping"); }
+
super.stop();
+
+ if (trace) { log.trace(this + " stopped"); }
}
// PostOffice implementation ---------------------------------------
public Binding bindQueue(String condition, Queue queue) throws Exception
{
+ if (trace) { log.trace(this + " binding queue " + queue.getName() + " with condition " + condition); }
+
if (queue.getName() == null)
{
throw new IllegalArgumentException("Queue name is null");
@@ -194,6 +203,8 @@
public Binding unbindQueue(String queueName) throws Throwable
{
+ if (trace) { log.trace(this + " unbinding queue " + queueName); }
+
if (queueName == null)
{
throw new IllegalArgumentException("Queue name is null");
@@ -285,6 +296,8 @@
public boolean route(MessageReference ref, String condition, Transaction tx) throws Exception
{
+ if (trace) { log.trace(this + " routing ref " + ref + " with condition " + condition + " and transaction " + tx); }
+
if (ref == null)
{
throw new IllegalArgumentException("Message reference is null");
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CastMessagesCallback.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CastMessagesCallback.java 2006-09-29 08:46:00 UTC (rev 1395)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CastMessagesCallback.java 2006-09-29 10:25:41 UTC (rev 1396)
@@ -25,7 +25,6 @@
import java.util.List;
import java.util.Map;
-import org.jboss.logging.Logger;
import org.jboss.messaging.core.Message;
import org.jboss.messaging.core.tx.TxCallback;
@@ -61,8 +60,6 @@
*/
class CastMessagesCallback implements TxCallback
{
- private static final Logger log = Logger.getLogger(CastMessagesCallback.class);
-
private List persistent;
private List nonPersistent;
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-09-29 08:46:00 UTC (rev 1395)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-09-29 10:25:41 UTC (rev 1396)
@@ -80,6 +80,8 @@
public class DefaultClusteredPostOffice extends DefaultPostOffice implements ClusteredPostOffice, PostOfficeInternal
{
private static final Logger log = Logger.getLogger(DefaultClusteredPostOffice.class);
+
+ private boolean trace = log.isTraceEnabled();
private Channel syncChannel;
@@ -276,16 +278,16 @@
super.start();
Address currentAddress = syncChannel.getLocalAddress();
-
- log.info(this.nodeId + " address is " + currentAddress);
-
+
handleAddressNodeMapping(currentAddress, nodeId);
syncSendRequest(new SendNodeIdRequest(currentAddress, nodeId));
statsSender.start();
- started = true;
+ started = true;
+
+ if (log.isTraceEnabled()) { log.trace("Started " + this + " with address " + currentAddress); }
}
public synchronized void stop() throws Exception
@@ -299,14 +301,19 @@
asyncChannel.close();
started = false;
+
+ if (log.isTraceEnabled()) { log.trace("Stopped " + this); }
}
// PostOffice implementation ---------------------------------------
public Binding bindClusteredQueue(String condition, LocalClusteredQueue queue) throws Exception
{
- log.info(this.nodeId + " binding clustered queue: " + queue + " with condition: " + condition);
-
+ if (log.isTraceEnabled())
+ {
+ log.trace(this.nodeId + " binding clustered queue: " + queue + " with condition: " + condition);
+ }
+
if (queue.getNodeId() != this.nodeId)
{
throw new IllegalArgumentException("Queue node id does not match office node id");
@@ -325,6 +332,11 @@
public Binding unbindClusteredQueue(String queueName) throws Throwable
{
+ if (log.isTraceEnabled())
+ {
+ log.trace(this.nodeId + " unbind clustered queue: " + queueName);
+ }
+
Binding binding = (Binding)super.unbindQueue(queueName);
UnbindRequest request = new UnbindRequest(nodeId, queueName);
@@ -336,6 +348,11 @@
public boolean route(MessageReference ref, String condition, Transaction tx) throws Exception
{
+ if (trace)
+ {
+ log.trace(this.nodeId + " Routing " + ref + " with condition " + condition + " and transaction " + tx);
+ }
+
if (ref == null)
{
throw new IllegalArgumentException("Message reference is null");
@@ -370,6 +387,10 @@
// We need to do this if there is anything other than
// No durable subs or exactly one local durable sub
startInternalTx = true;
+ if (trace)
+ {
+ log.trace(this.nodeId + " Starting internal transaction since more than one durable sub or remote durable subs");
+ }
}
}
@@ -400,7 +421,11 @@
ClusteredQueue queue = (ClusteredQueue)del.getObserver();
- log.info("Routing message to queue:" + queue.getName() + " on node " + queue.getNodeId());
+ if (trace)
+ {
+ log.trace(this.nodeId + " Routing message to queue or stub:" + queue.getName() + " on node " +
+ queue.getNodeId() +" local:" + queue.isLocal());
+ }
if (router.numberOfReceivers() > 1)
{
@@ -483,6 +508,7 @@
if (startInternalTx)
{
tx.commit();
+ if (trace) { log.trace("Committed internal transaction"); }
}
}
}
@@ -510,7 +536,10 @@
{
lock.writeLock().acquire();
- log.info(this.nodeId + " adding binding from node: " + nodeId +" queue: " + queueName + " with condition: " + condition);
+ if (log.isTraceEnabled())
+ {
+ log.trace(this.nodeId + " adding binding from node: " + nodeId + " queue: " + queueName + " with condition: " + condition);
+ }
try
{
@@ -553,6 +582,11 @@
{
lock.writeLock().acquire();
+ if (log.isTraceEnabled())
+ {
+ log.trace(this.nodeId + " removing binding from node: " + nodeId + " queue: " + queueName);
+ }
+
try
{
// Sanity
@@ -573,6 +607,11 @@
{
lock.writeLock().acquire();
+ if (trace)
+ {
+ log.trace(this.nodeId + " Adding address node mapping for " + address + " and " + nodeId);
+ }
+
try
{
nodeIdAddressMap.put(new Integer(nodeId), address);
@@ -586,9 +625,12 @@
public void routeFromCluster(org.jboss.messaging.core.Message message, String routingKey,
Map queueNameNodeIdMap) throws Exception
{
- log.info(this.nodeId + " received route from cluster, ref = " + message.getMessageID() + " routing key " +
- routingKey + " map " + queueNameNodeIdMap);
-
+ if (trace)
+ {
+ log.trace(this.nodeId + " routing from cluster, message: " + message + " routing key " +
+ routingKey + " map " + queueNameNodeIdMap);
+ }
+
lock.readLock().acquire();
// Need to reference the message
@@ -639,7 +681,12 @@
LocalClusteredQueue queue = (LocalClusteredQueue)binding.getQueue();
- Delivery del = queue.handleFromCluster(ref);
+ Delivery del = queue.handleFromCluster(ref);
+
+ if (trace)
+ {
+ log.trace(this.nodeId + " queue " + queue.getName() + " handled reference from cluster " + del);
+ }
}
}
}
@@ -659,9 +706,11 @@
* Multicast a message to all members of the group
*/
public void asyncSendRequest(ClusterRequest request) throws Exception
- {
+ {
+ if (trace) { log.trace(this.nodeId + " sending asynch request to group, request: " + request); }
+
byte[] bytes = writeRequest(request);
-
+
asyncChannel.send(new Message(null, null, bytes));
}
@@ -670,8 +719,12 @@
*/
public void asyncSendRequest(ClusterRequest request, int nodeId) throws Exception
{
+ if (trace) { log.trace(this.nodeId + " sending asynch request to single node, request: " + request + " node " + nodeId); }
+
Address address = this.getAddressForNodeId(nodeId);
+ if (trace) { log.trace(this.nodeId + " sending to address " + address); }
+
if (address == null)
{
throw new IllegalArgumentException("Cannot find address for node " + nodeId);
@@ -689,8 +742,12 @@
*/
public Object syncSendRequest(ClusterRequest request, int nodeId, boolean ignoreNoAddress) throws Exception
{
+ if (trace) { log.trace(this.nodeId + " sending synch request to single node, request: " + request + " node " + nodeId); }
+
Address address = this.getAddressForNodeId(nodeId);
+ if (trace) { log.trace(this.nodeId + " sending to address " + address); }
+
if (address == null)
{
if (ignoreNoAddress)
@@ -708,7 +765,9 @@
Message message = new Message(address, null, bytes);
Object result = controlMessageDispatcher.sendMessage(message, GroupRequest.GET_FIRST, castTimeout);
-
+
+ if (trace) { log.trace(this.nodeId + " received response: " + result); }
+
return result;
}
@@ -720,11 +779,15 @@
synchronized (holdingArea)
{
holdingArea.put(id, tx);
+
+ if (trace) { log.trace(this.nodeId + " added transaction " + tx + " to holding area with id " + id); }
}
}
public void commitTransaction(TransactionId id) throws Throwable
{
+ if (trace) { log.trace(this.nodeId + " committing transaction " + id ); }
+
ClusterTransaction tx = null;
synchronized (holdingArea)
@@ -738,6 +801,8 @@
}
tx.commit(this);
+
+ if (trace) { log.trace(this.nodeId + " committed transaction " + id ); }
}
/**
@@ -745,6 +810,8 @@
*/
public void check(Integer nodeId) throws Throwable
{
+ if (trace) { log.trace(this.nodeId + " checking for any stranded transactions for node " + nodeId); }
+
synchronized (holdingArea)
{
Iterator iter = holdingArea.entrySet().iterator();
@@ -761,8 +828,12 @@
{
ClusterTransaction tx = (ClusterTransaction)iter.next();
+ if (trace) { log.trace("Found transaction " + tx + " in holding area"); }
+
boolean commit = tx.check(this);
+ if (trace) { log.trace(this.nodeId + " transaction " + tx + " will be committed?: " + commit); }
+
if (commit)
{
tx.commit(this);
@@ -773,6 +844,8 @@
}
toRemove.add(tx);
+
+ if (trace) { log.trace(this.nodeId + " resolved " + tx); }
}
}
@@ -787,6 +860,7 @@
holdingArea.remove(id);
}
}
+ if (trace) { log.trace(this.nodeId + " check complete"); }
}
public synchronized void sendQueueStats() throws Exception
@@ -828,6 +902,8 @@
}
statsList.add(stats);
+
+ if (trace) { log.trace(this.nodeId + " adding stat for send " + stats); }
}
}
}
@@ -843,6 +919,8 @@
ClusterRequest req = new QueueStatsRequest(nodeId, statsList);
asyncSendRequest(req);
+
+ if (trace) { log.trace(this.nodeId + " Sent stats"); }
}
}
@@ -850,6 +928,8 @@
{
lock.readLock().acquire();
+ if (trace) { log.trace(this.nodeId + " updating queue stats from node " + nodeId + " stats size: " + statsList.size()); }
+
try
{
if (nodeId == this.nodeId)
@@ -863,7 +943,7 @@
if (nameMap == null)
{
//This is ok, the node might have left
- log.info("But I have no bindings for " + nodeId);
+ if (trace) { log.trace(this.nodeId + " cannot find node in name map, i guess the node might have left?"); }
}
else
{
@@ -884,20 +964,28 @@
stub.setStats(st);
+ if (trace) { log.trace(this.nodeId + " setting stats: " + st + " on remote stub " + stub.getName()); }
+
ClusterRouter router = (ClusterRouter)routerMap.get(st.getQueueName());
+ //Maybe the local queue now wants to pull message(s) from the remote queue given that the
+ //stats for the remote queue have changed
LocalClusteredQueue localQueue = router.getLocalQueue();
if (localQueue != null)
{
RemoteQueueStub toQueue = (RemoteQueueStub)messagePullPolicy.chooseQueue(router.getQueues());
+ if (trace) { log.trace(this.nodeId + " recalculated pull queue for queue " + st.getQueueName() + " to be " + toQueue); }
+
if (toQueue != null)
{
localQueue.setPullInfo(toQueue, pullSize);
//We now trigger delivery - this may cause a pull event
localQueue.deliver(false);
+
+ if (trace) { log.trace(this.nodeId + " triggered delivery for " + localQueue.getName()); }
}
}
}
@@ -916,6 +1004,8 @@
public List getDeliveries(String queueName, int numMessages) throws Exception
{
+ if (trace) { log.trace(this.nodeId + " getting max " + numMessages + " deliveries for " + queueName); }
+
Binding binding = getBindingForQueueName(queueName);
if (binding == null)
@@ -927,6 +1017,8 @@
List dels = queue.getDeliveries(numMessages);
+ if (trace) { log.trace(this.nodeId + " retrieved " + dels.size() + " deliveries from " + queueName); }
+
return dels;
}
@@ -943,9 +1035,7 @@
lock.readLock().release();
}
}
-
-
-
+
// Public ------------------------------------------------------------------------------------------
// Protected ---------------------------------------------------------------------------------------
@@ -1026,6 +1116,8 @@
protected void loadBindings() throws Exception
{
+ if (trace) { log.trace(this.nodeId + " loading bindings"); }
+
// TODO I need to know whether this call times out - how do I know this??
boolean isState = syncChannel.getState(null, stateTimeout);
@@ -1033,7 +1125,7 @@
{
//Must be first member in group or non clustered- we load the state ourself from the database
- log.info("First member - so loading bindings from db");
+ if (trace) { log.trace(this.nodeId + " First member of group- so loading bindings from db"); }
super.loadBindings();
}
@@ -1041,7 +1133,7 @@
{
//The state will be set in due course via the MessageListener - we must wait until this happens
- log.info("Not first member - so loading state from group.. waiting");
+ if (trace) { log.trace(this.nodeId + " Not first member of group- so waiting for state to arrive...."); }
synchronized (setStateLock)
{
@@ -1052,7 +1144,7 @@
}
}
- log.info("Got state");
+ if (trace) { log.trace(this.nodeId + " Received state"); }
}
}
@@ -1078,23 +1170,23 @@
return binding;
}
-
-
+
// Private ------------------------------------------------------------------------------------------
-
-
-
/*
* Multicast a sync request
*/
private void syncSendRequest(ClusterRequest request) throws Exception
- {
+ {
+ if (trace) { log.trace(this.nodeId + " sending synch request to group, request: " + request); }
+
byte[] bytes = writeRequest(request);
Message message = new Message(null, null, bytes);
controlMessageDispatcher.castMessage(null, message, GroupRequest.GET_ALL, castTimeout);
+
+ if (trace) { log.trace(this.nodeId + " sent and executed ok"); }
}
@@ -1206,12 +1298,14 @@
private void processStateBytes(byte[] bytes) throws Exception
{
- log.info("Receiving state from group...");
+ if (trace) { log.trace(this.nodeId + " received state from group"); }
SharedState state = new SharedState();
StreamUtils.fromBytes(state, bytes);
+ if (trace) { log.trace(this.nodeId + " received " + state.getBindings().size() + " bindings and map " + state.getNodeIdAddressMap()); }
+
nameMaps.clear();
conditionMap.clear();
@@ -1365,7 +1459,7 @@
public void viewAccepted(View view)
{
- log.info("Got new view, size=" + view.size());
+ if (trace) { log.trace(nodeId + " Got new view, size=" + view.size()); }
if (currentView != null)
{
@@ -1380,6 +1474,8 @@
//Member must have left
//We don't remove bindings for ourself
+ if (trace) { log.trace(nodeId + " it seems that member " + address + " has left the group"); }
+
Address currentAddress = syncChannel.getLocalAddress();
if (!address.equals(currentAddress))
@@ -1393,11 +1489,15 @@
throw new IllegalStateException("Cannot find node id for address: " + address);
}
+ if (trace) { log.trace(DefaultClusteredPostOffice.this.nodeId + " Performing cleanup for node " + nodeId); }
+
//Perform a check - the member might have crashed and left uncommitted transactions
//we need to resolve this
check(nodeId);
+
+ removeBindingsForAddress(nodeId);
- removeBindingsForAddress(nodeId);
+ if (trace) { log.trace(DefaultClusteredPostOffice.this.nodeId + " cleanup complete"); }
}
catch (Throwable e)
{
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java 2006-09-29 08:46:00 UTC (rev 1395)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java 2006-09-29 10:25:41 UTC (rev 1396)
@@ -60,6 +60,8 @@
public class DefaultRouter implements ClusterRouter
{
private static final Logger log = Logger.getLogger(DefaultRouter.class);
+
+ private boolean trace = log.isTraceEnabled();
//MUST be an arraylist for fast index access
private ArrayList queues;
@@ -143,6 +145,8 @@
public Delivery handle(DeliveryObserver observer, MessageReference reference, Transaction tx)
{
+ if (trace) { log.trace(this + " routing ref " + reference); }
+
//Favour the local queue
if (localQueue != null)
@@ -153,6 +157,8 @@
Delivery del = localQueue.handle(observer, reference, tx);
+ if (trace) { log.trace(this + " routed to local queue, it returned " + del); }
+
return del;
}
else
@@ -165,6 +171,8 @@
ClusteredQueue queue = (ClusteredQueue)queues.get(target);
Delivery del = queue.handle(observer, reference, tx);
+
+ if (trace) { log.trace(this + " routed to remote queue, it returned " + del); }
target++;
@@ -177,6 +185,9 @@
return del;
}
}
+
+ if (trace) { log.trace(this + " no queues to route to so return null"); }
+
return null;
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java 2006-09-29 08:46:00 UTC (rev 1395)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java 2006-09-29 10:25:41 UTC (rev 1396)
@@ -55,6 +55,8 @@
public class LocalClusteredQueue extends PagingFilteredQueue implements ClusteredQueue
{
private static final Logger log = Logger.getLogger(LocalClusteredQueue.class);
+
+ private boolean trace = log.isTraceEnabled();
private PostOfficeInternal office;
@@ -194,10 +196,14 @@
public Delivery handleFromCluster(MessageReference ref)
throws Exception
{
+ if (trace) { log.trace("Handling ref from cluster: " + ref); }
+
if (filter != null && !filter.accept(ref))
{
Delivery del = new SimpleDelivery(this, ref, true, false);
+ if (trace) { log.trace("Reference " + ref + " rejected by filter"); }
+
return del;
}
@@ -220,9 +226,7 @@
}
protected void deliverInternal(boolean handle) throws Throwable
- {
- log.info("in local clustered queue deliver internal");
-
+ {
int beforeSize = -1;
if (!handle)
@@ -236,7 +240,11 @@
{
int afterSize = messageRefs.size();
- log.info("receiversready:" + receiversReady + " before size:" + beforeSize + " afterSize: " + afterSize);
+ if (trace)
+ {
+ log.trace(this + " Deciding whether to pull messages. " +
+ "receiversready:" + receiversReady + " before size:" + beforeSize + " afterSize: " + afterSize);
+ }
if (receiversReady && beforeSize == 0 && afterSize == 0)
{
@@ -301,8 +309,11 @@
ClusterRequest req = new PullMessagesRequest(this.nodeId, tx.getId(), theQueue.getChannelID(),
name, thePullSize);
- log.info(System.identityHashCode(this) + " Executing pull messages request for queue " + name +
- " pulling from node " + theQueue.getNodeId() + " to node " + this.nodeId);
+ if (trace)
+ {
+ log.trace(System.identityHashCode(this) + " Executing pull messages request for queue " + name +
+ " pulling from node " + theQueue.getNodeId() + " to node " + this.nodeId);
+ }
byte[] bytes = (byte[])office.syncSendRequest(req, theQueue.getNodeId(), true);
@@ -312,15 +323,13 @@
return;
}
- log.info( System.identityHashCode(this) +" Executed pull messages request");
-
PullMessagesResponse response = new PullMessagesResponse();
StreamUtils.fromBytes(response, bytes);
List msgs = response.getMessages();
- log.info(System.identityHashCode(this) + " I retrieved " + msgs.size() + " messages");
+ if (trace) { log.trace(System.identityHashCode(this) + " I retrieved " + msgs.size() + " messages from pull"); }
Iterator iter = msgs.iterator();
@@ -330,7 +339,7 @@
if (msg.isReliable())
{
- //It will alerady have been persisted on the other node
+ //It will already have been persisted on the other node
msg.setPersisted(true);
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/NullMessagePullPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/NullMessagePullPolicy.java 2006-09-29 08:46:00 UTC (rev 1395)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/NullMessagePullPolicy.java 2006-09-29 10:25:41 UTC (rev 1396)
@@ -23,6 +23,8 @@
import java.util.List;
+import org.jboss.logging.Logger;
+
/**
*
* A NullMessagePullPolicy
@@ -35,13 +37,18 @@
*/
public class NullMessagePullPolicy implements MessagePullPolicy
{
+ private static final Logger log = Logger.getLogger(NullMessagePullPolicy.class);
+ private boolean trace = log.isTraceEnabled();
+
public NullMessagePullPolicy()
{
}
public ClusteredQueue chooseQueue(List queues)
{
+ if (trace) { log.trace(this + " always returning null"); }
+
return null;
}
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/QueueStats.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/QueueStats.java 2006-09-29 08:46:00 UTC (rev 1395)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/QueueStats.java 2006-09-29 10:25:41 UTC (rev 1396)
@@ -39,10 +39,6 @@
{
private String queueName;
- // private float addRate;
-
- // private float consumeRate;
-
private int messageCount;
public QueueStats()
@@ -53,24 +49,10 @@
public QueueStats(String queueName, int messageCount)
{
this.queueName = queueName;
-
- // this.addRate = addRate;
-
- // this.consumeRate = consumeRate;
-
+
this.messageCount = messageCount;
}
-// float getAddRate()
-// {
-// return addRate;
-// }
-//
-// float getConsumeRate()
-// {
-// return consumeRate;
-// }
-
int getMessageCount()
{
return messageCount;
@@ -85,21 +67,18 @@
{
queueName = in.readUTF();
-// addRate = in.readFloat();
-//
-// consumeRate = in.readFloat();
-
messageCount = in.readInt();
}
public void write(DataOutputStream out) throws Exception
{
out.writeUTF(queueName);
-
-// out.writeFloat(addRate);
-//
-// out.writeFloat(consumeRate);
-
+
out.writeInt(messageCount);
- }
+ }
+
+ public String toString()
+ {
+ return "QueueStats[" + System.identityHashCode(this) + "] queueName: " + queueName + " messageCount: " + messageCount;
+ }
}
More information about the jboss-cvs-commits
mailing list