[hornetq-commits] JBoss hornetq SVN: r10597 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu May 5 16:08:16 EDT 2011
Author: clebert.suconic at jboss.com
Date: 2011-05-05 16:08:16 -0400 (Thu, 05 May 2011)
New Revision: 10597
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
Log:
JBPAPP-6466 - avoiding Timeout message that is not needed
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-05-05 17:20:45 UTC (rev 10596)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-05-05 20:08:16 UTC (rev 10597)
@@ -27,7 +27,6 @@
import java.util.TreeMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -573,73 +572,44 @@
public void close() throws Exception
{
final long tx = store.generateUniqueID();
-
- final ArrayList<Exception> ex = new ArrayList<Exception>();
-
- final AtomicBoolean isPersistent = new AtomicBoolean(false);
-
- // We can't delete the records at the caller's thread
- // because an executor may be holding the synchronized on PageCursorImpl
- // what would lead to a dead lock
- // so, we delete it inside the executor also
- // and wait for the result
- // The caller will be treating eventual IO exceptions and dispatching to the original thread's caller
- executor.execute(new Runnable()
+ try
{
-
- public void run()
+
+ boolean isPersistent = false;
+
+ synchronized (PageSubscriptionImpl.this)
{
- try
+ for (PageCursorInfo cursor : consumedPages.values())
{
- synchronized (PageSubscriptionImpl.this)
+ for (PagePosition info : cursor.acks)
{
- for (PageCursorInfo cursor : consumedPages.values())
+ if (info.getRecordID() != 0)
{
- for (PagePosition info : cursor.acks)
- {
- if (info.getRecordID() != 0)
- {
- isPersistent.set(true);
- store.deleteCursorAcknowledgeTransactional(tx, info.getRecordID());
- }
- }
+ isPersistent = true;
+ store.deleteCursorAcknowledgeTransactional(tx, info.getRecordID());
}
}
}
- catch (Exception e)
- {
- ex.add(e);
- PageSubscriptionImpl.log.warn(e.getMessage(), e);
- }
}
- });
-
- Future future = new Future();
-
- executor.execute(future);
-
- while (!future.await(5000))
- {
- PageSubscriptionImpl.log.warn("Timeout on waiting cursor " + this + " to be closed");
- }
-
- if (isPersistent.get())
- {
- // Another reason to perform the commit at the main thread is because the OperationContext may only send the
- // result to the client when
- // the IO on commit is done
- if (ex.size() == 0)
+
+ if (isPersistent)
{
store.commit(tx);
}
- else
+
+ cursorProvider.close(this);
+ }
+ catch (Exception e)
+ {
+ try
{
store.rollback(tx);
- throw ex.get(0);
}
+ catch (Exception ignored)
+ {
+ // exception of the exception.. nothing that can be done here
+ }
}
-
- cursorProvider.close(this);
}
/* (non-Javadoc)
More information about the hornetq-commits
mailing list