Author: jmesnil
Date: 2010-01-27 07:23:11 -0500 (Wed, 27 Jan 2010)
New Revision: 8847
Modified:
trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* fixed disconnection of STOMP connections
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-01-26 16:14:02 UTC (rev 8846)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-01-27 12:23:11 UTC (rev 8847)
@@ -13,8 +13,10 @@
package org.hornetq.core.protocol.stomp;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
@@ -48,6 +50,10 @@
private boolean valid;
+ private boolean destroyed = false;
+
+ private final List<FailureListener> failureListeners = new CopyOnWriteArrayList<FailureListener>();
+
StompConnection(final Connection transportConnection, final StompProtocolManager manager)
{
this.transportConnection = transportConnection;
@@ -61,6 +67,12 @@
public void addFailureListener(FailureListener listener)
{
+ if (listener == null)
+ {
+ throw new IllegalStateException("FailureListener cannot be null");
+ }
+
+ failureListeners.add(listener);
}
public boolean checkDataReceived()
@@ -75,8 +87,16 @@
public void destroy()
{
- manager.cleanup(this);
+ if (destroyed)
+ {
+ return;
+ }
+
+ destroyed = true;
+
transportConnection.close();
+
+ callFailureListeners(new HornetQException(HornetQException.INTERNAL_ERROR, "Stomp connection destroyed"));
}
public void disconnect()
@@ -93,6 +113,8 @@
public List<FailureListener> getFailureListeners()
{
+ // We deliberately do not return the registered listeners: if we did,
+ // the remoting service would NOT destroy the connection.
return Collections.emptyList();
}
@@ -118,7 +140,7 @@
public boolean isDestroyed()
{
- return false;
+ return destroyed;
}
public boolean removeCloseListener(CloseListener listener)
@@ -128,11 +150,19 @@
public boolean removeFailureListener(FailureListener listener)
{
- return false;
+ if (listener == null)
+ {
+ throw new IllegalStateException("FailureListener cannot be null");
+ }
+
+ return failureListeners.remove(listener);
}
public void setFailureListeners(List<FailureListener> listeners)
{
+ failureListeners.clear();
+
+ failureListeners.addAll(listeners);
}
@@ -175,4 +205,25 @@
{
this.valid = valid;
}
+
+ private void callFailureListeners(final HornetQException me)
+ {
+ final List<FailureListener> listenersClone = new ArrayList<FailureListener>(failureListeners);
+
+ for (final FailureListener listener : listenersClone)
+ {
+ try
+ {
+ listener.connectionFailed(me);
+ }
+ catch (final Throwable t)
+ {
+ // Failure of one listener to execute shouldn't prevent others
+ // from
+ // executing
+ log.error("Failed to execute failure listener", t);
+ }
+ }
+ }
+
}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-01-26 16:14:02 UTC (rev 8846)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-01-27 12:23:11 UTC (rev 8847)
@@ -366,7 +366,7 @@
StompSession stompSession = sessions.get(connection);
if (stompSession == null)
{
- stompSession = new StompSession(marshaller, connection);
+ stompSession = new StompSession(connection, this);
String name = UUIDGenerator.getInstance().generateStringUUID();
ServerSession session = server.createSession(name,
connection.getLogin(),
@@ -389,7 +389,7 @@
StompSession stompSession = transactedSessions.get(txID);
if (stompSession == null)
{
- stompSession = new StompSession(marshaller, connection);
+ stompSession = new StompSession(connection, this);
String name = UUIDGenerator.getInstance().generateStringUUID();
ServerSession session = server.createSession(name,
connection.getLogin(),
@@ -508,35 +508,27 @@
return new StompFrame(Stomp.Responses.CONNECTED, h, StompMarshaller.NO_DATA);
}
- private void send(RemotingConnection connection, StompFrame frame)
+ public int send(RemotingConnection connection, StompFrame frame)
{
System.out.println(">>> " + frame);
-
- try
+ synchronized (connection)
{
- byte[] bytes = marshaller.marshal(frame);
- HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
- connection.getTransportConnection().write(buffer, true);
- }
- catch (IOException e)
- {
- log.error("Unable to send frame " + frame, e);
- }
- }
-
- synchronized void cleanup(StompConnection conn)
- {
- StompSession session = sessions.remove(conn);
- if (session != null)
- {
+ if (connection.isDestroyed())
+ {
+ log.warn("Connection closed " + connection);
+ return 0;
+ }
try
{
- session.getSession().rollback(true);
- session.getSession().close();
+ byte[] bytes = marshaller.marshal(frame);
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+ connection.getTransportConnection().write(buffer, true);
+ return bytes.length;
}
- catch (Exception e)
+ catch (IOException e)
{
- log.error(e);
+ log.error("Unable to send frame " + frame, e);
+ return 0;
}
}
}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-01-26 16:14:02 UTC (rev 8846)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-01-27 12:23:11 UTC (rev 8847)
@@ -18,7 +18,6 @@
import java.util.Map.Entry;
import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.message.impl.MessageImpl;
@@ -35,9 +34,9 @@
*/
class StompSession implements SessionCallback
{
- private final RemotingConnection connection;
+ private final StompProtocolManager manager;
- private final StompMarshaller marshaller;
+ private final StompConnection connection;
private ServerSession session;
@@ -46,10 +45,10 @@
// key = message ID, value = consumer ID
private final Map<Long, Long> messagesToAck = new HashMap<Long, Long>();
- StompSession(final StompMarshaller marshaller, final RemotingConnection connection)
+ StompSession(final StompConnection connection, final StompProtocolManager manager)
{
- this.marshaller = marshaller;
this.connection = connection;
+ this.manager = manager;
}
void setServerSession(ServerSession session)
@@ -101,10 +100,7 @@
StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
- System.out.println(">>> " + frame);
- byte[] bytes = marshaller.marshal(frame);
- HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
- connection.getTransportConnection().write(buffer, true);
+ int length = manager.send(connection, frame);
if (subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO))
{
@@ -115,7 +111,7 @@
{
messagesToAck.put(serverMessage.getMessageID(), consumerID);
}
- return bytes.length;
+ return length;
}
catch (Exception e)
@@ -151,8 +147,6 @@
public void addSubscription(long consumerID, String subscriptionID, String destination, String selector, String ack) throws Exception
{
String queue = StompUtils.toHornetQAddress(destination);
- synchronized (session)
- {
session.createConsumer(consumerID,
SimpleString.toSimpleString(queue),
SimpleString.toSimpleString(selector),
@@ -163,7 +157,6 @@
// FIXME not very smart: since we can't start the consumer, we start the session
// everytime to start the new consumer (and all previous consumers...)
session.start();
- }
}
public boolean unsubscribe(String id) throws Exception
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java 2010-01-26 16:14:02 UTC (rev 8846)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java 2010-01-27 12:23:11 UTC (rev 8847)
@@ -156,7 +156,6 @@
Map.Entry<String, Object> entry = iter.next();
String name = (String)entry.getKey();
Object value = entry.getValue();
- System.out.println(name + "=" + value);
msg.putObjectProperty(name, value);
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-26 16:14:02 UTC (rev 8846)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-27 12:23:11 UTC (rev 8847)
@@ -60,8 +60,9 @@
import org.hornetq.jms.server.config.impl.QueueConfigurationImpl;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.tests.util.UnitTestCase;
-public class StompTest extends TestCase {
+public class StompTest extends UnitTestCase {
private static final transient Logger log = Logger.getLogger(StompTest.class);
private int port = 61613;
private Socket stompSocket;