[jboss-cvs] JBoss Messaging SVN: r1381 - trunk/src/main/org/jboss/messaging/core
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Sep 28 17:13:20 EDT 2006
Author: ovidiu.feodorov at jboss.com
Date: 2006-09-28 17:13:19 -0400 (Thu, 28 Sep 2006)
New Revision: 1381
Modified:
trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
Log:
resolving conflict
Modified: trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-09-28 21:01:53 UTC (rev 1380)
+++ trunk/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-09-28 21:13:19 UTC (rev 1381)
@@ -97,9 +97,9 @@
protected Object refLock;
protected Object deliveryLock;
-
+
protected boolean active = true;
-
+
// Constructors --------------------------------------------------
protected ChannelSupport(long channelID, MessageStore ms,
@@ -139,7 +139,7 @@
deliveryLock = new Object();
}
-
+
// Receiver implementation ---------------------------------------
public Delivery handle(DeliveryObserver sender, MessageReference ref, Transaction tx)
@@ -148,13 +148,13 @@
{
return null;
}
-
+
checkClosed();
-
+
Future result = new Future();
if (tx == null)
- {
+ {
try
{
// Instead of executing directly, we add the handle request to the event queue.
@@ -167,15 +167,15 @@
{
log.warn("Thread interrupted", e);
}
-
+
return (Delivery)result.getResult();
}
else
{
return handleInternal(sender, ref, tx, true, false);
}
- }
-
+ }
+
// DeliveryObserver implementation --------------------------
public void acknowledge(Delivery d, Transaction tx) throws Throwable
@@ -184,12 +184,12 @@
this.acknowledgeInternal(d, tx, true, false);
}
-
-
+
+
public void cancel(Delivery d) throws Throwable
{
// TODO We should also consider executing cancels on the event queue
- cancelInternal(d);
+ cancelInternal(d);
}
// Distributor implementation ------------------------------------
@@ -201,16 +201,16 @@
boolean added = router.add(r);
if (trace) { log.trace("receiver " + r + (added ? "" : " NOT") + " added"); }
-
+
receiversReady = true;
-
+
return added;
}
public boolean remove(Receiver r)
{
boolean removed = router.remove(r);
-
+
if (removed && !router.iterator().hasNext())
{
receiversReady = false;
@@ -235,7 +235,7 @@
{
return router.iterator();
}
-
+
public int numberOfReceivers()
{
return router.numberOfReceivers();
@@ -266,7 +266,7 @@
public List browse(Filter filter)
{
if (trace) { log.trace(this + " browse" + (filter == null ? "" : ", filter = " + filter)); }
-
+
synchronized (deliveryLock)
{
synchronized (refLock)
@@ -276,14 +276,14 @@
// Also is very inefficient since it makes a copy
// The way to implement this properly is to use the Prioritized deque iterator
// combined with an iterator over the refs in storage
-
+
//TODO use the ref queue iterator
List references = delivering(filter);
-
- List undel = undelivered(filter);
+ List undel = undelivered(filter);
+
references.addAll(undel);
-
+
// dereference pass
ArrayList messages = new ArrayList(references.size());
for (Iterator i = references.iterator(); i.hasNext();)
@@ -293,18 +293,18 @@
}
return messages;
}
- }
+ }
}
public void deliver(boolean synchronous)
{
- checkClosed();
-
+ checkClosed();
+
// We put a delivery request on the event queue.
try
{
Future future = null;
-
+
if (synchronous)
{
future = new Future();
@@ -312,9 +312,9 @@
//TODO we should keep track of how many deliveries are currently in the queue
//so we don't execute another delivery when one is in the queue, since
//this is pointless
-
+
this.executor.execute(new DeliveryRunnable(future));
-
+
if (synchronous)
{
// Wait to complete
@@ -333,25 +333,25 @@
{
router.clear();
router = null;
- }
- }
-
+ }
+ }
+
/*
* This method clears the channel.
* Basically it acknowledges any outstanding deliveries and consumes the rest of the messages in the channel.
* We can't just delete the corresponding references directly from the database since
* a) We might be paging
* b) The message might remain in the message store causing a leak
- *
+ *
*/
public void removeAllReferences() throws Throwable
- {
+ {
synchronized (refLock)
{
synchronized (deliveryLock)
{
//Ack the deliveries
-
+
//Clone to avoid ConcurrentModificationException
Set dels = new HashSet(deliveries);
@@ -359,26 +359,26 @@
while (iter.hasNext())
{
SimpleDelivery d = (SimpleDelivery) iter.next();
-
+
d.acknowledge(null);
}
-
+
//Now we consume the rest of the messages
//This may take a while if we have a lot of messages including perhaps millions
//paged in the database - but there's no obvious other way to do it.
//We cannot just delete them directly from the database - because we may end up with messages leaking
//in the message store,
//also we might get race conditions when other channels are updating the same message in the db
-
+
//Note - we don't do this in a tx - because the tx could be too big if we have millions of refs
//paged in storage
-
+
MessageReference ref;
while ((ref = removeFirstInMemory()) != null)
{
SimpleDelivery del = new SimpleDelivery(this, ref, false);
-
- del.acknowledge(null);
+
+ del.acknowledge(null);
}
}
}
@@ -444,7 +444,7 @@
* Returns the count of messages stored AND being delivered.
*/
public int messageCount()
- {
+ {
synchronized (refLock)
{
synchronized (deliveryLock)
@@ -453,7 +453,7 @@
}
}
}
-
+
public void activate()
{
synchronized (refLock)
@@ -464,7 +464,7 @@
}
}
}
-
+
public void deactivate()
{
synchronized (refLock)
@@ -475,7 +475,7 @@
}
}
}
-
+
public boolean isActive()
{
synchronized (refLock)
@@ -486,9 +486,9 @@
}
}
}
-
- // Public --------------------------------------------------------
+ // Public --------------------------------------------------------
+
public int memoryRefCount()
{
synchronized (refLock)
@@ -517,12 +517,12 @@
/*
* This methods delivers as many messages as possible to the router until no
* more deliveries are returned. This method should never be called at the
- * same time as handle.
- *
+ * same time as handle.
+ *
* @see org.jboss.messaging.core.Channel#deliver()
*/
protected void deliverInternal(boolean handle) throws Throwable
- {
+ {
try
{
// The iterator is used to iterate through the refs in the channel in the case that they
More information about the jboss-cvs-commits
mailing list