[jboss-cvs] JBoss Messaging SVN: r8408 - in branches/Branch_JBossMessaging_1_4_0_SP3_CP11_JBMESSAGING-1864_JBMESSAGING-1883_JBMESSAGING-1893: src/main/org/jboss/messaging/core/impl/clusterconnection and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Aug 10 21:13:19 EDT 2011
Author: jbertram
Date: 2011-08-10 21:13:19 -0400 (Wed, 10 Aug 2011)
New Revision: 8408
Modified:
branches/Branch_JBossMessaging_1_4_0_SP3_CP11_JBMESSAGING-1864_JBMESSAGING-1883_JBMESSAGING-1893/
branches/Branch_JBossMessaging_1_4_0_SP3_CP11_JBMESSAGING-1864_JBMESSAGING-1883_JBMESSAGING-1893/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP11_JBMESSAGING-1864_JBMESSAGING-1883_JBMESSAGING-1893/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP11_JBMESSAGING-1864_JBMESSAGING-1883_JBMESSAGING-1893/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP11_JBMESSAGING-1864_JBMESSAGING-1883_JBMESSAGING-1893/tests/src/org/jboss/test/messaging/jms/clustering/FakeMessageSucker.java
Log:
JBPAPP-6993
Property changes on: branches/Branch_JBossMessaging_1_4_0_SP3_CP11_JBMESSAGING-1864_JBMESSAGING-1883_JBMESSAGING-1893
___________________________________________________________________
Modified: svn:mergeinfo
- /branches/Branch_1_4:8305,8375
+ /branches/Branch_1_4:8305,8375,8398
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP11_JBMESSAGING-1864_JBMESSAGING-1883_JBMESSAGING-1893/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP11_JBMESSAGING-1864_JBMESSAGING-1883_JBMESSAGING-1893/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java 2011-08-10 14:11:12 UTC (rev 8407)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP11_JBMESSAGING-1864_JBMESSAGING-1883_JBMESSAGING-1893/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java 2011-08-11 01:13:19 UTC (rev 8408)
@@ -28,6 +28,8 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
@@ -86,6 +88,8 @@
private int maxRetry;
private int retryInterval; //5 sec
+
+ private ExecutorService suckerReaper;
public ClusterConnectionManager(int nodeID,
String connectionFactoryUniqueName, boolean preserveOrdering,
@@ -130,6 +134,8 @@
return;
}
+ suckerReaper = Executors.newCachedThreadPool();
+
if (trace) { log.trace(this + " started"); }
started = true;
@@ -153,6 +159,8 @@
connections.clear();
+ suckerReaper.shutdownNow();
+
started = false;
if (trace) { log.trace(this + " stopped"); }
@@ -481,7 +489,7 @@
}
MessageSucker sucker = new MessageSucker(localQueue, info.session, localInfo.session,
- preserveOrdering, sourceChannelID);
+ preserveOrdering, sourceChannelID, suckerReaper);
info.addSucker(sucker);
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP11_JBMESSAGING-1864_JBMESSAGING-1883_JBMESSAGING-1893/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP11_JBMESSAGING-1864_JBMESSAGING-1883_JBMESSAGING-1893/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java 2011-08-10 14:11:12 UTC (rev 8407)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP11_JBMESSAGING-1864_JBMESSAGING-1883_JBMESSAGING-1893/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java 2011-08-11 01:13:19 UTC (rev 8408)
@@ -22,12 +22,13 @@
package org.jboss.messaging.core.impl.clusterconnection;
+import java.util.concurrent.Executor;
+
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
-import javax.transaction.TransactionManager;
import org.jboss.jms.client.JBossSession;
import org.jboss.jms.client.container.ClientConsumer;
@@ -65,8 +66,6 @@
private volatile boolean started;
- private TransactionManager tm;
-
private boolean consuming;
private ClientConsumerDelegate consumer;
@@ -79,6 +78,8 @@
protected JBossQueue jbq;
+ private Executor suckerReaper;
+
private boolean suspended = false;
public String toString()
@@ -87,7 +88,7 @@
}
protected MessageSucker(Queue localQueue, Session sourceSession, Session localSession,
- boolean preserveOrdering, long sourceChannelID)
+ boolean preserveOrdering, long sourceChannelID, Executor suckerReaper)
{
if (trace) { log.trace("Creating message sucker, localQueue:" + localQueue + " preserveOrdering:" + preserveOrdering); }
@@ -102,6 +103,8 @@
this.preserveOrdering = preserveOrdering;
this.sourceChannelID = sourceChannelID;
+
+ this.suckerReaper = suckerReaper;
}
protected synchronized void start() throws Exception
@@ -152,44 +155,52 @@
return;
}
- setConsuming(false);
-
- try
+ suckerReaper.execute(new Runnable()
{
- consumer.closing(-1);
- }
- catch (Throwable t)
- {
- // Ignore
- }
- try
- {
- consumer.close();
- }
- catch (Throwable t)
- {
- // Ignore
- }
+ public void run()
+ {
+ setConsuming(false);
- try
- {
- producer.close();
- }
- catch (Throwable t)
- {
- // Ignore
- }
+ localQueue.unregisterSucker(MessageSucker.this);
- sourceSession = null;
+ try
+ {
+ consumer.closing(-1);
+ }
+ catch (Throwable t)
+ {
+ // Ignore
+ }
+ try
+ {
+ consumer.close();
+ }
+ catch (Throwable t)
+ {
+ // Ignore
+ }
- localSession = null;
+ try
+ {
+ producer.close();
+ }
+ catch (Throwable t)
+ {
+ // Ignore
+ }
- consumer = null;
+ sourceSession = null;
- clientConsumer = null;
+ localSession = null;
- producer = null;
+ consumer = null;
+ clientConsumer = null;
+
+ producer = null;
+ }
+ });
+
started = false;
}
}
@@ -267,46 +278,55 @@
return this.localQueue.getName();
}
- public synchronized void setConsuming(boolean consume)
+ public void setConsuming(boolean consume)
{
- if (trace) { log.trace(this + " setConsuming " + consume); }
+ if (trace) { log.trace(this + " setConsuming " + consume + " consuming " + consuming); }
if (!started)
{
return;
}
- //for supended, we set the consuming flag and do nothing.
- //later on resume, we force the sucker to be the last set consuming state.
- if (suspended)
- {
- consuming = consume;
- return;
- }
-
- try
- {
- if (consume && !consuming)
- {
- if (trace) { log.trace(this + " resuming client consumer"); }
-
- clientConsumer.resume();
-
- consuming = true;
- }
- else if (!consume && consuming)
- {
- if (trace) { log.trace(this + " pausing client consumer"); }
-
- clientConsumer.pause();
-
- consuming = false;
- }
- }
- catch (Exception e)
- {
- //We ignore the exception - we might fail to change rate when stoping a sucker for a dead server
- }
+ synchronized (this)
+ {
+ // While suspended, we only record the requested consuming flag and do nothing else.
+ // Later, on resume, we force the sucker into the last consuming state that was set.
+ if (suspended)
+ {
+ consuming = consume;
+ return;
+ }
+
+ try
+ {
+ if (consume && !consuming)
+ {
+ if (trace)
+ {
+ log.trace(this + " resuming client consumer");
+ }
+
+ clientConsumer.resume();
+
+ consuming = true;
+ }
+ else if (!consume && consuming)
+ {
+ if (trace)
+ {
+ log.trace(this + " pausing client consumer");
+ }
+
+ clientConsumer.pause();
+
+ consuming = false;
+ }
+ }
+ catch (Exception e)
+ {
+ // We ignore the exception - we might fail to change the rate when stopping a sucker for a dead server
+ }
+ }
}
public void onMessage(Message msg)
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP11_JBMESSAGING-1864_JBMESSAGING-1883_JBMESSAGING-1893/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP11_JBMESSAGING-1864_JBMESSAGING-1883_JBMESSAGING-1893/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java 2011-08-10 14:11:12 UTC (rev 8407)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP11_JBMESSAGING-1864_JBMESSAGING-1883_JBMESSAGING-1893/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java 2011-08-11 01:13:19 UTC (rev 8408)
@@ -27,6 +27,8 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import javax.jms.JMSException;
import javax.jms.Queue;
@@ -82,6 +84,8 @@
private int retryInterval;
+ private ExecutorService suckerReaper;
+
public FakeClusterConnectionManager(int remoteID,
JBossConnectionFactory theFactory,
String suckerUser,
@@ -113,6 +117,7 @@
{
return;
}
+ suckerReaper = Executors.newCachedThreadPool();
started = true;
}
@@ -172,7 +177,7 @@
if (!info.hasSucker(queue.getQueueName()))
{
- FakeMessageSucker sucker = new FakeMessageSucker(new FakeCoreQueue(queue), info.getSession(), 0, info.suckBuffer);
+ FakeMessageSucker sucker = new FakeMessageSucker(new FakeCoreQueue(queue), info.getSession(), 0, info.suckBuffer, this.suckerReaper);
info.addSucker(sucker);
@@ -635,6 +640,8 @@
connections.clear();
+ suckerReaper.shutdownNow();
+
started = false;
}
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP11_JBMESSAGING-1864_JBMESSAGING-1883_JBMESSAGING-1893/tests/src/org/jboss/test/messaging/jms/clustering/FakeMessageSucker.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP11_JBMESSAGING-1864_JBMESSAGING-1883_JBMESSAGING-1893/tests/src/org/jboss/test/messaging/jms/clustering/FakeMessageSucker.java 2011-08-10 14:11:12 UTC (rev 8407)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP11_JBMESSAGING-1864_JBMESSAGING-1883_JBMESSAGING-1893/tests/src/org/jboss/test/messaging/jms/clustering/FakeMessageSucker.java 2011-08-11 01:13:19 UTC (rev 8408)
@@ -23,6 +23,7 @@
package org.jboss.test.messaging.jms.clustering;
import java.util.List;
+import java.util.concurrent.Executor;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -50,9 +51,9 @@
private boolean queueNotUpdated = true;
- FakeMessageSucker(Queue localQueue, Session sourceSession, long sourceChannelID, List<TextMessage> buffer)
+ FakeMessageSucker(Queue localQueue, Session sourceSession, long sourceChannelID, List<TextMessage> buffer, Executor suckerReaper)
{
- super(localQueue, sourceSession, sourceSession, true, sourceChannelID);
+ super(localQueue, sourceSession, sourceSession, true, sourceChannelID, suckerReaper);
this.buffer = buffer;
}
More information about the jboss-cvs-commits
mailing list