Author: clebert.suconic(a)jboss.com
Date: 2011-03-25 21:03:01 -0400 (Fri, 25 Mar 2011)
New Revision: 10372
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
trunk/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
trunk/src/main/org/hornetq/core/server/MessageReference.java
trunk/src/main/org/hornetq/core/server/ServerSession.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java
trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java
trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java
trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
merge -r10363:10371 from branch_2_2
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-03-26 00:38:27 UTC (rev 10371)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-03-26 01:03:01 UTC (rev 10372)
@@ -1062,8 +1062,6 @@
channel.returnBlocking();
}
-
- channel.setTransferring(false);
}
catch (Throwable t)
{
@@ -1071,6 +1069,7 @@
}
finally
{
+ channel.setTransferring(false);
channel.unlock();
}
Modified: trunk/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2011-03-26 00:38:27 UTC (rev 10371)
+++ trunk/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2011-03-26 01:03:01 UTC (rev 10372)
@@ -37,9 +37,11 @@
private final PagePosition position;
private WeakReference<PagedMessage> message;
-
+
private Long deliveryTime = null;
-
+
+ private int persistedCount;
+
private final PageSubscription subscription;
public ServerMessage getMessage()
@@ -50,10 +52,10 @@
public synchronized PagedMessage getPagedMessage()
{
PagedMessage returnMessage = message != null ? message.get() : null;
-
+
// We only keep a few references on the Queue from paging...
// Besides those references are SoftReferenced on page cache...
- // So, this will unlikely be null,
+ // So, this will unlikely be null,
// unless the Queue has stalled for some time after paging
if (returnMessage == null)
{
@@ -69,7 +71,9 @@
return position;
}
- public PagedReferenceImpl(final PagePosition position, final PagedMessage message, final PageSubscription subscription)
+ public PagedReferenceImpl(final PagePosition position,
+ final PagedMessage message,
+ final PageSubscription subscription)
{
this.position = position;
this.message = new WeakReference<PagedMessage>(message);
@@ -80,6 +84,16 @@
{
return true;
}
+
+ public void setPersistedCount(int count)
+ {
+ this.persistedCount = count;
+ }
+
+ public int getPersistedCount()
+ {
+ return persistedCount;
+ }
/* (non-Javadoc)
* @see org.hornetq.core.server.MessageReference#copy(org.hornetq.core.server.Queue)
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-03-26 00:38:27 UTC (rev 10371)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-03-26 01:03:01 UTC (rev 10372)
@@ -744,16 +744,22 @@
public void updateDeliveryCount(final MessageReference ref) throws Exception
{
- DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(),
- ref.getDeliveryCount());
+ // no need to store if it's the same value
+ // otherwise the journal will get OME in case of lots of redeliveries
+ if (ref.getDeliveryCount() != ref.getPersistedCount())
+ {
+ ref.setPersistedCount(ref.getDeliveryCount());
+ DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(),
+ ref.getDeliveryCount());
+
+ messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
+ JournalStorageManager.UPDATE_DELIVERY_COUNT,
+ updateInfo,
+
+ syncNonTransactional,
+ getContext(syncNonTransactional));
+ }
- messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
- JournalStorageManager.UPDATE_DELIVERY_COUNT,
- updateInfo,
-
- syncNonTransactional,
- getContext(syncNonTransactional));
-
}
public void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception
Modified: trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2011-03-26 00:38:27 UTC (rev 10371)
+++ trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2011-03-26 01:03:01 UTC (rev 10372)
@@ -114,8 +114,6 @@
private final ServerSession session;
- private final OperationContext sessionContext;
-
// Storagemanager here is used to set the Context
private final StorageManager storageManager;
@@ -126,7 +124,6 @@
private final boolean direct;
public ServerSessionPacketHandler(final ServerSession session,
- final OperationContext sessionContext,
final StorageManager storageManager,
final Channel channel)
{
@@ -134,8 +131,6 @@
this.storageManager = storageManager;
- this.sessionContext = sessionContext;
-
this.channel = channel;
this.remotingConnection = channel.getConnection();
@@ -197,7 +192,7 @@
{
byte type = packet.getType();
- storageManager.setContext(sessionContext);
+ storageManager.setContext(session.getSessionContext());
Packet response = null;
boolean flush = false;
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2011-03-26 00:38:27 UTC (rev 10371)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2011-03-26 01:03:01 UTC (rev 10372)
@@ -129,9 +129,9 @@
Version version = server.getVersion();
int[] compatibleList = version.getCompatibleVersionList();
boolean isCompatibleClient = false;
- for(int i=0; i<compatibleList.length; i++)
+ for (int i = 0; i < compatibleList.length; i++)
{
- if(compatibleList[i] == request.getVersion())
+ if (compatibleList[i] == request.getVersion())
{
isCompatibleClient = true;
break;
@@ -165,22 +165,23 @@
Channel channel = connection.getChannel(request.getSessionChannelID(),
request.getWindowSize());
- ServerSession session = server.createSession(request.getName(),
+ ServerSession session = server.createSession(request.getName(),
request.getUsername(),
request.getPassword(),
- request.getMinLargeMessageSize(),
+ request.getMinLargeMessageSize(),
connection,
request.isAutoCommitSends(),
request.isAutoCommitAcks(),
request.isPreAcknowledge(),
request.isXA(),
request.getDefaultAddress(),
- new CoreSessionCallback(request.getName(), protocolManager, channel));
+ new CoreSessionCallback(request.getName(),
+ protocolManager,
+ channel));
+ session.setSessionContext(server.getStorageManager().newContext(server.getExecutorFactory().getExecutor()));
+
ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session,
- server.getStorageManager()
- .newContext(server.getExecutorFactory()
- .getExecutor()),
server.getStorageManager(),
channel);
channel.setHandler(handler);
@@ -201,11 +202,11 @@
}
}
catch (Exception e)
- {
+ {
log.error("Failed to create session ", e);
-
+
HornetQPacketHandler.log.error("Failed to create session", e);
-
+
response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
}
@@ -225,22 +226,22 @@
private void handleReattachSession(final ReattachSessionMessage request)
{
Packet response = null;
-
+
try
{
-
+
if (!server.isStarted())
{
response = new ReattachSessionResponseMessage(-1, false);
}
-
+
ServerSessionPacketHandler sessionHandler = protocolManager.getSessionHandler(request.getName());
-
+
if (!server.checkActivate())
{
response = new ReattachSessionResponseMessage(-1, false);
}
-
+
if (sessionHandler == null)
{
response = new ReattachSessionResponseMessage(-1, false);
@@ -252,9 +253,9 @@
// Even though session exists, we can't reattach since confi window size == -1,
// i.e. we don't have a resend cache for commands, so we just close the old session
// and let the client recreate
-
+
sessionHandler.close();
-
+
response = new ReattachSessionResponseMessage(-1, false);
}
else
@@ -262,7 +263,7 @@
// Reconnect the channel to the new connection
int serverLastConfirmedCommandID = sessionHandler.transferConnection(connection,
request.getLastConfirmedCommandID());
-
+
response = new ReattachSessionResponseMessage(serverLastConfirmedCommandID, true);
}
}
@@ -270,7 +271,7 @@
catch (Exception e)
{
HornetQPacketHandler.log.error("Failed to reattach session", e);
-
+
response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
}
Modified: trunk/src/main/org/hornetq/core/server/MessageReference.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/MessageReference.java 2011-03-26 00:38:27 UTC (rev 10371)
+++ trunk/src/main/org/hornetq/core/server/MessageReference.java 2011-03-26 01:03:01 UTC (rev 10372)
@@ -47,6 +47,10 @@
int getDeliveryCount();
void setDeliveryCount(int deliveryCount);
+
+ void setPersistedCount(int deliveryCount);
+
+ int getPersistedCount();
void incrementDeliveryCount();
Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java 2011-03-26 00:38:27 UTC (rev 10371)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java 2011-03-26 01:03:01 UTC (rev 10372)
@@ -23,6 +23,7 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.message.impl.MessageInternal;
+import org.hornetq.core.persistence.OperationContext;
import org.hornetq.utils.json.JSONArray;
/**
@@ -114,6 +115,8 @@
void requestProducerCredits(SimpleString address, int credits) throws Exception;
void close(boolean failed) throws Exception;
+
+ void waitContextCompletion() throws Exception;
void setTransferring(boolean transferring);
@@ -136,4 +139,10 @@
String getLastSentMessageID(String address);
long getCreationTime();
+
+
+ OperationContext getSessionContext();
+
+ void setSessionContext(OperationContext context);
+
}
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-03-26 00:38:27 UTC (rev 10371)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-03-26 01:03:01 UTC (rev 10372)
@@ -63,11 +63,13 @@
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
import org.hornetq.core.persistence.GroupingInfo;
+import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.persistence.config.PersistedAddressSetting;
import org.hornetq.core.persistence.config.PersistedRoles;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.DuplicateIDCache;
@@ -649,8 +651,8 @@
// But at least we will do our best to avoid it on regular shutdowns
for (ServerSession session : sessions.values())
{
- log.info("closing a session" );
session.close(true);
+ session.waitContextCompletion();
}
remotingService.stop();
Modified: trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2011-03-26 00:38:27 UTC (rev 10371)
+++ trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2011-03-26 01:03:01 UTC (rev 10372)
@@ -230,6 +230,16 @@
ref.setScheduledDeliveryTime(scheduledDeliveryTime);
}
+ public void setPersistedCount(int count)
+ {
+ ref.setPersistedCount(count);
+ }
+
+ public int getPersistedCount()
+ {
+ return ref.getPersistedCount();
+ }
+
public boolean isPaged()
{
return false;
Modified: trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java 2011-03-26 00:38:27 UTC (rev 10371)
+++ trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java 2011-03-26 01:03:01 UTC (rev 10372)
@@ -36,6 +36,8 @@
private volatile int deliveryCount;
+ private volatile int persistedCount;
+
private volatile long scheduledDeliveryTime;
private final ServerMessage message;
@@ -91,6 +93,23 @@
}
// MessageReference implementation -------------------------------
+
+ /**
+ * @return the persistedCount
+ */
+ public int getPersistedCount()
+ {
+ return persistedCount;
+ }
+
+ /**
+ * @param persistedCount the persistedCount to set
+ */
+ public void setPersistedCount(int persistedCount)
+ {
+ this.persistedCount = persistedCount;
+ }
+
public MessageReference copy(final Queue queue)
{
return new MessageReferenceImpl(this, queue);
@@ -109,6 +128,7 @@
public void setDeliveryCount(final int deliveryCount)
{
this.deliveryCount = deliveryCount;
+ this.persistedCount = deliveryCount;
}
public void incrementDeliveryCount()
@@ -145,7 +165,7 @@
{
queue.referenceHandled();
}
-
+
public boolean isPaged()
{
return false;
@@ -167,7 +187,6 @@
queue.acknowledge(tx, this);
}
-
// Public --------------------------------------------------------
@Override
@@ -175,7 +194,9 @@
{
return "Reference[" + getMessage().getMessageID() +
"]:" +
- (getMessage().isDurable() ? "RELIABLE" : "NON-RELIABLE")
+ ":" + getMessage() ;
+ (getMessage().isDurable() ? "RELIABLE" : "NON-RELIABLE")
+
+ ":" +
+ getMessage();
}
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-03-26 00:38:27 UTC (rev 10371)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-03-26 01:03:01 UTC (rev 10372)
@@ -42,6 +42,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.BindingType;
@@ -146,6 +147,8 @@
private volatile int timeoutSeconds;
private Map<String, String> metaData;
+
+ private OperationContext sessionContext;
// Session's usage should be by definition single threaded, hence it's not needed to use a concurrentHashMap here
private Map<SimpleString, Pair<UUID, AtomicLong>> targetAddressInfos = new HashMap<SimpleString, Pair<UUID, AtomicLong>>();
@@ -223,7 +226,23 @@
}
// ServerSession implementation ----------------------------------------------------------------------------
+ /**
+ * @return the sessionContext
+ */
+ public OperationContext getSessionContext()
+ {
+ return sessionContext;
+ }
+ /**
+ * @param sessionContext the sessionContext to set
+ */
+ public void setSessionContext(OperationContext sessionContext)
+ {
+ this.sessionContext = sessionContext;
+ }
+
+
public String getUsername()
{
return username;
@@ -931,27 +950,62 @@
{
setStarted(false);
}
-
- public void close(final boolean failed)
+
+ public void waitContextCompletion()
{
- storageManager.afterCompleteOperations(new IOAsyncTask()
+ OperationContext formerCtx = storageManager.getContext();
+
+ try
{
- public void onError(int errorCode, String errorMessage)
+ try
{
+ if (!storageManager.waitOnOperations(10000))
+ {
+ log.warn("Couldn't finish context execution in 10 seconds", new Exception ("warning"));
+ }
}
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ }
+ finally
+ {
+ storageManager.setContext(formerCtx);
+ }
+ }
- public void done()
+ public void close(final boolean failed)
+ {
+ OperationContext formerCtx = storageManager.getContext();
+
+ try
+ {
+ storageManager.setContext(sessionContext);
+
+ storageManager.afterCompleteOperations(new IOAsyncTask()
{
- try
+ public void onError(int errorCode, String errorMessage)
{
- doClose(failed);
}
- catch (Exception e)
+
+ public void done()
{
- log.error("Failed to close session", e);
+ try
+ {
+ doClose(failed);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to close session", e);
+ }
}
- }
- });
+ });
+ }
+ finally
+ {
+ storageManager.setContext(formerCtx);
+ }
}
public void closeConsumer(final long consumerID) throws Exception
Modified: trunk/tests/src/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java 2011-03-26 00:38:27 UTC (rev 10371)
+++ trunk/tests/src/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java 2011-03-26 01:03:01 UTC (rev 10372)
@@ -13,13 +13,27 @@
package org.hornetq.tests.integration.client;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
import junit.framework.Assert;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.journal.LoaderCallback;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.tests.util.ServiceTestBase;
@@ -223,6 +237,111 @@
session.close();
}
+
+ public void testInfiniteDedeliveryMessageOnPersistent() throws Exception
+ {
+ internaltestInfiniteDedeliveryMessageOnPersistent(false);
+ }
+
+ private void internaltestInfiniteDedeliveryMessageOnPersistent(final boolean strict) throws Exception
+ {
+ setUp(strict);
+ ClientSession session = factory.createSession(false, false, false);
+
+ RedeliveryConsumerTest.log.info("created");
+
+ ClientProducer prod = session.createProducer(ADDRESS);
+ prod.send(createTextMessage(session, "Hello"));
+ session.commit();
+ session.close();
+
+
+ int expectedCount = 1;
+ for (int i = 0 ; i < 700; i++)
+ {
+ session = factory.createSession(false, false, false);
+ session.start();
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+ ClientMessage msg = consumer.receive(5000);
+ assertNotNull(msg);
+ assertEquals(expectedCount, msg.getDeliveryCount());
+
+ if (i % 100 == 0)
+ {
+ expectedCount++;
+ msg.acknowledge();
+ session.rollback();
+ }
+ session.close();
+ }
+
+ factory.close();
+ server.stop();
+
+ setUp(false);
+
+ for (int i = 0 ; i < 700; i++)
+ {
+ session = factory.createSession(false, false, false);
+ session.start();
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+ ClientMessage msg = consumer.receive(5000);
+ assertNotNull(msg);
+ assertEquals(expectedCount, msg.getDeliveryCount());
+ session.close();
+ }
+
+ server.stop();
+
+
+ JournalImpl journal = new JournalImpl(server.getConfiguration().getJournalFileSize(),
+ 2,
+ 0,
+ 0,
+ new NIOSequentialFileFactory(server.getConfiguration().getJournalDirectory()),
+ "hornetq-data",
+ "hq",
+ 1);
+
+
+ final AtomicInteger updates = new AtomicInteger();
+
+ journal.start();
+ journal.load(new LoaderCallback()
+ {
+
+ public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+ {
+ }
+
+ public void updateRecord(RecordInfo info)
+ {
+ if (info.userRecordType == JournalStorageManager.UPDATE_DELIVERY_COUNT)
+ {
+ updates.incrementAndGet();
+ }
+ }
+
+ public void deleteRecord(long id)
+ {
+ }
+
+ public void addRecord(RecordInfo info)
+ {
+ }
+
+ public void addPreparedTransaction(PreparedTransactionInfo preparedTransaction)
+ {
+ }
+ });
+
+ journal.stop();
+
+
+ assertEquals(7, updates.get());
+
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -244,7 +363,14 @@
factory = locator.createSessionFactory();
ClientSession session = factory.createSession(false, false, false);
- session.createQueue(ADDRESS, ADDRESS, true);
+ try
+ {
+ session.createQueue(ADDRESS, ADDRESS, true);
+ }
+ catch (HornetQException expected)
+ {
+ // in case of restart
+ }
session.close();
}
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-03-26 00:38:27 UTC (rev 10371)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-03-26 01:03:01 UTC (rev 10372)
@@ -682,8 +682,7 @@
server0.getConfiguration().setQueueConfigurations(queueConfigs0);
server0.start();
-
-
+
locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
@@ -710,30 +709,30 @@
producer0.send(message);
}
+ server1.start();
- server1.start();
-
// Inserting the duplicateIDs so the bridge will fail in a few
{
long ids[] = new long[100];
-
+
Queue queue = server0.locateQueue(new SimpleString(queueName0));
LinkedListIterator<MessageReference> iterator = queue.iterator();
-
- for (int i = 0 ; i < 100; i++)
+
+ for (int i = 0; i < 100; i++)
{
iterator.hasNext();
ids[i] = iterator.next().getMessage().getMessageID();
}
-
+
iterator.close();
- DuplicateIDCache duplicateTargetCache = server1.getPostOffice().getDuplicateIDCache(PostOfficeImpl.BRIDGE_CACHE_STR.concat(forwardAddress));
-
+ DuplicateIDCache duplicateTargetCache = server1.getPostOffice()
+ .getDuplicateIDCache(PostOfficeImpl.BRIDGE_CACHE_STR.concat(forwardAddress));
+
TransactionImpl tx = new TransactionImpl(server1.getStorageManager());
for (long id : ids)
{
- byte [] duplicateArray = BridgeImpl.getDuplicateBytes(server0.getNodeManager().getUUID(), id);
+ byte[] duplicateArray = BridgeImpl.getDuplicateBytes(server0.getNodeManager().getUUID(), id);
duplicateTargetCache.addToCache(duplicateArray, tx);
}
tx.commit();
@@ -946,9 +945,23 @@
locator.close();
}
- server0.stop();
+ try
+ {
+ server0.stop();
+ }
+ catch(Exception ignored)
+ {
+
+ }
- server1.stop();
+ try
+ {
+ server1.stop();
+ }
+ catch(Exception ignored)
+ {
+
+ }
}
}
@@ -1255,7 +1268,7 @@
protected void tearDown() throws Exception
{
clearData();
- super.setUp();
+ super.tearDown();
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java 2011-03-26 00:38:27 UTC (rev 10371)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java 2011-03-26 01:03:01 UTC (rev 10372)
@@ -25,8 +25,14 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
-import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.MessageHandler;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.invm.InVMRegistry;
import org.hornetq.core.server.HornetQServer;
@@ -256,11 +262,6 @@
protected abstract boolean checkSize(ClientMessage message);
- protected int getNumThreads()
- {
- return 10;
- }
-
protected ClientSession createAutoCommitSession(final ClientSessionFactory sf) throws Exception
{
return sf.createSession(false, true, true);
@@ -1197,6 +1198,11 @@
{
return 2;
}
+
+ protected int getNumThreads()
+ {
+ return 10;
+ }
@Override
protected void setUp() throws Exception
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java 2011-03-26 00:38:27 UTC (rev 10371)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java 2011-03-26 01:03:01 UTC (rev 10372)
@@ -16,10 +16,7 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
-import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.server.HornetQServers;
/**
Modified: trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-03-26 00:38:27 UTC (rev 10371)
+++ trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-03-26 01:03:01 UTC (rev 10372)
@@ -94,7 +94,7 @@
checkFreePort(5447);
if (InVMRegistry.instance.size() > 0)
{
- System.exit(0);
+ fail("InVMREgistry size > 0");
}
}
Modified: trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-03-26 00:38:27 UTC (rev 10371)
+++ trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-03-26 01:03:01 UTC (rev 10372)
@@ -875,26 +875,8 @@
@Override
protected void tearDown() throws Exception
{
- OperationContextImpl.clearContext();
+ cleanupPools();
- deleteDirectory(new File(getTestDir()));
-
- int invmSize = InVMRegistry.instance.size();
- if (invmSize > 0)
- {
- InVMRegistry.instance.clear();
- fail("invm registry still had acceptors registered");
- }
-
- if (AsynchronousFileImpl.getTotalMaxIO() != 0)
- {
- AsynchronousFileImpl.resetMaxAIO();
- Assert.fail("test did not close all its files " + AsynchronousFileImpl.getTotalMaxIO());
- }
-
- // We shutdown the global pools to give a better isolation between tests
- ServerLocatorImpl.clearThreadPools();
-
Map<Thread, StackTraceElement[]> threadMap = Thread.getAllStackTraces();
for (Thread thread : threadMap.keySet())
{
@@ -967,6 +949,32 @@
super.tearDown();
}
+ /**
+ *
+ */
+ protected void cleanupPools()
+ {
+ OperationContextImpl.clearContext();
+
+ deleteDirectory(new File(getTestDir()));
+
+ int invmSize = InVMRegistry.instance.size();
+ if (invmSize > 0)
+ {
+ InVMRegistry.instance.clear();
+ fail("invm registry still had acceptors registered");
+ }
+
+ if (AsynchronousFileImpl.getTotalMaxIO() != 0)
+ {
+ AsynchronousFileImpl.resetMaxAIO();
+ Assert.fail("test did not close all its files " + AsynchronousFileImpl.getTotalMaxIO());
+ }
+
+ // We shutdown the global pools to give a better isolation between tests
+ ServerLocatorImpl.clearThreadPools();
+ }
+
protected byte[] autoEncode(final Object... args)
{