JBoss hornetq SVN: r10000 - trunk/tests/src/org/hornetq/tests/integration/cluster/topology.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-12-06 05:57:42 -0500 (Mon, 06 Dec 2010)
New Revision: 10000
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
Log:
fixed topology test
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2010-12-06 10:41:00 UTC (rev 9999)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2010-12-06 10:57:42 UTC (rev 10000)
@@ -414,7 +414,7 @@
final List<String> nodes = new ArrayList<String>();
final CountDownLatch upLatch = new CountDownLatch(5);
- final CountDownLatch downLatch = new CountDownLatch(5);
+ final CountDownLatch downLatch = new CountDownLatch(4);
locator.addClusterTopologyListener(new ClusterTopologyListener()
{
@@ -454,11 +454,16 @@
waitForClusterConnections(2, 4);
waitForClusterConnections(3, 4);
waitForClusterConnections(4, 4);
+ //we cant close all of the servers, we need to leave one up to notify us
+ stopServers(4, 2, 3, 1);
- stopServers(0, 4, 2, 3, 1);
-
- assertTrue("Was not notified that all servers are Down", upLatch.await(10, SECONDS));
- checkContains(new int[] { }, nodeIDs, nodes);
+ boolean ok = downLatch.await(10, SECONDS);
+ if(!ok)
+ {
+ System.out.println("TopologyClusterTestBase.testMultipleClientSessionFactories");
+ }
+ assertTrue("Was not notified that all servers are Down", ok);
+ checkContains(new int[] { 0 }, nodeIDs, nodes);
for (int i = 0; i < sfs.length; i++)
{
@@ -467,6 +472,8 @@
}
locator.close();
+
+ stopServers(0);
}
// Private -------------------------------------------------------
14 years, 3 months
JBoss hornetq SVN: r9999 - trunk/tests/src/org/hornetq/tests/integration/xa.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-12-06 05:41:00 -0500 (Mon, 06 Dec 2010)
New Revision: 9999
Modified:
trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java
Log:
fixed xa test
Modified: trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java 2010-12-06 09:37:04 UTC (rev 9998)
+++ trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java 2010-12-06 10:41:00 UTC (rev 9999)
@@ -95,6 +95,28 @@
//
}
}
+ if(sessionFactory != null)
+ {
+ try
+ {
+ sessionFactory.close();
+ }
+ catch (Exception e)
+ {
+ //
+ }
+ }
+ if(locator != null)
+ {
+ try
+ {
+ locator.close();
+ }
+ catch (Exception e)
+ {
+ //
+ }
+ }
if (messagingService != null && messagingService.isStarted())
{
try
@@ -390,7 +412,7 @@
int numSessions = 100;
ClientSession clientSession2 = sessionFactory.createSession(false, true, true);
ClientProducer clientProducer = clientSession2.createProducer(atestq);
- for (int i = 0; i < 10 * numSessions; i++)
+ for (int i = 0; i < numSessions; i++)
{
clientProducer.send(createTextMessage(clientSession2, "m" + i));
}
@@ -410,7 +432,8 @@
session.start();
}
- Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+ boolean ok = latch.await(10, TimeUnit.SECONDS);
+ Assert.assertTrue(ok);
for (TxMessageHandler messageHandler : handlers)
{
Assert.assertFalse(messageHandler.failedToAck);
@@ -419,6 +442,7 @@
clientSession2.close();
for (ClientSession session : clientSessions)
{
+ session.stop();
session.close();
}
@@ -962,6 +986,8 @@
}
}
+ private static volatile int received = 0;
+
class TxMessageHandler implements MessageHandler
{
boolean failedToAck = false;
@@ -970,6 +996,7 @@
private final CountDownLatch latch;
+
public TxMessageHandler(final ClientSession session, final CountDownLatch latch)
{
this.latch = latch;
@@ -993,6 +1020,7 @@
try
{
message.acknowledge();
+ BasicXaTest.log.info("processed message " + (received++));
}
catch (HornetQException e)
{
14 years, 3 months
JBoss hornetq SVN: r9998 - projects/jopr-plugin/tags.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-12-06 04:37:04 -0500 (Mon, 06 Dec 2010)
New Revision: 9998
Added:
projects/jopr-plugin/tags/HornetQ-jopr-plugin_2_0_0_CR1/
Log:
HornetQ-jopr-plugin_2_0_0_CR1 branch
Copied: projects/jopr-plugin/tags/HornetQ-jopr-plugin_2_0_0_CR1 (from rev 9997, projects/jopr-plugin/trunk)
14 years, 3 months
JBoss hornetq SVN: r9997 - in trunk/src/main/org/hornetq: jms/client and 1 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-12-06 04:24:43 -0500 (Mon, 06 Dec 2010)
New Revision: 9997
Modified:
trunk/src/main/org/hornetq/api/jms/management/ConnectionFactoryControl.java
trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
trunk/src/main/org/hornetq/jms/client/HornetQQueueConnectionFactory.java
trunk/src/main/org/hornetq/jms/client/HornetQTopicConnectionFactory.java
trunk/src/main/org/hornetq/jms/client/HornetQXAConnectionFactory.java
trunk/src/main/org/hornetq/jms/client/HornetQXAQueueConnectionFactory.java
trunk/src/main/org/hornetq/jms/client/HornetQXATopicConnectionFactory.java
trunk/src/main/org/hornetq/jms/management/impl/JMSConnectionFactoryControlImpl.java
Log:
added method to obtain the factory type
Modified: trunk/src/main/org/hornetq/api/jms/management/ConnectionFactoryControl.java
===================================================================
--- trunk/src/main/org/hornetq/api/jms/management/ConnectionFactoryControl.java 2010-12-06 09:14:41 UTC (rev 9996)
+++ trunk/src/main/org/hornetq/api/jms/management/ConnectionFactoryControl.java 2010-12-06 09:24:43 UTC (rev 9997)
@@ -49,6 +49,12 @@
boolean isHA();
/**
+ * return the type of factory
+ * @return 0 = jms cf, 1 = queue cf, 2 = topic cf, 3 = xa cf, 4 = xa queue cf, 5 = xa topic cf
+ */
+ int getFactoryType();
+
+ /**
* Returns the Client ID of this connection factory (or {@code null} if it is not set.
*/
String getClientID();
Modified: trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2010-12-06 09:14:41 UTC (rev 9996)
+++ trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2010-12-06 09:24:43 UTC (rev 9997)
@@ -34,6 +34,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.jms.referenceable.ConnectionFactoryObjectFactory;
import org.hornetq.jms.referenceable.SerializableObjectRefAddr;
+import org.hornetq.jms.server.impl.JMSFactoryType;
/**
* HornetQ implementation of a JMS ConnectionFactory.
@@ -541,7 +542,11 @@
{
return serverLocator;
}
-
+
+ public int getFactoryType()
+ {
+ return JMSFactoryType.CF.intValue();
+ }
/**
*
* @deprecated use {@link ServerLocator#createSessionFactory()}
Modified: trunk/src/main/org/hornetq/jms/client/HornetQQueueConnectionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQQueueConnectionFactory.java 2010-12-06 09:14:41 UTC (rev 9996)
+++ trunk/src/main/org/hornetq/jms/client/HornetQQueueConnectionFactory.java 2010-12-06 09:24:43 UTC (rev 9997)
@@ -18,6 +18,7 @@
import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.jms.server.impl.JMSFactoryType;
/**
* A class that represents a QueueConnectionFactory.
@@ -64,4 +65,8 @@
super(ha, initialConnectors);
}
+ public int getFactoryType()
+ {
+ return JMSFactoryType.QUEUE_CF.intValue();
+ }
}
Modified: trunk/src/main/org/hornetq/jms/client/HornetQTopicConnectionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQTopicConnectionFactory.java 2010-12-06 09:14:41 UTC (rev 9996)
+++ trunk/src/main/org/hornetq/jms/client/HornetQTopicConnectionFactory.java 2010-12-06 09:24:43 UTC (rev 9997)
@@ -18,6 +18,7 @@
import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.jms.server.impl.JMSFactoryType;
/**
* A class that represents a TopicConnectionFactory.
@@ -65,4 +66,8 @@
super(ha, initialConnectors);
}
+ public int getFactoryType()
+ {
+ return JMSFactoryType.TOPIC_CF.intValue();
+ }
}
Modified: trunk/src/main/org/hornetq/jms/client/HornetQXAConnectionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQXAConnectionFactory.java 2010-12-06 09:14:41 UTC (rev 9996)
+++ trunk/src/main/org/hornetq/jms/client/HornetQXAConnectionFactory.java 2010-12-06 09:24:43 UTC (rev 9997)
@@ -23,6 +23,7 @@
import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.jms.server.impl.JMSFactoryType;
/**
* A class that represents a XAConnectionFactory.
@@ -70,4 +71,9 @@
super(ha, initialConnectors);
}
+ public int getFactoryType()
+ {
+ return JMSFactoryType.XA_CF.intValue();
+ }
+
}
Modified: trunk/src/main/org/hornetq/jms/client/HornetQXAQueueConnectionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQXAQueueConnectionFactory.java 2010-12-06 09:14:41 UTC (rev 9996)
+++ trunk/src/main/org/hornetq/jms/client/HornetQXAQueueConnectionFactory.java 2010-12-06 09:24:43 UTC (rev 9997)
@@ -18,6 +18,7 @@
import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.jms.server.impl.JMSFactoryType;
/**
* A class that represents a XAQueueConnectionFactory.
@@ -64,4 +65,9 @@
super(ha, initialConnectors);
}
+ public int getFactoryType()
+ {
+ return JMSFactoryType.QUEUE_XA_CF.intValue();
+ }
+
}
Modified: trunk/src/main/org/hornetq/jms/client/HornetQXATopicConnectionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQXATopicConnectionFactory.java 2010-12-06 09:14:41 UTC (rev 9996)
+++ trunk/src/main/org/hornetq/jms/client/HornetQXATopicConnectionFactory.java 2010-12-06 09:24:43 UTC (rev 9997)
@@ -18,6 +18,7 @@
import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.jms.server.impl.JMSFactoryType;
/**
* A class that represents a XATopicConnectionFactory.
@@ -64,4 +65,8 @@
super(ha, initialConnectors);
}
+ public int getFactoryType()
+ {
+ return JMSFactoryType.TOPIC_XA_CF.intValue();
+ }
}
Modified: trunk/src/main/org/hornetq/jms/management/impl/JMSConnectionFactoryControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/management/impl/JMSConnectionFactoryControlImpl.java 2010-12-06 09:14:41 UTC (rev 9996)
+++ trunk/src/main/org/hornetq/jms/management/impl/JMSConnectionFactoryControlImpl.java 2010-12-06 09:24:43 UTC (rev 9997)
@@ -71,6 +71,11 @@
return cf.isHA();
}
+ public int getFactoryType()
+ {
+ return cf.getFactoryType();
+ }
+
public String getClientID()
{
return cf.getClientID();
14 years, 3 months
JBoss hornetq SVN: r9996 - projects/jopr-plugin/trunk/src/resources/META-INF.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-12-06 04:14:41 -0500 (Mon, 06 Dec 2010)
New Revision: 9996
Modified:
projects/jopr-plugin/trunk/src/resources/META-INF/rhq-plugin.xml
Log:
fix required attr
Modified: projects/jopr-plugin/trunk/src/resources/META-INF/rhq-plugin.xml
===================================================================
--- projects/jopr-plugin/trunk/src/resources/META-INF/rhq-plugin.xml 2010-12-06 09:12:16 UTC (rev 9995)
+++ projects/jopr-plugin/trunk/src/resources/META-INF/rhq-plugin.xml 2010-12-06 09:14:41 UTC (rev 9996)
@@ -631,7 +631,7 @@
</c:simple-property>
<c:simple-property name="connectorNames"
default="netty"
- readOnly="true" required="false"
+ readOnly="true" required="true"
displayName="Connectors to live servers">
<c:description>comma-separated list of either connector names or a discovery group name</c:description>
</c:simple-property>
14 years, 3 months
JBoss hornetq SVN: r9995 - in projects/jopr-plugin/trunk: src/main/java/org/jboss/as/integration/hornetq/jopr and 1 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-12-06 04:12:16 -0500 (Mon, 06 Dec 2010)
New Revision: 9995
Modified:
projects/jopr-plugin/trunk/pom.xml
projects/jopr-plugin/trunk/src/main/java/org/jboss/as/integration/hornetq/jopr/JMSConnectionFactoryComponent.java
projects/jopr-plugin/trunk/src/main/java/org/jboss/as/integration/hornetq/jopr/JMSManagerComponent.java
projects/jopr-plugin/trunk/src/resources/META-INF/rhq-plugin.xml
Log:
changes for latest hornetq management API
Modified: projects/jopr-plugin/trunk/pom.xml
===================================================================
--- projects/jopr-plugin/trunk/pom.xml 2010-12-05 10:14:53 UTC (rev 9994)
+++ projects/jopr-plugin/trunk/pom.xml 2010-12-06 09:12:16 UTC (rev 9995)
@@ -5,7 +5,7 @@
<groupId>org.hornetq</groupId>
<artifactId>hornetq-jopr-plugin</artifactId>
<packaging>jar</packaging>
- <version>1.0.0.Final</version>
+ <version>2.0.0.CR1</version>
<name>JBoss Application Server JOPR plugin</name>
<url>http://hornetq.org</url>
<description>HornetQ JOPR plugin</description>
Modified: projects/jopr-plugin/trunk/src/main/java/org/jboss/as/integration/hornetq/jopr/JMSConnectionFactoryComponent.java
===================================================================
--- projects/jopr-plugin/trunk/src/main/java/org/jboss/as/integration/hornetq/jopr/JMSConnectionFactoryComponent.java 2010-12-05 10:14:53 UTC (rev 9994)
+++ projects/jopr-plugin/trunk/src/main/java/org/jboss/as/integration/hornetq/jopr/JMSConnectionFactoryComponent.java 2010-12-06 09:12:16 UTC (rev 9995)
@@ -86,7 +86,6 @@
{
Map<String, PropertySimple> simpleProps = configurationUpdateReport.getConfiguration().getSimpleProperties();
String name = simpleProps.get("name").getStringValue();
- long discoveryRefreshTimeout = simpleProps.get("DiscoveryRefreshTimeout").getLongValue();
String clientId = simpleProps.get("ClientID").getStringValue();
int dupsOkBatchSize = simpleProps.get("DupsOKBatchSize").getIntegerValue();
int transactionBatchSize = simpleProps.get("TransactionBatchSize").getIntegerValue();
@@ -107,7 +106,6 @@
long maxRetryInterval = simpleProps.get("MaxRetryInterval").getLongValue();
double retryIntervalMultiplier = simpleProps.get("RetryIntervalMultiplier").getDoubleValue();
int reconnectAttempts = simpleProps.get("ReconnectAttempts").getIntegerValue();
- boolean failoverOnShutdown = simpleProps.get("FailoverOnServerShutdown").getBooleanValue();
int scheduledThreadPoolMaxSize = simpleProps.get("ScheduledThreadPoolMaxSize").getIntegerValue();
int threadPoolMaxSize = simpleProps.get("ThreadPoolMaxSize").getIntegerValue();
String groupId = simpleProps.get("GroupID").getStringValue();
@@ -120,7 +118,7 @@
{
ManagementView managementView = getProfileService();
- updateConnectionFactory(configurationUpdateReport, managementView, name, discoveryRefreshTimeout, clientId, dupsOkBatchSize, transactionBatchSize, clientFailureCheckPeriod, connectionTTL, callTimeout, consumerWindowSize, confirmationWindowSize, producerMaxRate, producerWindowSize, cacheLargeMessageClient, minLargeMessageSize, blockOnNonDurableSend, blockOnAcknowledge, blockOnDurableSend, autoGroup, preAcknowledge, maxRetryInterval, retryIntervalMultiplier, reconnectAttempts, failoverOnShutdown, scheduledThreadPoolMaxSize, threadPoolMaxSize, groupId, initialMessagePacketSize, useGlobalPools, retryInterval, connectionLoadBalancingPolicyClassName);
+ updateConnectionFactory(configurationUpdateReport, managementView, name, clientId, dupsOkBatchSize, transactionBatchSize, clientFailureCheckPeriod, connectionTTL, callTimeout, consumerWindowSize, confirmationWindowSize, producerMaxRate, producerWindowSize, cacheLargeMessageClient, minLargeMessageSize, blockOnNonDurableSend, blockOnAcknowledge, blockOnDurableSend, autoGroup, preAcknowledge, maxRetryInterval, retryIntervalMultiplier, reconnectAttempts, scheduledThreadPoolMaxSize, threadPoolMaxSize, groupId, initialMessagePacketSize, useGlobalPools, retryInterval, connectionLoadBalancingPolicyClassName);
}
catch (Exception e)
{
@@ -134,7 +132,6 @@
private void updateConnectionFactory(ConfigurationUpdateReport configurationUpdateReport,
ManagementView managementView,
String name,
- long discoveryRefreshTimeout,
String clientId,
int dupsOkBatchSize,
int transactionBatchSize,
@@ -155,7 +152,6 @@
long maxRetryInterval,
double retryIntervalMultiplier,
int reconnectAttempts,
- boolean failoverOnShutdown,
int scheduledThreadPoolMaxSize,
int threadPoolMaxSize,
String groupId,
@@ -166,7 +162,6 @@
{
ManagedOperation operation = ManagementSupport.getOperation(managementView, COMPONENT_NAME, "updateConnectionFactory", new ComponentType("JMSManage", "ConnectionFactoryManage"));
operation.invoke(new SimpleValueSupport(SimpleMetaType.STRING, name),
- new SimpleValueSupport(SimpleMetaType.LONG_PRIMITIVE, discoveryRefreshTimeout),
new SimpleValueSupport(SimpleMetaType.STRING, clientId),
new SimpleValueSupport(SimpleMetaType.INTEGER_PRIMITIVE, dupsOkBatchSize),
new SimpleValueSupport(SimpleMetaType.INTEGER_PRIMITIVE, transactionBatchSize),
@@ -187,7 +182,6 @@
new SimpleValueSupport(SimpleMetaType.LONG_PRIMITIVE, maxRetryInterval),
new SimpleValueSupport(SimpleMetaType.DOUBLE_PRIMITIVE, retryIntervalMultiplier),
new SimpleValueSupport(SimpleMetaType.INTEGER_PRIMITIVE, reconnectAttempts),
- new SimpleValueSupport(SimpleMetaType.BOOLEAN_PRIMITIVE, failoverOnShutdown),
new SimpleValueSupport(SimpleMetaType.INTEGER_PRIMITIVE, scheduledThreadPoolMaxSize),
new SimpleValueSupport(SimpleMetaType.INTEGER_PRIMITIVE, threadPoolMaxSize),
new SimpleValueSupport(SimpleMetaType.STRING, groupId),
Modified: projects/jopr-plugin/trunk/src/main/java/org/jboss/as/integration/hornetq/jopr/JMSManagerComponent.java
===================================================================
--- projects/jopr-plugin/trunk/src/main/java/org/jboss/as/integration/hornetq/jopr/JMSManagerComponent.java 2010-12-05 10:14:53 UTC (rev 9994)
+++ projects/jopr-plugin/trunk/src/main/java/org/jboss/as/integration/hornetq/jopr/JMSManagerComponent.java 2010-12-06 09:12:16 UTC (rev 9995)
@@ -56,11 +56,11 @@
for (MeasurementScheduleRequest measurementScheduleRequest : measurementScheduleRequests)
{
- if("provider".equalsIgnoreCase(measurementScheduleRequest.getName()))
+ if ("provider".equalsIgnoreCase(measurementScheduleRequest.getName()))
{
measurementReport.addData(new MeasurementDataTrait(measurementScheduleRequest, "HornetQ"));
}
- else if("started".equalsIgnoreCase(measurementScheduleRequest.getName()))
+ else if ("started".equalsIgnoreCase(measurementScheduleRequest.getName()))
{
ManagementView managementView = getProfileService();
ManagedOperation operation = ManagementSupport.getOperation(managementView, "JMSServerMO", "isStarted", new ComponentType("JMSManage", "ServerManage"));
@@ -95,14 +95,11 @@
{
String name = simpleProps.get("name").getStringValue();
- String liveTransportClassNames = simpleProps.get("liveTransportClassNames").getStringValue();
- String liveTransportParams = simpleProps.get("liveTransportParams").getStringValue();
- String backupTransportClassNames = simpleProps.get("backupTransportClassNames").getStringValue();
- String backupTransportParams = simpleProps.get("backupTransportParams").getStringValue();
+ String connectorNames = simpleProps.get("connectorNames").getStringValue();
+ boolean ha = simpleProps.get("ha").getBooleanValue();
+ boolean useDiscovery = simpleProps.get("useDiscovery").getBooleanValue();
+ int cfType = simpleProps.get("cfType").getIntegerValue();
String bindings = simpleProps.get("Bindings").getStringValue();
- String discoveryAddress = simpleProps.get("DiscoveryAddress").getStringValue();
- int discoveryPort = simpleProps.get("DiscoveryPort").getIntegerValue();
- long discoveryRefreshTimeout = simpleProps.get("DiscoveryRefreshTimeout").getLongValue();
String clientId = simpleProps.get("ClientID").getStringValue();
int dupsOkBatchSize = simpleProps.get("DupsOKBatchSize").getIntegerValue();
int transactionBatchSize = simpleProps.get("TransactionBatchSize").getIntegerValue();
@@ -123,7 +120,6 @@
long maxRetryInterval = simpleProps.get("MaxRetryInterval").getLongValue();
double retryIntervalMultiplier = simpleProps.get("RetryIntervalMultiplier").getDoubleValue();
int reconnectAttempts = simpleProps.get("ReconnectAttempts").getIntegerValue();
- boolean failoverOnShutdown = simpleProps.get("FailoverOnServerShutdown").getBooleanValue();
int scheduledThreadPoolMaxSize = simpleProps.get("ScheduledThreadPoolMaxSize").getIntegerValue();
int threadPoolMaxSize = simpleProps.get("ThreadPoolMaxSize").getIntegerValue();
String groupId = simpleProps.get("GroupID").getStringValue();
@@ -131,7 +127,8 @@
boolean useGlobalPools = simpleProps.get("UseGlobalPools").getBooleanValue();
long retryInterval = simpleProps.get("RetryInterval").getLongValue();
String connectionLoadBalancingPolicyClassName = simpleProps.get("ConnectionLoadBalancingPolicyClassName").getStringValue();
- createConnectionFactory(createResourceReport, managementView, name,liveTransportClassNames, liveTransportParams, backupTransportClassNames, backupTransportParams, bindings, discoveryAddress, discoveryPort, discoveryRefreshTimeout, clientId, dupsOkBatchSize, transactionBatchSize, clientFailureCheckPeriod, connectionTTL, callTimeout, consumerWindowSize, confirmationWindowSize, producerMaxRate, producerWindowSize, cacheLargeMessageClient, minLargeMessageSize, blockOnNonDurableSend, blockOnAcknowledge, blockOnDurableSend, autoGroup, preAcknowledge, maxRetryInterval, retryIntervalMultiplier, reconnectAttempts, failoverOnShutdown, scheduledThreadPoolMaxSize, threadPoolMaxSize, groupId, initialMessagePacketSize, useGlobalPools, retryInterval, connectionLoadBalancingPolicyClassName);
+ createConnectionFactory(createResourceReport, managementView, name, connectorNames, ha, useDiscovery, cfType, bindings,
+ clientId, dupsOkBatchSize, transactionBatchSize, clientFailureCheckPeriod, connectionTTL, callTimeout, consumerWindowSize, confirmationWindowSize, producerMaxRate, producerWindowSize, cacheLargeMessageClient, minLargeMessageSize, blockOnNonDurableSend, blockOnAcknowledge, blockOnDurableSend, autoGroup, preAcknowledge, maxRetryInterval, retryIntervalMultiplier, reconnectAttempts, scheduledThreadPoolMaxSize, threadPoolMaxSize, groupId, initialMessagePacketSize, useGlobalPools, retryInterval, connectionLoadBalancingPolicyClassName);
}
else
{
@@ -181,19 +178,50 @@
this.resourceContext = null;
}
- private void createConnectionFactory(CreateResourceReport createResourceReport, ManagementView managementView, String name,String liveTransportClassNames, String liveTransportParams, String backupTransportClassNames, String backupTransportParams, String bindings, String discoveryAddress, int discoveryPort, long discoveryRefreshTimeout, String clientId, int dupsOkBatchSize, int transactionBatchSize, long clientFailureCheckPeriod, long connectionTTL, long callTimeout, int consumerWindowSize, int confirmationWindowSize, int producerMaxRate, int producerWindowSize, boolean cacheLargeMessageClient, int minLargeMessageSize, boolean blockOnNonDurableSend, boolean blockOnAcknowledge, boolean blockOnDurableSend, boolean autoGroup, boolean preAcknowledge, long maxRetryInterval, double retryIntervalMultiplier, int reconnectAttempts, boolean failoverOnShutdown, int scheduledThreadPoolMaxSize, int threadPoolMaxSize, String groupId, int initialMessagePacketSize, boolean useGlobalPool!
s, long retryInterval, String connectionLoadBalancingPolicyClassName)
+ private void createConnectionFactory(CreateResourceReport createResourceReport,
+ ManagementView managementView,
+ String name,
+ String connectorNames,
+ boolean ha,
+ boolean useDiscovery,
+ int cfType,
+ String bindings,
+ String clientId,
+ int dupsOkBatchSize,
+ int transactionBatchSize,
+ long clientFailureCheckPeriod,
+ long connectionTTL,
+ long callTimeout,
+ int consumerWindowSize,
+ int confirmationWindowSize,
+ int producerMaxRate,
+ int producerWindowSize,
+ boolean cacheLargeMessageClient,
+ int minLargeMessageSize,
+ boolean blockOnNonDurableSend,
+ boolean blockOnAcknowledge,
+ boolean blockOnDurableSend,
+ boolean autoGroup,
+ boolean preAcknowledge,
+ long maxRetryInterval,
+ double retryIntervalMultiplier,
+ int reconnectAttempts,
+ int scheduledThreadPoolMaxSize,
+ int threadPoolMaxSize,
+ String groupId,
+ int initialMessagePacketSize,
+ boolean useGlobalPools,
+ long retryInterval,
+ String connectionLoadBalancingPolicyClassName)
throws Exception
{
ManagedOperation operation = ManagementSupport.getOperation(managementView, JMSConstants.ConnectionFactory.COMPONENT_NAME, "createConnectionFactory", JMSConstants.ConnectionFactory.COMPONENT_TYPE);
operation.invoke(new SimpleValueSupport(SimpleMetaType.STRING, name),
- new SimpleValueSupport(SimpleMetaType.STRING, liveTransportClassNames),
- new SimpleValueSupport(SimpleMetaType.STRING, liveTransportParams),
- new SimpleValueSupport(SimpleMetaType.STRING, backupTransportClassNames),
- new SimpleValueSupport(SimpleMetaType.STRING, backupTransportParams),
+ new SimpleValueSupport(SimpleMetaType.BOOLEAN_PRIMITIVE, ha),
+ new SimpleValueSupport(SimpleMetaType.BOOLEAN_PRIMITIVE, useDiscovery),
+ new SimpleValueSupport(SimpleMetaType.INTEGER_PRIMITIVE, cfType),
+ new SimpleValueSupport(SimpleMetaType.STRING, connectorNames),
new SimpleValueSupport(SimpleMetaType.STRING, bindings),
- new SimpleValueSupport(SimpleMetaType.STRING, discoveryAddress),
- new SimpleValueSupport(SimpleMetaType.INTEGER_PRIMITIVE, discoveryPort),
- new SimpleValueSupport(SimpleMetaType.LONG_PRIMITIVE, discoveryRefreshTimeout),
new SimpleValueSupport(SimpleMetaType.STRING, clientId),
new SimpleValueSupport(SimpleMetaType.INTEGER_PRIMITIVE, dupsOkBatchSize),
new SimpleValueSupport(SimpleMetaType.INTEGER_PRIMITIVE, transactionBatchSize),
@@ -214,7 +242,6 @@
new SimpleValueSupport(SimpleMetaType.LONG_PRIMITIVE, maxRetryInterval),
new SimpleValueSupport(SimpleMetaType.DOUBLE_PRIMITIVE, retryIntervalMultiplier),
new SimpleValueSupport(SimpleMetaType.INTEGER_PRIMITIVE, reconnectAttempts),
- new SimpleValueSupport(SimpleMetaType.BOOLEAN_PRIMITIVE, failoverOnShutdown),
new SimpleValueSupport(SimpleMetaType.INTEGER_PRIMITIVE, scheduledThreadPoolMaxSize),
new SimpleValueSupport(SimpleMetaType.INTEGER_PRIMITIVE, threadPoolMaxSize),
new SimpleValueSupport(SimpleMetaType.STRING, groupId),
@@ -322,7 +349,7 @@
@Override
protected String getInvokeOperation()
{
- return "invokeManagerOperation";
+ return "invokeManagerOperation";
}
@Override
@@ -357,7 +384,7 @@
public void updateResourceConfiguration(ConfigurationUpdateReport configurationUpdateReport)
{
-
+
}
private void createRoles(CreateResourceReport configurationUpdateReport, String name, StringBuffer sendRoles, StringBuffer consumeRoles)
Modified: projects/jopr-plugin/trunk/src/resources/META-INF/rhq-plugin.xml
===================================================================
--- projects/jopr-plugin/trunk/src/resources/META-INF/rhq-plugin.xml 2010-12-05 10:14:53 UTC (rev 9994)
+++ projects/jopr-plugin/trunk/src/resources/META-INF/rhq-plugin.xml 2010-12-06 09:12:16 UTC (rev 9995)
@@ -606,53 +606,38 @@
<c:simple-property name="name" displayName="Connection Factory Name" required="true" readOnly="true">
<c:description>Name of the connection factory to deploy</c:description>
</c:simple-property>
- <c:simple-property name="Bindings" displayName="JNDI Bindings" required="true" readOnly="true">
- <c:description>comma-separated list of JNDI bindings (use '&comma;' if u need to use commas in your jndi name)</c:description>
+ <c:simple-property name="ha"
+ default="false" type="boolean"
+ displayName="supports high availability" required="true" readOnly="true">
+ <c:description>whether or not this connection factory will support high availability</c:description>
</c:simple-property>
- <c:simple-property name="liveTransportClassNames"
- default="org.hornetq.integration.transports.netty.NettyConnectorFactory"
+ <c:simple-property name="useDiscovery"
+ default="false" type="boolean"
+ displayName="use discovery" required="true" readOnly="true">
+ <c:description>whether or not to use connectors or discovery</c:description>
+ </c:simple-property>
+ <c:simple-property name="cfType"
+ default="1" type="integer"
+ displayName="the type of factory" required="true" readOnly="true">
+ <c:description>the connection factory type</c:description>
+ <c:property-options>
+ <c:option name="JMS Factory" value="0"/>
+ <c:option name="JMS Queue Factory" value="1"/>
+ <c:option name="JMS Topic Factory" value="2"/>
+ <c:option name="JMS XA Factory" value="3"/>
+ <c:option name="JMS XA Queue Factory" value="4"/>
+ <c:option name="JMS XA Topic Factory" value="5"/>
+ </c:property-options>
+ </c:simple-property>
+ <c:simple-property name="connectorNames"
+ default="netty"
readOnly="true" required="false"
displayName="Connectors to live servers">
- <c:description>comma-separated list of class names to connect to live servers, default id 'org.hornetq.core.remoting.impl.netty.NettyConnectorFactory'.</c:description>
+ <c:description>comma-separated list of either connector names or a discovery group name</c:description>
</c:simple-property>
- <c:simple-property name="liveTransportParams"
- default="{host=localhost,port=5445}"
- readOnly="true" required="false"
- displayName="Parameters for connectors to live servers">
- <c:description>comma-separated list of key=value parameters for the live connectors ( enclosed between { }), default is {host=localhost,port=5445}.</c:description>
+ <c:simple-property name="Bindings" displayName="JNDI Bindings" required="true" readOnly="true">
+ <c:description>comma-separated list of JNDI bindings (use '&comma;' if u need to use commas in your jndi name)</c:description>
</c:simple-property>
- <c:simple-property name="backupTransportClassNames"
- required="false" readOnly="true"
- displayName="Connectors to backup servers">
- <c:description>comma-separated list of class names to connect to backup servers, default is no backup connectors.</c:description>
- </c:simple-property>
- <c:simple-property name="backupTransportParams"
- required="false" readOnly="true"
- displayName="Parameters for connectors to backup servers">
- <c:description>comma-separated list of key=value parameters for the backup connectors ( enclosed between { }), default is no backup params. NB these must match 'Connectors to backup servers', 1 for each entry.</c:description>
- </c:simple-property>
- <c:simple-property name="DiscoveryAddress"
- required="false"
- readOnly="true"
- displayName="Discovery Address">
- <c:description>the address to listen to discover which connectors this connection factory can use. NB this is used instead of static connectors, i.e. setting 'Connectors to backup servers'.</c:description>
- </c:simple-property>
- <c:simple-property name="DiscoveryPort"
- readOnly="true"
- default="0" type="integer"
- displayName="Discovery Port">
- <c:description>port to listen on to discover which connectors this connection factory can use. Should be used when 'Discovery Address' is set and greater 0</c:description>
- </c:simple-property>
- <c:simple-property name="DiscoveryRefreshTimeout"
- default="10000" type="long"
- displayName="Discovery Refresh Timeout">
- <c:description>refresh timeout for discovered HornetQ servers in milli seconds.</c:description>
- </c:simple-property>
- <c:simple-property name="DiscoveryInitialWaitTimeout"
- default="2000" type="long"
- displayName="Discovery Initial Wait Timeout">
- <c:description>initial wait timeout if this connection factory is configured to use discovery, in milliseconds. </c:description>
- </c:simple-property>
<c:simple-property name="ClientID"
required="false"
displayName="Client ID">
@@ -763,11 +748,6 @@
displayName="Reconnect Attempts">
<c:description>maximum number of attempts to retry connection in case of failure</c:description>
</c:simple-property>
- <c:simple-property name="FailoverOnServerShutdown"
- default="false" type="boolean"
- displayName="Failover On Server Shutdown">
- <c:description>whether connections created by this factory must failover in case the server they are connected to has normally shut down</c:description>
- </c:simple-property>
<c:simple-property name="ScheduledThreadPoolMaxSize"
default="5" type="integer"
displayName="Scheduled Thread Pool Max Size">
14 years, 3 months
JBoss hornetq SVN: r9994 - trunk/tests/jms-tests/src/org/hornetq/jms/tests.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-12-05 05:14:53 -0500 (Sun, 05 Dec 2010)
New Revision: 9994
Modified:
trunk/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionFactoryTest.java
Log:
fix test failure
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionFactoryTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionFactoryTest.java 2010-12-05 10:00:09 UTC (rev 9993)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionFactoryTest.java 2010-12-05 10:14:53 UTC (rev 9994)
@@ -335,32 +335,32 @@
factory = (HornetQConnectionFactory)JMSTestCase.ic.lookup("/ConnectionFactory");
assertTrue(factory instanceof ConnectionFactory);
- assertEquals(1, getTypes(factory));
+ assertEquals(3, getTypes(factory));
factory = (HornetQConnectionFactory)JMSTestCase.ic.lookup("/CF_XA_TRUE");
assertTrue(factory instanceof XAConnectionFactory);
- assertEquals(1, getTypes(factory));
+ assertEquals(6, getTypes(factory));
factory = (HornetQConnectionFactory)JMSTestCase.ic.lookup("/CF_XA_FALSE");
assertTrue(factory instanceof ConnectionFactory);
- assertEquals(1, getTypes(factory));
+ assertEquals(3, getTypes(factory));
factory = (HornetQConnectionFactory)JMSTestCase.ic.lookup("/CF_GENERIC");
assertTrue(factory instanceof ConnectionFactory);
- assertEquals(1, getTypes(factory));
+ assertEquals(3, getTypes(factory));
factory = (HornetQConnectionFactory)JMSTestCase.ic.lookup("/CF_GENERIC_XA_TRUE");
assertTrue(factory instanceof XAConnectionFactory);
- assertEquals(1, getTypes(factory));
+ assertEquals(6, getTypes(factory));
factory = (HornetQConnectionFactory)JMSTestCase.ic.lookup("/CF_GENERIC_XA_FALSE");
assertTrue(factory instanceof ConnectionFactory);
- assertEquals(1, getTypes(factory));
+ assertEquals(3, getTypes(factory));
factory = (HornetQConnectionFactory)JMSTestCase.ic.lookup("/CF_QUEUE");
14 years, 3 months
JBoss hornetq SVN: r9993 - in trunk: src/main/org/hornetq/api/core and 18 other directories.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-12-05 05:00:09 -0500 (Sun, 05 Dec 2010)
New Revision: 9993
Added:
trunk/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java
trunk/src/main/org/hornetq/utils/DeflaterReader.java
trunk/src/main/org/hornetq/utils/HornetQBufferInputStream.java
trunk/src/main/org/hornetq/utils/InflaterReader.java
trunk/src/main/org/hornetq/utils/InflaterWriter.java
trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java
trunk/tests/src/org/hornetq/tests/unit/util/CompressionUtilTest.java
trunk/tests/src/org/hornetq/tests/unit/util/HornetQBufferInputStreamTest.java
Modified:
trunk/src/config/common/schema/hornetq-jms.xsd
trunk/src/main/org/hornetq/api/core/Message.java
trunk/src/main/org/hornetq/api/core/client/ClientSessionFactory.java
trunk/src/main/org/hornetq/api/core/client/HornetQClient.java
trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java
trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java
trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferInternal.java
trunk/src/main/org/hornetq/jms/server/JMSServerManager.java
trunk/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java
trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
trunk/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java
trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
trunk/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml
trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/ReSendMessageTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/config/JMSServerConfigParserTest.java
trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java
trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java
trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
HORNETQ-448
Large Message Compression Impl
Modified: trunk/src/config/common/schema/hornetq-jms.xsd
===================================================================
--- trunk/src/config/common/schema/hornetq-jms.xsd 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/config/common/schema/hornetq-jms.xsd 2010-12-05 10:00:09 UTC (rev 9993)
@@ -75,7 +75,11 @@
</xsd:element>
<xsd:element name="min-large-message-size" type="xsd:long"
maxOccurs="1" minOccurs="0">
- </xsd:element>
+ </xsd:element>
+ <xsd:element name="compress-large-messages" type="xsd:boolean"
+ maxOccurs="1" minOccurs="0">
+ </xsd:element>
+
<xsd:element name="client-id" type="xsd:string"
maxOccurs="1" minOccurs="0">
</xsd:element>
Modified: trunk/src/main/org/hornetq/api/core/Message.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/Message.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/api/core/Message.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -65,6 +65,8 @@
public static final SimpleString HDR_ORIG_MESSAGE_ID = new SimpleString("_HQ_ORIG_MESSAGE_ID");
public static final SimpleString HDR_GROUP_ID = new SimpleString("_HQ_GROUP_ID");
+
+ public static final SimpleString HDR_LARGE_COMPRESSED = new SimpleString("_HQ_LARGE_COMPRESSED");
public static final SimpleString HDR_SCHEDULED_DELIVERY_TIME = new SimpleString("_HQ_SCHED_DELIVERY");
Modified: trunk/src/main/org/hornetq/api/core/client/ClientSessionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/client/ClientSessionFactory.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/api/core/client/ClientSessionFactory.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -138,4 +138,7 @@
CoreRemotingConnection getConnection();
+ void setCompressLargeMessages(boolean compressLargeMessage);
+
+ boolean isCompressLargeMessages();
}
Modified: trunk/src/main/org/hornetq/api/core/client/HornetQClient.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/client/HornetQClient.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/api/core/client/HornetQClient.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -43,6 +43,8 @@
// Any message beyond this size is considered a large message (to be sent in chunks)
public static final int DEFAULT_MIN_LARGE_MESSAGE_SIZE = 100 * 1024;
+
+ public static final boolean DEFAULT_COMPRESS_LARGE_MESSAGES = false;
public static final int DEFAULT_CONSUMER_WINDOW_SIZE = 1024 * 1024;
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -29,6 +29,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
+import org.hornetq.utils.DecompressedLargeMessageBuffer;
import org.hornetq.utils.Future;
import org.hornetq.utils.PriorityLinkedList;
import org.hornetq.utils.PriorityLinkedListImpl;
@@ -454,6 +455,11 @@
// ClientConsumerInternal implementation
// --------------------------------------------------------------
+ public ClientSessionInternal getSession()
+ {
+ return session;
+ }
+
public SessionQueueQueryResponseMessage getQueueInfo()
{
return queueInfo;
@@ -554,7 +560,14 @@
currentLargeMessageBuffer = new LargeMessageBufferImpl(this, packet.getLargeMessageSize(), 60, largeMessageCache);
- currentChunkMessage.setBuffer(currentLargeMessageBuffer);
+ if (currentChunkMessage.isCompressed())
+ {
+ currentChunkMessage.setBuffer(new DecompressedLargeMessageBuffer(currentLargeMessageBuffer));
+ }
+ else
+ {
+ currentChunkMessage.setBuffer(currentLargeMessageBuffer);
+ }
currentChunkMessage.setFlowControlSize(0);
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -67,4 +67,6 @@
void start();
SessionQueueQueryResponseMessage getQueueInfo();
+
+ ClientSessionInternal getSession();
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -21,11 +21,11 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.message.impl.MessageImpl;
-import org.hornetq.utils.DataConstants;
/**
*
@@ -117,6 +117,11 @@
{
return largeMessage;
}
+
+ public boolean isCompressed()
+ {
+ return properties.getBooleanProperty(Message.HDR_LARGE_COMPRESSED);
+ }
/**
* @param largeMessage the largeMessage to set
@@ -142,7 +147,6 @@
"]";
}
- // FIXME - only used for large messages - move it!
/* (non-Javadoc)
* @see org.hornetq.api.core.client.ClientMessage#saveToOutputStream(java.io.OutputStream)
*/
@@ -150,7 +154,7 @@
{
if (largeMessage)
{
- ((LargeMessageBufferInternal)getWholeBuffer()).saveBuffer(out);
+ ((LargeMessageBufferInternal)getWholeBuffer()).saveBuffer(out);
}
else
{
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -44,4 +44,6 @@
void discardLargeBody();
void setBuffer(HornetQBuffer buffer);
+
+ boolean isCompressed();
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -22,12 +22,13 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.hornetq.utils.DeflaterReader;
+import org.hornetq.utils.HornetQBufferInputStream;
import org.hornetq.utils.TokenBucketLimiter;
import org.hornetq.utils.UUIDGenerator;
@@ -150,7 +151,7 @@
{
return;
}
-
+
doCleanup();
}
@@ -190,7 +191,7 @@
{
return credits;
}
-
+
// Protected ------------------------------------------------------------------------------------
// Package Private ------------------------------------------------------------------------------
@@ -203,7 +204,7 @@
{
session.returnCredits(address);
}
-
+
session.removeProducer(this);
closed = true;
@@ -212,12 +213,13 @@
private void doSend(final SimpleString address, final Message msg) throws HornetQException
{
MessageInternal msgI = (MessageInternal)msg;
-
+
ClientProducerCredits theCredits;
-
+
boolean isLarge;
- if (msgI.getBodyInputStream() != null || msgI.isLargeMessage() || msgI.getBodyBuffer().writerIndex() > minLargeMessageSize)
+ if (msgI.getBodyInputStream() != null || msgI.isLargeMessage() ||
+ msgI.getBodyBuffer().writerIndex() > minLargeMessageSize)
{
isLarge = true;
}
@@ -236,7 +238,7 @@
{
msg.setAddress(address);
}
-
+
// Anonymous
theCredits = session.getCredits(address, true);
}
@@ -250,7 +252,7 @@
{
msg.setAddress(this.address);
}
-
+
theCredits = credits;
}
@@ -270,8 +272,6 @@
session.workDone();
-
-
if (isLarge)
{
largeMessageSend(sendBlocking, msgI, theCredits);
@@ -322,8 +322,16 @@
* @param msgI
* @throws HornetQException
*/
- private void largeMessageSend(final boolean sendBlocking, final MessageInternal msgI, final ClientProducerCredits credits) throws HornetQException
+ private void largeMessageSend(final boolean sendBlocking,
+ final MessageInternal msgI,
+ final ClientProducerCredits credits) throws HornetQException
{
+
+ if (session.isCompressLargeMessages())
+ {
+ msgI.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, true);
+ }
+
int headerSize = msgI.getHeadersAndPropertiesEncodeSize();
if (headerSize >= minLargeMessageSize)
@@ -341,7 +349,6 @@
HornetQBuffer headerBuffer = HornetQBuffers.fixedBuffer(headerSize);
msgI.encodeHeadersAndProperties(headerBuffer);
-
SessionSendLargeMessage initialChunk = new SessionSendLargeMessage(headerBuffer.toByteBuffer().array());
channel.send(initialChunk);
@@ -358,7 +365,7 @@
if (input != null)
{
- largeMessageSendStreamed(sendBlocking, input, credits);
+ largeMessageSendStreamed(sendBlocking, msgI, input, credits);
}
else
{
@@ -375,72 +382,29 @@
final MessageInternal msgI,
final ClientProducerCredits credits) throws HornetQException
{
- BodyEncoder context = msgI.getBodyEncoder();
-
- final long bodySize = context.getLargeBodySize();
-
- context.open();
- try
- {
-
- for (int pos = 0; pos < bodySize;)
- {
- final boolean lastChunk;
-
- final int chunkLength = Math.min((int)(bodySize - pos), minLargeMessageSize);
-
- final HornetQBuffer bodyBuffer = HornetQBuffers.fixedBuffer(chunkLength);
-
- context.encode(bodyBuffer, chunkLength);
-
- pos += chunkLength;
-
- lastChunk = pos >= bodySize;
-
- final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bodyBuffer.toByteBuffer()
- .array(),
- !lastChunk,
- lastChunk && sendBlocking);
-
- if (sendBlocking && lastChunk)
- {
- // When sending it blocking, only the last chunk will be blocking.
- channel.sendBlocking(chunk);
- }
- else
- {
- channel.send(chunk);
- }
-
- try
- {
- credits.acquireCredits(chunk.getPacketSize());
- }
- catch (InterruptedException e)
- {
- }
- }
- }
- finally
- {
- context.close();
- }
+ msgI.getBodyBuffer().readerIndex(0);
+ largeMessageSendStreamed(sendBlocking, msgI, new HornetQBufferInputStream(msgI.getBodyBuffer()), credits);
}
/**
- * TODO: This method could be eliminated and
- * combined with {@link ClientProducerImpl#largeMessageSendBuffered(boolean, Message, ClientProducerCredits)}.
- * All that's needed for this is ClientMessage returning the proper BodyEncoder for streamed
* @param sendBlocking
* @param input
* @throws HornetQException
*/
private void largeMessageSendStreamed(final boolean sendBlocking,
- final InputStream input,
+ final MessageInternal msgI,
+ final InputStream inputStreamParameter,
final ClientProducerCredits credits) throws HornetQException
{
boolean lastPacket = false;
+ InputStream input = inputStreamParameter;
+
+ if (session.isCompressLargeMessages())
+ {
+ input = new DeflaterReader(inputStreamParameter);
+ }
+
while (!lastPacket)
{
byte[] buff = new byte[minLargeMessageSize];
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -30,6 +30,7 @@
import org.hornetq.api.core.*;
import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.logging.Logger;
@@ -146,6 +147,8 @@
public final Exception e = new Exception();
private final Object waitLock = new Object();
+
+ private boolean compressLargeMessages;
// Static
// ---------------------------------------------------------------------------------------
@@ -202,6 +205,8 @@
closeExecutor = orderedExecutorFactory.getExecutor();
this.interceptors = interceptors;
+
+ compressLargeMessages = HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
}
@@ -768,6 +773,7 @@
serverLocator.isBlockOnDurableSend(),
serverLocator.isCacheLargeMessagesClient(),
serverLocator.getMinLargeMessageSize(),
+ compressLargeMessages,
serverLocator.getInitialMessagePacketSize(),
serverLocator.getGroupID(),
connection,
@@ -1358,4 +1364,14 @@
cancelled = true;
}
}
+
+ public void setCompressLargeMessages(boolean compressLargeMessage)
+ {
+ this.compressLargeMessages = compressLargeMessage;
+ }
+
+ public boolean isCompressLargeMessages()
+ {
+ return this.compressLargeMessages;
+ }
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -155,6 +155,8 @@
private final boolean blockOnDurableSend;
private final int minLargeMessageSize;
+
+ private final boolean compressLargeMessages;
private volatile int initialMessagePacketSize;
@@ -207,6 +209,7 @@
final boolean blockOnDurableSend,
final boolean cacheLargeMessageClient,
final int minLargeMessageSize,
+ final boolean compressLargeMessages,
final int initialMessagePacketSize,
final String groupID,
final CoreRemotingConnection remotingConnection,
@@ -259,6 +262,8 @@
this.cacheLargeMessageClient = cacheLargeMessageClient;
this.minLargeMessageSize = minLargeMessageSize;
+
+ this.compressLargeMessages = compressLargeMessages;
this.initialMessagePacketSize = initialMessagePacketSize;
@@ -269,7 +274,7 @@
// ClientSession implementation
// -----------------------------------------------------------------
-
+
public void createQueue(final SimpleString address, final SimpleString queueName) throws HornetQException
{
internalCreateQueue(address, queueName, null, false, false);
@@ -684,6 +689,11 @@
{
return minLargeMessageSize;
}
+
+ public boolean isCompressLargeMessages()
+ {
+ return compressLargeMessages;
+ }
/**
* @return the cacheLargeMessageClient
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -39,6 +39,8 @@
boolean isCacheLargeMessageClient();
int getMinLargeMessageSize();
+
+ boolean isCompressLargeMessages();
void expire(long consumerID, long messageID) throws HornetQException;
Modified: trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -561,4 +561,9 @@
{
session.addMetaData(key, data);
}
+
+ public boolean isCompressLargeMessages()
+ {
+ return session.isCompressLargeMessages();
+ }
}
Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.SessionFailureListener;
+import org.hornetq.core.protocol.core.CoreRemotingConnection;
+
+/**
+ * A ConnectionManager
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ * Created 27 Nov 2008 18:45:46
+ *
+ *
+ */
+public interface FailoverManager
+{
+ ClientSession createSession(final String username,
+ final String password,
+ final boolean xa,
+ final boolean autoCommitSends,
+ final boolean autoCommitAcks,
+ final boolean preAcknowledge,
+ final int ackBatchSize,
+ final boolean cacheLargeMessageClient,
+ final int minLargeMessageSize,
+ final boolean compressLargeMessages,
+ final boolean blockOnAcknowledge,
+ final boolean autoGroup,
+ final int confirmationWindowSize,
+ final int producerWindowSize,
+ final int consumerWindowSize,
+ final int producerMaxRate,
+ final int consumerMaxRate,
+ final boolean blockOnNonDurableSend,
+ final boolean blockOnDurableSend,
+ final int initialMessagePacketSize,
+ final String groupID) throws HornetQException;
+
+ void removeSession(final ClientSessionInternal session);
+
+ public CoreRemotingConnection getConnection();
+
+ int numConnections();
+
+ int numSessions();
+
+ void addFailureListener(SessionFailureListener listener);
+
+ boolean removeFailureListener(SessionFailureListener listener);
+
+ void causeExit();
+
+}
Modified: trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -287,7 +287,6 @@
*/
public synchronized boolean waitCompletion(final long timeWait) throws HornetQException
{
-
if (outStream == null)
{
// There is no stream.. it will never achieve the end of streaming
@@ -1258,11 +1257,12 @@
{
try
{
+ output.write(packet.getBody());
if (!packet.isContinues())
{
streamEnded = true;
+ output.close();
}
- output.write(packet.getBody());
}
catch (IOException e)
{
Modified: trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferInternal.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferInternal.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -17,6 +17,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
/**
* A LargeMessageBufferInternal
@@ -55,6 +56,8 @@
* Saves this buffer to the specified output.
*/
void saveBuffer(final OutputStream output) throws HornetQException;
+
+ public void addPacket(final SessionReceiveContinuationMessage packet);
/**
* Waits for the completion for the specified waiting time (in milliseconds).
Modified: trunk/src/main/org/hornetq/jms/server/JMSServerManager.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/JMSServerManager.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/jms/server/JMSServerManager.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -177,6 +177,7 @@
long callTimeout,
boolean cacheLargeMessagesClient,
int minLargeMessageSize,
+ boolean compressLargeMessage,
int consumerWindowSize,
int consumerMaxRate,
int confirmationWindowSize,
@@ -211,6 +212,7 @@
long callTimeout,
boolean cacheLargeMessagesClient,
int minLargeMessageSize,
+ boolean compressLargeMessages,
int consumerWindowSize,
int consumerMaxRate,
int confirmationWindowSize,
Modified: trunk/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -69,6 +69,10 @@
int getMinLargeMessageSize();
void setMinLargeMessageSize(int minLargeMessageSize);
+
+ boolean isCompressLargeMessages();
+
+ void setCompressLargeMessages(boolean compress);
int getConsumerWindowSize();
Modified: trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -60,6 +60,8 @@
private boolean cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
private int minLargeMessageSize = HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+ private boolean compressLargeMessage = HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
private int consumerWindowSize = HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
@@ -248,6 +250,16 @@
this.minLargeMessageSize = minLargeMessageSize;
}
+ public boolean isCompressLargeMessages()
+ {
+ return compressLargeMessage;
+ }
+
+ public void setCompressLargeMessages(final boolean compress)
+ {
+ this.compressLargeMessage = compress;
+ }
+
public int getConsumerWindowSize()
{
return consumerWindowSize;
Modified: trunk/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -293,6 +293,11 @@
"min-large-message-size",
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
Validators.GT_ZERO);
+
+ boolean compressLargeMessages = XMLConfigurationUtil.getBoolean(e,
+ "compress-large-messages",
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES);
+
boolean blockOnAcknowledge = XMLConfigurationUtil.getBoolean(e,
"block-on-acknowledge",
HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE);
@@ -417,6 +422,7 @@
cfConfig.setCallTimeout(callTimeout);
cfConfig.setCacheLargeMessagesClient(cacheLargeMessagesClient);
cfConfig.setMinLargeMessageSize(minLargeMessageSize);
+ cfConfig.setCompressLargeMessages(compressLargeMessages);
cfConfig.setConsumerWindowSize(consumerWindowSize);
cfConfig.setConsumerMaxRate(consumerMaxRate);
cfConfig.setConfirmationWindowSize(confirmationWindowSize);
Modified: trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -18,7 +18,6 @@
import javax.naming.Context;
import javax.naming.InitialContext;
-import javax.naming.NameNotFoundException;
import javax.naming.NamingException;
import javax.transaction.xa.Xid;
@@ -723,6 +722,7 @@
final long callTimeout,
final boolean cacheLargeMessagesClient,
final int minLargeMessageSize,
+ final boolean compressLargeMessage,
final int consumerWindowSize,
final int consumerMaxRate,
final int confirmationWindowSize,
@@ -761,6 +761,7 @@
configuration.setCallTimeout(callTimeout);
configuration.setCacheLargeMessagesClient(cacheLargeMessagesClient);
configuration.setMinLargeMessageSize(minLargeMessageSize);
+ configuration.setCompressLargeMessages(compressLargeMessage);
configuration.setConsumerWindowSize(consumerWindowSize);
configuration.setConsumerMaxRate(consumerMaxRate);
configuration.setConfirmationWindowSize(confirmationWindowSize);
@@ -797,6 +798,7 @@
final long callTimeout,
final boolean cacheLargeMessagesClient,
final int minLargeMessageSize,
+ final boolean compressLargeMessages,
final int consumerWindowSize,
final int consumerMaxRate,
final int confirmationWindowSize,
@@ -836,6 +838,7 @@
configuration.setCallTimeout(callTimeout);
configuration.setCacheLargeMessagesClient(cacheLargeMessagesClient);
configuration.setMinLargeMessageSize(minLargeMessageSize);
+ configuration.setCompressLargeMessages(compressLargeMessages);
configuration.setConsumerWindowSize(consumerWindowSize);
configuration.setConsumerMaxRate(consumerMaxRate);
configuration.setConfirmationWindowSize(confirmationWindowSize);
Added: trunk/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java (rev 0)
+++ trunk/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -0,0 +1,1091 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+import java.util.concurrent.Executor;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.client.impl.LargeMessageBufferInternal;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ * A DecompressedHornetQBuffer
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class DecompressedLargeMessageBuffer implements LargeMessageBufferInternal
+{
+
+ // Constants -----------------------------------------------------
+
+ private static final String OPERATION_NOT_SUPPORTED = "Operation not supported";
+
+ private static final String READ_ONLY_ERROR_MESSAGE = "This is a read-only buffer, setOperations are not supported";
+
+ // Attributes ----------------------------------------------------
+
+ final LargeMessageBufferInternal bufferDelegate;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public DecompressedLargeMessageBuffer(final LargeMessageBufferInternal bufferDelegate)
+ {
+ this.bufferDelegate = bufferDelegate;
+ }
+
+
+ // Public --------------------------------------------------------
+
+ /**
+ *
+ */
+ public void discardUnusedPackets()
+ {
+ bufferDelegate.discardUnusedPackets();
+ }
+
+ /**
+ * Add a buff to the List, or save it to the OutputStream if set
+ * @param packet
+ */
+ public void addPacket(final SessionReceiveContinuationMessage packet)
+ {
+ bufferDelegate.addPacket(packet);
+ }
+
+ public synchronized void cancel()
+ {
+ bufferDelegate.cancel();
+ }
+
+ public synchronized void close()
+ {
+ bufferDelegate.cancel();
+ }
+
+ public void setOutputStream(final OutputStream output) throws HornetQException
+ {
+ bufferDelegate.setOutputStream(new InflaterWriter(output));
+ }
+
+ public synchronized void saveBuffer(final OutputStream output) throws HornetQException
+ {
+ setOutputStream(output);
+ waitCompletion(0);
+ }
+
+ /**
+ *
+ * @param timeWait Milliseconds to Wait. 0 means forever
+ * @throws Exception
+ */
+ public synchronized boolean waitCompletion(final long timeWait) throws HornetQException
+ {
+ return bufferDelegate.waitCompletion(timeWait);
+ }
+
+ // Channel Buffer Implementation ---------------------------------
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#array()
+ */
+ public byte[] array()
+ {
+ throw new IllegalAccessError("array not supported on LargeMessageBufferImpl");
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#capacity()
+ */
+ public int capacity()
+ {
+ return -1;
+ }
+
+ DataInputStream dataInput = null;
+
+ private DataInputStream getStream()
+ {
+ if (dataInput == null)
+ {
+ try
+ {
+ InputStream input = new HornetQBufferInputStream(bufferDelegate);
+
+ dataInput = new DataInputStream(new InflaterReader(input));
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException (e.getMessage(), e);
+ }
+
+ }
+ return dataInput;
+ }
+
+ private void positioningNotSupported()
+ {
+ throw new IllegalStateException("Position not supported over compressed large messages");
+ }
+
+ public byte readByte()
+ {
+ try
+ {
+ return getStream().readByte();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException (e.getMessage(), e);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getByte(int)
+ */
+ public byte getByte(final int index)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ private byte getByte(final long index)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, org.hornetq.api.core.buffers.ChannelBuffer, int, int)
+ */
+ public void getBytes(final int index, final HornetQBuffer dst, final int dstIndex, final int length)
+ {
+ positioningNotSupported();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, org.hornetq.api.core.buffers.ChannelBuffer, int, int)
+ */
+ public void getBytes(final long index, final HornetQBuffer dst, final int dstIndex, final int length)
+ {
+ positioningNotSupported();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, byte[], int, int)
+ */
+ public void getBytes(final int index, final byte[] dst, final int dstIndex, final int length)
+ {
+ positioningNotSupported();
+ }
+
+ public void getBytes(final long index, final byte[] dst, final int dstIndex, final int length)
+ {
+ positioningNotSupported();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, java.nio.ByteBuffer)
+ */
+ public void getBytes(final int index, final ByteBuffer dst)
+ {
+ positioningNotSupported();
+ }
+
+ public void getBytes(final long index, final ByteBuffer dst)
+ {
+ positioningNotSupported();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, java.io.OutputStream, int)
+ */
+ public void getBytes(final int index, final OutputStream out, final int length) throws IOException
+ {
+ positioningNotSupported();
+ }
+
+ public void getBytes(final long index, final OutputStream out, final int length) throws IOException
+ {
+ positioningNotSupported();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, java.nio.channels.GatheringByteChannel, int)
+ */
+ public int getBytes(final int index, final GatheringByteChannel out, final int length) throws IOException
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getInt(int)
+ */
+ public int getInt(final int index)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ public int getInt(final long index)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getLong(int)
+ */
+ public long getLong(final int index)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ public long getLong(final long index)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getShort(int)
+ */
+ public short getShort(final int index)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ public short getShort(final long index)
+ {
+ return (short)(getByte(index) << 8 | getByte(index + 1) & 0xFF);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getUnsignedMedium(int)
+ */
+ public int getUnsignedMedium(final int index)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+
+
+ public int getUnsignedMedium(final long index)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setByte(int, byte)
+ */
+ public void setByte(final int index, final byte value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setBytes(int, org.hornetq.api.core.buffers.ChannelBuffer, int, int)
+ */
+ public void setBytes(final int index, final HornetQBuffer src, final int srcIndex, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setBytes(int, byte[], int, int)
+ */
+ public void setBytes(final int index, final byte[] src, final int srcIndex, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setBytes(int, java.nio.ByteBuffer)
+ */
+ public void setBytes(final int index, final ByteBuffer src)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setBytes(int, java.io.InputStream, int)
+ */
+ public int setBytes(final int index, final InputStream in, final int length) throws IOException
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setBytes(int, java.nio.channels.ScatteringByteChannel, int)
+ */
+ public int setBytes(final int index, final ScatteringByteChannel in, final int length) throws IOException
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setInt(int, int)
+ */
+ public void setInt(final int index, final int value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setLong(int, long)
+ */
+ public void setLong(final int index, final long value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setMedium(int, int)
+ */
+ public void setMedium(final int index, final int value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setShort(int, short)
+ */
+ public void setShort(final int index, final short value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#toByteBuffer(int, int)
+ */
+ public ByteBuffer toByteBuffer(final int index, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#toString(int, int, java.lang.String)
+ */
+ public String toString(final int index, final int length, final String charsetName)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public int readerIndex()
+ {
+ return 0;
+ }
+
+ public void readerIndex(final int readerIndex)
+ {
+ // TODO
+ }
+
+ public int writerIndex()
+ {
+ // TODO
+ return 0;
+ }
+
+ public long getSize()
+ {
+ // TODO
+ return 0;
+ }
+
+ public void writerIndex(final int writerIndex)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void setIndex(final int readerIndex, final int writerIndex)
+ {
+ positioningNotSupported();
+ }
+
+ public void clear()
+ {
+ }
+
+ public boolean readable()
+ {
+ return true;
+ }
+
+ public boolean writable()
+ {
+ return false;
+ }
+
+ public int readableBytes()
+ {
+ return 1;
+ }
+
+ public int writableBytes()
+ {
+ return 0;
+ }
+
+ public void markReaderIndex()
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void resetReaderIndex()
+ {
+ // TODO: reset positioning if possible
+ }
+
+ public void markWriterIndex()
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void resetWriterIndex()
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void discardReadBytes()
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public short getUnsignedByte(final int index)
+ {
+ return (short)(getByte(index) & 0xFF);
+ }
+
+ public int getUnsignedShort(final int index)
+ {
+ return getShort(index) & 0xFFFF;
+ }
+
+ public int getMedium(final int index)
+ {
+ int value = getUnsignedMedium(index);
+ if ((value & 0x800000) != 0)
+ {
+ value |= 0xff000000;
+ }
+ return value;
+ }
+
+ public long getUnsignedInt(final int index)
+ {
+ return getInt(index) & 0xFFFFFFFFL;
+ }
+
+ public void getBytes(int index, final byte[] dst)
+ {
+ // TODO: optimize this by using System.arraycopy
+ for (int i = 0; i < dst.length; i++)
+ {
+ dst[i] = getByte(index++);
+ }
+ }
+
+ public void getBytes(long index, final byte[] dst)
+ {
+ // TODO: optimize this by using System.arraycopy
+ for (int i = 0; i < dst.length; i++)
+ {
+ dst[i] = getByte(index++);
+ }
+ }
+
+ public void getBytes(final int index, final HornetQBuffer dst)
+ {
+ getBytes(index, dst, dst.writableBytes());
+ }
+
+ public void getBytes(final int index, final HornetQBuffer dst, final int length)
+ {
+ if (length > dst.writableBytes())
+ {
+ throw new IndexOutOfBoundsException();
+ }
+ getBytes(index, dst, dst.writerIndex(), length);
+ dst.writerIndex(dst.writerIndex() + length);
+ }
+
+ public void setBytes(final int index, final byte[] src)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void setBytes(final int index, final HornetQBuffer src)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void setBytes(final int index, final HornetQBuffer src, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void setZero(final int index, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public short readUnsignedByte()
+ {
+ try
+ {
+ return (short)getStream().readUnsignedByte();
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateException (e.getMessage(), e);
+ }
+ }
+
+ public short readShort()
+ {
+ try
+ {
+ return (short)getStream().readShort();
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateException (e.getMessage(), e);
+ }
+ }
+
+ public int readUnsignedShort()
+ {
+ try
+ {
+ return (int)getStream().readUnsignedShort();
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateException (e.getMessage(), e);
+ }
+ }
+
+ public int readMedium()
+ {
+ int value = readUnsignedMedium();
+ if ((value & 0x800000) != 0)
+ {
+ value |= 0xff000000;
+ }
+ return value;
+ }
+
+
+ public int readUnsignedMedium()
+ {
+ return (readByte() & 0xff) << 16 | (readByte() & 0xff) << 8 | (readByte() & 0xff) << 0;
+ }
+
+ public int readInt()
+ {
+ try
+ {
+ return getStream().readInt();
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ public int readInt(final int pos)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ public long readUnsignedInt()
+ {
+ return readInt() & 0xFFFFFFFFL;
+ }
+
+ public long readLong()
+ {
+ try
+ {
+ return getStream().readLong();
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ public void readBytes(final byte[] dst, final int dstIndex, final int length)
+ {
+ try
+ {
+ getStream().read(dst, dstIndex, length);
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ public void readBytes(final byte[] dst)
+ {
+ readBytes(dst, 0, dst.length);
+ }
+
+ public void readBytes(final HornetQBuffer dst)
+ {
+ readBytes(dst, dst.writableBytes());
+ }
+
+ public void readBytes(final HornetQBuffer dst, final int length)
+ {
+ if (length > dst.writableBytes())
+ {
+ throw new IndexOutOfBoundsException();
+ }
+ readBytes(dst, dst.writerIndex(), length);
+ dst.writerIndex(dst.writerIndex() + length);
+ }
+
+ public void readBytes(final HornetQBuffer dst, final int dstIndex, final int length)
+ {
+ byte[] destBytes = new byte[length];
+ readBytes(destBytes);
+ dst.setBytes(dstIndex, destBytes);
+ }
+
+ public void readBytes(final ByteBuffer dst)
+ {
+ byte bytesToGet[] = new byte[dst.remaining()];
+ readBytes(bytesToGet);
+ dst.put(bytesToGet);
+ }
+
+ public int readBytes(final GatheringByteChannel out, final int length) throws IOException
+ {
+ throw new IllegalStateException("Not implemented!");
+ }
+
+ public void readBytes(final OutputStream out, final int length) throws IOException
+ {
+ throw new IllegalStateException("Not implemented!");
+ }
+
+ public void skipBytes(final int length)
+ {
+
+ try
+ {
+ for (int i = 0 ; i < length; i++)
+ {
+ getStream().read();
+ }
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ public void writeByte(final byte value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeShort(final short value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeMedium(final int value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeInt(final int value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeLong(final long value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeBytes(final byte[] src, final int srcIndex, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeBytes(final byte[] src)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeBytes(final HornetQBuffer src)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeBytes(final HornetQBuffer src, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeBytes(final ByteBuffer src)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public int writeBytes(final InputStream in, final int length) throws IOException
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public int writeBytes(final ScatteringByteChannel in, final int length) throws IOException
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeZero(final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public ByteBuffer toByteBuffer()
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public ByteBuffer[] toByteBuffers()
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public ByteBuffer[] toByteBuffers(final int index, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public String toString(final String charsetName)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public Object getUnderlyingBuffer()
+ {
+ return this;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#readBoolean()
+ */
+ public boolean readBoolean()
+ {
+ return readByte() != 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#readChar()
+ */
+ public char readChar()
+ {
+ return (char)readShort();
+ }
+
+ public char getChar(final int index)
+ {
+ return (char)getShort(index);
+ }
+
+ public double getDouble(final int index)
+ {
+ return Double.longBitsToDouble(getLong(index));
+ }
+
+ public float getFloat(final int index)
+ {
+ return Float.intBitsToFloat(getInt(index));
+ }
+
+ public HornetQBuffer readBytes(final int length)
+ {
+ byte bytesToGet[] = new byte[length];
+ readBytes(bytesToGet);
+ return HornetQBuffers.wrappedBuffer(bytesToGet);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#readDouble()
+ */
+ public double readDouble()
+ {
+ return Double.longBitsToDouble(readLong());
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#readFloat()
+ */
+ public float readFloat()
+ {
+ return Float.intBitsToFloat(readInt());
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#readNullableSimpleString()
+ */
+ public SimpleString readNullableSimpleString()
+ {
+ int b = readByte();
+ if (b == DataConstants.NULL)
+ {
+ return null;
+ }
+ else
+ {
+ return readSimpleString();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#readNullableString()
+ */
+ public String readNullableString()
+ {
+ int b = readByte();
+ if (b == DataConstants.NULL)
+ {
+ return null;
+ }
+ else
+ {
+ return readString();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#readSimpleString()
+ */
+ public SimpleString readSimpleString()
+ {
+ int len = readInt();
+ byte[] data = new byte[len];
+ readBytes(data);
+ return new SimpleString(data);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#readString()
+ */
+ public String readString()
+ {
+ int len = readInt();
+
+ if (len < 9)
+ {
+ char[] chars = new char[len];
+ for (int i = 0; i < len; i++)
+ {
+ chars[i] = (char)readShort();
+ }
+ return new String(chars);
+ }
+ else if (len < 0xfff)
+ {
+ return readUTF();
+ }
+ else
+ {
+ return readSimpleString().toString();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#readUTF()
+ */
+ public String readUTF()
+ {
+ return UTF8Util.readUTF(this);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeBoolean(boolean)
+ */
+ public void writeBoolean(final boolean val)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeChar(char)
+ */
+ public void writeChar(final char val)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeDouble(double)
+ */
+ public void writeDouble(final double val)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeFloat(float)
+ */
+ public void writeFloat(final float val)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeNullableSimpleString(org.hornetq.util.SimpleString)
+ */
+ public void writeNullableSimpleString(final SimpleString val)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeNullableString(java.lang.String)
+ */
+ public void writeNullableString(final String val)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeSimpleString(org.hornetq.util.SimpleString)
+ */
+ public void writeSimpleString(final SimpleString val)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeString(java.lang.String)
+ */
+ public void writeString(final String val)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeUTF(java.lang.String)
+ */
+ public void writeUTF(final String utf)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#compareTo(org.hornetq.api.core.buffers.ChannelBuffer)
+ */
+ public int compareTo(final HornetQBuffer buffer)
+ {
+ return -1;
+ }
+
+ public HornetQBuffer copy()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public HornetQBuffer slice(final int index, final int length)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ /**
+ * @param body
+ */
+ // Inner classes -------------------------------------------------
+
+ public ChannelBuffer channelBuffer()
+ {
+ return null;
+ }
+
+ public HornetQBuffer copy(final int index, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public HornetQBuffer duplicate()
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public HornetQBuffer readSlice(final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void setChar(final int index, final char value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void setDouble(final int index, final double value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void setFloat(final int index, final float value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public HornetQBuffer slice()
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeBytes(final HornetQBuffer src, final int srcIndex, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+}
Added: trunk/src/main/org/hornetq/utils/DeflaterReader.java
===================================================================
--- trunk/src/main/org/hornetq/utils/DeflaterReader.java (rev 0)
+++ trunk/src/main/org/hornetq/utils/DeflaterReader.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.Deflater;
+
+/**
+ * A DeflaterReader
+ * The reader takes an inputstream and compress it.
+ * Not for concurrent use.
+
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public class DeflaterReader extends InputStream
+{
+ private Deflater deflater = new Deflater();
+ private boolean isFinished = false;
+ private boolean compressDone = false;
+
+ private InputStream input;
+
+ public DeflaterReader(InputStream inData)
+ {
+ input = inData;
+ }
+
+ public int read() throws IOException
+ {
+ byte[] buffer = new byte[1];
+ int n = read(buffer, 0, 1);
+ if (n == 1)
+ {
+ return (int)buffer[0] & 0xFF;
+ }
+ if (n == -1 || n == 0)
+ {
+ return -1;
+ }
+ throw new IOException("Error reading data, invalid n: " + n);
+ }
+
+ /**
+ * Try to fill the buffer with compressed bytes. Except the last effective read,
+ * this method always returns with a full buffer of compressed data.
+ *
+ * @param buffer the buffer to fill compressed bytes
+ * @return the number of bytes really filled, -1 indicates end.
+ * @throws IOException
+ */
+ @Override
+ public int read(byte[] buffer, int offset, int len) throws IOException
+ {
+ if (compressDone)
+ {
+ return -1;
+ }
+
+ //buffer for reading input stream
+ byte[] readBuffer = new byte[2 * len];
+
+ int n = 0;
+ int read = 0;
+
+ while (len > 0)
+ {
+ n = deflater.deflate(buffer, offset, len);
+ if (n == 0)
+ {
+ if (isFinished)
+ {
+ deflater.end();
+ compressDone = true;
+ break;
+ }
+ else if (deflater.needsInput())
+ {
+ // read some data from inputstream
+ int m = input.read(readBuffer);
+
+ if (m == -1)
+ {
+ deflater.finish();
+ isFinished = true;
+ }
+ else
+ {
+ deflater.setInput(readBuffer, 0, m);
+ }
+ }
+ else
+ {
+ deflater.finish();
+ isFinished = true;
+ }
+ }
+ else
+ {
+ read += n;
+ offset += n;
+ len -= n;
+ }
+ }
+ return read;
+ }
+
+}
Added: trunk/src/main/org/hornetq/utils/HornetQBufferInputStream.java
===================================================================
--- trunk/src/main/org/hornetq/utils/HornetQBufferInputStream.java (rev 0)
+++ trunk/src/main/org/hornetq/utils/HornetQBufferInputStream.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -0,0 +1,182 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.hornetq.api.core.HornetQBuffer;
+
+/**
+ * Used to send large messages
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class HornetQBufferInputStream extends InputStream
+{
+
+ /* (non-Javadoc)
+ * @see java.io.InputStream#read()
+ */
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+ private HornetQBuffer bb;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public HornetQBufferInputStream(final HornetQBuffer paramByteBuffer)
+ {
+ bb = paramByteBuffer;
+ }
+
+ @Override
+ public int read() throws IOException
+ {
+ if (bb == null)
+ {
+ throw new IOException("read on a closed InputStream");
+ }
+
+ if (remainingBytes() == 0)
+ {
+ return -1;
+ }
+ else
+ {
+ return bb.readByte() & 0xFF;
+ }
+ }
+
+ @Override
+ public int read(final byte[] byteArray) throws IOException
+ {
+ if (bb == null)
+ {
+ throw new IOException("read on a closed InputStream");
+ }
+
+ return read(byteArray, 0, byteArray.length);
+ }
+
+ @Override
+ public int read(final byte[] byteArray, final int off, final int len) throws IOException
+ {
+ if (bb == null)
+ {
+ throw new IOException("read on a closed InputStream");
+ }
+
+ if (byteArray == null)
+ {
+ throw new NullPointerException();
+ }
+ if (off < 0 || off > byteArray.length || len < 0 || off + len > byteArray.length || off + len < 0)
+ {
+ throw new IndexOutOfBoundsException();
+ }
+ if (len == 0)
+ {
+ return 0;
+ }
+
+ int size = Math.min(remainingBytes(), len);
+
+ if (size == 0)
+ {
+ return -1;
+ }
+
+ bb.readBytes(byteArray, off, size);
+ return size;
+ }
+
+ @Override
+ public long skip(final long len) throws IOException
+ {
+ if (bb == null)
+ {
+ throw new IOException("skip on a closed InputStream");
+ }
+
+ if (len <= 0L)
+ {
+ return 0L;
+ }
+
+ int size = Math.min(remainingBytes(), (int) len);
+
+ bb.skipBytes((int)size);
+
+ return size;
+ }
+
+ @Override
+ public int available() throws IOException
+ {
+ if (bb == null)
+ {
+ throw new IOException("available on a closed InputStream");
+ }
+
+ return remainingBytes();
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ bb = null;
+ }
+
+ @Override
+ public synchronized void mark(final int paramInt)
+ {
+ }
+
+ @Override
+ public synchronized void reset() throws IOException
+ {
+ throw new IOException("mark/reset not supported");
+ }
+
+ @Override
+ public boolean markSupported()
+ {
+ return false;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ /**
+ * @return
+ */
+ private int remainingBytes()
+ {
+ return bb.writerIndex() - bb.readerIndex();
+ }
+
+
+ // Inner classes -------------------------------------------------
+
+}
Added: trunk/src/main/org/hornetq/utils/InflaterReader.java
===================================================================
--- trunk/src/main/org/hornetq/utils/InflaterReader.java (rev 0)
+++ trunk/src/main/org/hornetq/utils/InflaterReader.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.DataFormatException;
+import java.util.zip.Inflater;
+
+/**
+ * An InflaterReader
+ * It takes an compressed input stream and decompressed it as it is being read.
+ * Not for concurrent use.
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ */
+public class InflaterReader extends InputStream
+{
+ private Inflater inflater = new Inflater();
+
+ private InputStream input;
+
+ private byte[] readBuffer;
+ private int pointer;
+ private int length;
+
+ public InflaterReader(InputStream input)
+ {
+ this(input, 1024);
+ }
+
+ public InflaterReader(InputStream input, int bufferSize)
+ {
+ this.input = input;
+ this.readBuffer = new byte[bufferSize];
+ this.pointer = -1;
+ }
+
+ public int read() throws IOException
+ {
+ if (pointer == -1)
+ {
+ try
+ {
+ length = doRead(readBuffer, 0, readBuffer.length);
+ if (length == 0)
+ {
+ return -1;
+ }
+ pointer = 0;
+ }
+ catch (DataFormatException e)
+ {
+ throw new IOException(e);
+ }
+ }
+
+ int value = readBuffer[pointer] & 0xFF;
+ pointer++;
+ if (pointer == length)
+ {
+ pointer = -1;
+ }
+
+ return value;
+ }
+
+ /*
+ * feed inflater more bytes in order to get some
+ * decompressed output.
+ * returns number of bytes actually got
+ */
+ private int doRead(byte[] buf, int offset, int len) throws DataFormatException, IOException
+ {
+ int read = 0;
+ int n = 0;
+ byte[] inputBuffer = new byte[len];
+
+ while (len > 0)
+ {
+ n = inflater.inflate(buf, offset, len);
+ if (n == 0)
+ {
+ if (inflater.finished())
+ {
+ break;
+ }
+ else if (inflater.needsInput())
+ {
+ //feeding
+ int m = input.read(inputBuffer);
+
+ if (m == -1)
+ {
+ //it shouldn't be here, throw exception
+ throw new DataFormatException("Input is over while inflater still expecting data");
+ }
+ else
+ {
+ //feed the data in
+ inflater.setInput(inputBuffer);
+ n = inflater.inflate(buf, offset, len);
+ if (n > 0)
+ {
+ read += n;
+ offset += n;
+ len -= n;
+ }
+ }
+ }
+ else
+ {
+ //it shouldn't be here, throw
+ throw new DataFormatException("Inflater is neither finished nor needing input.");
+ }
+ }
+ else
+ {
+ read += n;
+ offset += n;
+ len -= n;
+ }
+ }
+ return read;
+ }
+
+}
Added: trunk/src/main/org/hornetq/utils/InflaterWriter.java
===================================================================
--- trunk/src/main/org/hornetq/utils/InflaterWriter.java (rev 0)
+++ trunk/src/main/org/hornetq/utils/InflaterWriter.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.zip.DataFormatException;
+import java.util.zip.Deflater;
+import java.util.zip.Inflater;
+
+/**
+ * A InflaterWriter
+ *
+ * This class takes an OutputStream. Compressed bytes
+ * can directly be written into this class. The class will
+ * decompress the bytes and write them to the output stream.
+ *
+ * Not for concurrent use.
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public class InflaterWriter extends OutputStream
+{
+ private Inflater inflater = new Inflater();
+ private OutputStream output;
+
+ private byte[] writeBuffer = new byte[1024];
+ private int writePointer = 0;
+
+ private byte[] outputBuffer = new byte[writeBuffer.length*2];
+
+ public InflaterWriter(OutputStream output)
+ {
+ this.output = output;
+ }
+
+ /*
+ * Write a compressed byte.
+ */
+ @Override
+ public void write(int b) throws IOException
+ {
+ writeBuffer[writePointer] = (byte)(b & 0xFF);
+ writePointer++;
+
+ if (writePointer == writeBuffer.length)
+ {
+ writePointer = 0;
+ try
+ {
+ doWrite();
+ }
+ catch (DataFormatException e)
+ {
+ throw new IOException("Error decompressing data", e);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ if (writePointer > 0)
+ {
+ inflater.setInput(writeBuffer, 0, writePointer);
+ try
+ {
+ int n = inflater.inflate(outputBuffer);
+ while (n > 0)
+ {
+ output.write(outputBuffer, 0, n);
+ n = inflater.inflate(outputBuffer);
+ }
+ output.close();
+ }
+ catch (DataFormatException e)
+ {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ private void doWrite() throws DataFormatException, IOException
+ {
+ inflater.setInput(writeBuffer);
+ int n = inflater.inflate(outputBuffer);
+
+ while (n > 0)
+ {
+ output.write(outputBuffer, 0, n);
+ n = inflater.inflate(outputBuffer);
+ }
+ }
+
+}
Modified: trunk/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml
===================================================================
--- trunk/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml 2010-12-05 10:00:09 UTC (rev 9993)
@@ -20,6 +20,7 @@
<producer-window-size>7712652</producer-window-size>
<producer-max-rate>789</producer-max-rate>
<min-large-message-size>12</min-large-message-size>
+ <compress-large-messages>true</compress-large-messages>
<client-id>TestClientID</client-id>
<dups-ok-batch-size>3456</dups-ok-batch-size>
<transaction-batch-size>4567</transaction-batch-size>
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -68,6 +68,7 @@
HornetQClient.DEFAULT_CALL_TIMEOUT,
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -14,15 +14,10 @@
package org.hornetq.jms.tests;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
import javax.naming.InitialContext;
-import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
-import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQJMSConnectionFactory;
import org.hornetq.jms.client.HornetQQueueConnectionFactory;
import org.hornetq.jms.client.HornetQTopicConnectionFactory;
@@ -81,6 +76,7 @@
HornetQClient.DEFAULT_CALL_TIMEOUT,
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
@@ -115,6 +111,7 @@
HornetQClient.DEFAULT_CALL_TIMEOUT,
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
@@ -149,6 +146,7 @@
HornetQClient.DEFAULT_CALL_TIMEOUT,
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -303,6 +303,7 @@
HornetQClient.DEFAULT_CALL_TIMEOUT,
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
prefetchSize,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Copied: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java (from rev 9972, branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java)
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -0,0 +1,310 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+
+/**
+ * A LargeMessageCompressTest
+ *
+ * Just extend the LargeMessageTest
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ *
+ */
+public class LargeMessageCompressTest extends LargeMessageTest
+{
+ // Constructors --------------------------------------------------
+
+ protected boolean isNetty()
+ {
+ return false;
+ }
+
+ protected ClientSessionFactory createSessionFactory() throws Exception
+ {
+ ClientSessionFactory sf = locator.createSessionFactory();
+ sf.setCompressLargeMessages(true);
+ return sf;
+ }
+
+
+ public void testLargeMessageCompression() throws Exception
+ {
+ final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+ ClientSession session = null;
+
+ try
+ {
+ server = createServer(true, isNetty());
+
+ server.start();
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+ sf.setCompressLargeMessages(true);
+
+ session = sf.createSession(false, false, false);
+
+ session.createTemporaryQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS);
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+ producer.send(clientFile);
+
+ session.commit();
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
+ ClientMessage msg1 = consumer.receive(1000);
+ Assert.assertNotNull(msg1);
+
+ for (int i = 0 ; i < messageSize; i++)
+ {
+ byte b = msg1.getBodyBuffer().readByte();
+ assertEquals("position = " + i, getSamplebyte(i), b);
+ }
+
+ msg1.acknowledge();
+ session.commit();
+
+ consumer.close();
+
+ session.close();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
+ public void testLargeMessageCompression2() throws Exception
+ {
+ final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+ ClientSession session = null;
+
+ try
+ {
+ server = createServer(true, isNetty());
+
+ server.start();
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+ sf.setCompressLargeMessages(true);
+
+ session = sf.createSession(false, false, false);
+
+ session.createTemporaryQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS);
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+ producer.send(clientFile);
+
+ session.commit();
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
+ ClientMessage msg1 = consumer.receive(1000);
+ Assert.assertNotNull(msg1);
+
+ String testDir = this.getTestDir();
+ File testFile = new File(testDir, "async_large_message");
+ FileOutputStream output = new FileOutputStream(testFile);
+
+ msg1.setOutputStream(output);
+
+ msg1.waitOutputStreamCompletion(0);
+
+ msg1.acknowledge();
+
+ session.commit();
+
+ consumer.close();
+
+ session.close();
+
+ //verify
+ FileInputStream input = new FileInputStream(testFile);
+ for (int i = 0 ; i < messageSize; i++)
+ {
+ byte b = (byte)input.read();
+ assertEquals("position = " + i, getSamplebyte(i), b);
+ }
+
+ testFile.delete();
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
+ public void testLargeMessageCompression3() throws Exception
+ {
+ final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+ ClientSession session = null;
+
+ try
+ {
+ server = createServer(true, isNetty());
+
+ server.start();
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+ sf.setCompressLargeMessages(true);
+
+ session = sf.createSession(false, false, false);
+
+ session.createTemporaryQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS);
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+ producer.send(clientFile);
+
+ session.commit();
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
+ ClientMessage msg1 = consumer.receive(1000);
+ Assert.assertNotNull(msg1);
+
+ String testDir = this.getTestDir();
+ File testFile = new File(testDir, "async_large_message");
+ FileOutputStream output = new FileOutputStream(testFile);
+
+ msg1.saveToOutputStream(output);
+
+ msg1.acknowledge();
+
+ session.commit();
+
+ consumer.close();
+
+ session.close();
+
+ //verify
+ FileInputStream input = new FileInputStream(testFile);
+ for (int i = 0 ; i < messageSize; i++)
+ {
+ byte b = (byte)input.read();
+ assertEquals("position = " + i, getSamplebyte(i), b);
+ }
+
+ testFile.delete();
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
+
+ // below are large message tests that are not applied to compressed messages
+
+ public void testResendSmallStreamMessage() throws Exception
+ {
+ }
+
+ public void testResendLargeStreamMessage() throws Exception
+ {
+ }
+
+ public void testResendCachedSmallStreamMessage() throws Exception
+ {
+ }
+
+ public void testResendCachedLargeStreamMessage() throws Exception
+ {
+ }
+
+ public void testSimpleRollback() throws Exception
+ {
+ }
+
+ public void testSimpleRollbackXA() throws Exception
+ {
+ }
+
+
+}
Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -59,7 +59,7 @@
// Static --------------------------------------------------------
private final Logger log = Logger.getLogger(LargeMessageTest.class);
- private ServerLocator locator;
+ protected ServerLocator locator;
// Constructors --------------------------------------------------
@@ -70,6 +70,11 @@
return false;
}
+ protected ClientSessionFactory createSessionFactory() throws Exception
+ {
+ return locator.createSessionFactory();
+ }
+
public void testCloseConsumer() throws Exception
{
final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
@@ -82,7 +87,7 @@
server.start();
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
session = sf.createSession(false, false, false);
@@ -153,7 +158,7 @@
public void doTestLargeBuffer(boolean transacted) throws Exception
{
final int journalsize = 100 * 1024;
- final int messageSize = 3 * journalsize;
+ final int messageSize = 3 * journalsize + 5;
// final int messageSize = 5 * 1024;
ClientSession session = null;
@@ -170,7 +175,9 @@
server.start();
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
+
+ sf.setCompressLargeMessages(true);
session = sf.createSession(!transacted, !transacted, 0);
@@ -254,7 +261,7 @@
server.start();
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
session = sf.createSession(false, false, false);
@@ -307,7 +314,7 @@
server.start();
- sf = locator.createSessionFactory();
+ sf = createSessionFactory();
session = sf.createSession(false, false, false);
@@ -381,7 +388,7 @@
server.start();
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
session = sf.createSession(false, false, false);
@@ -464,7 +471,7 @@
server.start();
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
SimpleString ADDRESS_DLA = LargeMessageTest.ADDRESS.concat("-dla");
SimpleString ADDRESS_EXPIRY = LargeMessageTest.ADDRESS.concat("-expiry");
@@ -599,7 +606,7 @@
server.start();
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
SimpleString ADDRESS_DLA = LargeMessageTest.ADDRESS.concat("-dla");
SimpleString ADDRESS_EXPIRY = LargeMessageTest.ADDRESS.concat("-expiry");
@@ -675,7 +682,7 @@
server.start();
- sf = locator.createSessionFactory();
+ sf = createSessionFactory();
session = sf.createSession(false, false, false);
@@ -742,7 +749,7 @@
server.getAddressSettingsRepository().addMatch("*", addressSettings);
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
session = sf.createSession(false, false, false);
@@ -784,7 +791,7 @@
server.start();
- sf = locator.createSessionFactory();
+ sf = createSessionFactory();
session = sf.createSession(false, false, false);
@@ -849,7 +856,7 @@
server.start();
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
session = sf.createSession(false, false, false);
@@ -887,6 +894,7 @@
}
catch (Throwable e)
{
+ log.error("failed", e);
failed = true;
}
@@ -955,7 +963,7 @@
locator.setCacheLargeMessagesClient(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
session = sf.createSession(false, false, false);
@@ -1887,7 +1895,7 @@
SimpleString queue[] = new SimpleString[] { new SimpleString("queue1"), new SimpleString("queue2") };
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
@@ -1970,7 +1978,7 @@
SimpleString queue[] = new SimpleString[] { new SimpleString("queue1"), new SimpleString("queue2") };
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
@@ -1998,7 +2006,7 @@
server.start();
- sf = locator.createSessionFactory();
+ sf = createSessionFactory();
session = sf.createSession(null, null, false, true, true, false, 0);
}
@@ -2052,7 +2060,7 @@
server.start();
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
session = sf.createSession(isXA, false, false);
@@ -2082,7 +2090,7 @@
session.close();
server.stop();
server.start();
- sf = locator.createSessionFactory();
+ sf = createSessionFactory();
session = sf.createSession(isXA, false, false);
session.rollback(xid);
@@ -2139,7 +2147,7 @@
server.start();
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
ClientSession session = sf.createSession(isXA, false, false);
@@ -2276,7 +2284,7 @@
locator.setMinLargeMessageSize(1024);
locator.setConsumerWindowSize(1024 * 1024);
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
session = sf.createSession(null, null, false, false, false, false, 0);
@@ -2380,7 +2388,7 @@
locator.setMinLargeMessageSize(1024);
locator.setConsumerWindowSize(1024 * 1024);
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
session = sf.createSession(null, null, false, false, false, false, 0);
@@ -2483,7 +2491,7 @@
locator.setMinLargeMessageSize(100 * 1024);
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
session = sf.createSession(null, null, false, true, true, false, 0);
@@ -2557,7 +2565,7 @@
locator.setMinLargeMessageSize(1024);
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
session = sf.createSession(null, null, false, true, true, false, 0);
@@ -2633,7 +2641,7 @@
server.start();
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
ClientSession session = sf.createSession(false, false);
@@ -2687,6 +2695,77 @@
}
}
+ public void testLargeMessageCompression() throws Exception
+ {
+ final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+ ClientSession session = null;
+
+ try
+ {
+ server = createServer(true, isNetty());
+
+ server.start();
+
+ ClientSessionFactory sf = createSessionFactory();
+ sf.setCompressLargeMessages(true);
+
+ session = sf.createSession(false, false, false);
+
+ session.createTemporaryQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS);
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+ producer.send(clientFile);
+
+ session.commit();
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
+ ClientMessage msg1 = consumer.receive(1000);
+ Assert.assertNotNull(msg1);
+
+ for (int i = 0 ; i < messageSize; i++)
+ {
+ //System.out.print(msg1.getBodyBuffer().readByte() + " ");
+ //if (i % 100 == 0) System.out.println();
+ byte b = msg1.getBodyBuffer().readByte();
+ //System.out.println("Byte read: " + (char)b + " i " + i);
+ assertEquals("position = " + i, getSamplebyte(i), b);
+ }
+
+ msg1.acknowledge();
+ session.commit();
+
+ consumer.close();
+
+ session.close();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -2731,7 +2810,7 @@
try
{
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
if (sendBlocking)
{
@@ -2776,7 +2855,7 @@
server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
server.start();
- sf = locator.createSessionFactory();
+ sf = createSessionFactory();
}
session = sf.createSession(null, null, false, true, true, false, 0);
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -136,6 +136,7 @@
callTimeout,
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -214,6 +214,7 @@
callTimeout,
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/ReSendMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/ReSendMessageTest.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/ReSendMessageTest.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -307,6 +307,7 @@
callTimeout,
true,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -76,6 +76,7 @@
HornetQClient.DEFAULT_CALL_TIMEOUT,
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -242,6 +242,7 @@
callTimeout,
true,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -154,6 +154,7 @@
callTimeout,
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/config/JMSServerConfigParserTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/config/JMSServerConfigParserTest.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/config/JMSServerConfigParserTest.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -74,6 +74,7 @@
assertEquals(7712652, cfConfig.getProducerWindowSize());
assertEquals(789, cfConfig.getProducerMaxRate());
assertEquals(12, cfConfig.getMinLargeMessageSize());
+ assertEquals(true, cfConfig.isCompressLargeMessages());
assertEquals("TestClientID", cfConfig.getClientID());
assertEquals(3456, cfConfig.getDupsOKBatchSize());
assertEquals(4567, cfConfig.getTransactionBatchSize());
Modified: trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -15,6 +15,7 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
@@ -35,12 +36,14 @@
import org.hornetq.api.core.client.MessageHandler;
import org.hornetq.core.client.impl.ClientConsumerInternal;
import org.hornetq.core.client.impl.ClientMessageInternal;
+import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.client.impl.LargeMessageBufferImpl;
import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.HornetQBufferInputStream;
/**
* A LargeMessageBufferUnitTest
@@ -56,7 +59,7 @@
// Attributes ----------------------------------------------------
- static int tmpFileCounter = 0;
+ static int tmpFileCounter = 0;
// Static --------------------------------------------------------
@@ -67,13 +70,13 @@
protected void setUp() throws Exception
{
super.setUp();
-
+
tmpFileCounter++;
File tmp = new File(getTestDir());
tmp.mkdirs();
}
-
+
protected void tearDown() throws Exception
{
super.tearDown();
@@ -166,6 +169,20 @@
}
}
+ public void testReadIntegersOverStream() throws Exception
+ {
+ LargeMessageBufferImpl buffer = createBufferWithIntegers(3, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);
+ HornetQBufferInputStream is = new HornetQBufferInputStream(buffer);
+ DataInputStream dataInput = new DataInputStream(is);
+
+ for (int i = 1; i <= 15; i++)
+ {
+ Assert.assertEquals(i, dataInput.readInt());
+ }
+
+ assertEquals(-1, dataInput.read());
+ }
+
// testing void getBytes(int index, ChannelBuffer dst, int dstIndex, int length)
public void testReadLongs() throws Exception
{
@@ -186,6 +203,20 @@
}
}
+ public void testReadLongsOverStream() throws Exception
+ {
+ LargeMessageBufferImpl buffer = createBufferWithLongs(3, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);
+ HornetQBufferInputStream is = new HornetQBufferInputStream(buffer);
+ DataInputStream dataInput = new DataInputStream(is);
+
+ for (int i = 1; i <= 15; i++)
+ {
+ Assert.assertEquals(i, dataInput.readLong());
+ }
+
+ assertEquals(-1, dataInput.read());
+ }
+
public void testReadData() throws Exception
{
HornetQBuffer dynamic = HornetQBuffers.dynamicBuffer(1);
@@ -315,14 +346,14 @@
Assert.assertEquals(i, bytes[i]);
}
}
-
+
public void testSplitBufferOnFile() throws Exception
{
LargeMessageBufferImpl outBuffer = new LargeMessageBufferImpl(new FakeConsumerInternal(),
- 1024 * 1024,
- 1,
- getTestFile(),
- 1024);
+ 1024 * 1024,
+ 1,
+ getTestFile(),
+ 1024);
try
{
@@ -525,6 +556,36 @@
}
+ public void testReadBytesOnStreaming() throws Exception
+ {
+ byte[] byteArray = new byte[1024];
+ for (int i = 0; i < byteArray.length; i++)
+ {
+ byteArray[i] = getSamplebyte(i);
+ }
+
+ HornetQBuffer splitbuffer = splitBuffer(3, byteArray);
+
+ HornetQBufferInputStream is = new HornetQBufferInputStream(splitbuffer);
+
+ for (int i = 0; i < 100; i++)
+ {
+ assertEquals(getSamplebyte(i), (byte)is.read());
+ }
+
+ for (int i = 100; i < byteArray.length; i += 10)
+ {
+ byte readBytes[] = new byte[10];
+
+ int size = is.read(readBytes);
+
+ for (int j = 0; j < size; j++)
+ {
+ assertEquals(getSamplebyte(i + j), readBytes[j]);
+ }
+ }
+ }
+
/**
* @return
*/
@@ -795,6 +856,15 @@
return null;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.client.impl.ClientConsumerInternal#getSession()
+ */
+ public ClientSessionInternal getSession()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
}
}
Copied: trunk/tests/src/org/hornetq/tests/unit/util/CompressionUtilTest.java (from rev 9972, branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/util/CompressionUtilTest.java)
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/util/CompressionUtilTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/unit/util/CompressionUtilTest.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -0,0 +1,185 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.unit.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.zip.Deflater;
+
+import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.DeflaterReader;
+import org.hornetq.utils.InflaterReader;
+import org.hornetq.utils.InflaterWriter;
+
+/**
+ * A CompressionUtilTest
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public class CompressionUtilTest extends UnitTestCase
+{
+
+ public void testDeflaterReader() throws Exception
+ {
+ String inputString = "blahblahblah??blahblahblahblahblah??blablahblah??blablahblah??bla";
+ byte[] input = inputString.getBytes("UTF-8");
+
+ ByteArrayInputStream inputStream = new ByteArrayInputStream(input);
+
+ DeflaterReader reader = new DeflaterReader(inputStream);
+
+ ArrayList<Integer> zipHolder = new ArrayList<Integer>();
+ int b = reader.read();
+
+ while (b != -1)
+ {
+ zipHolder.add(b);
+ b = reader.read();
+ }
+
+ byte[] allCompressed = new byte[zipHolder.size()];
+ for (int i = 0; i < allCompressed.length; i++)
+ {
+ allCompressed[i] = (byte) zipHolder.get(i).intValue();
+ }
+
+ byte[] output = new byte[30];
+ Deflater compresser = new Deflater();
+ compresser.setInput(input);
+ compresser.finish();
+ int compressedDataLength = compresser.deflate(output);
+
+ compareByteArray(allCompressed, output, compressedDataLength);
+ }
+
+ public void testDeflaterReader2() throws Exception
+ {
+ String inputString = "blahblahblah??blahblahblahblahblah??blablahblah??blablahblah??bla";
+ byte[] input = inputString.getBytes("UTF-8");
+
+ ByteArrayInputStream inputStream = new ByteArrayInputStream(input);
+
+ DeflaterReader reader = new DeflaterReader(inputStream);
+
+ byte[] buffer = new byte[7];
+ ArrayList<Integer> zipHolder = new ArrayList<Integer>();
+
+ int n = reader.read(buffer);
+ while (n != -1)
+ {
+ for (int i = 0; i < n; i++)
+ {
+ zipHolder.add((int)buffer[i]);
+ }
+ n = reader.read(buffer);
+ }
+
+ byte[] allCompressed = new byte[zipHolder.size()];
+ for (int i = 0; i < allCompressed.length; i++)
+ {
+ allCompressed[i] = (byte) zipHolder.get(i).intValue();
+ }
+
+ byte[] output = new byte[30];
+ Deflater compresser = new Deflater();
+ compresser.setInput(input);
+ compresser.finish();
+ int compressedDataLength = compresser.deflate(output);
+
+ compareByteArray(allCompressed, output, compressedDataLength);
+ }
+
+ public void testInflaterReader() throws Exception
+ {
+ String inputString = "blahblahblah??blahblahblahblahblah??blablahblah??blablahblah??bla";
+ byte[] input = inputString.getBytes("UTF-8");
+ byte[] output = new byte[30];
+ Deflater compresser = new Deflater();
+ compresser.setInput(input);
+ compresser.finish();
+ int compressedDataLength = compresser.deflate(output);
+
+ byte[] zipBytes = new byte[compressedDataLength];
+
+ System.arraycopy(output, 0, zipBytes, 0, compressedDataLength);
+ ByteArrayInputStream byteInput = new ByteArrayInputStream(zipBytes);
+
+ InflaterReader inflater = new InflaterReader(byteInput);
+ ArrayList<Integer> holder = new ArrayList<Integer>();
+ int read = inflater.read();
+
+ while (read != -1)
+ {
+ holder.add(read);
+ read = inflater.read();
+ }
+
+ byte[] result = new byte[holder.size()];
+
+ for (int i = 0; i < result.length; i++)
+ {
+ result[i] = holder.get(i).byteValue();
+ }
+
+ String txt = new String(result);
+
+ assertEquals(inputString, txt);
+
+ }
+
+ public void testInflaterWriter() throws Exception
+ {
+ String inputString = "blahblahblah??blahblahblahblahblah??blablahblah??blablahblah??bla";
+ byte[] input = inputString.getBytes("UTF-8");
+ byte[] output = new byte[30];
+ Deflater compresser = new Deflater();
+ compresser.setInput(input);
+ compresser.finish();
+ int compressedDataLength = compresser.deflate(output);
+
+ byte[] zipBytes = new byte[compressedDataLength];
+
+ System.arraycopy(output, 0, zipBytes, 0, compressedDataLength);
+ ByteArrayInputStream byteInput = new ByteArrayInputStream(zipBytes);
+
+ ByteArrayOutputStream byteOutput = new ByteArrayOutputStream();
+ InflaterWriter writer = new InflaterWriter(byteOutput);
+
+ byte[] zipBuffer = new byte[12];
+
+ int n = byteInput.read(zipBuffer);
+ while (n > 0)
+ {
+ writer.write(zipBuffer, 0, n);
+ n = byteInput.read(zipBuffer);
+ }
+
+ writer.close();
+
+ byte[] outcome = byteOutput.toByteArray();
+ String outStr = new String(outcome);
+
+ assertEquals(inputString, outStr);
+ }
+
+ private void compareByteArray(byte[] first, byte[] second, int length)
+ {
+ for (int i = 0; i < length; i++)
+ {
+ assertEquals(first[i], second[i]);
+ }
+ }
+}
Copied: trunk/tests/src/org/hornetq/tests/unit/util/HornetQBufferInputStreamTest.java (from rev 9972, branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/util/HornetQBufferInputStreamTest.java)
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/util/HornetQBufferInputStreamTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/unit/util/HornetQBufferInputStreamTest.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.unit.util;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.HornetQBufferInputStream;
+
+/**
+ * A HornetQInputStreamTest
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class HornetQBufferInputStreamTest extends UnitTestCase
+{
+
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testReadBytes() throws Exception
+ {
+ byte bytes[] = new byte[10*1024];
+ for (int i = 0 ; i < bytes.length; i++)
+ {
+ bytes[i] = getSamplebyte(i);
+ }
+
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+ HornetQBufferInputStream is = new HornetQBufferInputStream(buffer);
+
+ // First read byte per byte
+ for (int i = 0 ; i < 1024; i++)
+ {
+ assertEquals(getSamplebyte(i), is.read());
+ }
+
+ // Second, read in chunks
+ for (int i = 1; i < 10; i++)
+ {
+ bytes = new byte[1024];
+ is.read(bytes);
+ for (int j = 0 ; j < bytes.length; j++)
+ {
+ assertEquals(getSamplebyte(i * 1024 + j), bytes[j]);
+ }
+
+ }
+
+ assertEquals(-1, is.read());
+
+
+ bytes = new byte[1024];
+
+ int sizeRead = is.read(bytes);
+
+ assertEquals(-1, sizeRead);
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -190,6 +190,7 @@
callTimeout,
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Modified: trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -49,8 +49,6 @@
import org.hornetq.spi.core.security.HornetQSecurityManager;
import org.hornetq.spi.core.security.HornetQSecurityManagerImpl;
-import static org.hornetq.tests.util.ServiceTestBase.*;
-
/**
*
* Base class with basic utilities on starting up a basic server
14 years, 3 months
JBoss hornetq SVN: r9992 - tags.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-12-03 14:28:19 -0500 (Fri, 03 Dec 2010)
New Revision: 9992
Added:
tags/HornetQ_2_2_0_QA3/
Log:
uploading another build for EAP tests
Copied: tags/HornetQ_2_2_0_QA3 (from rev 9991, trunk)
14 years, 3 months
JBoss hornetq SVN: r9991 - trunk.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-12-03 14:24:39 -0500 (Fri, 03 Dec 2010)
New Revision: 9991
Modified:
trunk/build-maven.xml
Log:
Upgrade version on maven upload
Modified: trunk/build-maven.xml
===================================================================
--- trunk/build-maven.xml 2010-12-03 19:17:35 UTC (rev 9990)
+++ trunk/build-maven.xml 2010-12-03 19:24:39 UTC (rev 9991)
@@ -13,7 +13,7 @@
-->
<project default="upload" name="HornetQ">
- <property name="hornetq.version" value="2.2.0.QA2"/>
+ <property name="hornetq.version" value="2.2.0.QA3"/>
<property name="build.dir" value="build"/>
<property name="jars.dir" value="${build.dir}/jars"/>
14 years, 3 months