[jboss-cvs] JBoss Messaging SVN: r2452 - in trunk: src/main/org/jboss/jms/client/container and 8 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Feb 26 19:43:46 EST 2007
Author: ovidiu.feodorov at jboss.com
Date: 2007-02-26 19:43:45 -0500 (Mon, 26 Feb 2007)
New Revision: 2452
Modified:
trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
trunk/src/main/org/jboss/jms/client/FailoverValve2.java
trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java
trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java
trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
trunk/src/main/org/jboss/jms/client/state/ConnectionState.java
trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java
trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
trunk/src/main/org/jboss/messaging/core/plugin/contract/PersistenceManager.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
trunk/tests/etc/container-qalab.xml
trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
Log:
minor
Modified: trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java 2007-02-26 23:59:44 UTC (rev 2451)
+++ trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java 2007-02-27 00:43:45 UTC (rev 2452)
@@ -100,7 +100,8 @@
remotingConnection.setFailed();
}
- //Note - failover doesn't occur until _after_ the above check - so the next comment belongs here
+ // Note - failover doesn't occur until _after_ the above check - so the next comment
+ // belongs here
log.debug(this + " starting client-side failover");
// generate a FAILOVER_STARTED event. The event must be broadcasted AFTER valve closure,
Modified: trunk/src/main/org/jboss/jms/client/FailoverValve2.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/FailoverValve2.java 2007-02-26 23:59:44 UTC (rev 2451)
+++ trunk/src/main/org/jboss/jms/client/FailoverValve2.java 2007-02-27 00:43:45 UTC (rev 2452)
@@ -53,18 +53,18 @@
private boolean locked;
+ // Constructors ---------------------------------------------------------------------------------
+
public FailoverValve2()
{
trace = log.isTraceEnabled();
-
+
if (trace)
{
threads = new HashSet();
}
}
- // Constructors ---------------------------------------------------------------------------------
-
// Public ---------------------------------------------------------------------------------------
public synchronized void enter()
Modified: trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java 2007-02-26 23:59:44 UTC (rev 2451)
+++ trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java 2007-02-27 00:43:45 UTC (rev 2452)
@@ -174,10 +174,10 @@
{
if (isClosing)
{
- //We make sure we remove ourself AFTER the invocation has been made
- //otherwise in a failover situation we would end up divorced from the hierarchy
- //and failover will not occur properly since failover would not be able to
- //traverse the hierarchy and update the delegates properly
+ // We make sure we remove ourself AFTER the invocation has been made otherwise in a
+ // failover situation we would end up divorced from the hierarchy and failover will not
+ // occur properly since failover would not be able to traverse the hierarchy and update
+ // the delegates properly
removeSelf(invocation);
closing();
Modified: trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2007-02-26 23:59:44 UTC (rev 2451)
+++ trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2007-02-27 00:43:45 UTC (rev 2452)
@@ -34,7 +34,6 @@
import org.jboss.jms.delegate.ConsumerDelegate;
import org.jboss.jms.delegate.SessionDelegate;
import org.jboss.jms.util.MessageQueueNameHelper;
-import org.jboss.logging.Logger;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
@@ -52,17 +51,15 @@
*/
public class ConsumerAspect
{
- // Constants -----------------------------------------------------
+ // Constants ------------------------------------------------------------------------------------
- private static final Logger log = Logger.getLogger(ConsumerAspect.class);
-
- // Static --------------------------------------------------------
+ // Static ---------------------------------------------------------------------------------------
- // Attributes ----------------------------------------------------
+ // Attributes -----------------------------------------------------------------------------------
- // Constructors --------------------------------------------------
+ // Constructors ---------------------------------------------------------------------------------
- // Public --------------------------------------------------------
+ // Public ---------------------------------------------------------------------------------------
public Object handleCreateConsumerDelegate(Invocation invocation) throws Throwable
{
@@ -87,7 +84,9 @@
String queueName = null;
if (consumerState.getSubscriptionName() != null)
{
- queueName = MessageQueueNameHelper.createSubscriptionName(connectionState.getClientID(), consumerState.getSubscriptionName());
+ queueName = MessageQueueNameHelper.
+ createSubscriptionName(connectionState.getClientID(),
+ consumerState.getSubscriptionName());
}
else if (consumerState.getDestination().isQueue())
{
@@ -198,11 +197,11 @@
return getState(invocation).getSelector();
}
- // Package protected ---------------------------------------------
+ // Package protected ----------------------------------------------------------------------------
- // Protected -----------------------------------------------------
+ // Protected ------------------------------------------------------------------------------------
- // Private -------------------------------------------------------
+ // Private --------------------------------------------------------------------------------------
private ConsumerState getState(Invocation inv)
{
@@ -215,5 +214,5 @@
return state.getMessageCallbackHandler();
}
- // Inner classes -------------------------------------------------
+ // Inner classes --------------------------------------------------------------------------------
}
Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2007-02-26 23:59:44 UTC (rev 2451)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2007-02-27 00:43:45 UTC (rev 2452)
@@ -259,36 +259,6 @@
});
}
- private void handleMessageInternal(Object message) throws Exception
- {
- MessageProxy proxy = (MessageProxy) message;
-
- if (trace) { log.trace(this + " receiving message " + proxy + " from the remoting layer"); }
-
- synchronized (mainLock)
- {
- if (closed)
- {
- // Ignore
- if (trace) { log.trace(this + " is closed, so ignore message"); }
- return;
- }
-
- proxy.setSessionDelegate(sessionDelegate, isConnectionConsumer);
-
- //Add it to the buffer
- buffer.addLast(proxy, proxy.getJMSPriority());
-
- lastDeliveryId = proxy.getDeliveryId();
-
- if (trace) { log.trace(this + " added message(s) to the buffer"); }
-
- messageAdded();
-
- checkStop();
- }
- }
-
public void setMessageListener(MessageListener listener) throws JMSException
{
synchronized (mainLock)
@@ -565,7 +535,37 @@
// Protected ------------------------------------------------------------------------------------
// Private --------------------------------------------------------------------------------------
-
+
+ private void handleMessageInternal(Object message) throws Exception
+ {
+ MessageProxy proxy = (MessageProxy) message;
+
+ if (trace) { log.trace(this + " receiving message " + proxy + " from the remoting layer"); }
+
+ synchronized (mainLock)
+ {
+ if (closed)
+ {
+ // Ignore
+ if (trace) { log.trace(this + " is closed, so ignore message"); }
+ return;
+ }
+
+ proxy.setSessionDelegate(sessionDelegate, isConnectionConsumer);
+
+ //Add it to the buffer
+ buffer.addLast(proxy, proxy.getJMSPriority());
+
+ lastDeliveryId = proxy.getDeliveryId();
+
+ if (trace) { log.trace(this + " added message(s) to the buffer"); }
+
+ messageAdded();
+
+ checkStop();
+ }
+ }
+
private void checkStop()
{
int size = buffer.size();
@@ -640,9 +640,7 @@
log.warn("Thread interrupted", e);
}
}
-
-
private void queueRunner(ListenerRunner runner)
{
try
Modified: trunk/src/main/org/jboss/jms/client/state/ConnectionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/ConnectionState.java 2007-02-26 23:59:44 UTC (rev 2451)
+++ trunk/src/main/org/jboss/jms/client/state/ConnectionState.java 2007-02-27 00:43:45 UTC (rev 2452)
@@ -41,7 +41,6 @@
import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
/**
- *
* State corresponding to a connection. This state is acessible inside aspects/interceptors.
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
Modified: trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java 2007-02-26 23:59:44 UTC (rev 2451)
+++ trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java 2007-02-27 00:43:45 UTC (rev 2452)
@@ -36,18 +36,16 @@
import org.jboss.messaging.core.plugin.contract.PersistenceManager.ReferenceInfo;
/**
- * A PagingChannel
+ * This channel implementation automatically pages message references to and from storage to prevent
+ * more than a maximum number of references being stored in memory at once.
*
- * This channel implementation automatically pages message references to and from storage to prevent more
- * than a maximum number of references being stored in memory at once.
- *
- * This allows us to support logical channels holding many millions of messages without running out of memory.
+ * This allows us to support logical channels holding many millions of messages without running out
+ * of memory.
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision$</tt>
*
* $Id$
- *
*/
public abstract class PagingChannelSupport extends ChannelSupport
{
@@ -88,17 +86,7 @@
protected long nextPagingOrder;
/**
- * Constructor with default paging params
- * @param channelID
- * @param ms
- * @param pm
- * @param mm
- * @param acceptReliableMessages
- * @param recoverable
- * @param fullSize
- * @param pageSize
- * @param downCacheSize
- * @param executor
+ * Constructor with default paging params.
*/
public PagingChannelSupport(long channelID, MessageStore ms, PersistenceManager pm,
boolean acceptReliableMessages, boolean recoverable,
@@ -112,16 +100,7 @@
}
/**
- * Constructor specifying paging params
- * @param channelID
- * @param ms
- * @param pm
- * @param acceptReliableMessages
- * @param recoverable
- * @param executor
- * @param fullSize
- * @param pageSize
- * @param downCacheSize
+ * Constructor specifying paging params.
*/
public PagingChannelSupport(long channelID, MessageStore ms, PersistenceManager pm,
boolean acceptReliableMessages, boolean recoverable,
@@ -528,8 +507,6 @@
nextPagingOrder = ili.getMaxPageOrdering().longValue() + 1;
paging = true;
-
- log.info("set paging to true");
}
else
{
Modified: trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java 2007-02-26 23:59:44 UTC (rev 2451)
+++ trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java 2007-02-27 00:43:45 UTC (rev 2452)
@@ -64,9 +64,8 @@
import org.jboss.messaging.util.Util;
/**
+ * JDBC implementation of PersistenceManager.
*
- * JDBC implementation of PersistenceManager
- *
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:adrian at jboss.org">Adrian Brock</a>
Modified: trunk/src/main/org/jboss/messaging/core/plugin/contract/PersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/contract/PersistenceManager.java 2007-02-26 23:59:44 UTC (rev 2451)
+++ trunk/src/main/org/jboss/messaging/core/plugin/contract/PersistenceManager.java 2007-02-27 00:43:45 UTC (rev 2452)
@@ -28,7 +28,7 @@
import org.jboss.messaging.core.tx.Transaction;
/**
- * The interface to the persistence manager
+ * The interface to the persistence manager.
*
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -64,13 +64,15 @@
void updatePageOrder(long channelID, List references) throws Exception;
- void updateReferencesNotPagedInRange(long channelID, long orderStart, long orderEnd, long num) throws Exception;
+ void updateReferencesNotPagedInRange(long channelID, long orderStart, long orderEnd, long num)
+ throws Exception;
List getPagedReferenceInfos(long channelID, long orderStart, int number) throws Exception;
InitialLoadInfo loadFromStart(long channelID, int fullSize) throws Exception;
- InitialLoadInfo mergeAndLoad(long fromChannelID, long toChannelID, int numberToLoad, long firstPagingOrder, long nextPagingOrder) throws Exception;
+ InitialLoadInfo mergeAndLoad(long fromChannelID, long toChannelID, int numberToLoad,
+ long firstPagingOrder, long nextPagingOrder) throws Exception;
List getMessages(List messageIds) throws Exception;
@@ -86,9 +88,8 @@
boolean referenceExists(long messageID) throws Exception;
- // Interface value classes
- //---------------------------------------------------------------
-
+ // Interface value classes ----------------------------------------------------------------------
+
class MessageChannelPair
{
private Message message;
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java 2007-02-26 23:59:44 UTC (rev 2451)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java 2007-02-27 00:43:45 UTC (rev 2452)
@@ -21,9 +21,6 @@
*/
package org.jboss.messaging.core.plugin.postoffice.cluster;
-import java.util.Iterator;
-import java.util.Map;
-
import org.jboss.logging.Logger;
import org.jboss.messaging.core.Delivery;
import org.jboss.messaging.core.Filter;
@@ -34,7 +31,6 @@
import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
import org.jboss.messaging.core.plugin.contract.MessageStore;
import org.jboss.messaging.core.plugin.contract.PersistenceManager;
-import org.jboss.messaging.core.plugin.contract.PersistenceManager.ReferenceInfo;
import org.jboss.messaging.core.tx.Transaction;
import org.jboss.messaging.core.tx.TransactionRepository;
import org.jboss.messaging.util.Future;
@@ -114,11 +110,10 @@
public QueueStats getStats()
{
- //Currently we only return the current message reference count for the channel
- //Note we are only interested in the number of refs in the main queue, not
- //in any deliveries
- //Also we are only interested in the value obtained after delivery is complete.
- //This is so we don't end up with transient values since delivery is half way through
+ // Currently we only return the current message reference count for the channel. Note we are
+ // only interested in the number of refs in the main queue, not in any deliveries. Also we are
+ // only interested in the value obtained after delivery is complete. This is so we don't end
+ // up with transient values since delivery is half way through.
int cnt = getRefCount();
@@ -126,8 +121,8 @@
{
lastCount = cnt;
- //We only return stats if it has changed since last time - this is so when we only
- //broadcast data when necessary
+ // We only return stats if it has changed since last time - this is so that we only
+ // broadcast data when necessary.
return new QueueStats(name, cnt);
}
else
@@ -181,7 +176,8 @@
}
public void handlePullMessagesResult(RemoteQueueStub remoteQueue, Message message,
- long holdingTxId, boolean failBeforeCommit, boolean failAfterCommit) throws Exception
+ long holdingTxId, boolean failBeforeCommit,
+ boolean failAfterCommit) throws Exception
{
//This needs to be run on a different thread to the one used by JGroups to deliver the message
//to avoid deadlock
@@ -192,7 +188,8 @@
}
//TODO it's not ideal that we need to pass in a PullMessagesRequest
- public void handleGetDeliveriesRequest(int returnNodeId, int number, TransactionId txId, PullMessagesRequest tx) throws Exception
+ public void handleGetDeliveriesRequest(int returnNodeId, int number, TransactionId txId,
+ PullMessagesRequest tx) throws Exception
{
//This needs to be run on a different thread to the one used by JGroups to deliver the message
//to avoid deadlock
@@ -224,30 +221,25 @@
return ((Integer)result.getResult()).intValue();
}
- /*
- * Merge the contents of one queue with another - this happens at failover when
- * a queue is failed over to another node, but a queue with the same name already exists
- * In this case we merge the two queues
+ /**
+ * Merge the contents of one queue with another - this happens at failover when a queue is failed
+ * over to another node, but a queue with the same name already exists. In this case we merge the
+ * two queues.
*/
public void mergeIn(RemoteQueueStub remoteQueue) throws Exception
{
if (trace) { log.trace("Merging queue " + remoteQueue + " into " + this); }
- log.info("queue is paging:" + this.paging + " message refs size " +
- this.messageRefs.size() + " fullsize:" + this.fullSize +
- " delivering:" + this.deliveringCount.get());
-
synchronized (refLock)
{
flushDownCache();
PersistenceManager.InitialLoadInfo ili =
- pm.mergeAndLoad(remoteQueue.getChannelID(), channelID, fullSize - messageRefs.size(), firstPagingOrder, nextPagingOrder);
+ pm.mergeAndLoad(remoteQueue.getChannelID(), channelID, fullSize - messageRefs.size(),
+ firstPagingOrder, nextPagingOrder);
if (trace) { log.trace("Loaded " + ili.getRefInfos().size() + " refs"); }
-
- log.info("firstpageord:" + ili.getMinPageOrdering() + " lastpageord:" + ili.getMaxPageOrdering());
-
+
doLoad(ili);
deliverInternal();
Modified: trunk/tests/etc/container-qalab.xml
===================================================================
--- trunk/tests/etc/container-qalab.xml 2007-02-26 23:59:44 UTC (rev 2451)
+++ trunk/tests/etc/container-qalab.xml 2007-02-27 00:43:45 UTC (rev 2452)
@@ -35,7 +35,6 @@
<password>messaging</password>
</database-configuration>
-
<database-configuration name="oracle">
<url>jdbc:oracle:thin:@dev01-priv:1521:qadb01</url>
<driver>oracle.jdbc.driver.OracleDriver</driver>
Modified: trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java 2007-02-26 23:59:44 UTC (rev 2451)
+++ trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java 2007-02-27 00:43:45 UTC (rev 2452)
@@ -944,10 +944,12 @@
* Simulates a queue deployment (copying the queue descriptor in the deploy directory).
*/
public static void deployQueue(String name, String jndiName, int fullSize, int pageSize,
- int downCacheSize, int serverIndex, boolean clustered) throws Exception
+ int downCacheSize, int serverIndex, boolean clustered)
+ throws Exception
{
insureStarted();
- servers[serverIndex].getServer().deployQueue(name, jndiName, fullSize, pageSize, downCacheSize, clustered);
+ servers[serverIndex].getServer().
+ deployQueue(name, jndiName, fullSize, pageSize, downCacheSize, clustered);
}
/**
More information about the jboss-cvs-commits
mailing list