Author: clebert.suconic(a)jboss.com
Date: 2011-09-29 15:53:05 -0400 (Thu, 29 Sep 2011)
New Revision: 11443
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompSession.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/TransportConstants.java
branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/Acceptor.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java
Log:
improvements on stomp
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-29 03:26:28 UTC (rev 11442)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-29 19:53:05 UTC (rev 11443)
@@ -25,6 +25,7 @@
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.Connection;
/**
@@ -56,6 +57,8 @@
private final long creationTime;
private StompDecoder decoder = new StompDecoder();
+
+ private final Acceptor acceptorUsed;
private final List<FailureListener> failureListeners = new CopyOnWriteArrayList<FailureListener>();
@@ -70,13 +73,15 @@
return decoder;
}
- StompConnection(final Connection transportConnection, final StompProtocolManager manager)
+ StompConnection(final Acceptor acceptorUsed, final Connection transportConnection, final StompProtocolManager manager)
{
this.transportConnection = transportConnection;
this.manager = manager;
this.creationTime = System.currentTimeMillis();
+
+ this.acceptorUsed = acceptorUsed;
}
public void addFailureListener(final FailureListener listener)
@@ -186,6 +191,11 @@
callClosingListeners();
}
+
+ Acceptor getAcceptorUsed()
+ {
+ return acceptorUsed;
+ }
private void internalClose()
{
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-09-29 03:26:28 UTC (rev 11442)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-09-29 19:53:05 UTC (rev 11443)
@@ -112,7 +112,7 @@
public ConnectionEntry createConnectionEntry(final Acceptor acceptorUsed, final Connection connection)
{
- StompConnection conn = new StompConnection(connection, this);
+ StompConnection conn = new StompConnection(acceptorUsed, connection, this);
// Note that STOMP has no heartbeat, so if connection ttl is non zero, data must continue to be sent or connection
// will be timed out and closed!
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2011-09-29 03:26:28 UTC (rev 11442)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2011-09-29 19:53:05 UTC (rev 11443)
@@ -20,17 +20,21 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.Message;
+import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.protocol.stomp.Stomp.Headers;
+import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.server.QueueQueryResult;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.ServerSession;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.spi.core.protocol.SessionCallback;
import org.hornetq.spi.core.remoting.ReadyListener;
+import org.hornetq.utils.ConfigurationHelper;
import org.hornetq.utils.DataConstants;
import org.hornetq.utils.UUIDGenerator;
@@ -54,15 +58,20 @@
private final Map<Long, StompSubscription> subscriptions = new ConcurrentHashMap<Long, StompSubscription>();
// key = message ID, value = consumer ID
- private final Map<Long, Long> messagesToAck = new ConcurrentHashMap<Long, Long>();
+ private final Map<Long, Pair<Long, Integer>> messagesToAck = new ConcurrentHashMap<Long, Pair<Long, Integer>>();
private volatile boolean noLocal = false;
+ private final int consumerCredits;
+
StompSession(final StompConnection connection, final StompProtocolManager manager, OperationContext sessionContext)
{
this.connection = connection;
this.manager = manager;
this.sessionContext = sessionContext;
+ this.consumerCredits = ConfigurationHelper.getIntProperty(TransportConstants.STOMP_CONSUMERS_CREDIT,
+ TransportConstants.STOMP_DEFAULT_CONSUMERS_CREDIT,
+ connection.getAcceptorUsed().getConfiguration());
}
void setServerSession(ServerSession session)
@@ -119,19 +128,20 @@
StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
- if (subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO))
+ int length = frame.getEncodedSize();
+
+ if (subscription.isAutoACK())
{
session.acknowledge(consumerID, serverMessage.getMessageID());
session.commit();
}
else
{
- messagesToAck.put(serverMessage.getMessageID(), consumerID);
+ messagesToAck.put(serverMessage.getMessageID(), new Pair<Long, Integer>(consumerID, length));
}
// Must send AFTER adding to messagesToAck - or could get acked from client BEFORE it's been added!
manager.send(connection, frame);
- int length = frame.getEncodedSize();
return length;
@@ -157,10 +167,10 @@
public void closed()
{
}
-
+
public void addReadyListener(final ReadyListener listener)
{
- connection.getTransportConnection().addReadyListener(listener);
+ connection.getTransportConnection().addReadyListener(listener);
}
public void removeReadyListener(final ReadyListener listener)
@@ -171,9 +181,21 @@
public void acknowledge(String messageID) throws Exception
{
long id = Long.parseLong(messageID);
- long consumerID = messagesToAck.remove(id);
- session.acknowledge(consumerID, id);
- session.commit();
+ Pair<Long, Integer> pair = messagesToAck.remove(id);
+
+ if (pair != null)
+ {
+ long consumerID = pair.a;
+ int credits = pair.b;
+
+ if (this.consumerCredits != -1)
+ {
+ session.receiveConsumerCredits(consumerID, credits);
+ }
+
+ session.acknowledge(consumerID, id);
+ session.commit();
+ }
}
public void addSubscription(long consumerID,
@@ -200,14 +222,6 @@
{
session.createQueue(SimpleString.toSimpleString(destination), queue, null, false, true);
}
- else
- {
- // Already exists
- if (query.getConsumerCount() > 0)
- {
- throw new IllegalStateException("Cannot create a subscriber on the durable subscription since it already has a subscriber: " + queue);
- }
- }
}
else
{
@@ -216,11 +230,19 @@
}
}
session.createConsumer(consumerID, queue, SimpleString.toSimpleString(selector), false);
- session.receiveConsumerCredits(consumerID, -1);
- StompSubscription subscription = new StompSubscription(subscriptionID, ack);
+
+ StompSubscription subscription = new StompSubscription(subscriptionID, ack.equals(Stomp.Headers.Subscribe.AckModeValues.AUTO));
subscriptions.put(consumerID, subscription);
- // FIXME not very smart: since we can't start the consumer, we start the session
- // every time to start the new consumer (and all previous consumers...)
+
+ if (subscription.isAutoACK())
+ {
+ session.receiveConsumerCredits(consumerID, -1);
+ }
+ else
+ {
+ session.receiveConsumerCredits(consumerID, consumerCredits);
+ }
+
session.start();
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java 2011-09-29 03:26:28 UTC (rev 11442)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java 2011-09-29 19:53:05 UTC (rev 11443)
@@ -27,24 +27,24 @@
// Attributes ----------------------------------------------------
private final String subID;
+
+ private final boolean autoACK;
- private final String ack;
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public StompSubscription(String subID, String ack)
+ public StompSubscription(String subID, boolean ack)
{
this.subID = subID;
- this.ack = ack;
+ this.autoACK = ack;
}
// Public --------------------------------------------------------
- public String getAck()
+ public boolean isAutoACK()
{
- return ack;
+ return autoACK;
}
public String getID()
@@ -55,7 +55,7 @@
@Override
public String toString()
{
- return "StompSubscription[id=" + subID + ", ack=" + ack + "]";
+ return "StompSubscription[id=" + subID + ", autoACK=" + autoACK + "]";
}
// Package protected ---------------------------------------------
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2011-09-29 03:26:28 UTC (rev 11442)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2011-09-29 19:53:05 UTC (rev 11443)
@@ -61,6 +61,8 @@
private boolean paused;
private NotificationService notificationService;
+
+ private final Map<String, Object> configuration;
public InVMAcceptor(final ClusterConnection clusterConnection,
final Map<String, Object> configuration,
@@ -70,6 +72,8 @@
{
this.clusterConnection = clusterConnection;
+ this.configuration = configuration;
+
this.handler = handler;
this.listener = listener;
@@ -78,6 +82,11 @@
executorFactory = new OrderedExecutorFactory(threadPool);
}
+
+ public Map<String, Object> getConfiguration()
+ {
+ return configuration;
+ }
public ClusterConnection getClusterConnection()
{
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2011-09-29 03:26:28 UTC (rev 11442)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2011-09-29 19:53:05 UTC (rev 11443)
@@ -143,6 +143,8 @@
private HttpAcceptorHandler httpHandler = null;
private final ConcurrentMap<Object, NettyConnection> connections = new ConcurrentHashMap<Object, NettyConnection>();
+
+ private final Map<String, Object> configuration;
private final Executor threadPool;
@@ -185,6 +187,8 @@
this.clusterConnection = clusterConnection;
+ this.configuration = configuration;
+
this.handler = handler;
this.decoder = decoder;
@@ -505,6 +509,11 @@
serverChannelGroup.add(serverChannel);
}
}
+
+ public Map<String, Object> getConfiguration()
+ {
+ return this.configuration;
+ }
public synchronized void stop()
{
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/TransportConstants.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/TransportConstants.java 2011-09-29 03:26:28 UTC (rev 11442)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/TransportConstants.java 2011-09-29 19:53:05 UTC (rev 11443)
@@ -76,6 +76,10 @@
public static final String CLUSTER_CONNECTION = "cluster-connection";
+ public static final String STOMP_CONSUMERS_CREDIT = "stomp-consumer-credits";
+
+ public static final int STOMP_DEFAULT_CONSUMERS_CREDIT = 10 * 1024; // 10K
+
public static final boolean DEFAULT_SSL_ENABLED = false;
public static final boolean DEFAULT_USE_NIO_SERVER = false;
@@ -154,6 +158,7 @@
allowableAcceptorKeys.add(TransportConstants.BATCH_DELAY);
allowableAcceptorKeys.add(TransportConstants.DIRECT_DELIVER);
allowableAcceptorKeys.add(TransportConstants.CLUSTER_CONNECTION);
+ allowableAcceptorKeys.add(TransportConstants.STOMP_CONSUMERS_CREDIT);
ALLOWABLE_ACCEPTOR_KEYS = Collections.unmodifiableSet(allowableAcceptorKeys);
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/Acceptor.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/Acceptor.java 2011-09-29 03:26:28 UTC (rev 11442)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/Acceptor.java 2011-09-29 19:53:05 UTC (rev 11443)
@@ -13,6 +13,8 @@
package org.hornetq.spi.core.remoting;
+import java.util.Map;
+
import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.management.NotificationService;
@@ -35,6 +37,8 @@
* @return the cluster connection associated with this Acceptor
*/
ClusterConnection getClusterConnection();
+
+ Map<String, Object> getConfiguration();
/**
* Set the notification service for this acceptor to use.
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java 2011-09-29 03:26:28 UTC (rev 11442)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java 2011-09-29 19:53:05 UTC (rev 11443)
@@ -117,6 +117,7 @@
Map<String, Object> params = new HashMap<String, Object>();
params.put(TransportConstants.PROTOCOL_PROP_NAME, ProtocolType.STOMP.toString());
params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
+ params.put(TransportConstants.STOMP_CONSUMERS_CREDIT, "-1");
TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
config.getAcceptorConfigurations().add(stompTransport);
config.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
@@ -168,7 +169,7 @@
protected Socket createSocket() throws IOException
{
- return new Socket("127.0.0.1", port);
+ return new Socket("localhost", port);
}
protected String getQueueName()