[jboss-cvs] JBoss Messaging SVN: r3773 - in trunk/src/main/org/jboss: jms/client/remoting and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Feb 22 12:56:43 EST 2008
Author: timfox
Date: 2008-02-22 12:56:42 -0500 (Fri, 22 Feb 2008)
New Revision: 3773
Added:
trunk/src/main/org/jboss/jms/client/remoting/RemotingConnection.java
trunk/src/main/org/jboss/jms/client/remoting/RemotingConnectionImpl.java
Removed:
trunk/src/main/org/jboss/jms/client/remoting/MessagingRemotingConnection.java
Modified:
trunk/src/main/org/jboss/jms/client/impl/ClientBrowserImpl.java
trunk/src/main/org/jboss/jms/client/impl/ClientConnectionFactoryImpl.java
trunk/src/main/org/jboss/jms/client/impl/ClientConnectionImpl.java
trunk/src/main/org/jboss/jms/client/impl/ClientConnectionInternal.java
trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java
trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/AbstractPacket.java
Log:
More adjustments to ClientConsumer and separated RemotingConnection out into interface and impl
Modified: trunk/src/main/org/jboss/jms/client/impl/ClientBrowserImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientBrowserImpl.java 2008-02-22 16:48:43 UTC (rev 3772)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientBrowserImpl.java 2008-02-22 17:56:42 UTC (rev 3773)
@@ -22,8 +22,9 @@
package org.jboss.jms.client.impl;
import org.jboss.jms.client.api.ClientBrowser;
-import org.jboss.jms.client.remoting.MessagingRemotingConnection;
+import org.jboss.jms.client.remoting.RemotingConnection;
import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.remoting.wireformat.CloseMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionBrowserHasNextMessageMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionBrowserHasNextMessageResponseMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionBrowserNextMessageBlockMessage;
@@ -31,7 +32,6 @@
import org.jboss.messaging.core.remoting.wireformat.SessionBrowserNextMessageMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionBrowserNextMessageResponseMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionBrowserResetMessage;
-import org.jboss.messaging.core.remoting.wireformat.CloseMessage;
import org.jboss.messaging.util.MessagingException;
/**
@@ -53,7 +53,7 @@
private ClientSessionInternal session;
- private MessagingRemotingConnection remotingConnection;
+ private RemotingConnection remotingConnection;
private volatile boolean closed;
@@ -61,7 +61,7 @@
// Constructors ---------------------------------------------------------------------------------
- public ClientBrowserImpl(MessagingRemotingConnection remotingConnection, ClientSessionInternal session, String id)
+ public ClientBrowserImpl(RemotingConnection remotingConnection, ClientSessionInternal session, String id)
{
this.remotingConnection = remotingConnection;
Modified: trunk/src/main/org/jboss/jms/client/impl/ClientConnectionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientConnectionFactoryImpl.java 2008-02-22 16:48:43 UTC (rev 3772)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientConnectionFactoryImpl.java 2008-02-22 17:56:42 UTC (rev 3773)
@@ -25,7 +25,8 @@
import org.jboss.jms.client.api.ClientConnection;
import org.jboss.jms.client.api.ClientConnectionFactory;
-import org.jboss.jms.client.remoting.MessagingRemotingConnection;
+import org.jboss.jms.client.remoting.RemotingConnectionImpl;
+import org.jboss.jms.client.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.RemotingConfiguration;
import org.jboss.messaging.core.remoting.wireformat.CreateConnectionRequest;
import org.jboss.messaging.core.remoting.wireformat.CreateConnectionResponse;
@@ -104,10 +105,10 @@
byte v = version.getProviderIncrementingVersion();
- MessagingRemotingConnection remotingConnection = null;
+ RemotingConnection remotingConnection = null;
try
{
- remotingConnection = new MessagingRemotingConnection(remotingConfig);
+ remotingConnection = new RemotingConnectionImpl(remotingConfig);
remotingConnection.start();
Modified: trunk/src/main/org/jboss/jms/client/impl/ClientConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientConnectionImpl.java 2008-02-22 16:48:43 UTC (rev 3772)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientConnectionImpl.java 2008-02-22 17:56:42 UTC (rev 3773)
@@ -28,7 +28,7 @@
import org.jboss.jms.client.api.ClientSession;
import org.jboss.jms.client.api.FailureListener;
-import org.jboss.jms.client.remoting.MessagingRemotingConnection;
+import org.jboss.jms.client.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.wireformat.CloseMessage;
import org.jboss.messaging.core.remoting.wireformat.ConnectionCreateSessionMessage;
import org.jboss.messaging.core.remoting.wireformat.ConnectionCreateSessionResponseMessage;
@@ -63,7 +63,7 @@
private int serverID;
- private MessagingRemotingConnection remotingConnection;
+ private RemotingConnection remotingConnection;
private boolean strictTck;
@@ -76,7 +76,7 @@
// Constructors ---------------------------------------------------------------------------------
public ClientConnectionImpl(String id, int serverID, boolean strictTck,
- MessagingRemotingConnection connection)
+ RemotingConnection connection)
{
this.id = id;
@@ -160,7 +160,7 @@
return serverID;
}
- public MessagingRemotingConnection getRemotingConnection()
+ public RemotingConnection getRemotingConnection()
{
return remotingConnection;
}
Modified: trunk/src/main/org/jboss/jms/client/impl/ClientConnectionInternal.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientConnectionInternal.java 2008-02-22 16:48:43 UTC (rev 3772)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientConnectionInternal.java 2008-02-22 17:56:42 UTC (rev 3773)
@@ -7,7 +7,7 @@
package org.jboss.jms.client.impl;
import org.jboss.jms.client.api.ClientConnection;
-import org.jboss.jms.client.remoting.MessagingRemotingConnection;
+import org.jboss.jms.client.remoting.RemotingConnection;
/**
*
@@ -20,7 +20,7 @@
{
int getServerID();
- MessagingRemotingConnection getRemotingConnection();
+ RemotingConnection getRemotingConnection();
void removeChild(String id);
}
Modified: trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java 2008-02-22 16:48:43 UTC (rev 3772)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java 2008-02-22 17:56:42 UTC (rev 3773)
@@ -26,7 +26,7 @@
import java.util.concurrent.TimeUnit;
import org.jboss.jms.client.api.MessageHandler;
-import org.jboss.jms.client.remoting.MessagingRemotingConnection;
+import org.jboss.jms.client.remoting.RemotingConnection;
import org.jboss.messaging.core.Message;
import org.jboss.messaging.core.PriorityLinkedList;
import org.jboss.messaging.core.impl.PriorityLinkedListImpl;
@@ -61,48 +61,50 @@
// Attributes
// -----------------------------------------------------------------------------------
- private String id;
+ private final ClientSessionInternal session;
+
+ private final String id;
- private ClientSessionInternal session;
+ private final ExecutorService sessionExecutor;
- private PriorityLinkedList<DeliverMessage> buffer = new PriorityLinkedListImpl<DeliverMessage>(10);
+ private final RemotingConnection remotingConnection;
+ private final boolean direct;
+
+ private final int tokenBatchSize;
+
+ private final PriorityLinkedList<DeliverMessage> buffer = new PriorityLinkedListImpl<DeliverMessage>(10);
+
private volatile Thread receiverThread;
+ private volatile Thread onMessageThread;
+
private volatile MessageHandler handler;
private volatile boolean closed;
-
- private ExecutorService sessionExecutor;
+
+ private volatile long ignoreDeliveryMark = -1;
- private MessagingRemotingConnection remotingConnection;
-
- private long ignoreDeliveryMark = -1;
-
- private boolean direct;
-
- private Thread onMessageThread;
-
- private int tokensToSend;
-
- private int tokenBatchSize;
-
- // Static
- // ---------------------------------------------------------------------------------------
+ private volatile int tokensToSend;
// Constructors
// ---------------------------------------------------------------------------------
- public ClientConsumerImpl(ClientSessionInternal session, String id,
- ExecutorService sessionExecutor,
- MessagingRemotingConnection remotingConnection,
- boolean direct, int tokenBatchSize)
+ public ClientConsumerImpl(final ClientSessionInternal session, final String id,
+ final ExecutorService sessionExecutor,
+ final RemotingConnection remotingConnection,
+ final boolean direct, final int tokenBatchSize)
{
this.id = id;
+
this.session = session;
+
this.sessionExecutor = sessionExecutor;
+
this.remotingConnection = remotingConnection;
+
this.direct = direct;
+
this.tokenBatchSize = tokenBatchSize;
}
@@ -310,7 +312,10 @@
{
//Execute using executor
- buffer.addLast(message, message.getMessage().getPriority());
+ synchronized (buffer)
+ {
+ buffer.addLast(message, message.getMessage().getPriority());
+ }
sessionExecutor.execute(new Runnable() { public void run() { callOnMessage(); } } );
}
@@ -328,7 +333,7 @@
}
}
- public synchronized void recover(long lastDeliveryID)
+ public void recover(long lastDeliveryID)
{
ignoreDeliveryMark = lastDeliveryID;
@@ -371,7 +376,7 @@
if (Thread.currentThread() == onMessageThread)
{
- // If called from inside onMessage then return immediately - otherwise would block forever
+ // If called from inside onMessage then return immediately - otherwise would block
return;
}
@@ -401,7 +406,7 @@
throw new MessagingException(MessagingException.OBJECT_CLOSED, "Consumer is closed");
}
}
-
+
private void callOnMessage()
{
try
@@ -415,8 +420,13 @@
//ordering. If we just added a Runnable with the message to the executor immediately as we get it
//we could not do that
- DeliverMessage message = buffer.removeFirst();
+ DeliverMessage message;
+ synchronized (buffer)
+ {
+ message = buffer.removeFirst();
+ }
+
if (message != null)
{
boolean expired = message.getMessage().isExpired();
Modified: trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java 2008-02-22 16:48:43 UTC (rev 3772)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java 2008-02-22 17:56:42 UTC (rev 3773)
@@ -37,7 +37,7 @@
import org.jboss.jms.client.api.ClientBrowser;
import org.jboss.jms.client.api.ClientConsumer;
import org.jboss.jms.client.api.ClientProducer;
-import org.jboss.jms.client.remoting.MessagingRemotingConnection;
+import org.jboss.jms.client.remoting.RemotingConnection;
import org.jboss.messaging.core.Message;
import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
@@ -119,7 +119,7 @@
private ExecutorService executor;
- private MessagingRemotingConnection remotingConnection;
+ private RemotingConnection remotingConnection;
private ClientConnectionInternal connection;
Deleted: trunk/src/main/org/jboss/jms/client/remoting/MessagingRemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessagingRemotingConnection.java 2008-02-22 16:48:43 UTC (rev 3772)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessagingRemotingConnection.java 2008-02-22 17:56:42 UTC (rev 3773)
@@ -1,225 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.jms.client.remoting;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.jboss.messaging.core.remoting.ConnectorRegistrySingleton.REGISTRY;
-
-import org.jboss.jms.client.api.FailureListener;
-import org.jboss.messaging.core.remoting.NIOConnector;
-import org.jboss.messaging.core.remoting.NIOSession;
-import org.jboss.messaging.core.remoting.RemotingConfiguration;
-import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
-import org.jboss.messaging.core.remoting.wireformat.MessagingExceptionMessage;
-import org.jboss.messaging.util.Logger;
-import org.jboss.messaging.util.MessagingException;
-
-/**
- *
- * Encapsulates the state and behaviour from MINA needed for a JMS connection.
- *
- * Each JMS connection maintains a single Client instance for invoking on the server.
- *
- * @author <a href="tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="ovidiu at feodorov.com">Ovidiu Feodorov</a>
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- * @version <tt>$Revision$</tt>
- * $Id$
- */
-public class MessagingRemotingConnection
-{
- // Constants ------------------------------------------------------------------------------------
-
- private static final Logger log = Logger.getLogger(MessagingRemotingConnection.class);
-
- // Static ---------------------------------------------------------------------------------------
-
- // Attributes -----------------------------------------------------------------------------------
-
- private RemotingConfiguration remotingConfig;
-
- private NIOConnector connector;
-
- private NIOSession session;
-
- private FailureListener listener;
-
- // Constructors ---------------------------------------------------------------------------------
-
- public MessagingRemotingConnection(RemotingConfiguration remotingConfig) throws Exception
- {
- assert remotingConfig != null;
-
- this.remotingConfig = remotingConfig;
-
- log.trace(this + " created with configuration " + remotingConfig);
- }
-
- // Public ---------------------------------------------------------------------------------------
-
- public void start() throws Throwable
- {
- if (log.isTraceEnabled()) { log.trace(this + " started remoting connection"); }
-
- connector = REGISTRY.getConnector(remotingConfig);
- session = connector.connect();
-
- if (log.isDebugEnabled())
- log.debug("Using " + connector + " to connect to " + remotingConfig);
-
- log.trace(this + " started");
- }
-
- public void stop()
- {
- log.trace(this + " stop");
-
- try
- {
- if (connector != null)
- {
- if (listener != null)
- connector.removeFailureListener(listener);
- NIOConnector connectorFromRegistry = REGISTRY.removeConnector(remotingConfig);
- if (connectorFromRegistry != null)
- connectorFromRegistry.disconnect();
- }
- }
- catch (Throwable ignore)
- {
- log.trace(this + " failed to disconnect the new client", ignore);
- }
-
- connector = null;
-
- log.trace(this + " closed");
- }
-
- public String getSessionID()
- {
- if (session == null || !session.isConnected())
- {
- return null;
- }
- return session.getID();
- }
-
- /**
- * send the packet and block until a response is received (<code>oneWay</code> is set to <code>false</code>)
- */
- public AbstractPacket send(String id, AbstractPacket packet) throws MessagingException
- {
- return send(id, packet, false);
- }
-
- public AbstractPacket send(String id, AbstractPacket packet, boolean oneWay) throws MessagingException
- {
- assert packet != null;
-
- packet.setTargetID(id);
-
- AbstractPacket response;
-
- try
- {
- response = (AbstractPacket) send(packet, oneWay);
- }
- catch (Exception e)
- {
- log.error("Caught unexpected exception", e);
-
- throw new MessagingException(MessagingException.INTERNAL_ERROR);
- }
-
- if (oneWay == false && response == null)
- {
- throw new IllegalStateException("No response received for " + packet);
- }
-
- if (response instanceof MessagingExceptionMessage)
- {
- MessagingExceptionMessage message = (MessagingExceptionMessage) response;
-
- throw message.getException();
- }
- else
- {
- return response;
- }
- }
-
- public synchronized void setFailureListener(FailureListener newListener)
- {
- if (listener != null && newListener != null)
- {
- throw new IllegalStateException("FailureListener already set to " + listener);
- }
-
- if (newListener != null)
- {
- connector.addFailureListener(newListener);
- }
- else
- {
- connector.removeFailureListener(listener);
- }
- this.listener = newListener;
- }
-
- // Package protected ----------------------------------------------------------------------------
-
- // Protected ------------------------------------------------------------------------------------
-
- // Private --------------------------------------------------------------------------------------
-
- private AbstractPacket send(AbstractPacket packet, boolean oneWay)
- throws Exception
- {
- assert packet != null;
- checkConnected();
- packet.setOneWay(oneWay);
-
- if (oneWay)
- {
- session.write(packet);
- return null;
- } else
- {
- AbstractPacket response = (AbstractPacket) session.writeAndBlock(packet,
- remotingConfig.getTimeout(), SECONDS);
- return response;
- }
- }
-
- private void checkConnected() throws MessagingException
- {
- if (session == null)
- {
- throw new IllegalStateException("Client " + this
- + " is not connected.");
- }
- if (!session.isConnected())
- {
- throw new MessagingException(MessagingException.NOT_CONNECTED);
- }
- }
-}
Added: trunk/src/main/org/jboss/jms/client/remoting/RemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/RemotingConnection.java (rev 0)
+++ trunk/src/main/org/jboss/jms/client/remoting/RemotingConnection.java 2008-02-22 17:56:42 UTC (rev 3773)
@@ -0,0 +1,27 @@
+package org.jboss.jms.client.remoting;
+
+import org.jboss.jms.client.api.FailureListener;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.util.MessagingException;
+
+/**
+ *
+ * A RemotingConnection
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public interface RemotingConnection
+{
+ public void start() throws Throwable;
+
+ public void stop();
+
+ public String getSessionID();
+
+ AbstractPacket send(String id, AbstractPacket packet) throws MessagingException;
+
+ AbstractPacket send(String id, AbstractPacket packet, boolean oneWay) throws MessagingException;
+
+ void setFailureListener(FailureListener newListener);
+}
Copied: trunk/src/main/org/jboss/jms/client/remoting/RemotingConnectionImpl.java (from rev 3758, trunk/src/main/org/jboss/jms/client/remoting/MessagingRemotingConnection.java)
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/RemotingConnectionImpl.java (rev 0)
+++ trunk/src/main/org/jboss/jms/client/remoting/RemotingConnectionImpl.java 2008-02-22 17:56:42 UTC (rev 3773)
@@ -0,0 +1,224 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.client.remoting;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.jboss.messaging.core.remoting.ConnectorRegistrySingleton.REGISTRY;
+
+import org.jboss.jms.client.api.FailureListener;
+import org.jboss.messaging.core.remoting.NIOConnector;
+import org.jboss.messaging.core.remoting.NIOSession;
+import org.jboss.messaging.core.remoting.RemotingConfiguration;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.MessagingExceptionMessage;
+import org.jboss.messaging.util.Logger;
+import org.jboss.messaging.util.MessagingException;
+
+/**
+ *
+ * Encapsulates the state and behaviour from MINA needed for a JMS connection.
+ *
+ * Each JMS connection maintains a single Client instance for invoking on the server.
+ *
+ * @author <a href="tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @version <tt>$Revision$</tt>
+ * $Id$
+ */
+public class RemotingConnectionImpl implements RemotingConnection
+{
+ // Constants ------------------------------------------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(RemotingConnectionImpl.class);
+
+ // Static ---------------------------------------------------------------------------------------
+
+ // Attributes -----------------------------------------------------------------------------------
+
+ private RemotingConfiguration remotingConfig;
+
+ private NIOConnector connector;
+
+ private NIOSession session;
+
+ private FailureListener listener;
+
+ // Constructors ---------------------------------------------------------------------------------
+
+ public RemotingConnectionImpl(RemotingConfiguration remotingConfig) throws Exception
+ {
+ assert remotingConfig != null;
+
+ this.remotingConfig = remotingConfig;
+
+ log.trace(this + " created with configuration " + remotingConfig);
+ }
+
+ // Public ---------------------------------------------------------------------------------------
+
+ public void start() throws Throwable
+ {
+ if (log.isTraceEnabled()) { log.trace(this + " started remoting connection"); }
+
+ connector = REGISTRY.getConnector(remotingConfig);
+ session = connector.connect();
+
+ if (log.isDebugEnabled())
+ log.debug("Using " + connector + " to connect to " + remotingConfig);
+
+ log.trace(this + " started");
+ }
+
+ public void stop()
+ {
+ log.trace(this + " stop");
+
+ try
+ {
+ if (connector != null)
+ {
+ if (listener != null)
+ connector.removeFailureListener(listener);
+ NIOConnector connectorFromRegistry = REGISTRY.removeConnector(remotingConfig);
+ if (connectorFromRegistry != null)
+ connectorFromRegistry.disconnect();
+ }
+ }
+ catch (Throwable ignore)
+ {
+ log.trace(this + " failed to disconnect the new client", ignore);
+ }
+
+ connector = null;
+
+ log.trace(this + " closed");
+ }
+
+ public String getSessionID()
+ {
+ if (session == null || !session.isConnected())
+ {
+ return null;
+ }
+ return session.getID();
+ }
+
+ /**
+ * send the packet and block until a response is received (<code>oneWay</code> is set to <code>false</code>)
+ */
+ public AbstractPacket send(String id, AbstractPacket packet) throws MessagingException
+ {
+ return send(id, packet, false);
+ }
+
+ public AbstractPacket send(String id, AbstractPacket packet, boolean oneWay) throws MessagingException
+ {
+ assert packet != null;
+
+ packet.setTargetID(id);
+
+ AbstractPacket response;
+
+ try
+ {
+ response = (AbstractPacket) send(packet, oneWay);
+ }
+ catch (Exception e)
+ {
+ log.error("Caught unexpected exception", e);
+
+ throw new MessagingException(MessagingException.INTERNAL_ERROR);
+ }
+
+ if (oneWay == false && response == null)
+ {
+ throw new IllegalStateException("No response received for " + packet);
+ }
+
+ if (response instanceof MessagingExceptionMessage)
+ {
+ MessagingExceptionMessage message = (MessagingExceptionMessage) response;
+
+ throw message.getException();
+ }
+ else
+ {
+ return response;
+ }
+ }
+
+ public synchronized void setFailureListener(FailureListener newListener)
+ {
+ if (listener != null && newListener != null)
+ {
+ throw new IllegalStateException("FailureListener already set to " + listener);
+ }
+
+ if (newListener != null)
+ {
+ connector.addFailureListener(newListener);
+ }
+ else
+ {
+ connector.removeFailureListener(listener);
+ }
+ this.listener = newListener;
+ }
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ // Private --------------------------------------------------------------------------------------
+
+ private AbstractPacket send(AbstractPacket packet, boolean oneWay) throws Exception
+ {
+ assert packet != null;
+ checkConnected();
+ packet.setOneWay(oneWay);
+
+ if (oneWay)
+ {
+ session.write(packet);
+ return null;
+ } else
+ {
+ AbstractPacket response = (AbstractPacket) session.writeAndBlock(packet,
+ remotingConfig.getTimeout(), SECONDS);
+ return response;
+ }
+ }
+
+ private void checkConnected() throws MessagingException
+ {
+ if (session == null)
+ {
+ throw new IllegalStateException("Client " + this
+ + " is not connected.");
+ }
+ if (!session.isConnected())
+ {
+ throw new MessagingException(MessagingException.NOT_CONNECTED);
+ }
+ }
+}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/AbstractPacket.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/AbstractPacket.java 2008-02-22 16:48:43 UTC (rev 3772)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/AbstractPacket.java 2008-02-22 17:56:42 UTC (rev 3773)
@@ -8,7 +8,7 @@
import static org.jboss.messaging.core.remoting.Assert.assertValidID;
-import org.jboss.jms.client.remoting.MessagingRemotingConnection;
+import org.jboss.jms.client.remoting.RemotingConnectionImpl;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
@@ -37,7 +37,7 @@
* <code>oneWay</code> is <code>true</code> when the packet is sent "one way"
* by the client which does not expect any response to it.
*
- * @see MessagingRemotingConnection#sendOneWay(AbstractPacket)
+ * @see RemotingConnection#sendOneWay(AbstractPacket)
*/
private boolean oneWay = false;
More information about the jboss-cvs-commits
mailing list