[jboss-cvs] JBoss Messaging SVN: r4411 - in trunk: src/main/org/jboss/messaging/core/client/impl and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jun 9 12:53:41 EDT 2008
Author: timfox
Date: 2008-06-09 12:53:40 -0400 (Mon, 09 Jun 2008)
New Revision: 4411
Added:
trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnectionFactory.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
Removed:
trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnection.java
trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java
Modified:
trunk/src/main/org/jboss/messaging/core/client/ClientConnectionFactory.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionInternal.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/LocationImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateConnectionRequest.java
trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java
trunk/tests/src/org/jboss/messaging/tests/performance/remoting/impl/MeasureBase.java
Log:
ClientConnectionFactory test
Modified: trunk/src/main/org/jboss/messaging/core/client/ClientConnectionFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientConnectionFactory.java 2008-06-07 20:17:50 UTC (rev 4410)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientConnectionFactory.java 2008-06-09 16:53:40 UTC (rev 4411)
@@ -32,27 +32,150 @@
*/
public interface ClientConnectionFactory
{
+ /**
+ * Create a connection using null user and password
+ *
+ * @return The connection
+ * @throws MessagingException
+ */
ClientConnection createConnection() throws MessagingException;
+ /**
+ * Create a connection specifying username and password
+ *
+ * @param username The username
+ * @param password The password
+ * @return The connection
+ * @throws MessagingException
+ */
ClientConnection createConnection(String username, String password) throws MessagingException;
+
+ /**
+ * Set the default consumer window size value to use for consumers created from this connection factory.
+ *
+ * @param size the window size, measured in bytes.
+ * A value of -1 signifies that consumer flow control is disabled.
+ */
void setDefaultConsumerWindowSize(int size);
+ /**
+ * Get the default consumer window size value to use for consumers created from this connection factory.
+ *
+ * <p>
+ * JBoss Messaging implements credit based consumer flow control, this value determines the initial pot of credits
+ * the server has for the consumer. The server can only send messages to the consumer as long as it has
+ * sufficient credits.
+ *
+ * @return The default window size, measured in bytes.
+ * A value of -1 signifies that consumer flow control is disabled.
+ */
int getDefaultConsumerWindowSize();
+ /**
+ * Set the default producer window size value to use for producers created from this connection factory.
+ *
+ * @param size the window size, measured in bytes.
+ * A value of -1 signifies that producer flow control is disabled.
+ */
void setDefaultProducerWindowSize(int size);
+ /**
+ * Get the default producer window size value to use for producers created from this connection factory.
+ *
+ * JBoss Messaging implements credit based consumer flow control, this value determines the initial pot of credits
+ * the server has for the consumer. The server can only send messages to the consumer as long as it has
+ * sufficient credits.
+ *
+ * @return The default window size, measured in bytes.
+ * A value of -1 signifies that producer flow control is disabled.
+ */
int getDefaultProducerWindowSize();
+ /**
+ * Set the default consumer maximum consume rate for consumers created from this connection factory.
+ * @param rate the maximum consume rate, measured in messages / second
+ * A value of -1 signifies there is no maximum rate limit
+ */
void setDefaultConsumerMaxRate(int rate);
+ /**
+ * Get the default consumer maximum consume rate for consumers created from this connection factory.
+ * @return the maximum consume rate, measured in messages / second
+ * A value of -1 signifies there is no maximum rate limit
+ */
int getDefaultConsumerMaxRate();
+ /**
+ * Set the default producer maximum send rate for producers created from this connection factory.
+ * @param rate the maximum send rate, measured in messages / second
+ * A value of -1 signifies there is no maximum rate limit
+ */
void setDefaultProducerMaxRate(int rate);
+ /**
+ * Get the default producer maximum send rate for producers created from this connection factory.
+ * @return the maximum send rate, measured in messages / second
+ * A value of -1 signifies there is no maximum rate limit
+ */
int getDefaultProducerMaxRate();
+ /**
+ * Get the default value of whether producers created from this connection factory will send persistent messages
+ * blocking.
+ * @return Whether persistent messages are sent blocking
+ */
+ boolean isDefaultBlockOnPersistentSend();
+
+ /**
+ * Set the default value of whether producers created from this connection factory will send persistent messages
+ * blocking.
+ * @param blocking Whether persistent messages are sent blocking
+ */
+ void setDefaultBlockOnPersistentSend(final boolean blocking);
+
+ /**
+ * Get the default value of whether producers created from this connection factory will send non persistent messages
+ * blocking.
+ * @return Whether non persistent messages are sent blocking
+ */
+ boolean isDefaultBlockOnNonPersistentSend();
+
+ /**
+ * Set the default value of whether producers created from this connection factory will send non persistent messages
+ * blocking.
+ * @param blocking Whether non persistent messages are sent blocking
+ */
+ void setDefaultBlockOnNonPersistentSend(final boolean blocking);
+
+ /**
+ * Get the default value of whether producers created from this connection factory will send acknowledgements
+ * blocking
+ * @return Whether acknowledgements are sent blocking
+ */
+ boolean isDefaultBlockOnAcknowledge();
+
+ /**
+ * Set the default value of whether producers created from this connection factory will send acknowledgements
+ * blocking
+ * @param blocking Whether acknowledgements are sent blocking
+ */
+ void setDefaultBlockOnAcknowledge(final boolean blocking);
+
+ /**
+ * Get the location of the server for this connection factory
+ * @return The location
+ */
Location getLocation();
+ /**
+ * Get the connection params used when creating connections using this connection factory
+ */
ConnectionParams getConnectionParams();
+
+ /**
+ * Set the connection params to be used when creating connections using this connection factory
+ * @param connectionParams the connection params
+ */
+ void setConnectionParams(ConnectionParams connectionParams);
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java 2008-06-07 20:17:50 UTC (rev 4410)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java 2008-06-09 16:53:40 UTC (rev 4411)
@@ -24,6 +24,7 @@
import org.jboss.messaging.core.client.ClientBrowser;
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
import org.jboss.messaging.core.remoting.impl.wireformat.ReceiveMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserHasNextMessageResponseMessage;
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java 2008-06-07 20:17:50 UTC (rev 4410)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java 2008-06-09 16:53:40 UTC (rev 4411)
@@ -21,16 +21,15 @@
*/
package org.jboss.messaging.core.client.impl;
-import java.io.Serializable;
-
import org.jboss.messaging.core.client.ClientConnection;
import org.jboss.messaging.core.client.ClientConnectionFactory;
+import org.jboss.messaging.core.client.ConnectionParams;
import org.jboss.messaging.core.client.Location;
-import org.jboss.messaging.core.client.ConnectionParams;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.remoting.impl.PacketDispatcherImpl;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.RemotingConnectionFactory;
+import org.jboss.messaging.core.remoting.impl.RemotingConnectionFactoryImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionRequest;
import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionResponse;
import org.jboss.messaging.core.version.Version;
@@ -62,9 +61,11 @@
// Attributes -----------------------------------------------------------------------------------
+ private final RemotingConnectionFactory remotingConnectionFactory;
+
private final Location location;
- private final ConnectionParams connectionParams;
+ private ConnectionParams connectionParams;
private int defaultConsumerWindowSize;
@@ -76,55 +77,90 @@
private boolean defaultBlockOnAcknowledge;
- private final boolean defaultSendNonPersistentMessagesBlocking;
+ private boolean defaultBlockOnPersistentSend;
- private final boolean defaultSendPersistentMessagesBlocking;
+ private boolean defaultBlockOnNonPersistentSend;
+
// Static ---------------------------------------------------------------------------------------
+
+ public static final int DEFAULT_CONSUMER_WINDOW_SIZE = 1024 * 1024;
+
+ public static final int DEFAULT_CONSUMER_MAX_RATE = -1;
+
+ public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 1024 * 1024;
+
+ public static final int DEFAULT_PRODUCER_MAX_RATE = -1;
+
+ public static final boolean DEFAULT_BLOCK_ON_ACKNOWLEDGE = false;
+
+ public static final boolean DEFAULT_BLOCK_ON_PERSISTENT_SEND = false;
+
+ public static final boolean DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND = false;
// Constructors ---------------------------------------------------------------------------------
- public ClientConnectionFactoryImpl(final Location location, final ConnectionParams connectionParams,
+ /**
+ * Create a ClientConnectionFactoryImpl specifying all attributes
+ */
+ public ClientConnectionFactoryImpl(final RemotingConnectionFactory remotingConnectionFactory,
+ final Location location, final ConnectionParams connectionParams,
final int defaultConsumerWindowSize, final int defaultConsumerMaxRate,
final int defaultProducerWindowSize, final int defaultProducerMaxRate,
final boolean defaultBlockOnAcknowledge,
final boolean defaultSendNonPersistentMessagesBlocking,
final boolean defaultSendPersistentMessagesBlocking)
{
+ this.remotingConnectionFactory = remotingConnectionFactory;
this.location = location;
this.defaultConsumerWindowSize = defaultConsumerWindowSize;
this.defaultConsumerMaxRate = defaultConsumerMaxRate;
this.defaultProducerWindowSize = defaultProducerWindowSize;
this.defaultProducerMaxRate = defaultProducerMaxRate;
this.defaultBlockOnAcknowledge = defaultBlockOnAcknowledge;
- this.defaultSendNonPersistentMessagesBlocking = defaultSendNonPersistentMessagesBlocking;
- this.defaultSendPersistentMessagesBlocking = defaultSendPersistentMessagesBlocking;
+ this.defaultBlockOnNonPersistentSend = defaultSendNonPersistentMessagesBlocking;
+ this.defaultBlockOnPersistentSend = defaultSendPersistentMessagesBlocking;
this.connectionParams = connectionParams;
}
+ /**
+ * Create a ClientConnectionFactoryImpl specifying the location and using default values for all other attributes
+ * @param location the location of the server
+ */
public ClientConnectionFactoryImpl(final Location location)
{
- this(location, new ConnectionParamsImpl(), false);
+ this(location, new ConnectionParamsImpl(), false);
}
+ /**
+ * Create a ClientConnectionFactoryImpl specifying the location and connection params and using default values for all other attributes
+ * @param location the location of the server
+ * @param connectionParams the connection parameters
+ */
public ClientConnectionFactoryImpl(final Location location, final ConnectionParams connectionParams)
{
this(location, connectionParams, false);
}
- protected ClientConnectionFactoryImpl(final Location location, final ConnectionParams connectionParams, final boolean dummy)
+
+ private ClientConnectionFactoryImpl(final Location location, final ConnectionParams connectionParams,
+ final boolean dummy)
{
- this.defaultConsumerWindowSize = 1024 * 1024;
- this.defaultConsumerMaxRate = -1;
- this.defaultProducerWindowSize = 1024 * 1024;
- this.defaultProducerMaxRate = -1;
+ defaultConsumerWindowSize = DEFAULT_CONSUMER_WINDOW_SIZE;
+ defaultConsumerMaxRate = DEFAULT_CONSUMER_MAX_RATE;
+ defaultProducerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE;
+ defaultProducerMaxRate = DEFAULT_PRODUCER_MAX_RATE;
+ defaultBlockOnAcknowledge = DEFAULT_BLOCK_ON_ACKNOWLEDGE;
+ defaultBlockOnPersistentSend = DEFAULT_BLOCK_ON_PERSISTENT_SEND;
+ defaultBlockOnNonPersistentSend = DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
this.location = location;
- this.defaultSendNonPersistentMessagesBlocking = false;
- this.defaultSendPersistentMessagesBlocking = false;
this.connectionParams = connectionParams;
+ this.remotingConnectionFactory = new RemotingConnectionFactoryImpl();
}
-
+
+ // ClientConnectionFactory implementation ---------------------------------------------
+
public ClientConnection createConnection() throws MessagingException
{
return createConnection(null, null);
@@ -137,8 +173,9 @@
RemotingConnection remotingConnection = null;
try
{
- remotingConnection = new RemotingConnectionImpl(location, connectionParams);
+ remotingConnection = remotingConnectionFactory.createRemotingConnection(location, connectionParams);
+ log.info("calling start");
remotingConnection.start();
long sessionID = remotingConnection.getSessionID();
@@ -152,8 +189,8 @@
return new ClientConnectionImpl(response.getConnectionTargetID(), remotingConnection,
defaultConsumerWindowSize, defaultConsumerMaxRate,
defaultProducerWindowSize, defaultProducerMaxRate,
- defaultBlockOnAcknowledge, defaultSendNonPersistentMessagesBlocking,
- defaultSendPersistentMessagesBlocking, response.getServerVersion());
+ defaultBlockOnAcknowledge, defaultBlockOnNonPersistentSend,
+ defaultBlockOnPersistentSend, response.getServerVersion());
}
catch (Throwable t)
{
@@ -170,6 +207,7 @@
if (t instanceof MessagingException)
{
+ log.info("got messaging excetption");
throw (MessagingException)t;
}
else
@@ -183,8 +221,6 @@
}
}
- // ClientConnectionFactory implementation ---------------------------------------------
-
public int getDefaultConsumerWindowSize()
{
return defaultConsumerWindowSize;
@@ -194,41 +230,76 @@
{
defaultConsumerWindowSize = size;
}
-
+
public int getDefaultProducerWindowSize()
{
return defaultProducerWindowSize;
}
-
+
public void setDefaultProducerWindowSize(final int size)
{
defaultProducerWindowSize = size;
}
-
+
public int getDefaultProducerMaxRate()
{
return defaultProducerMaxRate;
}
-
+
public void setDefaultProducerMaxRate(final int rate)
{
this.defaultProducerMaxRate = rate;
}
-
+
public int getDefaultConsumerMaxRate()
{
return defaultConsumerMaxRate;
}
-
+
public void setDefaultConsumerMaxRate(final int rate)
{
this.defaultConsumerMaxRate = rate;
}
+ public boolean isDefaultBlockOnPersistentSend()
+ {
+ return defaultBlockOnPersistentSend;
+ }
+
+ public void setDefaultBlockOnPersistentSend(final boolean blocking)
+ {
+ defaultBlockOnPersistentSend = blocking;
+ }
+
+ public boolean isDefaultBlockOnNonPersistentSend()
+ {
+ return defaultBlockOnNonPersistentSend;
+ }
+
+ public void setDefaultBlockOnNonPersistentSend(final boolean blocking)
+ {
+ defaultBlockOnNonPersistentSend = blocking;
+ }
+
+ public boolean isDefaultBlockOnAcknowledge()
+ {
+ return this.defaultBlockOnAcknowledge;
+ }
+
+ public void setDefaultBlockOnAcknowledge(final boolean blocking)
+ {
+ defaultBlockOnAcknowledge = blocking;
+ }
+
public ConnectionParams getConnectionParams()
{
return connectionParams;
}
+
+ public void setConnectionParams(final ConnectionParams params)
+ {
+ this.connectionParams = params;
+ }
public Location getLocation()
{
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java 2008-06-07 20:17:50 UTC (rev 4410)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java 2008-06-09 16:53:40 UTC (rev 4411)
@@ -28,6 +28,7 @@
import org.jboss.messaging.core.client.RemotingSessionListener;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.impl.wireformat.ConnectionCreateSessionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.ConnectionCreateSessionResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionInternal.java 2008-06-07 20:17:50 UTC (rev 4410)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionInternal.java 2008-06-09 16:53:40 UTC (rev 4411)
@@ -8,6 +8,7 @@
import org.jboss.messaging.core.client.ClientConnection;
import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.remoting.RemotingConnection;
/**
*
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-06-07 20:17:50 UTC (rev 4410)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-06-09 16:53:40 UTC (rev 4411)
@@ -31,6 +31,7 @@
import org.jboss.messaging.core.list.PriorityLinkedList;
import org.jboss.messaging.core.list.impl.PriorityLinkedListImpl;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowCreditMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-06-07 20:17:50 UTC (rev 4410)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-06-09 16:53:40 UTC (rev 4411)
@@ -27,6 +27,7 @@
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.impl.wireformat.ProducerSendMessage;
import org.jboss.messaging.util.SimpleString;
import org.jboss.messaging.util.TokenBucketLimiter;
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-06-07 20:17:50 UTC (rev 4410)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-06-09 16:53:40 UTC (rev 4411)
@@ -39,6 +39,7 @@
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowCreditMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java 2008-06-07 20:17:50 UTC (rev 4410)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java 2008-06-09 16:53:40 UTC (rev 4411)
@@ -1,36 +1,35 @@
/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
package org.jboss.messaging.core.client.impl;
import org.jboss.messaging.core.client.ConnectionParams;
-import java.io.Serializable;
-
/**
* @author <a href="ataylor at redhat.com">Andy Taylor</a>
*/
public class ConnectionParamsImpl implements ConnectionParams
{
-
+ private static final long serialVersionUID = 1662480686951551534L;
+
protected long timeout = DEFAULT_REQRES_TIMEOUT;
protected long keepAliveInterval = DEFAULT_KEEP_ALIVE_INTERVAL;
protected long keepAliveTimeout = DEFAULT_KEEP_ALIVE_TIMEOUT;
@@ -48,7 +47,7 @@
protected long writeQueueBlockTimeout = 10000;
protected long writeQueueMinBytes = 32 * 1024L;
protected long writeQueueMaxBytes = 64 * 1024L;
-
+
public long getTimeout()
{
return timeout;
@@ -128,7 +127,7 @@
{
this.tcpSendBufferSize = tcpSendBufferSize;
}
-
+
public long getWriteQueueBlockTimeout()
{
return writeQueueBlockTimeout;
@@ -143,7 +142,7 @@
{
return writeQueueMinBytes;
}
-
+
public void setWriteQueueBlockTimeout(final long timeout)
{
this.writeQueueBlockTimeout = timeout;
@@ -224,7 +223,7 @@
this.trustStorePassword = trustStorePassword;
}
- public String getURI()
+ public String getURI()
{
StringBuffer buff = new StringBuffer();
//buff.append(transport + "://" + host + ":" + port);
@@ -240,4 +239,28 @@
buff.append("&").append("trustStorePath=").append(trustStorePath);
return buff.toString();
}
+
+ public boolean equals(Object other)
+ {
+ if (other instanceof ConnectionParams == false)
+ {
+ return false;
+ }
+
+ ConnectionParams cp = (ConnectionParams)other;
+
+ return cp.getTimeout() == timeout &&
+ cp.getKeepAliveTimeout() == this.keepAliveTimeout &&
+ cp.getKeepAliveInterval() == this.keepAliveInterval &&
+ cp.isInvmDisabled() == this.isInvmDisabled() &&
+ cp.isInvmDisabledModified() == this.isInvmDisabledModified() &&
+ cp.isTcpNoDelay() == this.isTcpNoDelay() &&
+ cp.getTcpReceiveBufferSize() == this.getTcpReceiveBufferSize() &&
+ cp.getTcpSendBufferSize() == this.getTcpSendBufferSize() &&
+ cp.isSSLEnabled() == this.isSSLEnabled() &&
+ cp.isSSLEnabledModified() == this.isSSLEnabledModified() &&
+ cp.getWriteQueueBlockTimeout() == this.getWriteQueueBlockTimeout() &&
+ cp.getWriteQueueMinBytes() == this.getWriteQueueMinBytes() &&
+ cp.getWriteQueueMaxBytes() == this.getWriteQueueMaxBytes();
+ }
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/LocationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/LocationImpl.java 2008-06-07 20:17:50 UTC (rev 4410)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/LocationImpl.java 2008-06-09 16:53:40 UTC (rev 4411)
@@ -33,18 +33,18 @@
{
private static final long serialVersionUID = -1101852656621257742L;
- protected TransportType transport;
- protected String host;
- protected int port = ConfigurationImpl.DEFAULT_REMOTING_PORT;
+ private TransportType transport;
+ private String host;
+ private int port = ConfigurationImpl.DEFAULT_REMOTING_PORT;
private int serverID;
- public LocationImpl(int serverID)
+ public LocationImpl(final int serverID)
{
this.transport = TransportType.INVM;
this.serverID = serverID;
}
- public LocationImpl(TransportType transport, String host, int port)
+ public LocationImpl(final TransportType transport, final String host, final int port)
{
assert host != null;
assert port > 0;
@@ -55,6 +55,11 @@
this.host = host;
this.port = port;
}
+
+ public LocationImpl(final TransportType transport, final String host)
+ {
+ this(transport, host, ConfigurationImpl.DEFAULT_REMOTING_PORT);
+ }
public String getLocation()
{
Deleted: trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnection.java 2008-06-07 20:17:50 UTC (rev 4410)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnection.java 2008-06-09 16:53:40 UTC (rev 4411)
@@ -1,33 +0,0 @@
-package org.jboss.messaging.core.client.impl;
-
-import org.jboss.messaging.core.client.Location;
-import org.jboss.messaging.core.client.RemotingSessionListener;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
-
-/**
- *
- * A RemotingConnection
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public interface RemotingConnection
-{
- public void start() throws Throwable;
-
- public void stop();
-
- public long getSessionID();
-
- Packet sendBlocking(long targetID, long executorID, Packet packet) throws MessagingException;
-
- void sendOneWay(long targetID, long executorID, Packet packet) throws MessagingException;
-
- void setRemotingSessionListener(RemotingSessionListener newListener);
-
- PacketDispatcher getPacketDispatcher();
-
- public Location getLocation();
-}
Deleted: trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java 2008-06-07 20:17:50 UTC (rev 4410)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java 2008-06-09 16:53:40 UTC (rev 4411)
@@ -1,322 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.client.impl;
-
-import static org.jboss.messaging.core.remoting.ConnectorRegistrySingleton.REGISTRY;
-
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.jboss.messaging.core.client.ConnectionParams;
-import org.jboss.messaging.core.client.Location;
-import org.jboss.messaging.core.client.RemotingSessionListener;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.NIOConnector;
-import org.jboss.messaging.core.remoting.NIOSession;
-import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.remoting.PacketHandler;
-import org.jboss.messaging.core.remoting.PacketReturner;
-import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
-
-/**
- *
- * @author <a href="tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="ovidiu at feodorov.com">Ovidiu Feodorov</a>
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- * @version <tt>$Revision$</tt>
- * $Id$
- */
-public class RemotingConnectionImpl implements RemotingConnection
-{
- // Constants ------------------------------------------------------------------------------------
-
- private static final Logger log = Logger.getLogger(RemotingConnectionImpl.class);
-
- // Static ---------------------------------------------------------------------------------------
-
- // Attributes -----------------------------------------------------------------------------------
-
- private final Location location;
-
- private final ConnectionParams connectionParams;
-
- private NIOConnector connector;
-
- private NIOSession session;
-
- private RemotingSessionListener listener;
-
- //private transient PacketDispatcher dispatcher;
-
- // Constructors ---------------------------------------------------------------------------------
-
- public RemotingConnectionImpl(final Location location, ConnectionParams connectionParams) throws Exception
- {
- assert location != null;
- assert connectionParams != null;
-
- this.location = location;
- this.connectionParams = connectionParams;
-
- log.trace(this + " created with configuration " + location);
- }
-
- public RemotingConnectionImpl(final Location location, ConnectionParams connectionParams, NIOConnector nioConnector) throws Exception
- {
- assert location != null;
- assert connectionParams != null;
-
- this.location = location;
- this.connectionParams = connectionParams;
- connector = nioConnector;
- session = connector.connect();
- log.trace(this + " created with connector " + nioConnector);
- }
-
- // Public ---------------------------------------------------------------------------------------
-
- // RemotingConnection implementation ------------------------------------------------------------
-
- public void start() throws Throwable
- {
- if (log.isTraceEnabled()) { log.trace(this + " started remoting connection"); }
-
- connector = REGISTRY.getConnector(location, connectionParams);
- session = connector.connect();
-
- if (log.isDebugEnabled())
- log.debug("Using " + connector + " to connect to " + location);
-
- log.trace(this + " started");
- }
-
-
- public void stop()
- {
- log.trace(this + " stop");
-
- try
- {
- if (connector != null)
- {
- if (listener != null)
- connector.removeSessionListener(listener);
- NIOConnector connectorFromRegistry = REGISTRY.removeConnector(location);
- if (connectorFromRegistry != null)
- connectorFromRegistry.disconnect();
- }
- }
- catch (Throwable ignore)
- {
- log.trace(this + " failed to disconnect the new client", ignore);
- }
-
- connector = null;
-
- log.trace(this + " closed");
- }
-
- public long getSessionID()
- {
- if (session == null || !session.isConnected())
- {
- return -1;
- }
- return session.getID();
- }
-
- /**
- * send the packet and block until a response is received (<code>oneWay</code> is set to <code>false</code>)
- */
- public Packet sendBlocking(final long targetID, final long executorID, final Packet packet) throws MessagingException
- {
- checkConnected();
-
- long handlerID = connector.getDispatcher().generateID();
-
- ResponseHandler handler = new ResponseHandler(handlerID);
-
- connector.getDispatcher().register(handler);
-
- try
- {
- packet.setTargetID(targetID);
- packet.setExecutorID(executorID);
- packet.setResponseTargetID(handlerID);
-
- try
- {
- session.write(packet);
- }
- catch (Exception e)
- {
- log.error("Caught unexpected exception", e);
-
- throw new MessagingException(MessagingException.INTERNAL_ERROR);
- }
-
- Packet response = handler.waitForResponse(connectionParams.getTimeout());
-
- if (response == null)
- {
- throw new IllegalStateException("No response received for " + packet);
- }
-
- if (response instanceof MessagingExceptionMessage)
- {
- MessagingExceptionMessage message = (MessagingExceptionMessage) response;
-
- throw message.getException();
- }
- else
- {
- return response;
- }
- }
- finally
- {
- connector.getDispatcher().unregister(handlerID);
- }
- }
-
- public void sendOneWay(final long targetID, final long executorID, final Packet packet) throws MessagingException
- {
- assert packet != null;
-
- packet.setTargetID(targetID);
- packet.setExecutorID(executorID);
-
- try
- {
- session.write(packet);
- }
- catch (Exception e)
- {
- log.error("Caught unexpected exception", e);
-
- throw new MessagingException(MessagingException.INTERNAL_ERROR);
- }
- }
-
- public synchronized void setRemotingSessionListener(final RemotingSessionListener newListener)
- {
- if (listener != null && newListener != null)
- {
- throw new IllegalStateException("FailureListener already set to " + listener);
- }
-
- if (newListener != null)
- {
- connector.addSessionListener(newListener);
- }
- else
- {
- connector.removeSessionListener(listener);
- }
- this.listener = newListener;
- }
-
- public PacketDispatcher getPacketDispatcher()
- {
- return connector.getDispatcher();
- }
-
- public Location getLocation()
- {
- return location;
- }
-
- // Package protected ----------------------------------------------------------------------------
-
- // Protected ------------------------------------------------------------------------------------
-
- // Private --------------------------------------------------------------------------------------
-
- private static class ResponseHandler implements PacketHandler
- {
- private long id;
-
- private Packet response;
-
- ResponseHandler(final long id)
- {
- this.id = id;
- }
-
- public long getID()
- {
- return id;
- }
-
- public synchronized void handle(final Packet packet, final PacketReturner sender)
- {
- this.response = packet;
-
- notify();
- }
-
- public synchronized Packet waitForResponse(final long timeout)
- {
- long toWait = timeout;
- long start = System.currentTimeMillis();
-
- while (response == null && toWait > 0)
- {
- try
- {
- wait(toWait);
- }
- catch (InterruptedException e)
- {
- }
-
- long now = System.currentTimeMillis();
-
- toWait -= now - start;
-
- start = now;
- }
-
- return response;
- }
-
- }
-
- private void checkConnected() throws MessagingException
- {
- if (session == null)
- {
- throw new IllegalStateException("Client " + this
- + " is not connected.");
- }
- if (!session.isConnected())
- {
- throw new MessagingException(MessagingException.NOT_CONNECTED);
- }
- }
-}
Copied: trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java (from rev 4410, trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnection.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java 2008-06-09 16:53:40 UTC (rev 4411)
@@ -0,0 +1,31 @@
+package org.jboss.messaging.core.remoting;
+
+import org.jboss.messaging.core.client.Location;
+import org.jboss.messaging.core.client.RemotingSessionListener;
+import org.jboss.messaging.core.exception.MessagingException;
+
+/**
+ *
+ * A RemotingConnection
+ *
+ * Client-side handle used to exchange {@link Packet}s with the remote end identified by a
+ * {@link Location}. Packets can be sent either as a blocking request/response
+ * ({@link #sendBlocking}) or without waiting for a reply ({@link #sendOneWay}).
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public interface RemotingConnection
+{
+ /**
+ * Starts the connection.
+ *
+ * @throws Throwable if the connection cannot be started
+ */
+ public void start() throws Throwable;
+
+ /**
+ * Stops the connection. Declares no checked exception.
+ */
+ public void stop();
+
+ /**
+ * @return the ID of the underlying remoting session
+ * (NOTE(review): RemotingConnectionImpl returns -1 when there is no connected
+ * session - confirm this is the intended contract for all implementations)
+ */
+ public long getSessionID();
+
+ /**
+ * Sends a packet and returns the packet received in response.
+ *
+ * @param targetID the target ID to set on the packet
+ * @param executorID the executor ID to set on the packet
+ * @param packet the packet to send
+ * @return the response packet
+ * @throws MessagingException if sending fails or the response carries an exception
+ */
+ Packet sendBlocking(long targetID, long executorID, Packet packet) throws MessagingException;
+
+ /**
+ * Sends a packet without returning a response.
+ *
+ * @param targetID the target ID to set on the packet
+ * @param executorID the executor ID to set on the packet
+ * @param packet the packet to send
+ * @throws MessagingException if sending fails
+ */
+ void sendOneWay(long targetID, long executorID, Packet packet) throws MessagingException;
+
+ /**
+ * Sets the listener for remoting session events.
+ *
+ * @param newListener the listener to install
+ * (NOTE(review): RemotingConnectionImpl treats null as "remove the current
+ * listener" - confirm)
+ */
+ void setRemotingSessionListener(RemotingSessionListener newListener);
+
+ /**
+ * @return the dispatcher used to route packets for this connection
+ */
+ PacketDispatcher getPacketDispatcher();
+
+ /**
+ * @return the location this connection was created for
+ */
+ public Location getLocation();
+}
Added: trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnectionFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnectionFactory.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnectionFactory.java 2008-06-09 16:53:40 UTC (rev 4411)
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting;
+
+import org.jboss.messaging.core.client.ConnectionParams;
+import org.jboss.messaging.core.client.Location;
+
+/**
+ *
+ * A RemotingConnectionFactory
+ *
+ * Creates {@link RemotingConnection} instances, so that code needing a connection does not
+ * have to name a concrete implementation class.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public interface RemotingConnectionFactory
+{
+ /**
+ * Creates a remoting connection for the given location and connection parameters.
+ *
+ * NOTE(review): RemotingConnectionFactoryImpl only constructs the connection and does
+ * not call start() on it - confirm callers are expected to start it themselves.
+ *
+ * @param location the location to connect to
+ * @param params the connection parameters to use
+ * @return the new connection
+ * @throws Exception if the connection cannot be created
+ */
+ RemotingConnection createRemotingConnection(final Location location, final ConnectionParams params)
+ throws Exception;
+}
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionFactoryImpl.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionFactoryImpl.java 2008-06-09 16:53:40 UTC (rev 4411)
@@ -0,0 +1,45 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl;
+
+import org.jboss.messaging.core.client.ConnectionParams;
+import org.jboss.messaging.core.client.Location;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.RemotingConnectionFactory;
+
+/**
+ *
+ * A RemotingConnectionFactoryImpl
+ *
+ * Factory whose product is always a {@link RemotingConnectionImpl}.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class RemotingConnectionFactoryImpl implements RemotingConnectionFactory
+{
+   /**
+    * Builds a new {@link RemotingConnectionImpl} from the two arguments; the connection is
+    * returned as constructed, without being started.
+    *
+    * @param location the location handed to the new connection
+    * @param params the connection parameters handed to the new connection
+    * @return the newly constructed connection
+    * @throws Exception if the connection's constructor throws
+    */
+   public RemotingConnection createRemotingConnection(final Location location,
+                                                      final ConnectionParams params) throws Exception
+   {
+      final RemotingConnection created = new RemotingConnectionImpl(location, params);
+
+      return created;
+   }
+}
Copied: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java (from rev 4410, trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-06-09 16:53:40 UTC (rev 4411)
@@ -0,0 +1,321 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl;
+
+import static org.jboss.messaging.core.remoting.ConnectorRegistrySingleton.REGISTRY;
+
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.jboss.messaging.core.client.ConnectionParams;
+import org.jboss.messaging.core.client.Location;
+import org.jboss.messaging.core.client.RemotingSessionListener;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.NIOConnector;
+import org.jboss.messaging.core.remoting.NIOSession;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.PacketReturner;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
+
+/**
+ * {@link RemotingConnection} implementation that sends packets over an {@link NIOSession}
+ * opened on an {@link NIOConnector}.
+ *
+ * <p>{@link #start()} obtains the connector for the configured {@link Location} from the
+ * shared connector registry and connects a session on it. The three-argument constructor
+ * instead connects immediately on a connector supplied by the caller.
+ *
+ * @author <a href="tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @version <tt>$Revision$</tt>
+ * $Id$
+ */
+public class RemotingConnectionImpl implements RemotingConnection
+{
+   // Constants ------------------------------------------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(RemotingConnectionImpl.class);
+
+   // Static ---------------------------------------------------------------------------------------
+
+   // Attributes -----------------------------------------------------------------------------------
+
+   private final Location location;
+
+   private final ConnectionParams connectionParams;
+
+   /** Set by {@link #start()} or the three-argument constructor; cleared by {@link #stop()}. */
+   private NIOConnector connector;
+
+   /** Session returned by {@code connector.connect()}; null until a connection is made. */
+   private NIOSession session;
+
+   /** Listener currently registered on the connector, or null if none. */
+   private RemotingSessionListener listener;
+
+   // Constructors ---------------------------------------------------------------------------------
+
+   /**
+    * Creates an unconnected instance; {@link #start()} must be called before sending.
+    *
+    * @param location the location to connect to (asserted non-null)
+    * @param connectionParams the connection parameters (asserted non-null)
+    */
+   public RemotingConnectionImpl(final Location location, ConnectionParams connectionParams) throws Exception
+   {
+      assert location != null;
+      assert connectionParams != null;
+
+      this.location = location;
+      this.connectionParams = connectionParams;
+
+      if (log.isTraceEnabled())
+      {
+         log.trace(this + " created with configuration " + location);
+      }
+   }
+
+   /**
+    * Creates an instance that connects immediately using the supplied connector.
+    *
+    * @param location the location to connect to (asserted non-null)
+    * @param connectionParams the connection parameters (asserted non-null)
+    * @param nioConnector the connector on which {@code connect()} is called
+    * @throws Exception if connecting fails
+    */
+   public RemotingConnectionImpl(final Location location, ConnectionParams connectionParams, NIOConnector nioConnector) throws Exception
+   {
+      assert location != null;
+      assert connectionParams != null;
+
+      this.location = location;
+      this.connectionParams = connectionParams;
+      connector = nioConnector;
+      session = connector.connect();
+
+      if (log.isTraceEnabled())
+      {
+         log.trace(this + " created with connector " + nioConnector);
+      }
+   }
+
+   // Public ---------------------------------------------------------------------------------------
+
+   // RemotingConnection implementation ------------------------------------------------------------
+
+   /**
+    * Obtains the connector for this connection's location from the registry and connects a
+    * session on it.
+    *
+    * @throws Throwable if the connector cannot be obtained or the connect fails
+    */
+   public void start() throws Throwable
+   {
+      if (log.isTraceEnabled()) { log.trace(this + " started remoting connection"); }
+
+      connector = REGISTRY.getConnector(location, connectionParams);
+      session = connector.connect();
+
+      if (log.isDebugEnabled())
+      {
+         log.debug("Using " + connector + " to connect to " + location);
+      }
+
+      if (log.isTraceEnabled())
+      {
+         log.trace(this + " started");
+      }
+   }
+
+   /**
+    * Removes the registered listener (if any) from the connector, removes the connector for
+    * this location from the registry and disconnects it if the registry returned one.
+    * Failures are deliberately best-effort: they are logged at trace level and not rethrown.
+    */
+   public void stop()
+   {
+      if (log.isTraceEnabled())
+      {
+         log.trace(this + " stop");
+      }
+
+      try
+      {
+         if (connector != null)
+         {
+            if (listener != null)
+            {
+               connector.removeSessionListener(listener);
+            }
+
+            NIOConnector connectorFromRegistry = REGISTRY.removeConnector(location);
+
+            if (connectorFromRegistry != null)
+            {
+               connectorFromRegistry.disconnect();
+            }
+         }
+      }
+      catch (Throwable ignore)
+      {
+         log.trace(this + " failed to disconnect the new client", ignore);
+      }
+
+      connector = null;
+
+      if (log.isTraceEnabled())
+      {
+         log.trace(this + " closed");
+      }
+   }
+
+   /**
+    * @return the session's ID, or -1 if there is no session or it is not connected
+    */
+   public long getSessionID()
+   {
+      if (session == null || !session.isConnected())
+      {
+         return -1;
+      }
+      return session.getID();
+   }
+
+   /**
+    * Sends the packet and blocks until a response is received or the configured timeout
+    * ({@code connectionParams.getTimeout()}) elapses.
+    *
+    * @param targetID the target ID set on the packet
+    * @param executorID the executor ID set on the packet
+    * @param packet the packet to send
+    * @return the response packet
+    * @throws MessagingException NOT_CONNECTED if the session is not connected, INTERNAL_ERROR
+    *         if writing the packet fails, or the exception carried by a
+    *         {@link MessagingExceptionMessage} response
+    * @throws IllegalStateException if there is no session, or no response arrived in time
+    */
+   public Packet sendBlocking(final long targetID, final long executorID, final Packet packet) throws MessagingException
+   {
+      checkConnected();
+
+      // Read the dispatcher once: stop() clears the connector field, and re-reading it in the
+      // finally block could otherwise fail with a NullPointerException that hides the real outcome.
+      final PacketDispatcher dispatcher = connector.getDispatcher();
+
+      long handlerID = dispatcher.generateID();
+
+      ResponseHandler handler = new ResponseHandler(handlerID);
+
+      dispatcher.register(handler);
+
+      try
+      {
+         packet.setTargetID(targetID);
+         packet.setExecutorID(executorID);
+         packet.setResponseTargetID(handlerID);
+
+         try
+         {
+            session.write(packet);
+         }
+         catch (Exception e)
+         {
+            log.error("Caught unexpected exception", e);
+
+            throw new MessagingException(MessagingException.INTERNAL_ERROR);
+         }
+
+         Packet response = handler.waitForResponse(connectionParams.getTimeout());
+
+         if (response == null)
+         {
+            throw new IllegalStateException("No response received for " + packet);
+         }
+
+         if (response instanceof MessagingExceptionMessage)
+         {
+            MessagingExceptionMessage message = (MessagingExceptionMessage) response;
+
+            throw message.getException();
+         }
+         else
+         {
+            return response;
+         }
+      }
+      finally
+      {
+         // Always unregister the one-shot handler, whether a response arrived or not.
+         dispatcher.unregister(handlerID);
+      }
+   }
+
+   /**
+    * Sends the packet without waiting for a response.
+    *
+    * <p>NOTE(review): unlike {@link #sendBlocking}, this method does not call
+    * {@code checkConnected()}; with no session the resulting NullPointerException is caught
+    * below and reported as INTERNAL_ERROR. Confirm whether that is intended.
+    *
+    * @param targetID the target ID set on the packet
+    * @param executorID the executor ID set on the packet
+    * @param packet the packet to send (asserted non-null)
+    * @throws MessagingException INTERNAL_ERROR if writing the packet fails
+    */
+   public void sendOneWay(final long targetID, final long executorID, final Packet packet) throws MessagingException
+   {
+      assert packet != null;
+
+      packet.setTargetID(targetID);
+      packet.setExecutorID(executorID);
+
+      try
+      {
+         session.write(packet);
+      }
+      catch (Exception e)
+      {
+         log.error("Caught unexpected exception", e);
+
+         throw new MessagingException(MessagingException.INTERNAL_ERROR);
+      }
+   }
+
+   /**
+    * Installs a session listener on the connector, or removes the current one when
+    * {@code newListener} is null.
+    *
+    * <p>Clearing is null-safe: if no listener is registered, or the connector has already
+    * been released by {@link #stop()}, only the local reference is cleared.
+    *
+    * @param newListener the listener to install, or null to remove the current listener
+    * @throws IllegalStateException if a listener is already set and {@code newListener} is
+    *         non-null, or if a listener is being installed while there is no connector
+    */
+   public synchronized void setRemotingSessionListener(final RemotingSessionListener newListener)
+   {
+      if (listener != null && newListener != null)
+      {
+         throw new IllegalStateException("FailureListener already set to " + listener);
+      }
+
+      if (newListener != null)
+      {
+         if (connector == null)
+         {
+            // Previously a bare NullPointerException; fail with a message that names the cause.
+            throw new IllegalStateException("Cannot set a session listener on " + this + ": no connector");
+         }
+
+         connector.addSessionListener(newListener);
+      }
+      else if (listener != null && connector != null)
+      {
+         connector.removeSessionListener(listener);
+      }
+
+      this.listener = newListener;
+   }
+
+   /**
+    * @return the dispatcher of the current connector
+    */
+   public PacketDispatcher getPacketDispatcher()
+   {
+      return connector.getDispatcher();
+   }
+
+   /**
+    * @return the location this connection was created with
+    */
+   public Location getLocation()
+   {
+      return location;
+   }
+
+   // Package protected ----------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+
+   // Private --------------------------------------------------------------------------------------
+
+   /**
+    * One-shot handler registered for a single blocking request: stores the packet it is handed
+    * and wakes up the thread blocked in {@link #waitForResponse(long)}.
+    */
+   private static class ResponseHandler implements PacketHandler
+   {
+      private final long id;
+
+      private Packet response;
+
+      ResponseHandler(final long id)
+      {
+         this.id = id;
+      }
+
+      public long getID()
+      {
+         return id;
+      }
+
+      public synchronized void handle(final Packet packet, final PacketReturner sender)
+      {
+         this.response = packet;
+
+         notify();
+      }
+
+      /**
+       * Waits until a response has been handled or the timeout elapses.
+       *
+       * @param timeout maximum time to wait, in milliseconds; values &lt;= 0 do not wait
+       * @return the response, or null if none arrived in time
+       */
+      public synchronized Packet waitForResponse(final long timeout)
+      {
+         long toWait = timeout;
+         long start = System.currentTimeMillis();
+         boolean interrupted = false;
+
+         try
+         {
+            while (response == null && toWait > 0)
+            {
+               try
+               {
+                  wait(toWait);
+               }
+               catch (InterruptedException e)
+               {
+                  // Keep waiting for the remaining time, as before, but remember the interrupt.
+                  // Re-interrupting here would make the next wait() throw immediately.
+                  interrupted = true;
+               }
+
+               long now = System.currentTimeMillis();
+
+               toWait -= now - start;
+
+               start = now;
+            }
+
+            return response;
+         }
+         finally
+         {
+            if (interrupted)
+            {
+               // Restore the interrupt status so callers can still observe the interruption.
+               Thread.currentThread().interrupt();
+            }
+         }
+      }
+
+   }
+
+   /**
+    * Verifies that a session exists and is connected.
+    *
+    * @throws IllegalStateException if there is no session
+    * @throws MessagingException NOT_CONNECTED if the session is not connected
+    */
+   private void checkConnected() throws MessagingException
+   {
+      if (session == null)
+      {
+         throw new IllegalStateException("Client " + this
+               + " is not connected.");
+      }
+      if (!session.isConnected())
+      {
+         throw new MessagingException(MessagingException.NOT_CONNECTED);
+      }
+   }
+}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateConnectionRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateConnectionRequest.java 2008-06-07 20:17:50 UTC (rev 4410)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateConnectionRequest.java 2008-06-09 16:53:40 UTC (rev 4411)
@@ -6,6 +6,7 @@
*/
package org.jboss.messaging.core.remoting.impl.wireformat;
+import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.util.MessagingBuffer;
/**
@@ -19,6 +20,9 @@
{
// Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(CreateConnectionRequest.class);
+
+
// Attributes ----------------------------------------------------
private int version;
@@ -96,6 +100,23 @@
return buf.toString();
}
+   /**
+    * Compares this request with another object: two requests are equal when the other object
+    * is a {@code CreateConnectionRequest} with the same version, the same remoting session ID
+    * and equal (or both null) username and password.
+    *
+    * <p>NOTE(review): no matching {@code hashCode()} override is visible in this hunk; confirm
+    * one exists (or add one) before using instances as hash keys.
+    *
+    * @param other the object to compare with; may be null
+    * @return true if {@code other} is an equal request, false otherwise
+    */
+   public boolean equals(Object other)
+   {
+      if (!(other instanceof CreateConnectionRequest))
+      {
+         return false;
+      }
+
+      CreateConnectionRequest r = (CreateConnectionRequest)other;
+
+      // Each null-safe comparison is evaluated on its own. The conditional operator binds more
+      // loosely than &&, so chaining the unparenthesised ternaries with && made the whole &&
+      // chain the ternary condition, which ignored version/session ID mismatches and could
+      // dereference a null username or password.
+      boolean sameUsername = (this.username == null) ? (r.username == null) : this.username.equals(r.username);
+      boolean samePassword = (this.password == null) ? (r.password == null) : this.password.equals(r.password);
+
+      return this.version == r.version &&
+             this.remotingSessionID == r.remotingSessionID &&
+             sameUsername &&
+             samePassword;
+   }
+
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java 2008-06-07 20:17:50 UTC (rev 4410)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java 2008-06-09 16:53:40 UTC (rev 4411)
@@ -22,7 +22,6 @@
package org.jboss.messaging.jms.client;
import java.io.Serializable;
-import java.io.IOException;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -42,12 +41,12 @@
import org.jboss.messaging.core.client.ClientConnection;
import org.jboss.messaging.core.client.ClientConnectionFactory;
+import org.jboss.messaging.core.client.ConnectionParams;
import org.jboss.messaging.core.client.Location;
-import org.jboss.messaging.core.client.ConnectionParams;
import org.jboss.messaging.core.client.impl.ClientConnectionFactoryImpl;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.impl.RemotingConnectionFactoryImpl;
import org.jboss.messaging.jms.referenceable.SerializableObjectRefAddr;
/**
@@ -226,6 +225,7 @@
if (connectionFactory == null)
{
connectionFactory = new ClientConnectionFactoryImpl(
+ new RemotingConnectionFactoryImpl(),
location,
connectionParams,
defaultConsumerWindowSize,
Modified: trunk/tests/src/org/jboss/messaging/tests/performance/remoting/impl/MeasureBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/remoting/impl/MeasureBase.java 2008-06-07 20:17:50 UTC (rev 4410)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/remoting/impl/MeasureBase.java 2008-06-09 16:53:40 UTC (rev 4411)
@@ -11,12 +11,12 @@
import org.jboss.messaging.core.client.impl.ConnectionParamsImpl;
import org.jboss.messaging.core.client.impl.LocationImpl;
-import org.jboss.messaging.core.client.impl.RemotingConnectionImpl;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.PacketHandler;
import org.jboss.messaging.core.remoting.PacketReturner;
+import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
import org.jboss.messaging.core.remoting.impl.mina.MinaService;
import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
More information about the jboss-cvs-commits
mailing list