[jboss-cvs] JBoss Messaging SVN: r4411 - in trunk: src/main/org/jboss/messaging/core/client/impl and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jun 9 12:53:41 EDT 2008


Author: timfox
Date: 2008-06-09 12:53:40 -0400 (Mon, 09 Jun 2008)
New Revision: 4411

Added:
   trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
   trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnectionFactory.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionFactoryImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
Removed:
   trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnection.java
   trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java
Modified:
   trunk/src/main/org/jboss/messaging/core/client/ClientConnectionFactory.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionInternal.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/LocationImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateConnectionRequest.java
   trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java
   trunk/tests/src/org/jboss/messaging/tests/performance/remoting/impl/MeasureBase.java
Log:
ClientConnectionFactory test


Modified: trunk/src/main/org/jboss/messaging/core/client/ClientConnectionFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientConnectionFactory.java	2008-06-07 20:17:50 UTC (rev 4410)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientConnectionFactory.java	2008-06-09 16:53:40 UTC (rev 4411)
@@ -32,27 +32,150 @@
  */
 public interface ClientConnectionFactory
 {         
+   /**
+    * Create a connection using null user and password
+    *
+    * @return The connection
+    * @throws MessagingException
+    */
    ClientConnection createConnection() throws MessagingException;
    
+   /**
+    * Create a connection specifying username and password
+    * 
+    * @param username The username
+    * @param password The password
+    * @return The connection
+    * @throws MessagingException
+    */
    ClientConnection createConnection(String username, String password) throws MessagingException;   
    
+   
+   /**
+    * Set the default consumer window size value to use for consumers created from this connection factory.
+    *     
+    * @param size the window size, measured in bytes.
+    * A value of -1 signifies that consumer flow control is disabled.
+    */
    void setDefaultConsumerWindowSize(int size);
    
+   /**
+    * Get the default consumer window size value to use for consumers created from this connection factory.
+    *
+    * 
+    * JBoss Messaging implements credit based consumer flow control, this value determines the initial pot of credits
+    * the server has for the consumer. The server can only send messages to the consumer as long as it has
+    * sufficient credits.
+    * 
+    * @return The default window size, measured in bytes.
+    * A value of -1 signifies that consumer flow control is disabled.
+    */
    int getDefaultConsumerWindowSize();
    
+   /**
+    * Set the default producer window size value to use for producers created from this connection factory.
+    * 
+    * @param size the window size, measured in bytes.
+    * A value of -1 signifies that producer flow control is disabled.
+    */
    void setDefaultProducerWindowSize(int size);     
    
+   /**
+    * Get the default producer window size value to use for producers created from this connection factory.
+    * 
+    * JBoss Messaging implements credit based consumer flow control, this value determines the initial pot of credits
+    * the server has for the consumer. The server can only send messages to the consumer as long as it has
+    * sufficient credits.
+    * 
+    * @return The default window size, measured in bytes.
+    * A value of -1 signifies that producer flow control is disabled.
+    */
    int getDefaultProducerWindowSize();
    
+   /**
+    * Set the default consumer maximum consume rate for consumers created from this connection factory.
+    * @param rate the maximum consume rate, measured in messages / second.
+    * A value of -1 signifies there is no maximum rate limit
+    */
    void setDefaultConsumerMaxRate(int rate);
    
+   /**
+    * Get the default consumer maximum consume rate for consumers created from this connection factory.
+    * @return the maximum consume rate, measured in messages / second
+    * A value of -1 signifies there is no maximum rate limit
+    */
    int getDefaultConsumerMaxRate();
    
+   /**
+    * Set the default producer maximum send rate for producers created from this connection factory.
+    * @param rate the maximum send rate, measured in messages / second.
+    * A value of -1 signifies there is no maximum rate limit
+    */
    void setDefaultProducerMaxRate(int rate);
    
+   /**
+    * Get the default producer maximum send rate for producers created from this connection factory.
+    * @return the maximum send rate, measured in messages / second
+    * A value of -1 signifies there is no maximum rate limit
+    */
    int getDefaultProducerMaxRate();
    
+   /**
+    * Get the default value of whether producers created from this connection factory will send persistent messages
+    * blocking.
+    * @return Whether persistent messages are sent blocking
+    */
+   boolean isDefaultBlockOnPersistentSend();
+   
+   /**
+    * Set the default value of whether producers created from this connection factory will send persistent messages
+    * blocking.
+    * @param blocking Whether persistent messages are sent blocking
+    */
+   void setDefaultBlockOnPersistentSend(final boolean blocking);
+   
+   /**
+    * Get the default value of whether producers created from this connection factory will send non persistent messages
+    * blocking.
+    * @return Whether non persistent messages are sent blocking
+    */
+   boolean isDefaultBlockOnNonPersistentSend();
+   
+   /**
+    * Set the default value of whether producers created from this connection factory will send non persistent messages
+    * blocking.
+    * @param blocking Whether non persistent messages are sent blocking
+    */
+   void setDefaultBlockOnNonPersistentSend(final boolean blocking);
+   
+   /**
+    * Get the default value of whether producers created from this connection factory will send acknowledgements
+    * blocking
+    * @return Whether acknowledgements are sent blocking
+    */
+   boolean isDefaultBlockOnAcknowledge();
+   
+   /**
+    * Set the default value of whether producers created from this connection factory will send acknowledgements
+    * blocking
+    * @param blocking Whether acknowledgements are sent blocking
+    */
+   void setDefaultBlockOnAcknowledge(final boolean blocking);
+   
+   /**
+    * Get the location of the server for this connection factory
+    * @return The location
+    */
    Location getLocation();
    
+   /**
+    * Get the connection params used when creating connections using this connection factory
+    */
    ConnectionParams getConnectionParams();
+   
+   /**
+    * Set the connection params to be used when creating connections using this connection factory
+    * @param connectionParams the connection params to use
+    */
+   void setConnectionParams(ConnectionParams connectionParams);
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java	2008-06-07 20:17:50 UTC (rev 4410)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java	2008-06-09 16:53:40 UTC (rev 4411)
@@ -24,6 +24,7 @@
 import org.jboss.messaging.core.client.ClientBrowser;
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
 import org.jboss.messaging.core.remoting.impl.wireformat.ReceiveMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserHasNextMessageResponseMessage;

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java	2008-06-07 20:17:50 UTC (rev 4410)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java	2008-06-09 16:53:40 UTC (rev 4411)
@@ -21,16 +21,15 @@
  */
 package org.jboss.messaging.core.client.impl;
 
-import java.io.Serializable;
-
 import org.jboss.messaging.core.client.ClientConnection;
 import org.jboss.messaging.core.client.ClientConnectionFactory;
+import org.jboss.messaging.core.client.ConnectionParams;
 import org.jboss.messaging.core.client.Location;
-import org.jboss.messaging.core.client.ConnectionParams;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.remoting.impl.PacketDispatcherImpl;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.RemotingConnectionFactory;
+import org.jboss.messaging.core.remoting.impl.RemotingConnectionFactoryImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionRequest;
 import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionResponse;
 import org.jboss.messaging.core.version.Version;
@@ -62,9 +61,11 @@
 
    // Attributes -----------------------------------------------------------------------------------
    
+   private final RemotingConnectionFactory remotingConnectionFactory;
+      
    private final Location location;
 
-   private final ConnectionParams connectionParams;
+   private ConnectionParams connectionParams;
  
    private int defaultConsumerWindowSize;
    
@@ -76,55 +77,90 @@
    
    private boolean defaultBlockOnAcknowledge;
    
-   private final boolean defaultSendNonPersistentMessagesBlocking;
+   private boolean defaultBlockOnPersistentSend;
    
-   private final boolean defaultSendPersistentMessagesBlocking;
+   private boolean defaultBlockOnNonPersistentSend;
    
    
+      
    // Static ---------------------------------------------------------------------------------------
+   
+   public static final int DEFAULT_CONSUMER_WINDOW_SIZE = 1024 * 1024;
+   
+   public static final int DEFAULT_CONSUMER_MAX_RATE = -1;
+   
+   public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 1024 * 1024;
+   
+   public static final int DEFAULT_PRODUCER_MAX_RATE = -1;
+   
+   public static final boolean DEFAULT_BLOCK_ON_ACKNOWLEDGE = false;
+   
+   public static final boolean DEFAULT_BLOCK_ON_PERSISTENT_SEND = false;
+   
+   public static final boolean DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND = false;
     
    // Constructors ---------------------------------------------------------------------------------
 
-   public ClientConnectionFactoryImpl(final Location location, final ConnectionParams connectionParams,
+   /**
+    * Create a ClientConnectionFactoryImpl specifying all attributes
+    */
+   public ClientConnectionFactoryImpl(final RemotingConnectionFactory remotingConnectionFactory,
+                                      final Location location, final ConnectionParams connectionParams,
                                       final int defaultConsumerWindowSize, final int defaultConsumerMaxRate,
                                       final int defaultProducerWindowSize, final int defaultProducerMaxRate,
                                       final boolean defaultBlockOnAcknowledge,
                                       final boolean defaultSendNonPersistentMessagesBlocking,
                                       final boolean defaultSendPersistentMessagesBlocking)
    {
+      this.remotingConnectionFactory = remotingConnectionFactory;
       this.location = location;
       this.defaultConsumerWindowSize = defaultConsumerWindowSize;  
       this.defaultConsumerMaxRate = defaultConsumerMaxRate;
       this.defaultProducerWindowSize = defaultProducerWindowSize;
       this.defaultProducerMaxRate = defaultProducerMaxRate;
       this.defaultBlockOnAcknowledge = defaultBlockOnAcknowledge;
-      this.defaultSendNonPersistentMessagesBlocking = defaultSendNonPersistentMessagesBlocking;
-      this.defaultSendPersistentMessagesBlocking = defaultSendPersistentMessagesBlocking;
+      this.defaultBlockOnNonPersistentSend = defaultSendNonPersistentMessagesBlocking;
+      this.defaultBlockOnPersistentSend = defaultSendPersistentMessagesBlocking;
       this.connectionParams = connectionParams;
    }
    
+   /**
+    * Create a ClientConnectionFactoryImpl specifying the location and using default values for all other attributes
+    * @param location the location of the server
+    */
    public ClientConnectionFactoryImpl(final Location location)
    {
-      this(location, new ConnectionParamsImpl(), false);
+      this(location, new ConnectionParamsImpl(), false);   
    }
 
+   /**
+    * Create a ClientConnectionFactoryImpl specifying the location and connection params and using default values for all other attributes
+    * @param location the location of the server
+    * @param connectionParams the connection parameters
+    */
    public ClientConnectionFactoryImpl(final Location location, final ConnectionParams connectionParams)
    {
       this(location, connectionParams, false);
    }
    
-   protected ClientConnectionFactoryImpl(final Location location, final ConnectionParams connectionParams, final boolean dummy)
+   
+   private ClientConnectionFactoryImpl(final Location location, final ConnectionParams connectionParams,
+                                       final boolean dummy)
    {
-      this.defaultConsumerWindowSize = 1024 * 1024;      
-      this.defaultConsumerMaxRate = -1;
-      this.defaultProducerWindowSize = 1024 * 1024;
-      this.defaultProducerMaxRate = -1;
+      defaultConsumerWindowSize = DEFAULT_CONSUMER_WINDOW_SIZE;
+      defaultConsumerMaxRate = DEFAULT_CONSUMER_MAX_RATE;
+      defaultProducerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE;
+      defaultProducerMaxRate = DEFAULT_PRODUCER_MAX_RATE;
+      defaultBlockOnAcknowledge = DEFAULT_BLOCK_ON_ACKNOWLEDGE;
+      defaultBlockOnPersistentSend = DEFAULT_BLOCK_ON_PERSISTENT_SEND;
+      defaultBlockOnNonPersistentSend = DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
       this.location = location;
-      this.defaultSendNonPersistentMessagesBlocking = false;
-      this.defaultSendPersistentMessagesBlocking = false;
       this.connectionParams = connectionParams;
+      this.remotingConnectionFactory = new RemotingConnectionFactoryImpl();
    }
-   
+         
+   // ClientConnectionFactory implementation ---------------------------------------------
+
    public ClientConnection createConnection() throws MessagingException
    {
       return createConnection(null, null);
@@ -137,8 +173,9 @@
       RemotingConnection remotingConnection = null;
       try
       {
-         remotingConnection = new RemotingConnectionImpl(location, connectionParams);
+         remotingConnection = remotingConnectionFactory.createRemotingConnection(location, connectionParams);
        
+         log.info("calling start");
          remotingConnection.start();
          
          long sessionID = remotingConnection.getSessionID();
@@ -152,8 +189,8 @@
          return new ClientConnectionImpl(response.getConnectionTargetID(), remotingConnection,
                defaultConsumerWindowSize, defaultConsumerMaxRate,
                defaultProducerWindowSize, defaultProducerMaxRate,
-               defaultBlockOnAcknowledge, defaultSendNonPersistentMessagesBlocking,
-               defaultSendPersistentMessagesBlocking, response.getServerVersion());
+               defaultBlockOnAcknowledge, defaultBlockOnNonPersistentSend,
+               defaultBlockOnPersistentSend, response.getServerVersion());
       }
       catch (Throwable t)
       {
@@ -170,6 +207,7 @@
          
          if (t instanceof MessagingException)
          {
+            log.info("got messaging excetption");
             throw (MessagingException)t;
          }
          else
@@ -183,8 +221,6 @@
       }
    }
    
-   // ClientConnectionFactory implementation ---------------------------------------------
-
 	public int getDefaultConsumerWindowSize()
 	{
 		return defaultConsumerWindowSize;
@@ -194,41 +230,76 @@
    {
       defaultConsumerWindowSize = size;
    }
-
+	
 	public int getDefaultProducerWindowSize()
 	{
 		return defaultProducerWindowSize;
 	}
-			
+				
 	public void setDefaultProducerWindowSize(final int size)
    {
       defaultProducerWindowSize = size;
    }
-
+	
 	public int getDefaultProducerMaxRate()
 	{
 		return defaultProducerMaxRate;
 	}
-	
+		
 	public void setDefaultProducerMaxRate(final int rate)
 	{
 	   this.defaultProducerMaxRate = rate;
 	}
-	
+		
 	public int getDefaultConsumerMaxRate()
    {
       return defaultConsumerMaxRate;
    }
-   
+   	
    public void setDefaultConsumerMaxRate(final int rate)
    {
       this.defaultConsumerMaxRate = rate;
    }
    
+   public boolean isDefaultBlockOnPersistentSend()
+   {
+      return defaultBlockOnPersistentSend;
+   }
+   
+   public void setDefaultBlockOnPersistentSend(final boolean blocking)
+   {
+      defaultBlockOnPersistentSend = blocking;
+   }
+   
+   public boolean isDefaultBlockOnNonPersistentSend()
+   {
+      return defaultBlockOnNonPersistentSend;
+   }
+   
+   public void setDefaultBlockOnNonPersistentSend(final boolean blocking)
+   {
+      defaultBlockOnNonPersistentSend = blocking;
+   }
+   
+   public boolean isDefaultBlockOnAcknowledge()
+   {
+      return this.defaultBlockOnAcknowledge;
+   }
+   
+   public void setDefaultBlockOnAcknowledge(final boolean blocking)
+   {
+      defaultBlockOnAcknowledge = blocking;
+   }
+            
    public ConnectionParams getConnectionParams()
    {
       return connectionParams;
    }
+    
+   public void setConnectionParams(final ConnectionParams params)
+   {
+      this.connectionParams = params;
+   }
    
    public Location getLocation()
    {

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java	2008-06-07 20:17:50 UTC (rev 4410)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java	2008-06-09 16:53:40 UTC (rev 4411)
@@ -28,6 +28,7 @@
 import org.jboss.messaging.core.client.RemotingSessionListener;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.impl.wireformat.ConnectionCreateSessionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.ConnectionCreateSessionResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionInternal.java	2008-06-07 20:17:50 UTC (rev 4410)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionInternal.java	2008-06-09 16:53:40 UTC (rev 4411)
@@ -8,6 +8,7 @@
 
 import org.jboss.messaging.core.client.ClientConnection;
 import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.remoting.RemotingConnection;
 
 /**
  * 

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-06-07 20:17:50 UTC (rev 4410)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-06-09 16:53:40 UTC (rev 4411)
@@ -31,6 +31,7 @@
 import org.jboss.messaging.core.list.PriorityLinkedList;
 import org.jboss.messaging.core.list.impl.PriorityLinkedListImpl;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowCreditMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
 

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-06-07 20:17:50 UTC (rev 4410)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-06-09 16:53:40 UTC (rev 4411)
@@ -27,6 +27,7 @@
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.impl.wireformat.ProducerSendMessage;
 import org.jboss.messaging.util.SimpleString;
 import org.jboss.messaging.util.TokenBucketLimiter;

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-06-07 20:17:50 UTC (rev 4410)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-06-09 16:53:40 UTC (rev 4411)
@@ -39,6 +39,7 @@
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowCreditMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java	2008-06-07 20:17:50 UTC (rev 4410)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java	2008-06-09 16:53:40 UTC (rev 4411)
@@ -1,36 +1,35 @@
 /*
-   * JBoss, Home of Professional Open Source
-   * Copyright 2005, JBoss Inc., and individual contributors as indicated
-   * by the @authors tag. See the copyright.txt in the distribution for a
-   * full listing of individual contributors.
-   *
-   * This is free software; you can redistribute it and/or modify it
-   * under the terms of the GNU Lesser General Public License as
-   * published by the Free Software Foundation; either version 2.1 of
-   * the License, or (at your option) any later version.
-   *
-   * This software is distributed in the hope that it will be useful,
-   * but WITHOUT ANY WARRANTY; without even the implied warranty of
-   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-   * Lesser General Public License for more details.
-   *
-   * You should have received a copy of the GNU Lesser General Public
-   * License along with this software; if not, write to the Free
-   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-   */
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
 package org.jboss.messaging.core.client.impl;
 
 import org.jboss.messaging.core.client.ConnectionParams;
 
-import java.io.Serializable;
-
 /**
  * @author <a href="ataylor at redhat.com">Andy Taylor</a>
  */
 public class ConnectionParamsImpl implements ConnectionParams
 {
-
+   private static final long serialVersionUID = 1662480686951551534L;
+   
    protected long timeout = DEFAULT_REQRES_TIMEOUT;
    protected long keepAliveInterval = DEFAULT_KEEP_ALIVE_INTERVAL;
    protected long keepAliveTimeout = DEFAULT_KEEP_ALIVE_TIMEOUT;
@@ -48,7 +47,7 @@
    protected long writeQueueBlockTimeout = 10000;
    protected long writeQueueMinBytes = 32 * 1024L;
    protected long writeQueueMaxBytes = 64 * 1024L;
-
+   
    public long getTimeout()
    {
       return timeout;
@@ -128,7 +127,7 @@
    {
       this.tcpSendBufferSize = tcpSendBufferSize;
    }
-   
+
    public long getWriteQueueBlockTimeout()
    {
       return writeQueueBlockTimeout;
@@ -143,7 +142,7 @@
    {
       return writeQueueMinBytes;
    }
-   
+
    public void setWriteQueueBlockTimeout(final long timeout)
    {
       this.writeQueueBlockTimeout = timeout;
@@ -224,7 +223,7 @@
       this.trustStorePassword = trustStorePassword;
    }
 
-    public String getURI()
+   public String getURI()
    {
       StringBuffer buff = new StringBuffer();
       //buff.append(transport + "://" + host + ":" + port);
@@ -240,4 +239,28 @@
       buff.append("&").append("trustStorePath=").append(trustStorePath);
       return buff.toString();
    }
+   
+   public boolean equals(Object other)
+   {
+      if (other instanceof ConnectionParams == false)
+      {
+         return false;
+      }
+      
+      ConnectionParams cp = (ConnectionParams)other;
+      
+      return cp.getTimeout() == timeout &&
+             cp.getKeepAliveTimeout() == this.keepAliveTimeout &&
+             cp.getKeepAliveInterval() == this.keepAliveInterval &&
+             cp.isInvmDisabled() == this.isInvmDisabled() &&
+             cp.isInvmDisabledModified() == this.isInvmDisabledModified() &&
+             cp.isTcpNoDelay() == this.isTcpNoDelay() &&
+             cp.getTcpReceiveBufferSize() == this.getTcpReceiveBufferSize() &&
+             cp.getTcpSendBufferSize() == this.getTcpSendBufferSize() &&
+             cp.isSSLEnabled() == this.isSSLEnabled() &&
+             cp.isSSLEnabledModified() == this.isSSLEnabledModified() &&
+             cp.getWriteQueueBlockTimeout() == this.getWriteQueueBlockTimeout() &&
+             cp.getWriteQueueMinBytes() == this.getWriteQueueMinBytes() &&
+             cp.getWriteQueueMaxBytes() == this.getWriteQueueMaxBytes();
+   }
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/LocationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/LocationImpl.java	2008-06-07 20:17:50 UTC (rev 4410)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/LocationImpl.java	2008-06-09 16:53:40 UTC (rev 4411)
@@ -33,18 +33,18 @@
 {
 	private static final long serialVersionUID = -1101852656621257742L;
 	
-	protected TransportType transport;
-   protected String host;
-   protected int port = ConfigurationImpl.DEFAULT_REMOTING_PORT;
+	private TransportType transport;
+   private String host;
+   private int port = ConfigurationImpl.DEFAULT_REMOTING_PORT;
    private int serverID;
    
-   public LocationImpl(int serverID)
+   public LocationImpl(final int serverID)
    {
       this.transport = TransportType.INVM;
       this.serverID = serverID;
    }
 
-   public LocationImpl(TransportType transport, String host, int port)
+   public LocationImpl(final TransportType transport, final String host, final int port)
    {
       assert host != null;
       assert port > 0;
@@ -55,6 +55,11 @@
       this.host = host;
       this.port = port;
    }
+   
+   public LocationImpl(final TransportType transport, final String host)
+   {
+      this(transport, host, ConfigurationImpl.DEFAULT_REMOTING_PORT);
+   }
 
    public String getLocation()
    {

Deleted: trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnection.java	2008-06-07 20:17:50 UTC (rev 4410)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnection.java	2008-06-09 16:53:40 UTC (rev 4411)
@@ -1,33 +0,0 @@
-package org.jboss.messaging.core.client.impl;
-
-import org.jboss.messaging.core.client.Location;
-import org.jboss.messaging.core.client.RemotingSessionListener;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
-
-/**
- * 
- * A RemotingConnection
- * 
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public interface RemotingConnection
-{
-	public void start() throws Throwable;
-
-   public void stop();
-   
-   public long getSessionID();
- 
-   Packet sendBlocking(long targetID, long executorID, Packet packet) throws MessagingException;
-   
-   void sendOneWay(long targetID, long executorID, Packet packet) throws MessagingException;
-   
-   void setRemotingSessionListener(RemotingSessionListener newListener);
-   
-   PacketDispatcher getPacketDispatcher();
-   
-   public Location getLocation();
-}

Deleted: trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java	2008-06-07 20:17:50 UTC (rev 4410)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java	2008-06-09 16:53:40 UTC (rev 4411)
@@ -1,322 +0,0 @@
-/*
-  * JBoss, Home of Professional Open Source
-  * Copyright 2005, JBoss Inc., and individual contributors as indicated
-  * by the @authors tag. See the copyright.txt in the distribution for a
-  * full listing of individual contributors.
-  *
-  * This is free software; you can redistribute it and/or modify it
-  * under the terms of the GNU Lesser General Public License as
-  * published by the Free Software Foundation; either version 2.1 of
-  * the License, or (at your option) any later version.
-  *
-  * This software is distributed in the hope that it will be useful,
-  * but WITHOUT ANY WARRANTY; without even the implied warranty of
-  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-  * Lesser General Public License for more details.
-  *
-  * You should have received a copy of the GNU Lesser General Public
-  * License along with this software; if not, write to the Free
-  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-  */
-package org.jboss.messaging.core.client.impl;
-
-import static org.jboss.messaging.core.remoting.ConnectorRegistrySingleton.REGISTRY;
-
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.jboss.messaging.core.client.ConnectionParams;
-import org.jboss.messaging.core.client.Location;
-import org.jboss.messaging.core.client.RemotingSessionListener;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.NIOConnector;
-import org.jboss.messaging.core.remoting.NIOSession;
-import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.remoting.PacketHandler;
-import org.jboss.messaging.core.remoting.PacketReturner;
-import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
-
-/**
- * 
- * @author <a href="tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="ovidiu at feodorov.com">Ovidiu Feodorov</a>
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- * @version <tt>$Revision$</tt>
- * $Id$
- */
-public class RemotingConnectionImpl implements RemotingConnection
-{
-   // Constants ------------------------------------------------------------------------------------
-
-   private static final Logger log = Logger.getLogger(RemotingConnectionImpl.class);
-   
-   // Static ---------------------------------------------------------------------------------------
-
-   // Attributes -----------------------------------------------------------------------------------
-
-   private final Location location;
-
-   private final ConnectionParams connectionParams;
-
-   private NIOConnector connector;
-   
-   private NIOSession session;
-   
-   private RemotingSessionListener listener;
-
-   //private transient PacketDispatcher dispatcher;
-   
-   // Constructors ---------------------------------------------------------------------------------
-
-   public RemotingConnectionImpl(final Location location, ConnectionParams connectionParams) throws Exception
-   {
-      assert location != null;
-      assert connectionParams != null;
-      
-      this.location = location;
-      this.connectionParams = connectionParams;
-      
-      log.trace(this + " created with configuration " + location);
-   }
-
-   public RemotingConnectionImpl(final Location location, ConnectionParams connectionParams, NIOConnector nioConnector) throws Exception
-   {
-      assert location != null;
-      assert connectionParams != null;
-
-      this.location = location;
-      this.connectionParams = connectionParams;
-      connector = nioConnector;
-      session = connector.connect();
-      log.trace(this + " created with connector " + nioConnector);
-   }
-
-   // Public ---------------------------------------------------------------------------------------
-
-   // RemotingConnection implementation ------------------------------------------------------------
-   
-   public void start() throws Throwable
-   {
-      if (log.isTraceEnabled()) { log.trace(this + " started remoting connection"); }
-
-      connector = REGISTRY.getConnector(location, connectionParams);
-      session = connector.connect();
-
-      if (log.isDebugEnabled())
-         log.debug("Using " + connector + " to connect to " + location);
-
-      log.trace(this + " started");
-   }
-
-
-   public void stop()
-   {
-      log.trace(this + " stop");
-
-      try
-      {
-         if (connector != null)
-         { 
-            if (listener != null)
-               connector.removeSessionListener(listener);
-            NIOConnector connectorFromRegistry = REGISTRY.removeConnector(location);
-            if (connectorFromRegistry != null)
-               connectorFromRegistry.disconnect();
-         }
-      }
-      catch (Throwable ignore)
-      {        
-         log.trace(this + " failed to disconnect the new client", ignore);
-      }
-      
-      connector = null;
-      
-      log.trace(this + " closed");
-   }
-
-   public long getSessionID()
-   {
-      if (session == null || !session.isConnected())
-      {
-         return -1;
-      }
-      return session.getID();
-   }
-    
-   /**
-    * send the packet and block until a response is received (<code>oneWay</code> is set to <code>false</code>)
-    */
-   public Packet sendBlocking(final long targetID, final long executorID, final Packet packet) throws MessagingException
-   {
-      checkConnected();
-      
-      long handlerID = connector.getDispatcher().generateID();
-      
-      ResponseHandler handler = new ResponseHandler(handlerID);
-      
-      connector.getDispatcher().register(handler);
-      
-      try
-      {  
-         packet.setTargetID(targetID);
-         packet.setExecutorID(executorID);
-         packet.setResponseTargetID(handlerID);
-            
-         try
-         {
-            session.write(packet);
-         }
-         catch (Exception e)
-         {
-            log.error("Caught unexpected exception", e);
-            
-            throw new MessagingException(MessagingException.INTERNAL_ERROR);
-         }
-         
-         Packet response = handler.waitForResponse(connectionParams.getTimeout());
-         
-         if (response == null)
-         {
-            throw new IllegalStateException("No response received for " + packet);
-         }
-         
-         if (response instanceof MessagingExceptionMessage)
-         {
-            MessagingExceptionMessage message = (MessagingExceptionMessage) response;
-            
-            throw message.getException();
-         }
-         else
-         {
-            return response;
-         } 
-      }
-      finally
-      {
-         connector.getDispatcher().unregister(handlerID);
-      }           
-   }
-   
-   public void sendOneWay(final long targetID, final long executorID, final Packet packet) throws MessagingException
-   {
-      assert packet != null;
-
-      packet.setTargetID(targetID);
-      packet.setExecutorID(executorID);
-      
-      try
-      {
-         session.write(packet);
-      }
-      catch (Exception e)
-      {
-         log.error("Caught unexpected exception", e);
-         
-         throw new MessagingException(MessagingException.INTERNAL_ERROR);
-      }
-   }
-   
-   public synchronized void setRemotingSessionListener(final RemotingSessionListener newListener)
-   {
-      if (listener != null && newListener != null)
-      {
-         throw new IllegalStateException("FailureListener already set to " + listener);
-      }
-
-      if (newListener != null)
-      {
-         connector.addSessionListener(newListener);
-      }
-      else 
-      {
-         connector.removeSessionListener(listener);
-      }
-      this.listener = newListener;
-   }
-   
-   public PacketDispatcher getPacketDispatcher()
-   {
-      return connector.getDispatcher();
-   }
-   
-   public Location getLocation()
-   {
-   	return location;
-   }
-
-   // Package protected ----------------------------------------------------------------------------
-
-   // Protected ------------------------------------------------------------------------------------
-
-   // Private --------------------------------------------------------------------------------------
-      
-   private static class ResponseHandler implements PacketHandler
-   {
-      private long id;
-      
-      private Packet response;
-      
-      ResponseHandler(final long id)
-      {
-         this.id = id;
-      }
-
-      public long getID()
-      {
-         return id;
-      }
-
-      public synchronized void handle(final Packet packet, final PacketReturner sender)
-      {
-         this.response = packet;
-         
-         notify();
-      }
-      
-      public synchronized Packet waitForResponse(final long timeout)
-      {
-         long toWait = timeout;
-         long start = System.currentTimeMillis();
-
-         while (response == null && toWait > 0)
-         {
-            try
-            {
-               wait(toWait);
-            }
-            catch (InterruptedException e)
-            {
-            }
-            
-            long now = System.currentTimeMillis();
-            
-            toWait -= now - start;
-            
-            start = now;
-         }
-         
-         return response;         
-      }
-      
-   }
-
-   private void checkConnected() throws MessagingException
-   {
-      if (session == null)
-      {
-         throw new IllegalStateException("Client " + this
-               + " is not connected.");
-      }
-      if (!session.isConnected())
-      {
-         throw new MessagingException(MessagingException.NOT_CONNECTED);
-      }
-   }
-}

Copied: trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java (from rev 4410, trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnection.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java	2008-06-09 16:53:40 UTC (rev 4411)
@@ -0,0 +1,31 @@
+package org.jboss.messaging.core.remoting;
+
+import org.jboss.messaging.core.client.Location;
+import org.jboss.messaging.core.client.RemotingSessionListener;
+import org.jboss.messaging.core.exception.MessagingException;
+
+/**
+ * 
+ * A RemotingConnection
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public interface RemotingConnection
+{
+	public void start() throws Throwable;
+
+   public void stop();
+   
+   public long getSessionID();
+ 
+   Packet sendBlocking(long targetID, long executorID, Packet packet) throws MessagingException;
+   
+   void sendOneWay(long targetID, long executorID, Packet packet) throws MessagingException;
+   
+   void setRemotingSessionListener(RemotingSessionListener newListener);
+   
+   PacketDispatcher getPacketDispatcher();
+   
+   public Location getLocation();
+}

Added: trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnectionFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnectionFactory.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnectionFactory.java	2008-06-09 16:53:40 UTC (rev 4411)
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting;
+
+import org.jboss.messaging.core.client.ConnectionParams;
+import org.jboss.messaging.core.client.Location;
+
+/**
+ * 
+ * A RemotingConnectionFactory
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public interface RemotingConnectionFactory
+{
+   RemotingConnection createRemotingConnection(final Location location, final ConnectionParams params)
+      throws Exception;
+}

Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionFactoryImpl.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionFactoryImpl.java	2008-06-09 16:53:40 UTC (rev 4411)
@@ -0,0 +1,45 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl;
+
+import org.jboss.messaging.core.client.ConnectionParams;
+import org.jboss.messaging.core.client.Location;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.RemotingConnectionFactory;
+
+/**
+ * 
+ * A RemotingConnectionFactoryImpl
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class RemotingConnectionFactoryImpl implements RemotingConnectionFactory
+{
+
+   public RemotingConnection createRemotingConnection(final Location location,
+         final ConnectionParams params) throws Exception
+   {
+      return new RemotingConnectionImpl(location, params);
+   }
+
+}

Copied: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java (from rev 4410, trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-06-09 16:53:40 UTC (rev 4411)
@@ -0,0 +1,321 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.messaging.core.remoting.impl;
+
+import static org.jboss.messaging.core.remoting.ConnectorRegistrySingleton.REGISTRY;
+
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.jboss.messaging.core.client.ConnectionParams;
+import org.jboss.messaging.core.client.Location;
+import org.jboss.messaging.core.client.RemotingSessionListener;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.NIOConnector;
+import org.jboss.messaging.core.remoting.NIOSession;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.PacketReturner;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
+
+/**
+ * 
+ * @author <a href="tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @version <tt>$Revision$</tt>
+ * $Id$
+ */
+public class RemotingConnectionImpl implements RemotingConnection
+{
+   // Constants ------------------------------------------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(RemotingConnectionImpl.class);
+   
+   // Static ---------------------------------------------------------------------------------------
+
+   // Attributes -----------------------------------------------------------------------------------
+
+   private final Location location;
+
+   private final ConnectionParams connectionParams;
+
+   private NIOConnector connector;
+   
+   private NIOSession session;
+   
+   private RemotingSessionListener listener;
+
+   // Constructors ---------------------------------------------------------------------------------
+
+   public RemotingConnectionImpl(final Location location, ConnectionParams connectionParams) throws Exception
+   {
+      assert location != null;
+      assert connectionParams != null;
+      
+      this.location = location;
+      this.connectionParams = connectionParams;
+      
+      log.trace(this + " created with configuration " + location);
+   }
+
+   public RemotingConnectionImpl(final Location location, ConnectionParams connectionParams, NIOConnector nioConnector) throws Exception
+   {
+      assert location != null;
+      assert connectionParams != null;
+
+      this.location = location;
+      this.connectionParams = connectionParams;
+      connector = nioConnector;
+      session = connector.connect();
+      log.trace(this + " created with connector " + nioConnector);
+   }
+
+   // Public ---------------------------------------------------------------------------------------
+
+   // RemotingConnection implementation ------------------------------------------------------------
+   
+   public void start() throws Throwable
+   {
+      if (log.isTraceEnabled()) { log.trace(this + " started remoting connection"); }
+
+      connector = REGISTRY.getConnector(location, connectionParams);
+      session = connector.connect();
+
+      if (log.isDebugEnabled())
+         log.debug("Using " + connector + " to connect to " + location);
+
+      log.trace(this + " started");
+   }
+
+
+   public void stop()
+   {
+      log.trace(this + " stop");
+
+      try
+      {
+         if (connector != null)
+         { 
+            if (listener != null)
+               connector.removeSessionListener(listener);
+            NIOConnector connectorFromRegistry = REGISTRY.removeConnector(location);
+            if (connectorFromRegistry != null)
+               connectorFromRegistry.disconnect();
+         }
+      }
+      catch (Throwable ignore)
+      {        
+         log.trace(this + " failed to disconnect the new client", ignore);
+      }
+      
+      connector = null;
+      
+      log.trace(this + " closed");
+   }
+
+   public long getSessionID()
+   {
+      if (session == null || !session.isConnected())
+      {
+         return -1;
+      }
+      return session.getID();
+   }
+    
+   /**
+    * send the packet and block until a response is received (<code>oneWay</code> is set to <code>false</code>)
+    */
+   public Packet sendBlocking(final long targetID, final long executorID, final Packet packet) throws MessagingException
+   {
+      checkConnected();
+      
+      long handlerID = connector.getDispatcher().generateID();
+      
+      ResponseHandler handler = new ResponseHandler(handlerID);
+      
+      connector.getDispatcher().register(handler);
+      
+      try
+      {  
+         packet.setTargetID(targetID);
+         packet.setExecutorID(executorID);
+         packet.setResponseTargetID(handlerID);
+            
+         try
+         {
+            session.write(packet);
+         }
+         catch (Exception e)
+         {
+            log.error("Caught unexpected exception", e);
+            
+            throw new MessagingException(MessagingException.INTERNAL_ERROR);
+         }
+         
+         Packet response = handler.waitForResponse(connectionParams.getTimeout());
+         
+         if (response == null)
+         {
+            throw new IllegalStateException("No response received for " + packet);
+         }
+         
+         if (response instanceof MessagingExceptionMessage)
+         {
+            MessagingExceptionMessage message = (MessagingExceptionMessage) response;
+            
+            throw message.getException();
+         }
+         else
+         {
+            return response;
+         } 
+      }
+      finally
+      {
+         connector.getDispatcher().unregister(handlerID);
+      }           
+   }
+   
+   public void sendOneWay(final long targetID, final long executorID, final Packet packet) throws MessagingException
+   {
+      assert packet != null;
+
+      packet.setTargetID(targetID);
+      packet.setExecutorID(executorID);
+      
+      try
+      {
+         session.write(packet);
+      }
+      catch (Exception e)
+      {
+         log.error("Caught unexpected exception", e);
+         
+         throw new MessagingException(MessagingException.INTERNAL_ERROR);
+      }
+   }
+   
+   public synchronized void setRemotingSessionListener(final RemotingSessionListener newListener)
+   {
+      if (listener != null && newListener != null)
+      {
+         throw new IllegalStateException("FailureListener already set to " + listener);
+      }
+
+      if (newListener != null)
+      {
+         connector.addSessionListener(newListener);
+      }
+      else 
+      {
+         connector.removeSessionListener(listener);
+      }
+      this.listener = newListener;
+   }
+   
+   public PacketDispatcher getPacketDispatcher()
+   {
+      return connector.getDispatcher();
+   }
+   
+   public Location getLocation()
+   {
+   	return location;
+   }
+
+   // Package protected ----------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+
+   // Private --------------------------------------------------------------------------------------
+      
+   private static class ResponseHandler implements PacketHandler
+   {
+      private long id;
+      
+      private Packet response;
+      
+      ResponseHandler(final long id)
+      {
+         this.id = id;
+      }
+
+      public long getID()
+      {
+         return id;
+      }
+
+      public synchronized void handle(final Packet packet, final PacketReturner sender)
+      {
+         this.response = packet;
+         
+         notify();
+      }
+      
+      public synchronized Packet waitForResponse(final long timeout)
+      {
+         long toWait = timeout;
+         long start = System.currentTimeMillis();
+
+         while (response == null && toWait > 0)
+         {
+            try
+            {
+               wait(toWait);
+            }
+            catch (InterruptedException e)
+            {
+            }
+            
+            long now = System.currentTimeMillis();
+            
+            toWait -= now - start;
+            
+            start = now;
+         }
+         
+         return response;         
+      }
+      
+   }
+
+   private void checkConnected() throws MessagingException
+   {
+      if (session == null)
+      {
+         throw new IllegalStateException("Client " + this
+               + " is not connected.");
+      }
+      if (!session.isConnected())
+      {
+         throw new MessagingException(MessagingException.NOT_CONNECTED);
+      }
+   }
+}

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateConnectionRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateConnectionRequest.java	2008-06-07 20:17:50 UTC (rev 4410)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateConnectionRequest.java	2008-06-09 16:53:40 UTC (rev 4411)
@@ -6,6 +6,7 @@
  */
 package org.jboss.messaging.core.remoting.impl.wireformat;
 
+import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.util.MessagingBuffer;
 
 /**
@@ -19,6 +20,9 @@
 {
    // Constants -----------------------------------------------------
 
+   private static final Logger log = Logger.getLogger(CreateConnectionRequest.class);
+
+   
    // Attributes ----------------------------------------------------
 
    private int version;
@@ -96,6 +100,23 @@
       return buf.toString();
    }
 
+   // Field-wise equality; username and password may be null.
+   public boolean equals(Object other)
+   {
+      if (other instanceof CreateConnectionRequest == false)
+      {
+         return false;
+      }
+      CreateConnectionRequest r = (CreateConnectionRequest)other;
+      
+      // Each ?: is parenthesized because ?: binds looser than &&.
+      boolean matches = this.version == r.version &&
+                        this.remotingSessionID == r.remotingSessionID &&
+                        (this.username == null ? r.username == null : this.username.equals(r.username)) &&
+                        (this.password == null ? r.password == null : this.password.equals(r.password));
+      return matches;
+   }
+   
    // Protected -----------------------------------------------------
 
    // Private -------------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java	2008-06-07 20:17:50 UTC (rev 4410)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java	2008-06-09 16:53:40 UTC (rev 4411)
@@ -22,7 +22,6 @@
 package org.jboss.messaging.jms.client;
 
 import java.io.Serializable;
-import java.io.IOException;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -42,12 +41,12 @@
 
 import org.jboss.messaging.core.client.ClientConnection;
 import org.jboss.messaging.core.client.ClientConnectionFactory;
+import org.jboss.messaging.core.client.ConnectionParams;
 import org.jboss.messaging.core.client.Location;
-import org.jboss.messaging.core.client.ConnectionParams;
 import org.jboss.messaging.core.client.impl.ClientConnectionFactoryImpl;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.impl.RemotingConnectionFactoryImpl;
 import org.jboss.messaging.jms.referenceable.SerializableObjectRefAddr;
 
 /**
@@ -226,6 +225,7 @@
       if (connectionFactory == null)
       {
          connectionFactory = new ClientConnectionFactoryImpl(
+               new RemotingConnectionFactoryImpl(),
                location,
                connectionParams,
                defaultConsumerWindowSize,

Modified: trunk/tests/src/org/jboss/messaging/tests/performance/remoting/impl/MeasureBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/remoting/impl/MeasureBase.java	2008-06-07 20:17:50 UTC (rev 4410)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/remoting/impl/MeasureBase.java	2008-06-09 16:53:40 UTC (rev 4411)
@@ -11,12 +11,12 @@
 
 import org.jboss.messaging.core.client.impl.ConnectionParamsImpl;
 import org.jboss.messaging.core.client.impl.LocationImpl;
-import org.jboss.messaging.core.client.impl.RemotingConnectionImpl;
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.PacketHandler;
 import org.jboss.messaging.core.remoting.PacketReturner;
+import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
 import org.jboss.messaging.core.remoting.impl.mina.MinaService;
 import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
 




More information about the jboss-cvs-commits mailing list