Author: clebert.suconic(a)jboss.com
Date: 2011-07-20 21:27:25 -0400 (Wed, 20 Jul 2011)
New Revision: 11013
Added:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
HORNETQ-725, HORNETQ-744, HORNETQ-743 - a few paging fixes
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-07-21
01:16:45 UTC (rev 11012)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-07-21
01:27:25 UTC (rev 11013)
@@ -89,6 +89,10 @@
public synchronized PageSubscription createSubscription(long cursorID, Filter filter,
boolean persistent)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug(this.pagingStore.getAddress() + " creating subscription " +
cursorID + " with filter " + filter, new Exception ("trace"));
+ }
PageSubscription activeCursor = activeCursors.get(cursorID);
if (activeCursor != null)
{
@@ -330,6 +334,11 @@
{
return;
}
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("Asserting cleanup for address " +
this.pagingStore.getAddress());
+ }
ArrayList<PageSubscription> cursorList = new
ArrayList<PageSubscription>();
cursorList.addAll(activeCursors.values());
@@ -344,9 +353,21 @@
{
if (!cursor.isComplete(minPage))
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("Cursor " + cursor + " was considered
incomplete at page " + minPage);
+ }
+
complete = false;
break;
}
+ else
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug("Cursor " + cursor + "was considered
**complete** at page " + minPage);
+ }
+ }
}
if (complete)
@@ -516,12 +537,21 @@
for (PageSubscription cursor : cursorList)
{
long firstPage = cursor.getFirstPage();
+ if (log.isDebugEnabled())
+ {
+ log.debug(this.pagingStore.getAddress() + " has a cursor " + cursor
+ " with first page=" + firstPage);
+ }
if (firstPage < minPage)
{
minPage = firstPage;
}
}
+ if (log.isDebugEnabled())
+ {
+ log.debug(this.pagingStore.getAddress() + " has minPage=" + minPage);
+ }
+
return minPage;
}
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-07-21
01:16:45 UTC (rev 11012)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-07-21
01:27:25 UTC (rev 11013)
@@ -308,7 +308,7 @@
@Override
public String toString()
{
- return "PageSubscriptionImpl [cursorId=" + cursorId + ",
queue=" + queue + "]";
+ return "PageSubscriptionImpl [cursorId=" + cursorId + ",
queue=" + queue + ", filter = " + filter + "]";
}
@@ -648,22 +648,42 @@
Collections.sort(recoveredACK);
boolean first = true;
+
+ long txDeleteCursorOnReload = -1;
for (PagePosition pos : recoveredACK)
{
lastAckedPosition = pos;
- PageCursorInfo positions = getPageInfo(pos);
- if (first)
+ PageCursorInfo pageInfo = getPageInfo(pos);
+
+ if (pageInfo == null)
{
- first = false;
- if (pos.getMessageNr() > 0)
+ log.warn("Couldn't find page cache for page " + pos +
", removing it from the journal");
+ if (txDeleteCursorOnReload == -1)
{
- positions.confirmed.addAndGet(pos.getMessageNr());
+ txDeleteCursorOnReload = store.generateUniqueID();
}
+ store.deleteCursorAcknowledgeTransactional(txDeleteCursorOnReload,
pos.getRecordID());
+ }
+ else
+ {
+ if (first)
+ {
+ first = false;
+ if (pos.getMessageNr() > 0)
+ {
+ pageInfo.confirmed.addAndGet(pos.getMessageNr());
+ }
+ }
+
+ pageInfo.addACK(pos);
}
-
- positions.addACK(pos);
}
+
+ if (txDeleteCursorOnReload >= 0)
+ {
+ store.commit(txDeleteCursorOnReload);
+ }
recoveredACK.clear();
recoveredACK = null;
@@ -723,6 +743,10 @@
if (create && pageInfo == null)
{
PageCache cache = cursorProvider.getPageCache(pos);
+ if (cache == null)
+ {
+ return null;
+ }
pageInfo = new PageCursorInfo(pos.getPageNr(), cache.getNumberOfMessages(),
cache);
consumedPages.put(pos.getPageNr(), pageInfo);
}
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-21
01:16:45 UTC (rev 11012)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-21
01:27:25 UTC (rev 11013)
@@ -74,6 +74,7 @@
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.DuplicateIDCache;
import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.postoffice.QueueBinding;
import org.hornetq.core.postoffice.impl.DivertBinding;
import org.hornetq.core.postoffice.impl.LocalQueueBinding;
import org.hornetq.core.postoffice.impl.PostOfficeImpl;
@@ -1625,28 +1626,31 @@
{
queueBindingInfosMap.put(queueBindingInfo.getId(), queueBindingInfo);
- Filter filter = FilterImpl.createFilter(queueBindingInfo.getFilterString());
-
- PageSubscription subscription =
pagingManager.getPageStore(queueBindingInfo.getAddress()).getCursorProvier().createSubscription(queueBindingInfo.getId(),
filter, true);
+ if (queueBindingInfo.getFilterString() == null ||
!queueBindingInfo.getFilterString().toString().equals(GENERIC_IGNORED_FILTER))
+ {
+ Filter filter = FilterImpl.createFilter(queueBindingInfo.getFilterString());
+
+ PageSubscription subscription =
pagingManager.getPageStore(queueBindingInfo.getAddress()).getCursorProvier().createSubscription(queueBindingInfo.getId(),
filter, true);
+
+ Queue queue = queueFactory.createQueue(queueBindingInfo.getId(),
+ queueBindingInfo.getAddress(),
+ queueBindingInfo.getQueueName(),
+ filter,
+ subscription,
+ true,
+ false);
+
+ Binding binding = new LocalQueueBinding(queueBindingInfo.getAddress(), queue,
nodeManager.getNodeId());
+
+ queues.put(queueBindingInfo.getId(), queue);
+
+ postOffice.addBinding(binding);
+
+ managementService.registerAddress(queueBindingInfo.getAddress());
+ managementService.registerQueue(queue, queueBindingInfo.getAddress(),
storageManager);
+ }
- Queue queue = queueFactory.createQueue(queueBindingInfo.getId(),
- queueBindingInfo.getAddress(),
- queueBindingInfo.getQueueName(),
- filter,
- subscription,
- true,
- false);
-
- Binding binding = new LocalQueueBinding(queueBindingInfo.getAddress(), queue,
nodeManager.getNodeId());
-
- queues.put(queueBindingInfo.getId(), queue);
-
- postOffice.addBinding(binding);
-
- managementService.registerAddress(queueBindingInfo.getAddress());
- managementService.registerQueue(queue, queueBindingInfo.getAddress(),
storageManager);
-
}
for (GroupingInfo groupingInfo : groupingInfos)
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-07-21
01:16:45 UTC (rev 11012)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-07-21
01:27:25 UTC (rev 11013)
@@ -265,7 +265,7 @@
// If updateDeliveries = false (set by strict-update),
// the updateDeliveryCount would still be updated after c
- if (strictUpdateDeliveryCount)
+ if (strictUpdateDeliveryCount && !ref.isPaged())
{
if (ref.getMessage().isDurable() && ref.getQueue().isDurable()
&& !ref.getQueue().isInternalQueue() && !ref.isPaged())
{
Added:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java
(rev 0)
+++
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java 2011-07-21
01:27:25 UTC (rev 11013)
@@ -0,0 +1,197 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.util.JMSTestBase;
+
+public class JMSPagingFileDeleteTest extends JMSTestBase
+{
+ static Logger log = Logger.getLogger(JMSPagingFileDeleteTest.class);
+
+ Topic topic1;
+
+ Connection connection;
+
+ Session session;
+
+ MessageConsumer subscriber1;
+
+ MessageConsumer subscriber2;
+
+ PagingStore pagingStore;
+
+ private static final int MESSAGE_SIZE = 1024;
+
+ private static final int PAGE_SIZE = 10 * 1024;
+
+ private static final int PAGE_MAX = 20 * 1024;
+
+ private static final int RECEIVE_TIMEOUT = 500;
+
+ private static final int MESSAGE_NUM = 50;
+
+ @Override
+ protected boolean usePersistence()
+ {
+ return true;
+ }
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ clearData();
+ super.setUp();
+
+ topic1 = createTopic("topic1");
+
+ // Paging Setting
+ AddressSettings setting = new AddressSettings();
+ setting.setPageSizeBytes(JMSPagingFileDeleteTest.PAGE_SIZE);
+ setting.setMaxSizeBytes(JMSPagingFileDeleteTest.PAGE_MAX);
+ server.getAddressSettingsRepository().addMatch("#", setting);
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ topic1 = null;
+ super.tearDown();
+ }
+
+ public void testTopics() throws Exception
+ {
+ connection = null;
+
+ try
+ {
+ connection = cf.createConnection();
+ connection.setClientID("cid");
+
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer = session.createProducer(topic1);
+ subscriber1 = session.createDurableSubscriber(topic1,
"subscriber-1");
+ subscriber2 = session.createDurableSubscriber(topic1,
"subscriber-2");
+
+ // -----------------(Step1) Publish Messages to make Paging Files.
--------------------
+ System.out.println("---------- Send messages. ----------");
+ BytesMessage bytesMessage = session.createBytesMessage();
+ bytesMessage.writeBytes(new byte[JMSPagingFileDeleteTest.MESSAGE_SIZE]);
+ for (int i = 0; i < JMSPagingFileDeleteTest.MESSAGE_NUM; i++)
+ {
+ producer.send(bytesMessage);
+ }
+ System.out.println("Sent " + JMSPagingFileDeleteTest.MESSAGE_NUM +
" messages.");
+
+ pagingStore = server.getPagingManager().getPageStore(new
SimpleString("jms.topic.topic1"));
+ printPageStoreInfo(pagingStore);
+
+ assertTrue(pagingStore.isPaging());
+
+ connection.start();
+
+ // -----------------(Step2) Restart the server.
--------------------------------------
+ stopAndStartServer(); // If try this test without restarting server, please
comment out this line;
+
+ // -----------------(Step3) Subscribe to all the messages from the
topic.--------------
+ System.out.println("---------- Receive all messages. ----------");
+ for (int i = 0; i < JMSPagingFileDeleteTest.MESSAGE_NUM; i++)
+ {
+ Message message1 =
subscriber1.receive(JMSPagingFileDeleteTest.RECEIVE_TIMEOUT);
+ assertNotNull(message1);
+ Message message2 =
subscriber2.receive(JMSPagingFileDeleteTest.RECEIVE_TIMEOUT);
+ assertNotNull(message2);
+ }
+
+ pagingStore = server.getPagingManager().getPageStore(new
SimpleString("jms.topic.topic1"));
+ long timeout = System.currentTimeMillis() + 5000;
+ while (timeout > System.currentTimeMillis() &&
pagingStore.isPaging())
+ {
+ Thread.sleep(100);
+ }
+ assertFalse(pagingStore.isPaging());
+
+ printPageStoreInfo(pagingStore);
+
+ assertEquals(0, pagingStore.getAddressSize());
+ // assertEquals(1, pagingStore.getNumberOfPages()); //I expected number of the
page is 1, but It was not.
+ assertFalse(pagingStore.isPaging()); // I expected IsPaging is false, but It was
true.
+ // If the server is not restart, this test pass.
+
+ // -----------------(Step4) Publish a message. the message is stored in the
paging file.
+ producer = session.createProducer(topic1);
+ bytesMessage = session.createBytesMessage();
+ bytesMessage.writeBytes(new byte[JMSPagingFileDeleteTest.MESSAGE_SIZE]);
+ producer.send(bytesMessage);
+
+ printPageStoreInfo(pagingStore);
+
+ assertEquals(1, pagingStore.getNumberOfPages()); //I expected number of the page
is 1, but It was not.
+ }
+ finally
+ {
+ if (connection != null)
+ {
+ connection.close();
+ }
+ }
+ }
+
+ private void stopAndStartServer() throws Exception
+ {
+ System.out.println("---------- Restart server. ----------");
+ connection.close();
+
+ jmsServer.stop();
+
+ jmsServer.start();
+ jmsServer.activated();
+ registerConnectionFactory();
+
+ printPageStoreInfo(pagingStore);
+ reconnect();
+ }
+
+ private void reconnect() throws Exception
+ {
+ connection = cf.createConnection();
+ connection.setClientID("cid");
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ subscriber1 = session.createDurableSubscriber(topic1, "subscriber-1");
+ subscriber2 = session.createDurableSubscriber(topic1, "subscriber-2");
+ connection.start();
+ }
+
+ private void printPageStoreInfo(PagingStore pagingStore) throws Exception
+ {
+ System.out.println("---------- Paging Store Info ----------");
+ System.out.println(" CurrentPage = " + pagingStore.getCurrentPage());
+ System.out.println(" FirstPage = " + pagingStore.getFirstPage());
+ System.out.println(" Number of Pages = " +
pagingStore.getNumberOfPages());
+ System.out.println(" Address Size = " + pagingStore.getAddressSize());
+ System.out.println(" Is Paging = " + pagingStore.isPaging());
+ }
+}
\ No newline at end of file
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-07-21
01:16:45 UTC (rev 11012)
+++
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-07-21
01:27:25 UTC (rev 11013)
@@ -13,6 +13,7 @@
package org.hornetq.tests.integration.client;
+import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
@@ -497,6 +498,248 @@
}
+ /**
+ * This test will remove all the page directories during a restart, simulating a crash
scenario. The server should still start after this
+ */
+ public void testDeletePhisicalPages() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+ config.setPersistDeliveryCountBeforeDelivery(true);
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 1000;
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ byte[] body = new byte[messageSize];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= messageSize; j++)
+ {
+ bb.put(getSamplebyte(j));
+ }
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(true);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+ session.commit();
+ session.close();
+
+ session = null;
+
+ sf.close();
+ locator.close();
+
+ server.stop();
+
+ server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+ server.start();
+
+ locator = createInVMNonHALocator();
+ sf = locator.createSessionFactory();
+
+ Queue queue = server.locateQueue(ADDRESS);
+
+ assertEquals(numberOfMessages, queue.getMessageCount());
+
+ LinkedList<Xid> xids = new LinkedList<Xid>();
+
+ int msgReceived = 0;
+ ClientSession sessionConsumer = sf.createSession(false, false, false);
+ sessionConsumer.start();
+ ClientConsumer consumer = sessionConsumer.createConsumer(PagingTest.ADDRESS);
+ for (int msgCount = 0; msgCount < numberOfMessages; msgCount++)
+ {
+ log.info("Received " + msgCount);
+ msgReceived++;
+ ClientMessage msg = consumer.receiveImmediate();
+ if (msg == null)
+ {
+ log.info("It's null. leaving now");
+ sessionConsumer.commit();
+ fail("Didn't receive a message");
+ }
+ msg.acknowledge();
+
+ if (msgCount % 5 == 0)
+ {
+ log.info("commit");
+ sessionConsumer.commit();
+ }
+ }
+
+ sessionConsumer.commit();
+
+ sessionConsumer.close();
+
+ sf.close();
+
+ locator.close();
+
+ assertEquals(0, queue.getMessageCount());
+
+ long timeout = System.currentTimeMillis() + 5000;
+ while (timeout > System.currentTimeMillis() &&
queue.getPageSubscription().getPagingStore().isPaging())
+ {
+ Thread.sleep(100);
+ }
+ assertFalse(queue.getPageSubscription().getPagingStore().isPaging());
+
+ server.stop();
+
+ // Deleting the paging data. Simulating a failure
+ // a dumb user, or anything that will remove the data
+ deleteDirectory(new File(getPageDir()));
+
+ server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+ server.start();
+
+
+ locator = createInVMNonHALocator();
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ sf = locator.createSessionFactory();
+
+ queue = server.locateQueue(ADDRESS);
+
+ sf = locator.createSessionFactory();
+ session = sf.createSession(false, false, false);
+
+ producer = session.createProducer(PagingTest.ADDRESS);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(true);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ server.stop();
+
+ server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+ server.start();
+
+ locator = createInVMNonHALocator();
+ sf = locator.createSessionFactory();
+
+ queue = server.locateQueue(ADDRESS);
+
+ // assertEquals(numberOfMessages, queue.getMessageCount());
+
+ xids = new LinkedList<Xid>();
+
+ msgReceived = 0;
+ sessionConsumer = sf.createSession(false, false, false);
+ sessionConsumer.start();
+ consumer = sessionConsumer.createConsumer(PagingTest.ADDRESS);
+ for (int msgCount = 0; msgCount < numberOfMessages; msgCount++)
+ {
+ log.info("Received " + msgCount);
+ msgReceived++;
+ ClientMessage msg = consumer.receiveImmediate();
+ if (msg == null)
+ {
+ log.info("It's null. leaving now");
+ sessionConsumer.commit();
+ fail("Didn't receive a message");
+ }
+ msg.acknowledge();
+
+ if (msgCount % 5 == 0)
+ {
+ log.info("commit");
+ sessionConsumer.commit();
+ }
+ }
+
+ sessionConsumer.commit();
+
+ sessionConsumer.close();
+
+
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
public void testMissingTXEverythingAcked() throws Exception
{
clearData();
@@ -1281,7 +1524,7 @@
sf.close();
locator.close();
}
-
+
server = createServer(true,
config,
PagingTest.PAGE_SIZE,
@@ -1353,7 +1596,6 @@
t.start();
t.join();
-
assertEquals(0, errors.get());
for (int i = 0; i < 20 &&
server.getPostOffice().getPagingManager().getPageStore(ADDRESS).isPaging(); i++)
@@ -1361,9 +1603,8 @@
// The delete may be asynchronous, giving some time case it eventually happen
asynchronously
Thread.sleep(500);
}
-
- assertFalse
(server.getPostOffice().getPagingManager().getPageStore(ADDRESS).isPaging());
+
assertFalse(server.getPostOffice().getPagingManager().getPageStore(ADDRESS).isPaging());
for (int i = 0; i < 20 &&
server.getPostOffice().getPagingManager().getTransactions().size() != 0; i++)
{
@@ -1372,7 +1613,6 @@
}
assertEquals(0,
server.getPostOffice().getPagingManager().getTransactions().size());
-
}
finally