Author: clebert.suconic(a)jboss.com
Date: 2011-04-21 18:08:46 -0400 (Thu, 21 Apr 2011)
New Revision: 10550
Added:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessageV2.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionWithDiscoveryTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AlmostLargeAsynchronousFailoverTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverOnFlowControlTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyAsynchronousReattachTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadReattachSupport.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/ra/HornetQActivationTest.java
Modified:
trunk/
trunk/distribution/
trunk/distribution/hornetq/
trunk/distribution/jnp-client/
trunk/docs/
trunk/docs/user-manual/
trunk/examples/
trunk/hornetq-bootstrap/
trunk/hornetq-core-client/
trunk/hornetq-core/
trunk/hornetq-core/src/main/java/org/hornetq/api/core/client/ClientSession.java
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/DelegatingSession.java
trunk/hornetq-core/src/main/java/org/hornetq/core/filter/impl/FilterImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/Page.java
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/PrintPages.java
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/PageCache.java
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PageImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagedMessageImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/Ping.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessage.java
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/FileLockNodeManager.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QueueImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/transaction/impl/TransactionImpl.java
trunk/hornetq-jboss-as-integration/
trunk/hornetq-jms-client/
trunk/hornetq-jms/
trunk/hornetq-logging/
trunk/hornetq-ra/
trunk/hornetq-ra/hornetq-ra-jar/
trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/HornetQRAManagedConnection.java
trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/HornetQRAProperties.java
trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/HornetQResourceAdapter.java
trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/Util.java
trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/inflow/HornetQActivation.java
trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/inflow/HornetQActivationSpec.java
trunk/hornetq-ra/hornetq-ra-rar/
trunk/hornetq-ra/hornetq-ra-rar/src/main/resources/ra.xml
trunk/hornetq-rest/
trunk/hornetq-rest/hornetq-rest/
trunk/hornetq-service-sar/
trunk/hornetq-spring-integration/
trunk/hornetq-twitter-integration/
trunk/tests/
trunk/tests/concurrent-tests/
trunk/tests/integration-tests/
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/DuplicateDetectionTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/String64KLimitTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerCloseTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/CoreClientTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/DurableQueueTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ExpiryAddressTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/FailureDeadlockTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InVMNonPersistentMessageBufferTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/IncompatibleVersionTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessageConcurrencyTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessageDurabilityTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessageGroupingConnectionFactoryTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessagePriorityTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingOrderTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingSyncTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ProducerCloseTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/QueueBrowserTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/RequestorTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SelfExpandingBufferTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionFactoryTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TemporaryQueueTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/GroupingFailoverSharedServerTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/core/deployers/impl/QueueDeployerTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/client/ReSendMessageTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/cluster/ReplicatedJMSFailoverTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/connection/ConcurrentSessionCloseTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/JMSServerDeployerTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/JMSServerStartStopTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/config/JMSConfigurationTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/logging/LogDelegateTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/persistence/RestartSMTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/remoting/BatchDelayTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/remoting/DirectDeliverTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/remoting/NetworkAddressTestBase.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/remoting/SynchronousCloseTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/scheduling/ScheduledMessageTest.java
trunk/tests/jms-tests/
trunk/tests/jms-tests/src/test/java/org/hornetq/jms/tests/message/MessageHeaderTest.java
trunk/tests/joram-tests/
trunk/tests/performance-tests/
trunk/tests/soak-tests/
trunk/tests/stress-tests/
trunk/tests/timing-tests/
trunk/tests/timing-tests/src/test/java/org/hornetq/tests/timing/util/UUIDTest.java
trunk/tests/unit-tests/
trunk/tests/unit-tests/src/main/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/deployers/impl/FileDeploymentManagerTest.java
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/jms/misc/ManifestTest.java
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/ra/ResourceAdapterTest.java
Log:
Synchronizing changes made on Branch_2_2
Property changes on: trunk
___________________________________________________________________
Modified: svn:ignore
- build
eclipse-output
thirdparty
logs
ObjectStore
tmp
data
junit*.properties
target
+ build
eclipse-output
thirdparty
logs
ObjectStore
tmp
data
junit*.properties
target
.metadata
Property changes on: trunk/distribution
___________________________________________________________________
Modified: svn:ignore
- target
+ target
.project
Property changes on: trunk/distribution/hornetq
___________________________________________________________________
Added: svn:ignore
+ .project
Property changes on: trunk/distribution/jnp-client
___________________________________________________________________
Modified: svn:ignore
- target
+ target
bin
.classpath
.project
Property changes on: trunk/docs
___________________________________________________________________
Added: svn:ignore
+ .project
Property changes on: trunk/docs/user-manual
___________________________________________________________________
Modified: svn:ignore
- build
target
+ build
target
bin
.classpath
.project
Property changes on: trunk/examples
___________________________________________________________________
Added: svn:ignore
+ bin
.project
Property changes on: trunk/hornetq-bootstrap
___________________________________________________________________
Modified: svn:ignore
- target
+ target
.classpath
.project
Property changes on: trunk/hornetq-core
___________________________________________________________________
Modified: svn:ignore
- target
+ target
.classpath
.project
Modified: trunk/hornetq-core/src/main/java/org/hornetq/api/core/client/ClientSession.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/api/core/client/ClientSession.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/hornetq-core/src/main/java/org/hornetq/api/core/client/ClientSession.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -567,4 +567,12 @@
* @throws HornetQException
*/
void addMetaData(String key, String data) throws HornetQException;
+
+ /**
+ * Attach any metadata to the session.
+ * Sends the metadata using the older (V1) message format.
+ * @deprecated Use {@link ClientSession#addMetaData(String, String)}
+ * @throws HornetQException
+ */
+ void addMetaDataV1(String key, String data) throws HornetQException;
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -83,9 +83,13 @@
semaphore.drainPermits();
+ int beforeFailure = arriving;
+
arriving = 0;
- checkCredits(windowSize * 2);
+ // If we are waiting for more credits than what's configured, then we need to use what we tried before
+ // otherwise the client may starve as the credit will never arrive
+ checkCredits(Math.max(windowSize * 2, beforeFailure));
}
public void close()
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -452,6 +452,11 @@
{
stopPingingAfterOne = true;
}
+
+ public void resumePinging()
+ {
+ stopPingingAfterOne = false;
+ }
// Protected
//
------------------------------------------------------------------------------------
@@ -1088,6 +1093,8 @@
0,
clientFailureCheckPeriod,
TimeUnit.MILLISECONDS);
+ // To make sure the first ping will be sent
+ pingRunnable.send();
}
// send a ping every time we create a new remoting connection
// to set up its TTL on the server side
@@ -1308,8 +1315,8 @@
first = false;
long now = System.currentTimeMillis();
-
- if (clientFailureCheckPeriod != -1 && now >= lastCheck +
clientFailureCheckPeriod)
+
+ if (clientFailureCheckPeriod != -1 && connectionTTL != -1 && now
>= lastCheck + connectionTTL )
{
if (!connection.checkDataReceived())
{
@@ -1335,6 +1342,14 @@
}
}
+ send();
+ }
+
+ /**
+ *
+ */
+ public void send()
+ {
// Send a ping
Ping ping = new Ping(connectionTTL);
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -47,6 +47,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.RollbackMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionAddMetaDataMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionAddMetaDataMessageV2;
import org.hornetq.core.protocol.core.impl.wireformat.SessionBindingQueryMessage;
import
org.hornetq.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionCloseMessage;
@@ -1083,25 +1084,22 @@
}
// Resetting the metadata after failover
- try
+ for (Map.Entry<String, String> entries : metadata.entrySet())
{
- for (Map.Entry<String, String> entries : metadata.entrySet())
- {
- addMetaData(entries.getKey(), entries.getValue());
- }
+ sendPacketWithoutLock(new SessionAddMetaDataMessageV2(entries.getKey(),
entries.getValue(), false));
}
- catch (HornetQException e)
- {
+ }
- log.warn("Error on resending metadata: " + metadata, e);
-
- }
+ public void addMetaDataV1(String key, String data) throws HornetQException
+ {
+ metadata.put(key, data);
+ channel.sendBlocking(new SessionAddMetaDataMessage(key, data));
}
public void addMetaData(String key, String data) throws HornetQException
{
metadata.put(key, data);
- channel.sendBlocking(new SessionAddMetaDataMessage(key, data));
+ channel.sendBlocking(new SessionAddMetaDataMessageV2(key, data));
}
public ClientSessionFactoryInternal getSessionFactory()
@@ -1608,6 +1606,16 @@
{
return remotingConnection;
}
+
+ public String toString()
+ {
+ StringBuffer buffer = new StringBuffer();
+ for (Map.Entry<String, String> entry : metadata.entrySet())
+ {
+ buffer.append(entry.getKey() + "=" + entry.getValue() +
",");
+ }
+ return "ClientSessionImpl::(" + buffer.toString() + ")";
+ }
// Protected
// ----------------------------------------------------------------------------
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/DelegatingSession.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/DelegatingSession.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -566,7 +566,14 @@
{
session.addMetaData(key, data);
}
+
+ @Deprecated
+ public void addMetaDataV1(String key, String data) throws HornetQException
+ {
+ session.addMetaDataV1(key, data);
+ }
+
public boolean isCompressLargeMessages()
{
return session.isCompressLargeMessages();
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/filter/impl/FilterImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/filter/impl/FilterImpl.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/filter/impl/FilterImpl.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -79,7 +79,7 @@
*/
public static Filter createFilter(final String filterStr) throws HornetQException
{
- return FilterImpl.createFilter(SimpleString.toSimpleString(filterStr));
+ return FilterImpl.createFilter(SimpleString.toSimpleString(filterStr == null ? null
: filterStr.trim()));
}
/**
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/paging/Page.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/paging/Page.java 2011-04-20 22:22:34
UTC (rev 10549)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/paging/Page.java 2011-04-21 22:08:46
UTC (rev 10550)
@@ -16,6 +16,7 @@
import java.util.List;
import org.hornetq.core.paging.cursor.LivePageCache;
+import org.hornetq.core.persistence.StorageManager;
/**
*
@@ -29,7 +30,7 @@
void write(PagedMessage message) throws Exception;
- List<PagedMessage> read() throws Exception;
+ List<PagedMessage> read(StorageManager storage) throws Exception;
void setLiveCache(LivePageCache pageCache);
@@ -43,5 +44,5 @@
void close() throws Exception;
- boolean delete() throws Exception;
+ boolean delete(PagedMessage[] messages) throws Exception;
}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/paging/PrintPages.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/paging/PrintPages.java 2011-04-20
22:22:34 UTC (rev 10549)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/paging/PrintPages.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -108,7 +108,7 @@
System.out.println("******* Page " + pgid);
Page page = pgStore.createPage(pgid);
page.open();
- List<PagedMessage> msgs = page.read();
+ List<PagedMessage> msgs = page.read(sm);
page.close();
int msgID = 0;
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/PageCache.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/PageCache.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/PageCache.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -34,6 +34,8 @@
void setMessages(PagedMessage[] messages);
+ PagedMessage[] getMessages();
+
/**
* If this cache is still being updated
* @return
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -19,6 +19,7 @@
import org.hornetq.core.paging.Page;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.cursor.LivePageCache;
+import org.hornetq.core.server.LargeServerMessage;
/**
* This is the same as PageCache, however this is for the page that's being currently
written.
@@ -132,6 +133,10 @@
*/
public synchronized void addLiveMessage(PagedMessage message)
{
+ if (message.getMessage().isLargeMessage())
+ {
+ ((LargeServerMessage)message.getMessage()).incrementDelayDeletionCount();
+ }
this.messages.add(message);
}
@@ -143,7 +148,15 @@
this.isLive = false;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageCache#getMessages()
+ */
+ public PagedMessage[] getMessages()
+ {
+ return messages.toArray(new PagedMessage[messages.size()]);
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -133,6 +133,14 @@
return "PageCacheImpl::page=" + page.getPageId() + "
numberOfMessages = " + messages.length;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageCache#getMessages()
+ */
+ public PagedMessage[] getMessages()
+ {
+ return messages;
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -143,6 +143,74 @@
return getPageCache(pos.getPageNr());
}
+ public PageCache getPageCache(final long pageId)
+ {
+ try
+ {
+ boolean needToRead = false;
+ PageCache cache = null;
+ synchronized (softCache)
+ {
+ if (pageId > pagingStore.getCurrentWritingPage())
+ {
+ return null;
+ }
+
+ cache = softCache.get(pageId);
+ if (cache == null)
+ {
+ if (!pagingStore.checkPage((int)pageId))
+ {
+ return null;
+ }
+
+ cache = createPageCache(pageId);
+ needToRead = true;
+ // anyone reading from this cache will have to wait reading to finish
first
+ // we also want only one thread reading this cache
+ cache.lock();
+ softCache.put(pageId, cache);
+ }
+ }
+
+ // Reading is done outside of the synchronized block, however
+ // the page stays locked until the entire reading is finished
+ if (needToRead)
+ {
+ Page page = null;
+ try
+ {
+ page = pagingStore.createPage((int)pageId);
+
+ page.open();
+
+ List<PagedMessage> pgdMessages = page.read(storageManager);
+ cache.setMessages(pgdMessages.toArray(new
PagedMessage[pgdMessages.size()]));
+ }
+ finally
+ {
+ try
+ {
+ if (page != null)
+ {
+ page.close();
+ }
+ }
+ catch (Throwable ignored)
+ {
+ }
+ cache.unlock();
+ }
+ }
+
+ return cache;
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Couldn't complete paging due to an IO
Exception on Paging - " + e.getMessage(), e);
+ }
+ }
+
public void addPageCache(PageCache cache)
{
synchronized (softCache)
@@ -337,9 +405,31 @@
{
for (Page depagedPage : depagedPages)
{
- depagedPage.delete();
+ PageCache cache;
+ PagedMessage[] pgdMessages;
synchronized (softCache)
{
+ cache = softCache.remove((long)depagedPage.getPageId());
+ }
+
+ if (cache == null)
+ {
+ // The page is no longer in the cache.
+ // We need to read the page file before deleting it
+ // to make sure we remove any pending large messages.
+ depagedPage.open();
+ List<PagedMessage> pgdMessagesList =
depagedPage.read(storageManager);
+ depagedPage.close();
+ pgdMessages = pgdMessagesList.toArray(new
PagedMessage[pgdMessagesList.size()]);
+ }
+ else
+ {
+ pgdMessages = cache.getMessages();
+ }
+
+ depagedPage.delete(pgdMessages);
+ synchronized (softCache)
+ {
softCache.remove((long)depagedPage.getPageId());
}
}
@@ -422,79 +512,6 @@
}
- private PageCache getPageCache(final long pageId)
- {
- try
- {
- boolean needToRead = false;
- PageCache cache = null;
- synchronized (softCache)
- {
- if (pageId > pagingStore.getCurrentWritingPage())
- {
- return null;
- }
-
- cache = softCache.get(pageId);
- if (cache == null)
- {
- if (!pagingStore.checkPage((int)pageId))
- {
- return null;
- }
-
- cache = createPageCache(pageId);
- needToRead = true;
- // anyone reading from this cache will have to wait reading to finish
first
- // we also want only one thread reading this cache
- cache.lock();
- softCache.put(pageId, cache);
- }
- }
-
- // Reading is done outside of the synchronized block, however
- // the page stays locked until the entire reading is finished
- if (needToRead)
- {
- Page page = null;
- try
- {
- page = pagingStore.createPage((int)pageId);
-
- page.open();
-
- List<PagedMessage> pgdMessages = page.read();
-
- for (PagedMessage pdgMessage : pgdMessages)
- {
- pdgMessage.initMessage(storageManager);
- }
- cache.setMessages(pgdMessages.toArray(new
PagedMessage[pgdMessages.size()]));
- }
- finally
- {
- try
- {
- if (page != null)
- {
- page.close();
- }
- }
- catch (Throwable ignored)
- {
- }
- cache.unlock();
- }
- }
-
- return cache;
- }
- catch (Exception e)
- {
- throw new RuntimeException("Couldn't complete paging due to an IO
Exception on Paging - " + e.getMessage(), e);
- }
- }
-
// Inner classes -------------------------------------------------
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -17,6 +17,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -231,8 +232,8 @@
{
if (entry.getKey() == lastAckedPosition.getPageNr())
{
- // PageSubscriptionImpl.trace("We can't clear page " +
entry.getKey() +
- // " now since it's the current page");
+ PageSubscriptionImpl.trace("We can't clear page " +
entry.getKey() +
+ " now since it's the current page");
}
else
{
@@ -846,7 +847,7 @@
private final long pageId;
// Confirmed ACKs on this page
- private final List<PagePosition> acks = Collections.synchronizedList(new
LinkedList<PagePosition>());
+ private final Set<PagePosition> acks = Collections.synchronizedSet(new
LinkedHashSet<PagePosition>());
private WeakReference<PageCache> cache;
@@ -934,8 +935,6 @@
public void addACK(final PagePosition posACK)
{
- removedReferences.add(posACK);
- acks.add(posACK);
if (isTrace)
{
@@ -944,11 +943,14 @@
(confirmed.get() + 1) +
" pendingTX = " + pendingTX +
", page = " +
- pageId);
+ pageId + " posACK = " + posACK);
}
+ removedReferences.add(posACK);
+ boolean added = acks.add(posACK);
+
// Negative could mean a bookmark on the first element for the page (example -1)
- if (posACK.getMessageNr() >= 0)
+ if (added && posACK.getMessageNr() >= 0)
{
confirmed.incrementAndGet();
checkDone();
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PageImpl.java 2011-04-20
22:22:34 UTC (rev 10549)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PageImpl.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -28,6 +28,7 @@
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.cursor.LivePageCache;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.utils.DataConstants;
/**
@@ -101,7 +102,7 @@
this.pageCache = pageCache;
}
- public List<PagedMessage> read() throws Exception
+ public List<PagedMessage> read(StorageManager storage) throws Exception
{
ArrayList<PagedMessage> messages = new ArrayList<PagedMessage>();
@@ -140,6 +141,7 @@
// constraint was already checked
throw new IllegalStateException("Internal error, it wasn't
possible to locate END_BYTE " + b);
}
+ msg.initMessage(storage);
messages.add(msg);
}
else
@@ -218,13 +220,29 @@
file.close();
}
- public boolean delete() throws Exception
+ public boolean delete(final PagedMessage[] messages) throws Exception
{
if (storageManager != null)
{
storageManager.pageDeleted(storeName, pageId);
}
+ if (messages != null)
+ {
+ for (PagedMessage msg : messages)
+ {
+ if (msg.getMessage().isLargeMessage())
+ {
+ LargeServerMessage lmsg = (LargeServerMessage)msg.getMessage();
+
+ // Remember, cannot call delete directly here
+ // Because the large-message may be linked to another message
+ // or it may still be in delivery even though it has already been acked
+ lmsg.decrementDelayDeletionCount();
+ }
+ }
+ }
+
try
{
if (suspiciousRecords)
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagedMessageImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagedMessageImpl.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagedMessageImpl.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -79,9 +79,11 @@
{
if (largeMessageLazyData != null)
{
- message = storage.createLargeMessage();
+ LargeServerMessage lgMessage = storage.createLargeMessage();
+ message = lgMessage;
HornetQBuffer buffer = HornetQBuffers.dynamicBuffer(largeMessageLazyData);
message.decodeHeadersAndProperties(buffer);
+ lgMessage.incrementDelayDeletionCount();
largeMessageLazyData = null;
}
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -462,13 +462,12 @@
currentPage = createPage(currentPageId);
currentPage.open();
- List<PagedMessage> messages = currentPage.read();
+ List<PagedMessage> messages = currentPage.read(storageManager);
LivePageCache pageCache = new LivePageCacheImpl(currentPage);
for (PagedMessage msg : messages)
{
- msg.initMessage(storageManager);
pageCache.addLiveMessage(msg);
}
@@ -646,7 +645,7 @@
{
stopPaging();
returnPage.open();
- returnPage.delete();
+ returnPage.delete(null);
// This will trigger this address to exit the page mode,
// and this will make HornetQ start using the journal again
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -129,11 +129,6 @@
public static final byte SECURITY_RECORD = 26;
- // type + expiration + timestamp + priority
- public static final int SIZE_FIELDS = DataConstants.SIZE_INT + DataConstants.SIZE_LONG
+
- DataConstants.SIZE_LONG +
- DataConstants.SIZE_BYTE;
-
// Message journal record types
public static final byte ADD_LARGE_MESSAGE = 30;
@@ -1142,12 +1137,24 @@
continue;
}
+
+ // Redistribution could install a Redistributor while we are still loading
records, which would be an issue with prepared ACKs.
+ // We make sure the Queue is paused before we reroute values.
+ queue.pause();
Collection<AddMessageRecord> valueRecords = queueRecords.values();
+
+ long currentTime = System.currentTimeMillis();
for (AddMessageRecord record : valueRecords)
{
long scheduledDeliveryTime = record.scheduledDeliveryTime;
+
+ if (scheduledDeliveryTime != 0 && scheduledDeliveryTime <=
currentTime)
+ {
+ scheduledDeliveryTime = 0;
+ record.message.removeProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
+ }
if (scheduledDeliveryTime != 0)
{
@@ -1217,6 +1224,11 @@
{
messageJournal.perfBlast(perfBlastPages);
}
+
+ for (Queue queue : queues.values())
+ {
+ queue.resume();
+ }
if (System.getProperty("org.hornetq.opt.directblast") != null)
{
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -63,6 +63,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.RollbackMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionAddMetaDataMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionAddMetaDataMessageV2;
import org.hornetq.core.protocol.core.impl.wireformat.SessionBindingQueryMessage;
import
org.hornetq.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
@@ -479,6 +480,16 @@
session.addMetaData(message.getKey(), message.getData());
break;
}
+ case PacketImpl.SESS_ADD_METADATA2:
+ {
+ SessionAddMetaDataMessageV2 message =
(SessionAddMetaDataMessageV2)packet;
+ if (message.isRequiresConfirmations())
+ {
+ response = new NullResponseMessage();
+ }
+ session.addMetaData(message.getKey(), message.getData());
+ break;
+ }
}
}
catch (HornetQXAException e)
@@ -576,6 +587,20 @@
}
}
+ public void closeListeners()
+ {
+ List<CloseListener> listeners = remotingConnection.removeCloseListeners();
+
+ for (CloseListener closeListener: listeners)
+ {
+ closeListener.connectionClosed();
+ if (closeListener instanceof FailureListener)
+ {
+ remotingConnection.removeFailureListener((FailureListener)closeListener);
+ }
+ }
+ }
+
public int transferConnection(final CoreRemotingConnection newConnection, final int
lastReceivedCommandID)
{
// We need to disable delivery on all the consumers while the transfer is
occurring- otherwise packets might get
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -235,17 +235,15 @@
response = new ReattachSessionResponseMessage(-1, false);
}
+ log.debug("Reattaching request from " +
connection.getRemoteAddress());
+
+
ServerSessionPacketHandler sessionHandler =
protocolManager.getSessionHandler(request.getName());
- if (!server.checkActivate())
+ if (!server.checkActivate() || sessionHandler == null)
{
response = new ReattachSessionResponseMessage(-1, false);
}
-
- if (sessionHandler == null)
- {
- response = new ReattachSessionResponseMessage(-1, false);
- }
else
{
if (sessionHandler.getChannel().getConfirmationWindowSize() == -1)
@@ -253,7 +251,10 @@
// Even though session exists, we can't reattach since confi window
size == -1,
// i.e. we don't have a resend cache for commands, so we just close
the old session
// and let the client recreate
+
+ log.warn("Reattach request from " +
connection.getRemoteAddress() + " failed as there is no confirmationWindowSize
configured, which may be ok for your system");
+ sessionHandler.closeListeners();
sessionHandler.close();
response = new ReattachSessionResponseMessage(-1, false);
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -41,6 +41,8 @@
import static org.hornetq.core.protocol.core.impl.PacketImpl.REPLICATION_PREPARE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.REPLICATION_RESPONSE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_ACKNOWLEDGE;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_ADD_METADATA;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_ADD_METADATA2;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_CLOSE;
@@ -81,12 +83,9 @@
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_XA_START;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_XA_SUSPEND;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SUBSCRIBE_TOPOLOGY;
-import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_ADD_METADATA;
import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.client.impl.ClientMessageImpl;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateQueueMessage;
@@ -117,6 +116,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.RollbackMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionAddMetaDataMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionAddMetaDataMessageV2;
import org.hornetq.core.protocol.core.impl.wireformat.SessionBindingQueryMessage;
import
org.hornetq.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionCloseMessage;
@@ -519,6 +519,11 @@
packet = new SessionAddMetaDataMessage();
break;
}
+ case SESS_ADD_METADATA2:
+ {
+ packet = new SessionAddMetaDataMessageV2();
+ break;
+ }
default:
{
throw new IllegalArgumentException("Invalid type: " + packetType);
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -186,6 +186,8 @@
public static final byte SESS_ADD_METADATA = 104;
+ public static final byte SESS_ADD_METADATA2 = 105;
+
public static final byte CLUSTER_TOPOLOGY = 110;
public static final byte NODE_ANNOUNCE = 111;
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/Ping.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/Ping.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/Ping.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -14,12 +14,18 @@
package org.hornetq.core.protocol.core.impl.wireformat;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.protocol.core.impl.PacketImpl;
+import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
+import org.hornetq.spi.core.protocol.RemotingConnection;
/**
*
- * A Ping
+ * Ping is sent from the client side by {@link ClientSessionFactoryImpl}.
+ * On the server side it is handled by {@link RemotingServiceImpl}.
*
+ * @see RemotingConnection#checkDataReceived()
+ *
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*
*/
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessage.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessage.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessage.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -18,6 +18,8 @@
/**
* A SessionAddMetaDataMessage
+ *
+ * Packet deprecated: It exists only to support old formats
*
* @author <a href="mailto:hgao@redhat.com>Howard Gao</a>
*
Added:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessageV2.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessageV2.java
(rev 0)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessageV2.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * A SessionAddMetaDataMessageV2
+ *
+ * This packet replaces {@link SessionAddMetaDataMessage}
+ *
+ * @author Clebert Suconic
+ *
+ *
+ */
+public class SessionAddMetaDataMessageV2 extends PacketImpl
+{
+ private String key;
+ private String data;
+ /**
+ * It won't require confirmation during failover / reconnect
+ */
+ private boolean requiresConfirmation = true;
+
+ public SessionAddMetaDataMessageV2()
+ {
+ super(PacketImpl.SESS_ADD_METADATA2);
+ }
+
+ public SessionAddMetaDataMessageV2(String k, String d)
+ {
+ this();
+ key = k;
+ data = d;
+ }
+
+ public SessionAddMetaDataMessageV2(String k, String d, boolean requiresConfirmation)
+ {
+ this();
+ key = k;
+ data = d;
+ this.requiresConfirmation = requiresConfirmation;
+ }
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ buffer.writeString(key);
+ buffer.writeString(data);
+ buffer.writeBoolean(requiresConfirmation);
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ key = buffer.readString();
+ data = buffer.readString();
+ requiresConfirmation = buffer.readBoolean();
+ }
+
+ @Override
+ public final boolean isRequiresConfirmations()
+ {
+ return requiresConfirmation;
+ }
+
+ public String getKey()
+ {
+ return key;
+ }
+
+ public String getData()
+ {
+ return data;
+ }
+
+}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -533,7 +533,7 @@
{
if (deletePages)
{
- page.delete();
+ page.delete(null);
}
}
else
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -749,7 +749,7 @@
if(clusterConnections.containsKey(config.getName()))
{
- log.warn("Clustwer Configuration '" + config.getConnectorName() +
"' already exists. The cluster connection will not be deployed.");
+ log.warn("Cluster Configuration '" + config.getConnectorName() +
"' already exists. The cluster connection will not be deployed.", new
Exception ("trace"));
return;
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/FileLockNodeManager.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/FileLockNodeManager.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/FileLockNodeManager.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -416,7 +416,7 @@
if (interrupted)
{
interrupted = false;
- throw new IOException(new InterruptedException());
+ throw new IOException("Lock was interrupted");
}
}
while (lock == null);
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QueueImpl.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QueueImpl.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -898,8 +898,35 @@
acknowledge(tx, messageReference);
count++;
}
+
+
+ if (pageIterator != null)
+ {
+ // System.out.println("QueueMemorySize before depage = " +
queueMemorySize.get());
+ while (pageIterator.hasNext())
+ {
+ PagedReference reference = pageIterator.next();
+ pageIterator.remove();
+
+ if (filter == null || filter.match(reference.getMessage()))
+ {
+ count++;
+ pageSubscription.ack(reference);
+ }
+ else
+ {
+ addTail(reference, false);
+ }
+ }
+ }
tx.commit();
+
+
+ if (filter != null && pageIterator != null)
+ {
+ scheduleDepage();
+ }
return count;
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -447,6 +447,11 @@
{
run();
}
+
+ public String toString()
+ {
+ return "Temporary Cleaner for queue " + bindingName;
+ }
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/transaction/impl/TransactionImpl.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/transaction/impl/TransactionImpl.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -506,7 +506,7 @@
}
}
- public void afterRollback()
+ public synchronized void afterRollback()
{
if (operations != null)
{
@@ -517,7 +517,7 @@
}
}
- public void beforeCommit() throws Exception
+ public synchronized void beforeCommit() throws Exception
{
if (operations != null)
{
@@ -528,7 +528,7 @@
}
}
- public void beforePrepare() throws Exception
+ public synchronized void beforePrepare() throws Exception
{
if (operations != null)
{
@@ -539,7 +539,7 @@
}
}
- public void beforeRollback() throws Exception
+ public synchronized void beforeRollback() throws Exception
{
if (operations != null)
{
@@ -550,7 +550,7 @@
}
}
- public void afterPrepare()
+ public synchronized void afterPrepare()
{
if (operations != null)
{
Property changes on: trunk/hornetq-core-client
___________________________________________________________________
Modified: svn:ignore
- target
+ target
.classpath
.project
Property changes on: trunk/hornetq-jboss-as-integration
___________________________________________________________________
Modified: svn:ignore
- target
+ target
.classpath
.project
Property changes on: trunk/hornetq-jms
___________________________________________________________________
Modified: svn:ignore
- target
+ target
.classpath
.project
Property changes on: trunk/hornetq-jms-client
___________________________________________________________________
Modified: svn:ignore
- target
+ target
.classpath
.project
Property changes on: trunk/hornetq-logging
___________________________________________________________________
Modified: svn:ignore
- target
+ target
.classpath
.project
Property changes on: trunk/hornetq-ra
___________________________________________________________________
Added: svn:ignore
+ .project
Property changes on: trunk/hornetq-ra/hornetq-ra-jar
___________________________________________________________________
Modified: svn:ignore
- target
+ target
.classpath
.project
Modified:
trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/HornetQRAManagedConnection.java
===================================================================
---
trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/HornetQRAManagedConnection.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/HornetQRAManagedConnection.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -467,7 +467,7 @@
//
if (xaResource == null)
{
- xaResource = xaSession.getXAResource();
+ xaResource = new HornetQRAXAResource(this, xaSession.getXAResource());
}
if (HornetQRAManagedConnection.trace)
@@ -475,7 +475,6 @@
HornetQRAManagedConnection.log.trace("XAResource=" + xaResource);
}
- xaResource = new HornetQRAXAResource(this, xaResource);
return xaResource;
}
Modified:
trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/HornetQRAProperties.java
===================================================================
---
trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/HornetQRAProperties.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/HornetQRAProperties.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -13,6 +13,7 @@
package org.hornetq.ra;
import java.io.Serializable;
+import java.util.Hashtable;
import org.hornetq.core.logging.Logger;
@@ -60,6 +61,10 @@
private long setupInterval = DEFAULT_SETUP_INTERVAL;
+ private Hashtable<?,?> jndiParams;
+
+ private boolean useJNDI;
+
/**
* Constructor
*/
@@ -127,7 +132,37 @@
this.password = password;
}
+ /**
+ * @return the useJNDI
+ */
+ public boolean isUseJNDI()
+ {
+ return useJNDI;
+ }
+
/**
+ * @param value the useJNDI to set
+ */
+ public void setUseJNDI(final boolean value)
+ {
+ useJNDI = value;
+ }
+
+ /**
+ *
+ * @return return the jndi params to use
+ */
+ public Hashtable<?,?> getParsedJndiParams()
+ {
+ return jndiParams;
+ }
+
+
+ public void setParsedJndiParams(Hashtable<?,?> params)
+ {
+ jndiParams = params;
+ }
+ /**
* Get the use XA flag
* @return The value
*/
@@ -202,4 +237,5 @@
return "HornetQRAProperties[localTx=" + localTx +
", userName=" + userName + ", password=" + password +
"]";
}
+
}
Modified:
trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/HornetQResourceAdapter.java
===================================================================
---
trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/HornetQResourceAdapter.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/HornetQResourceAdapter.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -14,6 +14,7 @@
import java.io.Serializable;
import java.util.HashMap;
+import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -103,6 +104,8 @@
private TransactionManager tm;
+ private String unparsedJndiParams;
+
/**
* Constructor
*/
@@ -1090,7 +1093,42 @@
raProperties.setPassword(password);
}
+ /**
+ * @return the useJNDI
+ */
+ public boolean isUseJNDI()
+ {
+ return raProperties.isUseJNDI();
+ }
+
/**
+ * @param value the useJNDI to set
+ */
+ public void setUseJNDI(final boolean value)
+ {
+ raProperties.setUseJNDI(value);
+ }
+
+ /**
+ *
+ * @return return the jndi params to use
+ */
+ public String getJndiParams()
+ {
+ return unparsedJndiParams;
+ }
+
+ public void setJndiParams(String jndiParams)
+ {
+ unparsedJndiParams = jndiParams;
+ raProperties.setParsedJndiParams(Util.parseHashtableConfig(jndiParams));
+ }
+
+ public Hashtable<?,?> getParsedJndiParams()
+ {
+ return raProperties.getParsedJndiParams();
+ }
+ /**
* Get the client ID
*
* @return The value
@@ -1648,5 +1686,4 @@
cf.setConnectionLoadBalancingPolicyClassName(val5);
}
}
-
}
Modified: trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/Util.java
===================================================================
--- trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/Util.java 2011-04-20
22:22:34 UTC (rev 10549)
+++ trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/Util.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -13,10 +13,7 @@
package org.hornetq.ra;
import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import javax.naming.Context;
import javax.transaction.TransactionManager;
@@ -171,11 +168,37 @@
* @return the object
* @throws Exception for any error
*/
- public static Object lookup(final Context context, final String name, final Class
clazz) throws Exception
+ public static Object lookup(final Context context, final String name, final
Class<?> clazz) throws Exception
{
return context.lookup(name);
}
+ /**
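+ * Parses a JNDI configuration string of the form "key1=value1;key2=value2"
+ * @param config the ';'-separated list of key=value pairs
+ * @return a Hashtable mapping each trimmed key to its trimmed value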
+ */
+ public static Hashtable<?,?> parseHashtableConfig(final String config)
+ {
+ Hashtable<String,String> hashtable = new Hashtable<String, String>();
+
+ String[] topElements = config.split(";");
+
+ for (String element : topElements)
+ {
+ String expression[] = element.split("=");
+
+ if (expression.length != 2)
+ {
+ throw new IllegalArgumentException("Invalid expression " + element
+ " at " + config);
+ }
+
+ hashtable.put(expression[0].trim(), expression[1].trim());
+ }
+
+ return hashtable;
+ }
+
public static List<Map<String, Object>> parseConfig(final String config)
{
List<Map<String, Object>> result =new ArrayList<Map<String,
Object>>();
Modified:
trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/inflow/HornetQActivation.java
===================================================================
---
trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/inflow/HornetQActivation.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/inflow/HornetQActivation.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -397,7 +397,15 @@
if (spec.isUseJNDI())
{
- Context ctx = new InitialContext();
+ Context ctx;
+ if(spec.getParsedJndiParams() == null)
+ {
+ ctx = new InitialContext();
+ }
+ else
+ {
+ ctx = new InitialContext(spec.getParsedJndiParams());
+ }
HornetQActivation.log.debug("Using context " + ctx.getEnvironment() +
" for " + spec);
if (HornetQActivation.trace)
{
Modified:
trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/inflow/HornetQActivationSpec.java
===================================================================
---
trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/inflow/HornetQActivationSpec.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/inflow/HornetQActivationSpec.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -13,6 +13,7 @@
package org.hornetq.ra.inflow;
import java.util.HashMap;
+import java.util.Hashtable;
import java.util.List;
import java.util.Map;
@@ -84,8 +85,12 @@
/** Transaction timeout */
private Integer transactionTimeout;
- private boolean useJNDI = true;
+ private Boolean useJNDI = true;
+ private String jndiParams = null;
+
+ private Hashtable parsedJndiParams;
+
/* use local tx instead of XA*/
private Boolean localTx;
@@ -137,6 +142,10 @@
*/
public boolean isUseJNDI()
{
+ if(useJNDI == null)
+ {
+ return ra.isUseJNDI();
+ }
return useJNDI;
}
@@ -149,6 +158,34 @@
}
/**
+ *
+ * @return return the jndi params to use
+ */
+ public String getJndiParams()
+ {
+ if(jndiParams == null)
+ {
+ return ra.getJndiParams();
+ }
+ return jndiParams;
+ }
+
+ public void setJndiParams(String jndiParams)
+ {
+ this.jndiParams = jndiParams;
+ parsedJndiParams = Util.parseHashtableConfig(jndiParams);
+ }
+
+ public Hashtable<?,?> getParsedJndiParams()
+ {
+ if(parsedJndiParams == null)
+ {
+ return ra.getParsedJndiParams();
+ }
+ return parsedJndiParams;
+ }
+
+ /**
* Set the resource adapter
* @param ra The resource adapter
* @exception ResourceException Thrown if incorrect resource adapter
Property changes on: trunk/hornetq-ra/hornetq-ra-rar
___________________________________________________________________
Modified: svn:ignore
- target
+ target
.classpath
.project
Modified: trunk/hornetq-ra/hornetq-ra-rar/src/main/resources/ra.xml
===================================================================
--- trunk/hornetq-ra/hornetq-ra-rar/src/main/resources/ra.xml 2011-04-20 22:22:34 UTC (rev
10549)
+++ trunk/hornetq-ra/hornetq-ra-rar/src/main/resources/ra.xml 2011-04-21 22:08:46 UTC (rev
10550)
@@ -83,6 +83,12 @@
<config-property-value></config-property-value>
</config-property>
<config-property>
+ <description>The jndi params to use to look up the jms resources if local
jndi is not to be used</description>
+ <config-property-name>JndiParams</config-property-name>
+ <config-property-type>java.lang.String</config-property-type>
+
<config-property-value>java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory;java.naming.provider.url=jnp://localhost:1199;java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces</config-property-value>
+ </config-property>
+ <config-property>
<description>The discovery group address</description>
<config-property-name>DiscoveryAddress</config-property-name>
<config-property-type>java.lang.String</config-property-type>
Property changes on: trunk/hornetq-rest
___________________________________________________________________
Added: svn:ignore
+ .project
Property changes on: trunk/hornetq-rest/hornetq-rest
___________________________________________________________________
Modified: svn:ignore
- target
+ target
.classpath
.project
Property changes on: trunk/hornetq-service-sar
___________________________________________________________________
Modified: svn:ignore
- target
+ target
.classpath
.project
Property changes on: trunk/hornetq-spring-integration
___________________________________________________________________
Modified: svn:ignore
- target
+ target
.classpath
.project
Property changes on: trunk/hornetq-twitter-integration
___________________________________________________________________
Modified: svn:ignore
- target
+ target
.classpath
.project
Property changes on: trunk/tests
___________________________________________________________________
Modified: svn:ignore
- build
logs
output
*.log
ObjectStore
+ build
logs
output
*.log
ObjectStore
.project
Property changes on: trunk/tests/concurrent-tests
___________________________________________________________________
Modified: svn:ignore
- target
+ target
.classpath
.project
Property changes on: trunk/tests/integration-tests
___________________________________________________________________
Modified: svn:ignore
- target
+ target
.classpath
.project
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/DuplicateDetectionTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/DuplicateDetectionTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/DuplicateDetectionTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -1,14 +1,14 @@
/*
* Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
*/
package org.hornetq.tests.integration;
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/String64KLimitTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/String64KLimitTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/String64KLimitTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -19,11 +19,13 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.*;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
+import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
-import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
/**
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerCloseTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerCloseTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerCloseTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -30,8 +30,8 @@
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
+import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
-import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
/**
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/CoreClientTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/CoreClientTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/CoreClientTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -26,6 +26,7 @@
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/DurableQueueTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/DurableQueueTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/DurableQueueTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -16,10 +16,11 @@
import junit.framework.Assert;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.*;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
-import org.hornetq.tests.util.RandomUtil;
/**
* A TemporaryQueueTest
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ExpiryAddressTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ExpiryAddressTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ExpiryAddressTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -20,12 +20,13 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.*;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
-import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
/**
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/FailureDeadlockTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/FailureDeadlockTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/FailureDeadlockTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -23,6 +23,7 @@
import org.hornetq.api.jms.JMSFactoryType;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
@@ -32,6 +33,7 @@
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.integration.jms.server.management.NullInitialContext;
import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.tests.util.UnitTestCase;
/**
*
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InVMNonPersistentMessageBufferTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InVMNonPersistentMessageBufferTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InVMNonPersistentMessageBufferTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -18,8 +18,8 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.core.impl.PacketImpl;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
-import org.hornetq.tests.util.RandomUtil;
import org.hornetq.utils.DataConstants;
public class InVMNonPersistentMessageBufferTest extends ServiceTestBase
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/IncompatibleVersionTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/IncompatibleVersionTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/IncompatibleVersionTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -14,13 +14,14 @@
package org.hornetq.tests.integration.client;
import static org.hornetq.tests.util.RandomUtil.randomString;
-
-import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.tests.util.SpawnedVMSupport;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.TransportConfiguration;
@@ -36,10 +37,12 @@
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
import org.hornetq.core.version.impl.VersionImpl;
+import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.utils.VersionLoader;
/**
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -13,6 +13,7 @@
package org.hornetq.tests.integration.client;
+import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import javax.transaction.xa.XAResource;
@@ -31,7 +32,7 @@
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.tests.largemessage.LargeMessageTestBase;
+import org.hornetq.tests.integration.largemessage.LargeMessageTestBase;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
@@ -139,17 +140,16 @@
}
}
-
public void testLargeBufferTransacted() throws Exception
{
doTestLargeBuffer(true);
}
-
+
public void testLargeBufferNotTransacted() throws Exception
{
doTestLargeBuffer(false);
}
-
+
public void doTestLargeBuffer(boolean transacted) throws Exception
{
final int journalsize = 100 * 1024;
@@ -162,10 +162,10 @@
{
Configuration config = createDefaultConfig(isNetty());
config.setJournalFileSize(journalsize);
-
+
config.setJournalBufferSize_AIO(10 * 1024);
config.setJournalBufferSize_NIO(10 * 1024);
-
+
server = createServer(true, config);
server.start();
@@ -179,11 +179,10 @@
ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
Message clientFile = session.createMessage(true);
- for (int i = 0 ; i < messageSize; i++)
+ for (int i = 0; i < messageSize; i++)
{
clientFile.getBodyBuffer().writeByte(getSamplebyte(i));
}
-
producer.send(clientFile);
@@ -197,26 +196,24 @@
ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
ClientMessage msg1 = consumer.receive(5000);
assertNotNull(msg1);
-
+
Assert.assertNotNull(msg1);
-
- for (int i = 0 ; i < messageSize; i++)
+
+ for (int i = 0; i < messageSize; i++)
{
- //System.out.print(msg1.getBodyBuffer().readByte() + " ");
- //if (i % 100 == 0) System.out.println();
- assertEquals("position = " + i, getSamplebyte(i),
msg1.getBodyBuffer().readByte());
+ // System.out.print(msg1.getBodyBuffer().readByte() + " ");
+ // if (i % 100 == 0) System.out.println();
+ assertEquals("position = " + i, getSamplebyte(i),
msg1.getBodyBuffer().readByte());
}
-
+
msg1.acknowledge();
-
+
consumer.close();
-
-
+
if (transacted)
{
session.commit();
}
-
session.close();
@@ -880,7 +877,6 @@
producer2.send(msg1);
-
session.commit();
ClientMessage msg2 = consumer2.receive(10000);
@@ -939,9 +935,8 @@
server.start();
-
locator.setMinLargeMessageSize(200);
-
+
locator.setCacheLargeMessagesClient(true);
ClientSessionFactory sf = locator.createSessionFactory();
@@ -968,19 +963,19 @@
session.commit();
compareString(messageSize, msgReceived);
-
+
msgReceived.getBodyBuffer().readerIndex(0);
-
+
producer.send(msgReceived);
session.commit();
-
+
ClientMessage msgReceived2 = consumer.receive(10000);
msgReceived2.acknowledge();
compareString(messageSize, msgReceived2);
-
+
session.commit();
session.close();
@@ -1016,7 +1011,7 @@
assertNotNull(msg);
for (long i = 0; i < messageSize; i++)
{
- Assert.assertEquals("position " + i, UnitTestCase.getSamplebyte(i),
msg.getBodyBuffer().readByte());
+ Assert.assertEquals("position " + i, UnitTestCase.getSamplebyte(i),
msg.getBodyBuffer().readByte());
}
}
@@ -2365,7 +2360,6 @@
server.start();
-
locator.setMinLargeMessageSize(1024);
locator.setConsumerWindowSize(1024 * 1024);
@@ -2455,7 +2449,298 @@
}
}
}
+
+ // JBPAPP-6237
+ public void testPageOnLargeMessageMultipleQueues() throws Exception
+ {
+ Configuration config = createDefaultConfig(isNetty());
+ final int PAGE_MAX = 20 * 1024;
+
+ final int PAGE_SIZE = 10 * 1024;
+
+ HashMap<String, AddressSettings> map = new HashMap<String,
AddressSettings>();
+
+ AddressSettings value = new AddressSettings();
+ map.put(LargeMessageTest.ADDRESS.toString(), value);
+ server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
+ server.start();
+
+ final int numberOfBytes = 1024;
+
+ final int numberOfBytesBigMessage = 400000;
+
+ try
+ {
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(null, null, false, true, true, false,
0);
+
+ session.createQueue(LargeMessageTest.ADDRESS,
LargeMessageTest.ADDRESS.concat("-0"), null, true);
+ session.createQueue(LargeMessageTest.ADDRESS,
LargeMessageTest.ADDRESS.concat("-1"), null, true);
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ for (int i = 0; i < 100; i++)
+ {
+ message = session.createMessage(true);
+
+ message.getBodyBuffer().writerIndex(0);
+
+ message.getBodyBuffer().writeBytes(new byte[numberOfBytes]);
+
+ for (int j = 1; j <= numberOfBytes; j++)
+ {
+ message.getBodyBuffer().writeInt(j);
+ }
+
+ producer.send(message);
+ }
+
+ ClientMessage clientFile = createLargeClientMessage(session,
numberOfBytesBigMessage);
+ clientFile.putBooleanProperty("TestLarge", true);
+ producer.send(clientFile);
+
+ for (int i = 0; i < 100; i++)
+ {
+ message = session.createMessage(true);
+
+ message.getBodyBuffer().writeBytes(new byte[numberOfBytes]);
+
+ producer.send(message);
+ }
+
+ session.close();
+
+ server.stop();
+
+ server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
+ server.start();
+
+ sf = locator.createSessionFactory();
+
+ for (int ad = 0; ad < 2; ad++)
+ {
+ session = sf.createSession(false, false, false);
+
+ ClientConsumer consumer =
session.createConsumer(LargeMessageTest.ADDRESS.concat("-" + ad));
+
+ session.start();
+
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage message2 =
consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+
+ Assert.assertNotNull(message2);
+
+ message2.acknowledge();
+
+ Assert.assertNotNull(message2);
+ }
+
+ session.commit();
+
+ for (int i = 0; i < 5; i++)
+ {
+ ClientMessage messageLarge = consumer.receive(RECEIVE_WAIT_TIME);
+
+ assertTrue(messageLarge.getBooleanProperty("TestLarge"));
+
+ Assert.assertNotNull(messageLarge);
+
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ messageLarge.acknowledge();
+ messageLarge.saveToOutputStream(bout);
+ byte[] body = bout.toByteArray();
+ assertEquals(numberOfBytesBigMessage, body.length);
+ for (int bi = 0; bi < body.length; bi++)
+ {
+ assertEquals(getSamplebyte(bi), body[bi]);
+ }
+
+ if (i < 4)
+ session.rollback();
+ else
+ session.commit();
+ }
+
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage message2 =
consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+
+ Assert.assertNotNull(message2);
+
+ message2.acknowledge();
+
+ Assert.assertNotNull(message2);
+ }
+
+ session.commit();
+
+ consumer.close();
+
+ session.close();
+
+ }
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
+
+ // JBPAPP-6237
+ public void testPageOnLargeMessageMultipleQueues2() throws Exception
+ {
+ Configuration config = createDefaultConfig(isNetty());
+
+ final int PAGE_MAX = 20 * 1024;
+
+ final int PAGE_SIZE = 10 * 1024;
+
+ HashMap<String, AddressSettings> map = new HashMap<String,
AddressSettings>();
+
+ AddressSettings value = new AddressSettings();
+ map.put(LargeMessageTest.ADDRESS.toString(), value);
+ server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
+ server.start();
+
+ final int numberOfBytes = 1024;
+
+ final int numberOfBytesBigMessage = 400000;
+
+ try
+ {
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+ locator.setCompressLargeMessage(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(LargeMessageTest.ADDRESS,
LargeMessageTest.ADDRESS.concat("-0"), null, true);
+ session.createQueue(LargeMessageTest.ADDRESS,
LargeMessageTest.ADDRESS.concat("-1"), null, true);
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+ int msgId = 0;
+
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage message = session.createMessage(true);
+
+ message.putIntProperty("msgID", msgId++);
+
+ message.putBooleanProperty("TestLarge", false);
+
+ message.getBodyBuffer().writerIndex(0);
+
+ message.getBodyBuffer().writeBytes(new byte[numberOfBytes]);
+
+ for (int j = 1; j <= numberOfBytes; j++)
+ {
+ message.getBodyBuffer().writeInt(j);
+ }
+
+ producer.send(message);
+ }
+
+
+ for (int i = 0; i < 10; i++)
+ {
+ ClientMessage clientFile = createLargeClientMessage(session,
numberOfBytesBigMessage);
+ clientFile.putBooleanProperty("TestLarge", true);
+ producer.send(clientFile);
+ }
+
+ session.close();
+
+ for (int ad = 0; ad < 2; ad++)
+ {
+ session = sf.createSession(false, false, false);
+
+ ClientConsumer consumer =
session.createConsumer(LargeMessageTest.ADDRESS.concat("-" + ad));
+
+ session.start();
+
+ for (int received = 0 ; received < 5; received++)
+ {
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage message2 =
consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+
+ Assert.assertNotNull(message2);
+
+ assertFalse(message2.getBooleanProperty("TestLarge"));
+
+ message2.acknowledge();
+
+ Assert.assertNotNull(message2);
+ }
+
+ for (int i = 0; i < 10; i++)
+ {
+ ClientMessage messageLarge = consumer.receive(RECEIVE_WAIT_TIME);
+
+ Assert.assertNotNull(messageLarge);
+
+ assertTrue(messageLarge.getBooleanProperty("TestLarge"));
+
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+
+ messageLarge.acknowledge();
+
+ messageLarge.saveToOutputStream(bout);
+ byte[] body = bout.toByteArray();
+ assertEquals(numberOfBytesBigMessage, body.length);
+ for (int bi = 0; bi < body.length; bi++)
+ {
+ assertEquals(getSamplebyte(bi), body[bi]);
+ }
+ }
+
+ session.rollback();
+ }
+
+ session.commit();
+
+ consumer.close();
+
+ session.close();
+
+ }
+ }
+ finally
+ {
+ locator.close();
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
public void testSendStreamingSingleMessage() throws Exception
{
ClientSession session = null;
@@ -2469,7 +2754,6 @@
server.start();
-
locator.setMinLargeMessageSize(100 * 1024);
ClientSessionFactory sf = locator.createSessionFactory();
@@ -2629,18 +2913,17 @@
try
{
LargeServerMessageImpl fileMessage = new
LargeServerMessageImpl((JournalStorageManager)server.getStorageManager());
-
+
fileMessage.setMessageID(1005);
for (int i = 0; i < LARGE_MESSAGE_SIZE; i++)
{
fileMessage.addBytes(new byte[] { UnitTestCase.getSamplebyte(i) });
}
-
+
// The server would be doing this
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, LARGE_MESSAGE_SIZE);
-
fileMessage.releaseResources();
session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessageConcurrencyTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessageConcurrencyTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessageConcurrencyTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -21,8 +21,8 @@
import org.hornetq.api.core.client.*;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
-import org.hornetq.tests.util.RandomUtil;
/**
*
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessageDurabilityTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessageDurabilityTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessageDurabilityTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -19,8 +19,8 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.*;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
-import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
/**
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessageGroupingConnectionFactoryTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessageGroupingConnectionFactoryTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessageGroupingConnectionFactoryTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -23,6 +23,7 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.*;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessagePriorityTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessagePriorityTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessagePriorityTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -17,14 +17,20 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
+import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
-import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
/**
@@ -262,6 +268,60 @@
}
+ public void testManyMessages() throws Exception
+ {
+ SimpleString queue = RandomUtil.randomSimpleString();
+ SimpleString address = RandomUtil.randomSimpleString();
+
+ session.createQueue(address, queue, false);
+
+ ClientProducer producer = session.createProducer(address);
+
+ for (int i = 0 ; i < 777; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+ msg.setPriority((byte)5);
+ msg.putBooleanProperty("fast", false);
+ producer.send(msg);
+ }
+
+ for (int i = 0 ; i < 333; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+ msg.setPriority((byte)6);
+ msg.putBooleanProperty("fast", true);
+ producer.send(msg);
+ }
+
+ ClientConsumer consumer = session.createConsumer(queue);
+
+ session.start();
+
+
+ for (int i = 0 ; i < 333; i++)
+ {
+ ClientMessage msg = consumer.receive(5000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ assertTrue(msg.getBooleanProperty("fast"));
+ }
+
+ for (int i = 0 ; i < 777; i++)
+ {
+ ClientMessage msg = consumer.receive(5000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ assertFalse(msg.getBooleanProperty("fast"));
+ }
+
+ consumer.close();
+
+ session.deleteQueue(queue);
+
+ session.close();
+ }
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingOrderTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingOrderTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingOrderTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -36,14 +36,17 @@
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.JMSFactoryType;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.impl.LocalQueueBinding;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.ServerSession;
import org.hornetq.core.server.impl.QueueImpl;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingSyncTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingSyncTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingSyncTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -15,19 +15,44 @@
import java.nio.ByteBuffer;
import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.api.jms.HornetQJMSClient;
+import org.hornetq.api.jms.JMSFactoryType;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.postoffice.Bindings;
+import org.hornetq.core.postoffice.impl.LocalQueueBinding;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.impl.QueueImpl;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.jms.client.HornetQJMSConnectionFactory;
+import org.hornetq.jms.server.impl.JMSServerManagerImpl;
+import org.hornetq.tests.unit.util.InVMContext;
import org.hornetq.tests.util.ServiceTestBase;
/**
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -2934,7 +2934,260 @@
}
+ public void testTwoQueuesDifferentFilters() throws Exception
+ {
+ boolean persistentMessages = true;
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 200;
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setClientFailureCheckPeriod(120000);
+ locator.setConnectionTTL(5000000);
+ locator.setCallTimeout(120000);
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ // note: if you want to change this, numberOfMessages has to be a multiple of
NQUEUES
+ int NQUEUES = 2;
+
+
+ for (int i = 0 ; i < NQUEUES; i++)
+ {
+ session.createQueue(PagingTest.ADDRESS,
PagingTest.ADDRESS.concat("=" + i), new SimpleString("propTest=" + i),
true);
+ }
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ byte[] body = new byte[messageSize];
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(persistentMessages);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty("propTest", i % NQUEUES);
+ message.putIntProperty("id", i);
+
+ producer.send(message);
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ session.start();
+
+ for (int nqueue = 0; nqueue < NQUEUES; nqueue++)
+ {
+ ClientConsumer consumer =
session.createConsumer(PagingTest.ADDRESS.concat("=" + nqueue));
+
+ for (int i = 0; i < (numberOfMessages /NQUEUES); i++)
+ {
+ message = consumer.receive(500000);
+ assertNotNull(message);
+ message.acknowledge();
+
+ assertEquals(nqueue,
message.getIntProperty("propTest").intValue());
+ }
+
+ assertNull(consumer.receiveImmediate());
+
+ consumer.close();
+
+ session.commit();
+ }
+
+ PagingStore store = server.getPagingManager().getPageStore(ADDRESS);
+ store.getCursorProvier().cleanup();
+
+ long timeout = System.currentTimeMillis() + 5000;
+ while (store.isPaging() && timeout > System.currentTimeMillis())
+ {
+ Thread.sleep(100);
+ }
+
+
+ // It's async, so need to wait a bit for it happening
+ assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
+
+ sf.close();
+
+ locator.close();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
+
+
+ public void testTwoQueues() throws Exception
+ {
+ boolean persistentMessages = true;
+
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 1000;
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setClientFailureCheckPeriod(120000);
+ locator.setConnectionTTL(5000000);
+ locator.setCallTimeout(120000);
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+
+ session.createQueue(PagingTest.ADDRESS,
PagingTest.ADDRESS.concat("=1"), null, true);
+ session.createQueue(PagingTest.ADDRESS,
PagingTest.ADDRESS.concat("=2"), null, true);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ byte[] body = new byte[messageSize];
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(persistentMessages);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty("propTest", i % 2 == 0 ? 1 : 2);
+
+ producer.send(message);
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ session.start();
+
+ for (int msg = 1; msg <= 2; msg++)
+ {
+ ClientConsumer consumer =
session.createConsumer(PagingTest.ADDRESS.concat("=" + msg));
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = consumer.receive(500000);
+ assertNotNull(message);
+ message.acknowledge();
+
+ //assertEquals(msg,
message.getIntProperty("propTest").intValue());
+
+ System.out.println("i = " + i + " msg = " +
message.getIntProperty("propTest"));
+ }
+
+ session.commit();
+
+ assertNull(consumer.receiveImmediate());
+
+ consumer.close();
+ }
+
+ PagingStore store = server.getPagingManager().getPageStore(ADDRESS);
+ store.getCursorProvier().cleanup();
+
+ long timeout = System.currentTimeMillis() + 5000;
+ while (store.isPaging() && timeout > System.currentTimeMillis())
+ {
+ Thread.sleep(100);
+ }
+
+ store.getCursorProvier().cleanup();
+
+ Thread.sleep(1000);
+
+
+ // It's async, so need to wait a bit for it happening
+ assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
+
+ sf.close();
+
+ locator.close();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ProducerCloseTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ProducerCloseTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ProducerCloseTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -18,11 +18,13 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.*;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
+import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
-import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
/**
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ProducerFlowControlTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ProducerFlowControlTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -38,8 +38,8 @@
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
-import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
/**
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/QueueBrowserTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/QueueBrowserTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/QueueBrowserTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -15,6 +15,7 @@
import junit.framework.Assert;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.*;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/RequestorTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/RequestorTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/RequestorTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -18,14 +18,22 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientRequestor;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.MessageHandler;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ClientMessageImpl;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
-import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
+import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
@@ -56,7 +64,7 @@
{
final SimpleString key = RandomUtil.randomSimpleString();
long value = RandomUtil.randomLong();
- SimpleString requestAddress = RandomUtil.randomSimpleString();
+ SimpleString requestAddress = new SimpleString("AdTest");
SimpleString requestQueue = RandomUtil.randomSimpleString();
final ClientSession session = sf.createSession(false, true, true);
@@ -76,9 +84,60 @@
Assert.assertNotNull("reply was not received", reply);
Assert.assertEquals(value, reply.getObjectProperty(key));
+ Thread.sleep(5000);
session.close();
}
+ public void testManyRequestsOverBlocked() throws Exception
+ {
+ final SimpleString key = RandomUtil.randomSimpleString();
+ long value = RandomUtil.randomLong();
+
+ AddressSettings settings = new AddressSettings();
+ settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+ settings.setMaxSizeBytes(1024);
+ service.getAddressSettingsRepository().addMatch("#", settings);
+
+ SimpleString requestAddress = new SimpleString("RequestAddress");
+
+ SimpleString requestQueue = new SimpleString("RequestAddress Queue");
+
+ final ClientSession sessionRequest = sf.createSession(false, true, true);
+
+ sessionRequest.createQueue(requestAddress, requestQueue);
+
+ sessionRequest.start();
+
+ ClientConsumer requestConsumer = sessionRequest.createConsumer(requestQueue);
+ requestConsumer.setMessageHandler(new SimpleMessageHandler(key, sessionRequest));
+
+
+ for (int i = 0 ; i < 2000; i++)
+ {
+ if (i % 100 == 0)
+ {
+ System.out.println(i);
+ }
+ final ClientSession session = sf.createSession(false, true, true);
+
+ session.start();
+
+ ClientRequestor requestor = new ClientRequestor(session, requestAddress);
+ ClientMessage request = session.createMessage(false);
+ request.putLongProperty(key, value);
+
+ ClientMessage reply = requestor.request(request, 5000);
+ Assert.assertNotNull("reply was not received", reply);
+ reply.acknowledge();
+ Assert.assertEquals(value, reply.getObjectProperty(key));
+ requestor.close();
+ session.close();
+ }
+
+ sessionRequest.close();
+
+ }
+
public void testTwoRequests() throws Exception
{
final SimpleString key = RandomUtil.randomSimpleString();
@@ -223,6 +282,7 @@
service.start();
locator = HornetQClient.createServerLocatorWithoutHA(new
TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
+ locator.setAckBatchSize(0);
sf = locator.createSessionFactory();
}
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SelfExpandingBufferTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SelfExpandingBufferTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SelfExpandingBufferTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -20,8 +20,8 @@
import org.hornetq.api.core.client.*;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
-import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
/**
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionFactoryTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionFactoryTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionFactoryTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -34,8 +34,8 @@
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
-import org.hornetq.tests.util.RandomUtil;
/**
*
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TemporaryQueueTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TemporaryQueueTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TemporaryQueueTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -23,7 +23,13 @@
import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
@@ -34,14 +40,15 @@
import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
-import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
/**
* A TemporaryQueueTest
*
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ * @author Clebert Suconic
*/
public class TemporaryQueueTest extends ServiceTestBase
{
@@ -287,7 +294,7 @@
session.close();
}
- public void _testQueueWithWildcard3() throws Exception
+ public void testQueueWithWildcard3() throws Exception
{
session.createQueue("a.b", "queue1");
session.createTemporaryQueue("a.#", "queue2");
@@ -322,7 +329,41 @@
session2.close();
}
+
+ public void testRecreateConsumerOverServerFailure() throws Exception
+ {
+ ServerLocator serverWithReattach = createLocator();
+ serverWithReattach.setReconnectAttempts(-1);
+ serverWithReattach.setRetryInterval(1000);
+ serverWithReattach.setConfirmationWindowSize(-1);
+ ClientSessionFactory reattachSF = serverWithReattach.createSessionFactory();
+
+ ClientSession session = reattachSF.createSession(false, false);
+ session.createTemporaryQueue("tmpAd", "tmpQ");
+ ClientConsumer consumer = session.createConsumer("tmpQ");
+
+ ClientProducer prod = session.createProducer("tmpAd");
+
+ session.start();
+
+ RemotingConnectionImpl conn =
(RemotingConnectionImpl)((ClientSessionInternal)session).getConnection();
+ conn.fail(new HornetQException(HornetQException.IO_ERROR));
+
+ prod.send(session.createMessage(false));
+ session.commit();
+
+ assertNotNull(consumer.receive(1000));
+
+ session.close();
+
+ reattachSF.close();
+
+ serverWithReattach.close();
+
+
+ }
+
public void testDeleteTemporaryQueueWhenClientCrash() throws Exception
{
session.close();
@@ -415,12 +456,18 @@
server = createServer(false, configuration);
server.start();
- locator = HornetQClient.createServerLocatorWithoutHA(new
TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
- locator.setConnectionTTL(TemporaryQueueTest.CONNECTION_TTL);
+ locator = createLocator();
sf = locator.createSessionFactory();
session = sf.createSession(false, true, true);
}
+ protected ServerLocator createLocator()
+ {
+ ServerLocator retlocator = HornetQClient.createServerLocatorWithoutHA(new
TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
+ retlocator.setConnectionTTL(TemporaryQueueTest.CONNECTION_TTL);
+ return retlocator;
+ }
+
@Override
protected void tearDown() throws Exception
{
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -1,14 +1,14 @@
/*
* Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
*/
package org.hornetq.tests.integration.clientcrash;
@@ -76,7 +76,7 @@
session.start();
// receive a message from the queue
- Message messageFromClient = consumer.receive(10000);
+ Message messageFromClient = consumer.receive(500000);
Assert.assertNotNull("no message received", messageFromClient);
Assert.assertEquals(ClientCrashTest.MESSAGE_TEXT_FROM_CLIENT,
messageFromClient.getBodyBuffer().readString());
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -20,6 +20,7 @@
import junit.framework.Assert;
+import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.*;
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -38,6 +38,7 @@
import org.hornetq.core.config.BroadcastGroupConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -13,7 +13,18 @@
package org.hornetq.tests.integration.cluster.distribution;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.cluster.MessageFlowRecord;
import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl;
@@ -333,6 +344,118 @@
verifyReceiveAll(20, 1);
}
+ public void testRedistributeWithScheduling() throws Exception
+ {
+ setupCluster(false);
+
+ AddressSettings setting = new AddressSettings();
+ setting.setRedeliveryDelay(10000);
+ servers[0].getAddressSettingsRepository().addMatch("queues.testaddress",
setting);
+ servers[0].getAddressSettingsRepository().addMatch("queue0", setting);
+ servers[1].getAddressSettingsRepository().addMatch("queue0", setting);
+ servers[1].getAddressSettingsRepository().addMatch("queues.testaddress",
setting);
+
+ startServers(0);
+
+ setupSessionFactory(0, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+
+ ClientSession session0 = sfs[0].createSession(false, false, false);
+
+ ClientProducer prod0 = session0.createProducer("queues.testaddress");
+
+ for (int i = 0 ; i < 100; i++)
+ {
+ ClientMessage msg = session0.createMessage(true);
+ msg.putIntProperty("key", i);
+
+ byte[] bytes = new byte[24];
+
+ ByteBuffer bb = ByteBuffer.wrap(bytes);
+
+ bb.putLong((long)i);
+
+ msg.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, bytes);
+
+ prod0.send(msg);
+
+ session0.commit();
+ }
+
+ session0.close();
+
+ session0 = sfs[0].createSession(true, false, false);
+
+ ClientConsumer consumer0 = session0.createConsumer("queue0");
+
+ session0.start();
+
+ ArrayList<Xid> xids = new ArrayList<Xid>();
+
+ for (int i = 0 ; i < 100; i++)
+ {
+ Xid xid = newXID();
+
+ session0.start(xid, XAResource.TMNOFLAGS);
+
+ ClientMessage msg = consumer0.receive(5000);
+
+ msg.acknowledge();
+
+ session0.end(xid, XAResource.TMSUCCESS);
+
+ session0.prepare(xid);
+
+ xids.add(xid);
+ }
+
+ session0.close();
+
+ sfs[0].close();
+ sfs[0] = null;
+
+
+ startServers(0, 1, 2);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ createQueue(1, "queues.testaddress", "queue0", null, false);
+ createQueue(2, "queues.testaddress", "queue0", null, false);
+
+ ClientSession session1 = sfs[1].createSession(false, false);
+ session1.start();
+ ClientConsumer consumer1 = session1.createConsumer("queue0");
+
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 0, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 1, false);
+ waitForBindings(1, "queues.testaddress", 2, 0, false);
+ waitForBindings(2, "queues.testaddress", 2, 1, false);
+
+ session0 = sfs[0].createSession(true, false, false);
+
+ for (Xid xid: xids)
+ {
+ session0.rollback(xid);
+ }
+
+
+ for (int i = 0 ; i < 100; i++)
+ {
+ ClientMessage msg = consumer1.receive(15000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ }
+
+ session1.commit();
+
+ }
+
public void testRedistributionWhenConsumerIsClosedQueuesWithFilters() throws
Exception
{
setupCluster(false);
Added:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionWithDiscoveryTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionWithDiscoveryTest.java
(rev 0)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionWithDiscoveryTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -0,0 +1,206 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.distribution;
+
+import java.util.ArrayList;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.settings.impl.AddressSettings;
+
+/**
+ * A MessageRedistributionWithDiscoveryTest
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class MessageRedistributionWithDiscoveryTest extends ClusterTestBase
+{
+ private static final Logger log =
Logger.getLogger(SymmetricClusterWithDiscoveryTest.class);
+
+ protected static final String groupAddress = getUDPDiscoveryAddress();
+
+ protected static final int groupPort = getUDPDiscoveryPort();
+
+ protected boolean isNetty()
+ {
+ return false;
+ }
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ setupCluster();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ for (int i = 0; i < servers.length; i++)
+ {
+ if (servers[i] != null)
+ {
+ servers[i].stop();
+ servers[i] = null;
+ }
+ }
+ super.tearDown();
+ }
+
+ protected void setupCluster() throws Exception
+ {
+ setupCluster(false);
+ }
+
+ protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
+ {
+ for (int i = 0; i < 5; i++)
+ {
+ setServer(forwardWhenNoConsumers, i);
+ }
+ }
+
+ /**
+ * @param forwardWhenNoConsumers
+ */
+ protected void setServer(final boolean forwardWhenNoConsumers, int server)
+ {
+ setupLiveServerWithDiscovery(server,
+ SymmetricClusterWithDiscoveryTest.groupAddress,
+ SymmetricClusterWithDiscoveryTest.groupPort,
+ isFileStorage(),
+ isNetty(),
+ false);
+
+ AddressSettings setting = new AddressSettings();
+ setting.setRedeliveryDelay(0);
+ setting.setRedistributionDelay(0);
+
+ servers[server].getAddressSettingsRepository().addMatch("#", setting);
+
+ setupDiscoveryClusterConnection("cluster" + server, server,
"dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+ }
+
+ public void testRedistributeWithPreparedAndRestart() throws Exception
+ {
+ startServers(0);
+
+ setupSessionFactory(0, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, true);
+
+ ClientSession session0 = sfs[0].createSession(false, false, false);
+
+ ClientProducer prod0 = session0.createProducer("queues.testaddress");
+
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage msg = session0.createMessage(true);
+
+ msg.putIntProperty("key", i);
+
+ prod0.send(msg);
+
+ session0.commit();
+ }
+
+ session0.close();
+
+ session0 = sfs[0].createSession(true, false, false);
+
+ ClientConsumer consumer0 = session0.createConsumer("queue0");
+
+ session0.start();
+
+ ArrayList<Xid> xids = new ArrayList<Xid>();
+
+ for (int i = 0; i < 100; i++)
+ {
+ Xid xid = newXID();
+
+ session0.start(xid, XAResource.TMNOFLAGS);
+
+ ClientMessage msg = consumer0.receive(5000);
+
+ msg.acknowledge();
+
+ session0.end(xid, XAResource.TMSUCCESS);
+
+ session0.prepare(xid);
+
+ xids.add(xid);
+ }
+
+ session0.close();
+
+ sfs[0].close();
+ sfs[0] = null;
+
+ servers[0].stop();
+ servers[0] = null;
+
+ setServer(false, 0);
+
+ startServers(1, 2);
+
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ ClientSession session1 = sfs[1].createSession(false, false);
+
+ createQueue(1, "queues.testaddress", "queue0", null, true);
+ createQueue(2, "queues.testaddress", "queue0", null, true);
+
+ session1.start();
+ ClientConsumer consumer1 = session1.createConsumer("queue0");
+
+ startServers(0);
+
+ setupSessionFactory(0, isNetty());
+
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 1, false);
+
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 0, true);
+
+ waitForBindings(1, "queues.testaddress", 2, 0, false);
+ waitForBindings(2, "queues.testaddress", 2, 1, false);
+
+ session0 = sfs[0].createSession(true, false, false);
+
+ for (Xid xid : xids)
+ {
+ session0.rollback(xid);
+ }
+
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage msg = consumer1.receive(15000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ }
+
+ session1.commit();
+
+ }
+
+}
Added:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AlmostLargeAsynchronousFailoverTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AlmostLargeAsynchronousFailoverTest.java
(rev 0)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AlmostLargeAsynchronousFailoverTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.failover;
+
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
+
+/**
+ * Validating failover when the size of the message Size > flow Control &&
message Size < minLargeMessageSize
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class AlmostLargeAsynchronousFailoverTest extends AsynchronousFailoverTest
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected void createConfigs() throws Exception
+ {
+ super.createConfigs();
+ liveServer.getServer().getConfiguration().setJournalFileSize(1024 * 1024);
+ backupServer.getServer().getConfiguration().setJournalFileSize(1024 * 1024);
+ }
+
+ protected ServerLocatorInternal getServerLocator() throws Exception
+ {
+ ServerLocatorInternal locator = super.getServerLocator();
+ locator.setMinLargeMessageSize(1024 * 1024);
+ locator.setProducerWindowSize(10 * 1024);
+ return locator;
+ }
+
+ protected void addPayload(ClientMessage message)
+ {
+ message.putBytesProperty("payload", new byte[20 * 1024]);
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -243,6 +243,10 @@
DelegatingSession.debug = false;
}
}
+
+ protected void addPayload(ClientMessage msg)
+ {
+ }
private void doTestNonTransactional(final TestRunner runner) throws Exception
{
@@ -274,6 +278,8 @@
message.getBodyBuffer().writeString("message" + i);
message.putIntProperty("counter", i);
+
+ addPayload(message);
producer.send(message);
@@ -402,7 +408,10 @@
message.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, new
SimpleString("id:" + i +
",exec:" +
executionId));
+
+ addPayload(message);
+
producer.send(message);
}
Added:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverOnFlowControlTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverOnFlowControlTest.java
(rev 0)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverOnFlowControlTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.failover;
+
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+
+/**
+ * Exercises failover while a producer is being throttled by flow control.
+ * <p>
+ * An interceptor registered on the client locator counts
+ * {@link SessionProducerCreditsMessage} packets; on the second one it crashes
+ * the session and drops the packet. The test contains no assertions: it
+ * succeeds when every {@code send} call still returns.
+ *
+ * @author clebertsuconic
+ */
+public class FailoverOnFlowControlTest extends FailoverTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Sends ten durable messages with 5000-byte bodies through a producer whose
+    * window is only 1000 bytes, crashing the session when the second credits
+    * packet is intercepted.
+    * <p>
+    * The session and the locator are released in {@code finally} blocks so a
+    * failing send does not leak them into later tests.
+    *
+    * @throws Exception if session setup or any send fails
+    */
+   public void testOverflowSend() throws Exception
+   {
+      ServerLocator locator = getServerLocator();
+      try
+      {
+         locator.setBlockOnNonDurableSend(true);
+         locator.setBlockOnDurableSend(true);
+         locator.setReconnectAttempts(-1);
+         // Overrides the 10 KiB window chosen by getServerLocator().
+         locator.setProducerWindowSize(1000);
+
+         // Filled in once the session exists; read by the interceptor below.
+         final ArrayList<ClientSession> sessionList = new ArrayList<ClientSession>();
+
+         Interceptor interceptorClient = new Interceptor()
+         {
+            private final AtomicInteger count = new AtomicInteger(0);
+
+            public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
+            {
+               System.out.println("Intercept..." + packet.getClass().getName());
+
+               if (packet instanceof SessionProducerCreditsMessage)
+               {
+                  SessionProducerCreditsMessage credit = (SessionProducerCreditsMessage)packet;
+
+                  System.out.println("Credits: " + credit.getCredits());
+                  if (count.incrementAndGet() == 2)
+                  {
+                     try
+                     {
+                        crash(sessionList.get(0));
+                     }
+                     catch (Exception e)
+                     {
+                        // Best effort: the interceptor must still return, so the
+                        // failure is only reported.
+                        e.printStackTrace();
+                     }
+                     // Drop the credits packet that triggered the crash.
+                     return false;
+                  }
+               }
+               return true;
+            }
+         };
+
+         locator.addInterceptor(interceptorClient);
+
+         ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+         ClientSession session = sf.createSession(true, true);
+         try
+         {
+            sessionList.add(session);
+
+            session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
+
+            ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+            final int numMessages = 10;
+
+            for (int i = 0; i < numMessages; i++)
+            {
+               ClientMessage message = session.createMessage(true);
+
+               message.getBodyBuffer().writeBytes(new byte[5000]);
+
+               message.putIntProperty("counter", i);
+
+               producer.send(message);
+            }
+         }
+         finally
+         {
+            session.close();
+         }
+      }
+      finally
+      {
+         locator.close();
+      }
+   }
+
+   /**
+    * Applies the parent configuration, then sets the journal file size of both
+    * the live and the backup server to 1 MiB.
+    */
+   @Override
+   protected void createConfigs() throws Exception
+   {
+      super.createConfigs();
+      liveServer.getServer().getConfiguration().setJournalFileSize(1024 * 1024);
+      backupServer.getServer().getConfiguration().setJournalFileSize(1024 * 1024);
+   }
+
+   /**
+    * Returns the parent's locator with the minimum large-message size set to
+    * 1 MiB and the producer window set to 10 KiB.
+    */
+   @Override
+   protected ServerLocatorInternal getServerLocator() throws Exception
+   {
+      ServerLocatorInternal locator = super.getServerLocator();
+      locator.setMinLargeMessageSize(1024 * 1024);
+      locator.setProducerWindowSize(10 * 1024);
+      return locator;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live)
+   {
+      return getInVMTransportAcceptorConfiguration(live);
+   }
+
+   @Override
+   protected TransportConfiguration getConnectorTransportConfiguration(final boolean live)
+   {
+      return getInVMConnectorTransportConfiguration(live);
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -12,6 +12,17 @@
*/
package org.hornetq.tests.integration.cluster.failover;
+import java.util.Map;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.core.server.JournalType;
+import org.hornetq.tests.util.ServiceTestBase;
+
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* Created Oct 26, 2009
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/GroupingFailoverSharedServerTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/GroupingFailoverSharedServerTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/GroupingFailoverSharedServerTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -12,6 +12,16 @@
*/
package org.hornetq.tests.integration.cluster.failover;
+import java.util.Map;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.core.server.impl.InVMNodeManager;
+import org.hornetq.tests.util.ServiceTestBase;
+
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* Created Oct 26, 2009
Added:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyAsynchronousReattachTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyAsynchronousReattachTest.java
(rev 0)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyAsynchronousReattachTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.failover;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.core.client.impl.ClientSessionInternal;
+
+/**
+ * Variant of {@link NettyAsynchronousFailoverTest} in which "crashing" a
+ * session means failing that session's connection with a
+ * {@code NOT_CONNECTED} error.
+ *
+ * @author clebertsuconic
+ */
+public class NettyAsynchronousReattachTest extends NettyAsynchronousFailoverTest
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   /**
+    * Fails the connection of each given session, in argument order, with a
+    * fresh {@code NOT_CONNECTED} exception.
+    *
+    * @param sessions sessions to fail; each must be a {@link ClientSessionInternal}
+    */
+   protected void crash(final ClientSession... sessions) throws Exception
+   {
+      for (int idx = 0; idx < sessions.length; idx++)
+      {
+         final ClientSessionInternal target = (ClientSessionInternal)sessions[idx];
+         target.getConnection().fail(new HornetQException(HornetQException.NOT_CONNECTED, "oops"));
+      }
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -16,9 +16,9 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServers;
-import org.hornetq.tests.cluster.reattach.MultiThreadRandomReattachTestBase;
/**
*
Added:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java
(rev 0)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -0,0 +1,1476 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.reattach;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.MessageHandler;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.impl.invm.InVMRegistry;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.jms.client.HornetQBytesMessage;
+import org.hornetq.jms.client.HornetQTextMessage;
+import org.hornetq.tests.util.RandomUtil;
+
+/**
+ * A MultiThreadRandomReattachTestBase
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert
Suconic</a>
+ *
+ *
+ */
+public abstract class MultiThreadRandomReattachTestBase extends
MultiThreadReattachSupport
+{
+ private final Logger log = Logger.getLogger(getClass()); // instance logger keyed on the runtime (concrete subclass) type
+
+ // Constants -----------------------------------------------------
+
+ private static final int RECEIVE_TIMEOUT = 30000; // presumably milliseconds; no use is visible in this part of the file — TODO confirm
+
+ private final int LATCH_WAIT = getLatchWait(); // passed to latch.await(..., TimeUnit.MILLISECONDS) by the doTest* methods; NOTE(review): initialised via a method call during construction
+
+ private final int NUM_THREADS = getNumThreads(); // second argument handed to runTestMultipleThreads by the testX methods
+
+ // Attributes ----------------------------------------------------
+ protected static final SimpleString ADDRESS = new
SimpleString("FailoverTestAddress"); // address on which the doTest* scenarios shown here create their queues and producers
+
+ protected HornetQServer liveServer; // NOTE(review): not assigned in the visible part of this class — presumably set by start() in subclasses; confirm
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+   /** Wraps {@code doTestA(sf, threadNum)} in a RunnableT and hands it to runTestMultipleThreads. */
+   public void testA() throws Exception
+   {
+      final RunnableT scenario = new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+         {
+            doTestA(sf, threadNum);
+         }
+      };
+
+      runTestMultipleThreads(scenario, NUM_THREADS, false);
+   }
+
+   /** Wraps {@code doTestB(sf, threadNum)} in a RunnableT and hands it to runTestMultipleThreads. */
+   public void testB() throws Exception
+   {
+      final RunnableT scenario = new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+         {
+            doTestB(sf, threadNum);
+         }
+      };
+
+      runTestMultipleThreads(scenario, NUM_THREADS, false);
+   }
+
+   /** Wraps {@code doTestC(sf, threadNum)} in a RunnableT and hands it to runTestMultipleThreads. */
+   public void testC() throws Exception
+   {
+      final RunnableT scenario = new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+         {
+            doTestC(sf, threadNum);
+         }
+      };
+
+      runTestMultipleThreads(scenario, NUM_THREADS, false);
+   }
+
+   /** Wraps {@code doTestD(sf, threadNum)} in a RunnableT and hands it to runTestMultipleThreads. */
+   public void testD() throws Exception
+   {
+      final RunnableT scenario = new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+         {
+            doTestD(sf, threadNum);
+         }
+      };
+
+      runTestMultipleThreads(scenario, NUM_THREADS, false);
+   }
+
+   /** Wraps {@code doTestE(sf, threadNum)} in a RunnableT and hands it to runTestMultipleThreads. */
+   public void testE() throws Exception
+   {
+      final RunnableT scenario = new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+         {
+            doTestE(sf, threadNum);
+         }
+      };
+
+      runTestMultipleThreads(scenario, NUM_THREADS, false);
+   }
+
+   /** Wraps {@code doTestF(sf, threadNum)} in a RunnableT and hands it to runTestMultipleThreads. */
+   public void testF() throws Exception
+   {
+      final RunnableT scenario = new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+         {
+            doTestF(sf, threadNum);
+         }
+      };
+
+      runTestMultipleThreads(scenario, NUM_THREADS, false);
+   }
+
+   /** Wraps {@code doTestG(sf, threadNum)} in a RunnableT and hands it to runTestMultipleThreads. */
+   public void testG() throws Exception
+   {
+      final RunnableT scenario = new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+         {
+            doTestG(sf, threadNum);
+         }
+      };
+
+      runTestMultipleThreads(scenario, NUM_THREADS, false);
+   }
+
+   /** Wraps {@code doTestH(sf, threadNum)} in a RunnableT and hands it to runTestMultipleThreads. */
+   public void testH() throws Exception
+   {
+      final RunnableT scenario = new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+         {
+            doTestH(sf, threadNum);
+         }
+      };
+
+      runTestMultipleThreads(scenario, NUM_THREADS, false);
+   }
+
+   /** Wraps {@code doTestI(sf, threadNum)} in a RunnableT and hands it to runTestMultipleThreads. */
+   public void testI() throws Exception
+   {
+      final RunnableT scenario = new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+         {
+            doTestI(sf, threadNum);
+         }
+      };
+
+      runTestMultipleThreads(scenario, NUM_THREADS, false);
+   }
+
+   /** Wraps {@code doTestJ(sf, threadNum)} in a RunnableT and hands it to runTestMultipleThreads. */
+   public void testJ() throws Exception
+   {
+      final RunnableT scenario = new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+         {
+            doTestJ(sf, threadNum);
+         }
+      };
+
+      runTestMultipleThreads(scenario, NUM_THREADS, false);
+   }
+
+   /** Wraps {@code doTestK(sf, threadNum)} in a RunnableT and hands it to runTestMultipleThreads. */
+   public void testK() throws Exception
+   {
+      final RunnableT scenario = new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+         {
+            doTestK(sf, threadNum);
+         }
+      };
+
+      runTestMultipleThreads(scenario, NUM_THREADS, false);
+   }
+
+   /**
+    * Wraps {@code doTestL(sf)} in a RunnableT (the thread number is ignored) and
+    * hands it to the four-argument runTestMultipleThreads with {@code true} and 10.
+    */
+   public void testL() throws Exception
+   {
+      final RunnableT scenario = new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+         {
+            doTestL(sf);
+         }
+      };
+
+      runTestMultipleThreads(scenario, NUM_THREADS, true, 10);
+   }
+
+ // public void testM() throws Exception
+ // {
+ // runTestMultipleThreads(new RunnableT()
+ // {
+ // public void run(final ClientSessionFactory sf, final int threadNum) throws
Exception
+ // {
+ // doTestM(sf, threadNum);
+ // }
+ // }, NUM_THREADS);
+ // }
+
+   /** Wraps {@code doTestN(sf, threadNum)} in a RunnableT and hands it to runTestMultipleThreads. */
+   public void testN() throws Exception
+   {
+      final RunnableT scenario = new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+         {
+            doTestN(sf, threadNum);
+         }
+      };
+
+      runTestMultipleThreads(scenario, NUM_THREADS, false);
+   }
+
+ // Added to replicate HORNETQ-264
+   /** Wraps {@code doTestO(sf, threadNum)} in a RunnableT and hands it to runTestMultipleThreads. */
+   public void testO() throws Exception
+   {
+      final RunnableT scenario = new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+         {
+            doTestO(sf, threadNum);
+         }
+      };
+
+      runTestMultipleThreads(scenario, NUM_THREADS, false);
+   }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected abstract void start() throws Exception; // re-declared abstract here; the @Override implies a superclass declaration, and concrete subclasses must supply it
+
+ protected abstract void setBody(ClientMessage message) throws Exception; // presumably fills in the message body; callers are outside the visible part of the file — TODO confirm
+
+ protected abstract boolean checkSize(ClientMessage message); // presumably validates a received message's body size; callers are outside the visible part of the file — TODO confirm
+
+ protected ClientSession createAutoCommitSession(final ClientSessionFactory sf) throws
Exception
+ {
+ ClientSession session = sf.createSession(false, true, true);
+ session.addMetaData("someData", RandomUtil.randomString());
+ session.addMetaData("someData2", RandomUtil.randomString());
+ return session;
+ }
+
+ protected ClientSession createTransactionalSession(final ClientSessionFactory sf)
throws Exception
+ {
+ ClientSession session = sf.createSession(false, false, false);
+ session.addMetaData("someData", RandomUtil.randomString());
+ session.addMetaData("someData2", RandomUtil.randomString());
+
+ return session;
+ }
+
+   /**
+    * Single-session scenario: creates queue "sub&lt;threadNum&gt;", sends 100
+    * messages to ADDRESS, starts the session and waits for a MyHandler to
+    * signal its latch, then removes the queue.
+    *
+    * @param sf        factory used to open the session
+    * @param threadNum number of the calling thread, used in the queue name
+    * @param session2  not referenced by this method
+    * @throws Exception on latch timeout or when the handler recorded a failure
+    */
+   protected void doTestA(final ClientSessionFactory sf, final int threadNum, final ClientSession session2) throws Exception
+   {
+      final int messageCount = 100;
+      final SimpleString queueName = new SimpleString("sub" + threadNum);
+
+      final ClientSession workSession = sf.createSession(false, true, true);
+      workSession.createQueue(ADDRESS, queueName, null, false);
+
+      final ClientProducer sender = workSession.createProducer(ADDRESS);
+      final ClientConsumer receiver = workSession.createConsumer(queueName);
+
+      sendMessages(workSession, sender, messageCount, threadNum);
+
+      workSession.start();
+
+      final MyHandler handler = new MyHandler(threadNum, messageCount);
+      receiver.setMessageHandler(handler);
+
+      if (!handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS))
+      {
+         throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+                             " threadnum " +
+                             threadNum);
+      }
+
+      if (handler.failure != null)
+      {
+         throw new Exception("Handler failed: " + handler.failure);
+      }
+
+      sender.close();
+      receiver.close();
+
+      workSession.deleteQueue(queueName);
+      workSession.close();
+   }
+
+   /**
+    * Ten consumer sessions (from createAutoCommitSession, started before their
+    * queues are created) each consume the 100 messages sent by one sender
+    * session through asynchronous MyHandler instances.
+    * Queues are named "&lt;threadNum&gt;sub&lt;i&gt;" and deleted at the end.
+    *
+    * @param sf        factory used to open every session
+    * @param threadNum number of the calling thread, used in the queue names
+    * @throws Exception on latch timeout or when a handler recorded a failure
+    */
+   protected void doTestA(final ClientSessionFactory sf, final int threadNum) throws Exception
+   {
+      final long startedAt = System.currentTimeMillis();
+
+      final ClientSession adminSession = sf.createSession(false, false, false);
+
+      final int messageCount = 100;
+      final int sessionCount = 10;
+
+      final Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      final Set<ClientSession> consumerSessions = new HashSet<ClientSession>();
+
+      for (int idx = 0; idx < sessionCount; idx++)
+      {
+         final SimpleString queueName = new SimpleString(threadNum + "sub" + idx);
+
+         final ClientSession consumerSession = createAutoCommitSession(sf);
+         consumerSession.start();
+         consumerSession.createQueue(ADDRESS, queueName, null, false);
+
+         consumers.add(consumerSession.createConsumer(queueName));
+         consumerSessions.add(consumerSession);
+      }
+
+      final ClientSession senderSession = sf.createSession(false, true, true);
+      final ClientProducer sender = senderSession.createProducer(ADDRESS);
+
+      sendMessages(senderSession, sender, messageCount, threadNum);
+
+      final Set<MyHandler> handlers = new HashSet<MyHandler>();
+      for (ClientConsumer consumer : consumers)
+      {
+         final MyHandler handler = new MyHandler(threadNum, messageCount);
+         consumer.setMessageHandler(handler);
+         handlers.add(handler);
+      }
+
+      for (MyHandler handler : handlers)
+      {
+         if (!handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS))
+         {
+            throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+                                " threadnum " +
+                                threadNum);
+         }
+
+         if (handler.failure != null)
+         {
+            throw new Exception("Handler failed: " + handler.failure);
+         }
+      }
+
+      senderSession.close();
+
+      for (ClientSession consumerSession : consumerSessions)
+      {
+         consumerSession.close();
+      }
+
+      for (int idx = 0; idx < sessionCount; idx++)
+      {
+         adminSession.deleteQueue(new SimpleString(threadNum + "sub" + idx));
+      }
+
+      adminSession.close();
+
+      log.info("duration " + (System.currentTimeMillis() - startedAt));
+   }
+
+   /**
+    * Like the two-argument doTestA, except that the consumer sessions are
+    * started only after all messages have been sent, and the sender session
+    * carries a random "some-data" metadata value.
+    *
+    * @param sf        factory used to open every session
+    * @param threadNum number of the calling thread, used in the queue names
+    * @throws Exception on latch timeout or when a handler recorded a failure
+    */
+   protected void doTestB(final ClientSessionFactory sf, final int threadNum) throws Exception
+   {
+      final long startedAt = System.currentTimeMillis();
+
+      final ClientSession adminSession = sf.createSession(false, false, false);
+
+      final int messageCount = 100;
+      final int sessionCount = 10;
+
+      final Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      final Set<ClientSession> consumerSessions = new HashSet<ClientSession>();
+
+      for (int idx = 0; idx < sessionCount; idx++)
+      {
+         final SimpleString queueName = new SimpleString(threadNum + "sub" + idx);
+
+         final ClientSession consumerSession = createAutoCommitSession(sf);
+         consumerSession.createQueue(ADDRESS, queueName, null, false);
+
+         consumers.add(consumerSession.createConsumer(queueName));
+         consumerSessions.add(consumerSession);
+      }
+
+      final ClientSession senderSession = sf.createSession(false, true, true);
+      senderSession.addMetaData("some-data", RandomUtil.randomString());
+
+      final ClientProducer sender = senderSession.createProducer(ADDRESS);
+
+      sendMessages(senderSession, sender, messageCount, threadNum);
+
+      // Delivery begins only now that everything has been sent.
+      for (ClientSession consumerSession : consumerSessions)
+      {
+         consumerSession.start();
+      }
+
+      final Set<MyHandler> handlers = new HashSet<MyHandler>();
+      for (ClientConsumer consumer : consumers)
+      {
+         final MyHandler handler = new MyHandler(threadNum, messageCount);
+         consumer.setMessageHandler(handler);
+         handlers.add(handler);
+      }
+
+      for (MyHandler handler : handlers)
+      {
+         if (!handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS))
+         {
+            throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+                                " threadnum " +
+                                threadNum);
+         }
+
+         if (handler.failure != null)
+         {
+            throw new Exception("Handler failed: " + handler.failure);
+         }
+      }
+
+      senderSession.close();
+
+      for (ClientSession consumerSession : consumerSessions)
+      {
+         consumerSession.close();
+      }
+
+      for (int idx = 0; idx < sessionCount; idx++)
+      {
+         adminSession.deleteQueue(new SimpleString(threadNum + "sub" + idx));
+      }
+
+      adminSession.close();
+
+      log.info("duration " + (System.currentTimeMillis() - startedAt));
+   }
+
+   /**
+    * Transactional scenario with consumer sessions started up front.
+    * The sender sends 100 messages, rolls back, sends again and commits.
+    * Each handler is awaited and then reset(); every consumer session is
+    * rolled back, the handlers are awaited a second time, and the consumer
+    * sessions are committed.
+    *
+    * @param sf        factory used to open every session
+    * @param threadNum number of the calling thread, used in the queue names
+    * @throws Exception on latch timeout or when a handler recorded a failure
+    */
+   protected void doTestC(final ClientSessionFactory sf, final int threadNum) throws Exception
+   {
+      final long startedAt = System.currentTimeMillis();
+
+      final ClientSession adminSession = sf.createSession(false, false, false);
+      adminSession.addMetaData("some-data", RandomUtil.randomString());
+
+      final int messageCount = 100;
+      final int sessionCount = 10;
+
+      final Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      final Set<ClientSession> consumerSessions = new HashSet<ClientSession>();
+
+      for (int idx = 0; idx < sessionCount; idx++)
+      {
+         final SimpleString queueName = new SimpleString(threadNum + "sub" + idx);
+
+         final ClientSession consumerSession = createTransactionalSession(sf);
+         consumerSession.start();
+         consumerSession.createQueue(ADDRESS, queueName, null, false);
+
+         consumers.add(consumerSession.createConsumer(queueName));
+         consumerSessions.add(consumerSession);
+      }
+
+      final ClientSession senderSession = sf.createSession(false, false, false);
+      senderSession.addMetaData("some-data", RandomUtil.randomString());
+
+      final ClientProducer sender = senderSession.createProducer(ADDRESS);
+
+      // First batch is discarded by the rollback; only the second is committed.
+      sendMessages(senderSession, sender, messageCount, threadNum);
+      senderSession.rollback();
+
+      sendMessages(senderSession, sender, messageCount, threadNum);
+      senderSession.commit();
+
+      final Set<MyHandler> handlers = new HashSet<MyHandler>();
+      for (ClientConsumer consumer : consumers)
+      {
+         final MyHandler handler = new MyHandler(threadNum, messageCount);
+         consumer.setMessageHandler(handler);
+         handlers.add(handler);
+      }
+
+      for (MyHandler handler : handlers)
+      {
+         if (!handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS))
+         {
+            throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+                                " threadnum " +
+                                threadNum);
+         }
+
+         if (handler.failure != null)
+         {
+            throw new Exception("Handler failed: " + handler.failure);
+         }
+
+         // Presumably re-arms the handler for the redelivery that follows the rollback below.
+         handler.reset();
+      }
+
+      for (ClientSession consumerSession : consumerSessions)
+      {
+         consumerSession.rollback();
+      }
+
+      for (MyHandler handler : handlers)
+      {
+         Assert.assertTrue(handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS));
+      }
+
+      for (ClientSession consumerSession : consumerSessions)
+      {
+         consumerSession.commit();
+      }
+
+      senderSession.close();
+
+      for (ClientSession consumerSession : consumerSessions)
+      {
+         consumerSession.close();
+      }
+
+      for (int idx = 0; idx < sessionCount; idx++)
+      {
+         adminSession.deleteQueue(new SimpleString(threadNum + "sub" + idx));
+      }
+
+      adminSession.close();
+
+      log.info("duration " + (System.currentTimeMillis() - startedAt));
+   }
+
+   /**
+    * Transactional scenario with consumer sessions started only after the
+    * committed send. After the first round of handlers completes, the handlers
+    * are detached, every consumer session is rolled back, fresh handlers are
+    * installed and awaited, and the sessions are committed.
+    * Queue names here are "&lt;threadNum&gt; sub&lt;i&gt;" (with a space).
+    *
+    * @param sf        factory used to open every session
+    * @param threadNum number of the calling thread, used in the queue names
+    * @throws Exception on latch timeout or when a handler recorded a failure
+    */
+   protected void doTestD(final ClientSessionFactory sf, final int threadNum) throws Exception
+   {
+      final long startedAt = System.currentTimeMillis();
+
+      final ClientSession adminSession = sf.createSession(false, false, false);
+      adminSession.addMetaData("some-data", RandomUtil.randomString());
+
+      final int messageCount = 100;
+      final int sessionCount = 10;
+
+      final Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      final Set<ClientSession> consumerSessions = new HashSet<ClientSession>();
+
+      for (int idx = 0; idx < sessionCount; idx++)
+      {
+         final SimpleString queueName = new SimpleString(threadNum + " sub" + idx);
+
+         final ClientSession consumerSession = sf.createSession(false, false, false);
+         consumerSession.addMetaData("data", RandomUtil.randomString());
+         consumerSession.createQueue(ADDRESS, queueName, null, false);
+
+         consumers.add(consumerSession.createConsumer(queueName));
+         consumerSessions.add(consumerSession);
+      }
+
+      final ClientSession senderSession = sf.createSession(false, false, false);
+      senderSession.addMetaData("some-data", RandomUtil.randomString());
+
+      final ClientProducer sender = senderSession.createProducer(ADDRESS);
+
+      // First batch is discarded by the rollback; only the second is committed.
+      sendMessages(senderSession, sender, messageCount, threadNum);
+      senderSession.rollback();
+
+      sendMessages(senderSession, sender, messageCount, threadNum);
+      senderSession.commit();
+
+      for (ClientSession consumerSession : consumerSessions)
+      {
+         consumerSession.start();
+      }
+
+      final Set<MyHandler> handlers = new HashSet<MyHandler>();
+      for (ClientConsumer consumer : consumers)
+      {
+         final MyHandler handler = new MyHandler(threadNum, messageCount);
+         consumer.setMessageHandler(handler);
+         handlers.add(handler);
+      }
+
+      for (MyHandler handler : handlers)
+      {
+         if (!handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS))
+         {
+            throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+                                " threadnum " +
+                                threadNum);
+         }
+
+         if (handler.failure != null)
+         {
+            throw new Exception("Handler failed: " + handler.failure);
+         }
+      }
+
+      handlers.clear();
+
+      // Detach the first-round handlers before rolling back.
+      for (ClientConsumer consumer : consumers)
+      {
+         consumer.setMessageHandler(null);
+      }
+
+      for (ClientSession consumerSession : consumerSessions)
+      {
+         consumerSession.rollback();
+      }
+
+      // Second round: brand-new handlers.
+      for (ClientConsumer consumer : consumers)
+      {
+         final MyHandler handler = new MyHandler(threadNum, messageCount);
+         consumer.setMessageHandler(handler);
+         handlers.add(handler);
+      }
+
+      for (MyHandler handler : handlers)
+      {
+         if (!handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS))
+         {
+            throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+                                " threadnum " +
+                                threadNum);
+         }
+
+         if (handler.failure != null)
+         {
+            throw new Exception("Handler failed on rollback: " + handler.failure);
+         }
+      }
+
+      for (ClientSession consumerSession : consumerSessions)
+      {
+         consumerSession.commit();
+      }
+
+      senderSession.close();
+
+      for (ClientSession consumerSession : consumerSessions)
+      {
+         consumerSession.close();
+      }
+
+      for (int idx = 0; idx < sessionCount; idx++)
+      {
+         adminSession.deleteQueue(new SimpleString(threadNum + " sub" + idx));
+      }
+
+      adminSession.close();
+
+      log.info("duration " + (System.currentTimeMillis() - startedAt));
+   }
+
+ // Now with synchronous receive()
+
+   /**
+    * Ten consumer sessions opened with {@code createSession(false, true, true)}
+    * and started up front; after 100 messages are sent, consumption is
+    * delegated to consumeMessages instead of message handlers.
+    *
+    * @param sf        factory used to open every session
+    * @param threadNum number of the calling thread, used in the queue names
+    */
+   protected void doTestE(final ClientSessionFactory sf, final int threadNum) throws Exception
+   {
+      final long startedAt = System.currentTimeMillis();
+
+      final ClientSession adminSession = sf.createSession(false, false, false);
+      adminSession.addMetaData("some-data", RandomUtil.randomString());
+
+      final int messageCount = 100;
+      final int sessionCount = 10;
+
+      final Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      final Set<ClientSession> consumerSessions = new HashSet<ClientSession>();
+
+      for (int idx = 0; idx < sessionCount; idx++)
+      {
+         final SimpleString queueName = new SimpleString(threadNum + "sub" + idx);
+
+         final ClientSession consumerSession = sf.createSession(false, true, true);
+         consumerSession.addMetaData("some-data", RandomUtil.randomString());
+         consumerSession.start();
+         consumerSession.createQueue(ADDRESS, queueName, null, false);
+
+         consumers.add(consumerSession.createConsumer(queueName));
+         consumerSessions.add(consumerSession);
+      }
+
+      final ClientSession senderSession = sf.createSession(false, true, true);
+      senderSession.addMetaData("some-data", RandomUtil.randomString());
+
+      final ClientProducer sender = senderSession.createProducer(ADDRESS);
+
+      sendMessages(senderSession, sender, messageCount, threadNum);
+
+      consumeMessages(consumers, messageCount, threadNum);
+
+      senderSession.close();
+
+      for (ClientSession consumerSession : consumerSessions)
+      {
+         consumerSession.close();
+      }
+
+      for (int idx = 0; idx < sessionCount; idx++)
+      {
+         adminSession.deleteQueue(new SimpleString(threadNum + "sub" + idx));
+      }
+
+      adminSession.close();
+
+      log.info("duration " + (System.currentTimeMillis() - startedAt));
+   }
+
+   /**
+    * Same shape as doTestE, except that the consumer sessions are started only
+    * after the 100 messages have been sent and the metadata key is "data".
+    *
+    * @param sf        factory used to open every session
+    * @param threadNum number of the calling thread, used in the queue names
+    */
+   protected void doTestF(final ClientSessionFactory sf, final int threadNum) throws Exception
+   {
+      final long startedAt = System.currentTimeMillis();
+
+      final ClientSession adminSession = sf.createSession(false, false, false);
+      adminSession.addMetaData("data", RandomUtil.randomString());
+
+      final int messageCount = 100;
+      final int sessionCount = 10;
+
+      final Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      final Set<ClientSession> consumerSessions = new HashSet<ClientSession>();
+
+      for (int idx = 0; idx < sessionCount; idx++)
+      {
+         final SimpleString queueName = new SimpleString(threadNum + "sub" + idx);
+
+         final ClientSession consumerSession = sf.createSession(false, true, true);
+         consumerSession.addMetaData("data", RandomUtil.randomString());
+         consumerSession.createQueue(ADDRESS, queueName, null, false);
+
+         consumers.add(consumerSession.createConsumer(queueName));
+         consumerSessions.add(consumerSession);
+      }
+
+      final ClientSession senderSession = sf.createSession(false, true, true);
+      senderSession.addMetaData("data", RandomUtil.randomString());
+
+      final ClientProducer sender = senderSession.createProducer(ADDRESS);
+
+      sendMessages(senderSession, sender, messageCount, threadNum);
+
+      // Delivery begins only now that everything has been sent.
+      for (ClientSession consumerSession : consumerSessions)
+      {
+         consumerSession.start();
+      }
+
+      consumeMessages(consumers, messageCount, threadNum);
+
+      senderSession.close();
+
+      for (ClientSession consumerSession : consumerSessions)
+      {
+         consumerSession.close();
+      }
+
+      for (int idx = 0; idx < sessionCount; idx++)
+      {
+         adminSession.deleteQueue(new SimpleString(threadNum + "sub" + idx));
+      }
+
+      adminSession.close();
+
+      log.info("duration " + (System.currentTimeMillis() - startedAt));
+   }
+
+   /**
+    * Scenario G: every session is created with {@code createSession(false, false, false)};
+    * consumer sessions are started before their queues are created. The producer sends a
+    * batch, rolls it back, sends it again and commits. The consumers then read the batch,
+    * roll back, read it a second time and commit.
+    *
+    * @param sf factory used to create every session of this scenario
+    * @param threadNum index of the calling thread; used to build per-thread queue names
+    * @throws Exception if a session operation fails or a message arrives out of order
+    */
+   protected void doTestG(final ClientSessionFactory sf, final int threadNum) throws Exception
+   {
+      final long startTime = System.currentTimeMillis();
+
+      final ClientSession adminSession = sf.createSession(false, false, false);
+      adminSession.addMetaData("data", RandomUtil.randomString());
+
+      final int numMessages = 100;
+      final int numSessions = 10;
+
+      final Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      final Set<ClientSession> consumerSessions = new HashSet<ClientSession>();
+
+      for (int idx = 0; idx < numSessions; idx++)
+      {
+         final SimpleString queueName = new SimpleString(threadNum + "sub" + idx);
+
+         final ClientSession consumerSession = sf.createSession(false, false, false);
+         consumerSession.addMetaData("data", RandomUtil.randomString());
+
+         consumerSession.start();
+
+         consumerSession.createQueue(MultiThreadRandomReattachTestBase.ADDRESS, queueName, null, false);
+
+         consumers.add(consumerSession.createConsumer(queueName));
+
+         consumerSessions.add(consumerSession);
+      }
+
+      final ClientSession senderSession = sf.createSession(false, false, false);
+      senderSession.addMetaData("data", RandomUtil.randomString());
+
+      final ClientProducer sender = senderSession.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
+
+      // First batch is rolled back, second batch is committed
+      sendMessages(senderSession, sender, numMessages, threadNum);
+      senderSession.rollback();
+
+      sendMessages(senderSession, sender, numMessages, threadNum);
+      senderSession.commit();
+
+      // First read is rolled back, so the same batch is read a second time
+      consumeMessages(consumers, numMessages, threadNum);
+      for (ClientSession consumerSession : consumerSessions)
+      {
+         consumerSession.rollback();
+      }
+
+      consumeMessages(consumers, numMessages, threadNum);
+      for (ClientSession consumerSession : consumerSessions)
+      {
+         consumerSession.commit();
+      }
+
+      senderSession.close();
+      for (ClientSession consumerSession : consumerSessions)
+      {
+         consumerSession.close();
+      }
+
+      for (int idx = 0; idx < numSessions; idx++)
+      {
+         adminSession.deleteQueue(new SimpleString(threadNum + "sub" + idx));
+      }
+
+      adminSession.close();
+
+      final long endTime = System.currentTimeMillis();
+
+      log.info("duration " + (endTime - startTime));
+   }
+
+   /**
+    * Scenario H: same send/rollback/send/commit and consume/rollback/consume/commit sequence
+    * as scenario G, except that the consumer sessions are started only after the producer
+    * has committed.
+    *
+    * @param sf factory used to create every session of this scenario
+    * @param threadNum index of the calling thread; used to build per-thread queue names
+    * @throws Exception if a session operation fails or a message arrives out of order
+    */
+   protected void doTestH(final ClientSessionFactory sf, final int threadNum) throws Exception
+   {
+      final long startTime = System.currentTimeMillis();
+
+      final ClientSession adminSession = sf.createSession(false, false, false);
+      adminSession.addMetaData("data", RandomUtil.randomString());
+
+      final int numMessages = 100;
+      final int numSessions = 10;
+
+      final Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      final Set<ClientSession> consumerSessions = new HashSet<ClientSession>();
+
+      for (int idx = 0; idx < numSessions; idx++)
+      {
+         final SimpleString queueName = new SimpleString(threadNum + "sub" + idx);
+
+         final ClientSession consumerSession = sf.createSession(false, false, false);
+         consumerSession.addMetaData("data", RandomUtil.randomString());
+
+         consumerSession.createQueue(MultiThreadRandomReattachTestBase.ADDRESS, queueName, null, false);
+
+         consumers.add(consumerSession.createConsumer(queueName));
+
+         consumerSessions.add(consumerSession);
+      }
+
+      final ClientSession senderSession = sf.createSession(false, false, false);
+      senderSession.addMetaData("data", RandomUtil.randomString());
+
+      final ClientProducer sender = senderSession.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
+
+      // First batch is rolled back, second batch is committed
+      sendMessages(senderSession, sender, numMessages, threadNum);
+      senderSession.rollback();
+
+      sendMessages(senderSession, sender, numMessages, threadNum);
+      senderSession.commit();
+
+      for (ClientSession consumerSession : consumerSessions)
+      {
+         consumerSession.start();
+      }
+
+      // First read is rolled back, so the same batch is read a second time
+      consumeMessages(consumers, numMessages, threadNum);
+      for (ClientSession consumerSession : consumerSessions)
+      {
+         consumerSession.rollback();
+      }
+
+      consumeMessages(consumers, numMessages, threadNum);
+      for (ClientSession consumerSession : consumerSessions)
+      {
+         consumerSession.commit();
+      }
+
+      senderSession.close();
+      for (ClientSession consumerSession : consumerSessions)
+      {
+         consumerSession.close();
+      }
+
+      for (int idx = 0; idx < numSessions; idx++)
+      {
+         adminSession.deleteQueue(new SimpleString(threadNum + "sub" + idx));
+      }
+
+      adminSession.close();
+
+      final long endTime = System.currentTimeMillis();
+
+      log.info("duration " + (endTime - startTime));
+   }
+
+ protected void doTestI(final ClientSessionFactory sf, final int threadNum) throws
Exception
+ {
+ ClientSession sessCreate = sf.createSession(false, true, true);
+ sessCreate.addMetaData("data", RandomUtil.randomString());
+
+
+ sessCreate.createQueue(MultiThreadRandomReattachTestBase.ADDRESS,
+ new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()),
+ null,
+ false);
+
+ ClientSession sess = sf.createSession(false, true, true);
+ sess.addMetaData("data", RandomUtil.randomString());
+
+
+ sess.start();
+
+ ClientConsumer consumer = sess.createConsumer(new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()));
+
+ ClientProducer producer =
sess.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
+
+ ClientMessage message = sess.createMessage(HornetQTextMessage.TYPE, false, 0,
System.currentTimeMillis(), (byte)1);
+ producer.send(message);
+
+ ClientMessage message2 =
consumer.receive(MultiThreadRandomReattachTestBase.RECEIVE_TIMEOUT);
+
+ Assert.assertNotNull(message2);
+
+ message2.acknowledge();
+
+ sess.close();
+
+ sessCreate.deleteQueue(new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()));
+
+ sessCreate.close();
+ }
+
+ protected void doTestJ(final ClientSessionFactory sf, final int threadNum) throws
Exception
+ {
+ ClientSession sessCreate = sf.createSession(false, true, true);
+ sessCreate.addMetaData("data", RandomUtil.randomString());
+
+
+ sessCreate.createQueue(MultiThreadRandomReattachTestBase.ADDRESS,
+ new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()),
+ null,
+ false);
+
+ ClientSession sess = sf.createSession(false, true, true);
+ sess.addMetaData("data", RandomUtil.randomString());
+
+ sess.start();
+
+ ClientConsumer consumer = sess.createConsumer(new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()));
+
+ ClientProducer producer =
sess.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
+
+ ClientMessage message = sess.createMessage(HornetQTextMessage.TYPE, false, 0,
System.currentTimeMillis(), (byte)1);
+ producer.send(message);
+
+ ClientMessage message2 =
consumer.receive(MultiThreadRandomReattachTestBase.RECEIVE_TIMEOUT);
+
+ Assert.assertNotNull(message2);
+
+ message2.acknowledge();
+
+ sess.close();
+
+ sessCreate.deleteQueue(new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()));
+
+ sessCreate.close();
+ }
+
+   /**
+    * Scenario K: creates a per-thread queue, then opens and immediately closes 100
+    * consumers on it, one after another, before deleting the queue.
+    *
+    * @param sf factory used to create the session
+    * @param threadNum index of the calling thread; prefixed to the address to name the queue
+    * @throws Exception if a session or consumer operation fails
+    */
+   protected void doTestK(final ClientSessionFactory sf, final int threadNum) throws Exception
+   {
+      final SimpleString queueName = new SimpleString(threadNum + MultiThreadRandomReattachTestBase.ADDRESS.toString());
+
+      final ClientSession session = sf.createSession(false, false, false);
+      session.addMetaData("data", RandomUtil.randomString());
+
+      session.createQueue(MultiThreadRandomReattachTestBase.ADDRESS, queueName, null, false);
+
+      final int numConsumers = 100;
+
+      int remaining = numConsumers;
+      while (remaining-- > 0)
+      {
+         session.createConsumer(queueName).close();
+      }
+
+      session.deleteQueue(queueName);
+
+      session.close();
+   }
+
+   /**
+    * This test tests failure during create connection: it opens 100 sessions one after
+    * another, attaching metadata to each and closing it straight away.
+    *
+    * @param sf factory used to create the sessions
+    * @throws Exception if creating, tagging or closing a session fails
+    */
+   protected void doTestL(final ClientSessionFactory sf) throws Exception
+   {
+      final int numSessions = 100;
+
+      int created = 0;
+      while (created < numSessions)
+      {
+         final ClientSession shortLived = sf.createSession(false, false, false);
+
+         shortLived.addMetaData("data", RandomUtil.randomString());
+
+         shortLived.close();
+
+         created++;
+      }
+   }
+
+ protected void doTestN(final ClientSessionFactory sf, final int threadNum) throws
Exception
+ {
+ ClientSession sessCreate = sf.createSession(false, true, true);
+
+ sessCreate.createQueue(MultiThreadRandomReattachTestBase.ADDRESS,
+ new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()),
+ null,
+ false);
+
+ ClientSession sess = sf.createSession(false, true, true);
+ sess.addMetaData("data", RandomUtil.randomString());
+
+ sess.stop();
+
+ sess.start();
+
+ sess.stop();
+
+ ClientConsumer consumer = sess.createConsumer(new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()));
+
+ ClientProducer producer =
sess.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
+
+ ClientMessage message = sess.createMessage(HornetQTextMessage.TYPE, false, 0,
System.currentTimeMillis(), (byte)1);
+ producer.send(message);
+
+ sess.start();
+
+ ClientMessage message2 =
consumer.receive(MultiThreadRandomReattachTestBase.RECEIVE_TIMEOUT);
+
+ Assert.assertNotNull(message2);
+
+ message2.acknowledge();
+
+ sess.stop();
+
+ sess.start();
+
+ sess.close();
+
+ sessCreate.deleteQueue(new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()));
+
+ sessCreate.close();
+ }
+
+ protected void doTestO(final ClientSessionFactory sf, final int threadNum) throws
Exception
+ {
+ ClientSession sessCreate = sf.createSession(false, true, true);
+
+ sessCreate.createQueue(MultiThreadRandomReattachTestBase.ADDRESS,
+ new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()),
+ null,
+ false);
+
+ ClientSession sess = sf.createSession(false, true, true);
+
+ sess.start();
+
+ ClientConsumer consumer = sess.createConsumer(new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()));
+
+ for (int i = 0; i < 100; i++)
+ {
+ Assert.assertNull(consumer.receiveImmediate());
+ }
+
+ sess.close();
+
+ sessCreate.deleteQueue(new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()));
+
+ sessCreate.close();
+ }
+
+ protected int getLatchWait()
+ {
+ return 60000;
+ }
+
+ protected int getNumIterations()
+ {
+ return 2;
+ }
+
+ protected int getNumThreads()
+ {
+ return 10;
+ }
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ log.info("************ Starting test " + getName());
+ }
+
+   /**
+    * Stops the live server if it is still running, clears the reference and then runs the
+    * parent tear-down.
+    * <p>
+    * The parent tear-down runs in a {@code finally} block so that its cleanup is not
+    * skipped when stopping the server throws.
+    *
+    * @throws Exception if stopping the server or the parent tear-down fails
+    */
+   @Override
+   protected void tearDown() throws Exception
+   {
+      try
+      {
+         if (liveServer != null && liveServer.isStarted())
+         {
+            liveServer.stop();
+         }
+      }
+      finally
+      {
+         liveServer = null;
+
+         super.tearDown();
+      }
+   }
+
+ // Private -------------------------------------------------------
+
+ private void runTestMultipleThreads(final RunnableT runnable,
+ final int numThreads,
+ final boolean failOnCreateConnection) throws
Exception
+ {
+ runTestMultipleThreads(runnable, numThreads, failOnCreateConnection, 1000);
+ }
+
+ private void runTestMultipleThreads(final RunnableT runnable,
+ final int numThreads,
+ final boolean failOnCreateConnection,
+ final long failDelay) throws Exception
+ {
+
+ runMultipleThreadsFailoverTest(runnable, numThreads, getNumIterations(),
failOnCreateConnection, failDelay);
+ }
+
+ /**
+ * @return
+ */
+ @Override
+ protected ServerLocator createLocator() throws Exception
+ {
+ ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new
TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+ locator.setReconnectAttempts(-1);
+ locator.setConfirmationWindowSize(1024 * 1024);
+ return locator;
+ }
+
+ @Override
+ protected void stop() throws Exception
+ {
+ liveServer.stop();
+
+ System.gc();
+
+ Assert.assertEquals(0, InVMRegistry.instance.size());
+ }
+
+ private void sendMessages(final ClientSession sessSend,
+ final ClientProducer producer,
+ final int numMessages,
+ final int threadNum) throws Exception
+ {
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = sessSend.createMessage(HornetQBytesMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.putIntProperty(new SimpleString("threadnum"), threadNum);
+ message.putIntProperty(new SimpleString("count"), i);
+ setBody(message);
+ producer.send(message);
+ }
+ }
+
+ private void consumeMessages(final Set<ClientConsumer> consumers, final int
numMessages, final int threadNum) throws Exception
+ {
+ // We make sure the messages arrive in the order they were sent from a particular
producer
+ Map<ClientConsumer, Map<Integer, Integer>> counts = new
HashMap<ClientConsumer, Map<Integer, Integer>>();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ for (ClientConsumer consumer : consumers)
+ {
+ Map<Integer, Integer> consumerCounts = counts.get(consumer);
+
+ if (consumerCounts == null)
+ {
+ consumerCounts = new HashMap<Integer, Integer>();
+ counts.put(consumer, consumerCounts);
+ }
+
+ ClientMessage msg =
consumer.receive(MultiThreadRandomReattachTestBase.RECEIVE_TIMEOUT);
+
+ Assert.assertNotNull(msg);
+
+ int tn = (Integer)msg.getObjectProperty(new
SimpleString("threadnum"));
+ int cnt = (Integer)msg.getObjectProperty(new
SimpleString("count"));
+
+ Integer c = consumerCounts.get(tn);
+ if (c == null)
+ {
+ c = new Integer(cnt);
+ }
+
+ if (tn == threadNum && cnt != c.intValue())
+ {
+ throw new Exception("Invalid count, expected " + tn + ":
" + c + " got " + cnt);
+ }
+
+ c++;
+
+ // Wrap
+ if (c == numMessages)
+ {
+ c = 0;
+ }
+
+ consumerCounts.put(tn, c);
+
+ msg.acknowledge();
+ }
+ }
+ }
+
+ // Inner classes -------------------------------------------------
+
+   /**
+    * Message handler that acknowledges every message and verifies that messages sent by
+    * thread {@code tn} arrive with consecutive {@code count} values.
+    * <p>
+    * The latch is counted down when an ordering or size failure is detected (with
+    * {@code failure} set to the reason) or when the message with count
+    * {@code numMessages - 1} from thread {@code tn} has been processed (with {@code done}
+    * set). Once {@code done} is set, further messages are acknowledged and otherwise ignored.
+    */
+   private class MyHandler implements MessageHandler
+   {
+      CountDownLatch latch = new CountDownLatch(1);
+
+      // sending thread -> next expected count
+      private final Map<Integer, Integer> counts = new HashMap<Integer, Integer>();
+
+      volatile String failure;
+
+      final int tn;
+
+      final int numMessages;
+
+      volatile boolean done;
+
+      MyHandler(final int threadNum, final int numMessages)
+      {
+         this.tn = threadNum;
+
+         this.numMessages = numMessages;
+      }
+
+      /** Clears all tracked state and installs a fresh latch so the handler can be reused. */
+      synchronized void reset()
+      {
+         counts.clear();
+
+         done = false;
+
+         failure = null;
+
+         latch = new CountDownLatch(1);
+      }
+
+      public synchronized void onMessage(final ClientMessage message)
+      {
+         try
+         {
+            message.acknowledge();
+         }
+         catch (HornetQException me)
+         {
+            log.error("Failed to process", me);
+         }
+
+         if (done)
+         {
+            return;
+         }
+
+         final int threadNum = ((Integer)message.getObjectProperty(new SimpleString("threadnum"))).intValue();
+         final int cnt = ((Integer)message.getObjectProperty(new SimpleString("count"))).intValue();
+
+         // The first message seen from a thread defines where its sequence starts
+         final Integer tracked = counts.get(threadNum);
+         int expected = tracked == null ? cnt : tracked.intValue();
+
+         final boolean fromOwnThread = tn == threadNum;
+
+         if (fromOwnThread && cnt != expected)
+         {
+            failure = "Invalid count, expected " + threadNum + ":" + expected + " got " + cnt;
+            log.error(failure);
+
+            latch.countDown();
+         }
+
+         if (!checkSize(message))
+         {
+            failure = "Invalid size on message";
+            log.error(failure);
+            latch.countDown();
+         }
+
+         if (fromOwnThread && expected == numMessages - 1)
+         {
+            done = true;
+            latch.countDown();
+         }
+
+         // Advance, wrapping around at numMessages
+         expected++;
+         if (expected == numMessages)
+         {
+            expected = 0;
+         }
+
+         counts.put(threadNum, expected);
+      }
+   }
+}
\ No newline at end of file
Added:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadReattachSupport.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadReattachSupport.java
(rev 0)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadReattachSupport.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -0,0 +1,281 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.reattach;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.impl.RemotingConnectionImpl;
+import org.hornetq.core.remoting.impl.invm.InVMConnector;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.tests.util.UnitTestCase;
+
+/**
+ * A MultiThreadFailoverSupport
+ *
+ * @author <a href="mailto:time.fox@jboss.org">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert
Suconic</a>
+ *
+ * Created Mar 17, 2009 11:15:02 AM
+ *
+ *
+ */
+public abstract class MultiThreadReattachSupport extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ private final Logger log = Logger.getLogger(this.getClass());
+
+ // Attributes ----------------------------------------------------
+
+ private Timer timer;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected abstract void start() throws Exception;
+
+ protected abstract void stop() throws Exception;
+
+ protected abstract ServerLocator createLocator() throws Exception;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ timer = new Timer();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ timer.cancel();
+ timer = null;
+ super.tearDown();
+ }
+
+ protected boolean shouldFail()
+ {
+ return true;
+ }
+
+ protected void runMultipleThreadsFailoverTest(final RunnableT runnable,
+ final int numThreads,
+ final int numIts,
+ final boolean failOnCreateConnection,
+ final long failDelay) throws Exception
+ {
+ for (int its = 0; its < numIts; its++)
+ {
+ log.info("Beginning iteration " + its);
+
+ start();
+
+ final ServerLocator locator = createLocator();
+
+ final ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)
locator.createSessionFactory();
+
+ final ClientSession session = sf.createSession(false, true, true);
+
+ Failer failer = startFailer(failDelay, session, failOnCreateConnection);
+
+ class Runner extends Thread
+ {
+ private volatile Throwable throwable;
+
+ private final RunnableT test;
+
+ private final int threadNum;
+
+ Runner(final RunnableT test, final int threadNum)
+ {
+ this.test = test;
+
+ this.threadNum = threadNum;
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ test.run(sf, threadNum);
+ }
+ catch (Throwable t)
+ {
+ throwable = t;
+
+ log.error("Failed to run test", t);
+
+ // Case a failure happened here, it should print the Thread dump
+ // Sending it to System.out, as it would show on the Tests report
+ System.out.println(UnitTestCase.threadDump(" - fired by
MultiThreadRandomReattachTestBase::runTestMultipleThreads (" +
t.getLocalizedMessage() +
+ ")"));
+ }
+ }
+ }
+
+ do
+ {
+ List<Runner> threads = new ArrayList<Runner>();
+
+ for (int i = 0; i < numThreads; i++)
+ {
+ Runner runner = new Runner(runnable, i);
+
+ threads.add(runner);
+
+ runner.start();
+ }
+
+ for (Runner thread : threads)
+ {
+ thread.join();
+
+ if (thread.throwable != null)
+ {
+ throw new Exception("Exception on thread " + thread,
thread.throwable);
+ }
+ }
+
+ runnable.checkFail();
+
+ }
+ while (!failer.isExecuted());
+
+ InVMConnector.resetFailures();
+
+ session.close();
+
+ locator.close();
+
+ Assert.assertEquals(0, sf.numSessions());
+
+ Assert.assertEquals(0, sf.numConnections());
+
+ sf.close();
+
+ stop();
+ }
+ }
+
+ // Private -------------------------------------------------------
+
+ private Failer startFailer(final long time, final ClientSession session, final boolean
failOnCreateConnection)
+ {
+ Failer failer = new Failer(session, failOnCreateConnection);
+
+ // This is useful for debugging.. just change shouldFail to return false, and
Failer will not be executed
+ if (shouldFail())
+ {
+ timer.schedule(failer, (long)(time * Math.random()), 100);
+ }
+
+ return failer;
+ }
+
+ // Inner classes -------------------------------------------------
+
+ protected abstract class RunnableT extends Thread
+ {
+ private volatile String failReason;
+
+ private volatile Throwable throwable;
+
+ public void setFailed(final String reason, final Throwable throwable)
+ {
+ failReason = reason;
+ this.throwable = throwable;
+ }
+
+ public void checkFail()
+ {
+ if (throwable != null)
+ {
+ log.error("Test failed: " + failReason, throwable);
+ }
+ if (failReason != null)
+ {
+ Assert.fail(failReason);
+ }
+ }
+
+ public abstract void run(final ClientSessionFactory sf, final int threadNum) throws
Exception;
+ }
+
+ private class Failer extends TimerTask
+ {
+ private final ClientSession session;
+
+ private boolean executed;
+
+ private final boolean failOnCreateConnection;
+
+ public Failer(final ClientSession session, final boolean failOnCreateConnection)
+ {
+ this.session = session;
+
+ this.failOnCreateConnection = failOnCreateConnection;
+ }
+
+ @Override
+ public synchronized void run()
+ {
+ log.info("** Failing connection");
+
+ RemotingConnectionImpl conn =
(RemotingConnectionImpl)((ClientSessionInternal)session).getConnection();
+
+ if (failOnCreateConnection)
+ {
+ InVMConnector.numberOfFailures = 1;
+ InVMConnector.failOnCreateConnection = true;
+ }
+ else
+ {
+ conn.fail(new HornetQException(HornetQException.NOT_CONNECTED,
"blah"));
+ }
+
+ log.info("** Fail complete");
+
+ cancel();
+
+ executed = true;
+ }
+
+ public synchronized boolean isExecuted()
+ {
+ return executed;
+ }
+ }
+
+}
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -31,6 +31,7 @@
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.invm.InVMRegistry;
import org.hornetq.core.server.HornetQServer;
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -16,15 +16,20 @@
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.*;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.hornetq.core.remoting.impl.invm.InVMConnector;
import org.hornetq.core.remoting.impl.invm.InVMRegistry;
import org.hornetq.core.server.HornetQServer;
@@ -77,7 +82,7 @@
locator.setRetryIntervalMultiplier(retryMultiplier);
locator.setReconnectAttempts(reconnectAttempts);
locator.setConfirmationWindowSize(1024 * 1024);
- ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)
locator.createSessionFactory();
+ ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)locator.createSessionFactory();
ClientSession session = sf.createSession(false, true, true);
@@ -94,10 +99,10 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
- false,
- 0,
-
System.currentTimeMillis(),
- (byte)1);
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
message.putIntProperty(new SimpleString("count"), i);
message.getBodyBuffer().writeString("aardvarks");
producer.send(message);
@@ -139,6 +144,77 @@
}
/*
+ * Test failure on connection, but server is still up so should immediately reconnect
+ */
+   public void testOverflowCredits() throws Exception
+   {
+      final long retryInterval = 500;
+
+      final double retryMultiplier = 1d;
+
+      final int reconnectAttempts = 1;
+
+      locator.setRetryInterval(retryInterval);
+      locator.setRetryIntervalMultiplier(retryMultiplier);
+      locator.setReconnectAttempts(reconnectAttempts);
+      locator.setConfirmationWindowSize(1024 * 1024);
+      locator.setProducerWindowSize(1000);
+
+      // Number of producer-credit packets seen so far
+      final AtomicInteger creditPackets = new AtomicInteger(0);
+
+      // Fails the connection when the second producer-credits packet is intercepted
+      final Interceptor failOnSecondCredit = new Interceptor()
+      {
+         public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
+         {
+            System.out.println("Intercept..." + packet.getClass().getName());
+
+            if (!(packet instanceof SessionProducerCreditsMessage))
+            {
+               return true;
+            }
+
+            final SessionProducerCreditsMessage credit = (SessionProducerCreditsMessage)packet;
+
+            System.out.println("Credits: " + credit.getCredits());
+
+            if (creditPackets.incrementAndGet() != 2)
+            {
+               return true;
+            }
+
+            System.out.println("Failing");
+            connection.fail(new HornetQException(1, "bye"));
+            return false;
+         }
+      };
+
+      locator.addInterceptor(failOnSecondCredit);
+
+      final ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
+
+      final ClientSession session = sf.createSession(false, true, true);
+
+      session.createQueue(ReattachTest.ADDRESS, ReattachTest.ADDRESS, null, false);
+
+      final ClientProducer producer = session.createProducer(ReattachTest.ADDRESS);
+
+      final int numMessages = 10;
+
+      final SimpleString countKey = new SimpleString("count");
+
+      // Each 5000-byte body exceeds the 1000-byte producer window configured above
+      int sent = 0;
+      while (sent < numMessages)
+      {
+         final ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
+                                                             false,
+                                                             0,
+                                                             System.currentTimeMillis(),
+                                                             (byte)1);
+         message.putIntProperty(countKey, sent);
+         message.getBodyBuffer().writeBytes(new byte[5000]);
+         producer.send(message);
+
+         sent++;
+      }
+
+      session.close();
+
+      sf.close();
+   }
+
+ /*
* Test failure on connection, simulate failure to create connection for a while, then
* allow connection to be recreated
*/
@@ -154,7 +230,7 @@
locator.setRetryIntervalMultiplier(retryMultiplier);
locator.setReconnectAttempts(reconnectAttempts);
locator.setConfirmationWindowSize(1024 * 1024);
- ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)
locator.createSessionFactory();
+ ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)locator.createSessionFactory();
ClientSession session = sf.createSession(false, true, true);
@@ -167,10 +243,10 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
message.putIntProperty(new SimpleString("count"), i);
message.getBodyBuffer().writeString("aardvarks");
producer.send(message);
@@ -244,7 +320,7 @@
locator.setRetryIntervalMultiplier(retryMultiplier);
locator.setReconnectAttempts(reconnectAttempts);
locator.setConfirmationWindowSize(1024 * 1024);
- ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)
locator.createSessionFactory();
+ ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)locator.createSessionFactory();
ClientSession session = sf.createSession(false, true, true);
@@ -277,10 +353,10 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
message.putIntProperty(new SimpleString("count"), i);
message.getBodyBuffer().writeString("aardvarks");
producer.send(message);
@@ -358,7 +434,7 @@
locator.setRetryIntervalMultiplier(retryMultiplier);
locator.setReconnectAttempts(reconnectAttempts);
locator.setConfirmationWindowSize(1024 * 1024);
- ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)
locator.createSessionFactory();
+ ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)locator.createSessionFactory();
ClientSession session = sf.createSession(false, true, true);
@@ -371,10 +447,10 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
message.putIntProperty(new SimpleString("count"), i);
message.getBodyBuffer().writeString("aardvarks");
producer.send(message);
@@ -448,7 +524,7 @@
locator.setRetryIntervalMultiplier(retryMultiplier);
locator.setReconnectAttempts(reconnectAttempts);
locator.setConfirmationWindowSize(1024 * 1024);
- final ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)
locator.createSessionFactory();
+ final ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)locator.createSessionFactory();
session = sf.createSession();
@@ -558,7 +634,7 @@
locator.setRetryIntervalMultiplier(retryMultiplier);
locator.setReconnectAttempts(reconnectAttempts);
locator.setConfirmationWindowSize(1024 * 1024);
- final ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)
locator.createSessionFactory();
+ final ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)locator.createSessionFactory();
InVMConnector.failOnCreateConnection = true;
@@ -656,7 +732,7 @@
locator.setRetryIntervalMultiplier(retryMultiplier);
locator.setReconnectAttempts(reconnectAttempts);
locator.setConfirmationWindowSize(1024 * 1024);
- ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)
locator.createSessionFactory();
+ ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)locator.createSessionFactory();
ClientSession session = sf.createSession(false, true, true);
@@ -697,11 +773,11 @@
//
// //Should throw exception since didn't reconnect
- //
+ //
// try
// {
// session.start();
- //
+ //
// fail("Should throw exception");
// }
// catch (HornetQException e)
@@ -728,7 +804,7 @@
locator.setRetryIntervalMultiplier(retryMultiplier);
locator.setReconnectAttempts(reconnectAttempts);
locator.setConfirmationWindowSize(1024 * 1024);
- ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)
locator.createSessionFactory();
+ ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)locator.createSessionFactory();
ClientSession session = sf.createSession(false, true, true);
@@ -741,10 +817,10 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
message.putIntProperty(new SimpleString("count"), i);
message.getBodyBuffer().writeString("aardvarks");
producer.send(message);
@@ -791,12 +867,11 @@
final int reconnectAttempts = -1;
-
locator.setRetryInterval(retryInterval);
locator.setRetryIntervalMultiplier(retryMultiplier);
locator.setReconnectAttempts(reconnectAttempts);
locator.setConfirmationWindowSize(1024 * 1024);
- ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)
locator.createSessionFactory();
+ ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)locator.createSessionFactory();
ClientSession session = sf.createSession(false, true, true);
@@ -809,10 +884,10 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
message.putIntProperty(new SimpleString("count"), i);
message.getBodyBuffer().writeString("aardvarks");
producer.send(message);
@@ -884,12 +959,11 @@
final int reconnectAttempts = -1;
-
locator.setRetryInterval(retryInterval);
locator.setRetryIntervalMultiplier(retryMultiplier);
locator.setReconnectAttempts(reconnectAttempts);
locator.setConfirmationWindowSize(1024 * 1024);
- ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)
locator.createSessionFactory();
+ ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)locator.createSessionFactory();
ClientSession session = sf.createSession(false, true, true);
@@ -902,10 +976,10 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
message.putIntProperty(new SimpleString("count"), i);
message.getBodyBuffer().writeString("aardvarks");
producer.send(message);
@@ -967,7 +1041,7 @@
locator.setReconnectAttempts(reconnectAttempts);
locator.setMaxRetryInterval(maxRetryInterval);
locator.setConfirmationWindowSize(1024 * 1024);
- ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)
locator.createSessionFactory();
+ ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)locator.createSessionFactory();
ClientSession session = sf.createSession(false, true, true);
@@ -980,10 +1054,10 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
message.putIntProperty(new SimpleString("count"), i);
message.getBodyBuffer().writeString("aardvarks");
producer.send(message);
@@ -1045,7 +1119,7 @@
service.start();
- locator = createFactory(false);
+ locator = createFactory(false);
}
@Override
@@ -1054,7 +1128,7 @@
InVMConnector.resetFailures();
locator.close();
-
+
service.stop();
Assert.assertEquals(0, InVMRegistry.instance.size());
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/core/deployers/impl/QueueDeployerTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/core/deployers/impl/QueueDeployerTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/core/deployers/impl/QueueDeployerTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -14,6 +14,7 @@
package org.hornetq.tests.integration.core.deployers.impl;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.deployers.DeploymentManager;
import org.hornetq.core.deployers.impl.FileDeploymentManager;
import org.hornetq.core.deployers.impl.QueueDeployer;
@@ -21,7 +22,9 @@
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.impl.LocalQueueBinding;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.impl.HornetQServerImpl;
import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.tests.util.UnitTestCase;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/client/ReSendMessageTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/client/ReSendMessageTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/client/ReSendMessageTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -1,14 +1,14 @@
/*
* Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
*/
package org.hornetq.tests.integration.jms.client;
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -1,14 +1,14 @@
/*
* Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
*/
package org.hornetq.tests.integration.jms.cluster;
@@ -40,6 +40,7 @@
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.invm.InVMRegistry;
import org.hornetq.core.remoting.impl.invm.TransportConstants;
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/cluster/ReplicatedJMSFailoverTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/cluster/ReplicatedJMSFailoverTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/cluster/ReplicatedJMSFailoverTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -1,14 +1,14 @@
/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
*/
package org.hornetq.tests.integration.jms.cluster;
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/connection/ConcurrentSessionCloseTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/connection/ConcurrentSessionCloseTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/connection/ConcurrentSessionCloseTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -1,14 +1,14 @@
/*
* Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
*/
package org.hornetq.tests.integration.jms.connection;
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/JMSServerDeployerTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/JMSServerDeployerTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/JMSServerDeployerTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -24,6 +24,7 @@
import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.deployers.DeploymentManager;
import org.hornetq.core.deployers.impl.FileDeploymentManager;
import org.hornetq.core.logging.Logger;
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/JMSServerStartStopTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/JMSServerStartStopTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/JMSServerStartStopTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -35,6 +35,7 @@
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.spi.core.security.HornetQSecurityManager;
import org.hornetq.spi.core.security.HornetQSecurityManagerImpl;
+import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.tests.util.UnitTestCase;
/**
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/config/JMSConfigurationTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/config/JMSConfigurationTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/config/JMSConfigurationTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -1,14 +1,14 @@
/*
* Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
*/
package org.hornetq.tests.integration.jms.server.config;
@@ -36,8 +36,8 @@
import org.hornetq.jms.server.config.impl.TopicConfigurationImpl;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.tests.unit.util.InVMContext;
+import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
-import org.hornetq.tests.util.RandomUtil;
import java.util.ArrayList;
import java.util.List;
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -22,6 +22,7 @@
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
+import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.Context;
@@ -39,11 +40,11 @@
import org.hornetq.api.jms.JMSFactoryType;
import org.hornetq.api.jms.management.JMSQueueControl;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQDestination;
@@ -1014,7 +1015,112 @@
serverManager.destroyQueue(otherQueueName);
}
+
+ public void testDeleteWithPaging() throws Exception
+ {
+ AddressSettings pagedSetting = new AddressSettings();
+ pagedSetting.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ pagedSetting.setPageSizeBytes(10 * 1024);
+ pagedSetting.setMaxSizeBytes(100 * 1024);
+ server.getAddressSettingsRepository().addMatch("#", pagedSetting);
+
+ serverManager.createQueue(true, "pagedTest", null, true,
"/queue/pagedTest");
+
+ HornetQQueue pagedQueue =
(HornetQQueue)context.lookup("/queue/pagedTest");
+
+ ServerLocator locator = createInVMNonHALocator();
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(true, true);
+
+ ClientProducer prod = session.createProducer(pagedQueue.getAddress());
+
+ ClientMessage msg = session.createMessage(true);
+
+ msg.getBodyBuffer().writeBytes(new byte[90 * 1024]);
+ for (int i = 0 ; i < 100; i++)
+ {
+ prod.send(msg);
+ }
+
+ JMSQueueControl control = createManagementControl(pagedQueue);
+
+ assertEquals(100, control.removeMessages(" "));
+
+
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(pagedQueue.getAddress());
+
+ assertNull(consumer.receive(300));
+
+
+ session.close();
+
+ sf.close();
+ locator.close();
+ }
+
+
+ public void testDeleteWithPagingAndFilter() throws Exception
+ {
+ AddressSettings pagedSetting = new AddressSettings();
+ pagedSetting.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ pagedSetting.setPageSizeBytes(10 * 1024);
+ pagedSetting.setMaxSizeBytes(100 * 1024);
+ server.getAddressSettingsRepository().addMatch("#", pagedSetting);
+
+ serverManager.createQueue(true, "pagedTest", null, true,
"/queue/pagedTest");
+
+ HornetQQueue pagedQueue =
(HornetQQueue)context.lookup("/queue/pagedTest");
+
+
+ ServerLocator locator = createInVMNonHALocator();
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(true, true);
+
+ ClientProducer prod = session.createProducer(pagedQueue.getAddress());
+ for (int i = 0 ; i < 200; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+ msg.getBodyBuffer().writeBytes(new byte[90 * 1024]);
+ msg.putBooleanProperty("even", i % 2 == 0);
+ prod.send(msg);
+ }
+
+ JMSQueueControl control = createManagementControl(pagedQueue);
+
+ assertEquals(100, control.removeMessages("even=true"));
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(pagedQueue.getAddress());
+
+
+
+ for (int i = 0 ; i < 100; i++)
+ {
+ ClientMessage msg = consumer.receive(1000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ assertFalse(msg.getBooleanProperty("even").booleanValue());
+ }
+
+ assertNull(consumer.receive(300));
+
+
+ session.close();
+
+
+ sf.close();
+ locator.close();
+ }
+
public void testMoveMessageToUnknownQueue() throws Exception
{
String unknwonQueue = RandomUtil.randomString();
@@ -1070,7 +1176,7 @@
conf.setJMXManagementEnabled(true);
conf.getAcceptorConfigurations().add(new
TransportConfiguration(InVMAcceptorFactory.class.getName()));
conf.setFileDeploymentEnabled(false);
- server = HornetQServers.newHornetQServer(conf, mbeanServer, false);
+ server = HornetQServers.newHornetQServer(conf, mbeanServer, true);
server.start();
serverManager = new JMSServerManagerImpl(server);
@@ -1106,9 +1212,14 @@
protected JMSQueueControl createManagementControl() throws Exception
{
- return ManagementControlHelper.createJMSQueueControl(queue, mbeanServer);
+ return createManagementControl(queue);
}
+ protected JMSQueueControl createManagementControl(HornetQQueue queueParameter) throws
Exception
+ {
+ return ManagementControlHelper.createJMSQueueControl(queueParameter, mbeanServer);
+ }
+
// Private -------------------------------------------------------
private Connection createConnection() throws JMSException
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -91,7 +91,7 @@
conf.setSecurityEnabled(false);
conf.setJMXManagementEnabled(true);
conf.getAcceptorConfigurations().add(new TransportConfiguration(acceptorFactory));
- server = HornetQServers.newHornetQServer(conf, mbeanServer, false);
+ server = HornetQServers.newHornetQServer(conf, mbeanServer, true);
server.start();
context = new InVMContext();
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSUtil.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSUtil.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -1,14 +1,14 @@
/*
* Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
*/
package org.hornetq.tests.integration.jms.server.management;
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -1,14 +1,14 @@
/*
* Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
*/
package org.hornetq.tests.integration.journal;
Added:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
(rev 0)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -0,0 +1,702 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.largemessage;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.*;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * A LargeMessageTestBase
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ * Created Oct 29, 2008 11:43:52 AM
+ *
+ *
+ */
+public abstract class LargeMessageTestBase extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(LargeMessageTestBase.class);
+
+ protected final SimpleString ADDRESS = new SimpleString("SimpleAddress");
+
+ protected HornetQServer server;
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ if (server != null && server.isStarted())
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Exception e)
+ {
+ LargeMessageTestBase.log.warn(e.getMessage(), e);
+ }
+ }
+
+ server = null;
+
+ super.tearDown();
+ }
+
+ protected void testChunks(final boolean isXA,
+ final boolean restartOnXA,
+ final boolean rollbackFirstSend,
+ final boolean useStreamOnConsume,
+ final boolean realFiles,
+ final boolean preAck,
+ final boolean sendingBlocking,
+ final boolean testBrowser,
+ final boolean useMessageConsumer,
+ final int numberOfMessages,
+ final long numberOfBytes,
+ final int waitOnConsumer,
+ final long delayDelivery) throws Exception
+ {
+ testChunks(isXA,
+ restartOnXA,
+ rollbackFirstSend,
+ useStreamOnConsume,
+ realFiles,
+ preAck,
+ sendingBlocking,
+ testBrowser,
+ useMessageConsumer,
+ numberOfMessages,
+ numberOfBytes,
+ waitOnConsumer,
+ delayDelivery,
+ -1,
+ 10 * 1024);
+ }
+
+ protected void testChunks(final boolean isXA,
+ final boolean restartOnXA,
+ final boolean rollbackFirstSend,
+ final boolean useStreamOnConsume,
+ final boolean realFiles,
+ final boolean preAck,
+ final boolean sendingBlocking,
+ final boolean testBrowser,
+ final boolean useMessageConsumer,
+ final int numberOfMessages,
+ final long numberOfBytes,
+ final int waitOnConsumer,
+ final long delayDelivery,
+ final int producerWindow,
+ final int minSize) throws Exception
+ {
+ clearData();
+
+ server = createServer(realFiles);
+ server.start();
+
+ ServerLocator locator = createInVMNonHALocator();
+ try
+ {
+
+ if (sendingBlocking)
+ {
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+ }
+
+ if (producerWindow > 0)
+ {
+ locator.setConfirmationWindowSize(producerWindow);
+ }
+
+ locator.setMinLargeMessageSize(minSize);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session;
+
+ Xid xid = null;
+ session = sf.createSession(null, null, isXA, false, false, preAck, 0);
+
+ if (isXA)
+ {
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+
+ session.createQueue(ADDRESS, ADDRESS, null, true);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ if (rollbackFirstSend)
+ {
+ sendMessages(numberOfMessages, numberOfBytes, delayDelivery, session,
producer);
+
+ if (isXA)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.prepare(xid);
+
+ session.close();
+
+ if (realFiles && restartOnXA)
+ {
+ server.stop();
+ server.start();
+ sf = locator.createSessionFactory();
+ }
+
+ session = sf.createSession(null, null, isXA, false, false, preAck, 0);
+
+ Xid[] xids = session.recover(XAResource.TMSTARTRSCAN);
+ Assert.assertEquals(1, xids.length);
+ Assert.assertEquals(xid, xids[0]);
+
+ session.rollback(xid);
+ producer = session.createProducer(ADDRESS);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+ else
+ {
+ session.rollback();
+ }
+
+ validateNoFilesOnLargeDir();
+ }
+
+ sendMessages(numberOfMessages, numberOfBytes, delayDelivery, session,
producer);
+
+ if (isXA)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.prepare(xid);
+
+ session.close();
+
+ if (realFiles && restartOnXA)
+ {
+ server.stop();
+ server.start();
+ //we need to recreate sf's
+ sf = locator.createSessionFactory();
+ }
+
+ session = sf.createSession(null, null, isXA, false, false, preAck, 0);
+
+ Xid[] xids = session.recover(XAResource.TMSTARTRSCAN);
+ Assert.assertEquals(1, xids.length);
+ Assert.assertEquals(xid, xids[0]);
+
+ producer = session.createProducer(ADDRESS);
+
+ session.commit(xid, false);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+ else
+ {
+ session.commit();
+ }
+
+ session.close();
+
+ if (realFiles)
+ {
+ server.stop();
+
+ server = createServer(realFiles);
+ server.start();
+
+ sf = locator.createSessionFactory();
+ }
+
+ session = sf.createSession(null, null, isXA, false, false, preAck, 0);
+
+ if (isXA)
+ {
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+
+ ClientConsumer consumer = null;
+
+ for (int iteration = testBrowser ? 0 : 1; iteration < 2; iteration++)
+ {
+ session.stop();
+
+ // first time with a browser
+ consumer = session.createConsumer(ADDRESS, null, iteration == 0);
+
+ if (useMessageConsumer)
+ {
+ final CountDownLatch latchDone = new CountDownLatch(numberOfMessages);
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ MessageHandler handler = new MessageHandler()
+ {
+ int msgCounter;
+
+ public void onMessage(final ClientMessage message)
+ {
+ try
+ {
+ if (delayDelivery > 0)
+ {
+ long originalTime = (Long)message.getObjectProperty(new
SimpleString("original-time"));
+ Assert.assertTrue(System.currentTimeMillis() - originalTime +
"<" + delayDelivery,
+ System.currentTimeMillis() - originalTime
>= delayDelivery);
+ }
+
+ if (!preAck)
+ {
+ message.acknowledge();
+ }
+
+ Assert.assertNotNull(message);
+
+ if (delayDelivery <= 0)
+ {
+ // right now there is no guarantee of ordered delivered on
multiple scheduledMessages with
+ // the same
+ // scheduled delivery time
+ Assert.assertEquals(msgCounter,
+ ((Integer)message.getObjectProperty(new
SimpleString("counter-message"))).intValue());
+ }
+
+ if (useStreamOnConsume)
+ {
+ final AtomicLong bytesRead = new AtomicLong(0);
+ message.saveToOutputStream(new OutputStream()
+ {
+
+ @Override
+ public void write(final byte b[]) throws IOException
+ {
+ if (b[0] ==
UnitTestCase.getSamplebyte(bytesRead.get()))
+ {
+ bytesRead.addAndGet(b.length);
+ LargeMessageTestBase.log.debug("Read position
" + bytesRead.get() + " on consumer");
+ }
+ else
+ {
+ LargeMessageTestBase.log.warn("Received invalid
packet at position " + bytesRead.get());
+ }
+ }
+
+ @Override
+ public void write(final int b) throws IOException
+ {
+ if (b == UnitTestCase.getSamplebyte(bytesRead.get()))
+ {
+ bytesRead.incrementAndGet();
+ }
+ else
+ {
+ LargeMessageTestBase.log.warn("byte not as
expected!");
+ }
+ }
+ });
+
+ Assert.assertEquals(numberOfBytes, bytesRead.get());
+ }
+ else
+ {
+
+ HornetQBuffer buffer = message.getBodyBuffer();
+ buffer.resetReaderIndex();
+ for (long b = 0; b < numberOfBytes; b++)
+ {
+ if (b % (1024l * 1024l) == 0)
+ {
+ LargeMessageTestBase.log.debug("Read " + b +
" bytes");
+ }
+
+ Assert.assertEquals(UnitTestCase.getSamplebyte(b),
buffer.readByte());
+ }
+
+ try
+ {
+ buffer.readByte();
+ Assert.fail("Supposed to throw an exception");
+ }
+ catch (Exception e)
+ {
+ }
+ }
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ LargeMessageTestBase.log.warn("Got an error", e);
+ errors.incrementAndGet();
+ }
+ finally
+ {
+ latchDone.countDown();
+ msgCounter++;
+ }
+ }
+ };
+
+ session.start();
+
+ consumer.setMessageHandler(handler);
+
+ Assert.assertTrue(latchDone.await(waitOnConsumer, TimeUnit.SECONDS));
+ Assert.assertEquals(0, errors.get());
+ }
+ else
+ {
+
+ session.start();
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ System.currentTimeMillis();
+
+ ClientMessage message = consumer.receive(waitOnConsumer +
delayDelivery);
+
+ Assert.assertNotNull(message);
+
+ System.currentTimeMillis();
+
+ if (delayDelivery > 0)
+ {
+ long originalTime = (Long)message.getObjectProperty(new
SimpleString("original-time"));
+ Assert.assertTrue(System.currentTimeMillis() - originalTime +
"<" + delayDelivery,
+ System.currentTimeMillis() - originalTime >=
delayDelivery);
+ }
+
+ if (!preAck)
+ {
+ message.acknowledge();
+ }
+
+ Assert.assertNotNull(message);
+
+ if (delayDelivery <= 0)
+ {
+ // right now there is no guarantee of ordered delivered on multiple
scheduledMessages with the same
+ // scheduled delivery time
+ Assert.assertEquals(i,
+ ((Integer)message.getObjectProperty(new
SimpleString("counter-message"))).intValue());
+ }
+
+ if (useStreamOnConsume)
+ {
+ final AtomicLong bytesRead = new AtomicLong(0);
+ message.saveToOutputStream(new OutputStream()
+ {
+
+ @Override
+ public void write(final byte b[]) throws IOException
+ {
+ if (b[0] == UnitTestCase.getSamplebyte(bytesRead.get()))
+ {
+ bytesRead.addAndGet(b.length);
+ }
+ else
+ {
+ LargeMessageTestBase.log.warn("Received invalid packet
at position " + bytesRead.get());
+ }
+
+ }
+
+ @Override
+ public void write(final int b) throws IOException
+ {
+ if (bytesRead.get() % (1024l * 1024l) == 0)
+ {
+ LargeMessageTestBase.log.debug("Read " +
bytesRead.get() + " bytes");
+ }
+ if (b == (byte)'a')
+ {
+ bytesRead.incrementAndGet();
+ }
+ else
+ {
+ LargeMessageTestBase.log.warn("byte not as
expected!");
+ }
+ }
+ });
+
+ Assert.assertEquals(numberOfBytes, bytesRead.get());
+ }
+ else
+ {
+ HornetQBuffer buffer = message.getBodyBuffer();
+ buffer.resetReaderIndex();
+
+ for (long b = 0; b < numberOfBytes; b++)
+ {
+ if (b % (1024l * 1024l) == 0l)
+ {
+ LargeMessageTestBase.log.debug("Read " + b + "
bytes");
+ }
+ Assert.assertEquals(UnitTestCase.getSamplebyte(b),
buffer.readByte());
+ }
+ }
+
+ }
+
+ }
+ consumer.close();
+
+ if (iteration == 0)
+ {
+ if (isXA)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.rollback(xid);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+ else
+ {
+ session.rollback();
+ }
+ }
+ else
+ {
+ if (isXA)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.commit(xid, true);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+ else
+ {
+ session.commit();
+ }
+ }
+ }
+
+ session.close();
+
+ Assert.assertEquals(0,
((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getDeliveringCount());
+ Assert.assertEquals(0,
((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount());
+
+ validateNoFilesOnLargeDir();
+
+ }
+ finally
+ {
+ locator.close();
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
+   /**
+    * Sends {@code numberOfMessages} durable messages through {@code producer}, each with a body of
+    * {@code numberOfBytes} bytes.
+    * <p>
+    * Every message carries an int property {@code counter-message} holding its index. When
+    * {@code delayDelivery} is positive the message is also stamped with {@code original-time}
+    * (the send time) and scheduled for delivery {@code delayDelivery} milliseconds later.
+    *
+    * @param numberOfMessages how many messages to send
+    * @param numberOfBytes body size of each message, in bytes
+    * @param delayDelivery scheduled-delivery delay in milliseconds; values &lt;= 0 send immediately
+    * @param session session used to create the messages
+    * @param producer producer used to send the messages
+    * @throws Exception if creating or sending a message fails
+    */
+   private void sendMessages(final int numberOfMessages,
+                             final long numberOfBytes,
+                             final long delayDelivery,
+                             final ClientSession session,
+                             final ClientProducer producer) throws Exception
+   {
+      LargeMessageTestBase.log.debug("NumberOfBytes = " + numberOfBytes);
+      for (int i = 0; i < numberOfMessages; i++)
+      {
+         ClientMessage message = session.createMessage(true);
+
+         // Bodies above 1M are always streamed, as building them in memory would require too much
+         // memory from the test; smaller bodies alternate between the stream and the array path.
+         if (numberOfBytes > 1024 * 1024 || i % 2 == 0)
+         {
+            LargeMessageTestBase.log.debug("Sending message (stream)" + i);
+            message.setBodyInputStream(UnitTestCase.createFakeLargeStream(numberOfBytes));
+         }
+         else
+         {
+            LargeMessageTestBase.log.debug("Sending message (array)" + i);
+            // the cast is safe: this branch is only reached when numberOfBytes <= 1M
+            byte[] bytes = new byte[(int)numberOfBytes];
+            for (int j = 0; j < bytes.length; j++)
+            {
+               bytes[j] = UnitTestCase.getSamplebyte(j);
+            }
+            message.getBodyBuffer().writeBytes(bytes);
+         }
+
+         message.putIntProperty(new SimpleString("counter-message"), i);
+
+         if (delayDelivery > 0)
+         {
+            long time = System.currentTimeMillis();
+            message.putLongProperty(new SimpleString("original-time"), time);
+            message.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, time + delayDelivery);
+         }
+
+         // the send itself is the same for scheduled and immediate messages
+         producer.send(message);
+      }
+   }
+
+   /**
+    * Builds a fixed-size buffer filled with the ints {@code 0 .. numberOfIntegers - 1}, written in
+    * ascending order.
+    *
+    * @param numberOfIntegers how many ints to write; the buffer is sized at
+    *                         {@code DataConstants.SIZE_INT} bytes per int
+    * @return the populated buffer
+    */
+   protected HornetQBuffer createLargeBuffer(final int numberOfIntegers)
+   {
+      final HornetQBuffer buffer = HornetQBuffers.fixedBuffer(DataConstants.SIZE_INT * numberOfIntegers);
+
+      int next = 0;
+      while (next < numberOfIntegers)
+      {
+         buffer.writeInt(next);
+         next++;
+      }
+
+      return buffer;
+   }
+
+   /**
+    * Convenience overload: delegates to the {@code (session, long, boolean)} variant with the
+    * persistent flag set to {@code true}.
+    *
+    * @param session session used to create the message
+    * @param numberOfBytes size of the message body, in bytes
+    * @return the newly created message
+    */
+   protected ClientMessage createLargeClientMessage(final ClientSession session,
+                                                    final int numberOfBytes) throws Exception
+   {
+      final boolean persistent = true;
+
+      return createLargeClientMessage(session, numberOfBytes, persistent);
+   }
+
+   /**
+    * Creates a message whose body is a copy of the given byte array.
+    *
+    * @param session session used to create the message
+    * @param buffer bytes written to the message body
+    * @param durable passed through to {@code session.createMessage}
+    * @return the newly created message
+    */
+   protected ClientMessage createLargeClientMessage(final ClientSession session,
+                                                    final byte[] buffer,
+                                                    final boolean durable) throws Exception
+   {
+      final ClientMessage message = session.createMessage(durable);
+
+      message.getBodyBuffer().writeBytes(buffer);
+
+      return message;
+   }
+
+   /**
+    * Creates a message whose body is streamed from
+    * {@code UnitTestCase.createFakeLargeStream(numberOfBytes)} instead of being held in memory.
+    *
+    * @param session session used to create the message
+    * @param numberOfBytes size handed to the fake stream
+    * @param persistent passed through to {@code session.createMessage}
+    * @return the newly created message
+    */
+   protected ClientMessage createLargeClientMessage(final ClientSession session,
+                                                    final long numberOfBytes,
+                                                    final boolean persistent) throws Exception
+   {
+      final ClientMessage streamed = session.createMessage(persistent);
+      streamed.setBodyInputStream(UnitTestCase.createFakeLargeStream(numberOfBytes));
+      return streamed;
+   }
+
+   /**
+    * Starts the session, receives a single message from {@code queueToRead} (waiting up to five
+    * seconds), acknowledges it and commits the session.
+    * <p>
+    * The consumer is closed even when the receive or the assertion fails, so a failing test does
+    * not leave a consumer attached to the queue.
+    *
+    * @param session session to consume with
+    * @param queueToRead queue the message is read from
+    * @param numberOfBytes currently unused; the body of the received message is not inspected
+    * @throws HornetQException if a session or consumer operation fails
+    * @throws FileNotFoundException declared for callers and overriders; not thrown by this implementation
+    * @throws IOException declared for callers and overriders; not thrown by this implementation
+    */
+   protected void readMessage(final ClientSession session,
+                              final SimpleString queueToRead,
+                              final int numberOfBytes) throws HornetQException,
+                                                       FileNotFoundException,
+                                                       IOException
+   {
+      session.start();
+
+      ClientConsumer consumer = session.createConsumer(queueToRead);
+
+      try
+      {
+         ClientMessage clientMessage = consumer.receive(5000);
+
+         Assert.assertNotNull(clientMessage);
+
+         clientMessage.acknowledge();
+
+         session.commit();
+      }
+      finally
+      {
+         consumer.close();
+      }
+   }
+
+   /**
+    * Creates an output stream that discards everything written to it.
+    * <p>
+    * The stream counts the bytes it receives, emits a debug line once per mebibyte, and throws an
+    * {@link IOException} on any write attempted after {@code close()}.
+    *
+    * @return the discarding stream
+    */
+   protected OutputStream createFakeOutputStream() throws Exception
+   {
+      return new OutputStream()
+      {
+         private boolean closed = false;
+
+         private int count;
+
+         @Override
+         public void close() throws IOException
+         {
+            super.close();
+            closed = true;
+         }
+
+         @Override
+         public void write(final int b) throws IOException
+         {
+            // The parentheses matter: "count++ % 1024 * 1024" parses as "(count++ % 1024) * 1024",
+            // which would log every 1024 bytes instead of every 1M.
+            if (count++ % (1024 * 1024) == 0)
+            {
+               LargeMessageTestBase.log.debug("OutputStream received " + count + " bytes");
+            }
+            if (closed)
+            {
+               throw new IOException("Stream was closed");
+            }
+         }
+
+      };
+
+   }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/logging/LogDelegateTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/logging/LogDelegateTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/logging/LogDelegateTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -1,14 +1,14 @@
/*
* Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
*/
package org.hornetq.tests.integration.logging;
@@ -16,6 +16,7 @@
import junit.framework.Assert;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.logging.impl.JULLogDelegate;
import org.hornetq.core.logging.impl.JULLogDelegateFactory;
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/persistence/RestartSMTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/persistence/RestartSMTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/persistence/RestartSMTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -27,6 +27,7 @@
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.server.JournalType;
import org.hornetq.core.server.Queue;
import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
import org.hornetq.tests.util.ServiceTestBase;
Added:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/ra/HornetQActivationTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/ra/HornetQActivationTest.java
(rev 0)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/ra/HornetQActivationTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.ra;
+
+import org.hornetq.ra.HornetQResourceAdapter;
+import org.hornetq.ra.inflow.HornetQActivationSpec;
+import org.hornetq.tests.integration.ra.HornetQRATestBase.MyBootstrapContext;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * Checks how {@link HornetQActivationSpec} parses the string given to {@code setJndiParams}.
+ *
+ * @author clebertsuconic
+ */
+public class HornetQActivationTest extends ServiceTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /**
+    * A ';'-separated list of {@code key=value} pairs must yield one parsed entry per key, and a
+    * value containing commas must be kept whole.
+    */
+   public void testValidateJNDIParameters() throws Exception
+   {
+      HornetQResourceAdapter adapter = new HornetQResourceAdapter();
+
+      HornetQActivationSpec activationSpec = new HornetQActivationSpec();
+      activationSpec.setResourceAdapter(adapter);
+      activationSpec.setUseJNDI(false);
+      activationSpec.setDestinationType("javax.jms.Queue");
+      activationSpec.setJndiParams("a=b;c=d;url=a1,a2,a3");
+
+      assertParsedParam(activationSpec, "a", "b");
+      assertParsedParam(activationSpec, "c", "d");
+      assertParsedParam(activationSpec, "url", "a1,a2,a3");
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   /** Asserts that the parsed JNDI parameters of {@code spec} map {@code key} to {@code expected}. */
+   private void assertParsedParam(final HornetQActivationSpec spec, final String key, final String expected)
+   {
+      assertEquals(expected, spec.getParsedJndiParams().get(key));
+   }
+
+   // Inner classes -------------------------------------------------
+
+}
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/remoting/BatchDelayTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/remoting/BatchDelayTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/remoting/BatchDelayTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -19,8 +19,10 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.*;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
+import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.tests.util.ServiceTestBase;
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/remoting/DirectDeliverTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/remoting/DirectDeliverTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/remoting/DirectDeliverTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -26,6 +26,7 @@
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/remoting/NetworkAddressTestBase.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/remoting/NetworkAddressTestBase.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/remoting/NetworkAddressTestBase.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -27,9 +27,11 @@
import junit.framework.Assert;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.tests.util.ServiceTestBase;
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/remoting/SynchronousCloseTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/remoting/SynchronousCloseTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/remoting/SynchronousCloseTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -22,6 +22,8 @@
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.tests.util.ServiceTestBase;
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/scheduling/ScheduledMessageTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/scheduling/ScheduledMessageTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/scheduling/ScheduledMessageTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -12,6 +12,9 @@
*/
package org.hornetq.tests.integration.scheduling;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
@@ -32,6 +35,7 @@
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:clebert.suconic">Clebert Suconic</a>
*/
public class ScheduledMessageTest extends ServiceTestBase
{
@@ -52,6 +56,14 @@
{
super.setUp();
clearData();
+ startServer();
+ }
+
+ /**
+ * @throws Exception
+ */
+ protected void startServer() throws Exception
+ {
configuration = createDefaultConfig();
configuration.setSecurityEnabled(false);
configuration.setJournalMinFiles(2);
@@ -64,7 +76,7 @@
protected void tearDown() throws Exception
{
locator.close();
-
+
if (server != null)
{
try
@@ -282,10 +294,10 @@
session.createQueue(atestq, atestq, null, true);
ClientProducer producer = session.createProducer(atestq);
ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
message.getBodyBuffer().writeString("testINVMCoreClient");
message.setDurable(true);
long time = System.currentTimeMillis();
@@ -484,6 +496,47 @@
session.close();
}
+
+   /**
+    * Schedules 1000 durable messages for the very same delivery time and checks that they are
+    * received in the order they were sent, with nothing left on the queue afterwards.
+    */
+   public void testManyMessagesSameTime() throws Exception
+   {
+      final int messageCount = 1000;
+
+      ClientSessionFactory sessionFactory = locator.createSessionFactory();
+      ClientSession session = sessionFactory.createSession(false, false, false);
+      session.createQueue(atestq, atestq, null, true);
+      ClientProducer producer = session.createProducer(atestq);
+
+      // one shared delivery time, a second from now, for every message
+      final long deliveryTime = System.currentTimeMillis() + 1000;
+
+      for (int sent = 0; sent < messageCount; sent++)
+      {
+         ClientMessage outgoing = session.createMessage(true);
+         outgoing.putIntProperty("value", sent);
+         outgoing.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, deliveryTime);
+         producer.send(outgoing);
+      }
+
+      session.commit();
+
+      session.start();
+      ClientConsumer consumer = session.createConsumer(atestq);
+
+      int expectedValue = 0;
+      while (expectedValue < messageCount)
+      {
+         ClientMessage received = consumer.receive(15000);
+         assertNotNull(received);
+         received.acknowledge();
+
+         assertEquals(expectedValue, received.getIntProperty("value").intValue());
+         expectedValue++;
+      }
+
+      session.commit();
+
+      Assert.assertNull(consumer.receiveImmediate());
+
+      session.close();
+   }
+
public void testScheduledAndNormalMessagesDeliveredCorrectly(final boolean recover)
throws Exception
{
@@ -607,7 +660,76 @@
Assert.assertNull(consumer.receiveImmediate());
session.close();
}
+
+
+   /**
+    * Sends 100 scheduled messages, then acknowledges each one inside its own XA transaction that is
+    * prepared but never committed. After the server is stopped and started again through
+    * {@code startServer()}, a plain consumer must not receive any of those messages.
+    */
+   public void testPendingACKOnPrepared() throws Exception
+   {
+      final int messageCount = 100;
+
+      ClientSessionFactory sessionFactory = locator.createSessionFactory();
+      ClientSession session = sessionFactory.createSession(true, false, false);
+      session.createQueue(atestq, atestq, null, true);
+      ClientProducer producer = session.createProducer(atestq);
+
+      final long deliveryTime = System.currentTimeMillis() + 1000;
+      for (int sent = 0; sent < messageCount; sent++)
+      {
+         ClientMessage outgoing = session.createMessage(true);
+         outgoing.putIntProperty("value", sent);
+         outgoing.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, deliveryTime);
+         producer.send(outgoing);
+      }
+
+      session.close();
+
+      // one fresh session and one fresh Xid per message; every transaction is left prepared
+      for (int consumed = 0; consumed < messageCount; consumed++)
+      {
+         final Xid xid = newXID();
+
+         session = sessionFactory.createSession(true, false, false);
+
+         ClientConsumer xaConsumer = session.createConsumer(atestq);
+
+         session.start();
+
+         session.start(xid, XAResource.TMNOFLAGS);
+
+         ClientMessage received = xaConsumer.receive(5000);
+         assertNotNull(received);
+         received.acknowledge();
+         session.end(xid, XAResource.TMSUCCESS);
+
+         session.prepare(xid);
+
+         session.close();
+      }
+
+      sessionFactory.close();
+      locator.close();
+
+      server.stop();
+
+      startServer();
+
+      sessionFactory = locator.createSessionFactory();
+
+      session = sessionFactory.createSession(false, false);
+
+      ClientConsumer consumer = session.createConsumer(atestq);
+
+      session.start();
+
+      // the acknowledgements are still pending in the prepared transactions
+      assertNull(consumer.receive(1000));
+
+      session.close();
+
+      sessionFactory.close();
+
+   }
+
public void testScheduledDeliveryTX() throws Exception
{
scheduledDelivery(true);
@@ -618,6 +740,116 @@
scheduledDelivery(false);
}
+ /**
+ * Consumes 100 messages, each inside its own prepared XA transaction, on an address configured
+ * with a redelivery delay. One transaction is rolled back before the server is stopped and the
+ * remaining ones after it is started again; a background consumer must then receive all 100
+ * messages.
+ */
+ public void testRedeliveryAfterPrepare() throws Exception
+ {
+ // redelivery delay applied to the test address
+ AddressSettings qs = new AddressSettings();
+ qs.setRedeliveryDelay(5000l);
+ server.getAddressSettingsRepository().addMatch(atestq.toString(), qs);
+
+ ClientSessionFactory sessionFactory = locator.createSessionFactory();
+ ClientSession session = sessionFactory.createSession(false, false, false);
+
+ session.createQueue(atestq, atestq, true);
+
+ // send 100 messages, committing after each one
+ ClientProducer producer = session.createProducer(atestq);
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+ msg.putIntProperty("key", i);
+ producer.send(msg);
+ session.commit();
+ }
+
+ session.close();
+
+ session = sessionFactory.createSession(true, false, false);
+
+ ClientConsumer consumer = session.createConsumer(atestq);
+
+ ArrayList<Xid> xids = new ArrayList<Xid>();
+
+ session.start();
+
+ // acknowledge each message in its own XA transaction and leave it prepared
+ for (int i = 0; i < 100; i++)
+ {
+ Xid xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ ClientMessage msg = consumer.receive(5000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ session.end(xid, XAResource.TMSUCCESS);
+ session.prepare(xid);
+ xids.add(xid);
+ }
+
+ // the first transaction is rolled back before the server stops; its slot is nulled so the
+ // rollback loop further down skips it
+ session.rollback(xids.get(0));
+ xids.set(0, null);
+ session.close();
+
+ server.stop();
+
+ // new server instance built from a fresh configuration that carries the same address settings
+ configuration = createDefaultConfig();
+ configuration.setSecurityEnabled(false);
+ configuration.setJournalMinFiles(2);
+ configuration.getAddressesSettings().put(atestq.toString(), qs);
+
+ server = createServer(true, configuration);
+ server.start();
+
+ locator = createInVMNonHALocator();
+
+ // background consumer: counts received messages; any failure is reported as count == -1
+ final AtomicInteger count = new AtomicInteger(0);
+ Thread t = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSession session = sf.createSession(false, false);
+ session.start();
+ ClientConsumer cons = session.createConsumer(atestq);
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage msg = cons.receive(100000);
+ assertNotNull(msg);
+ System.out.println("Received message " + msg);
+ count.incrementAndGet();
+ msg.acknowledge();
+ session.commit();
+ }
+ session.close();
+ sf.close();
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ count.set(-1);
+ }
+ }
+ };
+
+ t.start();
+
+ sessionFactory = locator.createSessionFactory();
+
+ session = sessionFactory.createSession(true, false, false);
+
+ // roll back the transactions that were still prepared when the server stopped, while the
+ // background consumer is already running
+ for (Xid xid : xids)
+ {
+ if (xid != null)
+ {
+ session.rollback(xid);
+ }
+ }
+
+ session.close();
+
+ // no timeout on the join: the wait is bounded only by the consumer's own receive timeouts
+ t.join();
+
+ assertEquals(100, count.get());
+ }
+
// Private -------------------------------------------------------
private void scheduledDelivery(final boolean tx) throws Exception
@@ -770,10 +1002,10 @@
private ClientMessage createDurableMessage(final ClientSession session, final String
body)
{
ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
- true,
- 0,
- System.currentTimeMillis(),
- (byte)1);
+ true,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
message.getBodyBuffer().writeString(body);
return message;
}
Property changes on: trunk/tests/jms-tests
___________________________________________________________________
Modified: svn:ignore
- build
target
+ build
target
.classpath
.project
Modified:
trunk/tests/jms-tests/src/test/java/org/hornetq/jms/tests/message/MessageHeaderTest.java
===================================================================
---
trunk/tests/jms-tests/src/test/java/org/hornetq/jms/tests/message/MessageHeaderTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/jms-tests/src/test/java/org/hornetq/jms/tests/message/MessageHeaderTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -1408,6 +1408,15 @@
// TODO Auto-generated method stub
}
+
+      /* (non-Javadoc)
+       * @see org.hornetq.api.core.client.ClientSession#addMetaDataV1(java.lang.String, java.lang.String)
+       */
+      public void addMetaDataV1(String key, String data) throws HornetQException
+      {
+         // Intentionally a no-op: auto-generated stub for the ClientSession method referenced above.
+      }
}
}
Property changes on: trunk/tests/joram-tests
___________________________________________________________________
Modified: svn:ignore
- build
target
+ build
target
.classpath
.project
Property changes on: trunk/tests/performance-tests
___________________________________________________________________
Modified: svn:ignore
- target
+ target
.classpath
.project
Property changes on: trunk/tests/soak-tests
___________________________________________________________________
Modified: svn:ignore
- target
+ target
.classpath
.project
Property changes on: trunk/tests/stress-tests
___________________________________________________________________
Modified: svn:ignore
- target
+ target
.classpath
.project
Property changes on: trunk/tests/timing-tests
___________________________________________________________________
Modified: svn:ignore
- target
+ target
.classpath
.project
Modified:
trunk/tests/timing-tests/src/test/java/org/hornetq/tests/timing/util/UUIDTest.java
===================================================================
---
trunk/tests/timing-tests/src/test/java/org/hornetq/tests/timing/util/UUIDTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/timing-tests/src/test/java/org/hornetq/tests/timing/util/UUIDTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -13,19 +13,12 @@
package org.hornetq.tests.timing.util;
-import junit.framework.Assert;
-import org.hornetq.tests.util.UnitTestCase;
-import org.hornetq.utils.UUIDGenerator;
-
-import java.util.HashSet;
-import java.util.Set;
-
/**
*
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
*
*/
-public class UUIDTest extends UnitTestCase
+public class UUIDTest extends org.hornetq.tests.unit.util.UUIDTest
{
// Constants -----------------------------------------------------
@@ -38,34 +31,11 @@
// Public --------------------------------------------------------
- public void testManyUUIDs() throws Exception
- {
- final int MANY_TIMES = getTimes();
- Set<String> uuidsSet = new HashSet<String>();
-
- UUIDGenerator gen = UUIDGenerator.getInstance();
- for (int i = 0; i < MANY_TIMES; i++)
- {
- uuidsSet.add(gen.generateStringUUID());
- }
-
- // we put them in a set to check duplicates
- Assert.assertEquals(MANY_TIMES, uuidsSet.size());
- }
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
+ @Override
protected int getTimes()
{
return 1000000;
Property changes on: trunk/tests/unit-tests
___________________________________________________________________
Modified: svn:ignore
- target
+ target
.classpath
.project
Modified:
trunk/tests/unit-tests/src/main/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
---
trunk/tests/unit-tests/src/main/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/unit-tests/src/main/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -348,7 +348,7 @@
public void delete() throws Exception
{
- if (!open)
+ if (open)
{
close();
}
Modified:
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/deployers/impl/FileDeploymentManagerTest.java
===================================================================
---
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/deployers/impl/FileDeploymentManagerTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/deployers/impl/FileDeploymentManagerTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -63,7 +63,7 @@
FileDeploymentManagerTest.log.debug("Filename is " + filename);
- File file = new File("target/tmpfiles/" + filename);
+ File file = new File("tests/tmpfiles/" + filename);
FileDeploymentManagerTest.log.debug(file.getAbsoluteFile());
@@ -99,7 +99,7 @@
FileDeploymentManagerTest.log.debug("Filename is " + filename);
- File file = new File("target/tmpfiles/" + filename);
+ File file = new File("tests/tmpfiles/" + filename);
FileDeploymentManagerTest.log.debug(file.getAbsoluteFile());
@@ -134,9 +134,9 @@
String filename2 = "fdm_test_file.xml2";
String filename3 = "fdm_test_file.xml3";
- File file1 = new File("target/tmpfiles/" + filename1);
- File file2 = new File("target/tmpfiles/" + filename2);
- File file3 = new File("target/tmpfiles/" + filename3);
+ File file1 = new File("tests/tmpfiles/" + filename1);
+ File file2 = new File("tests/tmpfiles/" + filename2);
+ File file3 = new File("tests/tmpfiles/" + filename3);
file1.createNewFile();
file2.createNewFile();
@@ -231,7 +231,7 @@
String filename = "fdm_test_file.xml1";
- File file = new File("target/tmpfiles/" + filename);
+ File file = new File("tests/tmpfiles/" + filename);
file.createNewFile();
long oldLastModified = file.lastModified();
@@ -280,7 +280,7 @@
String filename = "fdm_test_file.xml1";
- File file = new File("target/tmpfiles/" + filename);
+ File file = new File("tests/tmpfiles/" + filename);
file.createNewFile();
Modified:
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java
===================================================================
---
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -98,7 +98,7 @@
file.open();
impl = new PageImpl(new SimpleString("something"), new
NullStorageManager(), factory, file, 10);
- List<PagedMessage> msgs = impl.read();
+ List<PagedMessage> msgs = impl.read(new NullStorageManager());
Assert.assertEquals(numberOfElements, msgs.size());
@@ -115,7 +115,7 @@
.array());
}
- impl.delete();
+ impl.delete(null);
Assert.assertEquals(0, factory.listFiles(".page").size());
@@ -170,7 +170,7 @@
file.open();
impl = new PageImpl(new SimpleString("something"), new
NullStorageManager(), factory, file, 10);
- List<PagedMessage> msgs = impl.read();
+ List<PagedMessage> msgs = impl.read(new NullStorageManager());
Assert.assertEquals(numberOfElements, msgs.size());
@@ -187,7 +187,7 @@
.array());
}
- impl.delete();
+ impl.delete(null);
Assert.assertEquals(0, factory.listFiles("page").size());
Modified:
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
===================================================================
---
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -93,7 +93,7 @@
page.open();
- List<PagedMessage> msgs = page.read();
+ List<PagedMessage> msgs = page.read(new NullStorageManager());
page.close();
Modified:
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
---
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -13,7 +13,6 @@
package org.hornetq.tests.unit.core.paging.impl;
-import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -58,6 +57,7 @@
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.persistence.config.PersistedAddressSetting;
import org.hornetq.core.persistence.config.PersistedRoles;
+import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.replication.ReplicationManager;
@@ -286,7 +286,7 @@
page.open();
- List<PagedMessage> msg = page.read();
+ List<PagedMessage> msg = page.read(new NullStorageManager());
Assert.assertEquals(numMessages, msg.size());
Assert.assertEquals(1, storeImpl.getNumberOfPages());
@@ -374,7 +374,7 @@
page.open();
- List<PagedMessage> msg = page.read();
+ List<PagedMessage> msg = page.read(new NullStorageManager());
page.close();
@@ -399,9 +399,9 @@
newPage.open();
- Assert.assertEquals(1, newPage.read().size());
+ Assert.assertEquals(1, newPage.read(new NullStorageManager()).size());
- newPage.delete();
+ newPage.delete(null);
Assert.assertEquals(1, storeImpl.getNumberOfPages());
@@ -421,7 +421,7 @@
page.open();
- List<PagedMessage> msgs = page.read();
+ List<PagedMessage> msgs = page.read(new NullStorageManager());
Assert.assertEquals(1, msgs.size());
@@ -603,7 +603,7 @@
for (Page page : readPages)
{
page.open();
- List<PagedMessage> msgs = page.read();
+ List<PagedMessage> msgs = page.read(new NullStorageManager());
page.close();
for (PagedMessage msg : msgs)
@@ -678,7 +678,7 @@
page.open();
- List<PagedMessage> msgs = page.read();
+ List<PagedMessage> msgs = page.read(new NullStorageManager());
page.close();
@@ -696,7 +696,7 @@
}
lastPage.open();
- List<PagedMessage> lastMessages = lastPage.read();
+ List<PagedMessage> lastMessages = lastPage.read(new NullStorageManager());
lastPage.close();
Assert.assertEquals(1, lastMessages.size());
@@ -856,7 +856,7 @@
if (page != null)
{
page.open();
- List<PagedMessage> messages = page.read();
+ List<PagedMessage> messages = page.read(new
NullStorageManager());
for (PagedMessage pgmsg : messages)
{
@@ -868,7 +868,7 @@
}
page.close();
- page.delete();
+ page.delete(null);
}
else
{
Modified:
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/jms/misc/ManifestTest.java
===================================================================
---
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/jms/misc/ManifestTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/jms/misc/ManifestTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -75,7 +75,7 @@
Attributes attrs = manifest.getMainAttributes();
Assert.assertEquals(meta.getProviderVersion(),
attrs.getValue("HornetQ-Version"));
-
Assert.assertEquals("https://svn.jboss.org/repos/hornetq/trunk",
attrs.getValue("HornetQ-SVN-URL"));
+
Assert.assertEquals("https://svn.jboss.org/repos/hornetq/branches/Br...",
attrs.getValue("HornetQ-SVN-URL"));
}
finally
{
Modified:
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/ra/ResourceAdapterTest.java
===================================================================
---
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/ra/ResourceAdapterTest.java 2011-04-20
22:22:34 UTC (rev 10549)
+++
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/ra/ResourceAdapterTest.java 2011-04-21
22:08:46 UTC (rev 10550)
@@ -78,8 +78,7 @@
ra.setConnectorClassName(InVMConnector.class.getName());
HornetQConnectionFactory factory = ra.getDefaultHornetQConnectionFactory();
Assert.assertEquals(factory.getCallTimeout(), HornetQClient.DEFAULT_CALL_TIMEOUT);
- Assert.assertEquals(factory.getClientFailureCheckPeriod(),
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD);
+ Assert.assertEquals(factory.getClientFailureCheckPeriod(),
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD);
Assert.assertEquals(factory.getClientID(), null);
Assert.assertEquals(factory.getConnectionLoadBalancingPolicyClassName(),
HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME);
@@ -89,21 +88,17 @@
Assert.assertEquals(factory.getDupsOKBatchSize(),
HornetQClient.DEFAULT_ACK_BATCH_SIZE);
Assert.assertEquals(factory.getMinLargeMessageSize(),
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
Assert.assertEquals(factory.getProducerMaxRate(),
HornetQClient.DEFAULT_PRODUCER_MAX_RATE);
- Assert.assertEquals(factory.getConfirmationWindowSize(),
- HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE);
+ Assert.assertEquals(factory.getConfirmationWindowSize(),
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE);
// by default, reconnect attempts is set to -1
Assert.assertEquals(-1, factory.getReconnectAttempts());
Assert.assertEquals(factory.getRetryInterval(),
HornetQClient.DEFAULT_RETRY_INTERVAL);
- Assert.assertEquals(factory.getRetryIntervalMultiplier(),
- HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER);
- Assert.assertEquals(factory.getScheduledThreadPoolMaxSize(),
- HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE);
+ Assert.assertEquals(factory.getRetryIntervalMultiplier(),
HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER);
+ Assert.assertEquals(factory.getScheduledThreadPoolMaxSize(),
HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE);
Assert.assertEquals(factory.getThreadPoolMaxSize(),
HornetQClient.DEFAULT_THREAD_POOL_MAX_SIZE);
Assert.assertEquals(factory.getTransactionBatchSize(),
HornetQClient.DEFAULT_ACK_BATCH_SIZE);
Assert.assertEquals(factory.isAutoGroup(), HornetQClient.DEFAULT_AUTO_GROUP);
Assert.assertEquals(factory.isBlockOnAcknowledge(),
HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE);
- Assert.assertEquals(factory.isBlockOnNonDurableSend(),
- HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND);
+ Assert.assertEquals(factory.isBlockOnNonDurableSend(),
HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND);
Assert.assertEquals(factory.isBlockOnDurableSend(),
HornetQClient.DEFAULT_BLOCK_ON_DURABLE_SEND);
Assert.assertEquals(factory.isPreAcknowledge(),
HornetQClient.DEFAULT_PRE_ACKNOWLEDGE);
Assert.assertEquals(factory.isUseGlobalPools(),
HornetQClient.DEFAULT_USE_GLOBAL_POOLS);
@@ -124,8 +119,7 @@
ra.setConnectorClassName(InVMConnector.class.getName());
HornetQConnectionFactory factory = ra.createHornetQConnectionFactory(new
ConnectionFactoryProperties());
Assert.assertEquals(factory.getCallTimeout(), HornetQClient.DEFAULT_CALL_TIMEOUT);
- Assert.assertEquals(factory.getClientFailureCheckPeriod(),
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD);
+ Assert.assertEquals(factory.getClientFailureCheckPeriod(),
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD);
Assert.assertEquals(factory.getClientID(), null);
Assert.assertEquals(factory.getConnectionLoadBalancingPolicyClassName(),
HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME);
@@ -135,21 +129,17 @@
Assert.assertEquals(factory.getDupsOKBatchSize(),
HornetQClient.DEFAULT_ACK_BATCH_SIZE);
Assert.assertEquals(factory.getMinLargeMessageSize(),
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
Assert.assertEquals(factory.getProducerMaxRate(),
HornetQClient.DEFAULT_PRODUCER_MAX_RATE);
- Assert.assertEquals(factory.getConfirmationWindowSize(),
- HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE);
+ Assert.assertEquals(factory.getConfirmationWindowSize(),
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE);
// by default, reconnect attempts is set to -1
Assert.assertEquals(-1, factory.getReconnectAttempts());
Assert.assertEquals(factory.getRetryInterval(),
HornetQClient.DEFAULT_RETRY_INTERVAL);
- Assert.assertEquals(factory.getRetryIntervalMultiplier(),
- HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER);
- Assert.assertEquals(factory.getScheduledThreadPoolMaxSize(),
- HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE);
+ Assert.assertEquals(factory.getRetryIntervalMultiplier(),
HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER);
+ Assert.assertEquals(factory.getScheduledThreadPoolMaxSize(),
HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE);
Assert.assertEquals(factory.getThreadPoolMaxSize(),
HornetQClient.DEFAULT_THREAD_POOL_MAX_SIZE);
Assert.assertEquals(factory.getTransactionBatchSize(),
HornetQClient.DEFAULT_ACK_BATCH_SIZE);
Assert.assertEquals(factory.isAutoGroup(), HornetQClient.DEFAULT_AUTO_GROUP);
Assert.assertEquals(factory.isBlockOnAcknowledge(),
HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE);
- Assert.assertEquals(factory.isBlockOnNonDurableSend(),
- HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND);
+ Assert.assertEquals(factory.isBlockOnNonDurableSend(),
HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND);
Assert.assertEquals(factory.isBlockOnDurableSend(),
HornetQClient.DEFAULT_BLOCK_ON_DURABLE_SEND);
Assert.assertEquals(factory.isPreAcknowledge(),
HornetQClient.DEFAULT_PRE_ACKNOWLEDGE);
Assert.assertEquals(factory.isUseGlobalPools(),
HornetQClient.DEFAULT_USE_GLOBAL_POOLS);
@@ -204,8 +194,7 @@
Assert.assertEquals(factory.getTransactionBatchSize(), 18);
Assert.assertEquals(factory.isAutoGroup(), !HornetQClient.DEFAULT_AUTO_GROUP);
Assert.assertEquals(factory.isBlockOnAcknowledge(),
!HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE);
- Assert.assertEquals(factory.isBlockOnNonDurableSend(),
- !HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND);
+ Assert.assertEquals(factory.isBlockOnNonDurableSend(),
!HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND);
Assert.assertEquals(factory.isBlockOnDurableSend(),
!HornetQClient.DEFAULT_BLOCK_ON_DURABLE_SEND);
Assert.assertEquals(factory.isPreAcknowledge(),
!HornetQClient.DEFAULT_PRE_ACKNOWLEDGE);
Assert.assertEquals(factory.isUseGlobalPools(),
!HornetQClient.DEFAULT_USE_GLOBAL_POOLS);
@@ -261,8 +250,7 @@
Assert.assertEquals(factory.getTransactionBatchSize(), 18);
Assert.assertEquals(factory.isAutoGroup(), !HornetQClient.DEFAULT_AUTO_GROUP);
Assert.assertEquals(factory.isBlockOnAcknowledge(),
!HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE);
- Assert.assertEquals(factory.isBlockOnNonDurableSend(),
- !HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND);
+ Assert.assertEquals(factory.isBlockOnNonDurableSend(),
!HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND);
Assert.assertEquals(factory.isBlockOnDurableSend(),
!HornetQClient.DEFAULT_BLOCK_ON_DURABLE_SEND);
Assert.assertEquals(factory.isPreAcknowledge(),
!HornetQClient.DEFAULT_PRE_ACKNOWLEDGE);
Assert.assertEquals(factory.isUseGlobalPools(),
!HornetQClient.DEFAULT_USE_GLOBAL_POOLS);
@@ -409,15 +397,20 @@
public void testValidateProperties() throws Exception
{
- validateGettersAndSetters(new HornetQResourceAdapter(),
"backupTransportConfiguration", "connectionParameters");
+ validateGettersAndSetters(new HornetQResourceAdapter(),
+ "backupTransportConfiguration",
+ "connectionParameters",
+ "jndiParams");
validateGettersAndSetters(new HornetQRAManagedConnectionFactory(),
"connectionParameters",
"sessionDefaultType",
- "backupConnectionParameters");
+ "backupConnectionParameters",
+ "jndiParams");
validateGettersAndSetters(new HornetQActivationSpec(),
"connectionParameters",
"acknowledgeMode",
- "subscriptionDurability");
+ "subscriptionDurability",
+ "jndiParams");
HornetQActivationSpec spec = new HornetQActivationSpec();
@@ -463,7 +456,7 @@
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory factory = locator.createSessionFactory();
ClientSession session = factory.createSession(false, false, false);
- HornetQDestination queue = (HornetQDestination)
HornetQJMSClient.createQueue("test");
+ HornetQDestination queue =
(HornetQDestination)HornetQJMSClient.createQueue("test");
session.createQueue(queue.getSimpleAddress(), queue.getSimpleAddress(), true);
session.close();
@@ -497,7 +490,7 @@
activation.start();
activation.stop();
-
+
ra.stop();
locator.close();