[hornetq-commits] JBoss hornetq SVN: r8998 - in trunk: src/config/jboss-6/clustered and 32 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Mon Mar 29 09:35:31 EDT 2010
Author: timfox
Date: 2010-03-29 09:35:27 -0400 (Mon, 29 Mar 2010)
New Revision: 8998
Added:
trunk/tests/src/org/hornetq/tests/integration/remoting/BatchDelayTest.java
Removed:
trunk/src/main/org/hornetq/core/protocol/aardvark/impl/
Modified:
trunk/src/config/common/schema/hornetq-configuration.xsd
trunk/src/config/jboss-6/clustered/hornetq-configuration.xml
trunk/src/config/jboss-6/non-clustered/hornetq-configuration.xml
trunk/src/config/jboss-as/clustered/hornetq-configuration.xml
trunk/src/config/jboss-as/non-clustered/hornetq-configuration.xml
trunk/src/config/stand-alone/clustered/hornetq-configuration.xml
trunk/src/config/stand-alone/non-clustered/hornetq-configuration.xml
trunk/src/config/trunk/clustered/hornetq-configuration.xml
trunk/src/config/trunk/non-clustered/hornetq-configuration.xml
trunk/src/main/org/hornetq/api/core/client/ClientSessionFactory.java
trunk/src/main/org/hornetq/api/core/client/HornetQClient.java
trunk/src/main/org/hornetq/api/core/management/HornetQServerControl.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
trunk/src/main/org/hornetq/core/config/Configuration.java
trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/hornetq/core/protocol/core/CoreRemotingConnection.java
trunk/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java
trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java
trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
trunk/src/main/org/hornetq/integration/transports/netty/TransportConstants.java
trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
trunk/src/main/org/hornetq/jms/client/HornetQMessageProducer.java
trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
trunk/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java
trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
trunk/src/main/org/hornetq/spi/core/remoting/Connection.java
trunk/tests/config/ConfigurationTest-full-config.xml
trunk/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java
trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
trunk/tests/src/org/hornetq/tests/integration/client/SimpleSendMultipleQueues.java
trunk/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnector.java
trunk/tests/src/org/hornetq/tests/integration/remoting/SynchronousCloseTest.java
trunk/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
Log:
batching optimisation
Modified: trunk/src/config/common/schema/hornetq-configuration.xsd
===================================================================
--- trunk/src/config/common/schema/hornetq-configuration.xsd 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/config/common/schema/hornetq-configuration.xsd 2010-03-29 13:35:27 UTC (rev 8998)
@@ -291,7 +291,7 @@
<xsd:sequence>
<xsd:element maxOccurs="1" minOccurs="1" name="queue-name" type="xsd:IDREF">
</xsd:element>
- <xsd:element maxOccurs="1" minOccurs="1" name="forwarding-address" type="xsd:string">
+ <xsd:element maxOccurs="1" minOccurs="0" name="forwarding-address" type="xsd:string">
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="filter">
<xsd:complexType>
Modified: trunk/src/config/jboss-6/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-6/clustered/hornetq-configuration.xml 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/config/jboss-6/clustered/hornetq-configuration.xml 2010-03-29 13:35:27 UTC (rev 8998)
@@ -22,6 +22,13 @@
<param key="host" value="${jboss.bind.address:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.port:5445}"/>
</connector>
+
+ <connector name="netty-throughput">
+ <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+ <param key="host" value="${jboss.bind.address:localhost}"/>
+ <param key="port" value="${hornetq.remoting.netty.port:5455}"/>
+ <param key="batch-delay" value="50"/>
+ </connector>
<connector name="in-vm">
<factory-class>org.hornetq.core.remoting.impl.invm.InVMConnectorFactory</factory-class>
@@ -35,6 +42,13 @@
<param key="host" value="${jboss.bind.address:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.port:5445}"/>
</acceptor>
+
+ <acceptor name="netty-throughput">
+ <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+ <param key="host" value="${jboss.bind.address:localhost}"/>
+ <param key="port" value="${hornetq.remoting.netty.port:5455}"/>
+ <param key="batch-delay" value="50"/>
+ </acceptor>
<acceptor name="in-vm">
<factory-class>org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory</factory-class>
Modified: trunk/src/config/jboss-6/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-6/non-clustered/hornetq-configuration.xml 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/config/jboss-6/non-clustered/hornetq-configuration.xml 2010-03-29 13:35:27 UTC (rev 8998)
@@ -20,6 +20,13 @@
<param key="host" value="${jboss.bind.address:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.port:5445}"/>
</connector>
+
+ <connector name="netty-throughput">
+ <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+ <param key="host" value="${jboss.bind.address:localhost}"/>
+ <param key="port" value="${hornetq.remoting.netty.port:5455}"/>
+ <param key="batch-delay" value="50"/>
+ </connector>
<connector name="in-vm">
<factory-class>org.hornetq.core.remoting.impl.invm.InVMConnectorFactory</factory-class>
@@ -33,6 +40,13 @@
<param key="host" value="${jboss.bind.address:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.port:5445}"/>
</acceptor>
+
+ <acceptor name="netty-throughput">
+ <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+ <param key="host" value="${jboss.bind.address:localhost}"/>
+ <param key="port" value="${hornetq.remoting.netty.port:5455}"/>
+ <param key="batch-delay" value="50"/>
+ </acceptor>
<acceptor name="in-vm">
<factory-class>org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory</factory-class>
Modified: trunk/src/config/jboss-as/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-as/clustered/hornetq-configuration.xml 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/config/jboss-as/clustered/hornetq-configuration.xml 2010-03-29 13:35:27 UTC (rev 8998)
@@ -22,6 +22,13 @@
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.port:5445}"/>
</connector>
+
+ <connector name="netty-throughput">
+ <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+ <param key="host" value="${jboss.bind.address:localhost}"/>
+ <param key="port" value="${hornetq.remoting.netty.port:5455}"/>
+ <param key="batch-delay" value="50"/>
+ </connector>
<connector name="in-vm">
<factory-class>org.hornetq.core.remoting.impl.invm.InVMConnectorFactory</factory-class>
@@ -35,6 +42,13 @@
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.port:5445}"/>
</acceptor>
+
+ <!-- Default port 5455 matches the netty-throughput connector; 5445 is already taken by the standard netty acceptor -->
+ <acceptor name="netty-throughput">
+ <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+ <param key="host" value="${jboss.bind.address:localhost}"/>
+ <param key="port" value="${hornetq.remoting.netty.port:5455}"/>
+ <param key="batch-delay" value="50"/>
+ </acceptor>
<acceptor name="in-vm">
<factory-class>org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory</factory-class>
Modified: trunk/src/config/jboss-as/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-as/non-clustered/hornetq-configuration.xml 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/config/jboss-as/non-clustered/hornetq-configuration.xml 2010-03-29 13:35:27 UTC (rev 8998)
@@ -20,6 +20,13 @@
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.port:5445}"/>
</connector>
+
+ <connector name="netty-throughput">
+ <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+ <param key="host" value="${jboss.bind.address:localhost}"/>
+ <param key="port" value="${hornetq.remoting.netty.port:5455}"/>
+ <param key="batch-delay" value="50"/>
+ </connector>
<connector name="in-vm">
<factory-class>org.hornetq.core.remoting.impl.invm.InVMConnectorFactory</factory-class>
@@ -33,6 +40,13 @@
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.port:5445}"/>
</acceptor>
+
+ <!-- Default port 5455 matches the netty-throughput connector; 5445 is already taken by the standard netty acceptor -->
+ <acceptor name="netty-throughput">
+ <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+ <param key="host" value="${jboss.bind.address:localhost}"/>
+ <param key="port" value="${hornetq.remoting.netty.port:5455}"/>
+ <param key="batch-delay" value="50"/>
+ </acceptor>
<acceptor name="in-vm">
<factory-class>org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory</factory-class>
Modified: trunk/src/config/stand-alone/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/stand-alone/clustered/hornetq-configuration.xml 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/config/stand-alone/clustered/hornetq-configuration.xml 2010-03-29 13:35:27 UTC (rev 8998)
@@ -20,6 +20,13 @@
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.port:5445}"/>
</connector>
+
+ <connector name="netty-throughput">
+ <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+ <param key="host" value="${jboss.bind.address:localhost}"/>
+ <param key="port" value="${hornetq.remoting.netty.port:5455}"/>
+ <param key="batch-delay" value="50"/>
+ </connector>
</connectors>
<acceptors>
@@ -28,6 +35,13 @@
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.port:5445}"/>
</acceptor>
+
+ <acceptor name="netty-throughput">
+ <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+ <param key="host" value="${jboss.bind.address:localhost}"/>
+ <param key="port" value="${hornetq.remoting.netty.port:5455}"/>
+ <param key="batch-delay" value="50"/>
+ </acceptor>
</acceptors>
<broadcast-groups>
Modified: trunk/src/config/stand-alone/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/stand-alone/non-clustered/hornetq-configuration.xml 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/config/stand-alone/non-clustered/hornetq-configuration.xml 2010-03-29 13:35:27 UTC (rev 8998)
@@ -18,6 +18,13 @@
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.port:5445}"/>
</connector>
+
+ <connector name="netty-throughput">
+ <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+ <param key="host" value="${jboss.bind.address:localhost}"/>
+ <param key="port" value="${hornetq.remoting.netty.port:5455}"/>
+ <param key="batch-delay" value="50"/>
+ </connector>
</connectors>
<acceptors>
@@ -26,6 +33,13 @@
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.port:5445}"/>
</acceptor>
+
+ <acceptor name="netty-throughput">
+ <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+ <param key="host" value="${jboss.bind.address:localhost}"/>
+ <param key="port" value="${hornetq.remoting.netty.port:5455}"/>
+ <param key="batch-delay" value="50"/>
+ </acceptor>
</acceptors>
<security-settings>
Modified: trunk/src/config/trunk/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/trunk/clustered/hornetq-configuration.xml 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/config/trunk/clustered/hornetq-configuration.xml 2010-03-29 13:35:27 UTC (rev 8998)
@@ -12,6 +12,13 @@
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.port:5445}"/>
</connector>
+
+ <connector name="netty-throughput">
+ <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+ <param key="host" value="${jboss.bind.address:localhost}"/>
+ <param key="port" value="${hornetq.remoting.netty.port:5455}"/>
+ <param key="batch-delay" value="50"/>
+ </connector>
</connectors>
<acceptors>
@@ -20,6 +27,13 @@
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.port:5445}"/>
</acceptor>
+
+ <acceptor name="netty-throughput">
+ <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+ <param key="host" value="${jboss.bind.address:localhost}"/>
+ <param key="port" value="${hornetq.remoting.netty.port:5455}"/>
+ <param key="batch-delay" value="50"/>
+ </acceptor>
</acceptors>
<broadcast-groups>
Modified: trunk/src/config/trunk/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/trunk/non-clustered/hornetq-configuration.xml 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/config/trunk/non-clustered/hornetq-configuration.xml 2010-03-29 13:35:27 UTC (rev 8998)
@@ -10,6 +10,13 @@
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.port:5445}"/>
</connector>
+
+ <connector name="netty-throughput">
+ <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+ <param key="host" value="${jboss.bind.address:localhost}"/>
+ <param key="port" value="${hornetq.remoting.netty.port:5455}"/>
+ <param key="batch-delay" value="50"/>
+ </connector>
</connectors>
<acceptors>
@@ -18,6 +25,13 @@
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.port:5445}"/>
</acceptor>
+
+ <acceptor name="netty-throughput">
+ <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+ <param key="host" value="${jboss.bind.address:localhost}"/>
+ <param key="port" value="${hornetq.remoting.netty.port:5455}"/>
+ <param key="batch-delay" value="50"/>
+ </acceptor>
</acceptors>
<security-settings>
Modified: trunk/src/main/org/hornetq/api/core/client/ClientSessionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/client/ClientSessionFactory.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/api/core/client/ClientSessionFactory.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -724,7 +724,7 @@
* @param size initial size of messages created through this factory.
*/
void setInitialMessagePacketSize(int size);
-
+
/**
* Adds an interceptor which will be executed <em>after packets are received from the server</em>.
*
Modified: trunk/src/main/org/hornetq/api/core/client/HornetQClient.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/client/HornetQClient.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/api/core/client/HornetQClient.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -89,7 +89,7 @@
public static final boolean DEFAULT_CACHE_LARGE_MESSAGE_CLIENT = false;
public static final int DEFAULT_INITIAL_MESSAGE_PACKET_SIZE = 1500;
-
+
/**
* Creates a ClientSessionFactory using all the defaults.
*
Modified: trunk/src/main/org/hornetq/api/core/management/HornetQServerControl.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/management/HornetQServerControl.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/api/core/management/HornetQServerControl.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -13,18 +13,15 @@
package org.hornetq.api.core.management;
+import java.util.Set;
+
import javax.management.MBeanOperationInfo;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Interceptor;
import org.hornetq.core.security.Role;
-import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
/**
* A HornetQServerControl is used to manage HornetQ servers.
*/
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -274,7 +274,7 @@
}
else
{
- channel.send(packet);
+ channel.sendBatched(packet);
}
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -153,7 +153,7 @@
private static ScheduledExecutorService globalScheduledThreadPool;
private String groupID;
-
+
private static synchronized ExecutorService getGlobalThreadPool()
{
if (ClientSessionFactoryImpl.globalThreadPool == null)
@@ -244,7 +244,7 @@
retryInterval,
retryIntervalMultiplier,
maxRetryInterval,
- reconnectAttempts,
+ reconnectAttempts,
threadPool,
scheduledThreadPool,
interceptors);
@@ -778,7 +778,7 @@
checkWrite();
initialMessagePacketSize = size;
}
-
+
public ClientSession createSession(final String username,
final String password,
final boolean xa,
@@ -984,7 +984,7 @@
retryInterval,
retryIntervalMultiplier,
maxRetryInterval,
- reconnectAttempts,
+ reconnectAttempts,
threadPool,
scheduledThreadPool,
interceptors);
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -154,7 +154,7 @@
private final int minLargeMessageSize;
- private final int initialMessagePacketSize;
+ private volatile int initialMessagePacketSize;
private final boolean cacheLargeMessageClient;
@@ -989,7 +989,7 @@
HornetQBuffer buffer = packet.encode(channel.getConnection());
- conn.write(buffer, false);
+ conn.write(buffer, false, false);
}
resetCreditManager = true;
@@ -1040,6 +1040,16 @@
}
}
}
+
+
+
+ public void setPacketSize(final int packetSize) // raises the initial packet-size hint; never lowers it
+ { // NOTE(review): the compare and the write below are separate steps on a volatile field — confirm callers serialize access
+ if (packetSize > this.initialMessagePacketSize) // only a packet larger than the current hint changes anything
+ {
+ this.initialMessagePacketSize = (int)(packetSize * 1.2); // new hint = observed size plus 20% headroom, truncated to int
+ }
+ }
private void sendPacketWithoutLock(final Packet packet)
{
@@ -1049,7 +1059,7 @@
HornetQBuffer buffer = packet.encode(channel.getConnection());
- conn.write(buffer, false);
+ conn.write(buffer, false, false);
}
public void workDone()
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -81,4 +81,6 @@
ClientProducerCreditManager getProducerCreditManager();
void setAddress(Message message, SimpleString address);
+
+ void setPacketSize(int packetSize);
}
Modified: trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -545,4 +545,9 @@
{
session.setAddress(message, address);
}
+
+ public void setPacketSize(int packetSize) // pure delegation: hands the value unchanged to the session field
+ {
+ session.setPacketSize(packetSize);
+ }
}
Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -75,7 +75,7 @@
private static final long serialVersionUID = 2512460695662741413L;
private static final Logger log = Logger.getLogger(FailoverManagerImpl.class);
-
+
// debug
private static Map<TransportConfiguration, Set<CoreRemotingConnection>> debugConns;
@@ -199,7 +199,7 @@
private volatile boolean exitLoop;
private final List<Interceptor> interceptors;
-
+
// Static
// ---------------------------------------------------------------------------------------
@@ -383,7 +383,7 @@
{
theConnection.destroy();
}
-
+
if (e.getCode() == HornetQException.UNBLOCKED)
{
// This means the thread was blocked on create session and failover unblocked it
@@ -440,6 +440,7 @@
}
catch (Throwable t)
{
+ t.printStackTrace();
if (lock != null)
{
lock.unlock();
@@ -552,7 +553,7 @@
private void failoverOrReconnect(final Object connectionID, final HornetQException me)
{
Set<ClientSessionInternal> sessionsToClose = null;
-
+
synchronized (failoverLock)
{
if (connection == null || connection.getID() != connectionID)
@@ -561,7 +562,7 @@
// over then a async connection exception or disconnect
// came in for one of the already exitLoop connections, so we return true - we don't want to call the
// listeners again
-
+
return;
}
@@ -609,7 +610,7 @@
{
attemptReconnect = reconnectAttempts != 0;
}
-
+
if (attemptFailover || attemptReconnect)
{
lockChannel1();
@@ -664,7 +665,7 @@
{
}
- cancelPinger();
+ cancelScheduledTasks();
connector = null;
@@ -853,7 +854,7 @@
}
}
- private void cancelPinger()
+ private void cancelScheduledTasks()
{
if (pingerFuture != null)
{
@@ -871,7 +872,7 @@
{
if (connection != null && sessions.size() == 0)
{
- cancelPinger();
+ cancelScheduledTasks();
try
{
@@ -905,7 +906,7 @@
Connection tc = null;
try
- {
+ {
DelegatingBufferHandler handler = new DelegatingBufferHandler();
connector = connectorFactory.createConnector(transportParams,
@@ -918,7 +919,7 @@
if (connector != null)
{
connector.start();
-
+
tc = connector.createConnection();
if (tc == null)
@@ -986,7 +987,7 @@
{
pingRunnable = new PingRunnable();
- pingerFuture = scheduledThreadPool.scheduleWithFixedDelay(new ActualScheduled(pingRunnable),
+ pingerFuture = scheduledThreadPool.scheduleWithFixedDelay(new ActualScheduledPinger(pingRunnable),
0,
clientFailureCheckPeriod,
TimeUnit.MILLISECONDS);
@@ -1113,11 +1114,11 @@
}
}
- private static final class ActualScheduled implements Runnable
+ private static final class ActualScheduledPinger implements Runnable
{
private final WeakReference<PingRunnable> pingRunnable;
- ActualScheduled(final PingRunnable runnable)
+ ActualScheduledPinger(final PingRunnable runnable)
{
pingRunnable = new WeakReference<PingRunnable>(runnable);
}
@@ -1134,6 +1135,47 @@
}
+ private static final class ActualScheduledBatchFlusher implements Runnable // Runnable wrapper that forwards run() to a weakly-held BatchFlushRunnable
+ { // NOTE(review): no call that schedules this class is visible in this diff — confirm where it is submitted
+ private final WeakReference<BatchFlushRunnable> batchFlushRunnable; // weak, so this wrapper alone does not keep the runnable reachable
+
+ ActualScheduledBatchFlusher(final BatchFlushRunnable runnable)
+ {
+ batchFlushRunnable = new WeakReference<BatchFlushRunnable>(runnable);
+ }
+
+ public void run()
+ {
+ BatchFlushRunnable runnable = batchFlushRunnable.get(); // yields null once the referent has been cleared
+
+ if (runnable != null) // referent gone: this invocation does nothing
+ {
+ runnable.run();
+ }
+ }
+
+ }
+
+ private final class BatchFlushRunnable implements Runnable // cancellable task that calls checkFlushBatchBuffer() on the transport connection
+ {
+ private boolean cancelled; // only touched inside the two synchronized methods of this class
+
+ public synchronized void run()
+ {
+ if (cancelled) // once cancel() has been called, run() becomes a no-op
+ {
+ return;
+ }
+ // NOTE(review): connection is dereferenced without a null check — confirm it cannot be null when this runs
+ connection.getTransportConnection().checkFlushBatchBuffer();
+ }
+
+ public synchronized void cancel()
+ {
+ cancelled = true;
+ }
+ }
+
private final class PingRunnable implements Runnable
{
private boolean cancelled;
@@ -1186,7 +1228,7 @@
Channel channel0 = connection.getChannel(0, -1);
channel0.send(ping);
-
+
connection.flush();
}
Modified: trunk/src/main/org/hornetq/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/Configuration.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/config/Configuration.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -183,7 +183,7 @@
* Sets whether this server is manageable using JMX or not.
*/
void setJMXManagementEnabled(boolean enabled);
-
+
/**
* Returns the domain used by JMX MBeans (provided JMX management is enabled).
* <br>
Modified: trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -154,7 +154,7 @@
public static final int DEFAULT_ID_CACHE_SIZE = 2000;
public static final boolean DEFAULT_PERSIST_ID_CACHE = true;
-
+
public static final boolean DEFAULT_CLUSTER_DUPLICATE_DETECTION = true;
public static final boolean DEFAULT_CLUSTER_FORWARD_WHEN_NO_CONSUMERS = false;
@@ -220,7 +220,7 @@
protected String logDelegateFactoryClassName = ConfigurationImpl.DEFAULT_LOG_DELEGATE_FACTORY_CLASS_NAME;
protected List<String> interceptorClassNames = new ArrayList<String>();
-
+
protected Map<String, TransportConfiguration> connectorConfigs = new HashMap<String, TransportConfiguration>();
protected Set<TransportConfiguration> acceptorConfigs = new HashSet<TransportConfiguration>();
@@ -486,7 +486,7 @@
{
this.backupConnectorName = backupConnectorName;
}
-
+
public GroupingHandlerConfiguration getGroupingHandlerConfiguration()
{
return groupingHandlerConfiguration;
@@ -1280,6 +1280,7 @@
{
return false;
}
+
return true;
}
Modified: trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -16,7 +16,6 @@
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.URL;
-import java.util.Properties;
import org.hornetq.core.deployers.impl.FileConfigurationParser;
import org.hornetq.core.logging.Logger;
Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -183,7 +183,7 @@
public int getHeadersAndPropertiesEncodeSize()
{
return DataConstants.SIZE_LONG + // Message ID
- /* address */SimpleString.sizeofString(address) +
+ /* address */SimpleString.sizeofNullableString(address) +
DataConstants./* Type */SIZE_BYTE +
DataConstants./* Durable */SIZE_BOOLEAN +
DataConstants./* Expiration */SIZE_LONG +
@@ -191,11 +191,12 @@
DataConstants./* Priority */SIZE_BYTE +
/* PropertySize and Properties */properties.getEncodeSize();
}
+
public void encodeHeadersAndProperties(final HornetQBuffer buffer)
{
buffer.writeLong(messageID);
- buffer.writeSimpleString(address);
+ buffer.writeNullableSimpleString(address);
buffer.writeByte(type);
buffer.writeBoolean(durable);
buffer.writeLong(expiration);
@@ -207,7 +208,7 @@
public void decodeHeadersAndProperties(final HornetQBuffer buffer)
{
messageID = buffer.readLong();
- address = buffer.readSimpleString();
+ address = buffer.readNullableSimpleString();
type = buffer.readByte();
durable = buffer.readBoolean();
expiration = buffer.readLong();
@@ -215,7 +216,7 @@
priority = buffer.readByte();
properties.decode(buffer);
}
-
+
public HornetQBuffer getBodyBuffer()
{
if (bodyBuffer == null)
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -689,6 +689,7 @@
return;
}
}
+
runnable.run();
}
Modified: trunk/src/main/org/hornetq/core/protocol/core/CoreRemotingConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/CoreRemotingConnection.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/protocol/core/CoreRemotingConnection.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -88,8 +88,7 @@
Object getTransferLock();
/**
- *
- * @return the maximum batch size used when batching writes
+ * Called periodically to flush any data in the batch buffer
*/
- int getMaxBatchSize();
+ void checkFlushBatchBuffer();
}
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -74,7 +74,7 @@
private CommandConfirmationHandler commandConfirmationHandler;
private volatile boolean transferring;
-
+
public ChannelImpl(final CoreRemotingConnection connection, final long id, final int confWindowSize)
{
this.connection = connection;
@@ -161,7 +161,7 @@
packet.setChannelID(id);
HornetQBuffer buffer = packet.encode(connection);
-
+
lock.lock();
try
@@ -189,7 +189,7 @@
resendCache.add(packet);
}
- connection.getTransportConnection().write(buffer, flush);
+ connection.getTransportConnection().write(buffer, flush, batch);
}
finally
{
@@ -198,9 +198,6 @@
}
}
- public void checkFlushBatchBuffer()
- {
- }
public Packet sendBlocking(final Packet packet) throws HornetQException
{
@@ -245,7 +242,7 @@
resendCache.add(packet);
}
- connection.getTransportConnection().write(buffer);
+ connection.getTransportConnection().write(buffer, false, false);
long toWait = connection.getBlockingCallTimeout();
@@ -385,8 +382,6 @@
// Needs to be synchronized since can be called by remoting service timer thread too for timeout flush
public synchronized void flushConfirmations()
{
- checkFlushBatchBuffer();
-
if (resendCache != null && receivedBytes != 0)
{
receivedBytes = 0;
@@ -480,7 +475,7 @@
{
final HornetQBuffer buffer = packet.encode(connection);
- connection.getTransportConnection().write(buffer);
+ connection.getTransportConnection().write(buffer, false, false);
}
private void clearUpTo(final int lastReceivedCommandID)
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -57,12 +57,11 @@
{
final Configuration config = server.getConfiguration();
- CoreRemotingConnection rc = new RemotingConnectionImpl(connection,
- interceptors,
- config.isAsyncConnectionExecutionEnabled() ? server.getExecutorFactory()
- .getExecutor()
- : null,
- connection.getBatchingBufferSize());
+ final CoreRemotingConnection rc = new RemotingConnectionImpl(connection,
+ interceptors,
+ config.isAsyncConnectionExecutionEnabled() ? server.getExecutorFactory()
+ .getExecutor()
+ : null);
Channel channel1 = rc.getChannel(1, -1);
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -23,7 +23,6 @@
import java.util.concurrent.Executor;
import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Interceptor;
import org.hornetq.core.logging.Logger;
@@ -34,7 +33,6 @@
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
-import org.hornetq.utils.DataConstants;
import org.hornetq.utils.SimpleIDGenerator;
/**
@@ -89,8 +87,6 @@
private final Executor executor;
- private final int maxBatchSize;
-
private volatile boolean executing;
// Constructors
@@ -103,7 +99,7 @@
final long blockingCallTimeout,
final List<Interceptor> interceptors)
{
- this(transportConnection, blockingCallTimeout, interceptors, true, null, -1);
+ this(transportConnection, blockingCallTimeout, interceptors, true, null);
}
/*
@@ -111,19 +107,17 @@
*/
public RemotingConnectionImpl(final Connection transportConnection,
final List<Interceptor> interceptors,
- final Executor executor,
- final int maxBatchSize)
+ final Executor executor)
{
- this(transportConnection, -1, interceptors, false, executor, maxBatchSize);
+ this(transportConnection, -1, interceptors, false, executor);
}
private RemotingConnectionImpl(final Connection transportConnection,
final long blockingCallTimeout,
final List<Interceptor> interceptors,
final boolean client,
- final Executor executor,
- final int maxBatchSize)
+ final Executor executor)
{
this.transportConnection = transportConnection;
@@ -135,8 +129,6 @@
this.client = client;
this.executor = executor;
-
- this.maxBatchSize = maxBatchSize;
}
// RemotingConnection implementation
@@ -372,9 +364,9 @@
}
}
- public int getMaxBatchSize()
+ public void checkFlushBatchBuffer()
{
- return maxBatchSize;
+ transportConnection.checkFlushBatchBuffer();
}
// Buffer Handler implementation
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -108,7 +108,7 @@
this.frameDecoder = new StompFrameDecoder();
this.executor = server.getExecutorFactory().getExecutor();
}
-
+
// ProtocolManager implementation --------------------------------
public ConnectionEntry createConnectionEntry(final Connection connection)
@@ -261,7 +261,7 @@
try
{
HornetQBuffer buffer = frame.toHornetQBuffer();
- connection.getTransportConnection().write(buffer, false);
+ connection.getTransportConnection().write(buffer, false, false);
}
catch (Exception e)
{
Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -45,6 +45,8 @@
private final int serverID;
private final Executor executor;
+
+ private volatile boolean closing;
public InVMConnection(final int serverID,
final BufferHandler handler,
@@ -73,8 +75,6 @@
listener.connectionCreated(this, ProtocolType.CORE);
}
- private volatile boolean closing;
-
public void close()
{
if (closing)
@@ -104,13 +104,17 @@
{
return id;
}
+
+ public void checkFlushBatchBuffer()
+ {
+ }
public void write(final HornetQBuffer buffer)
{
- write(buffer, false);
+ write(buffer, false, false);
}
-
- public void write(final HornetQBuffer buffer, final boolean flush)
+
+ public void write(final HornetQBuffer buffer, final boolean flush, final boolean batch)
{
final HornetQBuffer copied = buffer.copy(0, buffer.capacity());
Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -31,7 +31,6 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.protocol.aardvark.impl.AardvarkProtocolManagerFactory;
import org.hornetq.core.protocol.core.impl.CoreProtocolManagerFactory;
import org.hornetq.core.protocol.stomp.StompProtocolManagerFactory;
import org.hornetq.core.remoting.server.RemotingService;
@@ -89,17 +88,16 @@
private final ScheduledExecutorService scheduledThreadPool;
private FailureCheckAndFlushThread failureCheckAndFlushThread;
-
- private Map<ProtocolType, ProtocolManager> protocolMap =
- new ConcurrentHashMap<ProtocolType, ProtocolManager>();
-
+
+ private Map<ProtocolType, ProtocolManager> protocolMap = new ConcurrentHashMap<ProtocolType, ProtocolManager>();
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
public RemotingServiceImpl(final Configuration config,
final HornetQServer server,
- final ManagementService managementService,
+ final ManagementService managementService,
final ScheduledExecutorService scheduledThreadPool)
{
transportConfigs = config.getAcceptorConfigurations();
@@ -121,13 +119,15 @@
}
this.config = config;
+
this.managementService = managementService;
-
+
this.scheduledThreadPool = scheduledThreadPool;
-
- this.protocolMap.put(ProtocolType.CORE, new CoreProtocolManagerFactory().createProtocolManager(server, interceptors));
- this.protocolMap.put(ProtocolType.STOMP, new StompProtocolManagerFactory().createProtocolManager(server, interceptors));
- this.protocolMap.put(ProtocolType.AARDVARK, new AardvarkProtocolManagerFactory().createProtocolManager(server, interceptors));
+
+ this.protocolMap.put(ProtocolType.CORE, new CoreProtocolManagerFactory().createProtocolManager(server,
+ interceptors));
+ this.protocolMap.put(ProtocolType.STOMP, new StompProtocolManagerFactory().createProtocolManager(server,
+ interceptors));
}
// RemotingService implementation -------------------------------
@@ -138,17 +138,18 @@
{
return;
}
-
- //The remoting service maintains it's own thread pool for handling remoting traffic
- //If OIO each connection will have it's own thread
- //If NIO these are capped at nio-remoting-threads which defaults to num cores * 3
- //This needs to be a different thread pool to the main thread pool especially for OIO where we may need
- //to support many hundreds of connections, but the main thread pool must be kept small for better performance
-
- ThreadFactory tFactory = new HornetQThreadFactory("HornetQ-remoting-threads" + System.identityHashCode(this), false);
+ // The remoting service maintains it's own thread pool for handling remoting traffic
+ // If OIO each connection will have it's own thread
+ // If NIO these are capped at nio-remoting-threads which defaults to num cores * 3
+ // This needs to be a different thread pool to the main thread pool especially for OIO where we may need
+ // to support many hundreds of connections, but the main thread pool must be kept small for better performance
+
+ ThreadFactory tFactory = new HornetQThreadFactory("HornetQ-remoting-threads" + System.identityHashCode(this),
+ false);
+
threadPool = Executors.newCachedThreadPool(tFactory);
-
+
ClassLoader loader = Thread.currentThread().getContextClassLoader();
for (TransportConfiguration info : transportConfigs)
@@ -174,13 +175,15 @@
continue;
}
}
-
- String protocolString = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOL_PROP_NAME, TransportConstants.DEFAULT_PROTOCOL, info.getParams());
+ String protocolString = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOL_PROP_NAME,
+ TransportConstants.DEFAULT_PROTOCOL,
+ info.getParams());
+
ProtocolType protocol = ProtocolType.valueOf(protocolString.toUpperCase());
-
+
ProtocolManager manager = protocolMap.get(protocol);
-
+
Acceptor acceptor = factory.createAcceptor(info.getParams(),
new DelegatingBufferHandler(manager),
manager,
@@ -193,7 +196,7 @@
if (managementService != null)
{
acceptor.setNotificationService(managementService);
-
+
managementService.registerAcceptor(acceptor, info);
}
}
@@ -208,7 +211,7 @@
a.start();
}
- //This thread checks connections that need to be closed, and also flushes confirmations
+ // This thread checks connections that need to be closed, and also flushes confirmations
failureCheckAndFlushThread = new FailureCheckAndFlushThread(RemotingServiceImpl.CONNECTION_TTL_CHECK_INTERVAL);
failureCheckAndFlushThread.start();
@@ -275,11 +278,11 @@
{
managementService.unregisterAcceptors();
}
-
+
threadPool.shutdown();
-
+
boolean ok = threadPool.awaitTermination(10000, TimeUnit.MILLISECONDS);
-
+
if (!ok)
{
log.warn("Timed out waiting for remoting thread pool to terminate");
@@ -331,23 +334,23 @@
{
return protocolMap.get(protocol);
}
-
+
public void connectionCreated(final Connection connection, final ProtocolType protocol)
{
if (server == null)
{
throw new IllegalStateException("Unable to create connection, server hasn't finished starting up");
}
-
+
ProtocolManager pmgr = this.getProtocolManager(protocol);
-
+
if (pmgr == null)
{
throw new IllegalArgumentException("Unknown protocol " + protocol);
}
ConnectionEntry entry = pmgr.createConnectionEntry(connection);
-
+
connections.put(connection.getID(), entry);
if (config.isBackup())
@@ -410,12 +413,12 @@
private final class DelegatingBufferHandler implements BufferHandler
{
private ProtocolManager manager;
-
+
DelegatingBufferHandler(final ProtocolManager manager)
{
this.manager = manager;
}
-
+
public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
{
ConnectionEntry conn = connections.get(connectionID);
@@ -470,9 +473,9 @@
for (ConnectionEntry entry : connections.values())
{
RemotingConnection conn = entry.connection;
-
+
boolean flush = true;
-
+
if (entry.ttl != -1)
{
if (now >= entry.lastCheck + entry.ttl)
@@ -480,7 +483,7 @@
if (!conn.checkDataReceived())
{
idsToRemove.add(conn.getID());
-
+
flush = false;
}
else
@@ -489,9 +492,9 @@
}
}
}
-
+
if (flush)
- {
+ {
conn.flush();
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -787,7 +787,7 @@
0,
1.0d,
0,
- 1,
+ 1,
threadPool,
scheduledPool,
null);
Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -24,6 +24,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
@@ -132,19 +133,25 @@
private final int nioRemotingThreads;
- private final int batchingBufferSize;
-
private final HttpKeepAliveRunnable httpKeepAliveRunnable;
private final ConcurrentMap<Object, Connection> connections = new ConcurrentHashMap<Object, Connection>();
private final Executor threadPool;
+
+ private final ScheduledExecutorService scheduledThreadPool;
private NotificationService notificationService;
private VirtualExecutorService bossExecutor;
private boolean paused;
+
+ private BatchFlusher flusher;
+
+ private ScheduledFuture<?> batchFlusherFuture;
+
+ private final long batchDelay;
public NettyAcceptor(final Map<String, Object> configuration,
final BufferHandler handler,
@@ -196,10 +203,6 @@
-1,
configuration);
- batchingBufferSize = ConfigurationHelper.getIntProperty(TransportConstants.BATCHING_BUFFER_SIZE_PROPNAME,
- TransportConstants.DEFAULT_BATCHING_BUFFER_SIZE,
- configuration);
-
useInvm = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_INVM_PROP_NAME,
TransportConstants.DEFAULT_USE_INVM,
configuration);
@@ -248,6 +251,12 @@
configuration);
this.threadPool = threadPool;
+
+ this.scheduledThreadPool = scheduledThreadPool;
+
+ batchDelay = ConfigurationHelper.getLongProperty(TransportConstants.BATCH_DELAY,
+ TransportConstants.DEFAULT_BATCH_DELAY,
+ configuration);
}
public synchronized void start() throws Exception
@@ -395,6 +404,13 @@
Notification notification = new Notification(null, NotificationType.ACCEPTOR_STARTED, props);
notificationService.sendNotification(notification);
}
+
+ if (batchDelay > 0)
+ {
+ flusher = new BatchFlusher();
+
+ batchFlusherFuture = scheduledThreadPool.scheduleWithFixedDelay(flusher, batchDelay, batchDelay, TimeUnit.MILLISECONDS);
+ }
NettyAcceptor.log.info("Started Netty Acceptor version " + Version.ID);
}
@@ -424,6 +440,17 @@
{
return;
}
+
+ if (batchFlusherFuture != null)
+ {
+ batchFlusherFuture.cancel(false);
+
+ flusher.cancel();
+
+ flusher = null;
+
+ batchFlusherFuture = null;
+ }
serverChannelGroup.close().awaitUninterruptibly();
@@ -549,7 +576,7 @@
@Override
public void channelConnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception
{
- new NettyConnection(e.getChannel(), new Listener(), httpEnabled ? -1 : batchingBufferSize);
+ new NettyConnection(e.getChannel(), new Listener(), !httpEnabled && batchDelay > 0);
SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
if (sslHandler != null)
@@ -610,4 +637,25 @@
}
}
+
+ private class BatchFlusher implements Runnable
+ {
+ private boolean cancelled;
+
+ public synchronized void run()
+ {
+ if (!cancelled)
+ {
+ for (Connection connection : connections.values())
+ {
+ connection.checkFlushBatchBuffer();
+ }
+ }
+ }
+
+ public synchronized void cancel()
+ {
+ cancelled = true;
+ }
+ }
}
Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -14,6 +14,7 @@
package org.hornetq.integration.transports.netty;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
import org.hornetq.core.logging.Logger;
import org.hornetq.spi.core.protocol.ProtocolType;
@@ -28,7 +29,7 @@
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
* @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * buhnaflagilibrn
+ *
* @version <tt>$Revision$</tt>
*/
public class NettyConnection implements Connection
@@ -37,6 +38,8 @@
private static final Logger log = Logger.getLogger(NettyConnection.class);
+ private static final int BATCHING_BUFFER_SIZE = 8192;
+
// Attributes ----------------------------------------------------
private final Channel channel;
@@ -44,22 +47,25 @@
private boolean closed;
private final ConnectionLifeCycleListener listener;
+
+ private final boolean batchingEnabled;
- private final int batchingBufferSize;
+ private HornetQBuffer batchBuffer;
+
+ private final Object writeLock = new Object();
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public NettyConnection(final Channel channel, final ConnectionLifeCycleListener listener,
- final int batchingBufferSize)
+ public NettyConnection(final Channel channel, final ConnectionLifeCycleListener listener, boolean batchingEnabled)
{
this.channel = channel;
this.listener = listener;
+
+ this.batchingEnabled = batchingEnabled;
- this.batchingBufferSize = batchingBufferSize;
-
listener.connectionCreated(this, ProtocolType.CORE);
}
@@ -114,34 +120,90 @@
return channel.getId();
}
+ // This is called periodically to flush the batch buffer
+ public void checkFlushBatchBuffer()
+ {
+ synchronized (writeLock)
+ {
+ if (!batchingEnabled)
+ {
+ return;
+ }
+
+ if (batchBuffer != null && batchBuffer.readable())
+ {
+ channel.write(batchBuffer.channelBuffer());
+
+ batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
+ }
+ }
+ }
+
public void write(final HornetQBuffer buffer)
{
- write(buffer, false);
+ write(buffer, false, false);
}
- public void write(final HornetQBuffer buffer, final boolean flush)
+ public void write(HornetQBuffer buffer, final boolean flush, final boolean batched)
{
- ChannelFuture future = channel.write(buffer.channelBuffer());
-
- if (flush)
+ synchronized (writeLock)
{
- while (true)
+ if (batchBuffer == null && batchingEnabled && batched && !flush)
{
- try
+ // Lazily create batch buffer
+
+ batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
+ }
+
+ if (batchBuffer != null)
+ {
+ batchBuffer.writeBytes(buffer, 0, buffer.writerIndex());
+
+ if (batchBuffer.writerIndex() >= BATCHING_BUFFER_SIZE || !batched || flush)
{
- boolean ok = future.await(10000);
+ // If the batch buffer is full or it's flush param or not batched then flush the buffer
- if (!ok)
- {
- NettyConnection.log.warn("Timed out waiting for packet to be flushed");
- }
+ buffer = batchBuffer;
+ }
+ else
+ {
+ return;
+ }
- break;
+ if (!batched || flush)
+ {
+ batchBuffer = null;
}
- catch (InterruptedException ignore)
+ else
{
+ // Create a new buffer
+
+ batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
}
}
+
+ ChannelFuture future = channel.write(buffer.channelBuffer());
+
+ if (flush)
+ {
+ while (true)
+ {
+ try
+ {
+ boolean ok = future.await(10000);
+
+ if (!ok)
+ {
+ NettyConnection.log.warn("Timed out waiting for packet to be flushed");
+ }
+
+ break;
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+ }
+ }
}
}
@@ -149,11 +211,6 @@
{
return channel.getRemoteAddress().toString();
}
-
- public int getBatchingBufferSize()
- {
- return batchingBufferSize;
- }
// Public --------------------------------------------------------
Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -25,6 +25,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
@@ -129,6 +130,8 @@
private final int tcpReceiveBufferSize;
+ private final long batchDelay;
+
private final ConcurrentMap<Object, Connection> connections = new ConcurrentHashMap<Object, Connection>();
private final String servletPath;
@@ -140,9 +143,11 @@
private final ScheduledExecutorService scheduledThreadPool;
private final Executor closeExecutor;
+
+ private BatchFlusher flusher;
+
+ private ScheduledFuture<?> batchFlusherFuture;
- private final int batchingBufferSize;
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -240,15 +245,15 @@
TransportConstants.DEFAULT_TCP_RECEIVEBUFFER_SIZE,
configuration);
+ batchDelay = ConfigurationHelper.getLongProperty(TransportConstants.BATCH_DELAY,
+ TransportConstants.DEFAULT_BATCH_DELAY,
+ configuration);
+
this.closeExecutor = closeExecutor;
virtualExecutor = new VirtualExecutorService(threadPool);
this.scheduledThreadPool = scheduledThreadPool;
-
- batchingBufferSize = ConfigurationHelper.getIntProperty(TransportConstants.BATCHING_BUFFER_SIZE_PROPNAME,
- TransportConstants.DEFAULT_BATCHING_BUFFER_SIZE,
- configuration);
}
public synchronized void start()
@@ -363,6 +368,14 @@
return pipeline;
}
});
+
+ if (batchDelay > 0)
+ {
+ flusher = new BatchFlusher();
+
+ batchFlusherFuture = scheduledThreadPool.scheduleWithFixedDelay(flusher, batchDelay, batchDelay, TimeUnit.MILLISECONDS);
+ }
+
if (!Version.ID.equals(VersionLoader.getVersion().getNettyVersion()))
{
NettyConnector.log.warn("Unexpected Netty Version was expecting " + VersionLoader.getVersion()
@@ -372,13 +385,24 @@
}
NettyConnector.log.debug("Started Netty Connector version " + Version.ID);
}
-
+
public synchronized void close()
{
if (channelFactory == null)
{
return;
}
+
+ if (batchFlusherFuture != null)
+ {
+ batchFlusherFuture.cancel(false);
+
+ flusher.cancel();
+
+ flusher = null;
+
+ batchFlusherFuture = null;
+ }
bootstrap = null;
channelGroup.close().awaitUninterruptibly();
@@ -447,7 +471,7 @@
ch.getPipeline().get(HornetQChannelHandler.class).active = true;
}
- NettyConnection conn = new NettyConnection(ch, new Listener(), batchingBufferSize);
+ NettyConnection conn = new NettyConnection(ch, new Listener(), !httpEnabled && batchDelay > 0);
return conn;
}
@@ -673,5 +697,26 @@
});
}
}
+
+ private class BatchFlusher implements Runnable
+ {
+ private boolean cancelled;
+ public synchronized void run()
+ {
+ if (!cancelled)
+ {
+ for (Connection connection : connections.values())
+ {
+ connection.checkFlushBatchBuffer();
+ }
+ }
+ }
+
+ public synchronized void cancel()
+ {
+ cancelled = true;
+ }
+ }
+
}
Modified: trunk/src/main/org/hornetq/integration/transports/netty/TransportConstants.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/TransportConstants.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/integration/transports/netty/TransportConstants.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -70,8 +70,8 @@
public static final String NIO_REMOTING_THREADS_PROPNAME = "nio-remoting-threads";
- public static final String BATCHING_BUFFER_SIZE_PROPNAME = "batching-buffer-size";
-
+ public static final String BATCH_DELAY = "batch-delay";
+
public static final boolean DEFAULT_SSL_ENABLED = false;
public static final boolean DEFAULT_USE_NIO_SERVER = false;
@@ -105,8 +105,6 @@
public static final int DEFAULT_TCP_RECEIVEBUFFER_SIZE = 32768;
- public static final int DEFAULT_BATCHING_BUFFER_SIZE = 8192;
-
public static final boolean DEFAULT_HTTP_ENABLED = false;
public static final long DEFAULT_HTTP_CLIENT_IDLE_TIME = 500;
@@ -120,6 +118,8 @@
public static final boolean DEFAULT_HTTP_REQUIRES_SESSION_ID = false;
public static final String DEFAULT_SERVLET_PATH = "/messaging/HornetQServlet";
+
+ public static final long DEFAULT_BATCH_DELAY = 0;
public static final Set<String> ALLOWABLE_CONNECTOR_KEYS;
@@ -145,7 +145,7 @@
allowableAcceptorKeys.add(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME);
allowableAcceptorKeys.add(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME);
allowableAcceptorKeys.add(TransportConstants.NIO_REMOTING_THREADS_PROPNAME);
- allowableAcceptorKeys.add(TransportConstants.BATCHING_BUFFER_SIZE_PROPNAME);
+ allowableAcceptorKeys.add(TransportConstants.BATCH_DELAY);
ALLOWABLE_ACCEPTOR_KEYS = Collections.unmodifiableSet(allowableAcceptorKeys);
@@ -166,7 +166,7 @@
allowableConnectorKeys.add(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME);
allowableConnectorKeys.add(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME);
allowableConnectorKeys.add(TransportConstants.NIO_REMOTING_THREADS_PROPNAME);
- allowableConnectorKeys.add(TransportConstants.BATCHING_BUFFER_SIZE_PROPNAME);
+ allowableConnectorKeys.add(TransportConstants.BATCH_DELAY);
ALLOWABLE_CONNECTOR_KEYS = Collections.unmodifiableSet(allowableConnectorKeys);
}
Modified: trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -568,7 +568,7 @@
{
return sessionFactory.getGroupID();
}
-
+
public void close()
{
sessionFactory.close();
Modified: trunk/src/main/org/hornetq/jms/client/HornetQMessageProducer.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQMessageProducer.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/jms/client/HornetQMessageProducer.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -409,11 +409,9 @@
UUID uid = UUIDGenerator.getInstance().generateUUID();
- //msg.getCoreMessage().putBytesProperty(HornetQMessage.HORNETQ_MESSAGE_ID, uid.asBytes());
-
msg.getCoreMessage().putStringProperty(HornetQMessage.JMSMESSAGEID_HEADER_NAME, new SimpleString("ID:" + uid.toString()));
- msg.resetMessageID(null);
+ msg.resetMessageID(null);
}
if (foreign)
Modified: trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -110,7 +110,7 @@
private boolean failoverOnServerShutdown = HornetQClient.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN;
private String groupID = null;
-
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -540,9 +540,8 @@
public void setDiscoveryGroupName(String groupName)
{
this.discoveryGroupName = groupName;
-
}
-
+
// Encoding Support Implementation --------------------------------------------------------------
/* (non-Javadoc)
Modified: trunk/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -47,7 +47,7 @@
*
*
*/
-public class JMSServerConfigParserImpl implements JMSServerConfigParser
+public class JMSServerConfigParserImpl implements JMSServerConfigParser
{
private static final Logger log = Logger.getLogger(JMSServerConfigParserImpl.class);
@@ -307,7 +307,7 @@
Validators.GT_ZERO);
String groupid = XMLConfigurationUtil.getString(e, "group-id", null, Validators.NO_CHECK);
List<String> jndiBindings = new ArrayList<String>();
- List<Pair<String, String>> connectorNames = new ArrayList<Pair<String,String>>();
+ List<Pair<String, String>> connectorNames = new ArrayList<Pair<String, String>>();
String discoveryGroupName = null;
NodeList children = node.getChildNodes();
@@ -347,11 +347,9 @@
{
backupConnectorName = backupNode.getNodeValue();
}
-
-
+
connectorNames.add(new Pair<String, String>(connectorName, backupConnectorName));
-
-
+
}
}
}
@@ -368,8 +366,7 @@
if (discoveryGroupName != null)
{
- cfConfig = new ConnectionFactoryConfigurationImpl(name,
- strbindings);
+ cfConfig = new ConnectionFactoryConfigurationImpl(name, strbindings);
cfConfig.setInitialWaitTimeout(discoveryInitialWaitTimeout);
cfConfig.setDiscoveryGroupName(discoveryGroupName);
}
@@ -378,7 +375,7 @@
cfConfig = new ConnectionFactoryConfigurationImpl(name, strbindings);
cfConfig.setConnectorNames(connectorNames);
}
-
+
cfConfig.setInitialWaitTimeout(discoveryInitialWaitTimeout);
cfConfig.setClientID(clientID);
cfConfig.setClientFailureCheckPeriod(clientFailureCheckPeriod);
@@ -411,7 +408,6 @@
return cfConfig;
}
-
/**
* hook for integration layers
* @param topicName
@@ -432,9 +428,9 @@
* @return
*/
protected JMSQueueConfiguration newQueue(final String queueName,
- final String selectorString,
- final boolean durable,
- final String[] jndiArray)
+ final String selectorString,
+ final boolean durable,
+ final String[] jndiArray)
{
return new JMSQueueConfigurationImpl(queueName, selectorString, durable, jndiArray);
}
Modified: trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -129,16 +129,16 @@
public JMSServerManagerImpl(final HornetQServer server) throws Exception
{
this.server = server;
-
+
this.coreConfig = server.getConfiguration();
-
+
configFileName = null;
}
public JMSServerManagerImpl(final HornetQServer server, final String configFileName) throws Exception
{
this.server = server;
-
+
this.coreConfig = server.getConfiguration();
this.configFileName = configFileName;
@@ -147,7 +147,7 @@
public JMSServerManagerImpl(final HornetQServer server, final JMSConfiguration configuration) throws Exception
{
this.server = server;
-
+
this.coreConfig = server.getConfiguration();
configFileName = null;
@@ -187,7 +187,7 @@
if (configFileName != null)
{
- jmsDeployer.setConfigFileNames(new String[]{configFileName});
+ jmsDeployer.setConfigFileNames(new String[] { configFileName });
}
jmsDeployer.start();
@@ -224,16 +224,15 @@
deploymentManager = new FileDeploymentManager(server.getConfiguration().getFileDeployerScanPeriod());
server.registerActivateCallback(this);
-
+
server.start();
-
if (server.getReplicationEndpoint() != null)
{
createJournal();
storage.installReplication(server.getReplicationEndpoint());
}
-
+
started = true;
}
@@ -253,17 +252,17 @@
{
deploymentManager.stop();
}
-
+
// Storage could be null on a shared store backup server before initialization
if (storage != null)
{
storage.stop();
}
-
+
unbindJNDI(queueJNDI);
unbindJNDI(topicJNDI);
-
+
unbindJNDI(connectionFactoryJNDI);
for (String connectionFactory : new HashSet<String>(connectionFactories.keySet()))
@@ -284,10 +283,10 @@
if (jmsManagementService != null)
{
jmsManagementService.unregisterJMSServer();
-
+
jmsManagementService.stop();
}
-
+
server.stop();
started = false;
@@ -339,7 +338,10 @@
return server.getVersion().getFullVersion();
}
- public synchronized boolean createQueue(final String queueName, final String selectorString, final boolean durable, final String... jndi) throws Exception
+ public synchronized boolean createQueue(final String queueName,
+ final String selectorString,
+ final boolean durable,
+ final String... jndi) throws Exception
{
checkInitialised();
@@ -409,7 +411,6 @@
return getJNDIList(connectionFactoryJNDI, factoryName);
}
-
public boolean addQueueToJndi(final String queueName, final String jndiBinding) throws Exception
{
checkInitialised();
@@ -540,14 +541,12 @@
return true;
}
-
public synchronized boolean destroyQueue(final String name) throws Exception
{
checkInitialised();
removeFromJNDI(queueJNDI, name);
-
queues.remove(name);
queueJNDI.remove(name);
@@ -571,8 +570,8 @@
jmsManagementService.unregisterTopic(name);
- AddressControl addressControl = (AddressControl) server.getManagementService()
- .getResource(ResourceNames.CORE_ADDRESS + HornetQDestination.createTopicAddressFromName(name));
+ AddressControl addressControl = (AddressControl)server.getManagementService()
+ .getResource(ResourceNames.CORE_ADDRESS + HornetQDestination.createTopicAddressFromName(name));
if (addressControl != null)
{
for (String queueName : addressControl.getQueueNames())
@@ -581,9 +580,9 @@
if (binding == null)
{
log.warn("Queue " + queueName +
- " doesn't exist on the topic " +
- name +
- ". It was deleted manually probably.");
+ " doesn't exist on the topic " +
+ name +
+ ". It was deleted manually probably.");
continue;
}
@@ -734,7 +733,9 @@
HornetQConnectionFactory cf = connectionFactories.get(name);
if (cf == null)
{
- ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl(name, discoveryAddress, discoveryPort);
+ ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl(name,
+ discoveryAddress,
+ discoveryPort);
configuration.setClientID(clientID);
configuration.setDiscoveryRefreshTimeout(discoveryRefreshTimeout);
configuration.setClientFailureCheckPeriod(clientFailureCheckPeriod);
@@ -777,12 +778,13 @@
HornetQConnectionFactory cf = connectionFactories.get(name);
if (cf == null)
{
- ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl(name, discoveryAddress, discoveryPort);
+ ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl(name,
+ discoveryAddress,
+ discoveryPort);
createConnectionFactory(configuration, jndiBindings);
}
}
-
public synchronized void createConnectionFactory(final String name,
final String clientID,
final String discoveryAddress,
@@ -794,13 +796,16 @@
HornetQConnectionFactory cf = connectionFactories.get(name);
if (cf == null)
{
- ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl(name, discoveryAddress, discoveryPort);
+ ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl(name,
+ discoveryAddress,
+ discoveryPort);
configuration.setClientID(clientID);
createConnectionFactory(configuration, jndiBindings);
}
}
- public synchronized void createConnectionFactory(final ConnectionFactoryConfiguration cfConfig, String... jndiBindings) throws Exception
+ public synchronized void createConnectionFactory(final ConnectionFactoryConfiguration cfConfig,
+ String... jndiBindings) throws Exception
{
internalCreateCF(cfConfig);
storage.storeConnectionFactory(new PersistedConnectionFactory(cfConfig));
@@ -849,7 +854,7 @@
HornetQConnectionFactory cf = connectionFactories.get(name);
if (cf == null)
{
- cf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactory(discoveryAddress, discoveryPort);
+ cf = (HornetQConnectionFactory)HornetQJMSClient.createConnectionFactory(discoveryAddress, discoveryPort);
cf.setClientID(clientID);
cf.setDiscoveryRefreshTimeout(discoveryRefreshTimeout);
cf.setClientFailureCheckPeriod(clientFailureCheckPeriod);
@@ -919,7 +924,7 @@
HornetQConnectionFactory cf = connectionFactories.get(name);
if (cf == null)
{
- cf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactory(connectorConfigs);
+ cf = (HornetQConnectionFactory)HornetQJMSClient.createConnectionFactory(connectorConfigs);
cf.setClientID(clientID);
cf.setClientFailureCheckPeriod(clientFailureCheckPeriod);
cf.setConnectionTTL(connectionTTL);
@@ -952,7 +957,6 @@
return cf;
}
-
private String[] getJNDIList(final Map<String, List<String>> map, final String name)
{
List<String> result = map.get(name);
@@ -981,9 +985,9 @@
}
server.getHornetQServerControl().deployQueue(hqQueue.getAddress(),
- hqQueue.getAddress(),
- coreFilterString,
- durable);
+ hqQueue.getAddress(),
+ coreFilterString,
+ durable);
queues.put(queueName, hqQueue);
@@ -1008,9 +1012,9 @@
// does not exist - otherwise we would not be able to distinguish from a non existent topic and one with no
// subscriptions - core has no notion of a topic
server.getHornetQServerControl().deployQueue(hqTopic.getAddress(),
- hqTopic.getAddress(),
- JMSServerManagerImpl.REJECT_FILTER,
- true);
+ hqTopic.getAddress(),
+ JMSServerManagerImpl.REJECT_FILTER,
+ true);
topics.put(topicName, hqTopic);
@@ -1024,7 +1028,8 @@
* @throws HornetQException
* @throws Exception
*/
- private HornetQConnectionFactory internalCreateCF(final ConnectionFactoryConfiguration cfConfig) throws HornetQException, Exception
+ private HornetQConnectionFactory internalCreateCF(final ConnectionFactoryConfiguration cfConfig) throws HornetQException,
+ Exception
{
List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs = lookupConnectors(cfConfig);
@@ -1034,71 +1039,71 @@
if (cfConfig.getDiscoveryAddress() != null)
{
cf = internalCreateConnectionFactory(cfConfig.getName(),
- cfConfig.getDiscoveryAddress(),
- cfConfig.getDiscoveryPort(),
- cfConfig.getClientID(),
- cfConfig.getDiscoveryRefreshTimeout(),
- cfConfig.getClientFailureCheckPeriod(),
- cfConfig.getConnectionTTL(),
- cfConfig.getCallTimeout(),
- cfConfig.isCacheLargeMessagesClient(),
- cfConfig.getMinLargeMessageSize(),
- cfConfig.getConsumerWindowSize(),
- cfConfig.getConsumerMaxRate(),
- cfConfig.getConfirmationWindowSize(),
- cfConfig.getProducerWindowSize(),
- cfConfig.getProducerMaxRate(),
- cfConfig.isBlockOnAcknowledge(),
- cfConfig.isBlockOnDurableSend(),
- cfConfig.isBlockOnNonDurableSend(),
- cfConfig.isAutoGroup(),
- cfConfig.isPreAcknowledge(),
- cfConfig.getLoadBalancingPolicyClassName(),
- cfConfig.getTransactionBatchSize(),
- cfConfig.getDupsOKBatchSize(),
- cfConfig.getInitialWaitTimeout(),
- cfConfig.isUseGlobalPools(),
- cfConfig.getScheduledThreadPoolMaxSize(),
- cfConfig.getThreadPoolMaxSize(),
- cfConfig.getRetryInterval(),
- cfConfig.getRetryIntervalMultiplier(),
- cfConfig.getMaxRetryInterval(),
- cfConfig.getReconnectAttempts(),
- cfConfig.isFailoverOnServerShutdown(),
- cfConfig.getGroupID());
+ cfConfig.getDiscoveryAddress(),
+ cfConfig.getDiscoveryPort(),
+ cfConfig.getClientID(),
+ cfConfig.getDiscoveryRefreshTimeout(),
+ cfConfig.getClientFailureCheckPeriod(),
+ cfConfig.getConnectionTTL(),
+ cfConfig.getCallTimeout(),
+ cfConfig.isCacheLargeMessagesClient(),
+ cfConfig.getMinLargeMessageSize(),
+ cfConfig.getConsumerWindowSize(),
+ cfConfig.getConsumerMaxRate(),
+ cfConfig.getConfirmationWindowSize(),
+ cfConfig.getProducerWindowSize(),
+ cfConfig.getProducerMaxRate(),
+ cfConfig.isBlockOnAcknowledge(),
+ cfConfig.isBlockOnDurableSend(),
+ cfConfig.isBlockOnNonDurableSend(),
+ cfConfig.isAutoGroup(),
+ cfConfig.isPreAcknowledge(),
+ cfConfig.getLoadBalancingPolicyClassName(),
+ cfConfig.getTransactionBatchSize(),
+ cfConfig.getDupsOKBatchSize(),
+ cfConfig.getInitialWaitTimeout(),
+ cfConfig.isUseGlobalPools(),
+ cfConfig.getScheduledThreadPoolMaxSize(),
+ cfConfig.getThreadPoolMaxSize(),
+ cfConfig.getRetryInterval(),
+ cfConfig.getRetryIntervalMultiplier(),
+ cfConfig.getMaxRetryInterval(),
+ cfConfig.getReconnectAttempts(),
+ cfConfig.isFailoverOnServerShutdown(),
+ cfConfig.getGroupID());
}
else
{
cf = internalCreateConnectionFactory(cfConfig.getName(),
- connectorConfigs,
- cfConfig.getClientID(),
- cfConfig.getClientFailureCheckPeriod(),
- cfConfig.getConnectionTTL(),
- cfConfig.getCallTimeout(),
- cfConfig.isCacheLargeMessagesClient(),
- cfConfig.getMinLargeMessageSize(),
- cfConfig.getConsumerWindowSize(),
- cfConfig.getConsumerMaxRate(),
- cfConfig.getConfirmationWindowSize(),
- cfConfig.getProducerWindowSize(),
- cfConfig.getProducerMaxRate(),
- cfConfig.isBlockOnAcknowledge(),
- cfConfig.isBlockOnDurableSend(),
- cfConfig.isBlockOnNonDurableSend(),
- cfConfig.isAutoGroup(),
- cfConfig.isPreAcknowledge(),
- cfConfig.getLoadBalancingPolicyClassName(),
- cfConfig.getTransactionBatchSize(),
- cfConfig.getDupsOKBatchSize(),
- cfConfig.isUseGlobalPools(),
- cfConfig.getScheduledThreadPoolMaxSize(),
- cfConfig.getThreadPoolMaxSize(),
- cfConfig.getRetryInterval(),
- cfConfig.getRetryIntervalMultiplier(),
- cfConfig.getMaxRetryInterval(),
- cfConfig.getReconnectAttempts(),
- cfConfig.isFailoverOnServerShutdown(),
- cfConfig.getGroupID());
+ connectorConfigs,
+ cfConfig.getClientID(),
+ cfConfig.getClientFailureCheckPeriod(),
+ cfConfig.getConnectionTTL(),
+ cfConfig.getCallTimeout(),
+ cfConfig.isCacheLargeMessagesClient(),
+ cfConfig.getMinLargeMessageSize(),
+ cfConfig.getConsumerWindowSize(),
+ cfConfig.getConsumerMaxRate(),
+ cfConfig.getConfirmationWindowSize(),
+ cfConfig.getProducerWindowSize(),
+ cfConfig.getProducerMaxRate(),
+ cfConfig.isBlockOnAcknowledge(),
+ cfConfig.isBlockOnDurableSend(),
+ cfConfig.isBlockOnNonDurableSend(),
+ cfConfig.isAutoGroup(),
+ cfConfig.isPreAcknowledge(),
+ cfConfig.getLoadBalancingPolicyClassName(),
+ cfConfig.getTransactionBatchSize(),
+ cfConfig.getDupsOKBatchSize(),
+ cfConfig.isUseGlobalPools(),
+ cfConfig.getScheduledThreadPoolMaxSize(),
+ cfConfig.getThreadPoolMaxSize(),
+ cfConfig.getRetryInterval(),
+ cfConfig.getRetryIntervalMultiplier(),
+ cfConfig.getMaxRetryInterval(),
+ cfConfig.getReconnectAttempts(),
+ cfConfig.isFailoverOnServerShutdown(),
+ cfConfig.getGroupID());
}
connectionFactories.put(cfConfig.getName(), cf);
@@ -1308,16 +1313,16 @@
DiscoveryGroupConfiguration discoveryGroupConfiguration = null;
discoveryGroupConfiguration = configuration.getDiscoveryGroupConfigurations()
- .get(cfConfig.getDiscoveryGroupName());
+ .get(cfConfig.getDiscoveryGroupName());
if (discoveryGroupConfiguration == null)
{
JMSServerManagerImpl.log.warn("There is no discovery group with name '" + cfConfig.getDiscoveryGroupName() +
- "' deployed.");
+ "' deployed.");
throw new HornetQException(HornetQException.ILLEGAL_STATE,
- "There is no discovery group with name '" + cfConfig.getDiscoveryGroupName() +
- "' deployed.");
+ "There is no discovery group with name '" + cfConfig.getDiscoveryGroupName() +
+ "' deployed.");
}
cfConfig.setDiscoveryAddress(discoveryGroupConfiguration.getGroupAddress());
@@ -1395,10 +1400,9 @@
private void initJournal() throws Exception
{
this.coreConfig = server.getConfiguration();
-
createJournal();
-
+
storage.load();
List<PersistedConnectionFactory> cfs = storage.recoverConnectionFactories();
@@ -1414,9 +1418,7 @@
{
if (destination.getType() == PersistedType.Queue)
{
- internalCreateQueue(destination.getName(),
- destination.getSelector(),
- destination.isDurable());
+ internalCreateQueue(destination.getName(), destination.getSelector(), destination.isDurable());
}
else if (destination.getType() == PersistedType.Topic)
{
@@ -1461,7 +1463,6 @@
mapJNDI.put(record.getName(), jndiList);
}
-
for (String jndi : record.getJndi())
{
jndiList.add(jndi);
@@ -1479,7 +1480,9 @@
{
if (coreConfig.isPersistenceEnabled())
{
- storage = new JournalJMSStorageManagerImpl(new TimeAndCounterIDGenerator(), server.getConfiguration(), server.getReplicationManager());
+ storage = new JournalJMSStorageManagerImpl(new TimeAndCounterIDGenerator(),
+ server.getConfiguration(),
+ server.getReplicationManager());
}
else
{
@@ -1518,8 +1521,9 @@
return true;
}
-
- private synchronized boolean removeFromJNDI(final Map<String, List<String>> jndiMap, final String name, final String jndi) throws Exception
+ private synchronized boolean removeFromJNDI(final Map<String, List<String>> jndiMap,
+ final String name,
+ final String jndi) throws Exception
{
checkInitialised();
List<String> jndiBindings = jndiMap.get(name);
@@ -1564,7 +1568,7 @@
{
JMSServerManagerImpl.log.warn("There is no connector with name '" + connectorName + "' deployed.");
throw new HornetQException(HornetQException.ILLEGAL_STATE,
- "There is no connector with name '" + connectorName + "' deployed.");
+ "There is no connector with name '" + connectorName + "' deployed.");
}
TransportConfiguration backupConnector = null;
@@ -1576,10 +1580,10 @@
if (backupConnector == null)
{
JMSServerManagerImpl.log.warn("There is no backup connector with name '" + backupConnectorName +
- "' deployed.");
+ "' deployed.");
throw new HornetQException(HornetQException.ILLEGAL_STATE,
- "There is no backup connector with name '" + backupConnectorName +
- "' deployed.");
+ "There is no backup connector with name '" + backupConnectorName +
+ "' deployed.");
}
}
Modified: trunk/src/main/org/hornetq/spi/core/remoting/Connection.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/remoting/Connection.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/spi/core/remoting/Connection.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -14,7 +14,6 @@
package org.hornetq.spi.core.remoting;
import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.HornetQException;
/**
* The connection used by a channel to write data to.
@@ -40,19 +39,20 @@
Object getID();
/**
- * writes the buffer to the wire.
+ * writes the buffer to the connection and if flush is true returns only when the buffer has been physically written to the connection.
*
* @param buffer the buffer to write
+ * @param flush whether to flush the buffers onto the wire
+ * @param batched whether the packet is allowed to be batched for better performance
*/
- void write(HornetQBuffer buffer);
-
+ void write(HornetQBuffer buffer, boolean flush, boolean batched);
+
/**
- * writes the buffer to the connection and if flush is true returns only when the buffer has been physically written to the connection.
+ * writes the buffer to the connection with no flushing or batching
*
* @param buffer the buffer to write
- * @param flush whether to flush the buffers onto the wire
*/
- void write(HornetQBuffer buffer, boolean flush);
+ void write(HornetQBuffer buffer);
/**
* closes this connection.
@@ -67,10 +67,7 @@
String getRemoteAddress();
/**
- * The batch size in bytes of the buffer for batching sends
- * or -1 if the connection does not support batching
- *
- * @return
+ * Called periodically to flush any data in the batch buffer
*/
- int getBatchingBufferSize();
+ void checkFlushBatchBuffer();
}
\ No newline at end of file
Modified: trunk/tests/config/ConfigurationTest-full-config.xml
===================================================================
--- trunk/tests/config/ConfigurationTest-full-config.xml 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/tests/config/ConfigurationTest-full-config.xml 2010-03-29 13:35:27 UTC (rev 8998)
@@ -48,7 +48,7 @@
<large-messages-directory>largemessagesdir</large-messages-directory>
<memory-warning-threshold>95</memory-warning-threshold>
<memory-measure-interval>54321</memory-measure-interval>
-
+
<remoting-interceptors>
<class-name>org.hornetq.tests.unit.core.config.impl.TestInterceptor1</class-name>
<class-name>org.hornetq.tests.unit.core.config.impl.TestInterceptor2</class-name>
Modified: trunk/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -68,7 +68,6 @@
server.start();
ClientSessionFactory sf = HornetQClient.createClientSessionFactory(new TransportConfiguration(connectorFactoryClassName));
- // sf.setConsumerWindowSize(0);
ClientSession session = sf.createSession(false, true, true);
@@ -109,8 +108,6 @@
{
ClientMessage message2 = consumer.receive();
- // log.info("got message " + i);
-
HornetQBuffer buffer = message2.getBodyBuffer();
Assert.assertEquals("testINVMCoreClient", buffer.readString());
Modified: trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -90,7 +90,7 @@
{
testFlowControl(1000, 500, 10 * 1024, 1024, 1024, 0, 1, 1, 0, false);
}
-
+
public void testFlowControlSingleConsumerSlowConsumer() throws Exception
{
testFlowControl(100, 500, 1024, 512, 512, 512, 1, 1, 10, false);
@@ -331,6 +331,8 @@
ProducerFlowControlTest.log.info("rate is " + rate + " msgs / sec");
session.close();
+
+ sf.close();
server.stop();
}
Modified: trunk/tests/src/org/hornetq/tests/integration/client/SimpleSendMultipleQueues.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/SimpleSendMultipleQueues.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/tests/src/org/hornetq/tests/integration/client/SimpleSendMultipleQueues.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -62,7 +62,9 @@
message.getBodyBuffer().writeString(body);
+ // log.info("sending message");
producer.send(message);
+ // log.info("sent message");
ClientMessage received1 = consumer1.receive(1000);
Assert.assertNotNull(received1);
Modified: trunk/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnector.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnector.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnector.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -93,7 +93,7 @@
}
@Override
- public void write(final HornetQBuffer buffer, final boolean flush)
+ public void write(final HornetQBuffer buffer, final boolean flush, final boolean batch)
{
InVMConnector.log.info("calling mock connection write");
if (callback != null)
@@ -101,7 +101,7 @@
callback.onWrite(buffer);
}
- super.write(buffer, flush);
+ super.write(buffer, flush, batch);
}
}
}
Added: trunk/tests/src/org/hornetq/tests/integration/remoting/BatchDelayTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/remoting/BatchDelayTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/remoting/BatchDelayTest.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -0,0 +1,183 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.remoting;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
+import org.hornetq.integration.transports.netty.NettyConnectorFactory;
+import org.hornetq.integration.transports.netty.TransportConstants;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ *
+ * A BatchDelayTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class BatchDelayTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(BatchDelayTest.class);
+
+ private static final long DELAY = 500;
+
+ // Attributes ----------------------------------------------------
+
+ private HornetQServer server;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ Map<String, Object> params = new HashMap<String, Object>();
+ params.put(TransportConstants.BATCH_DELAY, DELAY);
+
+ TransportConfiguration tc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
+
+ Configuration config = new ConfigurationImpl();
+ config.getAcceptorConfigurations().add(tc);
+
+ config.setSecurityEnabled(false);
+ server = createServer(false, config);
+ server.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ server.stop();
+
+ server = null;
+
+ super.tearDown();
+ }
+
+ protected ClientSessionFactory createSessionFactory()
+ {
+ Map<String, Object> params = new HashMap<String, Object>();
+ params.put(TransportConstants.BATCH_DELAY, DELAY);
+
+ ClientSessionFactory sf = HornetQClient.createClientSessionFactory(new TransportConfiguration(NettyConnectorFactory.class.getName(),
+ params));
+
+ return sf;
+ }
+
+ public void testSendReceiveMany() throws Exception
+ {
+ ClientSessionFactory sf = createSessionFactory();
+
+ ClientSession session = sf.createSession();
+
+ final String foo = "foo";
+
+ session.createQueue(foo, foo);
+
+ ClientProducer prod = session.createProducer(foo);
+
+ ClientConsumer cons = session.createConsumer(foo);
+
+ session.start();
+
+ final int numMessages = 1000;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage msg = session.createMessage(false);
+
+ prod.send(msg);
+ }
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage msg = cons.receive(10000);
+
+ assertNotNull(msg);
+
+ msg.acknowledge();
+ }
+
+ sf.close();
+ }
+
+ public void testSendReceiveOne() throws Exception
+ {
+ ClientSessionFactory sf = createSessionFactory();
+
+ ClientSession session = sf.createSession();
+
+ final String foo = "foo";
+
+ session.createQueue(foo, foo);
+
+ ClientProducer prod = session.createProducer(foo);
+
+ ClientConsumer cons = session.createConsumer(foo);
+
+ session.start();
+
+ ClientMessage msg = session.createMessage(false);
+
+ long start = System.currentTimeMillis();
+
+ prod.send(msg);
+
+ msg = cons.receive(10000);
+
+ assertNotNull(msg);
+
+ long end = System.currentTimeMillis();
+
+ //This will be delayed
+
+ assertTrue(end - start > DELAY);
+
+ msg.acknowledge();
+
+ sf.close();
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/hornetq/tests/integration/remoting/SynchronousCloseTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/remoting/SynchronousCloseTest.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/tests/src/org/hornetq/tests/integration/remoting/SynchronousCloseTest.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -19,7 +19,6 @@
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
Modified: trunk/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -71,7 +71,6 @@
Assert.assertEquals(false, conf.isCreateBindingsDir());
Assert.assertEquals("somedir2", conf.getJournalDirectory());
Assert.assertEquals(false, conf.isCreateJournalDir());
-
Assert.assertEquals(JournalType.NIO, conf.getJournalType());
Assert.assertEquals(10000, conf.getJournalBufferSize_NIO());
Assert.assertEquals(1000, conf.getJournalBufferTimeout_NIO());
Modified: trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java 2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java 2010-03-29 13:35:27 UTC (rev 8998)
@@ -47,7 +47,7 @@
public void testGetID() throws Exception
{
Channel channel = new SimpleChannel(RandomUtil.randomInt());
- NettyConnection conn = new NettyConnection(channel, new MyListener(), -1);
+ NettyConnection conn = new NettyConnection(channel, new MyListener(), false);
Assert.assertEquals(channel.getId().intValue(), conn.getID());
}
@@ -59,7 +59,7 @@
Assert.assertEquals(0, channel.getWritten().size());
- NettyConnection conn = new NettyConnection(channel, new MyListener(), -1);
+ NettyConnection conn = new NettyConnection(channel, new MyListener(), false);
conn.write(buff);
Assert.assertEquals(1, channel.getWritten().size());
@@ -68,7 +68,7 @@
public void testCreateBuffer() throws Exception
{
Channel channel = new SimpleChannel(RandomUtil.randomInt());
- NettyConnection conn = new NettyConnection(channel, new MyListener(), -1);
+ NettyConnection conn = new NettyConnection(channel, new MyListener(), false);
final int size = 1234;
More information about the hornetq-commits
mailing list