[jboss-cvs] JBoss Messaging SVN: r7797 - branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Sep 3 07:53:55 EDT 2009


Author: gaohoward
Date: 2009-09-03 07:53:55 -0400 (Thu, 03 Sep 2009)
New Revision: 7797

Modified:
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
Log:
JBMESSAGING-1680


Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java	2009-09-02 12:20:20 UTC (rev 7796)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java	2009-09-03 11:53:55 UTC (rev 7797)
@@ -38,7 +38,11 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Vector;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -118,15 +122,11 @@
    }
      
    public void start() throws Exception
-   {		
+   {
    	this.controlChannel = jChannelFactory.createControlChannel();
-   	
-      this.dataChannel = jChannelFactory.createDataChannel();
       
       // We don't want to receive local messages on any of the channels
       controlChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
-
-      dataChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
       
       MessageListener messageListener = new ControlMessageListener();
       
@@ -135,13 +135,12 @@
       RequestHandler requestHandler = new ControlRequestHandler();
       
       dispatcher = new MessageDispatcher(controlChannel, messageListener, membershipListener, requestHandler, true);
-      	      
-      Receiver dataReceiver = new DataReceiver();
       
-      dataChannel.setReceiver(dataReceiver);
+      starting = true;
       
-      starting = true;
-         
+      //first kickoff a thread to start the dataChannel
+      Future<String> future = connectDataChannel();
+      
       controlChannel.connect(groupName + CONTROL_SUFFIX);
       
       if (!((JChannel)controlChannel).flushSupported())
@@ -188,9 +187,25 @@
    	}   	
       
       //Now connect the data channel.
-   	
-      dataChannel.connect(groupName + DATA_SUFFIX);
+      future.get();
    }
+
+   private Future<String> connectDataChannel() // Submits data-channel creation + connect to a one-thread pool and returns the task's Future; start() calls get() on it
+   {
+      Callable<String> dataRunner = new Callable<String> () { // task body runs on the pool's worker thread, not on the caller's thread
+         public String call() throws Exception
+         {
+            dataChannel = jChannelFactory.createDataChannel(); // the dataChannel field is assigned from the worker thread
+            dataChannel.setOpt(Channel.LOCAL, Boolean.FALSE); // same option start() sets on the control channel: no local messages
+            Receiver dataReceiver = new DataReceiver();
+            dataChannel.setReceiver(dataReceiver);
+            dataChannel.connect(groupName + DATA_SUFFIX); // an exception thrown in call() is held by the Future and surfaces from get()
+            return "OK"; // placeholder result; the visible caller discards the value returned by get()
+         }
+      };
+      ExecutorService pool = Executors.newFixedThreadPool(1); // NOTE(review): pool is local and never shut down here, so its worker thread may linger - consider pool.shutdown() after submit
+      return pool.submit(dataRunner);
+   }
    
    public void stop() throws Exception
    {	
@@ -639,5 +654,5 @@
             throw e2;
          }
       }
-   }  
+   }
 }




More information about the jboss-cvs-commits mailing list