[jboss-cvs] JBoss Messaging SVN: r3680 - in trunk: tests and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Feb 8 02:27:04 EST 2008


Author: timfox
Date: 2008-02-08 02:27:04 -0500 (Fri, 08 Feb 2008)
New Revision: 3680

Modified:
   trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java
   trunk/tests/build.xml
   trunk/tests/src/org/jboss/test/messaging/jms/message/JMSExpirationHeaderTest.java
Log:
Some improvements to ClientConsumerImpl


Modified: trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java	2008-02-08 03:54:31 UTC (rev 3679)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java	2008-02-08 07:27:04 UTC (rev 3680)
@@ -1,24 +1,24 @@
 /*
-  * JBoss, Home of Professional Open Source
-  * Copyright 2005, JBoss Inc., and individual contributors as indicated
-  * by the @authors tag. See the copyright.txt in the distribution for a
-  * full listing of individual contributors.
-  *
-  * This is free software; you can redistribute it and/or modify it
-  * under the terms of the GNU Lesser General Public License as
-  * published by the Free Software Foundation; either version 2.1 of
-  * the License, or (at your option) any later version.
-  *
-  * This software is distributed in the hope that it will be useful,
-  * but WITHOUT ANY WARRANTY; without even the implied warranty of
-  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-  * Lesser General Public License for more details.
-  *
-  * You should have received a copy of the GNU Lesser General Public
-  * License along with this software; if not, write to the Free
-  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-  */
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
 package org.jboss.jms.client.impl;
 
 import org.jboss.jms.client.api.MessageHandler;
@@ -41,25 +41,28 @@
  * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
+ * 
  * @version <tt>$Revision: 3603 $</tt>
- *
+ * 
  * $Id: ClientConsumerImpl.java 3603 2008-01-21 18:49:20Z timfox $
  */
 public class ClientConsumerImpl implements ClientConsumerInternal
 {
-   // Constants ------------------------------------------------------------------------------------
+   // Constants
+   // ------------------------------------------------------------------------------------
 
-	private static final Logger log = Logger.getLogger(ClientConsumerImpl.class);
-	
-	private static final boolean trace = log.isTraceEnabled();
-	
-   // Attributes -----------------------------------------------------------------------------------
+   private static final Logger log = Logger.getLogger(ClientConsumerImpl.class);
 
-	private String id;
-	private ClientSessionInternal session;
+   private static final boolean trace = log.isTraceEnabled();
+
+   // Attributes
+   // -----------------------------------------------------------------------------------
+
+   private String id;
+   private ClientSessionInternal session;
    private int bufferSize;
-   private PriorityLinkedList<DeliverMessage> buffer = new PriorityLinkedListImpl<DeliverMessage>(10);
+   private PriorityLinkedList<DeliverMessage> buffer = new PriorityLinkedListImpl<DeliverMessage>(
+         10);
    private volatile Thread receiverThread;
    private MessageHandler handler;
    private volatile boolean closed;
@@ -70,208 +73,194 @@
    private MessagingRemotingConnection remotingConnection;
    private String queueName;
    private long ignoreDeliveryMark = -1;
-   
-   //FIXME - revisit closed and closing flags
-   
-   // Static ---------------------------------------------------------------------------------------
-      
-   // Constructors ---------------------------------------------------------------------------------
 
-   public ClientConsumerImpl(ClientSessionInternal session, String id, int bufferSize,
-                             QueuedExecutor sessionExecutor,
-                             MessagingRemotingConnection remotingConnection,
-                             String queueName)
+   // FIXME - revisit closed and closing flags
+
+   // Static
+   // ---------------------------------------------------------------------------------------
+
+   // Constructors
+   // ---------------------------------------------------------------------------------
+
+   public ClientConsumerImpl(ClientSessionInternal session, String id,
+         int bufferSize, QueuedExecutor sessionExecutor,
+         MessagingRemotingConnection remotingConnection, String queueName)
    {
       this.id = id;
       this.session = session;
       this.bufferSize = bufferSize;
       this.sessionExecutor = sessionExecutor;
-      this.remotingConnection = remotingConnection;    
+      this.remotingConnection = remotingConnection;
       this.queueName = queueName;
    }
 
-   // ClientConsumer implementation -----------------------------------------------------------------
-      
+   // ClientConsumer implementation
+   // -----------------------------------------------------------------
+
    public Message receive(long timeout) throws MessagingException
-   {      
+   {
       checkClosed();
-      
-      DeliverMessage m = null;      
-      
+
+      DeliverMessage m = null;
+
       synchronized (mainLock)
-      {        
-         if (trace) { log.trace(this + " receiving, timeout = " + timeout); }
-         
-         if (closed)
-         {
-            if (trace) { log.trace(this + " closed, returning null"); }
-            return null;
-         }
-         
+      {
+         if (closed) { return null; }
+
          if (handler != null)
          {
-            throw new MessagingException(MessagingException.ILLEGAL_STATE, "Cannot call receive(...) - a MessageHandler is set");
+            throw new MessagingException(
+               MessagingException.ILLEGAL_STATE, "Cannot call receive(...) - a MessageHandler is set");
          }
-                       
+
          receiverThread = Thread.currentThread();
-               
-         long startTimestamp = System.currentTimeMillis();
-                  
+
+         if (timeout == 0)            
+         {
+            //Effectively infinite
+            timeout = Long.MAX_VALUE;
+         }
+         
+         long start = System.currentTimeMillis();
+         
+         long toWait = timeout;
+
          try
          {
-            while(true)
-            {                             
-               if (timeout == 0)
+            while (true)
+            {
+               while (!closed && buffer.isEmpty() && toWait > 0)
                {
-                  if (trace) { log.trace(this + ": receive, no timeout"); }
-                  
-                  m = getMessage(0);                     
-                  
-                  if (m == null)
+                  try
                   {
-                     return null;
+                     mainLock.wait(toWait);
                   }
-               }
-               else if (timeout == -1)
-               {
-                  //ReceiveNoWait
-                  if (trace) { log.trace(this + ": receive, noWait"); }
-                  
-                  m = getMessage(-1);                     
-                  
-                  if (m == null)
+                  catch (InterruptedException e)
                   {
-                     if (trace) { log.trace(this + ": no message available"); }
-                     return null;
                   }
+                  
+                  long now = System.currentTimeMillis();
+                  
+                  toWait -= now - start;
+                  
+                  start = now;
                }
-               else
-               {
-                  if (trace) { log.trace(this + ": receive, timeout " + timeout + " ms, blocking poll on queue"); }
+                       
+               if (!closed && !buffer.isEmpty())
+               {                              
+                  m = buffer.removeFirst();
                   
-                  m = getMessage(timeout);
+                  boolean expired = m.getMessage().isExpired();
+                  
+                  session.delivered(m.getDeliveryID(), expired);
+                  
+                  checkSendChangeRate();
                                     
-                  if (m == null)
+                  if (expired)
                   {
-                     // timeout expired
-                     if (trace) { log.trace(this + ": " + timeout + " ms timeout expired"); }
-                     
-                     return null;
+                     if (toWait > 0)
+                     {
+                        continue;
+                     }
+                     else
+                     {
+                        return null;
+                     }
                   }
+                                    
+                  return m.getMessage();
                }
-                              
-               if (trace) { log.trace(this + " received " + m + " after being blocked on buffer"); }
-                       
-               boolean expired = m.getMessage().isExpired();
-               
-               session.delivered(m.getDeliveryID(), expired);
-               
-               if (!expired)
+               else
                {
-                  break;
+                  return null;
                }
-
-               if (trace) { log.trace("Message has expired " + m); }
-               
-               if (timeout != 0)
-               {
-                  timeout -= System.currentTimeMillis() - startTimestamp;
-                  if (timeout == 0)
-                  {
-                     // As 0 means waitForever, we make it noWait
-                     timeout = -1;
-                  }
-
-               }
-            }           
+            }
          }
          finally
          {
-            receiverThread = null;            
+            receiverThread = null;
          }
-      } 
-         
-      return m.getMessage();
-   } 
-   
+      }
+   }
+
    public Message receiveImmediate() throws MessagingException
-   { 
+   {
       return receive(-1);
    }
-   
+
    public MessageHandler getMessageHandler() throws MessagingException
    {
       checkClosed();
-      
+
       return handler;
    }
-   
+
    public void setMessageHandler(MessageHandler handler) throws MessagingException
-   {  
+   {
       checkClosed();
-      
+
       synchronized (mainLock)
       {
-         if (receiverThread != null)
-         {
-            throw new MessagingException(MessagingException.ILLEGAL_STATE, "Cannot set MessageHandler - consumer is in receive(...)");
-         }
-         
+         if (receiverThread != null) { throw new MessagingException(
+               MessagingException.ILLEGAL_STATE,
+               "Cannot set MessageHandler - consumer is in receive(...)"); }
+
          this.handler = handler;
-                            
+
          if (handler != null && !buffer.isEmpty())
-         {  
+         {
             listenerRunning = true;
-            
+
             this.queueRunner(new ListenerRunner());
-         }        
-      }   
+         }
+      }
    }
-   
+
    public String getQueueName()
    {
       return queueName;
    }
-      
+
    public synchronized void close() throws MessagingException
    {
-      if (closed)
-      {
-         return;
-      }
-               
+      if (closed) { return; }
+
       try
       {
-         //Important! We set the handler to null so the next ListenerRunner won't run
+         // Important! We set the handler to null so the next ListenerRunner
+         // won't run
          if (handler != null)
          {
             setMessageHandler(null);
          }
-         
-         //Now we wait for any current handler runners to run.
-         waitForOnMessageToComplete();   
-         
-         //TODO sort out these close and closing flags
-         
+
+         // Now we wait for any current handler runners to run.
+         waitForOnMessageToComplete();
+
+         // TODO sort out these close and closing flags
+
          synchronized (mainLock)
-         {                     
+         {
             closed = true;
-                       
+
             if (receiverThread != null)
-            {            
+            {
                // Wake up any receive() thread that might be waiting
                mainLock.notify();
-            }   
-            
+            }
+
             this.handler = null;
          }
-         
+
          remotingConnection.send(id, new CloseMessage());
-         
+
          PacketDispatcher.client.unregister(id);
-                                      
-         if (trace) { log.trace(this + " closed"); }
-                  
+
+         if (trace)
+         {
+            log.trace(this + " closed");
+         }
+
       }
       finally
       {
@@ -285,81 +274,92 @@
       return closed;
    }
 
-   // ClientConsumerInternal implementation --------------------------------------------------------------
-   
+   // ClientConsumerInternal implementation
+   // --------------------------------------------------------------
+
    public String getID()
    {
       return id;
    }
-      
+
    public void changeRate(float newRate) throws MessagingException
    {
       checkClosed();
-      
+
       remotingConnection.send(id, new ConsumerChangeRateMessage(newRate), true);
    }
-   
+
    public void handleMessage(final DeliverMessage message) throws Exception
    {
       synchronized (mainLock)
       {
          if (closed)
          {
-            //This is ok - we just ignore the message
+            // This is ok - we just ignore the message
             return;
          }
-                  
+
          if (ignoreDeliveryMark >= 0)
          {
             long delID = message.getDeliveryID();
-            
+
             if (delID > ignoreDeliveryMark)
             {
-               //Ignore - the session is recovering and these are inflight messages
+               // Ignore - the session is recovering and these are inflight
+               // messages
                return;
             }
             else
             {
-               //We have hit the begining of the recovered messages - we can stop ignoring
+               // We have hit the begining of the recovered messages - we can
+               // stop ignoring
                ignoreDeliveryMark = -1;
-            }            
+            }
          }
-         
-         //Add it to the buffer    
+
+         // Add it to the buffer
          Message coreMessage = message.getMessage();
-         
+
          coreMessage.setDeliveryCount(message.getDeliveryCount());
-         
+
          buffer.addLast(message, coreMessage.getPriority());
 
-         if (trace) { log.trace(this + " added message(s) to the buffer are now " + buffer.size() + " messages"); }
+         if (trace)
+         {
+            log.trace(this + " added message(s) to the buffer are now "
+                  + buffer.size() + " messages");
+         }
 
          messageAdded();
       }
    }
-   
+
    public void recover(long lastDeliveryID)
    {
       synchronized (mainLock)
       {
          ignoreDeliveryMark = lastDeliveryID;
-         
+
          buffer.clear();
-      }            
+      }
    }
-   
-   // Public ---------------------------------------------------------------------------------------
-     
-   // Package protected ----------------------------------------------------------------------------
-   
-   // Protected ------------------------------------------------------------------------------------
-            
-   // Private --------------------------------------------------------------------------------------
-   
+
+   // Public
+   // ---------------------------------------------------------------------------------------
+
+   // Package protected
+   // ----------------------------------------------------------------------------
+
+   // Protected
+   // ------------------------------------------------------------------------------------
+
+   // Private
+   // --------------------------------------------------------------------------------------
+
    private void checkSendChangeRate() throws MessagingException
    {
       consumeCount++;
-      
+
       if (consumeCount == bufferSize)
       {
          consumeCount = 0;
@@ -367,21 +367,24 @@
          changeRate(1.0f);
       }
    }
-     
+
    private void waitForOnMessageToComplete()
    {
       // Wait for any onMessage() executions to complete
 
       if (Thread.currentThread().equals(sessionExecutor.getThread()))
       {
-         // the current thread already closing this ClientConsumer (this happens when the
-         // session is closed from within the MessageListener.onMessage(), for example), so no need
-         // to register another Closer (see http://jira.jboss.org/jira/browse/JBMESSAGING-542)
+         // the current thread already closing this ClientConsumer (this happens
+         // when the
+         // session is closed from within the MessageListener.onMessage(), for
+         // example), so no need
+         // to register another Closer (see
+         // http://jira.jboss.org/jira/browse/JBMESSAGING-542)
          return;
       }
 
       Future result = new Future();
-      
+
       try
       {
          sessionExecutor.execute(new Closer(result));
@@ -389,7 +392,7 @@
          result.getResult();
       }
       catch (InterruptedException e)
-      {         
+      {
       }
    }
 
@@ -400,138 +403,74 @@
          this.sessionExecutor.execute(runner);
       }
       catch (InterruptedException e)
-      {         
+      {
       }
    }
-   
+
    private void messageAdded()
    {
       boolean notified = false;
-      
-      if (trace) { log.trace("Receiver thread:" + receiverThread + " handler:" + handler + " listenerRunning:" + listenerRunning + 
-            " sessionExecutor:" + sessionExecutor); }
-      
+
+      if (trace)
+      {
+         log.trace("Receiver thread:" + receiverThread + " handler:" + handler
+               + " listenerRunning:" + listenerRunning + " sessionExecutor:"
+               + sessionExecutor);
+      }
+
       // If we have a thread waiting on receive() we notify it
       if (receiverThread != null)
       {
-         if (trace) { log.trace(this + " notifying receiver/waiter thread"); }   
-         
+         if (trace)
+         {
+            log.trace(this + " notifying receiver/waiter thread");
+         }
+
          mainLock.notifyAll();
-         
+
          notified = true;
-      }     
+      }
       else if (handler != null)
-      { 
+      {
          // We have a message handler
          if (!listenerRunning)
          {
             listenerRunning = true;
 
-            if (trace) { log.trace(this + " scheduled a new ListenerRunner"); }
-            
+            if (trace)
+            {
+               log.trace(this + " scheduled a new ListenerRunner");
+            }
+
             this.queueRunner(new ListenerRunner());
-         }     
-         
-         //TODO - Execute onMessage on same thread for even better throughput 
+         }
+
+         // TODO - Execute onMessage on same thread for even better throughput
       }
-      
+
       // Make sure we notify any thread waiting for last delivery
       if (!notified)
       {
-         if (trace) { log.trace("Notifying"); }
-         
+         if (trace)
+         {
+            log.trace("Notifying");
+         }
+
          mainLock.notifyAll();
       }
    }
-   
-   private long waitOnLock(Object lock, long waitTime) throws InterruptedException
-   {
-      long start = System.currentTimeMillis();
-      
-      // Wait for last message to arrive
-      lock.wait(waitTime);
-     
-      long waited = System.currentTimeMillis() - start;
-      
-      if (waited < waitTime)
-      {
-         waitTime = waitTime - waited;
-         
-         return waitTime;
-      }
-      else
-      {
-         return 0;
-      }     
-   }
-        
-   private DeliverMessage getMessage(long timeout) throws MessagingException
-   {
-      if (timeout == -1)
-      {
-         // receiveNoWait so don't wait
-      }
-      else
-      {         
-         try
-         {         
-            if (timeout == 0)
-            {
-               // wait for ever potentially
-               while (!closed && buffer.isEmpty())
-               {
-                  if (trace) { log.trace(this + " waiting on main lock, no timeout"); }
 
-                  mainLock.wait();
+  
 
-                  if (trace) { log.trace(this + " done waiting on main lock"); }
-               }
-            }
-            else
-            {
-               // wait with timeout
-               long toWait = timeout;
-             
-               while (!closed && buffer.isEmpty() && toWait > 0)
-               {
-                  if (trace) { log.trace(this + " waiting on main lock, timeout " + toWait + " ms"); }
-
-                  toWait = waitOnLock(mainLock, toWait);
-
-                  if (trace) { log.trace(this + " done waiting on lock, buffer is " + (buffer.isEmpty() ? "" : "NOT ") + "empty"); }
-               }
-            }
-         }
-         catch (InterruptedException e)
-         {
-            if (trace) { log.trace("InterruptedException, " + this + ".getMessage() returning null"); }
-            return null;
-         } 
-      }
-
-      DeliverMessage m = null;
-             
-      if (!closed && !buffer.isEmpty())
-      {
-         m = buffer.removeFirst();
-         
-         checkSendChangeRate();
-      }
-
-      return m;
-   }
-   
    private void checkClosed() throws MessagingException
    {
-      if (closed)
-      {
-         throw new MessagingException(MessagingException.OBJECT_CLOSED, "Consumer is closed");
-      }
+      if (closed) { throw new MessagingException(
+            MessagingException.OBJECT_CLOSED, "Consumer is closed"); }
    }
 
-   
-   // Inner classes --------------------------------------------------------------------------------
-         
+   // Inner classes
+   // --------------------------------------------------------------------------------
+
    /*
     * This class is used to put on the handler executor to wait for onMessage
     * invocations to complete when closing
@@ -539,100 +478,118 @@
    private class Closer implements Runnable
    {
       Future result;
-      
+
       Closer(Future result)
       {
          this.result = result;
       }
-      
+
       public void run()
       {
          result.setResult(null);
       }
    }
-   
+
    /*
     * This class handles the execution of onMessage methods
     */
    private class ListenerRunner implements Runnable
    {
       public void run()
-      {         
+      {
          try
          {
             DeliverMessage msg = null;
-            
+
             MessageHandler theListener = null;
-            
+
             synchronized (mainLock)
             {
                if (handler == null || buffer.isEmpty())
                {
                   listenerRunning = false;
-                  
-                  if (trace) { log.trace("no handler or buffer is empty, returning"); }
-                  
+
+                  if (trace)
+                  {
+                     log.trace("no handler or buffer is empty, returning");
+                  }
+
                   return;
                }
-               
+
                theListener = handler;
-               
+
                // remove a message from the buffer
-   
-               msg = buffer.removeFirst();                
-               
+
+               msg = buffer.removeFirst();
+
                checkSendChangeRate();
             }
-            
+
             /*
-             * Bug here is as follows:
-             * The next runner gets scheduled BEFORE the on message is executed
-             * so if the onmessage fails on acking it will be put on hold
-             * and failover will kick in, this will clear the executor
-             * so the next queud one disappears at everything grinds to a halt
+             * Bug here is as follows: The next runner gets scheduled BEFORE the
+             * on message is executed so if the onmessage fails on acking it
+             * will be put on hold and failover will kick in, this will clear
+             * the executor so the next queud one disappears at everything
+             * grinds to a halt
              * 
-             * Solution - don't use a session executor - have a session thread instead much nicer
+             * Solution - don't use a session executor - have a session thread
+             * instead much nicer
              */
-                                   
+
             if (msg != null)
-            {     
+            {
                boolean expired = msg.getMessage().isExpired();
-                           
+
                session.delivered(msg.getDeliveryID(), expired);
-               
+
                if (!expired)
                {
                   theListener.onMessage(msg.getMessage());
                }
             }
-            
+
             synchronized (mainLock)
             {
                if (!buffer.isEmpty())
                {
-                  //Queue up the next runner to run
-                  
-                  if (trace) { log.trace("More messages in buffer so queueing next onMessage to run"); }
-                  
+                  // Queue up the next runner to run
+
+                  if (trace)
+                  {
+                     log
+                           .trace("More messages in buffer so queueing next onMessage to run");
+                  }
+
                   queueRunner(this);
-                  
-                  if (trace) { log.trace("Queued next onMessage to run"); }
+
+                  if (trace)
+                  {
+                     log.trace("Queued next onMessage to run");
+                  }
                }
                else
                {
-                  if (trace) { log.trace("no more messages in buffer, marking handler as not running"); }
-                  
-                  listenerRunning  = false;
-               }   
+                  if (trace)
+                  {
+                     log
+                           .trace("no more messages in buffer, marking handler as not running");
+                  }
+
+                  listenerRunning = false;
+               }
             }
-                     
-            if (trace) { log.trace("Exiting run()"); }
+
+            if (trace)
+            {
+               log.trace("Exiting run()");
+            }
          }
          catch (MessagingException e)
          {
             log.error("Failure in ListenerRunner", e);
          }
-      }      
-   }   
-  
+      }
+   }
+
 }

Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml	2008-02-08 03:54:31 UTC (rev 3679)
+++ trunk/tests/build.xml	2008-02-08 07:27:04 UTC (rev 3680)
@@ -508,6 +508,9 @@
 	       <exclude name="**/jms/ClientExitTest.class"/>
 	       <exclude name="**/jms/ConnectionConsumerTest.class"/>
 	       
+	       
+	       <exclude name="**/messaging/core/remoting/impl/**/*Test.class"/>
+	       
                <include name="**/messaging/util/**/${test-mask}.class"/>
                <exclude name="**/jms/MemLeakTest.class"/>
                <exclude name="**/jms/RemotingConnectionConfigurationTest.class"/>

Modified: trunk/tests/src/org/jboss/test/messaging/jms/message/JMSExpirationHeaderTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/message/JMSExpirationHeaderTest.java	2008-02-08 03:54:31 UTC (rev 3679)
+++ trunk/tests/src/org/jboss/test/messaging/jms/message/JMSExpirationHeaderTest.java	2008-02-08 07:27:04 UTC (rev 3680)
@@ -290,7 +290,9 @@
       receiverThread.start();
 
       Thread.sleep(3000);
-      receiverThread.interrupt();
+      //receiverThread.interrupt();
+      
+      queueConsumer.close();
 
       // wait for the reading thread to conclude
       latch.acquire();




More information about the jboss-cvs-commits mailing list