[hornetq-commits] JBoss hornetq SVN: r11443 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/remoting/impl/invm and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Sep 29 15:53:05 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-09-29 15:53:05 -0400 (Thu, 29 Sep 2011)
New Revision: 11443

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompSession.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/TransportConstants.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/Acceptor.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java
Log:
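Improvements on STOMP: add a configurable "stomp-consumer-credits" acceptor parameter (default 10K) used to flow-control subscriptions that are not auto-ack, returning each message's credits to its consumer when the message is acknowledged; expose the acceptor configuration through Acceptor.getConfiguration(); ignore ACKs for unknown message IDs; and remove the single-subscriber check on existing durable subscription queues.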

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompConnection.java	2011-09-29 03:26:28 UTC (rev 11442)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompConnection.java	2011-09-29 19:53:05 UTC (rev 11443)
@@ -25,6 +25,7 @@
 import org.hornetq.core.remoting.CloseListener;
 import org.hornetq.core.remoting.FailureListener;
 import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.remoting.Acceptor;
 import org.hornetq.spi.core.remoting.Connection;
 
 /**
@@ -56,6 +57,8 @@
    private final long creationTime;
 
    private StompDecoder decoder = new StompDecoder();
+   
+   private final Acceptor acceptorUsed;
 
    private final List<FailureListener> failureListeners = new CopyOnWriteArrayList<FailureListener>();
 
@@ -70,13 +73,15 @@
       return decoder;
    }
 
-   StompConnection(final Connection transportConnection, final StompProtocolManager manager)
+   StompConnection(final Acceptor acceptorUsed, final Connection transportConnection, final StompProtocolManager manager)
    {
       this.transportConnection = transportConnection;
 
       this.manager = manager;
       
       this.creationTime = System.currentTimeMillis();
+      
+      this.acceptorUsed = acceptorUsed;
    }
 
    public void addFailureListener(final FailureListener listener)
@@ -186,6 +191,11 @@
 
       callClosingListeners();
    }
+   
+   Acceptor getAcceptorUsed()
+   {
+      return acceptorUsed;
+   }
 
    private void internalClose()
    {

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2011-09-29 03:26:28 UTC (rev 11442)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2011-09-29 19:53:05 UTC (rev 11443)
@@ -112,7 +112,7 @@
 
    public ConnectionEntry createConnectionEntry(final Acceptor acceptorUsed, final Connection connection)
    {
-      StompConnection conn = new StompConnection(connection, this);
+      StompConnection conn = new StompConnection(acceptorUsed, connection, this);
 
       // Note that STOMP has no heartbeat, so if connection ttl is non zero, data must continue to be sent or connection
       // will be timed out and closed!

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompSession.java	2011-09-29 03:26:28 UTC (rev 11442)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompSession.java	2011-09-29 19:53:05 UTC (rev 11443)
@@ -20,17 +20,21 @@
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.Message;
+import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.impl.MessageImpl;
 import org.hornetq.core.persistence.OperationContext;
 import org.hornetq.core.protocol.stomp.Stomp.Headers;
+import org.hornetq.core.remoting.impl.netty.TransportConstants;
 import org.hornetq.core.server.QueueQueryResult;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.ServerSession;
 import org.hornetq.spi.core.protocol.RemotingConnection;
 import org.hornetq.spi.core.protocol.SessionCallback;
 import org.hornetq.spi.core.remoting.ReadyListener;
+import org.hornetq.utils.ConfigurationHelper;
 import org.hornetq.utils.DataConstants;
 import org.hornetq.utils.UUIDGenerator;
 
@@ -54,15 +58,20 @@
    private final Map<Long, StompSubscription> subscriptions = new ConcurrentHashMap<Long, StompSubscription>();
 
    // key = message ID, value = consumer ID
-   private final Map<Long, Long> messagesToAck = new ConcurrentHashMap<Long, Long>();
+   private final Map<Long, Pair<Long, Integer>> messagesToAck = new ConcurrentHashMap<Long, Pair<Long, Integer>>();
 
    private volatile boolean noLocal = false;
 
+   private final int consumerCredits;
+
    StompSession(final StompConnection connection, final StompProtocolManager manager, OperationContext sessionContext)
    {
       this.connection = connection;
       this.manager = manager;
       this.sessionContext = sessionContext;
+      this.consumerCredits = ConfigurationHelper.getIntProperty(TransportConstants.STOMP_CONSUMERS_CREDIT,
+                                                               TransportConstants.STOMP_DEFAULT_CONSUMERS_CREDIT,
+                                                               connection.getAcceptorUsed().getConfiguration());
    }
 
    void setServerSession(ServerSession session)
@@ -119,19 +128,20 @@
          StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
          StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
 
-         if (subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO))
+         int length = frame.getEncodedSize();
+
+         if (subscription.isAutoACK())
          {
             session.acknowledge(consumerID, serverMessage.getMessageID());
             session.commit();
          }
          else
          {
-            messagesToAck.put(serverMessage.getMessageID(), consumerID);
+            messagesToAck.put(serverMessage.getMessageID(), new Pair<Long, Integer>(consumerID, length));
          }
 
          // Must send AFTER adding to messagesToAck - or could get acked from client BEFORE it's been added!
          manager.send(connection, frame);
-         int length = frame.getEncodedSize();
 
          return length;
 
@@ -157,10 +167,10 @@
    public void closed()
    {
    }
-   
+
    public void addReadyListener(final ReadyListener listener)
    {
-      connection.getTransportConnection().addReadyListener(listener);      
+      connection.getTransportConnection().addReadyListener(listener);
    }
 
    public void removeReadyListener(final ReadyListener listener)
@@ -171,9 +181,21 @@
    public void acknowledge(String messageID) throws Exception
    {
       long id = Long.parseLong(messageID);
-      long consumerID = messagesToAck.remove(id);
-      session.acknowledge(consumerID, id);
-      session.commit();
+      Pair<Long, Integer> pair = messagesToAck.remove(id);
+
+      if (pair != null)
+      {
+         long consumerID = pair.a;
+         int credits = pair.b;
+   
+         if (this.consumerCredits != -1)
+         {
+            session.receiveConsumerCredits(consumerID, credits);
+         }
+         
+         session.acknowledge(consumerID, id);
+         session.commit();
+      }
    }
 
    public void addSubscription(long consumerID,
@@ -200,14 +222,6 @@
             {
                session.createQueue(SimpleString.toSimpleString(destination), queue, null, false, true);
             }
-            else
-            {
-               // Already exists
-               if (query.getConsumerCount() > 0)
-               {
-                  throw new IllegalStateException("Cannot create a subscriber on the durable subscription since it already has a subscriber: " + queue);
-               }
-            }
          }
          else
          {
@@ -216,11 +230,19 @@
          }
       }
       session.createConsumer(consumerID, queue, SimpleString.toSimpleString(selector), false);
-      session.receiveConsumerCredits(consumerID, -1);
-      StompSubscription subscription = new StompSubscription(subscriptionID, ack);
+
+      StompSubscription subscription = new StompSubscription(subscriptionID, ack.equals(Stomp.Headers.Subscribe.AckModeValues.AUTO));
       subscriptions.put(consumerID, subscription);
-      // FIXME not very smart: since we can't start the consumer, we start the session
-      // every time to start the new consumer (and all previous consumers...)
+      
+      if (subscription.isAutoACK())
+      {
+         session.receiveConsumerCredits(consumerID, -1);
+      }
+      else
+      {
+         session.receiveConsumerCredits(consumerID, consumerCredits);
+      }
+
       session.start();
    }
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java	2011-09-29 03:26:28 UTC (rev 11442)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java	2011-09-29 19:53:05 UTC (rev 11443)
@@ -27,24 +27,24 @@
    // Attributes ----------------------------------------------------
 
    private final String subID;
+   
+   private final boolean autoACK;
 
-   private final String ack;
-
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public StompSubscription(String subID, String ack)
+   public StompSubscription(String subID, boolean ack)
    {
       this.subID = subID;
-      this.ack = ack;
+      this.autoACK = ack;
    }
 
    // Public --------------------------------------------------------
 
-   public String getAck()
+   public boolean isAutoACK()
    {
-      return ack;
+      return autoACK;
    }
 
    public String getID()
@@ -55,7 +55,7 @@
    @Override
    public String toString()
    {
-      return "StompSubscription[id=" + subID + ", ack=" + ack + "]";
+      return "StompSubscription[id=" + subID + ", autoACK=" + autoACK + "]";
    }
    
    // Package protected ---------------------------------------------

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java	2011-09-29 03:26:28 UTC (rev 11442)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java	2011-09-29 19:53:05 UTC (rev 11443)
@@ -61,6 +61,8 @@
    private boolean paused;
 
    private NotificationService notificationService;
+   
+   private final Map<String, Object> configuration;
 
    public InVMAcceptor(final ClusterConnection clusterConnection,
                        final Map<String, Object> configuration,
@@ -70,6 +72,8 @@
    {
       this.clusterConnection = clusterConnection;
       
+      this.configuration = configuration;
+      
       this.handler = handler;
       
       this.listener = listener;
@@ -78,6 +82,11 @@
 
       executorFactory = new OrderedExecutorFactory(threadPool);
    }
+   
+   public Map<String, Object> getConfiguration()
+   {
+      return configuration;
+   }
 
    public ClusterConnection getClusterConnection()
    {

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java	2011-09-29 03:26:28 UTC (rev 11442)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java	2011-09-29 19:53:05 UTC (rev 11443)
@@ -143,6 +143,8 @@
    private HttpAcceptorHandler httpHandler = null;
 
    private final ConcurrentMap<Object, NettyConnection> connections = new ConcurrentHashMap<Object, NettyConnection>();
+   
+   private final Map<String, Object> configuration;
 
    private final Executor threadPool;
 
@@ -185,6 +187,8 @@
       
       this.clusterConnection = clusterConnection;
       
+      this.configuration = configuration;
+      
       this.handler = handler;
 
       this.decoder = decoder;
@@ -505,6 +509,11 @@
          serverChannelGroup.add(serverChannel);
       }
    }
+   
+   public Map<String, Object> getConfiguration()
+   {
+      return this.configuration;
+   }
 
    public synchronized void stop()
    {

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/TransportConstants.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/TransportConstants.java	2011-09-29 03:26:28 UTC (rev 11442)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/TransportConstants.java	2011-09-29 19:53:05 UTC (rev 11443)
@@ -76,6 +76,10 @@
    
    public static final String CLUSTER_CONNECTION = "cluster-connection";
    
+   public static final String STOMP_CONSUMERS_CREDIT = "stomp-consumer-credits";
+   
+   public static final int STOMP_DEFAULT_CONSUMERS_CREDIT = 10 * 1024; // 10K
+   
    public static final boolean DEFAULT_SSL_ENABLED = false;
 
    public static final boolean DEFAULT_USE_NIO_SERVER = false;
@@ -154,6 +158,7 @@
       allowableAcceptorKeys.add(TransportConstants.BATCH_DELAY);
       allowableAcceptorKeys.add(TransportConstants.DIRECT_DELIVER);
       allowableAcceptorKeys.add(TransportConstants.CLUSTER_CONNECTION);
+      allowableAcceptorKeys.add(TransportConstants.STOMP_CONSUMERS_CREDIT);
 
       ALLOWABLE_ACCEPTOR_KEYS = Collections.unmodifiableSet(allowableAcceptorKeys);
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/Acceptor.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/Acceptor.java	2011-09-29 03:26:28 UTC (rev 11442)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/Acceptor.java	2011-09-29 19:53:05 UTC (rev 11443)
@@ -13,6 +13,8 @@
 
 package org.hornetq.spi.core.remoting;
 
+import java.util.Map;
+
 import org.hornetq.core.server.HornetQComponent;
 import org.hornetq.core.server.cluster.ClusterConnection;
 import org.hornetq.core.server.management.NotificationService;
@@ -35,6 +37,8 @@
     * @return the cluster connection associated with this Acceptor
     */
    ClusterConnection getClusterConnection();
+   
+   Map<String, Object> getConfiguration();
 
    /**
     * Set the notification service for this acceptor to use.

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java	2011-09-29 03:26:28 UTC (rev 11442)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java	2011-09-29 19:53:05 UTC (rev 11443)
@@ -117,6 +117,7 @@
       Map<String, Object> params = new HashMap<String, Object>();
       params.put(TransportConstants.PROTOCOL_PROP_NAME, ProtocolType.STOMP.toString());
       params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
+      params.put(TransportConstants.STOMP_CONSUMERS_CREDIT, "-1");
       TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
       config.getAcceptorConfigurations().add(stompTransport);
       config.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
@@ -168,7 +169,7 @@
 
    protected Socket createSocket() throws IOException
    {
-      return new Socket("127.0.0.1", port);
+      return new Socket("localhost", port);
    }
 
    protected String getQueueName()



More information about the hornetq-commits mailing list