[jboss-cvs] JBoss Messaging SVN: r5730 - trunk/src/main/org/jboss/messaging/core/server/impl.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jan 26 14:47:03 EST 2009
Author: clebert.suconic at jboss.com
Date: 2009-01-26 14:47:03 -0500 (Mon, 26 Jan 2009)
New Revision: 5730
Modified:
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
Log:
Just running auto-cleanup... no code changes
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-01-26 19:45:48 UTC (rev 5729)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-01-26 19:47:03 UTC (rev 5730)
@@ -96,16 +96,16 @@
private boolean promptDelivery;
- private AtomicInteger messagesAdded = new AtomicInteger(0);
+ private final AtomicInteger messagesAdded = new AtomicInteger(0);
- private AtomicInteger deliveringCount = new AtomicInteger(0);
+ private final AtomicInteger deliveringCount = new AtomicInteger(0);
- private AtomicBoolean waitingToDeliver = new AtomicBoolean(false);
+ private final AtomicBoolean waitingToDeliver = new AtomicBoolean(false);
private final Runnable deliverRunner = new DeliverRunner();
private final PagingManager pagingManager;
-
+
private volatile PagingStore pagingStore;
private final StorageManager storageManager;
@@ -116,8 +116,8 @@
private int consumersToFailover = -1;
- private final SimpleString routeToPropertyName;
-
+ private final SimpleString routeToPropertyName;
+
public QueueImpl(final long persistenceID,
final SimpleString name,
final Filter filter,
@@ -143,16 +143,16 @@
this.storageManager = storageManager;
this.queueSettingsRepository = queueSettingsRepository;
-
- this.routeToPropertyName = MessageImpl.HDR_ROUTE_TO_PREFIX.concat(name);
+ routeToPropertyName = MessageImpl.HDR_ROUTE_TO_PREFIX.concat(name);
+
if (postOffice == null)
{
- this.pagingManager = null;
+ pagingManager = null;
}
else
{
- this.pagingManager = postOffice.getPagingManager();
+ pagingManager = postOffice.getPagingManager();
}
direct = true;
@@ -164,14 +164,14 @@
public boolean accept(final ServerMessage message) throws Exception
{
-// if (message.containsProperty(MessageImpl.HDR_FROM_CLUSTER))
-// {
-// if (message.removeProperty(routeToPropertyName) == null)
-// {
-// return false;
-// }
-// }
-
+ // if (message.containsProperty(MessageImpl.HDR_FROM_CLUSTER))
+ // {
+ // if (message.removeProperty(routeToPropertyName) == null)
+ // {
+ // return false;
+ // }
+ // }
+
if (filter != null && !filter.match(message))
{
return false;
@@ -179,21 +179,21 @@
else
{
int count = message.incrementRefCount();
-
+
if (count == 1)
{
PagingStore store = pagingManager.getPageStore(message.getDestination());
-
+
store.addSize(message.getMemoryEstimate());
}
-
+
boolean durableRef = message.isDurable() && durable;
-
+
if (durableRef)
{
message.incrementDurableRefCount();
}
-
+
return true;
}
}
@@ -212,9 +212,9 @@
{
return false;
}
-
+
public void route(final ServerMessage message, final Transaction tx) throws Exception
- {
+ {
// Temp
SimpleString routeToHeader = MessageImpl.HDR_ROUTE_TO_PREFIX.concat(name);
message.removeProperty(routeToHeader);
@@ -223,97 +223,99 @@
// If durable, must be persisted before anything is routed
MessageReference ref = message.createReference(this);
-
+
PagingStore store = pagingManager.getPageStore(message.getDestination());
-
+
store.addSize(ref.getMemoryEstimate());
-
+
Long scheduledDeliveryTime = (Long)message.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
-
+
if (scheduledDeliveryTime != null)
{
ref.setScheduledDeliveryTime(scheduledDeliveryTime);
}
-
+
if (tx == null)
- {
+ {
if (durableRef)
{
if (!message.isStored())
{
storageManager.storeMessage(message);
-
+
message.setStored();
}
-
+
storageManager.storeReference(ref.getQueue().getPersistenceID(), message.getMessageID());
}
-
+
if (scheduledDeliveryTime != null && durableRef)
{
- storageManager.updateScheduledDeliveryTime(ref);
- }
+ storageManager.updateScheduledDeliveryTime(ref);
+ }
addLast(ref);
}
else
- {
+ {
if (durableRef)
{
if (!message.isStored())
- {
- storageManager.storeMessageTransactional(tx.getID(), message);
-
+ {
+ storageManager.storeMessageTransactional(tx.getID(), message);
+
message.setStored();
}
-
- tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
-
- storageManager.storeReferenceTransactional(tx.getID(), ref.getQueue().getPersistenceID(), message.getMessageID());
- }
+ tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
+
+ storageManager.storeReferenceTransactional(tx.getID(),
+ ref.getQueue().getPersistenceID(),
+ message.getMessageID());
+ }
+
if (scheduledDeliveryTime != null && durableRef)
{
- storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), ref);
- }
+ storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), ref);
+ }
getRefsOperation(tx).addRef(ref);
}
}
-
+
public MessageReference reroute(final ServerMessage message, final Transaction tx) throws Exception
{
// Temp
SimpleString routeToHeader = MessageImpl.HDR_ROUTE_TO_PREFIX.concat(name);
message.removeProperty(routeToHeader);
-
+
MessageReference ref = message.createReference(this);
-
+
int count = message.incrementRefCount();
-
+
PagingStore store = pagingManager.getPageStore(message.getDestination());
-
+
if (count == 1)
{
store.addSize(message.getMemoryEstimate());
}
store.addSize(ref.getMemoryEstimate());
-
+
boolean durableRef = message.isDurable() && durable;
-
+
if (durableRef)
{
message.incrementDurableRefCount();
}
-
+
Long scheduledDeliveryTime = (Long)message.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
-
+
if (scheduledDeliveryTime != null)
{
ref.setScheduledDeliveryTime(scheduledDeliveryTime);
}
-
+
if (tx == null)
{
addLast(ref);
@@ -321,10 +323,10 @@
else
{
getRefsOperation(tx).addRef(ref);
- }
+ }
message.setStored();
-
+
return ref;
}
@@ -352,7 +354,7 @@
public void setPersistenceID(final long id)
{
- this.persistenceID = id;
+ persistenceID = id;
}
public Filter getFilter()
@@ -407,7 +409,7 @@
{
return distributionPolicy.getConsumerCount();
}
-
+
public synchronized List<Consumer> getConsumers()
{
return new ArrayList<Consumer>(distributionPolicy.getConsumers());
@@ -839,9 +841,9 @@
public synchronized void setBackup()
{
- this.backup = true;
+ backup = true;
- this.direct = false;
+ direct = false;
}
public synchronized boolean activate()
@@ -898,7 +900,8 @@
// Public
// -----------------------------------------------------------------------------
- public boolean equals(Object other)
+ @Override
+ public boolean equals(final Object other)
{
if (this == other)
{
@@ -910,6 +913,7 @@
return name.equals(qother.name);
}
+ @Override
public int hashCode()
{
return name.hashCode();
@@ -953,7 +957,6 @@
// FIXME - this won't work with replication!!!!!!!!!!!
long newMessageId = storageManager.generateUniqueID();
-
ServerMessage copy = message.copy(newMessageId);
SimpleString originalQueue = copy.getDestination();
@@ -1123,7 +1126,10 @@
// If the queue is empty, we need to check if there are pending messages, and throw a warning
if (pagingStore.isPaging() && !pagingStore.isDropWhenMaxSize())
{
- log.warn("The Queue " + this.name + " is empty, however there are pending messages on Paging for the address " + pagingStore.getStoreName() + " waiting message ACK before they could be routed");
+ log.warn("The Queue " + name +
+ " is empty, however there are pending messages on Paging for the address " +
+ pagingStore.getStoreName() +
+ " waiting message ACK before they could be routed");
}
}
// We delivered all the messages - go into direct delivery
@@ -1133,7 +1139,7 @@
}
return;
}
-
+
if (pagingStore == null)
{
// TODO: It would be better if we could initialize the pagingStore during the construction
More information about the jboss-cvs-commits
mailing list