[jboss-cvs] JBoss Messaging SVN: r7797 - branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Sep 3 07:53:55 EDT 2009
Author: gaohoward
Date: 2009-09-03 07:53:55 -0400 (Thu, 03 Sep 2009)
New Revision: 7797
Modified:
branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
Log:
JBMESSAGING-1680
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java 2009-09-02 12:20:20 UTC (rev 7796)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java 2009-09-03 11:53:55 UTC (rev 7797)
@@ -38,7 +38,11 @@
import java.util.Iterator;
import java.util.List;
import java.util.Vector;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -118,15 +122,11 @@
}
public void start() throws Exception
- {
+ {
this.controlChannel = jChannelFactory.createControlChannel();
-
- this.dataChannel = jChannelFactory.createDataChannel();
// We don't want to receive local messages on any of the channels
controlChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
-
- dataChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
MessageListener messageListener = new ControlMessageListener();
@@ -135,13 +135,12 @@
RequestHandler requestHandler = new ControlRequestHandler();
dispatcher = new MessageDispatcher(controlChannel, messageListener, membershipListener, requestHandler, true);
-
- Receiver dataReceiver = new DataReceiver();
- dataChannel.setReceiver(dataReceiver);
+ starting = true;
- starting = true;
-
+ //first kickoff a thread to start the dataChannel
+ Future<String> future = connectDataChannel();
+
controlChannel.connect(groupName + CONTROL_SUFFIX);
if (!((JChannel)controlChannel).flushSupported())
@@ -188,9 +187,25 @@
}
//Now connect the data channel.
-
- dataChannel.connect(groupName + DATA_SUFFIX);
+ future.get();
}
+
+ private Future<String> connectDataChannel()
+ {
+ Callable<String> dataRunner = new Callable<String> () {
+ public String call() throws Exception
+ {
+ dataChannel = jChannelFactory.createDataChannel();
+ dataChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
+ Receiver dataReceiver = new DataReceiver();
+ dataChannel.setReceiver(dataReceiver);
+ dataChannel.connect(groupName + DATA_SUFFIX);
+ return "OK";
+ }
+ };
+ ExecutorService pool = Executors.newFixedThreadPool(1);
+ return pool.submit(dataRunner);
+ }
public void stop() throws Exception
{
@@ -639,5 +654,5 @@
throw e2;
}
}
- }
+ }
}
More information about the jboss-cvs-commits
mailing list