[hornetq-commits] JBoss hornetq SVN: r8847 - in trunk: tests/src/org/hornetq/tests/integration/stomp and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Jan 27 07:23:12 EST 2010


Author: jmesnil
Date: 2010-01-27 07:23:11 -0500 (Wed, 27 Jan 2010)
New Revision: 8847

Modified:
   trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
   trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0

* fixed disconnection of stomp connections

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java	2010-01-26 16:14:02 UTC (rev 8846)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java	2010-01-27 12:23:11 UTC (rev 8847)
@@ -13,8 +13,10 @@
 
 package org.hornetq.core.protocol.stomp;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQBuffers;
@@ -48,6 +50,10 @@
 
    private boolean valid;
 
+   private boolean destroyed = false;
+
+   private final List<FailureListener> failureListeners = new CopyOnWriteArrayList<FailureListener>();
+
    StompConnection(final Connection transportConnection, final StompProtocolManager manager)
    {
       this.transportConnection = transportConnection;
@@ -61,6 +67,12 @@
 
    public void addFailureListener(FailureListener listener)
    {
+      if (listener == null)
+      {
+         throw new IllegalStateException("FailureListener cannot be null");
+      }
+
+      failureListeners.add(listener);
    }
 
    public boolean checkDataReceived()
@@ -75,8 +87,16 @@
 
    public void destroy()
    {
-      manager.cleanup(this);
+      if (destroyed)
+      {
+         return;
+      }
+
+      destroyed = true;
+
       transportConnection.close();
+      
+      callFailureListeners(new HornetQException(HornetQException.INTERNAL_ERROR, "Stomp connection destroyed"));
    }
 
    public void disconnect()
@@ -93,6 +113,8 @@
 
    public List<FailureListener> getFailureListeners()
    {
+      // We deliberately do not return the listeners; otherwise the
+      // remoting service would NOT destroy the connection.
       return Collections.emptyList();
    }
 
@@ -118,7 +140,7 @@
 
    public boolean isDestroyed()
    {
-      return false;
+      return destroyed;
    }
 
    public boolean removeCloseListener(CloseListener listener)
@@ -128,11 +150,19 @@
 
    public boolean removeFailureListener(FailureListener listener)
    {
-      return false;
+      if (listener == null)
+      {
+         throw new IllegalStateException("FailureListener cannot be null");
+      }
+
+      return failureListeners.remove(listener);
    }
 
    public void setFailureListeners(List<FailureListener> listeners)
    {
+      failureListeners.clear();
+
+      failureListeners.addAll(listeners);
    }
 
    
@@ -175,4 +205,25 @@
    {
       this.valid = valid;
    }
+   
+   private void callFailureListeners(final HornetQException me)
+   {
+      final List<FailureListener> listenersClone = new ArrayList<FailureListener>(failureListeners);
+
+      for (final FailureListener listener : listenersClone)
+      {
+         try
+         {
+            listener.connectionFailed(me);
+         }
+         catch (final Throwable t)
+         {
+            // A failure in one listener must not prevent the remaining
+            // listeners from executing, so the error is logged and the
+            // loop continues.
+            log.error("Failed to execute failure listener", t);
+         }
+      }
+   }
+
 }

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-01-26 16:14:02 UTC (rev 8846)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-01-27 12:23:11 UTC (rev 8847)
@@ -366,7 +366,7 @@
       StompSession stompSession = sessions.get(connection);
       if (stompSession == null)
       {
-         stompSession = new StompSession(marshaller, connection);
+         stompSession = new StompSession(connection, this);
          String name = UUIDGenerator.getInstance().generateStringUUID();
          ServerSession session = server.createSession(name,
                                                       connection.getLogin(),
@@ -389,7 +389,7 @@
       StompSession stompSession = transactedSessions.get(txID);
       if (stompSession == null)
       {
-         stompSession = new StompSession(marshaller, connection);
+         stompSession = new StompSession(connection, this);
          String name = UUIDGenerator.getInstance().generateStringUUID();
          ServerSession session = server.createSession(name,
                                                       connection.getLogin(),
@@ -508,35 +508,27 @@
       return new StompFrame(Stomp.Responses.CONNECTED, h, StompMarshaller.NO_DATA);
    }
 
-   private void send(RemotingConnection connection, StompFrame frame)
+   public int send(RemotingConnection connection, StompFrame frame)
    {
       System.out.println(">>> " + frame);
-
-      try
+      synchronized (connection)
       {
-         byte[] bytes = marshaller.marshal(frame);
-         HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
-         connection.getTransportConnection().write(buffer, true);
-      }
-      catch (IOException e)
-      {
-         log.error("Unable to send frame " + frame, e);
-      }
-   }
-
-   synchronized void cleanup(StompConnection conn)
-   {
-      StompSession session = sessions.remove(conn);
-      if (session != null)
-      {
+         if (connection.isDestroyed())
+         {
+            log.warn("Connection closed " + connection);
+            return 0;
+         }
          try
          {
-            session.getSession().rollback(true);
-            session.getSession().close();
+            byte[] bytes = marshaller.marshal(frame);
+            HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+            connection.getTransportConnection().write(buffer, true);
+            return bytes.length;
          }
-         catch (Exception e)
+         catch (IOException e)
          {
-            log.error(e);
+            log.error("Unable to send frame " + frame, e);
+            return 0;
          }
       }
    }

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java	2010-01-26 16:14:02 UTC (rev 8846)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java	2010-01-27 12:23:11 UTC (rev 8847)
@@ -18,7 +18,6 @@
 import java.util.Map.Entry;
 
 import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.HornetQBuffers;
 import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.message.impl.MessageImpl;
@@ -35,9 +34,9 @@
  */
 class StompSession implements SessionCallback
 {
-   private final RemotingConnection connection;
+   private final StompProtocolManager manager;
 
-   private final StompMarshaller marshaller;
+   private final StompConnection connection;
 
    private ServerSession session;
 
@@ -46,10 +45,10 @@
    // key = message ID, value = consumer ID
    private final Map<Long, Long> messagesToAck = new HashMap<Long, Long>();
 
-   StompSession(final StompMarshaller marshaller, final RemotingConnection connection)
+   StompSession(final StompConnection connection, final StompProtocolManager manager)
    {
-      this.marshaller = marshaller;
       this.connection = connection;
+      this.manager = manager;
    }
 
    void setServerSession(ServerSession session)
@@ -101,10 +100,7 @@
          StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
          StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
          
-         System.out.println(">>> " + frame);
-         byte[] bytes = marshaller.marshal(frame);
-         HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
-         connection.getTransportConnection().write(buffer, true);
+         int length = manager.send(connection, frame);
 
          if (subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO))
          {
@@ -115,7 +111,7 @@
          {
             messagesToAck.put(serverMessage.getMessageID(), consumerID);
          }
-         return bytes.length;
+         return length;
 
       }
       catch (Exception e)
@@ -151,8 +147,6 @@
    public void addSubscription(long consumerID, String subscriptionID, String destination, String selector, String ack) throws Exception
    {
       String queue = StompUtils.toHornetQAddress(destination);
-      synchronized (session)
-      {
          session.createConsumer(consumerID,
                                 SimpleString.toSimpleString(queue),
                                 SimpleString.toSimpleString(selector),
@@ -163,7 +157,6 @@
          // FIXME not very smart: since we can't start the consumer, we start the session
          // everytime to start the new consumer (and all previous consumers...)
          session.start();
-      }
    }
 
    public boolean unsubscribe(String id) throws Exception

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java	2010-01-26 16:14:02 UTC (rev 8846)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java	2010-01-27 12:23:11 UTC (rev 8847)
@@ -156,7 +156,6 @@
          Map.Entry<String, Object> entry = iter.next();
          String name = (String)entry.getKey();
          Object value = entry.getValue();
-         System.out.println(name + "=" + value);   
          msg.putObjectProperty(name, value);
       }
    }

Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2010-01-26 16:14:02 UTC (rev 8846)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2010-01-27 12:23:11 UTC (rev 8847)
@@ -60,8 +60,9 @@
 import org.hornetq.jms.server.config.impl.QueueConfigurationImpl;
 import org.hornetq.jms.server.impl.JMSServerManagerImpl;
 import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.tests.util.UnitTestCase;
 
-public class StompTest extends TestCase {
+public class StompTest extends UnitTestCase {
     private static final transient Logger log = Logger.getLogger(StompTest.class);
     private int port = 61613;
     private Socket stompSocket;



More information about the hornetq-commits mailing list